Getting started

Get started by connecting your application to KurrentDB.

Connecting to KurrentDB

For your application to start communicating with KurrentDB, you need to instantiate the client and configure it accordingly. Below are instructions for supported SDKs.

Insecure clusters

All our gRPC clients are secure by default. To connect to an insecure server, configure the client accordingly, either through the connection string or through the client's configuration.

Required packages

Install the client SDK package to your project.

Python

Install the kurrentdbclient package from PyPI or use Poetry:

pip

pip install kurrentdbclient

Poetry

poetry add kurrentdbclient

NodeJS

Install the @kurrent/kurrentdb-client package using NPM, Yarn or PNPM:

npm

npm install --save @kurrent/kurrentdb-client

yarn

yarn add @kurrent/kurrentdb-client

pnpm

pnpm add @kurrent/kurrentdb-client

TypeScript declarations are included in the package.

Java

Add the kurrentdb-client dependency to your Maven or Gradle project.

Maven

<dependency>
  <groupId>io.kurrent</groupId>
  <artifactId>kurrentdb-client</artifactId>
  <version>1.0.0</version>
</dependency>

Gradle

implementation 'io.kurrent:kurrentdb-client:1.0.0'

For the most recent version of the KurrentDB client package, see Maven Central.

.NET

Add the KurrentDB.Client package to your project:

dotnet add package KurrentDB.Client

Go

Install the kurrentdb package using Go modules:

go get github.com/kurrent-io/KurrentDB-Client-Go/kurrentdb

Rust

No additional setup is needed beyond a working Rust installation. If Rust is not installed yet, see https://rustup.rs.

Connection string

Each SDK has its own way of configuring the client, but the connection string can always be used. The KurrentDB connection string supports two schemes: kurrentdb:// for connecting to a single-node server, and kurrentdb+discover:// for connecting to a multi-node cluster. With kurrentdb://, the client connects directly to the node. With kurrentdb+discover://, the client uses the gossip protocol to retrieve the cluster information and choose the right node to connect to. Since version 22.10, EventStoreDB (ESDB, the former name of KurrentDB) supports gossip on single-node deployments, so the kurrentdb+discover:// scheme can be used for connecting to any topology.

The connection string has the following format:

kurrentdb+discover://admin:changeit@cluster.dns.name:2113

Here, cluster.dns.name is the name of a DNS A record that points to all the cluster nodes. Alternatively, you can list the cluster nodes, separated by commas, instead of the cluster DNS name:

kurrentdb+discover://admin:changeit@node1.dns.name:2113,node2.dns.name:2113,node3.dns.name:2113
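
To make the parts of a connection string concrete, here is a minimal Python sketch that splits one into scheme, node list, and query options using only the standard library. The function name describe_connection_string is hypothetical, and the credentials and host names are illustrative; the SDKs do their own parsing, so this is not how any client implements it.

```python
from urllib.parse import parse_qsl, urlsplit


def describe_connection_string(connection_string: str) -> dict:
    """Split a KurrentDB connection string into its visible parts."""
    parts = urlsplit(connection_string)
    # Credentials, if present, come before the last "@" in the authority.
    credentials, _, host_list = parts.netloc.rpartition("@")
    return {
        # "+discover" in the scheme selects gossip-based node discovery.
        "discover": parts.scheme == "kurrentdb+discover",
        "has_credentials": bool(credentials),
        # Several nodes may be listed, separated by commas.
        "hosts": host_list.split(","),
        "options": dict(parse_qsl(parts.query)),
    }


info = describe_connection_string(
    "kurrentdb+discover://admin:changeit@node1.dns.name:2113,node2.dns.name:2113?tls=false"
)
print(info["discover"], info["hosts"], info["options"])
```

The node list is split by hand because the standard URL parser expects a single host and port in the authority.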

Several query parameters can be used in the connection string to control how and where the client establishes the connection. All query parameters are optional.

| Parameter | Accepted values | Default | Description |
|---|---|---|---|
| tls | true, false | true | Use a secure connection. Set to false when connecting to a non-secure server or cluster. |
| connectionName | Any string | None | Connection name. |
| maxDiscoverAttempts | Number | 10 | Number of attempts to discover the cluster. |
| discoveryInterval | Number | 100 | Cluster discovery polling interval, in milliseconds. |
| gossipTimeout | Number | 5 | Gossip timeout, in seconds. When the gossip call times out, it is retried. |
| nodePreference | leader, follower, random, readOnlyReplica | leader | Preferred node role. When creating a client for write operations, always use leader. |
| tlsVerifyCert | true, false | true | In secure mode, set to false to skip certificate verification when connecting to a node with an untrusted certificate and the CA file is not available. Don't disable verification in production. |
| tlsCaFile | String, file path | None | Path to the CA file when connecting to a secure cluster with a certificate that's not signed by a trusted CA. |
| defaultDeadline | Number | None | Default timeout for client operations, in milliseconds. Most clients allow overriding the deadline per operation. |
| keepAliveInterval | Number | 10 | Interval between keep-alive ping calls, in seconds. |
| keepAliveTimeout | Number | 10 | Keep-alive ping call timeout, in seconds. |
| userCertFile | String, file path | None | User certificate file for X.509 authentication. |
| userKeyFile | String, file path | None | Key file for the user certificate used for X.509 authentication. |

When connecting to an insecure instance, specify the tls=false parameter. For example, for a node running locally, use kurrentdb://localhost:2113?tls=false. Note that no username or password is provided here, because insecure deployments don't support authentication and authorisation.
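
If you assemble connection strings in code rather than hard-coding them, a small helper can keep the scheme, node list, and query parameters consistent. The sketch below is one possible approach in Python; build_connection_string is a hypothetical helper, not part of any SDK, and it inserts credentials and option values as-is without escaping special characters.

```python
def build_connection_string(hosts, username=None, password=None, discover=False, **options):
    """Assemble a connection string from hosts, credentials, and query options."""
    scheme = "kurrentdb+discover" if discover else "kurrentdb"
    credentials = ""
    if username is not None and password is not None:
        # Inserted as-is: this sketch does not escape special characters.
        credentials = f"{username}:{password}@"
    # Render booleans as lowercase true/false, matching the accepted values.
    rendered = [
        f"{key}={str(value).lower() if isinstance(value, bool) else value}"
        for key, value in options.items()
    ]
    query = "?" + "&".join(rendered) if rendered else ""
    return f"{scheme}://{credentials}{','.join(hosts)}{query}"


print(build_connection_string(["localhost:2113"], tls=False))
# kurrentdb://localhost:2113?tls=false
```

Passing discover=True together with several hosts and credentials produces the comma-separated cluster form.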

Creating a client

First, create a client and get it connected to the database.

Python

from kurrentdbclient import KurrentDBClient

client = KurrentDBClient(
    uri=connection_string
)

JavaScript

const client = KurrentDBClient.connectionString`{connectionString}`;

TypeScript

const client = KurrentDBClient.connectionString`{connectionString}`;

Java

KurrentDBClientSettings settings = KurrentDBConnectionString.parseOrThrow("{connectionString}");
KurrentDBClient client = KurrentDBClient.create(settings);

C#

const string connectionString = "kurrentdb://admin:changeit@localhost:2113?tls=false&tlsVerifyCert=false";

var settings = KurrentDBClientSettings.Create(connectionString);

var client = new KurrentDBClient(settings);

Go

settings, err := kurrentdb.ParseConnectionString("{connectionString}")

if err != nil {
    panic(err)
}

db, err := kurrentdb.NewClient(settings)

if err != nil {
    panic(err)
}

Rust

let settings = "{connectionString}".parse()?;
let client = Client::new(settings)?;

The client instance can be used as a singleton across the whole application. You don't need to open or close the connection explicitly.

Creating an event

You can write anything to KurrentDB as events. The client needs a byte array as the event payload. Normally, you'd use a serialized object, and it's up to you to choose the serialization method.

Server-side projections

User-defined server-side projections require events to be serialized in JSON format.

We use JSON for serialization in the documentation examples.

The code snippet below creates an event object instance, serializes it, and adds it as a payload to the EventData structure, which the client can then write to the database.

Python

from uuid import uuid4

from kurrentdbclient import NewEvent

new_event = NewEvent(
    id=uuid4(),
    type="TestEvent",
    data=b"I wrote my first event",
)

JavaScript

const event = jsonEvent({
  type: "TestEvent",
  data: {
    entityId: uuid(),
    importantData: "I wrote my first event!",
  },
});

TypeScript

type TestEvent = JSONEventType<
  "TestEvent",
  {
    entityId: string;
    importantData: string;
  }
>;

const event = jsonEvent<TestEvent>({
  type: "TestEvent",
  data: {
    entityId: uuid(),
    importantData: "I wrote my first event!",
  },
});

Java

TestEvent event = new TestEvent();
JsonMapper jsonMapper = new JsonMapper();
event.setId(UUID.randomUUID().toString());
event.setImportantData("I wrote my first event!");

EventData eventData = EventData
        .builderAsJson("TestEvent", jsonMapper.writeValueAsBytes(event))
        .build();

C#

var evt = new TestEvent {
    EntityId      = Guid.NewGuid().ToString("N"),
    ImportantData = "I wrote my first event!"
};

var eventData = new EventData(
    Uuid.NewUuid(),
    "TestEvent",
    JsonSerializer.SerializeToUtf8Bytes(evt)
);

Go

testEvent := TestEvent{
    Id:            uuid.NewString(),
    ImportantData: "I wrote my first event!",
}

data, err := json.Marshal(testEvent)

if err != nil {
    panic(err)
}

eventData := kurrentdb.EventData{
    ContentType: kurrentdb.ContentTypeJson,
    EventType:   "TestEvent",
    Data:        data,
}

Rust

let event = TestEvent {
    id: Uuid::new_v4().to_string(),
    important_data: "I wrote my first event!".to_string(),
};

let event_data = EventData::json("TestEvent", &event)?.id(Uuid::new_v4());
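
The Python snippet above passes a raw byte string as the payload, while the other examples use JSON. A JSON payload for Python could be produced along these lines. This is a minimal sketch using only the standard library; the SampleEvent class and the to_json_bytes helper are hypothetical names, and the field names are assumptions modelled on the other examples.

```python
import json
from dataclasses import asdict, dataclass
from uuid import uuid4


@dataclass
class SampleEvent:
    # Hypothetical event shape, mirroring the other language examples.
    entity_id: str
    important_data: str


def to_json_bytes(event: SampleEvent) -> bytes:
    """Serialize the event to UTF-8 encoded JSON."""
    return json.dumps(asdict(event)).encode("utf-8")


payload = to_json_bytes(
    SampleEvent(entity_id=str(uuid4()), important_data="I wrote my first event!")
)
# The resulting bytes can be passed as data= when constructing the event.
print(payload)
```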

Appending events

Each event in the database has its own unique identifier (UUID). The database uses it to ensure idempotent writes, but this only works if you specify the expected stream revision when appending events to the stream.

In the snippet below, we append the event to the stream some-stream.

Python

from kurrentdbclient import StreamState

stream_name = "some-stream"

client.append_to_stream(
    stream_name=stream_name,
    events=[new_event],
    current_version=StreamState.ANY,
)

JavaScript

await client.appendToStream(STREAM_NAME, event);

TypeScript

await client.appendToStream(STREAM_NAME, event);

Java

client.appendToStream("some-stream", eventData)
        .get();

C#

await client.AppendToStreamAsync(
    "some-stream",
    StreamState.Any,
    new[] { eventData },
    cancellationToken: cancellationToken
);

Go

_, err = db.AppendToStream(context.Background(), "some-stream", kurrentdb.AppendToStreamOptions{}, eventData)

Rust

client
    .append_to_stream("some-stream", &Default::default(), event_data)
    .await?;

Here we are appending events without checking whether the stream exists or whether the stream revision matches the expected revision. See more advanced scenarios in the appending events documentation.
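
To illustrate what such a check means, the following toy model in Python mimics an expected-revision check on an in-memory list. It is only a conceptual sketch: InMemoryStream, WrongExpectedVersion, ANY, and NO_STREAM are hypothetical names, and the model says nothing about how the server implements the check.

```python
ANY = "any"              # skip the check, as in the snippets above
NO_STREAM = "no_stream"  # expect that the stream has no events yet


class WrongExpectedVersion(Exception):
    """Raised when the stream is not at the revision the writer expected."""


class InMemoryStream:
    """Toy stand-in for a stream; not how the server is implemented."""

    def __init__(self):
        self.events = []

    def append(self, event, expected=ANY):
        # Revisions are zero-based: the first event has revision 0.
        current = len(self.events) - 1 if self.events else NO_STREAM
        if expected != ANY and expected != current:
            raise WrongExpectedVersion(f"expected {expected}, found {current}")
        self.events.append(event)
        return len(self.events) - 1


stream = InMemoryStream()
stream.append("first", expected=NO_STREAM)  # stream is empty, so this succeeds
stream.append("second", expected=0)         # stream is at revision 0
try:
    stream.append("stale", expected=0)      # stream has moved on to revision 1
except WrongExpectedVersion:
    print("conflict detected")
```

A writer that passes a concrete expected revision learns about concurrent changes through the error, while ANY always appends.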

Reading events

Finally, we can read events back from the some-stream stream.

Python

events = client.read_stream(stream_name)

JavaScript

const events = client.readStream(STREAM_NAME, {
  direction: FORWARDS,
  fromRevision: START,
  maxCount: 10,
});

TypeScript

const events = client.readStream<TestEvent>(STREAM_NAME, {
  direction: FORWARDS,
  fromRevision: START,
  maxCount: 10,
});

Java

ReadStreamOptions options = ReadStreamOptions.get()
        .forwards()
        .fromStart()
        .maxCount(10);

ReadResult result = client.readStream("some-stream", options)
        .get();

C#

var result = client.ReadStreamAsync(
    Direction.Forwards,
    "some-stream",
    StreamPosition.Start,
    cancellationToken: cancellationToken
);

var events = await result.ToListAsync(cancellationToken);

Go

stream, err := db.ReadStream(context.Background(), "some-stream", kurrentdb.ReadStreamOptions{}, 10)

if err != nil {
    panic(err)
}

defer stream.Close()

for {
    event, err := stream.Recv()

    if errors.Is(err, io.EOF) {
        break
    }

    if err != nil {
        panic(err)
    }

    // Doing something productive with the event
    fmt.Println(event)
}

Rust

let options = ReadStreamOptions::default().max_count(10);
let mut stream = client.read_stream("some-stream", &options).await?;

while let Some(event) = stream.next().await? {
    // Doing something productive with the events.
}

When you read events from the stream, you get a collection of ResolvedEvent structures. The event payload is returned as a byte array and needs to be deserialized. See more advanced scenarios in the reading events documentation.
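
For JSON payloads, deserialization is the reverse of the serialization step. A minimal Python sketch, assuming the payload is UTF-8 encoded JSON; decode_json_payload is a hypothetical helper, and the raw bytes below stand in for the payload of an event read from the stream.

```python
import json


def decode_json_payload(data: bytes) -> dict:
    """Decode a UTF-8 JSON event payload into a dictionary."""
    return json.loads(data.decode("utf-8"))


# Stand-in for the bytes carried by an event read from the stream.
raw = b'{"entityId": "abc", "importantData": "I wrote my first event!"}'
event = decode_json_payload(raw)
print(event["importantData"])
```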