Replicate Multi-Datacenter Topics Across Kafka Clusters in Confluent Platform

Confluent Replicator allows you to easily and reliably replicate topics from one Kafka cluster to another. In addition to copying the messages, Replicator creates topics as needed, preserving the topic configuration in the source cluster. This includes preserving the number of partitions, the replication factor, and any configuration overrides specified for individual topics. Replicator is implemented as a connector.

Features

Replicator supports the following features:

Limitations

Multi-Datacenter Use Cases

Replicator can be deployed across clusters and in multiple datacenters. Multi-datacenter deployments enable use cases such as:

Replication of events in Kafka topics from one cluster to another is the foundation of Confluent's multi-datacenter architecture.

Replication can be done with Confluent Replicator or using the open source Kafka MirrorMaker. Replicator can be used for replication of topic data as well as migrating schemas in Schema Registry.

This documentation focuses on Replicator, including architecture, quick start tutorial, how to configure and run Replicator in different contexts, tuning and monitoring, cross-cluster failover, and more. A section on how to migrate from MirrorMaker to Replicator is also included.

Some of the general thinking on deployment strategies can also apply to MirrorMaker, but if you are primarily interested in MirrorMaker, see Mirroring data between clusters in the Kafka documentation.

Architecture

The diagram below shows the Replicator architecture. Replicator uses the Kafka Connect APIs and Workers to provide high availability, load balancing, and centralized management.

Figure: Replicator Architecture

Tip

You can deploy Replicator near the destination cluster or the origin cluster, and it will work either way. However, a best practice is to deploy Replicator closer to the destination cluster for reliability and performance over networks. Therefore, if the destination cluster is Confluent Cloud, we recommend that you deploy Replicator on an instance in the same region as your Confluent Cloud cluster. However, if the origin cluster does not permit external connections, you may deploy Replicator in the origin cluster. (See also Migrate Topics on Confluent Cloud Clusters.)

Example Deployment

In a typical multi-datacenter deployment, data from two geographically distributed Kafka clusters located in separate datacenters is aggregated in a separate cluster located in another datacenter. The origin of the copied data is referred to as the “source” cluster while the target of the copied data is referred to as the “destination.”

Each source cluster requires a separate instance of Replicator. For convenience, you can run all of these instances in the same Connect cluster, located in the aggregate datacenter.

Figure: Replication to an Aggregate Cluster
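For illustration, a minimal configuration for one of the two Replicator instances in this aggregate scenario might look like the following sketch. The connector name, cluster addresses, and topic pattern are placeholders, not values from this guide; the second instance would be configured the same way, with src.kafka.bootstrap.servers pointing at the other source cluster.

name=replicator-dc1-to-aggregate
connector.class=io.confluent.connect.replicator.ReplicatorSourceConnector
# Source (DC1) and destination (aggregate) clusters -- placeholder addresses
src.kafka.bootstrap.servers=dc1-broker:9092
dest.kafka.bootstrap.servers=aggregate-broker:9092
# Topics to replicate from this source cluster
topic.regex=orders.*
# Replicator copies raw bytes, so it uses the ByteArrayConverter
key.converter=io.confluent.connect.replicator.util.ByteArrayConverter
value.converter=io.confluent.connect.replicator.util.ByteArrayConverter
tasks.max=4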

Guidelines for Getting Started

Follow these guidelines to configure a multi-datacenter deployment using Replicator:

  1. Use the Replicator quick start to set up replication between two Kafka clusters.
  2. Learn how to install and configure Replicator and other Confluent Platform components in multi-datacenter environments.
  3. Before running Replicator in production, make sure you read the monitoring and tuning guide.
  4. For a practical guide to designing and configuring multiple Kafka clusters to be resilient in case of a disaster scenario, see the Disaster Recovery white paper. This white paper provides a plan for failover, failback, and ultimately successful recovery.

Demos and Examples

After completing the Replicator quick start, explore these hands-on working examples of Replicator in multi-datacenter deployments, which you can download from GitHub and run yourself. Refer to the diagram below to determine the Replicator examples that correspond to your deployment scenario.

Figure: Replicator demo deployment scenarios

  1. Kafka on-premises to Kafka on-premises
  2. Kafka on-premises to Confluent Cloud
  3. Confluent Cloud to Confluent Cloud

Topic Renaming

By default, Replicator is configured to use the same topic name in both the source and destination clusters. This works fine if you are only replicating from a single cluster. When copying data from multiple clusters to a single destination (that is, the aggregate use case), you should use a separate topic for each source cluster in case there are any configuration differences between the topics in the source clusters.
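For example, each Replicator instance in the aggregate scenario can keep the source topics separate by renaming them with the topic.rename.format property. The suffixes below are only illustrative placeholders:

# In the Replicator instance copying from DC1
topic.rename.format=${topic}_dc1

# In the Replicator instance copying from DC2
topic.rename.format=${topic}_dc2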

It is possible to use the same Kafka cluster as the source and destination as long as you ensure that the replicated topic name is different. This is not a recommended pattern since generally you should prefer Kafka’s built-in replication within the same cluster, but it may be useful in some cases (e.g. testing).

Starting with Confluent Platform 5.0, Replicator protects against circular replication through the use of provenance headers. This ensures that if two Replicator instances are running, one replicating from DC1 to DC2 and the other replicating from DC2 to DC1, messages replicated to DC2 are not replicated back to DC1, and vice versa. As a result, Replicator can safely run in each direction.
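Provenance headers are controlled by the provenance.header.enable property. In a bidirectional deployment you would typically enable it on both Replicator instances; check the Replicator Configuration Reference for the default value in your release:

# Set on both the DC1-to-DC2 and the DC2-to-DC1 Replicator instances
provenance.header.enable=true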

Although Replicator can enable applications in different datacenters to access topics with the same names, you should design client applications with a topic naming strategy that takes into consideration a number of factors.

If you plan to have the same topic name span datacenters, be aware that in this configuration:

In some cases, you may not want to use the same topic name in each datacenter. For example, in cases where:

In these cases, refer to the appendix on “Topic Naming Strategies to Prevent Cyclic Repetition” in the Disaster Recovery white paper.

Security and ACL Configurations

ACLs Overview

Replicator supports communication with secure Kafka over TLS/SSL for both the source and destination clusters. Replicator also supports TLS/SSL or SASL for authentication. Differing security configurations can be used on the source and destination clusters.

All properties documented here are additive (that is, you can apply both TLS/SSL encryption and SASL PLAIN authentication properties) except for security.protocol. Use the following table to determine the correct value for security.protocol:

Encryption | Authentication | security.protocol
TLS/SSL | None | SSL
TLS/SSL | TLS/SSL | SSL
TLS/SSL | SASL | SASL_SSL
Plaintext | SASL | SASL_PLAINTEXT
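As an illustration only, a Replicator instance reading from a source cluster secured with SASL_SSL might carry properties along these lines; the hostnames, credentials, and truststore path are placeholders:

src.kafka.security.protocol=SASL_SSL
src.kafka.sasl.mechanism=PLAIN
src.kafka.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required \
  username="replicator" \
  password="replicator-secret";
# TLS/SSL encryption settings for the source cluster connection
src.kafka.ssl.truststore.location=/etc/kafka/secrets/kafka.truststore.jks
src.kafka.ssl.truststore.password=truststore-secret

The equivalent dest.kafka. prefixed properties configure the connection to the destination cluster.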

You can configure Replicator connections to source and destination Kafka with:

To configure security on the source cluster, see the connector configurations for Source Kafka: Security. To configure security on the destination cluster, see the connector configurations for Destination Kafka: Security and the general security configuration for Connect workers.

See also

To see the required security configuration parameters for Replicator consolidated in one place, try out the docker-compose environments in GitHub confluentinc/examples.

When using SASL or TLS/SSL authentication and ACLs are enabled on the source cluster, the destination cluster, or both, Replicator requires the ACLs described in the following sections.

ACL commands are provided for both Confluent Platform (Confluent Platform CLI Command Reference) and Confluent Cloud.

For more information on configuring ACLs, see Use Access Control Lists (ACLs) for Authorization in Confluent Platform.

Principal Users (Confluent Platform) and Service Accounts (Confluent Cloud)

Commands to configure ACLs are given here for both Confluent Platform and Confluent Cloud.

On Confluent Platform, associate ACLs with a service principal.

On Confluent Cloud, associate ACLs with a Confluent Cloud service account.

To create a service account for Confluent Cloud, run the following command:

confluent iam service-account create <service-account-name> --description "<description>"

For example:

confluent iam service-account create my-first-cluster-test-acls --description "test ACLs on Cloud"

+-------------+----------------------------+
| Id          | 123456                     |
| Resource ID | ab-123abc                  |
| Name        | my-first-cluster-test-acls |
| Description | test ACLs on Cloud         |
+-------------+----------------------------+

Save the service account ID to use in the following commands to create the ACLs.

Tip

You can retrieve service account IDs by listing them with confluent iam service-account list.

ACLs for License Management

For license management, you need the following ACLs:

Cluster | Resource | Operation
Destination (or other cluster configured with confluent.topic.bootstrap.servers) | TOPIC - _confluent-command | All

Commands to configure the above ACLs on Confluent Platform:

kafka-acls --bootstrap-server localhost:9092 --command-config adminclient-configs.conf --add --allow-principal User:<principal> --operation ALL --topic _confluent-command

ACLs to Read from the Source Cluster

To read from the source cluster, you need the following ACLs:

Cluster | Resource | Operation
Source | CLUSTER | Describe
Source | TOPIC - all topics Replicator will replicate | Describe
Source | TOPIC - all topics Replicator will replicate | Read
Source | GROUP - the consumer group name, determined by the Replicator name or by the src.consumer.group.id property | Read

Commands to configure the above ACLs on Confluent Platform:

kafka-acls --bootstrap-server localhost:9092 --command-config adminclient-configs.conf --add --allow-principal User:<principal> --operation DESCRIBE --cluster
kafka-acls --bootstrap-server localhost:9092 --command-config adminclient-configs.conf --add --allow-principal User:<principal> --operation DESCRIBE --topic <topic>
kafka-acls --bootstrap-server localhost:9092 --command-config adminclient-configs.conf --add --allow-principal User:<principal> --operation READ --topic <topic>
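The table above also requires Read on the Replicator consumer group. Assuming the group name is <group> (determined by the Replicator name or by src.consumer.group.id), a command along these lines grants it:

kafka-acls --bootstrap-server localhost:9092 --command-config adminclient-configs.conf --add --allow-principal User:<principal> --operation READ --group <group>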

ACLs to Write to the Destination Cluster

To write to the destination cluster, you need the following ACLs:

Cluster | Resource | Operation
Destination | CLUSTER | Describe
Destination | TOPIC - all topics Replicator will replicate | Describe
Destination | TOPIC - all topics Replicator will replicate | Write
Destination | TOPIC - all topics Replicator will replicate | Read

Commands to configure the above ACLs on Confluent Platform:

kafka-acls --bootstrap-server localhost:9092 --command-config adminclient-configs.conf --add --allow-principal User:<principal> --operation DESCRIBE --cluster
kafka-acls --bootstrap-server localhost:9092 --command-config adminclient-configs.conf --add --allow-principal User:<principal> --operation DESCRIBE --topic <topic>
kafka-acls --bootstrap-server localhost:9092 --command-config adminclient-configs.conf --add --allow-principal User:<principal> --operation WRITE --topic <topic>
kafka-acls --bootstrap-server localhost:9092 --command-config adminclient-configs.conf --add --allow-principal User:<principal> --operation READ --topic <topic>

ACLs for Topic Creation and Config Sync

If using the topic creation and config sync features of Replicator (enabled by default), you need the following ACLs:

Cluster | Resource | Operation
Source | TOPIC - all topics Replicator will replicate | DescribeConfigs
Destination | TOPIC - all topics Replicator will replicate | Create
Destination | TOPIC - all topics Replicator will replicate | DescribeConfigs
Destination | TOPIC - all topics Replicator will replicate | AlterConfigs

For configuration options relating to topic creation and config sync, see Destination Topics.

Commands to configure the above ACLs on Confluent Platform:

kafka-acls --bootstrap-server localhost:9092 --command-config adminclient-configs.conf --add --allow-principal User:<principal> --operation DESCRIBECONFIGS --topic <topic>
kafka-acls --bootstrap-server localhost:9092 --command-config adminclient-configs.conf --add --allow-principal User:<principal> --operation DESCRIBECONFIGS --topic <topic>
kafka-acls --bootstrap-server localhost:9092 --command-config adminclient-configs.conf --add --allow-principal User:<principal> --operation CREATE --cluster
kafka-acls --bootstrap-server localhost:9092 --command-config adminclient-configs.conf --add --allow-principal User:<principal> --operation ALTERCONFIGS --cluster

ACLs for Offset Translation

If using the offset translation feature of Replicator (enabled by default), you need the following ACLs:

Cluster | Resource | Operation
Source | TOPIC - __consumer_timestamps | All
Destination | GROUP - all consumer groups that will be translated | All

For configuration options relating to offset translation, see Consumer Offset Translation.

Commands to configure the above ACLs on Confluent Platform:

kafka-acls --bootstrap-server localhost:9092 --command-config adminclient-configs.conf --add --allow-principal User:<principal> --operation ALL --topic __consumer_timestamps
kafka-acls --bootstrap-server localhost:9092 --command-config adminclient-configs.conf --add --allow-principal User:<principal> --operation ALL --group <group>

ACLs for the Timestamp Interceptor

Any clients instrumented with the Replicator timestamp interceptor must also have the following ACLs:

Cluster | Resource | Operation
Source | TOPIC - __consumer_timestamps | Write
Source | TOPIC - __consumer_timestamps | Describe

Commands to configure the above on Confluent Platform:

kafka-acls --bootstrap-server localhost:9092 --command-config adminclient-configs.conf --add --allow-principal User:<principal> --operation WRITE --topic __consumer_timestamps
kafka-acls --bootstrap-server localhost:9092 --command-config adminclient-configs.conf --add --allow-principal User:<principal> --operation DESCRIBE --topic __consumer_timestamps
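To instrument a client with the timestamp interceptor, add the interceptor class to the consumer configuration; this assumes the kafka-connect-replicator JAR is on the client classpath:

interceptor.classes=io.confluent.connect.replicator.offsets.ConsumerTimestampsInterceptor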

ACLs for Source Offset Management

If using the source offset management feature of Replicator (enabled by default), you need the following ACLs:

Cluster | Resource | Operation
Source | GROUP - the consumer group name, determined by the Replicator name or by the src.consumer.group.id property | All

For configuration options relating to offset management see Offset Management in the Replicator Configuration Reference.

Commands to configure the above ACLs on Confluent Platform:

kafka-acls --bootstrap-server localhost:9092 --command-config adminclient-configs.conf --add --allow-principal User:<principal> --operation ALL --group <group>

Replicator with RBAC

When using RBAC, Replicator clients should use token authentication as described in Configure Clients for SASL/OAUTHBEARER authentication in Confluent Platform. These configurations should be prefixed with the usual Replicator prefixes of src.kafka. and dest.kafka.. An example configuration for source and destination clusters that are RBAC-enabled is shown below:

src.kafka.sasl.jaas.config=org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required \
  username="sourceUser" \
  password="xxx" \
  metadataServerUrls="http://sourceHost:8090";
src.kafka.security.protocol=SASL_PLAINTEXT
src.kafka.sasl.mechanism=OAUTHBEARER
src.kafka.sasl.login.callback.handler.class=io.confluent.kafka.clients.plugins.auth.token.TokenUserLoginCallbackHandler

dest.kafka.sasl.jaas.config=org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required \
  username="destUser" \
  password="xxx" \
  metadataServerUrls="http://destHost:8090";
dest.kafka.security.protocol=SASL_PLAINTEXT
dest.kafka.sasl.mechanism=OAUTHBEARER
dest.kafka.sasl.login.callback.handler.class=io.confluent.kafka.clients.plugins.auth.token.TokenUserLoginCallbackHandler

For Replicator executable, these properties should not be prefixed; place them in the files referenced by --consumer.config and --producer.config, as shown below:

In the file referenced by --consumer.config:

sasl.jaas.config=org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required \
  username="sourceUser" \
  password="xxx" \
  metadataServerUrls="http://sourceHost:8090";
security.protocol=SASL_PLAINTEXT
sasl.mechanism=OAUTHBEARER
sasl.login.callback.handler.class=io.confluent.kafka.clients.plugins.auth.token.TokenUserLoginCallbackHandler

In the file referenced by --producer.config:

sasl.jaas.config=org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required \
  username="destUser" \
  password="xxx" \
  metadataServerUrls="http://destHost:8090";
security.protocol=SASL_PLAINTEXT
sasl.mechanism=OAUTHBEARER
sasl.login.callback.handler.class=io.confluent.kafka.clients.plugins.auth.token.TokenUserLoginCallbackHandler

The configuration above requires that the Metadata Service (MDS) is running. For more information, see Configure Metadata Service (MDS) in Confluent Platform.

Important

The rolebindings below are based on the following assumptions:

Replicator requires the rolebindings listed below.

For the backing Connect cluster:

Cluster | Resource | Role
Destination | GROUP - name given by the group.id property | ResourceOwner
Destination | TOPIC - name given by the offset.storage.topic property (default connect-offsets) | ResourceOwner
Destination | TOPIC - name given by the config.storage.topic property (default connect-configs) | ResourceOwner
Destination | TOPIC - name given by the status.storage.topic property (default connect-status) | ResourceOwner

For license management:

Cluster | Resource | Role
Destination (or other cluster configured with confluent.topic.bootstrap.servers) | TOPIC - _confluent-command | DeveloperRead
Destination (or other cluster configured with confluent.topic.bootstrap.servers) | TOPIC - _confluent-command | DeveloperWrite
Destination (or other cluster configured with confluent.topic.bootstrap.servers) | TOPIC - _confluent-command | DeveloperManage

To read from the source cluster:

Cluster | Resource | Role
Source | TOPIC - all topics that Replicator will replicate | DeveloperRead
Source | TOPIC - all topics that Replicator will replicate | DeveloperManage

To write to the destination cluster:

Cluster | Resource | Role
Destination | TOPIC - all topics that Replicator will replicate | ResourceOwner

Important

If you are not using the topic configuration sync feature of Replicator (enabled by default), the following roles can be used in place of ResourceOwner:

Cluster | Resource | Role
Destination | TOPIC - all topics that Replicator will replicate | DeveloperRead
Destination | TOPIC - all topics that Replicator will replicate | DeveloperWrite
Destination | TOPIC - all topics that Replicator will replicate | DeveloperManage

If using the offset translation feature of Replicator (enabled by default):

Cluster | Resource | Role
Source | TOPIC - __consumer_timestamps | DeveloperRead
Source | TOPIC - __consumer_timestamps | DeveloperManage
Destination | GROUP - all consumer groups that will be translated (if you do not know these, use the literal * to allow all) | DeveloperRead

Also, any consumers on the source cluster using the Replicator timestamp interceptor will require:

Cluster | Resource | Role
Source | TOPIC - __consumer_timestamps | DeveloperWrite
Source | TOPIC - __consumer_timestamps | DeveloperManage

For configuration options relating to offset translation see Consumer Offset Translation.

If using the source offset management feature of Replicator (enabled by default):

Cluster | Resource | Role
Source | GROUP - the consumer group name, determined by the Replicator name or by the src.consumer.group.id property | ResourceOwner

If using the schema migration feature of Replicator (disabled by default):

Cluster | Resource | Role
Source | TOPIC - underlying Schema Registry topic (default _schemas) | DeveloperRead
Destination | CLUSTER - Schema Registry cluster | ClusterAdmin

For more information on configuring RBAC, see Use Role-Based Access Control (RBAC) for Authorization in Confluent Platform.

Replicating Messages with Schemas

Replicator does not support an “active-active” Schema Registry setup. It only supports migration (either one-time or continuous) from an active Schema Registry to a passive Schema Registry.

Starting with Confluent Platform 7.0.0, Schema Linking is available in preview on Confluent Platform, as described in Link Schemas on Confluent Platform. This is the recommended method of migrating schemas. For migrating schemas from one Confluent Cloud cluster to another, use Cloud-specific Schema Linking.

For releases earlier than Confluent Platform 7.0.0, use Replicator with Schema Translation to migrate schemas from a self-managed cluster to a target cluster that is either self-managed or in Confluent Cloud. (This feature was first available in Confluent Platform 5.2.0.)

To learn more about schema migration, see Use Schema Registry to Migrate Schemas in Confluent Platform and Link Schemas on Confluent Platform.

Schema ID Validation and Replicator

By default, Replicator is configured with topic.config.sync=true. If the source cluster has a topic with Validate Broker-side Schema IDs in Confluent Platform enabled (confluent.value.schema.validation=true), then Replicator will copy this property to the destination cluster's replicated topic.

When using Replicator to replicate data from one cluster of brokers to another, you typically want to avoid validating schemas again on the destination cluster, to skip the overhead of doing so.

Therefore, you might want to either disable Schema ID Validation on the source cluster before replicating to the destination, or set topic.config.sync=false on Replicator and explicitly set the configurations you want on the destination cluster broker properties files.
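For example, to disable Schema ID Validation on a single topic before replicating it (the topic name is a placeholder), a command along these lines can be used:

kafka-configs --bootstrap-server localhost:9092 --entity-type topics --entity-name <topic> --alter --add-config confluent.value.schema.validation=false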

Requirements

At a high level, Replicator works like a consumer group, with the partitions of the replicated topics from the source cluster divided between the connector's tasks. Replicator periodically polls the source cluster for changes to the configuration of replicated topics and the number of partitions, and updates the destination cluster accordingly by creating topics or updating configuration. For this to work correctly, the following is required:

Tip

For best performance, run Replicator as close to the destination cluster as possible to provide a low latency connection for Kafka Connect operations within Replicator.

Compatibility

For data transfer, Replicator maintains the same compatibility matrix as the Java clients, detailed in Kafka Java clients. However, some Replicator features have different compatibility requirements:

Known Issues, Limitations, and Best Practices

Replicator Connector

Replicator is implemented as a Kafka connector, and is listed in Supported Connectors. Some general information related to connectors may apply in some cases, but most of the information you'll need to work with Replicator is in this Replicator-specific documentation.

Important

This connector is bundled natively with Confluent Platform. If you have Confluent Platform installed and running, no additional installation steps are required.

If you are using only Confluent Community components, you can install the connector using the Confluent Hub Client (recommended) or by manually downloading the ZIP file.

Getting Started Guides

References

Security

Migration Guides