KCL 1.x and 2.x information

Important

Amazon Kinesis Client Library (KCL) versions 1.x and 2.x are outdated. KCL 1.x will reach end-of-support on January 30, 2026. We strongly recommend that you migrate your KCL applications that use version 1.x to the latest KCL version before January 30, 2026. To find the latest KCL version, see the Amazon Kinesis Client Library page on GitHub. For information about the latest KCL versions, see Use Kinesis Client Library. For information about migrating from KCL 1.x to KCL 3.x, see Migrating from KCL 1.x to KCL 3.x.

One way to develop custom consumer applications that process data from Kinesis data streams is to use the Kinesis Client Library (KCL).

Note

For both KCL 1.x and KCL 2.x, it is recommended that you upgrade to the latest KCL 1.x version or KCL 2.x version, depending on your usage scenario. Both KCL 1.x and KCL 2.x are regularly updated with newer releases that include the latest dependency and security patches, bug fixes, and backward-compatible new features. For more information, see https://github.com/awslabs/amazon-kinesis-client/releases.

KCL helps you consume and process data from a Kinesis data stream by taking care of many of the complex tasks associated with distributed computing. These include load balancing across multiple consumer application instances, responding to consumer application instance failures, checkpointing processed records, and reacting to resharding. The KCL takes care of all of these subtasks so that you can focus your efforts on writing your custom record-processing logic.

The KCL is different from the Kinesis Data Streams APIs that are available in the AWS SDKs. The Kinesis Data Streams APIs help you manage many aspects of Kinesis Data Streams, including creating streams, resharding, and putting and getting records. The KCL provides a layer of abstraction around all these subtasks, specifically so that you can focus on your consumer application’s custom data processing logic. For information about the Kinesis Data Streams API, see the Amazon Kinesis API Reference.

Important

The KCL is a Java library. Support for languages other than Java is provided through a multi-language interface called the MultiLangDaemon. This daemon is Java-based and runs in the background when you use the KCL with a language other than Java. For example, if you install the KCL for Python and write your consumer application entirely in Python, you still need Java installed on your system because of the MultiLangDaemon. In addition, the MultiLangDaemon has some default settings that you might need to customize for your use case, for example, the AWS Region that it connects to. For more information about the MultiLangDaemon, see the KCL MultiLangDaemon project on GitHub.

The KCL acts as an intermediary between your record processing logic and Kinesis Data Streams.

Currently, you can use either of the following supported versions of KCL to build your custom consumer applications:

You can use either KCL 1.x or KCL 2.x to build consumer applications that use shared throughput. For more information, see Develop custom consumers with shared throughput using KCL.

To build consumer applications that use dedicated throughput (enhanced fan-out consumers), you can only use KCL 2.x. For more information, see Develop enhanced fan-out consumers with dedicated throughput.

For information about the differences between KCL 1.x and KCL 2.x, and instructions on how to migrate from KCL 1.x to KCL 2.x, see Migrate consumers from KCL 1.x to KCL 2.x.

Important

Each KCL consumer application instance has one worker.
The worker initializes and oversees various tasks, including syncing shard and lease information, tracking shard assignments, and processing data from the shards. A worker provides KCL with the configuration information for the consumer application, such as the name of the data stream whose data records this KCL consumer application is going to process and the AWS credentials that are needed to access this data stream. The worker also starts that specific KCL consumer application instance so that it delivers data records from the data stream to the record processors.

Important

Every worker will contend to hold all available leases for all available shards in a data stream. But only one worker will successfully hold each lease at any one time.
For example, if you have a consumer application instance A with worker A that is processing a data stream with 4 shards, worker A can hold leases to shards 1, 2, 3, and 4 at the same time. But if you have two consumer application instances: A and B with worker A and worker B, and these instances are processing a data stream with 4 shards, worker A and worker B cannot both hold the lease to shard 1 at the same time. One worker holds the lease to a particular shard until it is ready to stop processing this shard’s data records or until it fails. When one worker stops holding the lease, another worker takes up and holds the lease.
For more information, see the Lease class in the Java KCL repositories: https://github.com/awslabs/amazon-kinesis-client/blob/v1.x/src/main/java/com/amazonaws/services/kinesis/leases/impl/Lease.java for KCL 1.x and https://github.com/awslabs/amazon-kinesis-client/blob/master/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/Lease.java for KCL 2.x.
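
The "only one worker holds each lease" rule can be pictured with a small in-memory sketch. This is not how the KCL implements leases (the KCL stores them in a DynamoDB lease table); the class name `LeaseRegistrySketch` and its methods are hypothetical and exist only to illustrate the rule described above.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

/** Toy in-memory stand-in for a lease table. Hypothetical; not the KCL implementation. */
final class LeaseRegistrySketch {
    // Maps a shard ID to the ID of the worker that currently holds its lease.
    private final Map<String, String> owners = new ConcurrentHashMap<>();

    /** Takes the lease only if no worker holds it, or if the caller already holds it. */
    boolean tryAcquire(String shardId, String workerId) {
        String current = owners.putIfAbsent(shardId, workerId);
        return current == null || current.equals(workerId);
    }

    /** Gives up the lease if the given worker holds it, so that another worker can take it. */
    boolean release(String shardId, String workerId) {
        return owners.remove(shardId, workerId);
    }

    /** Returns the current lease holder, or null if the shard is not leased. */
    String ownerOf(String shardId) {
        return owners.get(shardId);
    }
}
```

In this sketch, worker A can acquire the leases for all four shards, a second `tryAcquire` by worker B on a shard that worker A holds returns `false`, and worker B succeeds only after worker A releases that shard.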

What is a lease table

For each Amazon Kinesis Data Streams application, KCL uses a unique lease table (stored in an Amazon DynamoDB table) to keep track of the shards in a Kinesis data stream that are being leased and processed by the workers of the KCL consumer application.

Important

KCL uses the name of the consumer application to create the name of the lease table that this consumer application uses. Therefore, each consumer application name must be unique.

You can view the lease table using the Amazon DynamoDB console while the consumer application is running.

If the lease table for your KCL consumer application does not exist when the application starts up, one of the workers creates the lease table for this application.

Important

Your account is charged for the costs associated with the DynamoDB table, in addition to the costs associated with Kinesis Data Streams itself.

Each row in the lease table represents a shard that is being processed by the workers of your consumer application. If your KCL consumer application processes only one data stream, then leaseKey, which is the hash key for the lease table, is the shard ID. If you process multiple data streams with the same KCL 2.x for Java consumer application, then the structure of the leaseKey looks like this: account-id:StreamName:streamCreationTimestamp:ShardId. For example, 111111111:multiStreamTest-1:12345:shardId-000000000336.
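
As a rough illustration of the multistream leaseKey layout, the following sketch builds and parses a key of the form account-id:StreamName:streamCreationTimestamp:ShardId. The class `LeaseKeySketch` and its methods are hypothetical helpers written for this example; they are not part of the KCL API, and the sketch assumes that none of the four components contains a colon.

```java
/** Illustrative helper for the multistream leaseKey layout. Hypothetical; not part of the KCL API. */
final class LeaseKeySketch {
    final String accountId;
    final String streamName;
    final long streamCreationTimestamp;
    final String shardId;

    LeaseKeySketch(String accountId, String streamName, long streamCreationTimestamp, String shardId) {
        this.accountId = accountId;
        this.streamName = streamName;
        this.streamCreationTimestamp = streamCreationTimestamp;
        this.shardId = shardId;
    }

    /** Formats the key as account-id:StreamName:streamCreationTimestamp:ShardId. */
    String format() {
        return String.join(":", accountId, streamName, Long.toString(streamCreationTimestamp), shardId);
    }

    /** A single-stream leaseKey is just the shard ID, so it does not split into four parts. */
    static boolean isMultiStreamKey(String leaseKey) {
        return leaseKey.split(":").length == 4;
    }

    /** Parses a multistream leaseKey; rejects keys that do not have exactly four parts. */
    static LeaseKeySketch parse(String leaseKey) {
        String[] parts = leaseKey.split(":");
        if (parts.length != 4) {
            throw new IllegalArgumentException("Not a multistream leaseKey: " + leaseKey);
        }
        return new LeaseKeySketch(parts[0], parts[1], Long.parseLong(parts[2]), parts[3]);
    }
}
```

Parsing the example key above yields the stream name multiStreamTest-1 and the shard ID shardId-000000000336, whereas a plain shard ID such as shardId-000000000336 is not recognized as a multistream key.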

In addition to the shard ID, each row also includes the following data:

Note

This data is present in the lease table for every shard starting with KCL 1.14 and KCL 2.3. For more information about PeriodicShardSyncManager and periodic synchronization between leases and shards, see How a lease table is synchronized with the shards in a Kinesis data stream.

Note

This data is present in the lease table for every shard starting with KCL 1.14 and KCL 2.3.

Note

This data is present in the lease table only if you process multiple data streams with the same KCL 2.x for Java consumer application. This is supported only in KCL 2.x for Java, starting with KCL 2.3 for Java.

Throughput

If your Amazon Kinesis Data Streams application receives provisioned-throughput exceptions, you should increase the provisioned throughput for the DynamoDB table. The KCL creates the table with a provisioned throughput of 10 reads per second and 10 writes per second, but this might not be sufficient for your application. For example, if your Amazon Kinesis Data Streams application does frequent checkpointing or operates on a stream that is composed of many shards, you might need more throughput.

For information about provisioned throughput in DynamoDB, see Read/Write Capacity Mode and Working with Tables and Data in the Amazon DynamoDB Developer Guide.
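
To get a feel for when the default of 10 writes per second might be too low, the following back-of-the-envelope sketch estimates the write rate that checkpointing alone puts on the lease table. It assumes that each checkpoint results in one write to the lease table and that every shard is checkpointed once per interval; it ignores lease renewals and all other lease table traffic, so treat the result as a lower bound. The class and method names are hypothetical.

```java
/** Rough estimate of checkpoint write load on the lease table. Hypothetical helper, not a KCL API. */
final class LeaseTableCapacitySketch {
    // Default provisioned writes per second for the lease table, as stated in the text above.
    static final double DEFAULT_WRITES_PER_SECOND = 10.0;

    /** Assumes one lease table write per shard per checkpoint interval. */
    static double checkpointWritesPerSecond(int shardCount, double checkpointIntervalSeconds) {
        if (shardCount < 0 || checkpointIntervalSeconds <= 0) {
            throw new IllegalArgumentException("shardCount must be >= 0 and interval must be > 0");
        }
        return shardCount / checkpointIntervalSeconds;
    }

    /** True if checkpointing alone would exceed the default provisioned write throughput. */
    static boolean exceedsDefault(int shardCount, double checkpointIntervalSeconds) {
        return checkpointWritesPerSecond(shardCount, checkpointIntervalSeconds) > DEFAULT_WRITES_PER_SECOND;
    }
}
```

For example, under these assumptions a 200-shard stream that checkpoints every 10 seconds produces about 20 writes per second, which exceeds the default, while a 50-shard stream that checkpoints once a minute stays well below it.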

How a lease table is synchronized with the shards in a Kinesis data stream

Workers in KCL consumer applications use leases to process shards from a given data stream. The information about which worker is leasing which shard at any given time is stored in a lease table. The lease table must remain in sync with the latest shard information from the data stream while the KCL consumer application is running. KCL synchronizes the lease table with the shard information acquired from the Kinesis Data Streams service during consumer application bootstrapping (when the consumer application is initialized or restarted) and also whenever a shard that is being processed reaches its end (resharding). In other words, the workers of a KCL consumer application are synchronized with the data stream that they are processing during the initial consumer application bootstrap and whenever the consumer application encounters a data stream reshard event.
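
At its core, this synchronization compares the shards that the stream reports with the shards that already have a lease. The following sketch shows only that comparison. It is a deliberate simplification with hypothetical names, and it ignores details that a real synchronization must handle, such as the parent and child relationships between shards after a reshard.

```java
import java.util.Set;
import java.util.TreeSet;

/** Simplified view of lease/shard synchronization. Hypothetical; not the KCL algorithm. */
final class ShardSyncSketch {
    /** Returns the shards that exist in the stream but do not have a lease yet. */
    static Set<String> shardsMissingLeases(Set<String> streamShardIds, Set<String> leasedShardIds) {
        // Copy first so that the caller's set is left untouched.
        Set<String> missing = new TreeSet<>(streamShardIds);
        missing.removeAll(leasedShardIds);
        return missing;
    }
}
```

If the stream reports three shards and the lease table has a row for only one of them, the method returns the other two shard IDs, which are the candidates for new leases in this simplified model.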

Synchronization in KCL 1.0 - 1.13 and KCL 2.0 - 2.2

In KCL 1.0 - 1.13 and KCL 2.0 - 2.2, during the consumer application's bootstrapping and during each data stream reshard event, KCL synchronizes the lease table with the shard information acquired from the Kinesis Data Streams service by invoking the ListShards or the DescribeStream discovery APIs. In all of these KCL versions, each worker of a KCL consumer application completes the following steps to perform the lease/shard synchronization process during the consumer application's bootstrapping and at each stream reshard event:

Synchronization in KCL 2.x, starting with KCL 2.3 and later

Starting with KCL 2.3, the library supports the following changes to the synchronization process. These lease/shard synchronization changes significantly reduce the number of API calls made by KCL consumer applications to the Kinesis Data Streams service and optimize the lease management in your KCL consumer application.

Synchronization in KCL 1.x, starting with KCL 1.14 and later

Starting with KCL 1.14, the library supports the following changes to the synchronization process. These lease/shard synchronization changes significantly reduce the number of API calls made by KCL consumer applications to the Kinesis Data Streams service and optimize the lease management in your KCL consumer application.

Process multiple data streams with the same KCL 2.x for Java consumer application

This section describes the following changes in KCL 2.x for Java that enable you to create KCL consumer applications that can process more than one data stream at the same time.

Important

Multistream processing is only supported in KCL 2.x for Java, starting with KCL 2.3 for Java and later.

Multistream processing is NOT supported for any other languages in which KCL 2.x can be implemented.

Multistream processing is NOT supported in any versions of KCL 1.x.

package software.amazon.kinesis.common;  
import lombok.Data;  
import lombok.experimental.Accessors;  
@Data  
@Accessors(fluent = true)  
public class StreamConfig {  
    private final StreamIdentifier streamIdentifier;  
    private final InitialPositionInStreamExtended initialPositionInStreamExtended;  
    private String consumerArn;  
}  
                  

Note that StreamIdentifier and InitialPositionInStreamExtended are required fields, while consumerArn is optional. You must provide the consumerArn only if you are using KCL 2.x to implement an enhanced fan-out consumer application.
For more information about StreamIdentifier, see https://github.com/awslabs/amazon-kinesis-client/blob/v2.5.8/amazon-kinesis-client/src/main/java/software/amazon/kinesis/common/StreamIdentifier.java#L129. To create a StreamIdentifier, we recommend that you create a multistream instance from the streamArn and the streamCreationEpoch, which is available in v2.5.0 and later. In KCL v2.3 and v2.4, which don't support streamArn, create a multistream instance by using the format account-id:StreamName:streamCreationTimestamp. This format will be deprecated and no longer supported starting with the next major release.
MultiStreamTracker also includes a strategy for deleting leases of old streams in the lease table (formerStreamsLeasesDeletionStrategy). Note that the strategy cannot be changed during the consumer application runtime. For more information, see https://github.com/awslabs/amazon-kinesis-client/blob/0c5042dadf794fe988438436252a5a8fe70b6b0b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/processor/FormerStreamsLeasesDeletionStrategy.java

    /**
     * Constructor to initialize ConfigsBuilder with StreamName  
     * @param streamName  
     * @param applicationName  
     * @param kinesisClient  
     * @param dynamoDBClient  
     * @param cloudWatchClient  
     * @param workerIdentifier  
     * @param shardRecordProcessorFactory  
     */  
    public ConfigsBuilder(@NonNull String streamName, @NonNull String applicationName,  
            @NonNull KinesisAsyncClient kinesisClient, @NonNull DynamoDbAsyncClient dynamoDBClient,  
            @NonNull CloudWatchAsyncClient cloudWatchClient, @NonNull String workerIdentifier,  
            @NonNull ShardRecordProcessorFactory shardRecordProcessorFactory) {  
        this.appStreamTracker = Either.right(streamName);  
        this.applicationName = applicationName;  
        this.kinesisClient = kinesisClient;  
        this.dynamoDBClient = dynamoDBClient;  
        this.cloudWatchClient = cloudWatchClient;  
        this.workerIdentifier = workerIdentifier;  
        this.shardRecordProcessorFactory = shardRecordProcessorFactory;  
    }  

Or you can initialize ConfigsBuilder with MultiStreamTracker if you want to implement a KCL consumer application that processes multiple streams at the same time.

    /**
     * Constructor to initialize ConfigsBuilder with MultiStreamTracker
     * @param multiStreamTracker  
     * @param applicationName  
     * @param kinesisClient  
     * @param dynamoDBClient  
     * @param cloudWatchClient  
     * @param workerIdentifier  
     * @param shardRecordProcessorFactory  
     */  
    public ConfigsBuilder(@NonNull MultiStreamTracker multiStreamTracker, @NonNull String applicationName,  
            @NonNull KinesisAsyncClient kinesisClient, @NonNull DynamoDbAsyncClient dynamoDBClient,  
            @NonNull CloudWatchAsyncClient cloudWatchClient, @NonNull String workerIdentifier,  
            @NonNull ShardRecordProcessorFactory shardRecordProcessorFactory) {  
        this.appStreamTracker = Either.left(multiStreamTracker);  
        this.applicationName = applicationName;  
        this.kinesisClient = kinesisClient;  
        this.dynamoDBClient = dynamoDBClient;  
        this.cloudWatchClient = cloudWatchClient;  
        this.workerIdentifier = workerIdentifier;  
        this.shardRecordProcessorFactory = shardRecordProcessorFactory;  
    }  
                  
Important

When your existing KCL consumer application is configured to process only one data stream, the leaseKey (which is the hash key for the lease table) is the shard ID. If you reconfigure this existing KCL consumer application to process multiple data streams, it breaks your lease table, because with multistream support, the leaseKey structure must be as follows: account-id:StreamName:StreamCreationTimestamp:ShardId.

You can integrate your Kinesis data streams with the AWS Glue Schema Registry. The AWS Glue Schema Registry allows you to centrally discover, control, and evolve schemas, while ensuring that the data produced is continuously validated by a registered schema. A schema defines the structure and format of a data record. A schema is a versioned specification for reliable data publication, consumption, or storage. The AWS Glue Schema Registry lets you improve end-to-end data quality and data governance within your streaming applications. For more information, see AWS Glue Schema Registry. One of the ways to set up this integration is through the KCL in Java.

Important

Currently, Kinesis Data Streams and AWS Glue Schema Registry integration is only supported for the Kinesis data streams that use KCL 2.3 consumers implemented in Java. Multi-language support is not provided. KCL 1.0 consumers are not supported. KCL 2.x consumers prior to KCL 2.3 are not supported.

For detailed instructions on how to set up integration of Kinesis Data Streams with Schema Registry using the KCL, see the "Interacting with Data Using the KPL/KCL Libraries" section in Use Case: Integrating Amazon Kinesis Data Streams with the AWS Glue Schema Registry.