GitHub - jet/FsKafka: Minimal F# wrappers for Confluent.Kafka+librdkafka.redist 1.x
F# friendly wrapper for Confluent.Kafka, with minimal dependencies or additional abstractions (but see related repos).
FsKafka wraps Confluent.Kafka to provide efficient batched Kafka Producer and Consumer configurations with basic logging instrumentation. Depends on Confluent.Kafka [1.9.2], librdkafka.redist [1.9.2] (pinned to ensure we use a tested pairing), Serilog (but no specific Serilog sinks, i.e. you configure to emit to NLog etc) and Newtonsoft.Json (used internally to parse Broker-provided Statistics for logging purposes).
Usage
FsKafka is delivered as a NuGet package targeting netstandard2.0 and F# >= 4.5.
    dotnet add package FsKafka
or for paket, use:
Related repos
- See the Propulsion repo for extended Producers and Consumers.
- See the Jet dotnet new templates repo's proProjector template (in -k mode) for example producer logic using the BatchedProducer, and the proConsumer template for examples of using the BatchedConsumer from FsKafka, alongside the extended modes in Propulsion.
- See the Equinox QuickStart for examples of using this library to project to Kafka from Equinox.Cosmos and/or Equinox.EventStore.
CONTRIBUTING
Contributions of all sizes are warmly welcomed. See our contribution guide
TEMPLATES
The best place to start, sample-wise, is from the dotnet new templates stored in a dedicated repo.
BUILDING
The templates are the best way to see how to consume FsKafka; these instructions are intended mainly for people looking to make changes.
NB The tests are reliant on a TEST_KAFKA_BROKER environment variable pointing to a Broker that has been configured to auto-create ephemeral Kafka Topics as required by the tests (each test run writes to a guid-named topic).
    # build, including tests on netcoreapp3.1
    export TEST_KAFKA_BROKER=":9092"
    dotnet build build.proj -v n
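For anyone adding tests, the sketch below shows one way test code could pick up the broker address and derive a guid-named topic, as described in the note above. The helper names `getTestBroker` and `newTopicName` are hypothetical and are not part of FsKafka or its test suite.

```fsharp
open System

// Hypothetical helper (not part of FsKafka): read the broker address the tests rely on,
// failing fast with a clear message if the variable is unset or blank.
let getTestBroker () =
    match Environment.GetEnvironmentVariable "TEST_KAFKA_BROKER" with
    | null | "" -> failwith "TEST_KAFKA_BROKER is not set; point it at a broker that auto-creates topics"
    | broker -> broker

// Hypothetical helper: a fresh guid-based topic name per test run ("N" = 32 hex digits, no dashes).
let newTopicName () = Guid.NewGuid().ToString "N"
```

Failing fast here gives a clearer error than a connection timeout later in the run.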
FAQ
How do I get rid of all the breaking off polling ... resuming polling spam?
- The BatchedConsumer implementation tries to give clear feedback as to when reading is not keeping up, for diagnostic purposes. As of #32, such messages are tagged with the type FsKafka.Core.InFlightMessageCounter, and as such can be silenced by including the following in one's LoggerConfiguration():
    .MinimumLevel.Override(FsKafka.Core.Constants.messageCounterSourceContext, Serilog.Events.LogEventLevel.Warning)
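To show where that override sits in a complete logger setup, here is a minimal sketch. It assumes the Serilog.Sinks.Console package for output (FsKafka itself ships no sinks) and a Debug default level; both are illustrative choices rather than anything the library requires.

```fsharp
#r "nuget: FsKafka"
#r "nuget: Serilog.Sinks.Console" // assumption: any Serilog sink would do

open Serilog
open Serilog.Events

let log =
    LoggerConfiguration()
        // Illustrative default level for everything else
        .MinimumLevel.Debug()
        // Only Warning and above from the in-flight message counter, which hides the polling messages
        .MinimumLevel.Override(FsKafka.Core.Constants.messageCounterSourceContext, LogEventLevel.Warning)
        .WriteTo.Console()
        .CreateLogger()
```

The resulting `log` can then be passed wherever the examples in this README create a logger.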
What is this, why does it exist, where did it come from, is anyone using it?
This code results from building out an end-to-end batteries-included set of libraries and templates as part of the Equinox project.
Equinox places some key constraints on all components and dependencies:
- batteries-included examples of end-to-end functionality within the Equinox remit; samples should have clean consistent wiring
- pick a well-established base library, try not to add new concepts
- low dependencies, so it can work in lots of contexts without egregiously forcing you to upgrade things
- aim to add any resilience features as patches to upstream repos
- thorough test coverage; integration coverage for core wrapped functionality, unit tests for any non-trivial logic in the wrapper library
Minimal producer example
    #r "nuget:FsKafka"
    open Confluent.Kafka
    open FsKafka

    // Logger with no sinks configured; add sinks to see output
    let log = Serilog.LoggerConfiguration().CreateLogger()

    let batching = Batching.Linger (System.TimeSpan.FromMilliseconds 10.)
    let producerConfig = KafkaProducerConfig.Create("MyClientId", "kafka:9092", Acks.All, batching)
    let producer = KafkaProducer.Create(log, producerConfig, "MyTopic")

    // Produce a single message and wait for the delivery result
    let key = System.Guid.NewGuid().ToString()
    let deliveryResult = producer.ProduceAsync(key, "Hello World!") |> Async.RunSynchronously
Minimal batched consumer example
    #r "nuget:FsKafka"
    open Confluent.Kafka
    open FsKafka

    let log = Serilog.LoggerConfiguration().CreateLogger()

    // Invoked with each batch of consumed messages
    let handler (messages : ConsumeResult<string,string> []) = async {
        for m in messages do
            printfn "Received: %s" m.Message.Value
    }

    let cfg = KafkaConsumerConfig.Create("MyClientId", "kafka:9092", ["MyTopic"], "MyGroupId", AutoOffsetReset.Earliest)

    async {
        use consumer = BatchedConsumer.Start(log, cfg, handler)
        return! consumer.AwaitShutdown()
    } |> Async.RunSynchronously
Minimal batched consumer example with monitor
    #r "nuget:FsKafka"
    open Confluent.Kafka
    open FsKafka

    let log = Serilog.LoggerConfiguration().CreateLogger()

    let handler (messages : ConsumeResult<string,string> []) = async {
        for m in messages do
            printfn "Received: %s" m.Message.Value
    }

    let cfg = KafkaConsumerConfig.Create("MyClientId", "kafka:9092", ["MyTopic"], "MyGroupId", AutoOffsetReset.Earliest)

    async {
        use consumer = BatchedConsumer.Start(log, cfg, handler)
        // The monitor is disposed along with the consumer when this block exits
        use _ = KafkaMonitor(log).Start(consumer.Inner, cfg.Inner.GroupId)
        return! consumer.AwaitShutdown()
    } |> Async.RunSynchronously
Running (and awaiting) a pair of consumers until either throws
    #r "nuget:FsKafka"
    open Confluent.Kafka
    open FsKafka

    let log = Serilog.LoggerConfiguration().CreateLogger()

    let handler (messages : ConsumeResult<string,string> []) = async {
        for m in messages do
            printfn "Received: %s" m.Message.Value
    }

    let config topic = KafkaConsumerConfig.Create("MyClientId", "kafka:9092", [topic], "MyGroupId", AutoOffsetReset.Earliest)

    let cfg1, cfg2 = config "MyTopicA", config "MyTopicB"

    async {
        use consumer1 = BatchedConsumer.Start(log, cfg1, handler)
        use consumer2 = BatchedConsumer.Start(log, cfg2, handler)
        use _ = KafkaMonitor(log).Start(consumer1.Inner, cfg1.Inner.GroupId)
        use _ = KafkaMonitor(log).Start(consumer2.Inner, cfg2.Inner.GroupId)
        return! Async.Parallel [consumer1.AwaitWithStopOnCancellation(); consumer2.AwaitWithStopOnCancellation()]
    } |> Async.RunSynchronously
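The "until either throws" behaviour in the example above comes from `Async.Parallel`: when one child computation raises, the others are cancelled and the exception propagates to the caller. The sketch below illustrates that with plain asyncs and no Kafka broker. The `worker` function is a hypothetical stand-in for a consumer's await call, not an FsKafka API.

```fsharp
// Hypothetical stand-in for a consumer: either fails after a delay, or runs until cancelled.
let worker (name : string) (failAfterMs : int option) = async {
    match failAfterMs with
    | Some ms ->
        do! Async.Sleep ms
        failwithf "%s failed" name
    | None ->
        // Async.Sleep observes cancellation, so this loop ends once the sibling fails
        while true do
            do! Async.Sleep 50
}

let outcome =
    try
        Async.Parallel [ worker "A" (Some 100); worker "B" None ]
        |> Async.RunSynchronously
        |> ignore
        "completed"
    with e ->
        // The first failure surfaces here after the other worker has been cancelled
        sprintf "stopped: %s" e.Message

printfn "%s" outcome
```

In the real example, the `use` bindings then dispose both consumers and monitors as the enclosing block exits.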