KafkaProducer — kafka-python 2.2.5 documentation

class kafka.KafkaProducer(**configs)[source]

A Kafka client that publishes records to the Kafka cluster.

The producer is thread safe and sharing a single producer instance across threads will generally be faster than having multiple instances.

The producer consists of a RecordAccumulator which holds records that haven’t yet been transmitted to the server, and a Sender background I/O thread that is responsible for turning these records into requests and transmitting them to the cluster.

send() is asynchronous. When called it adds the record to a buffer of pending record sends and immediately returns. This allows the producer to batch together individual records for efficiency.
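For example, a minimal asynchronous send might look like the following sketch (the broker address and topic name are illustrative placeholders):

```python
from kafka import KafkaProducer

# Illustrative broker address; replace with your cluster's bootstrap servers.
producer = KafkaProducer(bootstrap_servers='localhost:9092')

# send() returns a FutureRecordMetadata immediately; the record is buffered
# and transmitted later by the background Sender thread.
future = producer.send('my-topic', b'raw_bytes')

# Either block until the send completes (raises on failure/timeout)...
metadata = future.get(timeout=10)
print(metadata.topic, metadata.partition, metadata.offset)

# ...or stay fully asynchronous with callbacks.
future = producer.send('my-topic', b'more_bytes')
future.add_callback(lambda md: print('delivered at offset', md.offset))
future.add_errback(lambda exc: print('delivery failed:', exc))
```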

The ‘acks’ config controls the criteria under which requests are considered complete. The “all” setting will result in blocking on the full commit of the record, the slowest but most durable setting.

If the request fails, the producer can automatically retry, unless ‘retries’ is configured to 0. Enabling retries also opens up the possibility of duplicates (see the documentation on message delivery semantics for details: https://kafka.apache.org/documentation.html#semantics).
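A durable configuration might combine these two settings as in this sketch (the values shown are illustrative):

```python
from kafka import KafkaProducer

producer = KafkaProducer(
    bootstrap_servers='localhost:9092',  # illustrative broker address
    acks='all',  # block completion on the full commit of each record
    retries=5,   # retry transient failures; without idempotence this may duplicate
)
```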

The producer maintains buffers of unsent records for each partition. These buffers are of a size specified by the ‘batch_size’ config. Making this larger can result in more batching, but requires more memory (since we will generally have one of these buffers for each active partition).

By default a buffer is available to send immediately even if there is additional unused space in the buffer. However, if you want to reduce the number of requests you can set ‘linger_ms’ to something greater than 0. This will instruct the producer to wait up to that number of milliseconds before sending a request, in the hope that more records will arrive to fill up the same batch. This is analogous to Nagle’s algorithm in TCP. Note that records that arrive close together in time will generally batch together even with linger_ms=0, so under heavy load batching will occur regardless of the linger configuration; however, setting this to something larger than 0 can lead to fewer, more efficient requests when not under maximal load, at the cost of a small amount of latency.
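For instance, to trade a small amount of latency for fewer, larger requests (values are illustrative):

```python
from kafka import KafkaProducer

producer = KafkaProducer(
    bootstrap_servers='localhost:9092',  # illustrative broker address
    batch_size=32768,  # per-partition buffer size in bytes
    linger_ms=10,      # wait up to 10 ms for more records to fill a batch
)
```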

The key_serializer and value_serializer instruct how to turn the key and value objects the user provides into bytes.
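For example, keys could be sent as UTF-8 strings and values as JSON (the topic name and payload are illustrative):

```python
import json
from kafka import KafkaProducer

# Each serializer is a callable that receives the object passed to send()
# and must return bytes.
producer = KafkaProducer(
    bootstrap_servers='localhost:9092',  # illustrative broker address
    key_serializer=lambda k: k.encode('utf-8'),
    value_serializer=lambda v: json.dumps(v).encode('utf-8'),
)
producer.send('my-topic', key='user-42', value={'clicks': 3})
```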

From Kafka 0.11, the KafkaProducer supports two additional modes: the idempotent producer and the transactional producer. The idempotent producer strengthens Kafka’s delivery semantics from at least once to exactly once delivery. In particular, producer retries will no longer introduce duplicates. The transactional producer allows an application to send messages to multiple partitions (and topics!) atomically.

To enable idempotence, the enable_idempotence configuration must be set to True. If set, the retries config will default to float('inf') and the acks config will default to ‘all’. There are no API changes for the idempotent producer, so existing applications will not need to be modified to take advantage of this feature.

To take advantage of the idempotent producer, it is imperative to avoid application level re-sends since these cannot be de-duplicated. As such, if an application enables idempotence, it is recommended to leave the retries config unset, as it will be defaulted to float('inf'). Additionally, if a send() returns an error even with infinite retries (for instance if the message expires in the buffer before being sent), then it is recommended to shut down the producer and check the contents of the last produced message to ensure that it is not duplicated. Finally, the producer can only guarantee idempotence for messages sent within a single session.
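A minimal idempotent configuration, leaving retries and acks at the safe defaults described above (broker address and topic are illustrative):

```python
from kafka import KafkaProducer

producer = KafkaProducer(
    bootstrap_servers='localhost:9092',  # illustrative broker address
    enable_idempotence=True,  # retries defaults to float('inf'), acks to 'all'
)
future = producer.send('my-topic', b'payload')
future.get(timeout=30)  # on persistent failure, consider shutting down the producer
```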

To use the transactional producer and the attendant APIs, you must set the transactional_id configuration property. If the transactional_id is set, idempotence is automatically enabled along with the producer configs which idempotence depends on. Further, topics which are included in transactions should be configured for durability. In particular, the replication.factor should be at least 3, and the min.insync.replicas for these topics should be set to 2. Finally, in order for transactional guarantees to be realized end-to-end, the consumers must be configured to read only committed messages as well.

The purpose of the transactional_id is to enable transaction recovery across multiple sessions of a single producer instance. It would typically be derived from the shard identifier in a partitioned, stateful application. As such, it should be unique to each producer instance running within a partitioned application.
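A typical transactional session might look like this sketch, with an illustrative transactional_id and topic names:

```python
from kafka import KafkaProducer

producer = KafkaProducer(
    bootstrap_servers='localhost:9092',  # illustrative broker address
    transactional_id='order-shard-7',    # e.g. derived from the shard identifier
)

producer.init_transactions()  # exactly once, before the first transaction

producer.begin_transaction()
try:
    producer.send('orders', b'order-1')
    producer.send('audit-log', b'order-1 received')  # multiple topics, atomically
    producer.commit_transaction()
except Exception:
    producer.abort_transaction()
    raise
```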

Keyword Arguments:

abort_transaction()[source]

Aborts the ongoing transaction.

Raises:

ProducerFencedError – if another producer with the same transactional_id is active.

begin_transaction()[source]

Should be called before the start of each new transaction.

Note that prior to the first invocation of this method, you must invoke init_transactions() exactly one time.

Raises:

ProducerFencedError – if another producer with the same transactional_id is active.

bootstrap_connected()[source]

Return True if the bootstrap is connected.

close(timeout=None, null_logger=False)[source]

Close this producer.

Parameters:

timeout (float, optional) – timeout in seconds to wait for completion.

commit_transaction()[source]

Commits the ongoing transaction.

Raises:

ProducerFencedError – if another producer with the same transactional_id is active.

flush(timeout=None)[source]

Invoking this method makes all buffered records immediately available to send (even if linger_ms is greater than 0) and blocks on the completion of the requests associated with these records. The post-condition of flush() is that any previously sent record will have completed (e.g. Future.is_done() == True). A request is considered completed when either it is successfully acknowledged according to the ‘acks’ configuration for the producer, or it results in an error.

Other threads can continue sending messages while one thread is blocked waiting for a flush call to complete; however, no guarantee is made about the completion of messages sent after the flush call begins.
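For example, flushing before a clean shutdown (continuing the illustrative producer from the examples above):

```python
for i in range(100):
    producer.send('my-topic', b'msg-%d' % i)

# Block until every buffered record is acknowledged or has errored;
# raises KafkaTimeoutError if this takes longer than 30 seconds.
producer.flush(timeout=30)
```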

Parameters:

timeout (float, optional) – timeout in seconds to wait for completion.

Raises:

KafkaTimeoutError – failure to flush buffered records within the provided timeout

init_transactions()[source]

Needs to be called before any other methods when the transactional_id is set in the configuration.

This method does the following:

  1. Ensures any transactions initiated by previous instances of the producer with the same transactional_id are completed. If the previous instance had failed with a transaction in progress, it will be aborted. If the last transaction had begun completion, but not yet finished, this method awaits its completion.
  2. Gets the internal producer id and epoch, used in all future transactional messages issued by the producer.

Note that this method will raise KafkaTimeoutError if the transactional state cannot be initialized before expiration of max_block_ms.

Retrying after a KafkaTimeoutError will continue to wait for the prior request to succeed or fail. Retrying after any other exception will start a new initialization attempt. Retrying after a successful initialization will do nothing.
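A retry loop that tolerates slow coordinator discovery might look like this sketch (assuming producer was constructed with a transactional_id, as in the earlier examples):

```python
from kafka.errors import KafkaTimeoutError

while True:
    try:
        producer.init_transactions()
        break
    except KafkaTimeoutError:
        # Still waiting on the prior request; retrying resumes that wait.
        continue
```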

Raises:

metrics(raw=False)[source]

Get metrics on producer performance.

This is ported from the Java Producer, for details see: https://kafka.apache.org/documentation/#producer_monitoring

Warning

This is an unstable interface. It may change in future releases without warning.

partitions_for(topic)[source]

Returns set of all known partitions for the topic.
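For example (topic name illustrative):

```python
partitions = producer.partitions_for('my-topic')
# e.g. {0, 1, 2} for a topic with three partitions
```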

send(topic, value=None, key=None, headers=None, partition=None, timestamp_ms=None)[source]

Publish a message to a topic.

Parameters:

Returns:

resolves to RecordMetadata

Return type:

FutureRecordMetadata

Raises:

send_offsets_to_transaction(offsets, consumer_group_id)[source]

Sends a list of consumed offsets to the consumer group coordinator, and also marks those offsets as part of the current transaction. These offsets will be considered consumed only if the transaction is committed successfully.

This method should be used when you need to batch consumed and produced messages together, typically in a consume-transform-produce pattern.

Parameters:

Raises:
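A sketch of a consume-transform-produce loop follows; the topic names, group id, and transactional_id are illustrative, and the OffsetAndMetadata fields assume a recent kafka-python struct that includes a leader_epoch field:

```python
from kafka import KafkaConsumer, KafkaProducer
from kafka.structs import OffsetAndMetadata, TopicPartition

consumer = KafkaConsumer(
    'input-topic',
    bootstrap_servers='localhost:9092',  # illustrative broker address
    group_id='transform-group',
    enable_auto_commit=False,       # offsets are committed via the transaction
    isolation_level='read_committed',
)
producer = KafkaProducer(
    bootstrap_servers='localhost:9092',
    transactional_id='transform-shard-0',
)
producer.init_transactions()

for msg in consumer:
    producer.begin_transaction()
    try:
        producer.send('output-topic', msg.value.upper())
        offsets = {
            TopicPartition(msg.topic, msg.partition):
                OffsetAndMetadata(msg.offset + 1, '', -1)  # next offset to consume
        }
        producer.send_offsets_to_transaction(offsets, 'transform-group')
        producer.commit_transaction()
    except Exception:
        producer.abort_transaction()
        raise
```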