slogkafka package - github.com/samber/slog-kafka - Go Packages (original) (raw)

slog: Kafka handler

tag Go Version GoDoc Build Status Go report Coverage Contributors License

A Kafka Handler for slog Go library.

See also:

🚀 Install

go get github.com/samber/slog-kafka

Compatibility: go >= 1.21

No breaking changes will be made to exported APIs before v2.0.0.

💡 Usage

GoDoc: https://pkg.go.dev/github.com/samber/slog-kafka

Handler options
type Option struct {
    // log level (default: debug)
    Level     slog.Leveler

    // Kafka Writer
    KafkaWriter *kafka.Writer

    // optional: customize Kafka event builder
    Converter Converter
}
Supported attributes

The following attributes are interpreted by slogkafka.DefaultConverter:

Atribute name slog.Kind Underlying type
"user" group (see below)
"error" any error
"request" any *http.Request
other attributes *

Other attributes will be injected in extra field.

Users must be of type slog.Group. Eg:

slog.Group("user",
    slog.String("id", "user-123"),
    slog.String("username", "samber"),
    slog.Time("created_at", time.Now()),
)
Example
import (
    "context"
    "fmt"
    "time"

    slogkafka "github.com/samber/slog-kafka"
    "github.com/segmentio/kafka-go"

    "log/slog"
)

func main() {
    // docker-compose up -d

    uri := "127.0.0.1:9092"

    dialer := &kafka.Dialer{
        Timeout:   10 * time.Second,
        DualStack: true,
    }

    conn, err := dialer.DialContext(context.Background(), "tcp", uri)
    if err != nil {
        panic(err)
    }

    err = conn.CreateTopics(kafka.TopicConfig{
        Topic:             "logs",
        NumPartitions:     12,
        ReplicationFactor: 1,
    })
    if err != nil {
        panic(err)
    }

    writer := kafka.NewWriter(kafka.WriterConfig{
        Brokers: []string{uri},
        Topic:   "logs",
        Dialer:  dialer,

        Async:       true,
        Balancer:    &kafka.Hash{},
        MaxAttempts: 3,

        Logger: kafka.LoggerFunc(func(msg string, args ...interface{}) {
            fmt.Printf(msg+"\n", args...)
        }),
        ErrorLogger: kafka.LoggerFunc(func(msg string, args ...interface{}) {
            fmt.Printf(msg+"\n", args...)
        }),
    })

    defer writer.Close()
    defer conn.Close()

    logger := slog.New(slogkafka.Option{Level: slog.LevelDebug, KafkaWriter: writer}.NewKafkaHandler())
    logger = logger.With("release", "v1.0.0")

    logger.
        With(
            slog.Group("user",
                slog.String("id", "user-123"),
                slog.Time("created_at", time.Now()),
            ),
        ).
        With("error", fmt.Errorf("an error")).
        Error("a message")
}

Kafka message:

{
  "level": "ERROR",
    "logger": "samber/slog-kafka",
    "message": "a message",
    "timestamp": "2023-04-30T01:33:21.676768Z",
    "error": {
        "error": "an error",
        "kind": "*errors.errorString",
        "stack": null
    },
    "extra": {
        "release": "v1.0.0"
    },
    "user": {
        "created_at": "2023-04-30T01:33:21.676704Z",
        "id": "user-123"
    }
}

🤝 Contributing

Don't hesitate ;)

# Install some dev dependencies
make tools

# Run tests
make test
# or
make watch-test

👤 Contributors

Contributors

💫 Show your support

Give a ⭐️ if this project helped you!

GitHub Sponsors

📝 License

Copyright © 2023 Samuel Berthe.

This project is MIT licensed.