Kafka Streams Memory Management for Confluent Platform

You can specify the total memory (RAM) size used for internal caching and compacting of records. This caching happens before the records are written to state stores or forwarded downstream to other nodes.

The record caches are implemented slightly differently in the DSL and the Processor API.

Record caches in the DSL

You can specify the total memory (RAM) size of the record cache for an instance of the processing topology. It is leveraged by the following KTable instances:

- Source KTable: KTable instances that are created via StreamsBuilder#table() or StreamsBuilder#globalTable().
- Aggregation KTable: instances of KTable that are created as a result of aggregations.

For such KTable instances, the record cache is used for:

- Internal caching and compacting of output records before they are written from a stateful processor node to its state stores.
- Internal caching and compacting of output records before they are forwarded from a stateful processor node to any of its downstream processor nodes.

Use the following example to understand the behaviors with and without record caching. In this example, the input is a KStream<String, Integer> with the records <K,V>: <A, 1>, <D, 5>, <A, 20>, <A, 300>. The focus in this example is on the records with key == A.
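As a sketch of the setup (the topology and topic names below are illustrative, not part of the original example), a running sum per key makes the difference visible:

StreamsBuilder builder = new StreamsBuilder();

// Input stream of <String, Integer> records, e.g. <A, 1>, <D, 5>, <A, 20>, <A, 300>.
KStream<String, Integer> input =
    builder.stream("input-topic", Consumed.with(Serdes.String(), Serdes.Integer()));

// Sum the values per key; the resulting aggregation KTable is backed by a state
// store whose record cache (if enabled) compacts consecutive updates per key.
KTable<String, Integer> sums = input
    .groupByKey(Grouped.with(Serdes.String(), Serdes.Integer()))
    .reduce(Integer::sum);

// Without caching, downstream sees every update for key A: <A, 1>, <A, 21>, <A, 321>.
// With caching, intermediate updates may be compacted, so downstream may see only <A, 321>.
sums.toStream().to("output-topic", Produced.with(Serdes.String(), Serdes.Integer()));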

The cache size is specified by using the statestore.cache.max.bytes parameter, which is a global setting per processing topology:

// Enable record cache of size 10 MB.
Properties streamsConfiguration = new Properties();
streamsConfiguration.put(StreamsConfig.STATESTORE_CACHE_MAX_BYTES_CONFIG, 10 * 1024 * 1024L);

This parameter controls the number of bytes allocated for caching. Specifically, for a processor topology instance with T threads and C bytes allocated for caching, each thread has an even share of C/T bytes to construct its own cache and use as it sees fit among its tasks. This means there are as many caches as there are threads, and caches are not shared across threads.
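As a concrete illustration (the thread count is arbitrary): with C = 10 MB and T = 5 stream threads, each thread receives a 2 MB cache.

Properties streamsConfiguration = new Properties();
// C = 10 MB of record cache for the whole topology instance ...
streamsConfiguration.put(StreamsConfig.STATESTORE_CACHE_MAX_BYTES_CONFIG, 10 * 1024 * 1024L);
// ... split evenly across T = 5 threads, so each thread caches up to C/T = 2 MB.
streamsConfiguration.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, 5);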

The basic API for the cache is made of put() and get() calls. Records are evicted using a simple LRU scheme once the cache size is reached. The first time a keyed record R1 = <K1, V1> finishes processing at a node, it is marked as dirty in the cache. Any other keyed record R2 = <K1, V2> with the same key K1 that is processed on that node during that time will overwrite <K1, V1>; this is referred to as "being compacted". This has the same effect as Kafka's log compaction, but it happens earlier, while the records are still in memory, and within your client-side application rather than on the server side (i.e., the Apache Kafka® broker). After flushing, R2 is forwarded to the next processing node and then written to the local state store.

The semantics of caching is that data is flushed to the state store and forwarded to the next downstream processor node whenever the earlier of the commit.interval.ms deadline or statestore.cache.max.bytes (cache pressure) is hit. Both commit.interval.ms and statestore.cache.max.bytes are global parameters, so it is not possible to specify different values for individual nodes.

Here are example settings for both parameters based on desired scenarios.
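A minimal sketch of two common scenarios (the specific values are illustrative): to turn off caching entirely, set the cache size to zero; to keep caching but bound how long records stay cached, lower the commit interval.

// Scenario 1: turn off caching by setting the cache size to zero.
// Note that this may result in high write traffic to the underlying state stores.
streamsConfiguration.put(StreamsConfig.STATESTORE_CACHE_MAX_BYTES_CONFIG, 0L);

// Scenario 2: enable caching, but put an upper bound on how long records
// are cached by lowering the commit interval (here, 1000 milliseconds).
streamsConfiguration.put(StreamsConfig.STATESTORE_CACHE_MAX_BYTES_CONFIG, 10 * 1024 * 1024L);
streamsConfiguration.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 1000L);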

The effect of these two configurations is described in the figure below. The records are shown using 4 keys: blue, red, yellow, and green. Assume the cache has space for only 3 keys.

[Figure: interaction of the record cache and the commit interval (streams-cache-and-commit-interval.png)]

Record caches in the Processor API

You can specify the total memory (RAM) size of the record cache for an instance of the processing topology. It is used for internal caching and compacting of output records before they are written from a stateful processor node to its state stores.

The record cache in the Processor API does not cache or compact any output records that are being forwarded downstream. This means that all downstream processor nodes can see all records, whereas the state stores see a reduced number of records. This does not impact correctness of the system, but is a performance optimization for the state stores. For example, with the Processor API you can store a record in a state store while forwarding a different value downstream.
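As a sketch of that last point (the processor below is illustrative, not from the original text, and assumes a state store named "Counts" has been attached to it): the processor writes a running count to its state store, where the record cache may compact consecutive updates, while forwarding the original record downstream, which is never cached.

import org.apache.kafka.streams.processor.api.Processor;
import org.apache.kafka.streams.processor.api.ProcessorContext;
import org.apache.kafka.streams.processor.api.Record;
import org.apache.kafka.streams.state.KeyValueStore;

public class CountingProcessor implements Processor<String, Integer, String, Integer> {

  private ProcessorContext<String, Integer> context;
  private KeyValueStore<String, Long> countStore;

  @Override
  public void init(final ProcessorContext<String, Integer> context) {
    this.context = context;
    // The store name "Counts" is an assumption; it must match the attached StoreBuilder.
    this.countStore = context.getStateStore("Counts");
  }

  @Override
  public void process(final Record<String, Integer> record) {
    // Writes to the state store pass through the record cache (if enabled),
    // so consecutive updates per key may be compacted before reaching the store.
    final Long oldCount = countStore.get(record.key());
    countStore.put(record.key(), oldCount == null ? 1L : oldCount + 1L);

    // Forwarded records are not cached: downstream processors see every record.
    context.forward(record);
  }
}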

Following from the example first shown in the State stores section, you can enable caching by adding a withCachingEnabled() call to the store builder.

StoreBuilder<KeyValueStore<String, Long>> countStoreBuilder =
    Stores.keyValueStoreBuilder(
        Stores.persistentKeyValueStore("Counts"),
        Serdes.String(),
        Serdes.Long())
    .withCachingEnabled();
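The builder can then be wired into the topology as usual; a brief sketch, reusing the illustrative CountingProcessor from above (the node names and topic are also illustrative):

Topology topology = new Topology();
topology.addSource("Source", "input-topic");
topology.addProcessor("Count", CountingProcessor::new, "Source");
// Attach the cached store to the processor that uses it.
topology.addStateStore(countStoreBuilder, "Count");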

RocksDB

Each instance of RocksDB allocates off-heap memory for a block cache, index and filter blocks, and memtables (write buffers). Critical configs (for RocksDB version 4.1.0) include block_cache_size, write_buffer_size, and max_write_buffer_number. These can be specified through the rocksdb.config.setter configuration.
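To plug a custom setter into your application, pass the implementing class (such as the BoundedMemoryRocksDBConfig example shown later in this section) through the Streams configuration:

streamsConfiguration.put(StreamsConfig.ROCKSDB_CONFIG_SETTER_CLASS_CONFIG, BoundedMemoryRocksDBConfig.class);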

The Kafka Streams record cache and the RocksDB cache are not mutually exclusive. By default, both are enabled in a Kafka Streams app.

The record cache (on heap) is particularly useful for optimizing writes by reducing the number of updates to local state and changelog topics. The RocksDB block cache (off heap) optimizes reads.

Confluent recommends changing RocksDB’s default memory allocator, because the default allocator may lead to increased memory consumption. To change the memory allocator to jemalloc, set the LD_PRELOAD environment variable before you start your Kafka Streams application:

# Example: install jemalloc (on Debian).
apt install -y libjemalloc-dev

# Set LD_PRELOAD before you start your Kafka Streams application.
export LD_PRELOAD="/usr/lib/x86_64-linux-gnu/libjemalloc.so"

The RocksDB block cache serves only reads and doesn't soak up writes. Writes always go into the memtable, and write traffic to disk is determined by when memtables are flushed, which happens during a commit or when the memtables are full. By default, three 16 MB memtables must fill up before a flush is triggered. You can determine the right size by using the RocksDB statistics to see which limit you hit first: the commit interval or the memtable size limit.

The default RocksDB block cache size is 50 MB per store, while the default Kafka Streams record cache is 10 MB for the entire instance. If you have a large number of stores, the 50 MB per-store default can be too high: for example, 100 stores would reserve 5 GB of off-heap memory for block caches alone.

As of 5.3.0, the memory usage across all instances can be bounded, limiting the total off-heap memory of your Kafka Streams application. To do so, you must configure RocksDB to cache the index and filter blocks in the block cache, limit the memtable memory through a shared WriteBufferManager and count its memory against the block cache, and then pass the same Cache object to each instance. See RocksDB Memory Usage for details. An example RocksDBConfigSetter implementing this is shown below:

// Requires: org.apache.kafka.streams.state.RocksDBConfigSetter, org.rocksdb.*, java.util.Map
public static class BoundedMemoryRocksDBConfig implements RocksDBConfigSetter {

  static {
    RocksDB.loadLibrary();
  }

  // Sizing constants are not defined in the original snippet; the values below are illustrative.
  private static final long TOTAL_OFF_HEAP_MEMORY = 512 * 1024 * 1024L;  // shared block cache capacity
  private static final double INDEX_FILTER_BLOCK_RATIO = 0.1;            // see #1 below
  private static final long TOTAL_MEMTABLE_MEMORY = 128 * 1024 * 1024L;  // counted against the block cache
  private static final long BLOCK_SIZE = 4 * 1024L;                      // see #3 below
  private static final int N_MEMTABLES = 3;
  private static final long MEMTABLE_SIZE = 16 * 1024 * 1024L;

  // See #1 below
  private static org.rocksdb.Cache cache =
      new org.rocksdb.LRUCache(TOTAL_OFF_HEAP_MEMORY, -1, false, INDEX_FILTER_BLOCK_RATIO);
  private static org.rocksdb.WriteBufferManager writeBufferManager =
      new org.rocksdb.WriteBufferManager(TOTAL_MEMTABLE_MEMORY, cache);

  @Override
  public void setConfig(final String storeName, final Options options, final Map<String, Object> configs) {

    BlockBasedTableConfig tableConfig = (BlockBasedTableConfig) options.tableFormatConfig();

    // These three options in combination limit the memory used by RocksDB to the
    // size passed to the block cache (TOTAL_OFF_HEAP_MEMORY).
    tableConfig.setBlockCache(cache);
    tableConfig.setCacheIndexAndFilterBlocks(true);
    options.setWriteBufferManager(writeBufferManager);

    // These options are recommended when bounding the total memory.
    // See #2 below
    tableConfig.setCacheIndexAndFilterBlocksWithHighPriority(true);
    tableConfig.setPinTopLevelIndexAndFilter(true);
    // See #3 below
    tableConfig.setBlockSize(BLOCK_SIZE);
    options.setMaxWriteBufferNumber(N_MEMTABLES);
    options.setWriteBufferSize(MEMTABLE_SIZE);
    // Enable compression (optional). Compression can decrease the required storage
    // and increase the CPU usage of the machine. For CompressionType values, see
    // https://javadoc.io/static/org.rocksdb/rocksdbjni/6.4.6/org/rocksdb/CompressionType.html.
    options.setCompressionType(CompressionType.LZ4_COMPRESSION);

    options.setTableFormatConfig(tableConfig);
  }

  @Override
  public void close(final String storeName, final Options options) {
    // The Cache and WriteBufferManager should not be closed here, because the same
    // objects are shared by every store instance.
  }
}

Footnotes:

  1. INDEX_FILTER_BLOCK_RATIO sets aside a fraction of the block cache for "high priority" (that is, index and filter) blocks, preventing them from being evicted by data blocks. The boolean parameter in the cache constructor controls whether the cache enforces a strict memory limit by failing reads or iterations in the rare cases where the cache might grow beyond its capacity. For more information, see the full signature of the LRUCache constructor.
  2. This must be set in order for INDEX_FILTER_BLOCK_RATIO to take effect (see footnote 1) as described in the RocksDB docs.
  3. You may want to modify the default block size per these instructions from the RocksDB GitHub. A larger block size means index blocks will be smaller, but the cached data blocks may contain more cold data that would otherwise be evicted.

While Confluent recommends setting at least the above configs, the specific options that yield the best performance are workload dependent, and you should experiment with them to determine the best choices for your specific use case. Keep in mind that the optimal configs for one app may not apply to an app with a different topology or input topic. In addition to the recommended configs above, you may want to consider using partitioned index filters, as described by the RocksDB docs.

Other memory usage

There are other modules inside Kafka that allocate memory during runtime. They include the following:

- Producer buffering, managed by the producer config buffer.memory.
- Consumer buffering, which is not strictly managed, but can be indirectly controlled by the fetch size, that is, fetch.max.bytes and fetch.max.wait.ms.
- TCP send and receive buffers, used by both the producer and the consumer and configured by send.buffer.bytes and receive.buffer.bytes.

Tip

Close iterators explicitly to release resources: store iterators (for example, KeyValueIterator and WindowStoreIterator) must be closed explicitly when you are done with them to release resources such as open file handles and in-memory read buffers. Alternatively, use a try-with-resources statement (available since JDK 7), because these iterator classes implement Closeable.

Otherwise, your streams application's memory usage keeps growing while it runs, until it eventually hits an OutOfMemoryError.
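For example, a minimal sketch of iterating over an interactive-query store with try-with-resources (the store name "Counts" and the KafkaStreams instance streams are assumptions):

ReadOnlyKeyValueStore<String, Long> store = streams.store(
    StoreQueryParameters.fromNameAndType("Counts", QueryableStoreTypes.keyValueStore()));

// try-with-resources closes the iterator when the block exits, releasing
// open file handles and in-memory read buffers.
try (KeyValueIterator<String, Long> iterator = store.all()) {
  while (iterator.hasNext()) {
    KeyValue<String, Long> entry = iterator.next();
    System.out.println(entry.key + " -> " + entry.value);
  }
}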