
GoEventBus

A blazing‑fast, in‑memory, lock‑free event bus for Go—ideal for low‑latency pipelines, microservices, and game loops.


📚 Table of Contents

Why GoEventBus?
Installation
Quick Start
Transactions
API Reference
Back-pressure and Overrun Policies
Use Cases
Benchmarks
Contributing
License

Why GoEventBus?

Modern Go apps demand lightning-fast, non-blocking communication, but channels can bottleneck and external brokers add latency, complexity, and ops overhead. GoEventBus is an in-process, lock-free alternative.

Whether you’re building real‑time analytics, high‑throughput microservices, or game engines, GoEventBus keeps your events moving at Go‑speed.

Installation

go get github.com/Protocol-Lattice/GoEventBus

Quick Start

package main

import (
    "context"
    "fmt"
    "log"

    "github.com/Protocol-Lattice/GoEventBus"
)

// Define a typed projection as a struct
type HouseWasSold struct{}

func main() {
    // Create a dispatcher mapping projections (string or struct) to handlers
    dispatcher := GoEventBus.Dispatcher{
        "user_created": func(ctx context.Context, args map[string]any) (GoEventBus.Result, error) {
            userID := args["id"].(string)
            fmt.Println("User created with ID:", userID)
            return GoEventBus.Result{Message: "handled user_created"}, nil
        },
        HouseWasSold{}: func(ctx context.Context, args map[string]any) (GoEventBus.Result, error) {
            address := args["address"].(string)
            price := args["price"].(int)
            fmt.Printf("House sold at %s for $%d\n", address, price)
            return GoEventBus.Result{Message: "handled HouseWasSold"}, nil
        },
    }

    // Initialise an EventStore with a 64K ring buffer and DropOldest overrun policy
    store := GoEventBus.NewEventStore(&dispatcher, 1<<16, GoEventBus.DropOldest)

    // Enable asynchronous processing
    store.Async = true

    // Enqueue a string-based event
    _ = store.Subscribe(context.Background(), GoEventBus.Event{
        ID:         "evt1",
        Projection: "user_created",
        Args:       map[string]any{"id": "12345"},
    })

    // Enqueue a struct-based event
    _ = store.Subscribe(context.Background(), GoEventBus.Event{
        ID:         "evt2",
        Projection: HouseWasSold{},
        Args:       map[string]any{"address": "123 Main St", "price": 500000},
    })

    // Process pending events
    store.Publish()

    // Wait for all async handlers to finish
    if err := store.Drain(context.Background()); err != nil {
        log.Fatalf("Failed to drain EventStore: %v", err)
    }

    // Retrieve metrics
    published, processed, errors := store.Metrics()
    fmt.Printf("published=%d processed=%d errors=%d\n", published, processed, errors)
}

Transactions

GoEventBus now supports atomic transactions, allowing you to group multiple events and commit them together. This ensures that either all events are successfully published and handled, or none are.

package main

import ( "context" "log"

"github.com/Protocol-Lattice/GoEventBus"

)

func main() { // Begin a new transaction on the existing EventStore

// Create a dispatcher mapping projections to handlers
dispatcher := GoEventBus.Dispatcher{
    "user_created": func(ctx context.Context, args map[string]any) (GoEventBus.Result, error) {
        return GoEventBus.Result{Message: "handled user_created"}, nil
    },
    "send_email": func(ctx context.Context, args map[string]any) (GoEventBus.Result, error) {
        log.Println("Hello")
        return GoEventBus.Result{Message: "handled send_email"}, nil
    },
}

// Initialise an EventStore with a 64K ring buffer and DropOldest policy
store := GoEventBus.NewEventStore(&dispatcher, 1<<16, GoEventBus.DropOldest)
tx := store.BeginTransaction()

// Buffer multiple events
tx.Publish(GoEventBus.Event{
    ID:         "evtA",
    Projection: "user_created",
    Args:       map[string]any{"id": "12345"},
})
tx.Publish(GoEventBus.Event{
    ID:         "evtB",
    Projection: "send_email",
    Args:       map[string]any{"template": "welcome", "userID": "12345"},
})
tx.Rollback()

// Commit the transaction atomically
if err := tx.Commit(context.Background()); err != nil {
    log.Fatalf("transaction failed: %v", err)
}

}
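To abandon a transaction instead of committing it, call Rollback: the buffered events are discarded and never reach the ring buffer. A minimal sketch, reusing the store from the example above (evtC and the goodbye template are illustrative):

// Buffer an event, then discard it.
tx2 := store.BeginTransaction()
tx2.Publish(GoEventBus.Event{
    ID:         "evtC",
    Projection: "send_email",
    Args:       map[string]any{"template": "goodbye", "userID": "12345"},
})
tx2.Rollback() // nothing is enqueued; the send_email handler never runs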

API Reference

type Result

type Result struct {
    Message string // Outcome message from handler
}

type Dispatcher

type Dispatcher map[interface{}]func(context.Context, map[string]any) (Result, error)

A map from projection keys to handler functions. Handlers receive a context.Context and an argument map, and return a Result and an error.

type Event

type Event struct {
    ID         string         // Unique identifier for the event
    Projection interface{}    // Key to look up the handler in the dispatcher
    Args       map[string]any // Payload data for the event
}

type Transaction

type Transaction struct {
    store     *EventStore
    events    []Event
    startHead uint64 // head position when transaction began
}

type OverrunPolicy

type OverrunPolicy int

const (
    DropOldest  OverrunPolicy = iota // Default: discard oldest events
    Block                            // Block until space is available
    ReturnError                      // Fail fast with ErrBufferFull
)

type EventStore

type EventStore struct {
    dispatcher *Dispatcher      // Pointer to the dispatcher map
    size       uint64           // Buffer size (power of two)
    buf        []unsafe.Pointer // Ring buffer of event pointers
    events     []Event          // Backing slice for Event data
    head       uint64           // Atomic write index
    tail       uint64           // Atomic read index

    // Config flags
    Async         bool
    OverrunPolicy OverrunPolicy

    // Counters
    publishedCount uint64
    processedCount uint64
    errorCount     uint64
}

NewEventStore

func NewEventStore(dispatcher *Dispatcher, bufferSize uint64, policy OverrunPolicy) *EventStore

Creates a new EventStore with the provided dispatcher, ring buffer size, and overrun policy.

Subscribe

func (es *EventStore) Subscribe(ctx context.Context, e Event) error

Atomically enqueues an Event for later publication, applying back‑pressure according to OverrunPolicy. If OverrunPolicy is ReturnError and the buffer is full, the function returns ErrBufferFull.
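Handling back-pressure explicitly with ReturnError might look like the following sketch. It reuses the dispatcher from the Quick Start, assumes ErrBufferFull is exported at package level, and uses the standard errors package:

store := GoEventBus.NewEventStore(&dispatcher, 1<<10, GoEventBus.ReturnError)

err := store.Subscribe(context.Background(), GoEventBus.Event{
    ID:         "evt3",
    Projection: "user_created",
    Args:       map[string]any{"id": "67890"},
})
if errors.Is(err, GoEventBus.ErrBufferFull) {
    // The ring buffer is saturated; retry, shed load, or log and move on.
    log.Println("buffer full, dropping evt3")
}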

Publish

func (es *EventStore) Publish()

Dispatches all events from the last published position to the current head. If Async is true, handlers run in separate goroutines; otherwise they run in the caller's goroutine.
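In synchronous mode handlers run inline, so Publish doubles as a completion barrier and no Drain call is needed afterwards. A minimal sketch, reusing the dispatcher from the Quick Start and assuming Async defaults to false (the zero value):

store := GoEventBus.NewEventStore(&dispatcher, 1<<16, GoEventBus.DropOldest)
// store.Async is false, so handlers run in the caller's goroutine.

_ = store.Subscribe(context.Background(), GoEventBus.Event{
    ID:         "evt4",
    Projection: "user_created",
    Args:       map[string]any{"id": "99999"},
})

store.Publish() // returns only after every pending handler has run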

Drain

func (es *EventStore) Drain(ctx context.Context) error

Blocks until all in-flight asynchronous handlers complete, then stops the worker pool. Returns an error if the provided context.Context is canceled or its deadline is exceeded.
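Because Drain honours the context, you can bound how long shutdown waits for stragglers. A minimal sketch, reusing the store from the Quick Start:

// Give in-flight async handlers up to five seconds to finish.
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()

if err := store.Drain(ctx); err != nil {
    log.Printf("drain timed out: %v", err) // deadline exceeded or canceled
}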

Close

func (es *EventStore) Close(ctx context.Context) error

Drains all pending asynchronous handlers and shuts down the EventStore. Blocks until all in-flight handlers complete or the provided context.Context is canceled. Returns an error if the context’s deadline is exceeded or it is otherwise canceled.
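A common pattern is to defer Close with a deadline right after constructing the store, so shutdown is bounded on every return path. A minimal sketch, reusing the dispatcher from the Quick Start:

store := GoEventBus.NewEventStore(&dispatcher, 1<<16, GoEventBus.DropOldest)
defer func() {
    ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
    defer cancel()
    if err := store.Close(ctx); err != nil {
        log.Printf("close: %v", err)
    }
}()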

Metrics

func (es *EventStore) Metrics() (published, processed, errors uint64)

Returns the total count of published, processed, and errored events.

Schedule

func (es *EventStore) Schedule(ctx context.Context, t time.Time, e Event) *time.Timer

Schedules an Event to be subscribed and published at a specific time t.

ScheduleAfter

func (es *EventStore) ScheduleAfter(ctx context.Context, d time.Duration, e Event) *time.Timer

A convenience wrapper around Schedule that fires an event after a specified duration d.
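Both helpers return the underlying *time.Timer, so a scheduled event can be cancelled with the standard Stop method before it fires. A minimal sketch, reusing the store and send_email handler from the Transactions example (evt5 and the reminder template are illustrative):

// Fire a reminder in five seconds, then change our mind and cancel it.
timer := store.ScheduleAfter(context.Background(), 5*time.Second, GoEventBus.Event{
    ID:         "evt5",
    Projection: "send_email",
    Args:       map[string]any{"template": "reminder", "userID": "12345"},
})

// Stop reports whether the timer was still pending.
if timer.Stop() {
    log.Println("reminder cancelled before it was enqueued")
}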

Back-pressure and Overrun Policies

GoEventBus provides three strategies for handling a saturated ring buffer:

DropOldest
    Behaviour: Atomically advances the read index, discarding the oldest event to make room for the new one.
    When to use: Low-latency scenarios where the newest data is most valuable and occasional loss is acceptable.

Block
    Behaviour: Causes Subscribe to block (respecting its context) until space becomes available.
    When to use: Workloads that must not lose events but can tolerate the latency of back-pressure.

ReturnError
    Behaviour: Subscribe returns ErrBufferFull immediately, allowing the caller to decide what to do.
    When to use: Pipelines where upstream logic controls retries and failures explicitly.

DropOldest is the default behaviour and matches releases prior to April 2025.
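With Block, Subscribe parks the caller until space frees up, so pass a context with a deadline when an unbounded wait is unacceptable. A minimal sketch, reusing the dispatcher from the Quick Start; it assumes Subscribe surfaces the context's error when the deadline expires first, per the "respecting its context" behaviour above:

store := GoEventBus.NewEventStore(&dispatcher, 1<<8, GoEventBus.Block)

ctx, cancel := context.WithTimeout(context.Background(), 100*time.Millisecond)
defer cancel()

// If the ring buffer stays full past the deadline, Subscribe gives up
// instead of blocking forever.
if err := store.Subscribe(ctx, GoEventBus.Event{
    ID:         "evt6",
    Projection: "user_created",
    Args:       map[string]any{"id": "42"},
}); err != nil {
    log.Printf("enqueue failed: %v", err)
}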

💡 Use Cases

GoEventBus is ideal for scenarios that demand low-latency, high-throughput, non-blocking event dispatch, such as real-time analytics pipelines, high-throughput microservices, and game engines.

Benchmarks

All benchmarks were run with Go's testing harness (go test -bench .) with GOMAXPROCS=8, as the -8 suffix in the benchmark names indicates. Numbers below are from the April 2025 release.

Benchmark Iterations ns/op
BenchmarkSubscribe-8 27,080,376 40.37
BenchmarkSubscribeParallel-8 26,418,999 38.42
BenchmarkPublish-8 295,661,464 3.910
BenchmarkPublishAfterPrefill-8 252,943,526 4.585
BenchmarkSubscribeLargePayload-8 1,613,017 771.5
BenchmarkPublishLargePayload-8 296,434,225 3.910
BenchmarkEventStore_Async-8 2,816,988 436.5
BenchmarkEventStore_Sync-8 2,638,519 428.5
BenchmarkFastHTTPSync-8 6,275,112 163.8
BenchmarkFastHTTPAsync-8 1,954,884 662.0
BenchmarkFastHTTPParallel-8 4,489,274 262.3

Contributing

Contributions, issues, and feature requests are welcome! Feel free to check the issues page.

License

Distributed under the MIT License. See LICENSE for more information.