Dynamically Updating Kafka Cluster Settings with the Java API
Kafka comes with a handful of bash scripts that wrap Scala code to perform a variety of actions on a Kafka cluster. kafka-configs is the one that adds and deletes dynamically updatable configuration items.
There are three tiers of config items in Kafka:
- read-only: read from server.properties at startup
- per-broker: can be set on a per broker basis
- cluster-wide: applies to all brokers in the cluster
Note that cluster-wide options can also be applied at the per-broker level. As the Kafka documentation states, this is generally only for testing, and I have not found it useful in practice.
Per-broker updates pass a broker id to kafka-configs. To apply something cluster-wide, you pass the --entity-default flag instead. The flag is peculiarly named, and the naming becomes relevant later on.
The commands look something like this:
# single broker
kafka-configs --bootstrap-server localhost:9092 --command-config /path/to/properties --alter --entity-type brokers --entity-name 0 --add-config compression.type=none
# cluster wide
kafka-configs --bootstrap-server localhost:9092 --command-config /path/to/properties --alter --entity-type brokers --entity-default --add-config compression.type=none
Updating configuration dynamically via the Java Admin API is relatively straightforward:
private String performEntityConfigAdd(String host, String entity, ConfigResource.Type entityType,
                                      Map<String, String> addConfig) {
    String returnMsg = "SUCCESS";
    try {
        // Ensures the adminClient field is initialized for the given host
        getAdminClient(host);
        Map<ConfigResource, Collection<AlterConfigOp>> entityUpdateMap = new HashMap<>();
        ConfigResource configResource = new ConfigResource(entityType, entity);
        // Each key/value pair in addConfig becomes a SET operation against the target resource
        List<AlterConfigOp> alterConfigOps =
            addConfig.entrySet().stream()
                .map(entry -> new AlterConfigOp(new ConfigEntry(entry.getKey(), entry.getValue()), AlterConfigOp.OpType.SET))
                .collect(Collectors.toList());
        entityUpdateMap.put(configResource, alterConfigOps);
        // Block until the brokers acknowledge the update or the timeout expires
        adminClient.incrementalAlterConfigs(entityUpdateMap,
            new AlterConfigsOptions().timeoutMs(TIMEOUT_MS)).all().get();
    } catch (InterruptedException | ExecutionException e) {
        returnMsg = e.getMessage();
        LOG.error(returnMsg);
        // Discard the client so the next call re-creates it
        adminClient = null;
    }
    return returnMsg;
}
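The snippet assumes a few class members that are not shown: an adminClient field, a TIMEOUT_MS constant, an SLF4J-style LOG logger, and a getAdminClient helper that initializes the client. A minimal sketch of how those pieces might fit together, plus a per-broker usage example, could look like this (the names and values below are illustrative, not taken from the original code):

import java.util.Map;
import java.util.Properties;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.common.config.ConfigResource;

private AdminClient adminClient;               // reused across calls, reset to null on failure
private static final int TIMEOUT_MS = 30_000;  // illustrative request timeout

// Hypothetical helper: lazily creates the AdminClient the method above relies on
private void getAdminClient(String host) {
    if (adminClient == null) {
        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, host);
        adminClient = AdminClient.create(props);
    }
}

// Per-broker usage example: set compression.type=none on broker 0 only
public String disableCompressionOnBrokerZero() {
    return performEntityConfigAdd("localhost:9092", "0",
            ConfigResource.Type.BROKER, Map.of("compression.type", "none"));
}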
The above code snippet is generic and could be used for either a topic or a broker. What is not intuitive is how to make the change apply cluster-wide to all brokers: there is no separate mechanism in the API for such a call, the above is all we have. The epiphany came from looking at ConfigResource, specifically isDefault():
/**
 * Returns true if this is the default resource of a resource type.
 * Resource name is empty for the default resource.
 */
public boolean isDefault() {
    return name.isEmpty();
}
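In other words, a broker ConfigResource constructed with an empty name is the "default" resource. A quick illustration (broker id 0 is just an example):

ConfigResource oneBroker   = new ConfigResource(ConfigResource.Type.BROKER, "0");
ConfigResource clusterWide = new ConfigResource(ConfigResource.Type.BROKER, "");

oneBroker.isDefault();    // false -- names a specific broker
clusterWide.isDefault();  // true  -- the default resource for the BROKER type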
It is called from KafkaAdminClient here:
/**
 * Returns the broker id pertaining to the given resource, or null if the resource is not associated
 * with a particular broker.
 */
private Integer nodeFor(ConfigResource resource) {
    if ((resource.type() == ConfigResource.Type.BROKER && !resource.isDefault())
            || resource.type() == ConfigResource.Type.BROKER_LOGGER) {
        return Integer.valueOf(resource.name());
    } else {
        return null;
    }
}
@Override
public AlterConfigsResult incrementalAlterConfigs(Map<ConfigResource, Collection<AlterConfigOp>> configs,
                                                  final AlterConfigsOptions options) {
    final Map<ConfigResource, KafkaFutureImpl<Void>> allFutures = new HashMap<>();
    // We must make a separate AlterConfigs request for every BROKER resource we want to alter
    // and send the request to that specific broker. Other resources are grouped together into
    // a single request that may be sent to any broker.
    final Collection<ConfigResource> unifiedRequestResources = new ArrayList<>();
    for (ConfigResource resource : configs.keySet()) {
        Integer node = nodeFor(resource);
        if (node != null) {
            NodeProvider nodeProvider = new ConstantNodeIdProvider(node);
            allFutures.putAll(incrementalAlterConfigs(configs, options, Collections.singleton(resource), nodeProvider));
        } else
            unifiedRequestResources.add(resource);
    }
    if (!unifiedRequestResources.isEmpty())
        allFutures.putAll(incrementalAlterConfigs(configs, options, unifiedRequestResources, new LeastLoadedNodeProvider()));
    return new AlterConfigsResult(new HashMap<>(allFutures));
}
What is a default resource? Metadata for the Kafka cluster is stored in ZooKeeper. For broker configuration the znode paths are:
/config/brokers/<brokerId>
/config/brokers/<default>
From the Kafka documentation:
All configs that are configurable at cluster level may also be configured at per-broker level (e.g. for testing). If a config value is defined at different levels, the following order of precedence is used:
- Dynamic per-broker config stored in ZooKeeper
- Dynamic cluster-wide default config stored in ZooKeeper
- Static broker config from server.properties
- Kafka default, see broker configs
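As an aside, the Admin API can show you which of these levels a value currently comes from: describeConfigs returns a ConfigEntry for each setting, and its source() distinguishes per-broker dynamic, cluster-wide default, static, and default values. A rough sketch, reusing the adminClient assumed earlier (error handling omitted):

// Print every config on broker 0 together with its source, e.g.
// DYNAMIC_BROKER_CONFIG, DYNAMIC_DEFAULT_BROKER_CONFIG, STATIC_BROKER_CONFIG, DEFAULT_CONFIG
ConfigResource broker0 = new ConfigResource(ConfigResource.Type.BROKER, "0");
Config config = adminClient.describeConfigs(Collections.singleton(broker0)).all().get().get(broker0);
config.entries().forEach(entry ->
        System.out.println(entry.name() + " = " + entry.value() + " (" + entry.source() + ")"));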
In order to specify a configuration at the cluster level, we must pass an empty string as the entity name when constructing the ConfigResource.
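With the generic method from earlier, the cluster-wide equivalent of the per-broker call is therefore simply (host and setting are the same illustrative values used above):

// Cluster-wide: the empty entity name selects the <default> resource,
// so the change applies to every broker as its dynamic default
String result = performEntityConfigAdd("localhost:9092", "",
        ConfigResource.Type.BROKER, Map.of("compression.type", "none"));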
That is so unintuitive that I imagine the Java API came much later in the ecosystem and simply kept the convention from the shell script and the Scala code.