What is a KTable in Kafka Streams?
KTable
This module defines the KTable, explains how it differs from a KStream, and covers its basic operations, as well as its GlobalKTable variant.
Update Streams
The module Basic Operations defined event streams and mentioned that keys across records in event streams are completely independent of one another, even if they are identical.
Update Streams are the exact opposite: if a new record comes in with the same key as an existing record, the existing record will be overwritten.
This means that when using a KTable, keys are required, although they aren't required when using a KStream. By overwriting records, a KTable creates a completely different data structure from a KStream, even given the same source records.
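The difference can be sketched in plain Java without any Kafka dependency. This is only an illustration of the semantics, not how Kafka Streams is implemented: the class name, method names, and sample records are hypothetical, and an in-memory list and map stand in for a KStream and a KTable.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

class StreamVsTableSketch {

    /** Event-stream view: every record is kept, even when keys repeat. */
    static List<String[]> asEventStream(String[][] records) {
        List<String[]> events = new ArrayList<>();
        for (String[] record : records) {
            events.add(record);
        }
        return events;
    }

    /** Update-stream view: a later record replaces the earlier one with the same key. */
    static Map<String, String> asUpdateStream(String[][] records) {
        Map<String, String> table = new LinkedHashMap<>();
        for (String[] record : records) {
            table.put(record[0], record[1]); // record[0] is the key, record[1] the value
        }
        return table;
    }

    public static void main(String[] args) {
        String[][] records = {
            {"alice", "login"}, {"bob", "login"}, {"alice", "logout"}
        };
        System.out.println(asEventStream(records).size()); // 3
        System.out.println(asUpdateStream(records));       // {alice=logout, bob=login}
    }
}
```

The same three source records yield three events in the stream view but only two entries in the table view, because the second "alice" record overwrites the first.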
Defining a KTable
To define a KTable, you use a StreamsBuilder, as with a KStream, but you call builder.table instead of builder.stream. With the builder.table method, you provide an inputTopic, along with a Materialized configuration object specifying your SerDes (this replaces the Consumed object that you use with a KStream):
StreamsBuilder builder = new StreamsBuilder();
KTable<String, String> firstKTable = builder.table(inputTopic, Materialized.with(Serdes.String(), Serdes.String()));
KTable Operations
The KTable API has operations similar to those of the KStream API, including mapping and filtering.
Mapping
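As with KStream, mapValues transforms values while leaving keys unchanged. Unlike KStream, the KTable interface does not define a map operation; to transform both keys and values, first convert the table to a stream with toStream and then call map on the resulting KStream:
firstKTable.mapValues(value -> ..)
firstKTable.toStream().map((key, value) -> ..)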
Filtering
As with KStream, the filter operation lets you supply a predicate, and only records that match the predicate are forwarded to the next node in the topology:
firstKTable.filter((key, value) -> ..)
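Because a KTable holds the latest value per key, filtering a table differs slightly from filtering a stream: when an update for a key fails the predicate, Kafka Streams forwards a delete (tombstone) for that key, so the key disappears from the filtered table. The plain-Java sketch below mimics that behavior under simplifying assumptions; the class, method, and sensor data are hypothetical, and a map stands in for the filtered table's state.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.function.BiPredicate;

class TableFilterSketch {

    /** Applies updates in order and returns the resulting contents of the filtered table. */
    static Map<String, Integer> filteredTable(String[] keys, int[] values,
                                              BiPredicate<String, Integer> predicate) {
        Map<String, Integer> filtered = new LinkedHashMap<>();
        for (int i = 0; i < keys.length; i++) {
            if (predicate.test(keys[i], values[i])) {
                filtered.put(keys[i], values[i]); // matching update: insert or overwrite
            } else {
                filtered.remove(keys[i]);         // non-matching update: acts like a delete
            }
        }
        return filtered;
    }

    public static void main(String[] args) {
        String[] keys = {"sensor-1", "sensor-2", "sensor-3", "sensor-1"};
        int[] values = {75, 40, 90, 30};
        // Keep only readings above 50.
        System.out.println(filteredTable(keys, values, (k, v) -> v > 50)); // {sensor-3=90}
    }
}
```

Here sensor-1 first passes the predicate with 75, but its later update of 30 fails it, so sensor-1 is removed from the filtered result rather than keeping its stale value.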
GlobalKTable
A GlobalKTable is built using the globalTable method on the StreamsBuilder. As with a regular KTable, you pass in a Materialized configuration with the SerDes:
StreamsBuilder builder = new StreamsBuilder();
GlobalKTable<String, String> globalKTable = builder.globalTable(inputTopic, Materialized.with(Serdes.String(), Serdes.String()));
The main difference between a KTable and a GlobalKTable is that a KTable shards data across Kafka Streams instances, so each instance holds only part of the table, while a GlobalKTable gives each instance a full copy of the data. You typically use a GlobalKTable for lookup data, which tends to be small and to change rarely. There are also some idiosyncrasies regarding joins between a GlobalKTable and a KStream; we’ll cover these later in the course.
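The sharded-versus-replicated distinction can be illustrated with a small plain-Java sketch. It is a simplification under stated assumptions: in Kafka Streams the split follows the topic partitions assigned to each instance, whereas this sketch uses hashCode modulo the instance count as a stand-in. The class and method names and the country data are hypothetical.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

class ShardedVsGlobalSketch {

    /** KTable-like layout: each instance stores only the keys assigned to it. */
    static List<Map<String, String>> shard(Map<String, String> data, int instances) {
        List<Map<String, String>> stores = new ArrayList<>();
        for (int i = 0; i < instances; i++) {
            stores.add(new HashMap<>());
        }
        for (Map.Entry<String, String> entry : data.entrySet()) {
            // Stand-in for partition assignment; not Kafka's actual partitioner.
            int owner = Math.floorMod(entry.getKey().hashCode(), instances);
            stores.get(owner).put(entry.getKey(), entry.getValue());
        }
        return stores;
    }

    /** GlobalKTable-like layout: every instance stores the complete data set. */
    static List<Map<String, String>> replicate(Map<String, String> data, int instances) {
        List<Map<String, String>> stores = new ArrayList<>();
        for (int i = 0; i < instances; i++) {
            stores.add(new HashMap<>(data));
        }
        return stores;
    }

    public static void main(String[] args) {
        Map<String, String> countries = new HashMap<>();
        countries.put("US", "United States");
        countries.put("DE", "Germany");
        countries.put("JP", "Japan");
        System.out.println("sharded:    " + shard(countries, 2));
        System.out.println("replicated: " + replicate(countries, 2));
    }
}
```

With the sharded layout, a lookup for a given key succeeds only on the instance that owns it; with the replicated layout, any instance can answer any lookup locally, at the cost of storing the whole data set everywhere.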