Kafka Consumer Groups with kafka-consumer-groups.sh

Kafka Consumer Groups Operation Examples

How do Kafka administrators perform administrative and diagnostic tasks on Kafka Consumer Groups?

This post explores a Kafka Consumer Groups admin tool called kafka-consumer-groups.sh. This popular command-line tool is included in Apache Kafka distributions. Other open source and 3rd party tools not included with Apache Kafka can also be used for Kafka Consumer Group administrative tasks, but for this post, let’s focus on kafka-consumer-groups.sh. The concepts are what matter because they can easily be applied to other tools.

First, please recall that Kafka Consumer Groups provide a mechanism to distribute consumption from Apache Kafka across multiple individual consumers working together. It is an implementation of parallelism, or how to scale out. Multiple Consumer Groups can also subscribe to the same topics independently of each other. Kafka Consumer and Kafka Consumer Group orchestration is built into Kafka and, unlike in some other distributed systems, does not require a 3rd party orchestrator.

If you are not already familiar with the basic concepts of Kafka Consumer Groups, please read the “What are Kafka Consumer Groups” tutorial before proceeding here. This tutorial assumes you are familiar with Kafka Consumer Group functionality such as how consumers are assigned to partitions.

In this tutorial, we will start with an overview and then describe the environment setup used to present the examples.  Then, we will go through a variety of specific examples.

Overview

As mentioned, kafka-consumer-groups.sh is a command line tool distributed with Apache Kafka.  It is found in the bin/ directory.  At a high level, the tool describes itself as helping “list all consumer groups, describe a consumer group, delete consumer group info, or reset consumer group offsets”. Sounds good. Let’s do that.

Kafka Consumer Groups Example Environment Setup

To cover the various examples of Consumer Group administrative tasks, we need a Kafka cluster with at least one topic with data. There are various ways to do this, including a fairly recent tutorial on Kafka Test Generation for Joins, but in this tutorial, let’s use the utility kcat (previously known as kafkacat) and a Docker-ized Kafka cluster. Both of these have been covered in other tutorials on this site, but let me know if you have any questions.

Requirements

  1. Download and extract Apache Kafka so we have access to the kafka-consumer-groups.sh command in the bin directory
  2. Install kcat if not already installed from https://github.com/edenhill/kcat
  3. Start Docker (if it is not already running)
  4. git clone https://github.com/conduktor/kafka-stack-docker-compose.git
  5. cd kafka-stack-docker-compose
  6. docker-compose -f zk-single-kafka-multiple.yml up -d (wait 5-30 seconds for everything to come online)
  7. Verify the setup with kcat -L -b localhost:9092 to ensure we are ready to proceed

~/dev/kafka-stack-docker-compose [master] $ docker-compose -f zk-single-kafka-multiple.yml up -d
Creating network "kafka-stack-docker-compose_default" with the default driver
Creating zoo1 ... done
Creating kafka1 ... done
Creating kafka3 ... done
Creating kafka2 ... done
~/dev/kafka-stack-docker-compose [master] $ kcat -L -b localhost:9092
Metadata for all topics (from broker -1: localhost:9092/bootstrap):
 3 brokers:
  broker 2 at 127.0.0.1:9093
  broker 3 at 127.0.0.1:9094
  broker 1 at 127.0.0.1:9092 (controller)
 0 topics:
~/dev/kafka-stack-docker-compose [master] $
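
If you are scripting this setup, the “wait 5-30 seconds” step can be replaced with a small retry loop. The following is a minimal sketch and not part of the setup above: wait_for is a hypothetical helper name, the attempt count and delay are arbitrary values to tune, and it assumes kcat -L exits non-zero while the broker is unreachable.

```shell
# wait_for is a hypothetical helper (not part of Kafka or kcat).
# It retries a command until it succeeds or the attempts run out.
# usage: wait_for <attempts> <delay-seconds> <command...>
wait_for() {
  attempts=$1
  delay=$2
  shift 2
  i=1
  while [ "$i" -le "$attempts" ]; do
    # Discard the command's output; only its exit status matters here.
    if "$@" >/dev/null 2>&1; then
      echo "ready after $i attempt(s)"
      return 0
    fi
    sleep "$delay"
    i=$((i + 1))
  done
  echo "not ready after $attempts attempt(s)" >&2
  return 1
}

# Example (assumed values): poll broker metadata up to 15 times, 2 seconds apart.
# wait_for 15 2 kcat -L -b localhost:9092
```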

Also, I have downloaded and extracted Apache Kafka into my ~/dev directory as shown next.

~/dev/kafka-stack-docker-compose [master] $ ls ~/dev/kafka_2.11-2.4.1/bin
config.out                          kafka-consumer-groups.sh            kafka-producer-perf-test.sh         kafka-verifiable-producer.sh
connect-distributed.sh              kafka-consumer-perf-test.sh         kafka-reassign-partitions.sh        trogdor.sh
connect-mirror-maker.sh             kafka-delegation-tokens.sh          kafka-replica-verification.sh       windows
connect-standalone.sh               kafka-delete-records.sh             kafka-run-class.sh                  zookeeper-security-migration.sh
kafka-acls.sh                       kafka-dump-log.sh                   kafka-server-start.sh               zookeeper-server-start.sh
kafka-broker-api-versions.sh        kafka-leader-election.sh            kafka-server-stop.sh                zookeeper-server-stop.sh
kafka-configs.sh                    kafka-log-dirs.sh                   kafka-streams-application-reset.sh  zookeeper-shell.sh
kafka-console-consumer.sh           kafka-mirror-maker.sh               kafka-topics.sh
kafka-console-producer.sh           kafka-preferred-replica-election.sh kafka-verifiable-consumer.sh

Ok, the next step is to create a topic and pipe some events to it. We need this so our Kafka Consumers have something to subscribe to while we run the consumer group administration examples. Let’s do that now.

Let’s keep it simple and pipe an existing log file to a new topic called syslog.

First, let’s create the syslog topic with 3 partitions.

~/dev/kafka_2.11-2.4.1/bin $ ./kafka-topics.sh --create --topic syslog --partitions 3 --bootstrap-server localhost:9092

Next, let’s pipe data to the syslog topic. I’m running on a Mac, so I have a /var/log/system.log file. Depending on your setup, you may need to use a different log file. Any log file will do, as the actual data in these examples does not matter. We only care about the offset locations.

To populate the syslog topic, I ran cat /var/log/system.log | kcat -b localhost:9092 -t syslog as shown in the following and then exited with Control-C after a few seconds. As you can see, the syslog topic now appears when listing topics with kcat -L -b localhost:9092.

~/dev/kafka-stack-docker-compose [master] $ cat /var/log/system.log | kcat -b localhost:9092 -t syslog
% Auto-selecting Producer mode (use -P or -C to override)
^C% ERROR: Program terminated while producing message of 109 bytes

~/dev/kafka-stack-docker-compose [master] $ kcat -L -b localhost:9092
Metadata for all topics (from broker -1: localhost:9092/bootstrap):
 3 brokers:
  broker 2 at 127.0.0.1:9093
  broker 3 at 127.0.0.1:9094 (controller)
  broker 1 at 127.0.0.1:9092
 1 topics:
  topic "syslog" with 3 partitions:
    partition 0, leader 1, replicas: 1, isrs: 1
    partition 1, leader 2, replicas: 2, isrs: 2
    partition 2, leader 3, replicas: 3, isrs: 3
~/dev/kafka-stack-docker-compose [master] $
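
To check the result from a script rather than by eye, the partition count can be pulled out of the kcat -L output shown above. This is only a sketch: partition_count is a hypothetical helper, and it assumes the metadata listing keeps the line format seen here.

```shell
# partition_count is a hypothetical helper: it reads `kcat -L` output on
# stdin and prints the partition count for the topic named in $1.
# Assumes the 'topic "<name>" with <N> partitions:' line format shown above;
# the name is used inside a regex, so a "." in it matches any character.
partition_count() {
  sed -n "s/^ *topic \"$1\" with \([0-9][0-9]*\) partitions:.*/\1/p"
}

# Example:
# kcat -L -b localhost:9092 | partition_count syslog
```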

And finally, the last step is to show how I’m going to run the various Kafka Consumers in order to demonstrate kafka-consumer-groups.sh.

Let’s start with a screenshot of terminal tabs

kafka-consumer-groups.sh examples iterm2 tab setup

I’m going to have 3 terminal tabs open to start. In tab 3, I’m in the kafka_2.11-2.4.1/bin directory to execute the kafka-consumer-groups.sh examples. In tab 1 and tab 2, I’ll be running Kafka Consumers via kcat, but you could use something like kafka-console-consumer.sh too if you’d like. I just wanted to let you know. You don’t have to do it this way. You can do it your way. It’s your call because YOU are a big time blog reader. Do what you do.

Listing all Kafka Consumer Groups Example

In both tab 1 and tab 2, I’m running kcat -b localhost:9092 -G mygroup -o beginning syslog which creates two consumers in the “mygroup” Consumer Group.

In tab 3, let’s list all the consumer groups

~/dev/kafka_2.11-2.4.1/bin $ ./kafka-consumer-groups.sh --bootstrap-server localhost:9092 --list
mygroup

Ok, we see we have one group called “mygroup”, but can we find out any more?
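
As a side note for anyone scripting around --list: the output is one group name per line, so an exact-match check is straightforward. A minimal sketch follows; group_exists is a hypothetical helper name, not something the tool provides.

```shell
# group_exists is a hypothetical helper: it reads the output of
# `kafka-consumer-groups.sh --list` on stdin and succeeds only when one
# line matches the group name in $1 exactly.
group_exists() {
  # -x: whole-line match, -F: fixed string (no regex), -q: no output.
  grep -qxF -- "$1"
}

# Example:
# ./kafka-consumer-groups.sh --bootstrap-server localhost:9092 --list | group_exists mygroup && echo "found"
```

The whole-line match matters here because a plain grep for “my” would also succeed on “mygroup”.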

Describe a Kafka Consumer Group Example

Let’s find out more about “mygroup”.

./kafka-consumer-groups.sh --bootstrap-server localhost:9092 --describe --group mygroup --members

GROUP           CONSUMER-ID                                  HOST            CLIENT-ID       #PARTITIONS
mygroup         rdkafka-54c79595-ee94-445a-a783-c5a39ba5aadf /172.22.0.1     rdkafka         2
mygroup         rdkafka-d3a68893-29d6-4080-80bd-1c0330cb1a69 /172.22.0.1     rdkafka         1

With the --describe and --members options, along with the specific --group, we can now see quite a bit more, such as that there are two active Consumers in the group and how many partitions each one is assigned. Consumer-ID rdkafka-54c79595-ee94-445a-a783-c5a39ba5aadf is assigned 2 partitions while the other Consumer is assigned 1. Adding --verbose to the same command also lists the assigned partitions themselves.

For review of how consumers are assigned to partitions when running in groups, see the previous tutorial on Kafka Consumer Group fundamentals.

If we don’t pass in the --members arg, we see offset information for each partition instead.

./kafka-consumer-groups.sh --bootstrap-server localhost:9092 --describe --group mygroup
Picked up JAVA_TOOL_OPTIONS: -Dlog4j2.formatMsgNoLookups=true

GROUP           TOPIC           PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG             CONSUMER-ID                                  HOST            CLIENT-ID
mygroup         syslog          0          -               0               -               rdkafka-54c79595-ee94-445a-a783-c5a39ba5aadf /172.22.0.1     rdkafka
mygroup         syslog          1          89              89              0               rdkafka-54c79595-ee94-445a-a783-c5a39ba5aadf /172.22.0.1     rdkafka
mygroup         syslog          2          -               0               -               rdkafka-d3a68893-29d6-4080-80bd-1c0330cb1a69 /172.22.0.1     rdkafka
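
If you only want one number out of this table, the LAG column can be summed with awk. This is a rough sketch rather than anything the tool offers: total_lag is a hypothetical helper, and it assumes the column order shown above, where LAG is the 6th field.

```shell
# total_lag is a hypothetical helper: it reads the output of
# `kafka-consumer-groups.sh --describe --group <name>` on stdin and prints
# the sum of the LAG column (6th field, per the layout shown above).
# The header row and rows whose lag is "-" (no committed offset) are skipped.
total_lag() {
  awk '$1 != "GROUP" && $6 ~ /^[0-9]+$/ { sum += $6 } END { print sum + 0 }'
}

# Example:
# ./kafka-consumer-groups.sh --bootstrap-server localhost:9092 --describe --group mygroup | total_lag
```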

Resetting a Kafka Consumer Group Example

From the previous example, we can see mygroup has no lag on partition 1, the only partition holding data. We see it two different ways: LAG = 0 and CURRENT-OFFSET = LOG-END-OFFSET. How do we reset the consumption back to the beginning of the topic without restarting either of the two consumers? Let’s try it.

./kafka-consumer-groups.sh --bootstrap-server localhost:9092 --group mygroup --execute --reset-offsets --to-earliest --all-topics

Error: Assignments can only be reset if the group 'mygroup' is inactive, but the current state is Stable.

GROUP                          TOPIC                          PARTITION  NEW-OFFSET

Ok, looks like we cannot; the consumers need to be stopped first. So, I’m going to go to tab 1 and tab 2 and stop both consumers.
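
The error above mentions the group state, and that state can be checked before attempting a reset by adding --state to --describe. The sketch below is hedged: group_state is a hypothetical helper, and it assumes your Kafka version supports --state and prints the state as its own word on the group’s row.

```shell
# group_state is a hypothetical helper: it reads the output of
# `kafka-consumer-groups.sh --describe --group <name> --state` on stdin and
# prints the first known group state it finds.
# Field 1 (the group name) is skipped so a group called "Empty" is not
# mistaken for a state.
group_state() {
  awk '{
    for (i = 2; i <= NF; i++)
      if ($i ~ /^(Stable|Empty|Dead|PreparingRebalance|CompletingRebalance)$/) {
        print $i
        exit
      }
  }'
}

# Example: only go ahead with a reset once this no longer prints "Stable".
# ./kafka-consumer-groups.sh --bootstrap-server localhost:9092 --describe --group mygroup --state | group_state
```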

Now, when I try to reset

./kafka-consumer-groups.sh --bootstrap-server localhost:9092 --group mygroup --execute --reset-offsets --to-earliest --all-topics

GROUP                          TOPIC                          PARTITION  NEW-OFFSET
mygroup                        syslog                         1          0

All good in the neighborhood.

If the consumers are now restarted WITHOUT the previous argument to start from the beginning, i.e. -o beginning, the first consumer should still read from the beginning because we reset the offset back to 0.

This can be verified by restarting the Consumers in tab 1 and tab 2 with kcat -b localhost:9092 -G mygroup syslog. Whichever tab you run this command in first will output the syslog topic from the beginning.

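While this example showed resetting a Consumer Group to the beginning, you can get as specific as you’d like when resetting offsets. Options such as --to-latest, --to-offset, --shift-by, --to-datetime, and --by-duration choose the target position, --topic limits the reset to particular topics or partitions, and --dry-run previews the result without committing anything.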
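
Here is a small sketch of how a few other reset variants could be wrapped up. The reset_offsets function and the KCG and BOOTSTRAP variables are hypothetical names for illustration, and the offset, shift, and timestamp values are made up; the flags themselves belong to kafka-consumer-groups.sh. As before, the group has to be inactive for --execute to succeed.

```shell
# Path to the tool and the broker address (hypothetical variable names);
# override them from the environment if your setup differs.
KCG="${KCG:-./kafka-consumer-groups.sh}"
BOOTSTRAP="${BOOTSTRAP:-localhost:9092}"

# reset_offsets is a hypothetical wrapper around --reset-offsets.
# usage: reset_offsets <--dry-run|--execute> <group> <reset options...>
reset_offsets() {
  mode=$1
  group=$2
  shift 2
  "$KCG" --bootstrap-server "$BOOTSTRAP" --group "$group" \
    --reset-offsets "$@" "$mode"
}

# Preview only; --dry-run prints the new offsets without committing them.
# reset_offsets --dry-run mygroup --to-earliest --all-topics

# Rewind every partition of syslog by 10 records.
# reset_offsets --execute mygroup --shift-by -10 --topic syslog

# Move only partition 1 of syslog to offset 42.
# reset_offsets --execute mygroup --to-offset 42 --topic syslog:1

# Jump to the offsets matching a timestamp (format YYYY-MM-DDTHH:mm:SS.sss).
# reset_offsets --execute mygroup --to-datetime 2022-01-01T00:00:00.000 --all-topics
```

Running with --dry-run first and comparing the NEW-OFFSET column against what you expect is a cheap way to avoid rewinding the wrong partitions.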

Deleting Kafka Consumer Group Example

How about performing clean-up and removing a Consumer Group? Let’s try while consumers are still running.

./kafka-consumer-groups.sh --bootstrap-server localhost:9092 --delete --group mygroup

Error: Deletion of some consumer groups failed:
* Group 'mygroup' could not be deleted due to: java.util.concurrent.ExecutionException: org.apache.kafka.common.errors.GroupNotEmptyException: The group is not empty.

Nope. Let’s stop both consumers and try again

./kafka-consumer-groups.sh --bootstrap-server localhost:9092 --delete --group mygroup
Deletion of requested consumer groups ('mygroup') was successful.

And let’s go back to the beginning and see if there are any groups.

~/dev/kafka_2.11-2.4.1/bin $ ./kafka-consumer-groups.sh --bootstrap-server localhost:9092 --list
~/dev/kafka_2.11-2.4.1/bin $

No more mygroup.

Conclusion

Hope you found this exploration of Kafka Consumer Group administration with kafka-consumer-groups.sh helpful. If you have any suggestions for improvements or if you’d like to see any other examples or approaches using different tools, let me know in the comments below.
