Kafka comes with a handful of bash scripts that wrap Scala code to perform a variety of actions on a Kafka cluster. One of them, kafka-configs, adds and deletes dynamically updatable configuration items.

There are three tiers of config items in Kafka:

  • read-only: read from server.properties at startup
  • per-broker: can be set on a per broker basis
  • cluster-wide: applies to all brokers in the cluster

Note that cluster-wide options can also be applied at the per-broker level. As the Kafka documentation states, this is generally only for testing. It has not been proven useful in practice yet.

Per-broker updates pass a broker id to kafka-configs. To apply something cluster-wide, use the --entity-default flag instead. The flag is peculiarly named, which will be relevant later on.

The command looks something like this:

# single broker
kafka-configs --bootstrap-server localhost:9092 --command-config /path/to/properties --alter --entity-type brokers --entity-name 0 --add-config compression.type=none
# cluster wide
kafka-configs --bootstrap-server localhost:9092 --command-config /path/to/properties --alter --entity-type brokers --entity-default --add-config compression.type=none

Updating configuration dynamically via the Java Admin API is relatively straightforward:

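// Applies the given key/value pairs to one entity (topic or broker) as incremental SET operations.
// adminClient, getAdminClient(), TIMEOUT_MS and LOG are members of the enclosing class (not shown).
private String performEntityConfigAdd(String host, String entity, ConfigResource.Type entityType,
                             Map<String, String> addConfig) {
   String returnMsg = "SUCCESS";
   try {
      // Return value is unused; presumably this populates the adminClient field.
      getAdminClient(host);
      Map<ConfigResource, Collection<AlterConfigOp>> entityUpdateMap = new HashMap<>();
      // The entity name identifies the topic or broker id to update.
      ConfigResource configResource = new ConfigResource(entityType, entity);
      // One SET operation per config key.
      List<AlterConfigOp> alterConfigOps =
            addConfig.entrySet().stream()
                  .map(entry -> new AlterConfigOp(new ConfigEntry(entry.getKey(), entry.getValue()), AlterConfigOp.OpType.SET))
                  .collect(Collectors.toList());
      entityUpdateMap.put(configResource, alterConfigOps);
      // Block until the broker has acknowledged the change or the timeout expires.
      adminClient.incrementalAlterConfigs(entityUpdateMap,
            new AlterConfigsOptions().timeoutMs(TIMEOUT_MS)).all().get();
   }
   catch (InterruptedException | ExecutionException e) {
      returnMsg = e.getMessage();
      LOG.error(returnMsg);
      // Drop the client so the next call creates a fresh one.
      adminClient = null;
   }
   return returnMsg;
}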

The above code snippet is generic and can be used for either a topic or a broker. What is not intuitive is how to apply a change cluster-wide to all brokers. The API has no dedicated call for this; the above is all we have. The epiphany came from looking at ConfigResource, specifically isDefault():

    /**
     * Returns true if this is the default resource of a resource type.
     * Resource name is empty for the default resource.
     */
    public boolean isDefault() {
        return name.isEmpty();
    }

It is called from here, in KafkaAdminClient:

/**  
 * Returns the broker id pertaining to the given resource, or null if the resource is not associated
 * with a particular broker.
 */
private Integer nodeFor(ConfigResource resource) {
    if ((resource.type() == ConfigResource.Type.BROKER && !resource.isDefault())
            || resource.type() == ConfigResource.Type.BROKER_LOGGER) {
        return Integer.valueOf(resource.name());
    } else {
        return null;
    }
}

@Override
public AlterConfigsResult incrementalAlterConfigs(Map<ConfigResource, Collection<AlterConfigOp>> configs,
                                                             final AlterConfigsOptions options) {
    final Map<ConfigResource, KafkaFutureImpl<Void>> allFutures = new HashMap<>();
    // We must make a separate AlterConfigs request for every BROKER resource we want to alter
    // and send the request to that specific broker. Other resources are grouped together into
    // a single request that may be sent to any broker.
    final Collection<ConfigResource> unifiedRequestResources = new ArrayList<>();

    for (ConfigResource resource : configs.keySet()) {
        Integer node = nodeFor(resource);
        if (node != null) {
            NodeProvider nodeProvider = new ConstantNodeIdProvider(node);
            allFutures.putAll(incrementalAlterConfigs(configs, options, Collections.singleton(resource), nodeProvider));
        } else 
            unifiedRequestResources.add(resource);
    }
    if (!unifiedRequestResources.isEmpty())
        allFutures.putAll(incrementalAlterConfigs(configs, options, unifiedRequestResources, new LeastLoadedNodeProvider()));

    return new AlterConfigsResult(new HashMap<>(allFutures));
} 
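
To make the routing decision easier to see, here is a minimal plain-Java model of it. It is only a sketch and does not use the Kafka classes: BrokerRoutingSketch and its Type enum are hypothetical stand-ins for KafkaAdminClient and ConfigResource.Type, and only the name check from isDefault() and the branch from nodeFor() are reproduced.

```java
final class BrokerRoutingSketch {
    // Hypothetical stand-in for ConfigResource.Type, reduced to the values used here.
    enum Type { BROKER, BROKER_LOGGER, TOPIC }

    // Same rule as ConfigResource.isDefault(): the default resource has an empty name.
    static boolean isDefault(String name) {
        return name.isEmpty();
    }

    // Same branch as nodeFor(): a broker id when the request must go to one
    // specific broker, or null when any broker may handle it.
    static Integer nodeFor(Type type, String name) {
        if ((type == Type.BROKER && !isDefault(name)) || type == Type.BROKER_LOGGER) {
            return Integer.valueOf(name);
        }
        return null;
    }
}
```

With this model, nodeFor(Type.BROKER, "0") returns 0, so the request is pinned to broker 0, while nodeFor(Type.BROKER, "") returns null, so the resource joins the unified request that may be sent to any broker.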

What is a default resource? Metadata for the Kafka cluster is stored in ZooKeeper. For broker configuration the znode paths are:

/config/brokers/brokerId
/config/brokers/<default>

From the Kafka documentation:

All configs that are configurable at cluster level may also be configured at per-broker level (e.g. for testing). If a config value is defined at different levels, the following order of precedence is used:

  • Dynamic per-broker config stored in ZooKeeper
  • Dynamic cluster-wide default config stored in ZooKeeper
  • Static broker config from server.properties
  • Kafka default, see broker configs
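
The precedence order above can be pictured as a lookup through four layers, highest first. The following is a plain-Java illustration only, not how the broker implements it; the class name, method name and parameter names are made up for this sketch.

```java
import java.util.List;
import java.util.Map;
import java.util.Optional;

final class ConfigPrecedenceSketch {
    // Returns the value from the highest-precedence layer that defines the key,
    // or an empty Optional when no layer defines it.
    static Optional<String> effectiveValue(String key,
                                           Map<String, String> perBroker,
                                           Map<String, String> clusterDefault,
                                           Map<String, String> serverProperties,
                                           Map<String, String> kafkaDefaults) {
        // Ordered from highest to lowest precedence, as in the documentation.
        List<Map<String, String>> highToLow =
                List.of(perBroker, clusterDefault, serverProperties, kafkaDefaults);
        for (Map<String, String> layer : highToLow) {
            String value = layer.get(key);
            if (value != null) {
                return Optional.of(value);
            }
        }
        return Optional.empty();
    }
}
```

For example, if server.properties sets log.cleaner.threads=2 and the cluster-wide default sets it to 4, the lookup yields 4 for every broker. A broker that also has its own dynamic value of 8 gets 8.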

To specify a configuration at the cluster level, we must pass an empty string as the resource name to ConfigResource.
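
One way to keep that bare empty string out of calling code is to hide it behind a named constant. This is a small sketch under my own naming: BrokerResourceNames, CLUSTER_DEFAULT and resourceNameFor are hypothetical helpers, not part of the Kafka API.

```java
import java.util.OptionalInt;

final class BrokerResourceNames {
    // The empty name is what ConfigResource.isDefault() treats as the cluster-wide default.
    static final String CLUSTER_DEFAULT = "";

    // Resource name for one broker, or the default name when no broker id is given.
    static String resourceNameFor(OptionalInt brokerId) {
        return brokerId.isPresent()
                ? Integer.toString(brokerId.getAsInt())
                : CLUSTER_DEFAULT;
    }
}
```

The result can be passed as the entity argument of performEntityConfigAdd together with ConfigResource.Type.BROKER: resourceNameFor(OptionalInt.of(0)) targets broker 0, and resourceNameFor(OptionalInt.empty()) targets the whole cluster.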

That is so unintuitive. I imagine the Java API came much later in the ecosystem and kept the convention from the shell script and the Scala code.