Navigating Compatibility: A Guide to Kafka Broker API Versions


Apache Kafka, renowned for its distributed streaming capabilities, relies on a well-defined set of APIs to facilitate communication between clients and brokers. Understanding the compatibility between Kafka clients and broker API versions is crucial for maintaining a stable and efficient streaming environment. In this blog post, we’ll delve into the realm of Kafka Broker API versions, exploring their significance, how to check them, and providing practical examples for seamless integration.

In this tutorial, we will describe and how to check compatibility between Kafka clients and broker versions. We hope to clear up common misunderstandings along the way, such as whether or not clients and brokers need to be running the same version?

One more thing before we begin, if you are looking to learn how to determine which Kafka version you are running and not API compatibility between clients and brokers, you may find the determine Kafka version tutorial more helpful than this one.

Table of Contents

Kafka Broker API Version Overview

The Role of Kafka Broker API Versions:

Kafka Broker API versions play a pivotal role in ensuring that clients and brokers can communicate effectively. As the Kafka ecosystem evolves, new features and improvements are introduced, leading to updates in the API specifications. Each API version corresponds to a set of features, and it’s essential for clients to be compatible with the broker’s supported API versions to prevent potential communication issues. Otherwise, without compatibility of different versions of brokers and clients, there would be a tight coupling between clients and versions which would make operations from a downtime perspective; i.e. you would have to maintain the same version of clients and brokers at all times.

Checking Kafka Broker API Versions:

The kafka-broker-api-versions.sh script serves as a valuable tool for checking the API versions supported by a Kafka broker. This script is available in the bin/ directory of Apache Kafka download.

Let’s explore how to use this script with a practical example:

./kafka-broker-api-versions.sh --bootstrap-server localhost:9092

This script connects to the specified Kafka broker (localhost:9092 in the example) and retrieves detailed information about the supported API versions. The output includes details such as api_key, min_version, and max_version for each API, providing a comprehensive overview of the broker’s capabilities.

Understanding the Output:

The output of the script resembles the following JSON structure:

{
  "controllerId": 0,
  "versions": [
    {
      "api_key": 0,
      "min_version": 0,
      "max_version": 5
    },
    {
      "api_key": 1,
      "min_version": 0,
      "max_version": 3
    }, 
...
  ]
}

  • controllerId: Represents the ID of the controller broker.
  • versions: An array containing information about each supported API version.
    • api_key: Identifies the API.
    • min_version: Specifies the minimum supported protocol version.
    • max_version: Specifies the maximum supported protocol version.

Now, that’s what output may suppose to look like above, but as we’ll see in examples below, it’s a bit different in reality. But the general idea is still the same, so let’s go with it for now.

Ensuring Compatibility:

  1. Client Version Check:
    • Before deploying or upgrading Kafka clients, run the kafka-broker-api-versions.sh script to check the broker’s supported API versions.
    • Ensure that the client version aligns with the broker’s min_version and max_version for each API.
  2. Upgrading Kafka Brokers:
    • When upgrading Kafka brokers, be aware of changes in API versions.
    • Gradually update brokers to minimize downtime and ensure a smooth transition.
  3. Monitoring and Alerts:
    • You may wish to implement monitoring solutions about any discrepancies in API versions between clients and brokers. Check JMX metrics starting with “MessageConversions*”; i.e. MessageConversionsPerSec. As described below forward and backward compatibility between clients and brokers is a feature, but there is a translation cost in the conversion. It’s usually not huge, but you can keep an eye on it in larger environments.
Kafka Broker API Version Example Walkthrough with video

Kafka Broker API Version Example

To cover the various examples of Kafka topic administrative tasks, we need Kafka cluster.  Shocking, I know. There are various ways we can do this, but let’s go with the most simple which for me, means running a Kafka cluster in docker using docker-compose.

If you want to follow along with examples below, here are the summarized requirements and steps.

1. Requirements

If you wish to follow along, here are the requirements to complete this example in your own environment.

  1. Download and extract Apache Kafka so we have access to kafka-topics.sh command in the bin/ directory (Verification will be shown below). Any Kafka version >= 2.0 should be fine. We’re going to use two different client versions in examples below.
  2. Start Docker (if it is not already running)
  3. git clone https://github.com/supergloo/kafka-examples/
  4. cd kafka-examples/kafka-broker-api-version
  5. docker-compose -f kafka-kafka-broker-api-version-example.yml up -d (wait 5-30 seconds for everything to come online) (Again, video walk through shown below)

Here’s an example on Mac OS to show Docker is running

Docker is running for Kafka broker api examples

Here is an example of my two terminals with docker-compose and running the kafka-broker-api-versions.sh command.

kafka-broker-api-versions.sh example 1

2. Example kafka-broker-api-versions.sh client commands

In this example, let’s use a 3.3 client with a 2.8 version broker. The “kafka-broker-api-version-example.yml” docker-compose file we are using is image wurstmeister/kafka:2.13-2.8.1 or in short Kafka 2.8.1.

When I use a 3.3 client, I see the following output

$ pwd
/Users/toddmcg/dev/kafka_2.13-3.3.2/bin
$ ./kafka-broker-api-versions.sh --bootstrap-server localhost:9092
localhost:9092 (id: 1001 rack: null) -> (
	Produce(0): 0 to 9 [usable: 9],
	Fetch(1): 0 to 12 [usable: 12],
	ListOffsets(2): 0 to 6 [usable: 6],
	Metadata(3): 0 to 11 [usable: 11],
	LeaderAndIsr(4): 0 to 5 [usable: 5],
	StopReplica(5): 0 to 3 [usable: 3],
	UpdateMetadata(6): 0 to 7 [usable: 7],
	ControlledShutdown(7): 0 to 3 [usable: 3],
	OffsetCommit(8): 0 to 8 [usable: 8],
	OffsetFetch(9): 0 to 7 [usable: 7],
	FindCoordinator(10): 0 to 3 [usable: 3],
	JoinGroup(11): 0 to 7 [usable: 7],
	Heartbeat(12): 0 to 4 [usable: 4],
	LeaveGroup(13): 0 to 4 [usable: 4],
	SyncGroup(14): 0 to 5 [usable: 5],
	DescribeGroups(15): 0 to 5 [usable: 5],
	ListGroups(16): 0 to 4 [usable: 4],
	SaslHandshake(17): 0 to 1 [usable: 1],
	ApiVersions(18): 0 to 3 [usable: 3],
	CreateTopics(19): 0 to 7 [usable: 7],
	DeleteTopics(20): 0 to 6 [usable: 6],
	DeleteRecords(21): 0 to 2 [usable: 2],
	InitProducerId(22): 0 to 4 [usable: 4],
	OffsetForLeaderEpoch(23): 0 to 4 [usable: 4],
	AddPartitionsToTxn(24): 0 to 3 [usable: 3],
	AddOffsetsToTxn(25): 0 to 3 [usable: 3],
	EndTxn(26): 0 to 3 [usable: 3],
	WriteTxnMarkers(27): 0 to 1 [usable: 1],
	TxnOffsetCommit(28): 0 to 3 [usable: 3],
	DescribeAcls(29): 0 to 2 [usable: 2],
	CreateAcls(30): 0 to 2 [usable: 2],
	DeleteAcls(31): 0 to 2 [usable: 2],
	DescribeConfigs(32): 0 to 4 [usable: 4],
	AlterConfigs(33): 0 to 2 [usable: 2],
	AlterReplicaLogDirs(34): 0 to 2 [usable: 2],
	DescribeLogDirs(35): 0 to 2 [usable: 2],
	SaslAuthenticate(36): 0 to 2 [usable: 2],
	CreatePartitions(37): 0 to 3 [usable: 3],
	CreateDelegationToken(38): 0 to 2 [usable: 2],
	RenewDelegationToken(39): 0 to 2 [usable: 2],
	ExpireDelegationToken(40): 0 to 2 [usable: 2],
	DescribeDelegationToken(41): 0 to 2 [usable: 2],
	DeleteGroups(42): 0 to 2 [usable: 2],
	ElectLeaders(43): 0 to 2 [usable: 2],
	IncrementalAlterConfigs(44): 0 to 1 [usable: 1],
	AlterPartitionReassignments(45): 0 [usable: 0],
	ListPartitionReassignments(46): 0 [usable: 0],
	OffsetDelete(47): 0 [usable: 0],
	DescribeClientQuotas(48): 0 to 1 [usable: 1],
	AlterClientQuotas(49): 0 to 1 [usable: 1],
	DescribeUserScramCredentials(50): 0 [usable: 0],
	AlterUserScramCredentials(51): 0 [usable: 0],
	AlterPartition(56): 0 [usable: 0],
	UpdateFeatures(57): 0 [usable: 0],
	DescribeCluster(60): 0 [usable: 0],
	DescribeProducers(61): 0 [usable: 0],
	DescribeTransactions(65): UNSUPPORTED,
	ListTransactions(66): UNSUPPORTED,
	AllocateProducerIds(67): UNSUPPORTED
)

We will break down this output more in a short while, but for now, I’d call your attention to the end of the output where there are capabilities listed as “UNSUPPORTED”. You can probably guess what this means when we are using a new version of a client and an older version of a broker. In this case, client version 3.3.2 with broker version 2.8.1.

Let’s make another comparison with a 2.4 client version and still use the 2.8.1 version of broker.

$ pwd
/Users/toddmcg/dev/kafka_2.11-2.4.1/bin
$ ./kafka-broker-api-versions.sh --bootstrap-server localhost:9092
localhost:9092 (id: 1001 rack: null) -> (
	Produce(0): 0 to 9 [usable: 8],
	Fetch(1): 0 to 12 [usable: 11],
	ListOffsets(2): 0 to 6 [usable: 5],
	Metadata(3): 0 to 11 [usable: 9],
	LeaderAndIsr(4): 0 to 5 [usable: 4],
	StopReplica(5): 0 to 3 [usable: 2],
	UpdateMetadata(6): 0 to 7 [usable: 6],
	ControlledShutdown(7): 0 to 3 [usable: 3],
	OffsetCommit(8): 0 to 8 [usable: 8],
	OffsetFetch(9): 0 to 7 [usable: 6],
	FindCoordinator(10): 0 to 3 [usable: 3],
	JoinGroup(11): 0 to 7 [usable: 6],
	Heartbeat(12): 0 to 4 [usable: 4],
	LeaveGroup(13): 0 to 4 [usable: 4],
	SyncGroup(14): 0 to 5 [usable: 4],
	DescribeGroups(15): 0 to 5 [usable: 5],
	ListGroups(16): 0 to 4 [usable: 3],
	SaslHandshake(17): 0 to 1 [usable: 1],
	ApiVersions(18): 0 to 3 [usable: 3],
	CreateTopics(19): 0 to 7 [usable: 5],
	DeleteTopics(20): 0 to 6 [usable: 4],
	DeleteRecords(21): 0 to 2 [usable: 1],
	InitProducerId(22): 0 to 4 [usable: 2],
	OffsetForLeaderEpoch(23): 0 to 4 [usable: 3],
	AddPartitionsToTxn(24): 0 to 3 [usable: 1],
	AddOffsetsToTxn(25): 0 to 3 [usable: 1],
	EndTxn(26): 0 to 3 [usable: 1],
	WriteTxnMarkers(27): 0 to 1 [usable: 0],
	TxnOffsetCommit(28): 0 to 3 [usable: 2],
	DescribeAcls(29): 0 to 2 [usable: 1],
	CreateAcls(30): 0 to 2 [usable: 1],
	DeleteAcls(31): 0 to 2 [usable: 1],
	DescribeConfigs(32): 0 to 4 [usable: 2],
	AlterConfigs(33): 0 to 2 [usable: 1],
	AlterReplicaLogDirs(34): 0 to 2 [usable: 1],
	DescribeLogDirs(35): 0 to 2 [usable: 1],
	SaslAuthenticate(36): 0 to 2 [usable: 1],
	CreatePartitions(37): 0 to 3 [usable: 1],
	CreateDelegationToken(38): 0 to 2 [usable: 2],
	RenewDelegationToken(39): 0 to 2 [usable: 1],
	ExpireDelegationToken(40): 0 to 2 [usable: 1],
	DescribeDelegationToken(41): 0 to 2 [usable: 1],
	DeleteGroups(42): 0 to 2 [usable: 2],
	ElectLeaders(43): 0 to 2 [usable: 2],
	IncrementalAlterConfigs(44): 0 to 1 [usable: 1],
	AlterPartitionReassignments(45): 0 [usable: 0],
	ListPartitionReassignments(46): 0 [usable: 0],
	OffsetDelete(47): 0 [usable: 0],
	UNKNOWN(48): 0 to 1,
	UNKNOWN(49): 0 to 1,
	UNKNOWN(50): 0,
	UNKNOWN(51): 0,
	UNKNOWN(56): 0,
	UNKNOWN(57): 0,
	UNKNOWN(60): 0,
	UNKNOWN(61): 0
)

Again, we’ll break down the output more next, but for now, notice the “UNKNOWN” lines at the end. Again, hopefully as expected, the results show the older version of the client does not know some of the newer features of the broker.

Here’s a short video of me running through these commands showing differences.

Screencast running through different steps to determine broker and client api differences

3. Understanding the kafka-broker-api-verions.sh output

Ok, at this point we’ve seen differences in outputs when using different versions of clients and brokers, but what about the other output.

The best way to describe is go through some examples, so here we go. In one of the outputs above, we see something like:

	AddOffsetsToTxn(25): 0 to 3 [usable: 1],
	EndTxn(26): 0 to 3 [usable: 1],
	WriteTxnMarkers(27): 0 to 1 [usable: 0],
	TxnOffsetCommit(28): 0 to 3 [usable: 2],

Each one of these items represent particular functionality which has associated versions and whether or not the client can support it.

For example, AddOffsetsToTxn has 4 versions of which the client can use 1. This exact same for next line, EndTxn and so on. So where can we learn more about these feature flags?

Answer: start here https://github.com/apache/kafka/blob/trunk/clients/src/main/resources/common/message/README.md

and open the associated JSON files.

For example, for AddOffsetsToTxn, see https://github.com/apache/kafka/blob/trunk/clients/src/main/resources/common/message/AddOffsetsToTxnResponse.json

In the Response JSON files, you’ll see more info on the API key and different version capabilities such as:

{
  "apiKey": 25,
  "type": "response",
  "name": "AddOffsetsToTxnResponse",
  // Starting in version 1, on quota violation brokers send out responses before throttling.
  //
  // Version 2 adds the support for new error code PRODUCER_FENCED.
  //
  // Version 3 enables flexible versions.
  "validVersions": "0-3",
  "flexibleVersions": "3+",
  "fields": [
    { "name": "ThrottleTimeMs", "type": "int32", "versions": "0+",
      "about": "Duration in milliseconds for which the request was throttled due to a quota violation, or zero if the request did not violate any quota." },
    { "name": "ErrorCode", "type": "int16", "versions": "0+",
      "about": "The response error code, or 0 if there was no error." }
  ]
} 

For those who want to dive deeper, the README.md file linked above is a good resource.

Frequently Asked Questions

How can I check client version of kafka-broker-api-versions.sh?

Add the –version flag. For example, in 2.4 client example:

$ ./kafka-broker-api-versions.sh --bootstrap-server localhost:9092 --version
2.4.1 (Commit:c57222ae8cd7866b)
$ pwd
/Users/toddmcg/dev/kafka_2.11-2.4.1/bin

How can I check the broker version using kafka-broker-api-versions.sh?

You can’t as far as I know. But, there is a check kafka version tutorial you may be interested in.

Is there a Windows version of kafka-broker-api-versions.sh?

Yes, check in the bat/ directory and you’ll find a kafka-broker-api-versions.bat file. As with most CLI scripts, both the .bat and .sh scripts end up calling a Java or Scala class. For example, if you view the source of the scripts, you’ll see something similar to the folowing:

exec $(dirname $0)/kafka-run-class.sh kafka.admin.BrokerApiVersionsCommand "$@"

and you can go view the source code of kafka.admin.BrokerApiVersionsCommand class if you’d like.

Can Kafka clients and brokers convert messages between different versions?

As we saw in examples show above, there is support for compatibility between different versions of clients and brokers.

In Apache Kafka, there is support for backward and forward compatibility, meaning that newer versions of Kafka brokers can generally accept messages from older versions of clients and vice versa.

Kafka relies on a concept known as version negotiation to determine the highest protocol version that is mutually supported by both the client and the broker.

Here’s how version negotiation works in Kafka:

  1. Client Requests Connection:
    • When a client establishes a connection to a Kafka broker, it includes the version it supports in its request.
  2. Broker Evaluates Supported Versions:
    • The Kafka broker evaluates the supported versions based on the client’s request and its own capabilities.
  3. Negotiation for Highest Compatible Version:
    • The broker and the client negotiate to determine the highest protocol version that both support.
  4. Communication Using Negotiated Version:
    • The client and broker then communicate using the negotiated version of the protocol.

This version negotiation mechanism allows Kafka to maintain backward compatibility, enabling newer clients to communicate with older brokers and vice versa.

As you’d expect, certain features or optimizations introduced in newer versions may not be available when communicating with older versions. We saw this in examples above.

While Kafka supports version compatibility, it’s generally recommended to keep clients and brokers at a similar version to take advantage of the latest features and improvements. Regularly updating both clients and brokers ensures that you benefit from bug fixes, performance enhancements, and new functionality.

If there’s a need to transition to a new version, follow a phased approach by updating clients and brokers gradually to minimize potential compatibility issues and disruptions in the Kafka cluster. Additionally, thorough testing in a non-production environment is crucial to identify and address any compatibility challenges before rolling out updates in a production setting.

In Kafka, should I upgrade brokers or client versions first?

Ok, now that we know there is forward and backward compatibility between brokers and clients, which should you upgrade first? According to Kafka documentation at https://kafka.apache.org/protocol.html#protocol_compatibility, “The intended upgrade path is that new features would first be rolled out on the server (with the older clients not making use of them) and then as newer clients are deployed these new features would gradually be taken advantage of”

Conclusion

Understanding Kafka Broker API versions is integral to maintaining a robust and scalable streaming infrastructure. The kafka-broker-api-versions.sh script serves as a valuable ally in ensuring that clients and brokers seamlessly communicate by providing insights into the supported API versions. By incorporating these practices into your Kafka management routine, you can navigate the dynamic landscape of API evolution and guarantee a reliable streaming experience for your applications.

See also  Kafka Producer in Scala
About Todd M

Todd has held multiple software roles over his 20 year career. For the last 5 years, he has focused on helping organizations move from batch to data streaming. In addition to the free tutorials, he provides consulting, coaching for Data Engineers, Data Scientists, and Data Architects. Feel free to reach out directly or to connect on LinkedIn

Leave a Comment