Running MirrorMaker 2.0 with Confluent's Schema Registry

MirrorMaker 2.0 (MM2) shipped with the Kafka 2.4 release. I have not personally used MM1, but I have heard it was not great. MM2 seeks to solve the problems with MM1.

I am using the same local two-cluster setup as when I tested Brooklin earlier this year.

There are a handful of ways to run MM2, but I am focusing on the high-level MM2 "cluster" configuration file, letting MM2 drive the Connect framework itself.

MM2's configuration is much simpler than Brooklin's and feels more intuitive, as it follows Kafka ecosystem conventions.

clusters = cluster1, cluster2

cluster1.bootstrap.servers = localhost:9093
cluster2.bootstrap.servers = localhost:9095

# enable and configure individual replication flows
cluster1->cluster2.enabled = true

# regex defining which topics get replicated, e.g. "foo-.*"
cluster1->cluster2.topics = .*

cluster2->cluster1.enabled = true
cluster2->cluster1.topics = .*

# Setting replication factor of newly created remote topics
replication.factor=1

# ssl items for all connectors
# internally, MM2 will make sure these settings are percolated to (source|target).(consumer|producer).X 
security.protocol=SSL

ssl.keystore.location=omitted
ssl.keystore.password=omitted
ssl.key.password=omitted
ssl.truststore.location=omitted
ssl.truststore.password=omitted
# left empty to disable hostname verification (fine for local testing only)
ssl.endpoint.identification.algorithm=

ssl.enabled.protocols=TLSv1.2
ssl.keystore.type=JKS
ssl.truststore.type=JKS
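As the bootstrap.servers lines above show, any of these client settings can also be scoped to a single cluster by prefixing it with the cluster alias. A sketch, with placeholder paths:

```properties
# hypothetical per-cluster override: applies only to cluster2's clients
cluster2.security.protocol=SSL
cluster2.ssl.truststore.location=/path/to/cluster2.truststore.jks
```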

Kick off the MM2 cluster using our property file above:

$> connect-mirror-maker config/mm2.properties

Let's write some data to a test topic in cluster 1:

$> kafka-console-producer --bootstrap-server localhost:9093 --producer.config client.properties --topic test1
sampleData
sampleData2

$> kafka-topics --bootstrap-server localhost:9093 --command-config client.properties --list

__consumer_offsets
_schemas
cluster2.checkpoints.internal
cluster2.heartbeats
heartbeats
mm2-configs.cluster2.internal
mm2-offset-syncs.cluster2.internal
mm2-offsets.cluster2.internal
mm2-status.cluster2.internal
test1

Now let's check out replication in cluster 2:

$> kafka-topics --bootstrap-server localhost:9095 --command-config client.properties --list

__consumer_offsets
cluster1._schemas
cluster1.checkpoints.internal
cluster1.heartbeats
cluster1.test1
heartbeats
mm2-configs.cluster1.internal
mm2-offset-syncs.cluster1.internal
mm2-offsets.cluster1.internal
mm2-status.cluster1.internal
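The "cluster1." prefix on the replicated topics comes from MM2's default replication policy, which prepends the source cluster alias. The separator is configurable; a sketch:

```properties
# change the remote-topic prefix separator (default is "."),
# producing remote topics like "cluster1_test1"
replication.policy.separator=_
```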

If you don't see "cluster1.test1" immediately, never fear. I have noticed that MM2 discovers new topics on a periodic refresh rather than instantly, so there can be a delay of up to ten minutes between a topic's initial creation/write and the start of replication (this is governed by refresh.topics.interval.seconds, which defaults to 600 seconds). If a topic already exists, replication is nearly instantaneous.
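That topic-refresh interval is configurable. A sketch, assuming the global form of MM2's refresh.topics.interval.seconds setting (it can also be prefixed per flow, e.g. cluster1->cluster2.):

```properties
# poll for newly created topics every 30 seconds instead of the default 600
refresh.topics.interval.seconds = 30
```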

Try to read from the replicated topic:

$> kafka-console-consumer --bootstrap-server localhost:9095 --consumer.config client.properties --topic cluster1.test1 --from-beginning
sampleData
sampleData2

Success!

Now open up another terminal and run both the producer and consumer at the same time. You'll see the replication immediately.

Now that we've done basic read/write, it's time to use the Schema Registry.

The key here, as it was with Brooklin, is that cluster2's Schema Registry must run as a slave and cluster1's as the master: set "master.eligibility=false" for cluster2 and "master.eligibility=true" for cluster1.
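A minimal sketch of the relevant schema-registry.properties lines, assuming the 8085/8087 listeners used in the commands below:

```properties
# cluster1's Schema Registry: master, accepts schema registrations
listeners=https://0.0.0.0:8085
master.eligibility=true

# cluster2's Schema Registry: never becomes master,
# forwards any registration requests to the master
listeners=https://0.0.0.0:8087
master.eligibility=false
```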

Write a message to cluster 1, topic avro-test-topic:

$> kafka-avro-console-producer --broker-list localhost:9093 --topic avro-test-topic --producer.config client.properties --property value.schema='{"type":"record","name":"myrecord","fields":[{"name":"f1","type":"string"}]}' --property schema.registry.url=https://localhost:8085
{"f1":"testValue"}

Confirm the regular topic was created in cluster1:

$> kafka-topics --bootstrap-server localhost:9093 --command-config client.properties --list

__consumer_offsets
_schemas
avro-test-topic
cluster2.checkpoints.internal
cluster2.heartbeats
heartbeats
mm2-configs.cluster2.internal
mm2-offset-syncs.cluster2.internal
mm2-offsets.cluster2.internal
mm2-status.cluster2.internal
test1

Confirm it got replicated to cluster2:

$> kafka-topics --bootstrap-server localhost:9095 --command-config client.properties --list

__consumer_offsets
cluster1._schemas
cluster1.avro-test-topic
cluster1.checkpoints.internal
cluster1.heartbeats
cluster1.test1
heartbeats
mm2-configs.cluster1.internal
mm2-offset-syncs.cluster1.internal
mm2-offsets.cluster1.internal
mm2-status.cluster1.internal

Read from the replicated topic:

$> kafka-avro-console-consumer --bootstrap-server localhost:9095 --topic cluster1.avro-test-topic --from-beginning --consumer.config client.properties --property schema.registry.url=https://localhost:8087
{"f1":"testValue"}

Success!