How do Kafka administrators perform administrative and diagnostic actions on Kafka Consumer Groups?
This post explores a Kafka Consumer Group operations admin tool called kafka-consumer-groups.sh. This popular command-line tool is included in Apache Kafka distributions. There are other tools, both open source and 3rd party, not included with Apache Kafka which can also be used for Kafka Consumer Group administrative tasks, but for this post, let’s focus on kafka-consumer-groups.sh. The concepts are what matter because they can easily be applied to other tools.
First, please recall that Kafka Consumer Groups provide a mechanism to distribute consumption from Apache Kafka across multiple individual consumers working together. They are Kafka’s implementation of parallelism, or how consumption scales out. Multiple Consumer Groups can also subscribe to the same topics independently of one another. Kafka Consumer and Consumer Group orchestration is built into Kafka and, unlike some other distributed systems, does not require a 3rd party orchestrator.
Table of Contents
- What is a Kafka Consumer Group?
- Overview
- Listing all Kafka Consumer Groups Example
- Describe a Kafka Consumer Group Example
- Resetting a Kafka Consumer Group Example
- Kafka Delete Consumer Group Example
- Conclusion
- Resources
What is a Kafka Consumer Group?
If you are not already familiar with basic concepts of Kafka Consumer Groups, please read the what are Kafka Consumer Groups tutorial first. This tutorial assumes you are familiar with Kafka Consumer Group functionality such as how consumers are assigned to partitions.
In this tutorial, we will start with an overview and then describe the environment setup used to present the examples. Then, we will go through a variety of specific examples.
Overview
As mentioned, kafka-consumer-groups.sh is a command-line tool distributed with Apache Kafka. It is found in the bin/ directory. At a high level, the tool describes itself as helping “list all consumer groups, describe a consumer group, delete consumer group info, or reset consumer group offsets”. Sounds good. Let’s do that.
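If you’d like to see every flag the tool supports before diving into the examples, running the script with no arguments should print its usage and option descriptions (the exact behavior and wording vary a bit between Kafka versions):
./kafka-consumer-groups.sh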
Kafka Consumer Groups Example Environment Setup
To cover the various examples of Consumer Group administrative tasks, we need a Kafka cluster with at least one topic containing data. There are various ways we can do this, including a fairly recent tutorial on Kafka Test Generation for Joins, but in this tutorial, let’s use the kcat utility (previously known as kafkacat) and a Docker-ized Kafka cluster. Both of these have been covered in other tutorials on this site, but let me know if you have any questions.
Requirements
1. Download and extract Apache Kafka so we have access to the kafka-consumer-groups.sh command in the bin directory
2. Install kcat if not already installed from https://github.com/edenhill/kcat
3. Start Docker (if it is not already running)
4. git clone https://github.com/conduktor/kafka-stack-docker-compose.git
5. cd kafka-stack-docker-compose
6. docker-compose -f zk-single-kafka-multiple.yml up -d (wait 5-30 seconds for everything to come online)
7. Verify steps 1-6 with kcat -L -b localhost:9092 to ensure we are ready to proceed
~/dev/kafka-stack-docker-compose [master] $ docker-compose -f zk-single-kafka-multiple.yml up -d
Creating network "kafka-stack-docker-compose_default" with the default driver
Creating zoo1 ... done
Creating kafka1 ... done
Creating kafka3 ... done
Creating kafka2 ... done
~/dev/kafka-stack-docker-compose [master] $ kcat -L -b localhost:9092
Metadata for all topics (from broker -1: localhost:9092/bootstrap):
3 brokers:
broker 2 at 127.0.0.1:9093
broker 3 at 127.0.0.1:9094
broker 1 at 127.0.0.1:9092 (controller)
0 topics:
~/dev/kafka-stack-docker-compose [master] $
Also, I have downloaded and extracted Apache Kafka into my ~/dev directory as shown next.
~/dev/kafka-stack-docker-compose [master] $ ls ~/dev/kafka_2.11-2.4.1/bin
config.out kafka-consumer-groups.sh kafka-producer-perf-test.sh kafka-verifiable-producer.sh
connect-distributed.sh kafka-consumer-perf-test.sh kafka-reassign-partitions.sh trogdor.sh
connect-mirror-maker.sh kafka-delegation-tokens.sh kafka-replica-verification.sh windows
connect-standalone.sh kafka-delete-records.sh kafka-run-class.sh zookeeper-security-migration.sh
kafka-acls.sh kafka-dump-log.sh kafka-server-start.sh zookeeper-server-start.sh
kafka-broker-api-versions.sh kafka-leader-election.sh kafka-server-stop.sh zookeeper-server-stop.sh
kafka-configs.sh kafka-log-dirs.sh kafka-streams-application-reset.sh zookeeper-shell.sh
kafka-console-consumer.sh kafka-mirror-maker.sh kafka-topics.sh
kafka-console-producer.sh kafka-preferred-replica-election.sh kafka-verifiable-consumer.sh
Ok, the next step is to create a topic and pipe some events to it. We need this so our Kafka Consumers have something to subscribe to when we run our consumer group administration examples. Let’s do that now.
Let’s keep it simple and pipe an existing log file to a new topic called syslog.
First, let’s create the syslog topic with 3 partitions.
~/dev/kafka_2.11-2.4.1/bin $ ./kafka-topics.sh --create --topic syslog --partitions 3 --bootstrap-server localhost:9092
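If you’d like to double-check the topic before producing to it, describing it with kafka-topics.sh should confirm the 3 partitions and their leaders:
./kafka-topics.sh --describe --topic syslog --bootstrap-server localhost:9092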
Next, let’s pipe data to the syslog topic. I’m running on a Mac, so I have a /var/log/system.log file. Depending on your setup, you may need to use a different log file. Any log file will do, as the actual data in these examples does not matter. We only care about the offset locations.
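If you don’t have a convenient log file, any line-oriented data works just as well as a stand-in; for example, piping a few hundred generated lines into kcat in producer mode (-P) should do the trick:
seq 1 500 | kcat -b localhost:9092 -t syslog -P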
To populate the syslog topic, I ran cat /var/log/system.log | kcat -b localhost:9092 -t syslog as shown in the following and then exited with Control-C after a few seconds. As you can see, I now have a syslog topic after listing topics by running kcat -L -b localhost:9092.
~/dev/kafka-stack-docker-compose [master] $ cat /var/log/system.log | kcat -b localhost:9092 -t syslog
% Auto-selecting Producer mode (use -P or -C to override)
^C% ERROR: Program terminated while producing message of 109 bytes
~/dev/kafka-stack-docker-compose [master] $ kcat -L -b localhost:9092
Metadata for all topics (from broker -1: localhost:9092/bootstrap):
3 brokers:
broker 2 at 127.0.0.1:9093
broker 3 at 127.0.0.1:9094 (controller)
broker 1 at 127.0.0.1:9092
1 topics:
topic "syslog" with 3 partitions:
partition 0, leader 1, replicas: 1, isrs: 1
partition 1, leader 2, replicas: 2, isrs: 2
partition 2, leader 3, replicas: 3, isrs: 3
~/dev/kafka-stack-docker-compose [master] $
And the final setup step is to show how I’m going to run the various Kafka Consumer examples used to demonstrate kafka-consumer-groups.sh.
Let’s start with a screenshot of the terminal tabs.
I’m going to have 3 terminal tabs open to start. In tab 3, I’m in the kafka_2.11-2.4.1/bin directory to execute the kafka-consumer-groups.sh examples. In tabs 1 and 2, I’ll be running Kafka Consumers via kcat, but you could use something like kafka-console-consumer.sh too if you’d like (see the example just below). I just wanted to let you know. You don’t have to do it this way. You can do it your way. It’s your call because YOU are a big time blog reader. Do what you do.
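For reference, if you’d rather use the console consumer that ships with Kafka instead of kcat, something like the following should join the same consumer group:
./kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic syslog --group mygroup --from-beginning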
Listing all Kafka Consumer Groups Example
In both tab 1 and tab 2, I’m running kcat -b localhost:9092 -G mygroup -o beginning syslog, which creates two consumers in the “mygroup” Consumer Group.
In tab 3, let’s list all the consumer groups
~/dev/kafka_2.11-2.4.1/bin $ ./kafka-consumer-groups.sh --bootstrap-server localhost:9092 --list
mygroup
Ok, we see we have one group called “mygroup”, but can we find out any more?
Describe a Kafka Consumer Group Example
Let’s find out more about “mygroup”.
./kafka-consumer-groups.sh --bootstrap-server localhost:9092 --describe --group mygroup --members
GROUP CONSUMER-ID HOST CLIENT-ID #PARTITIONS
mygroup rdkafka-54c79595-ee94-445a-a783-c5a39ba5aadf /172.22.0.1 rdkafka 2
mygroup rdkafka-d3a68893-29d6-4080-80bd-1c0330cb1a69 /172.22.0.1 rdkafka 1
With the --describe and --members options, along with the specific --group, we can now see quite a bit more, such as that there are two active Consumers in the group and how many partitions each consumer is assigned. Consumer-ID rdkafka-54c79595-ee94-445a-a783-c5a39ba5aadf is assigned 2 partitions while the other Consumer is assigned 1.
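If you want to see the exact partitions rather than just the counts, adding --verbose to the --members query should include an ASSIGNMENT column listing the topic partitions each member owns:
./kafka-consumer-groups.sh --bootstrap-server localhost:9092 --describe --group mygroup --members --verbose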
For review of how consumers are assigned to partitions when running in groups, see the previous tutorial on Kafka Consumer Group fundamentals.
If we don’t pass in the --members arg, we see different info for each consumer: the per-partition current offsets, log-end offsets, and lag.
./kafka-consumer-groups.sh --bootstrap-server localhost:9092 --describe --group mygroup
GROUP TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG CONSUMER-ID HOST CLIENT-ID
mygroup syslog 0 - 0 - rdkafka-54c79595-ee94-445a-a783-c5a39ba5aadf /172.22.0.1 rdkafka
mygroup syslog 1 89 89 0 rdkafka-54c79595-ee94-445a-a783-c5a39ba5aadf /172.22.0.1 rdkafka
mygroup syslog 2 - 0 - rdkafka-d3a68893-29d6-4080-80bd-1c0330cb1a69 /172.22.0.1 rdkafka
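There is also a --state variant of describe that should report the group coordinator, assignment strategy, state, and member count, handy for a quick health check:
./kafka-consumer-groups.sh --bootstrap-server localhost:9092 --describe --group mygroup --state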
Resetting a Kafka Consumer Group Example
From the previous example, we can see that mygroup has no lag. We see it two different ways: LAG = 0 and CURRENT-OFFSET = LOG-END-OFFSET. How do we reset consumption back to the beginning of the topic without restarting either of the two consumers? Let’s try it.
./kafka-consumer-groups.sh --bootstrap-server localhost:9092 --group mygroup --execute --reset-offsets --to-earliest --all-topics
Error: Assignments can only be reset if the group 'mygroup' is inactive, but the current state is Stable.
GROUP TOPIC PARTITION NEW-OFFSET
Ok, looks like we cannot reset while the group is active, so the consumers need to be stopped. I’m going to go to tab 1 and tab 2 and stop both consumers.
Now, when I try to reset
./kafka-consumer-groups.sh --bootstrap-server localhost:9092 --group mygroup --execute --reset-offsets --to-earliest --all-topics
GROUP TOPIC PARTITION NEW-OFFSET
mygroup syslog 1 0
All good in the neighborhood.
If the consumers are now restarted WITHOUT the previous argument to start from the beginning, i.e. -o beginning, the first consumer should still read from the beginning because we reset the offset back to 0.
This can be verified by restarting the Consumers in tab 1 and tab 2 with kcat -b localhost:9092 -G mygroup syslog. Whichever tab you run this command in first will output the syslog topic from the beginning.
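You can also confirm the reset without restarting anything by describing the group again while it has no active members; the offsets table should still be listed, with partition 1 back at 0 and dashes in the CONSUMER-ID column:
./kafka-consumer-groups.sh --bootstrap-server localhost:9092 --describe --group mygroup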
While this example showed resetting a Consumer Group to the beginning, you can get as specific as you’d like when resetting offsets, as shown in the examples below.
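For example, you can rewind a single partition to a specific offset, shift by a relative amount, or roll back to a point in time, and you can swap --execute for --dry-run to preview the new offsets without committing them. A few sketches (the offset, partition, and timestamp values here are made up; adjust them for your own setup):
./kafka-consumer-groups.sh --bootstrap-server localhost:9092 --group mygroup --reset-offsets --to-offset 45 --topic syslog:1 --dry-run
./kafka-consumer-groups.sh --bootstrap-server localhost:9092 --group mygroup --reset-offsets --shift-by -10 --topic syslog --execute
./kafka-consumer-groups.sh --bootstrap-server localhost:9092 --group mygroup --reset-offsets --to-datetime 2022-01-01T00:00:00.000 --topic syslog --execute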
Kafka Delete Consumer Group Example
How about performing clean-up and removing a Consumer Group? Let’s try it while the consumers are still running.
./kafka-consumer-groups.sh --bootstrap-server localhost:9092 --delete --group mygroup
Error: Deletion of some consumer groups failed:
* Group 'mygroup' could not be deleted due to: java.util.concurrent.ExecutionException: org.apache.kafka.common.errors.GroupNotEmptyException: The group is not empty.
As we can see above, it doesn’t work. Let’s stop both consumers and try again
./kafka-consumer-groups.sh --bootstrap-server localhost:9092 --delete --group mygroup
Deletion of requested consumer groups ('mygroup') was successful.
and let’s go back to the beginning and see if there are any groups left
~/dev/kafka_2.11-2.4.1/bin $ ./kafka-consumer-groups.sh --bootstrap-server localhost:9092 --list
~/dev/kafka_2.11-2.4.1/bin $
No more mygroup. In this example, we showed how to delete a Kafka Consumer Group.
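Related: if you only want to clear a group’s committed offsets for a single topic rather than removing the whole group, Kafka 2.4 and later also ship a --delete-offsets action (the group must not be actively consuming that topic):
./kafka-consumer-groups.sh --bootstrap-server localhost:9092 --delete-offsets --group mygroup --topic syslog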
Conclusion
Hope you found this exploration of Kafka Consumer Group administration with kafka-consumer-groups.sh helpful. If you have any suggestions for improvements or if you’d like to see any other examples or approaches using different tools, let me know in the comments below.