GitHub - oagudo/outbox: Lightweight library for the transactional outbox pattern in Go, not tied to any specific relational database or broker. (original) (raw)

Release Software License GitHub Actions codecov Go Report Card Go Reference

Lightweight library for the transactional outbox pattern in Go, not tied to any specific relational database or broker.

Key Features

Usage

The library consists of two main components:

  1. Writer: Stores your entity and corresponding message atomically within a transaction
  2. Reader: Publishes stored messages to your message broker in the background

The Writer

The Writer ensures your entity and outbox message are stored together atomically:

// Setup database connection db, _ := sql.Open("pgx", "postgres://user:password@localhost:5432/outbox?sslmode=disable")

// Create a DBContext and Writer instance dbCtx := outbox.NewDBContext(db, outbox.SQLDialectPostgres) writer := outbox.NewWriter(dbCtx)

// In your business logic: // // Create your entity and outbox message entity := Entity{ ID: uuid.New(), CreatedAt: time.Now().UTC(), }

payload, _ := json.Marshal(entity) metadata := json.RawMessage({"trace_id":"abc123","correlation_id":"xyz789"}) msg := outbox.NewMessage(payload, outbox.WithCreatedAt(entity.CreatedAt), outbox.WithMetadata(metadata))

// Write message and entity in a single transaction err = writer.Write(ctx, msg, // This user-defined callback executes queries within the // same transaction that stores the outbox message func(ctx context.Context, txQueryer outbox.TxQueryer) error { _, err := txQueryer.ExecContext(ctx, "INSERT INTO entity (id, created_at) VALUES ($1, $2)", entity.ID, entity.CreatedAt, ) return err })

🚀 Optimistic Publishing (Optional)

Optimistic publishing attempts to publish messages immediately after transaction commit, reducing latency while maintaining guaranteed delivery through the background reader as fallback.

How It Works

  1. Transaction commits (entity + outbox message stored)
  2. Immediate publish attempt to broker (asynchronously, will not block the incoming request)
  3. On success: message is removed from outbox
  4. On failure: background reader handles delivery later

Configuration

// Create publisher (see Reader section below) publisher := &messagePublisher{}

// Enable optimistic publishing in writer writer := outbox.NewWriter(dbCtx, outbox.WithOptimisticPublisher(publisher))

Important considerations:

The Reader

The Reader periodically checks for unsent messages and publishes them to your message broker:

// Create a message publisher implementation type messagePublisher struct { // Your message broker client (e.g., Kafka, RabbitMQ) } func (p *messagePublisher) Publish(ctx context.Context, msg *outbox.Message) error { // Publish the message to your broker. See examples below for specific implementations return nil }

// Create and start the reader reader := outbox.NewReader( dbCtx, // Database context &messagePublisher{}, // Publisher implementation outbox.WithInterval(5time.Second), // Polling interval (default: 10s) outbox.WithReadBatchSize(200), // Read batch size (default: 100) outbox.WithDeleteBatchSize(50), // Delete batch size (default: 20) outbox.WithMaxAttempts(300), // Discard after 300 attempts (default: MaxInt32) outbox.WithExponentialDelay( // Delay between attempts (default: Exponential; can also use Fixed or Custom) 500time.Millisecond, // Initial delay (default: 200ms) 30*time.Minute), // Maximum delay (default: 1h) ) reader.Start() defer reader.Stop(context.Background()) // Stop during application shutdown

// Monitor standard processing errors (publish / update / delete / read). go func() { for err := range reader.Errors() { switch e := err.(type) { case *outbox.PublishError: log.Printf("Failed to publish message | ID: %s | Error: %v", e.Message.ID, e.Err)

    case *outbox.UpdateError:
        log.Printf("Failed to update message | ID: %s | Error: %v",
            e.Message.ID, e.Err)

    case *outbox.DeleteError:
        log.Printf("Batch message deletion failed | Count: %d | Error: %v",
            len(e.Messages), e.Err)
        for _, msg := range e.Messages {
            log.Printf("Failed to delete message | ID: %s", msg.ID)
        }

    case *outbox.ReadError:
        log.Printf("Failed to read outbox messages | Error: %v", e.Err)

    default:
        log.Printf("Unexpected error occurred | Error: %v", e)
    }
}

}()

// Monitor discarded messages (hit the max-attempts threshold). go func() { for msg := range reader.DiscardedMessages() { log.Printf("outbox message %s discarded after %d attempts", msg.ID, msg.TimesAttempted) // Example next steps: // • forward to a dead-letter topic // • raise an alert / metric // • persist for manual inspection } }()

Database Setup

1. Choose Your Database Dialect

The library supports multiple relational databases. Configure the appropriate SQLDialect when creating the DBContext. Supported dialects are PostgreSQL, MySQL, MariaDB, SQLite, Oracle and SQL Server.

// Example creating a DBContext with MySQL dialect dbCtx := outbox.NewDBContext(db, outbox.SQLDialectMySQL)

2. Create the Outbox Table

The outbox table stores messages that need to be published to your message broker. Choose your database below:

🐘 PostgreSQL

CREATE TABLE IF NOT EXISTS outbox ( id UUID PRIMARY KEY, created_at TIMESTAMP WITH TIME ZONE NOT NULL, scheduled_at TIMESTAMP WITH TIME ZONE NOT NULL, metadata BYTEA, payload BYTEA NOT NULL, times_attempted INTEGER NOT NULL );

CREATE INDEX IF NOT EXISTS idx_outbox_created_at ON outbox (created_at); CREATE INDEX IF NOT EXISTS idx_outbox_scheduled_at ON outbox (scheduled_at);

📊 MySQL

CREATE TABLE IF NOT EXISTS outbox ( id BINARY(16) PRIMARY KEY, created_at TIMESTAMP(3) NOT NULL, scheduled_at TIMESTAMP(3) NOT NULL, metadata BLOB, payload BLOB NOT NULL, times_attempted INT NOT NULL );

CREATE INDEX idx_outbox_created_at ON outbox (created_at); CREATE INDEX idx_outbox_scheduled_at ON outbox (scheduled_at);

🐬 MariaDB

CREATE TABLE IF NOT EXISTS outbox ( id UUID PRIMARY KEY, created_at TIMESTAMP(3) NOT NULL, scheduled_at TIMESTAMP(3) NOT NULL, metadata BLOB, payload BLOB NOT NULL, times_attempted INT NOT NULL );

CREATE INDEX idx_outbox_created_at ON outbox (created_at); CREATE INDEX idx_outbox_scheduled_at ON outbox (scheduled_at);

🗃️ SQLite

CREATE TABLE IF NOT EXISTS outbox ( id TEXT PRIMARY KEY, created_at DATETIME NOT NULL, scheduled_at DATETIME NOT NULL, metadata BLOB, payload BLOB NOT NULL, times_attempted INTEGER NOT NULL );

CREATE INDEX IF NOT EXISTS idx_outbox_created_at ON outbox (created_at); CREATE INDEX IF NOT EXISTS idx_outbox_scheduled_at ON outbox (scheduled_at);

🏛️ Oracle

CREATE TABLE outbox ( id RAW(16) PRIMARY KEY, created_at TIMESTAMP WITH TIME ZONE NOT NULL, scheduled_at TIMESTAMP WITH TIME ZONE NOT NULL, metadata BLOB, payload BLOB NOT NULL, times_attempted NUMBER(10) NOT NULL );

CREATE INDEX idx_outbox_created_at ON outbox (created_at); CREATE INDEX idx_outbox_scheduled_at ON outbox (scheduled_at);

🪟 SQL Server

CREATE TABLE outbox ( id UNIQUEIDENTIFIER PRIMARY KEY, created_at DATETIMEOFFSET(3) NOT NULL, scheduled_at DATETIMEOFFSET(3) NOT NULL, metadata VARBINARY(MAX), payload VARBINARY(MAX) NOT NULL, times_attempted INT NOT NULL );

CREATE INDEX idx_outbox_created_at ON outbox (created_at); CREATE INDEX idx_outbox_scheduled_at ON outbox (scheduled_at);

Examples

Complete working examples for different databases and message brokers:

To run an example:

cd examples/postgres-kafka # or examples/oracle-nats or examples/mysql-rabitmq ../../scripts/up-and-wait.sh go run service.go

In another terminal trigger a POST to trigger entity creation

curl -X POST http://localhost:8080/entity

FAQ

What happens when multiple instances of my service use the library?

When running multiple instances of your service, each with its own reader, be aware that:

The optimistic publisher feature can significantly reduce the number of duplicates. With optimistic publisher messages are delivered as soon as they are committed, so readers will usually see no messages in the outbox table.

Also note that even in single instance deployments, message duplicates can still occur (e.g. if the service crashes right after successfully publishing to the broker). However, these duplicates are less frequent than when you are running multiple reader instances.

How to instantiate a DBContext when using pgxpool ?

You can use stdlib.OpenDBFromPool function to get a *sql.DB from a *pgxpool.Pool.

import ( "github.com/jackc/pgx/v5/pgxpool" "github.com/jackc/pgx/v5/stdlib" "github.com/oagudo/outbox" )

// ... pool, _ := pgxpool.New(context.Background(), os.Getenv("DATABASE_URL")) db := stdlib.OpenDBFromPool(pool) dbCtx := outbox.NewDBContext(db, outbox.SQLDialectPostgres)

Contributing

Contributions are welcome! Please feel free to submit a Pull Request.