Running Brooklin with Confluent's Schema Registry
It seems common to purchase a domain and then do nothing with it for a year, so it's time to start migrating from my previous blog home of 11 years to a platform with more flexibility and tooling.
This is a follow-up to this post, where we set up Brooklin to mirror Kafka clusters with SSL enabled. At the end of that post, I was lamenting the difficulty of doing the same with Confluent Schema Registry-backed topics.
A cursory Google search on this problem yielded nothing really promising: a hodgepodge of suggestions abounded, from mirroring the _schemas topic to using a Schema Registry SMT. Surely this is a solved problem?
It turns out it is possible, just not in an intuitive way. Consider this image from Confluent showing how they suggest doing it with their enterprise-licensed Replicator:
We're swapping out Replicator for Brooklin, but the concept for the one-way mirror is the same. We make Schema Registry B a follower (slave) of Schema Registry A. Both registries use Kafka-based leader election and point at Kafka A as their backing store. At that point, Brooklin can copy from A to B with no issue.
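For concreteness, here is a rough sketch of what Schema Registry B's schema-registry.properties could look like in that arrangement. The hostnames, ports, and SSL paths are placeholders, and property names vary across Confluent versions (newer releases spell the last one leader.eligibility):

# Schema Registry B: read-only follower whose backing store is Kafka A's _schemas topic
listeners=http://0.0.0.0:8082
kafkastore.bootstrap.servers=SSL://kafka-a.example.com:9093
kafkastore.topic=_schemas
kafkastore.security.protocol=SSL
kafkastore.ssl.truststore.location=omitted
kafkastore.ssl.truststore.password=omitted
kafkastore.ssl.keystore.location=omitted
kafkastore.ssl.keystore.password=omitted
kafkastore.ssl.key.password=omitted
# Kafka-based leader election; B can never become leader, so all writes happen through A
master.eligibility=false

With this in place, any schema registered against Schema Registry A shows up in B with the same ID, which is exactly what mirrored Avro records need.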
The other confusing issue is that you do not need any serializers set for Brooklin. The Avro-encoded data has a well-defined wire protocol; Brooklin puts the raw byte[] into its envelope and hands it off unchanged on the other end. There was a pull request to allow other serializers/deserializers, but I cannot figure out why anyone would want or need that. In fact, it threw me off the trail for most of the afternoon while I tried to get the usual KafkaAvroSerializer/Deserializer setup to work with Brooklin.
With those KafkaAvro classes configured, the consumer would deserialize each record into a GenericRecord, and BrooklinEnvelope would then throw away the payload because it was not an instance of byte[]. On the producer side, the serializer would re-serialize the already-serialized byte array, generating a "bytes" schema and attempting to overwrite the existing schema in the registry.
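To see why plain byte pass-through works, recall Confluent's wire format: a magic byte, a 4-byte schema ID, then the Avro binary payload. The schema ID travels inside every record, so as long as the destination registry shares the source's IDs (which it does in the follower setup above), consumers behind cluster B resolve exactly the same schema. A small illustrative Java snippet, not Brooklin code, that just peels off that header:

import java.nio.ByteBuffer;

// Confluent Avro wire format: [magic byte 0x0][4-byte big-endian schema ID][Avro payload]
public final class AvroWireFormat {
    public static int schemaIdOf(byte[] value) {
        ByteBuffer buf = ByteBuffer.wrap(value);
        byte magic = buf.get();
        if (magic != 0x0) {
            throw new IllegalArgumentException("Not Confluent wire format, magic byte = " + magic);
        }
        // The schema ID is embedded in the record itself, so copying the raw bytes
        // from cluster A to cluster B preserves the reference to the same schema.
        return buf.getInt();
    }
}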
Here is a sample working config for Brooklin's server.properties:
############################# Server Basics #############################
brooklin.server.coordinator.cluster=brooklin-cluster
brooklin.server.coordinator.zkAddress=localhost:2181
brooklin.server.httpPort=32311
brooklin.server.connectorNames=file,test,kafkaMirroringConnector
brooklin.server.transportProviderNames=kafkaTransportProvider
brooklin.server.csvMetricsDir=/tmp/brooklin-example/
########################### Transport provider configs ######################
# This is the destination Kafka server.
# Source cluster info is provided via the REST interface when the datastream is created.
# Two mini-cluster setup: source is 9093, destination is 9095.
brooklin.server.transportProvider.kafkaTransportProvider.factoryClassName=com.linkedin.datastream.kafka.KafkaTransportProviderAdminFactory
brooklin.server.transportProvider.kafkaTransportProvider.bootstrap.servers=localhost:9095
brooklin.server.transportProvider.kafkaTransportProvider.client.id=datastream-producer
brooklin.server.transportProvider.kafkaTransportProvider.security.protocol=ssl
brooklin.server.transportProvider.kafkaTransportProvider.ssl.keystore.location=omitted
brooklin.server.transportProvider.kafkaTransportProvider.ssl.keystore.password=omitted
brooklin.server.transportProvider.kafkaTransportProvider.ssl.key.password=omitted
brooklin.server.transportProvider.kafkaTransportProvider.ssl.truststore.location=omitted
brooklin.server.transportProvider.kafkaTransportProvider.ssl.truststore.password=omitted
brooklin.server.transportProvider.kafkaTransportProvider.ssl.endpoint.identification.algorithm=
brooklin.server.transportProvider.kafkaTransportProvider.ssl.enabled.protocols=TLSv1.2
brooklin.server.transportProvider.kafkaTransportProvider.ssl.keystore.type=JKS
brooklin.server.transportProvider.kafkaTransportProvider.ssl.truststore.type=JKS
########################### Kafka Mirroring connector Configs ######################
# Source Kafka cluster consumer settings; each actual Kafka consumer configuration item must be prefixed with "consumer.".
# Source bootstrap servers are discovered and parsed from the -s argument of the datastream creation REST call (see the example after this config).
brooklin.server.connector.kafkaMirroringConnector.factoryClassName=com.linkedin.datastream.connectors.kafka.mirrormaker.KafkaMirrorMakerConnectorFactory
brooklin.server.connector.kafkaMirroringConnector.assignmentStrategyFactory=com.linkedin.datastream.server.assignment.BroadcastStrategyFactory
brooklin.server.connector.kafkaMirroringConnector.consumer.security.protocol=ssl
brooklin.server.connector.kafkaMirroringConnector.consumer.ssl.keystore.location=omitted
brooklin.server.connector.kafkaMirroringConnector.consumer.ssl.keystore.password=omitted
brooklin.server.connector.kafkaMirroringConnector.consumer.ssl.key.password=omitted
brooklin.server.connector.kafkaMirroringConnector.consumer.ssl.truststore.location=omitted
brooklin.server.connector.kafkaMirroringConnector.consumer.ssl.truststore.password=omitted
brooklin.server.connector.kafkaMirroringConnector.consumer.ssl.endpoint.identification.algorithm=
brooklin.server.connector.kafkaMirroringConnector.consumer.ssl.enabled.protocols=TLSv1.2
brooklin.server.connector.kafkaMirroringConnector.consumer.ssl.keystore.type=JKS
brooklin.server.connector.kafkaMirroringConnector.consumer.ssl.truststore.type=JKS
# currently unused connectors, but kept for posterity
########################### File connector Configs ######################
brooklin.server.connector.file.factoryClassName=com.linkedin.datastream.connectors.file.FileConnectorFactory
brooklin.server.connector.file.assignmentStrategyFactory=com.linkedin.datastream.server.assignment.BroadcastStrategyFactory
brooklin.server.connector.file.strategy.maxTasks=1
########################### Test event producing connector Configs ######################
brooklin.server.connector.test.factoryClassName=com.linkedin.datastream.connectors.TestEventProducingConnectorFactory
brooklin.server.connector.test.assignmentStrategyFactory=com.linkedin.datastream.server.assignment.LoadbalancingStrategyFactory
brooklin.server.connector.test.strategy.TasksPerDatastream = 4
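With the server config in place, the mirroring datastream itself is created the same way as in the previous post, passing the source cluster and topic pattern through the -s argument. Roughly along these lines, with host, topic pattern, and owner as placeholders (the kafkassl:// scheme is what tells the connector to use the SSL consumer settings above):

bin/brooklin-rest-client.sh -o CREATE -u http://localhost:32311/ \
  -n avro-mirror \
  -s "kafkassl://localhost:9093/^avro-.*$" \
  -c kafkaMirroringConnector -t kafkaTransportProvider \
  -m '{"owner":"someone","system.reuseExistingDestination":"false"}'

Because the records are copied byte-for-byte, nothing Schema Registry specific appears anywhere in the datastream or the server config; the only registry work is the follower setup described earlier.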