KafkaConsumer — kafka-python 2.2.6 documentation (original) (raw)

kafka-python

class kafka.KafkaConsumer(*topics, **configs)[source]

Consume records from a Kafka cluster.

The consumer will transparently handle the failure of servers in the Kafka cluster, and adapt as topic-partitions are created or migrate between brokers. It also interacts with the assigned kafka Group Coordinator node to allow multiple consumers to load balance consumption of topics (requires kafka >= 0.9.0.0).

The consumer is not thread safe and should not be shared across threads.

Parameters:

*topics (str) – optional list of topics to subscribe to. If not set, call subscribe() orassign() before consuming records.

Keyword Arguments:

assign(partitions)[source]

Manually assign a list of TopicPartitions to this consumer.

Parameters:

partitions (list of TopicPartition) – Assignment for this instance.

Raises:

Warning

It is not possible to use both manual partition assignment withassign() and group assignment withsubscribe().

Note

This interface does not support incremental assignment and will replace the previous assignment (if there was one).

Note

Manual topic assignment through this method does not use the consumer’s group management functionality. As such, there will be no rebalance operation triggered when group membership or cluster and topic metadata change.

assignment()[source]

Get the TopicPartitions currently assigned to this consumer.

If partitions were directly assigned usingassign(), then this will simply return the same partitions that were previously assigned. If topics were subscribed using subscribe(), then this will give the set of topic partitions currently assigned to the consumer (which may be None if the assignment hasn’t happened yet, or if the partitions are in the process of being reassigned).

Returns:

{TopicPartition, …}

Return type:

set

beginning_offsets(partitions)[source]

Get the first offset for the given partitions.

This method does not change the current consumer position of the partitions.

Note

This method may block indefinitely if the partition does not exist.

Parameters:

partitions (list) – List of TopicPartition instances to fetch offsets for.

Returns:

The earliest available offsets for the given partitions.

Return type:

{TopicPartition: int}

Raises:

bootstrap_connected()[source]

Return True if the bootstrap is connected.

close(autocommit=True, timeout_ms=None)[source]

Close the consumer, waiting indefinitely for any needed cleanup.

Keyword Arguments:

commit(offsets=None, timeout_ms=None)[source]

Commit offsets to kafka, blocking until success or error.

This commits offsets only to Kafka. The offsets committed using this API will be used on the first fetch after every rebalance and also on startup. As such, if you need to store offsets in anything other than Kafka, this API should not be used. To avoid re-processing the last message read if a consumer is restarted, the committed offset should be the next message your application should consume, i.e.: last_offset + 1.

Blocks until either the commit succeeds or an unrecoverable error is encountered (in which case it is thrown to the caller).

Currently only supports kafka-topic offset storage (not zookeeper).

Parameters:

offsets (dict , optional) – {TopicPartition: OffsetAndMetadata} dict to commit with the configured group_id. Defaults to currently consumed offsets for all subscribed partitions.

commit_async(offsets=None, callback=None)[source]

Commit offsets to kafka asynchronously, optionally firing callback.

This commits offsets only to Kafka. The offsets committed using this API will be used on the first fetch after every rebalance and also on startup. As such, if you need to store offsets in anything other than Kafka, this API should not be used. To avoid re-processing the last message read if a consumer is restarted, the committed offset should be the next message your application should consume, i.e.: last_offset + 1.

This is an asynchronous call and will not block. Any errors encountered are either passed to the callback (if provided) or discarded.

Parameters:

Returns:

kafka.future.Future

committed(partition, metadata=False, timeout_ms=None)[source]

Get the last committed offset for the given partition.

This offset will be used as the position for the consumer in the event of a failure.

This call will block to do a remote call to get the latest committed offsets from the server.

Parameters:

Returns:

The last committed offset (int or OffsetAndMetadata), or None if there was no prior commit.

Raises:

end_offsets(partitions)[source]

Get the last offset for the given partitions. The last offset of a partition is the offset of the upcoming message, i.e. the offset of the last available message + 1.

This method does not change the current consumer position of the partitions.

Note

This method may block indefinitely if the partition does not exist.

Parameters:

partitions (list) – List of TopicPartition instances to fetch offsets for.

Returns:

The end offsets for the given partitions.

Return type:

{TopicPartition: int}

Raises:

highwater(partition)[source]

Last known highwater offset for a partition.

A highwater offset is the offset that will be assigned to the next message that is produced. It may be useful for calculating lag, by comparing with the reported position. Note that both position and highwater refer to the next offset – i.e., highwater offset is one greater than the newest available message.

Highwater offsets are returned in FetchResponse messages, so will not be available if no FetchRequests have been sent for this partition yet.

Parameters:

partition (TopicPartition) – Partition to check

Returns:

Offset if available

Return type:

int or None

metrics(raw=False)[source]

Get metrics on consumer performance.

This is ported from the Java Consumer, for details see:https://kafka.apache.org/documentation/#consumer_monitoring

Warning

This is an unstable interface. It may change in future releases without warning.

offsets_for_times(timestamps)[source]

Look up the offsets for the given partitions by timestamp. The returned offset for each partition is the earliest offset whose timestamp is greater than or equal to the given timestamp in the corresponding partition.

This is a blocking call. The consumer does not have to be assigned the partitions.

If the message format version in a partition is before 0.10.0, i.e. the messages do not have timestamps, None will be returned for that partition. None will also be returned for the partition if there are no messages in it.

Note

This method may block indefinitely if the partition does not exist.

Parameters:

timestamps (dict) – {TopicPartition: int} mapping from partition to the timestamp to look up. Unit should be milliseconds since beginning of the epoch (midnight Jan 1, 1970 (UTC))

Returns:

mapping from partition to the timestamp and offset of the first message with timestamp greater than or equal to the target timestamp.

Return type:

{TopicPartition: OffsetAndTimestamp}

Raises:

partitions_for_topic(topic)[source]

This method first checks the local metadata cache for information about the topic. If the topic is not found (either because the topic does not exist, the user is not authorized to view the topic, or the metadata cache is not populated), then it will issue a metadata update call to the cluster.

Parameters:

topic (str) – Topic to check.

Returns:

Partition ids

Return type:

set

pause(*partitions)[source]

Suspend fetching from the requested partitions.

Future calls to poll() will not return any records from these partitions until they have been resumed usingresume().

Note: This method does not affect partition subscription. In particular, it does not cause a group rebalance when automatic assignment is used.

Parameters:

*partitions (TopicPartition) – Partitions to pause.

paused()[source]

Get the partitions that were previously paused usingpause().

Returns:

{partition (TopicPartition), …}

Return type:

set

poll(timeout_ms=0, max_records=None, update_offsets=True)[source]

Fetch data from assigned topics / partitions.

Records are fetched and returned in batches by topic-partition. On each poll, consumer will try to use the last consumed offset as the starting offset and fetch sequentially. The last consumed offset can be manually set through seek() or automatically set as the last committed offset for the subscribed list of partitions.

Incompatible with iterator interface – use one or the other, not both.

Parameters:

Returns:

Topic to list of records since the last fetch for the

subscribed list of topics and partitions.

Return type:

dict

position(partition, timeout_ms=None)[source]

Get the offset of the next record that will be fetched

Parameters:

partition (TopicPartition) – Partition to check

Returns:

Offset or None

Return type:

int

resume(*partitions)[source]

Resume fetching from the specified (paused) partitions.

Parameters:

*partitions (TopicPartition) – Partitions to resume.

seek(partition, offset)[source]

Manually specify the fetch offset for a TopicPartition.

Overrides the fetch offsets that the consumer will use on the nextpoll(). If this API is invoked for the same partition more than once, the latest offset will be used on the nextpoll().

Note: You may lose data if this API is arbitrarily used in the middle of consumption to reset the fetch offsets.

Parameters:

Raises:

AssertionError – If offset is not an int >= 0; or if partition is not currently assigned.

seek_to_beginning(*partitions)[source]

Seek to the oldest available offset for partitions.

Parameters:

*partitions – Optionally provide specific TopicPartitions, otherwise default to all assigned partitions.

Raises:

AssertionError – If any partition is not currently assigned, or if no partitions are assigned.

seek_to_end(*partitions)[source]

Seek to the most recent available offset for partitions.

Parameters:

*partitions – Optionally provide specific TopicPartitions, otherwise default to all assigned partitions.

Raises:

AssertionError – If any partition is not currently assigned, or if no partitions are assigned.

subscribe(topics=(), pattern=None, listener=None)[source]

Subscribe to a list of topics, or a topic regex pattern.

Partitions will be dynamically assigned via a group coordinator. Topic subscriptions are not incremental: this list will replace the current assignment (if there is one).

This method is incompatible with assign().

Parameters:

Raises:

subscription()[source]

Get the current topic subscription.

Returns:

{topic, …}

Return type:

set

topics()[source]

Get all topics the user is authorized to view. This will always issue a remote call to the cluster to fetch the latest information.

Returns:

topics

Return type:

set

unsubscribe()[source]

Unsubscribe from all topics and clear all assigned partitions.