How do Kafka administrators perform administrative and diagnostic collection actions of Kafka Consumer Groups?
This post explores a Kafka Consumer Group admin tool called kafka-consumer-groups.sh, a popular command-line tool included in Apache Kafka distributions. There are other open source and 3rd party tools not included with Apache Kafka which can also be used for Kafka Consumer Group administrative tasks, but for this post, let’s focus on kafka-consumer-groups.sh. The concepts are what matter because they can easily be applied to other tools.
First, please recall that Kafka Consumer Groups provide a mechanism to distribute consumption from Apache Kafka across multiple individual consumers collaborating together. They are Kafka’s implementation of parallelism, or how consumption scales out. Multiple Consumer Groups remain separate from one another even while subscribing to the same topics. Kafka Consumer and Consumer Group orchestration is built into Kafka itself, and unlike some other distributed systems, does not require a 3rd party orchestrator.
If you are not already familiar with basic concepts of Kafka Consumer Groups, please read the what are Kafka Consumer Groups tutorial before proceeding here. This tutorial assumes you are familiar with Kafka Consumer Group functionality such as how consumers are assigned to partitions.
In this tutorial, we will start with an overview and then describe the environment setup used to present the examples. Then, we will go through a variety of specific examples.
Table of Contents
- Listing all Kafka Consumer Groups Example
- Describe a Kafka Consumer Group Example
- Resetting a Kafka Consumer Group Example
- Deleting Kafka Consumer Group Example
kafka-consumer-groups.sh is a command line tool distributed with Apache Kafka. It is found in the
bin/ directory. At a high level, the tool describes itself as helping “list all consumer groups, describe a consumer group, delete consumer group info, or reset consumer group offsets”. Sounds good. Let’s do that.
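To preview where we’re headed, here’s a sketch of those four operations in their command-line form. This is just an illustrative outline, not output from a real run; the group name mygroup and the localhost bootstrap server are the example values used later in this post, and each command assumes a running cluster.

```shell
# The four operations the tool advertises, sketched against a local cluster.
# Run from Kafka's bin/ directory; "mygroup" is the example group used below.

# 1. List all consumer groups
./kafka-consumer-groups.sh --bootstrap-server localhost:9092 --list

# 2. Describe one consumer group
./kafka-consumer-groups.sh --bootstrap-server localhost:9092 --describe --group mygroup

# 3. Reset the group's offsets (the group must be inactive)
./kafka-consumer-groups.sh --bootstrap-server localhost:9092 --group mygroup \
  --reset-offsets --to-earliest --all-topics --execute

# 4. Delete the group (the group must be empty)
./kafka-consumer-groups.sh --bootstrap-server localhost:9092 --delete --group mygroup
```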
Kafka Consumer Groups Example Environment Setup
To cover the various examples of Consumer Group administrative tasks, we need a Kafka cluster with at least one topic containing data. There are various ways we can do this, including a fairly recent tutorial on Kafka Test Generation for Joins, but in this tutorial, let’s use the kcat utility (previously known as kafkacat) and a Dockerized Kafka cluster. Both of these have been covered in other tutorials on this site, but let me know if you have any questions.
- Download and extract Apache Kafka so we have access to the kafka-consumer-groups.sh command in the bin/ directory
- Install kcat if not already installed from https://github.com/edenhill/kcat
- Start Docker (if it is not already running)
- git clone https://github.com/conduktor/kafka-stack-docker-compose.git
- cd kafka-stack-docker-compose
- docker-compose -f zk-single-kafka-multiple.yml up -d (wait 5-30 seconds for everything to come online)
- Verify the previous steps with kcat -L -b localhost:9092 to ensure we are ready to proceed
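If you’d like to script the kcat verification step above, a small helper can pull the broker count out of kcat’s metadata output. Note that count_brokers is a hypothetical name I made up for illustration, not part of kcat; it only assumes the metadata dump contains a line like “3 brokers:”.

```shell
# Hypothetical helper: extract the broker count from `kcat -L` metadata output,
# which includes a line like " 3 brokers:" near the top of the dump.
count_brokers() {
  grep -Eo '[0-9]+ brokers' | head -n 1 | awk '{ print $1 }'
}

# Example usage against a live cluster (assumes kcat is installed):
#   kcat -L -b localhost:9092 | count_brokers
```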
```
~/dev/kafka-stack-docker-compose [master] $ docker-compose -f zk-single-kafka-multiple.yml up -d
Creating network "kafka-stack-docker-compose_default" with the default driver
Creating zoo1 ... done
Creating kafka1 ... done
Creating kafka3 ... done
Creating kafka2 ... done
~/dev/kafka-stack-docker-compose [master] $ kcat -L -b localhost:9092
Metadata for all topics (from broker -1: localhost:9092/bootstrap):
 3 brokers:
  broker 2 at 127.0.0.1:9093
  broker 3 at 127.0.0.1:9094
  broker 1 at 127.0.0.1:9092 (controller)
 0 topics:
~/dev/kafka-stack-docker-compose [master] $
```
Also, I have downloaded and extracted Apache Kafka into my ~/dev directory as shown next.
```
~/dev/kafka-stack-docker-compose [master] $ ls ~/dev/kafka_2.11-2.4.1/bin
config.out                    kafka-consumer-groups.sh             kafka-producer-perf-test.sh         kafka-verifiable-producer.sh
connect-distributed.sh        kafka-consumer-perf-test.sh          kafka-reassign-partitions.sh        trogdor.sh
connect-mirror-maker.sh       kafka-delegation-tokens.sh           kafka-replica-verification.sh       windows
connect-standalone.sh         kafka-delete-records.sh              kafka-run-class.sh                  zookeeper-security-migration.sh
kafka-acls.sh                 kafka-dump-log.sh                    kafka-server-start.sh               zookeeper-server-start.sh
kafka-broker-api-versions.sh  kafka-leader-election.sh             kafka-server-stop.sh                zookeeper-server-stop.sh
kafka-configs.sh              kafka-log-dirs.sh                    kafka-streams-application-reset.sh  zookeeper-shell.sh
kafka-console-consumer.sh     kafka-mirror-maker.sh                kafka-topics.sh
kafka-console-producer.sh     kafka-preferred-replica-election.sh  kafka-verifiable-consumer.sh
```
Ok, the next step is creating a topic and piping some events to it. We need this so our Kafka Consumers have something to subscribe to when we run our consumer group administration examples. Let’s do that now.

Let’s keep it simple and pipe an existing log file to a new topic called syslog.

First, let’s create the syslog topic with 3 partitions.
```
~/dev/kafka_2.11-2.4.1/bin $ ./kafka-topics.sh --create --topic syslog --partitions 3 --bootstrap-server localhost:9092
```
Next, let’s pipe data to the syslog topic. I’m running on a Mac, so I have a /var/log/system.log file. Depending on your setup, you may need to use a different log file. Any log file will do, as the actual data in these examples does not matter. We only care about the offset locations.
To populate the syslog topic, I ran cat /var/log/system.log | kcat -b localhost:9092 -t syslog as shown in the following and then exited with Control-C after a few seconds. As you can see, I now have a syslog topic after listing topics by running kcat -L -b localhost:9092.
```
~/dev/kafka-stack-docker-compose [master] $ cat /var/log/system.log | kcat -b localhost:9092 -t syslog
% Auto-selecting Producer mode (use -P or -C to override)
^C% ERROR: Program terminated while producing message of 109 bytes
~/dev/kafka-stack-docker-compose [master] $ kcat -L -b localhost:9092
Metadata for all topics (from broker -1: localhost:9092/bootstrap):
 3 brokers:
  broker 2 at 127.0.0.1:9093
  broker 3 at 127.0.0.1:9094 (controller)
  broker 1 at 127.0.0.1:9092
 1 topics:
  topic "syslog" with 3 partitions:
    partition 0, leader 1, replicas: 1, isrs: 1
    partition 1, leader 2, replicas: 2, isrs: 2
    partition 2, leader 3, replicas: 3, isrs: 3
~/dev/kafka-stack-docker-compose [master] $
```
And finally, the last step is to show how I’m going to run the various Kafka Consumer examples throughout the rest of this tutorial.

Let’s start with a screenshot of the terminal tabs.
I’m going to have 3 terminal tabs open to start. In tab 3, I’m in the
kafka_2.11-2.4.1/bin directory to execute the
kafka-consumer-groups.sh examples. In tab 1 and tab 2, I’ll be running Kafka Consumers via
kcat, but you could use something like
kafka-console-consumer.sh too if you’d like. I just wanted to let you know. You don’t have to do it this way. You can do it your way. It’s your call because YOU are a big time blog reader. Do what you do.
Listing all Kafka Consumer Groups Example
In both tab 1 and tab 2, I’m running
kcat -b localhost:9092 -G mygroup -o beginning syslog, which creates two consumers in the “mygroup” Consumer Group.
In tab 3, let’s list all the consumer groups.

```
~/dev/kafka_2.11-2.4.1/bin $ ./kafka-consumer-groups.sh --bootstrap-server localhost:9092 --list
mygroup
```
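Since --list prints one group name per line, checking for a particular group from a script is a one-liner. Here, group_exists is a made-up helper name for illustration, not part of the Kafka tooling:

```shell
# Hypothetical helper: succeed (exit 0) if stdin contains the given group name
# as a whole line, the way `kafka-consumer-groups.sh --list` prints groups.
group_exists() {
  grep -q -x "$1"
}

# Example usage against a live cluster:
#   ./kafka-consumer-groups.sh --bootstrap-server localhost:9092 --list | group_exists mygroup
```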
Ok, we see we have one group called “mygroup”, but can we find out any more?
Describe a Kafka Consumer Group Example
Let’s find out more about the “mygroup”.
```
./kafka-consumer-groups.sh --bootstrap-server localhost:9092 --describe --group mygroup --members

GROUP    CONSUMER-ID                                   HOST         CLIENT-ID  #PARTITIONS
mygroup  rdkafka-54c79595-ee94-445a-a783-c5a39ba5aadf  /172.22.0.1  rdkafka    2
mygroup  rdkafka-d3a68893-29d6-4080-80bd-1c0330cb1a69  /172.22.0.1  rdkafka    1
```
With the --members option, along with the specific --group, we can now see quite a bit more, such as there being two active Consumers in the group and how many partitions each Consumer is assigned. Consumer-ID rdkafka-54c79595-ee94-445a-a783-c5a39ba5aadf is assigned 2 partitions while the other Consumer is assigned 1.
For review of how consumers are assigned to partitions when running in groups, see the previous tutorial on Kafka Consumer Group fundamentals.
If we don’t pass in the
--members arg, we can see different info for each consumer.
```
./kafka-consumer-groups.sh --bootstrap-server localhost:9092 --describe --group mygroup
Picked up JAVA_TOOL_OPTIONS: -Dlog4j2.formatMsgNoLookups=true

GROUP    TOPIC   PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG  CONSUMER-ID                                   HOST         CLIENT-ID
mygroup  syslog  0          -               0               -    rdkafka-54c79595-ee94-445a-a783-c5a39ba5aadf  /172.22.0.1  rdkafka
mygroup  syslog  1          89              89              0    rdkafka-54c79595-ee94-445a-a783-c5a39ba5aadf  /172.22.0.1  rdkafka
mygroup  syslog  2          -               0               -    rdkafka-d3a68893-29d6-4080-80bd-1c0330cb1a69  /172.22.0.1  rdkafka
```
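Since LAG is just LOG-END-OFFSET minus CURRENT-OFFSET per partition, you can total it across partitions with a little awk. Here, total_lag is a hypothetical helper assuming the column layout shown above, with LAG in the 6th column and “-” meaning no committed offset yet:

```shell
# Hypothetical helper: sum the LAG column of `--describe` output for a quick
# "how far behind is this group overall?" number. Assumes the first line is
# the header row and that "-" marks partitions with no committed offset.
total_lag() {
  awk 'NR > 1 && $6 != "-" { sum += $6 } END { print sum + 0 }'
}

# Example usage against a live cluster:
#   ./kafka-consumer-groups.sh --bootstrap-server localhost:9092 \
#     --describe --group mygroup | total_lag
```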
Resetting a Kafka Consumer Group Example
From the previous example, we can see the mygroup group has no lag. We see it two different ways: LAG = 0 and CURRENT-OFFSET = LOG-END-OFFSET. How do we reset consumption back to the beginning of the topic without restarting either of the two consumers? Let’s try it.
```
./kafka-consumer-groups.sh --bootstrap-server localhost:9092 --group mygroup --execute --reset-offsets --to-earliest --all-topics

Error: Assignments can only be reset if the group 'mygroup' is inactive, but the current state is Stable.

GROUP                          TOPIC                          PARTITION  NEW-OFFSET
```
Ok, it looks like we cannot; the consumers need to be stopped first. So, I’m going to go to tab 1 and tab 2 and stop both consumers.
Now, when I try to reset again, it succeeds.

```
./kafka-consumer-groups.sh --bootstrap-server localhost:9092 --group mygroup --execute --reset-offsets --to-earliest --all-topics

GROUP                          TOPIC                          PARTITION  NEW-OFFSET
mygroup                        syslog                         1          0
```
All good in the neighborhood.
If the consumers are now restarted WITHOUT the previous argument to start from the beginning, i.e. -o beginning, the first consumer should still read from the beginning because we reset the offset back to 0.

This can be verified by restarting the Consumers in tab 1 and tab 2 with kcat -b localhost:9092 -G mygroup syslog. Whichever tab you run this command in first will output the syslog topic from the beginning.
While this example showed resetting a Consumer Group to the beginning, you can get as specific as you’d like when resetting offsets.
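For instance, beyond --to-earliest, the tool supports several other reset targets, and --dry-run lets you preview what would change before committing with --execute. The commands below are illustrative sketches against a live cluster; the offset, shift, and datetime values are arbitrary examples I chose, not from this tutorial’s run.

```shell
# Preview (without committing) what resetting to latest would do
./kafka-consumer-groups.sh --bootstrap-server localhost:9092 --group mygroup \
  --reset-offsets --to-latest --all-topics --dry-run

# Reset one partition of one topic to a specific offset
./kafka-consumer-groups.sh --bootstrap-server localhost:9092 --group mygroup \
  --reset-offsets --to-offset 42 --topic syslog:1 --execute

# Shift every partition of the topic back by 10 messages
./kafka-consumer-groups.sh --bootstrap-server localhost:9092 --group mygroup \
  --reset-offsets --shift-by -10 --topic syslog --execute

# Rewind the group to where it was at a point in time
./kafka-consumer-groups.sh --bootstrap-server localhost:9092 --group mygroup \
  --reset-offsets --to-datetime 2022-01-01T00:00:00.000 --topic syslog --execute
```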
Deleting Kafka Consumer Group Example
How about performing clean-up and removing a Consumer Group? Let’s try while consumers are still running.
```
./kafka-consumer-groups.sh --bootstrap-server localhost:9092 --delete --group mygroup

Error: Deletion of some consumer groups failed:
* Group 'mygroup' could not be deleted due to:
  java.util.concurrent.ExecutionException: org.apache.kafka.common.errors.GroupNotEmptyException: The group is not empty.
```
Nope. Let’s stop both consumers and try again.
```
./kafka-consumer-groups.sh --bootstrap-server localhost:9092 --delete --group mygroup

Deletion of requested consumer groups ('mygroup') was successful.
```
and let’s go back to the beginning and see if there are any groups
```
~/dev/kafka_2.11-2.4.1/bin $ ./kafka-consumer-groups.sh --bootstrap-server localhost:9092 --list
~/dev/kafka_2.11-2.4.1/bin $
```
No more mygroup.
Hope you found this exploration of Kafka Consumer Group administration with
kafka-consumer-groups.sh helpful. If you have any suggestions for improvements or if you’d like to see any other examples or approaches using different tools, let me know in the comments below.