Getting started
Get started by connecting your application to KurrentDB.
Connecting to KurrentDB
For your application to start communicating with KurrentDB, you need to instantiate the client and configure it accordingly. Below are instructions for supported SDKs.
Insecure clusters
All our gRPC clients are secure by default. To connect to an insecure server, you must configure the client accordingly, either in the connection string or in the client's configuration.
Required packages
Install the client SDK package to your project.
Python
Install the kurrentdbclient package from PyPI or use Poetry:
pip
pip install kurrentdbclient
Poetry
poetry add kurrentdbclient
NodeJS
Install the @kurrent/kurrentdb-client package using npm, Yarn, or pnpm:
npm
npm install --save @kurrent/kurrentdb-client
yarn
yarn add @kurrent/kurrentdb-client
pnpm
pnpm add @kurrent/kurrentdb-client
TypeScript declarations are included in the package.
Java
Add the kurrentdb-client dependency to your Maven or Gradle project.
Maven
<dependency>
  <groupId>io.kurrent</groupId>
  <artifactId>kurrentdb-client</artifactId>
  <version>1.0.0</version>
</dependency>
Gradle
implementation 'io.kurrent:kurrentdb-client:1.0.0'
For the most recent version of the KurrentDB client package, see Maven Central.
.NET
Add the KurrentDB.Client package to your project:
dotnet add package KurrentDB.Client
Go
Install the kurrentdb package using Go modules:
go get github.com/kurrent-io/KurrentDB-Client-Go/kurrentdb
Rust
No additional configuration is needed once Rust is installed. To install Rust, see https://rustup.rs.
Connection string
Each SDK has its own way of configuring the client, but the connection string can always be used. The KurrentDB connection string supports two schemes: kurrentdb:// for connecting to a single-node server, and kurrentdb+discover:// for connecting to a multi-node cluster. With kurrentdb://, the client connects directly to the node. With kurrentdb+discover://, the client uses the gossip protocol to retrieve the cluster information and choose the right node to connect to. Since version 22.10, the server (then named EventStoreDB, or ESDB) supports gossip on single-node deployments, so the kurrentdb+discover:// scheme can be used to connect to any topology.
The connection string has the following format:
kurrentdb+discover://admin:changeit@cluster.dns.name:2113
There, cluster.dns.name is the name of a DNS A record that points to all the cluster nodes. Alternatively, you can list the cluster nodes, separated by commas, instead of the cluster DNS name:
kurrentdb+discover://admin:changeit@node1.dns.name:2113,node2.dns.name:2113,node3.dns.name:2113
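To make the anatomy of such a connection string concrete, here is a minimal Python sketch that splits one into its scheme, credentials, node list, and query parameters, using only the standard library. The split_connection_string helper is hypothetical and not part of any KurrentDB SDK; the clients parse connection strings themselves.

```python
from urllib.parse import parse_qsl, urlsplit


def split_connection_string(conn: str) -> dict:
    """Break a KurrentDB connection string into its visible parts.

    Hypothetical helper for illustration only; the SDKs do their
    own parsing and validation.
    """
    parts = urlsplit(conn)
    # Everything before the last "@" is the optional "user:password" pair.
    userinfo, _, hosts = parts.netloc.rpartition("@")
    username, _, password = userinfo.partition(":")
    return {
        "discover": parts.scheme == "kurrentdb+discover",
        "username": username or None,
        "password": password or None,
        # Several nodes may be listed, separated by commas.
        "nodes": hosts.split(","),
        "params": dict(parse_qsl(parts.query)),
    }


info = split_connection_string(
    "kurrentdb+discover://admin:changeit@node1.dns.name:2113,"
    "node2.dns.name:2113,node3.dns.name:2113?nodePreference=follower"
)
print(info["discover"], info["nodes"], info["params"])
```

The sketch deliberately avoids the hostname and port attributes of the parsed result, because the standard library does not expect a comma-separated list of hosts in a URL.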
There are a number of query parameters that can be used in the connection string to instruct the cluster how and where the connection should be established. All query parameters are optional.
Parameter | Accepted values | Default | Description |
---|---|---|---|
tls | true, false | true | Use a secure connection. Set to false when connecting to a non-secure server or cluster. |
connectionName | Any string | None | Connection name |
maxDiscoverAttempts | Number | 10 | Number of attempts to discover the cluster. |
discoveryInterval | Number | 100 | Cluster discovery polling interval in milliseconds. |
gossipTimeout | Number | 5 | Gossip timeout in seconds. When the gossip call times out, it is retried. |
nodePreference | leader, follower, random, readOnlyReplica | leader | Preferred node role. When creating a client for write operations, always use leader. |
tlsVerifyCert | true, false | true | In secure mode, set to false to skip certificate verification when connecting to a node with an untrusted certificate and you don't have the CA file available. Don't set to false in production. |
tlsCaFile | String, file path | None | Path to the CA file when connecting to a secure cluster with a certificate that's not signed by a trusted CA. |
defaultDeadline | Number | None | Default timeout for client operations, in milliseconds. Most clients allow overriding the deadline per operation. |
keepAliveInterval | Number | 10 | Interval between keep-alive ping calls, in seconds. |
keepAliveTimeout | Number | 10 | Keep-alive ping call timeout, in seconds. |
userCertFile | String, file path | None | User certificate file for X.509 authentication. |
userKeyFile | String, file path | None | Key file for the user certificate used for X.509 authentication. |
When connecting to an insecure instance, specify the tls=false parameter. For example, for a node running locally, use kurrentdb://localhost:2113?tls=false. Note that no username or password is provided there, because insecure deployments don't support authentication and authorisation.
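As an illustration of how the scheme, node list, and query parameters combine, the following Python sketch assembles a connection string from its parts. The build_connection_string helper is hypothetical and not part of any SDK. It assumes that credentials contain no reserved URL characters, and it chooses kurrentdb+discover:// only when several nodes are listed, which is one possible convention rather than a requirement.

```python
from urllib.parse import urlencode


def build_connection_string(nodes, username=None, password=None, **params):
    """Assemble a KurrentDB connection string from its parts.

    Hypothetical helper for illustration; not part of any SDK.
    `nodes` is a list of "host:port" strings.
    """
    # Convention of this sketch: use discovery when several nodes are listed.
    scheme = "kurrentdb+discover" if len(nodes) > 1 else "kurrentdb"
    # Assumes the credentials need no percent-encoding.
    credentials = f"{username}:{password}@" if username else ""
    # Booleans are written lowercase, as in the parameter table.
    query = urlencode(
        {k: str(v).lower() if isinstance(v, bool) else v for k, v in params.items()}
    )
    conn = f"{scheme}://{credentials}{','.join(nodes)}"
    return f"{conn}?{query}" if query else conn


print(build_connection_string(["localhost:2113"], tls=False))
# kurrentdb://localhost:2113?tls=false
```

Keeping the parameters as keyword arguments makes it easy to add any of the options from the table, for example nodePreference="follower".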
Creating a client
First, create a client and get it connected to the database.
Python
from kurrentdbclient import KurrentDBClient

client = KurrentDBClient(
    uri=connection_string
)
JavaScript
const client = KurrentDBClient.connectionString`{connectionString}`;
TypeScript
const client = KurrentDBClient.connectionString`{connectionString}`;
Java
KurrentDBClientSettings settings = KurrentDBConnectionString.parseOrThrow("{connectionString}");
KurrentDBClient client = KurrentDBClient.create(settings);
C#
const string connectionString = "kurrentdb://admin:changeit@localhost:2113?tls=false&tlsVerifyCert=false";
var settings = KurrentDBClientSettings.Create(connectionString);
var client = new KurrentDBClient(settings);
Go
settings, err := kurrentdb.ParseConnectionString("{connectionString}")
if err != nil {
	panic(err)
}
db, err := kurrentdb.NewClient(settings)
if err != nil {
	panic(err)
}
Rust
let settings = "{connectionString}".parse()?;
let client = Client::new(settings)?;
The client instance can be used as a singleton across the whole application. You don't need to open or close the connection explicitly.
Creating an event
You can write anything to KurrentDB as events. The client needs a byte array as the event payload. Normally, you'd use a serialized object, and it's up to you to choose the serialization method.
Server-side projections
User-defined server-side projections require events to be serialized in JSON format.
We use JSON for serialization in the documentation examples.
The code snippet below creates an event object instance, serializes it, and adds it as a payload to the EventData structure, which the client can then write to the database.
Python
from uuid import uuid4

from kurrentdbclient import NewEvent

new_event = NewEvent(
    id=uuid4(),
    type="TestEvent",
    data=b"I wrote my first event",
)
JavaScript
const event = jsonEvent({
  type: "TestEvent",
  data: {
    entityId: uuid(),
    importantData: "I wrote my first event!",
  },
});
TypeScript
type TestEvent = JSONEventType<
  "TestEvent",
  {
    entityId: string;
    importantData: string;
  }
>;

const event = jsonEvent<TestEvent>({
  type: "TestEvent",
  data: {
    entityId: uuid(),
    importantData: "I wrote my first event!",
  },
});
Java
TestEvent event = new TestEvent();
JsonMapper jsonMapper = new JsonMapper();
event.setId(UUID.randomUUID().toString());
event.setImportantData("I wrote my first event!");
EventData eventData = EventData
        .builderAsJson("TestEvent", jsonMapper.writeValueAsBytes(event))
        .build();
C#
var evt = new TestEvent {
    EntityId = Guid.NewGuid().ToString("N"),
    ImportantData = "I wrote my first event!"
};

var eventData = new EventData(
    Uuid.NewUuid(),
    "TestEvent",
    JsonSerializer.SerializeToUtf8Bytes(evt)
);
Go
testEvent := TestEvent{
	Id:            uuid.NewString(),
	ImportantData: "I wrote my first event!",
}

data, err := json.Marshal(testEvent)
if err != nil {
	panic(err)
}

eventData := kurrentdb.EventData{
	ContentType: kurrentdb.ContentTypeJson,
	EventType:   "TestEvent",
	Data:        data,
}
Rust
let event = TestEvent {
    id: Uuid::new_v4().to_string(),
    important_data: "I wrote my first event!".to_string(),
};

let event_data = EventData::json("TestEvent", &event)?.id(Uuid::new_v4());
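The Python tab above passes a raw byte string as the payload. To mirror the JSON serialization used in the other tabs, a payload could be produced as in the sketch below, which uses only the standard library. The TestEvent dataclass and the to_json_bytes helper are hypothetical names chosen for illustration.

```python
import json
from dataclasses import asdict, dataclass
from uuid import uuid4


@dataclass
class TestEvent:
    # Hypothetical event shape, mirroring the other language tabs.
    entity_id: str
    important_data: str


def to_json_bytes(event) -> bytes:
    """Serialize a dataclass instance to UTF-8 encoded JSON."""
    return json.dumps(asdict(event)).encode("utf-8")


payload = to_json_bytes(
    TestEvent(entity_id=str(uuid4()), important_data="I wrote my first event!")
)
# `payload` is a byte array and could be used as the event data,
# in place of the literal b"..." value in the Python tab.
```

Because the result is plain JSON, events written this way would also satisfy the JSON requirement of user-defined server-side projections mentioned earlier.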
Appending events
Each event in the database has its own unique identifier (UUID). The database uses it to ensure idempotent writes, but it only works if you specify the stream revision when appending events to the stream.
In the snippet below, we append the event to the stream some-stream.
Python
from kurrentdbclient import StreamState

client.append_to_stream(
    stream_name=stream_name,
    events=[new_event],
    current_version=StreamState.ANY,
)
JavaScript
await client.appendToStream(STREAM_NAME, event);
TypeScript
await client.appendToStream(STREAM_NAME, event);
Java
client.appendToStream("some-stream", eventData)
        .get();
C#
await client.AppendToStreamAsync(
    "some-stream",
    StreamState.Any,
    new[] { eventData },
    cancellationToken: cancellationToken
);
Go
_, err = db.AppendToStream(context.Background(), "some-stream", kurrentdb.AppendToStreamOptions{}, eventData)
Rust
client
    .append_to_stream("some-stream", &Default::default(), event_data)
    .await?;
Here we append events without checking whether the stream exists or whether the stream revision matches an expected revision. See more advanced scenarios in the appending events documentation.
Reading events
Finally, we can read events back from the some-stream stream.
Python
events = client.read_stream(stream_name)
JavaScript
const events = client.readStream(STREAM_NAME, {
  direction: FORWARDS,
  fromRevision: START,
  maxCount: 10,
});
TypeScript
const events = client.readStream<TestEvent>(STREAM_NAME, {
  direction: FORWARDS,
  fromRevision: START,
  maxCount: 10,
});
Java
ReadStreamOptions options = ReadStreamOptions.get()
        .forwards()
        .fromStart()
        .maxCount(10);

ReadResult result = client.readStream("some-stream", options)
        .get();
C#
var result = client.ReadStreamAsync(
    Direction.Forwards,
    "some-stream",
    StreamPosition.Start,
    cancellationToken: cancellationToken
);

var events = await result.ToListAsync(cancellationToken);
Go
stream, err := db.ReadStream(context.Background(), "some-stream", kurrentdb.ReadStreamOptions{}, 10)
if err != nil {
	panic(err)
}
defer stream.Close()

for {
	event, err := stream.Recv()
	if errors.Is(err, io.EOF) {
		break
	}
	if err != nil {
		panic(err)
	}
	// Do something productive with the event.
	fmt.Println(event)
}
Rust
let options = ReadStreamOptions::default().max_count(10);
let mut stream = client.read_stream("some-stream", &options).await?;

while let Some(event) = stream.next().await? {
    // Do something productive with the event.
}
When you read events from the stream, you get a collection of ResolvedEvent structures. The event payload is returned as a byte array and needs to be deserialized. See more advanced scenarios in the reading events documentation.
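Deserializing the payload is the reverse of the serialization step. A minimal Python sketch, assuming the payload was written as UTF-8 JSON with the hypothetical field names shown here, might look like this. The literal raw value stands in for the byte array of an event read from the stream; how you obtain those bytes depends on the SDK.

```python
import json
from dataclasses import dataclass


@dataclass
class TestEvent:
    # Hypothetical event shape; must match what was written.
    entity_id: str
    important_data: str


def from_json_bytes(payload: bytes) -> TestEvent:
    """Decode a UTF-8 JSON payload back into a TestEvent."""
    return TestEvent(**json.loads(payload))


# Stand-in for the byte array carried by an event read from the stream.
raw = b'{"entity_id": "42", "important_data": "I wrote my first event!"}'
event = from_json_bytes(raw)
print(event.important_data)
```

In a real application you would typically pick the target type based on the event type name stored with each event, since a stream can contain several kinds of events.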