How To Generate Kafka Streaming Join Test Data By Example

Kafka Joins Test Data

Why “Joinable” Streaming Test Data for Kafka?

When creating streaming join applications in KStreams, ksqlDB, Spark, Flink, etc. with source data in Kafka, it is convenient to generate fake data with cross-topic relationships; i.e. a customer topic and an order topic whose values carry a customer.id attribute.  In this example, we might want to hydrate or enrich events in the order topic stream with customer topic data via joins.

This post will show how to generate cross-topic related events in Kafka, so you can write and test stream join code.  Let's consider it an expansion of the previous post on generating test data in Kafka.  I'm going to show how to generate both Avro and JSON.  You'll have choices.

At the end of this post you will have a local running Kafka cluster in Docker with test data being delivered to multiple topics.  You'll have the ability to choose between Avro or JSON for the payload of the test messages.  At the end, you will have a convenient spot to run and test your streaming join code.  Let the good times begin.

System Requirements

  • Docker installed and running
  • docker-compose
  • git
  • curl
  • jq
  • A fun-lovin', do-not-take-yourself-too-seriously, good-times attitude

Assumptions

Before we begin, you should know this post assumes this isn’t your first time using Docker, docker-compose, and Git.  This means you should be comfortable starting and stopping containers in your environment.  Now, you may be wondering, how can I make these assumptions?  Well, it’s simple, I can make these assumptions because it’s my blog and I’m the big-time boss around here.

Overview

Here's how we're going to do it. We're going to run a Kafka cluster and Kafka Connect node in Docker containers with docker-compose.  We're going to use a 3rd party Kafka Connect connector called Voluble for generating the joinable test data.  I'll show how to install, configure, and run it in our Docker containers.  I've written about Voluble in the past.  It's awesome.

Kafka Join Test Data Setup Steps

  1. Start Docker if it is not already running
  2. Download the zip file from https://www.confluent.io/hub/mdrogalis/voluble and extract it.  Depending on the version you downloaded, you should have a directory such as `mdrogalis-voluble-0.3.1`.  In this case, I downloaded version 0.3.1.
  3. git clone https://github.com/conduktor/kafka-stack-docker-compose.git
  4. cd kafka-stack-docker-compose
  5. mkdir connectors && cd connectors
  6. Copy the directory from Step 2 (i.e. `mdrogalis-voluble-0.3.1`) into the connectors directory you just created.  For example, the connectors directory will now contain the mdrogalis-voluble-0.3.1/ directory.
  7. cd .. (so you are back to the kafka-stack-docker-compose directory)
  8. docker-compose -f full-stack.yml up -d (this will start all the Docker containers.  Give it 30 seconds or so after starting up.)
  9. curl http://localhost:8083/connector-plugins | jq '.' (This confirms you can list the available Kafka Connect plugins.  You should see VolubleSourceConnector in the list.  This step needs to complete successfully before continuing.)

Ok, at this point, you are ready to start the Voluble Kafka Connect connector and begin generating test data.

Streaming Join Test Configuration and Start

In this example, I'm going to use an example config file at https://github.com/tmcgrath/kafka-connect-examples/tree/master/voluble/joinable.json.  It's a config file that generates some joinable data.  Download this file (or copy and paste the contents into a new file) in your environment and name it joinable.json.

Now, in the same directory as the joinable.json file, start the Voluble connector with

curl -X POST -H "Accept:application/json" -H "Content-Type: application/json" --data @joinable.json http://localhost:8083/connectors | jq '.'

As you can see, this assumes you have the joinable.json file in your current directory.

If the command was successful, you should see an output similar to the following

{
  "name": "joinable",
  "config": {
    "connector.class": "io.mdrogalis.voluble.VolubleSourceConnector",
    "genkp.inventory.sometimes.with": "#{Code.asin}",
    "genkp.inventory.sometimes.matching": "inventory.key",
    "genv.inventory.amount_in_stock.with": "#{number.number_between '5','15'}",
    "genv.inventory.product_name.with": "#{Commerce.product_name}",
    "genv.inventory.last_updated.with": "#{date.past '10','SECONDS'}",
    "genkp.customer.with": "#{Code.isbn10}",
    "genv.customer.name.with": "#{Name.full_name}",
    "genv.customer.gender.with": "#{Demographic.sex}",
    "genv.customer.favorite_beer.with": "#{Beer.name}",
    "genv.customer.state.with": "#{Address.state}",
    "genkp.order.matching": "inventory.key",
    "genv.order.quantity.with": "#{number.number_between '1','5'}",
    "genv.order.customer_id.matching": "customer.key",
    "global.throttle.ms": "1000",
    "global.history.records.max": "10000",
    "name": "joinable"
  },
  "tasks": [],
  "type": "source"
}

You should now be generating test data in Avro format.

There are multiple ways to view and confirm what we just did, so I’ll briefly show a couple of different ways.  At some point, I’ll probably use this example to demonstrate an Apache Flink app with Kafka Join.  Let me know if you’d like to see that.

I have Kafdrop running and configured to use the Confluent Schema Registry, which is running at port 8081 in the full-stack deploy we ran earlier.  From Kafdrop, I can see that order messages always reference a customer topic message key.

Order topic with customer_id set

kafka joinable test data generation verification of order

and if we compare it to the customer topic message key, we'll see the reference.  For example, a customer_id of 0860762270 matches the customer key.

kafka joinable test data verification 2


I know many of you will not have Kafdrop installed, so you can also test with something like kcat.  For example, if you run:

kcat -b localhost:9092 -t customer -s avro -r http://localhost:8081

and compare the output to

kcat -b localhost:9092 -t order -s avro -r http://localhost:8081

Both of these verification methods are a bit outside the primary focus of this post, but if you'd like me to make a screencast video to demonstrate any of this, just let me know in the comments below.
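If you prefer a quick programmatic sanity check from Scala instead, here is a minimal sketch (my own addition, not part of the original setup) that uses the Kafka AdminClient to confirm the Voluble topics were created on the local cluster.

import java.util.Properties

import scala.collection.JavaConverters._

import org.apache.kafka.clients.admin.{AdminClient, AdminClientConfig}

object VerifyVolubleTopics extends App {
  val props = new Properties()
  props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092")

  val admin = AdminClient.create(props)

  // list all topic names and keep only the ones the joinable.json config should have created
  val topics = admin.listTopics().names().get().asScala
  println(topics.filter(Set("inventory", "customer", "order")))

  admin.close()
}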

Avro and JSON Test Data Generation Examples

By default, what we did in the above steps will produce/consume messages in Avro format.  This may be exactly what you are looking for.  But, just in case, let's quickly explore how we would generate a JSON payload instead of Avro.  There is nothing to change in Voluble.  We just need to make changes to the Kafka Connect container configuration.

To generate JSON instead of Avro, open the full-stack.yml file and change the lines as shown in the following snippet

# CONNECT_KEY_CONVERTER: io.confluent.connect.avro.AvroConverter
CONNECT_KEY_CONVERTER: org.apache.kafka.connect.storage.StringConverter
# CONNECT_KEY_CONVERTER_SCHEMA_REGISTRY_URL: 'http://kafka-schema-registry:8081'
# CONNECT_VALUE_CONVERTER: io.confluent.connect.avro.AvroConverter
CONNECT_VALUE_CONVERTER: org.apache.kafka.connect.json.JsonConverter
# CONNECT_VALUE_CONVERTER_SCHEMA_REGISTRY_URL: 'http://kafka-schema-registry:8081'
CONNECT_VALUE_CONVERTER_SCHEMAS_ENABLE: "false"


For those of you wondering what changed, there are new values for KEY and VALUE converters as well as a new line for CONNECT_VALUE_CONVERTER_SCHEMAS_ENABLE.

To see this in action, stop the existing containers with docker-compose -f full-stack.yml down.  The previously created topics and data will not be retained.

Now, if you start at step 8 from the original setup instructions provided above and then re-POST the exact same Voluble config, you will produce JSON value payloads with a String key.

kafka joinable test data in JSON instead of Avro now


Looking Ahead

As mentioned in the excellent Voluble documentation, notice the references to various .matching settings in the source JSON config file for cross-topic relationships; i.e.

 "genv.order.customer_id.matching": "customer.key"

For even further examples of what you can do, check out the Java Faker GitHub repo.  Link below.

You are now ready to point your Flink || Spark || KStreams || ksqlDB code at the topics generated and hydrated with Voluble.
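For example, here is a minimal sketch of what a KStreams join (in Scala) against these topics might look like.  Because the config above keys the order topic on inventory.key, a straight key-based stream-table join works.  This sketch assumes you re-ran the stack with the String/JSON converters described earlier (with the default Avro converters you would use Avro Serdes instead), and the application id and output topic name are made up for illustration.

import java.util.Properties

import org.apache.kafka.common.serialization.Serdes
import org.apache.kafka.streams.{KafkaStreams, StreamsBuilder, StreamsConfig}
import org.apache.kafka.streams.kstream.{Consumed, Produced, ValueJoiner}

object OrderEnrichmentJoin extends App {

  val props = new Properties()
  props.put(StreamsConfig.APPLICATION_ID_CONFIG, "order-enrichment-join") // hypothetical app id
  props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092")
  props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass)
  props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass)

  val stringSerde = Serdes.String()
  val builder = new StreamsBuilder()

  // Voluble keys order records on the same value it uses for the inventory key,
  // so orders and inventory can be joined directly on the message key
  val orders = builder.stream("order", Consumed.`with`(stringSerde, stringSerde))
  val inventory = builder.table("inventory", Consumed.`with`(stringSerde, stringSerde))

  // combine the two JSON payloads into one enriched value
  val enrich: ValueJoiner[String, String, String] =
    (orderJson, inventoryJson) => s"""{"order": $orderJson, "inventory": $inventoryJson}"""

  orders
    .join(inventory, enrich)
    .to("order-enriched", Produced.`with`(stringSerde, stringSerde)) // hypothetical output topic

  val streams = new KafkaStreams(builder.build(), props)
  streams.start()
  sys.addShutdownHook(streams.close())
}

Joining the order stream to the customer topic works the same way, except you would first re-key the orders by the customer_id field before joining.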

Hope this helps!

Streaming Join Test Data References


Image credit https://pixabay.com/illustrations/texture-pixels-tile-background-2484499/

Kafka Certification Tips for Developers

Kafka Certification Tips

If you are considering Kafka Certification, this page describes what I did to pass the Confluent Certified Developer for Apache Kafka Certification exam.  Good luck and hopefully this page is helpful for you!

There are many reasons why you may wish to become Kafka certified.

My number 1 recommendation to help you become Kafka Developer Certified?  Buy these practice exams.

Achieving Kafka Certification might be a way to distinguish yourself in your current role.  Or, you may wish to transition to a new role which includes a higher bill rate for Kafka contract work.  Or, you may just like the pressure of testing yourself and your Kafka developer chops.  Whatever your reasons, it doesn’t matter here folks.  We are all equal under the stars in the sky.  Well, I’m not exactly sure that’s true about all being equal, but it sounds pretty good, right?  Sometimes, when you are Kafka certified, you need to sound good when you are speaking and writing.  Pfft…lol, anyhow, let’s get serious.

I achieved the Kafka Developer certification through hard work.  I had a couple of years of Kafka experience, but this experience wasn’t ever really solely focused on Kafka.  Kafka was just a component in the overall stack I needed and/or wanted to utilize.  It was never 100% devoted to Kafka though. Along the way, I wrote some Kafka tutorial blog posts here to help engrain some of my Kafka learnings.

My point is I had some Kafka experience.  And if I’m being honest, I had misconceptions about what Kafka is and is not at times.  If I’m being entirely honest, I still do sometimes.  You see, I’m honest.

Here’s a link to my certification

Kafka Certification

With my previously described experience with Kafka in mind, I took the following steps to pass the certification test on my first try.  Consider these my personal Kafka Certification tips and FAQ.

What is Kafka Certification?

Kafka Certification is a program through Confluent and at the time of this writing, there are two available certifications.  The Confluent Certified Developer for Apache Kafka (CCDAK) and the Confluent Certified Operator for Apache Kafka (CCOAK).  To become certified, you must take a remotely proctored exam from your computer.  Does anyone else think “remotely proctored” sounds a bit…. well, odd.

Are there multiple types of Kafka Certification?

Yes. This post, certification suggestions, and descriptions focus on the Kafka Developer certification.

What should you study to become Apache Kafka Certified?

Well, first off, you should be prepared for more than just Kafka, the distributed commit log, Consumers and Producers.  This is what most people think of when they hear Kafka.  The exam will also test your knowledge in areas such as Kafka Connect, Kafka Streams, KSQL and the Schema Registry.  I cover this in more detail in the next questions.

What is the percentage breakout of questions on Kafka vs. Kafka Streams, etc?

According to the developer’s certification guide from Confluent (at the time of this writing), we are given the following guidance on the breakout.

  • Application Design (40%): command-line tools, configuration, metrics, architecture, and design
  • Development (30%): focused on the Java Consumer and Producer APIs as well as the REST API
  • Deployment/Testing/Monitoring (30%): tuning for latency/throughput, Kafka Streams, KSQL, and troubleshooting

This percentage breakout is taken directly from the Confluent Certification Guide.  There is a link to this Guide in the Resources section below.

Are there any Kafka Certification practice tests?

Yes.  I purchased a Kafka Certification practice exam course on Udemy and it helped.  It includes 3 sets of practice tests.

How did you prepare for the exam?

I started by reading documentation and performing some hands-on examples from both the Apache Kafka and Confluent documentation.  I read these Kafka docs pretty quickly at first because I wanted to obtain the big picture view of my skills.

Next, I took the first set of practice questions from the practice tests course I purchased. The results of this first practice test showed where I was strong and where I was weak.

Next, I went back to both the Apache Kafka and Confluent documentation and read more carefully in the areas where I needed improvement.

Afterward, I took the 2nd practice test with higher expectations this time.  Again, the results provided an updated assessment of my current Kafka skills and showed me my weaknesses.  This time, I studied where I was weak again.  But, I also spent 10-20% of the time in areas where I already felt confident.

By now, I imagine you can guess what I did next.  I took the third series in the practice exam set.

At this point, I was feeling pretty confident, so I scheduled my test and began studying the documentation again for 30min to 1 hour a day for each day before the exam.

Please describe the “proctor” experience?

Well, let me tell you they are not kidding about wanting to view the area where you take the test.  I had to spin my webcam 360 degrees.  The proctor asked questions about my dual monitor setup and was quite concerned.  I had to disconnect my second monitor and external camera.

Any Kafka Developer Certification sample questions you can share?

Well, the Confluent Certification guide contains a few sample questions, but from my personal experience, I’d buy the recommended practice exam questions I mentioned above and below.

Anything you didn’t do to prepare, but considered doing?

I kept it simple.  I admit I was tempted to buy courses from Udemy or Pluralsight, but instead I just purchased the practice exams and studied the Apache Kafka and Confluent documentation.  The documentation is good and there are plenty of examples on the Internet.  According to Steven Pressfield, I just needed to "do the work".

Final Thoughts or any other helpful resources


Featured image https://pixabay.com/users/insspirito-1851261/

Kafka Test Data Generation Examples

Kafka Test Data Generation

After you start working with Kafka, you will soon find yourself asking the question, "how can I generate test data into my Kafka cluster?"  Well, I'm here to show you that you have many options for generating test data in Kafka.  In this post and demonstration video, we'll cover a few of the ways you can generate test data into your Kafka cluster.

Now, before we begin, let’s cover a possible edge case.  If you are wondering about test data in Kafka Streams applications, you might find my previous post on testing Kafka Streams helpful. Well, also, I might find it helpful if you read it and comment on it too.

With that out of the way, let’s go through a few of your options.  I’ll cover ways to generate test data in Kafka from both Apache Kafka and Confluent Platform.

Kafka Test Data Screencast (AKA: Big Time TV Show)

Check out the screencast below to see a demo of examples using kafkacat, the Kafka Connect Datagen and Voluble connectors, and finally, ksql-datagen.

Part 1 with Kafkacat

Our first example utilizes kafkacat, which is freely available at https://github.com/edenhill/kafkacat/

Here are the steps (more or less) in the above screencast

  1. Start Zookeeper and Kafka on localhost
  2. kafkacat is installed and in my path
  3. cat /var/log/system.log | kafkacat -b localhost:9092 -t syslog
  4. kafkacat -b localhost:9092 -t syslog -J
  5. curl -s "http://api.openweathermap.org/data/2.5/weather?q=Minneapolis,USA&APPID=my-key-get-your-own" |\
    kafkacat -b localhost:9092 -t minneapolis_weather -P
  6. kafkacat -b localhost:9092 -t minneapolis_weather
  7. Show other fun, good time resources such as Mockaroo and JSON-server

Test Data with Apache Kafka Connect Options

There are a couple of available Kafka Connect source connectors to assist in generating test data into Kafka.   There is the Kafka Connect Datagen connector which has been around for a while.  The Datagen connector includes two quickstart schemas to ahh, well, you know, get you started quickly.  See the Reference section below for the link.

In the screencast, I showed how both connectors are already installed.

Next, run some commands such as

  1. confluent local config datagen-pageviews -- -d ./share/confluent-hub-components/confluentinc-kafka-connect-datagen/etc/connector_pageviews.config (your path might be different)
  2. kafkacat -b localhost:9092 -t pageviews

Next, we switched to another option for Kafka Connect based mock (or stub) data generation: a connector called Voluble.  I like how it integrates the Java Faker project, which provides support for creating cross-topic relationships, as seen in the following examples

'genkp.users.with' = '#{Name.full_name}'
'genvp.users.with' = '#{Name.blood_group}'

'genkp.publications.matching' = 'users.key'
'genv.publications.title.with' = '#{Book.title}'

See how users.key is referenced in the above example.  Anyhow, much more documentation is available from the GitHub repo in the link below.

Steps with Voluble

  1. Listed topics with kafka-topics --list --bootstrap-server localhost:9092
  2. Then, I loaded using a sample properties file found in my Github repo.  See the Resources below.
  3. confluent local load voluble-source -- -d voluble-source.properties (bonus points and a chance to join me on a future Big Time TV Show if you post how to load it in vanilla Kafka in the comments below.)
  4. kafka-topics --list --bootstrap-server localhost:9092
  5. kafkacat -b localhost:9092 -t owners

Kafka Test Data in Confluent Platform

If you are a user of the Confluent Platform, you have an easy button available from the CLI with the ksql-datagen tool.  It has a couple of quickstart schemas to get you rolling quickly, as shown in the following screencast

Quickly, let’s run through the following commands

  1. ksql-datagen quickstart=orders format=avro topic=orders maxInterval=100
  2. confluent local consume orders -- --value-format avro --from-beginning
  3. kafkacat -b localhost:9092 -t orders -s avro -r http://localhost:8081

Resources and Helpful References


Featured image credit https://pixabay.com/photos/still-life-bottles-color-838387/

Kafka Producer

Kafka Producer Example

Kafka Producers are one of the options to publish data events (messages) to Kafka topics.  Kafka Producers are custom coded in a variety of languages through the use of Kafka client libraries.  The Kafka Producer API allows messages to be sent to Kafka topics asynchronously, so they are built for speed, but also Kafka Producers have the ability to process receipt acknowledgments from the Kafka cluster, so they can be as safe as you desire as well.

For a refresher on this, please see What is Kafka?

Kafka Producer Objective

In this Kafka Producer tutorial, let’s examine a Kafka Producer example and highlight some of the key features and customization options.  It assumes the reader is already familiar with Kafka architectural components such as Producers, Consumers, and Topics.  For a list of other Kafka resources, see Kafka Tutorials page.

On the surface, the idea behind a Kafka Producer is simple.  Send messages to topics.  In the Kafka Producer example in this tutorial, we're going with an easy example of sending to a topic with a single partition.  However, in larger environments, the dynamics of optimized Kafka Producer performance change.  For example, in production deployments, we will want to experiment and test different batch sizes, compression, and possibly even custom partitioners.  Factors which will affect these tests and experiments include the configured replication factor of the topic(s) in question.  We'll touch upon some of these options and design decisions throughout this tutorial.

Kafka Producer Example Screencast

Before we begin analysis of the Kafka Producer example client source code, let’s show how to run the example in the following screencast

Outside of running and debugging in IntelliJ, we also use the `kafka-console-consumer` command-line tool to inspect messages being sent to the topic in a few different ways including

`./bin/kafka-console-consumer --bootstrap-server localhost:9092 --topic example-topic`

and

`./bin/kafka-console-consumer --bootstrap-server localhost:9092 --topic example-topic --property print.key=true --property key.separator=:: --from-beginning`

Kafka Producer Source Code

Developing Kafka Producers is similar to developing Kafka Consumers in that a Kafka client library is made available to your source code project.  As we saw in the Kafka Consumer tutorial, if you are using a build tool like SBT or Maven, it's just a matter of adding the library as a dependency, for example

val kafka = "0.10.2.1"
libraryDependencies += "org.apache.kafka" % "kafka-clients" % kafka
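Before digging into the individual settings, here is a minimal, end-to-end sketch of a Kafka Producer in Scala.  It is not the tutorial's exact source code, but it mirrors the pieces discussed below: String keys and values sent to the `example-topic` used in the screencast.

import java.util.Properties

import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig, ProducerRecord}

object SimpleProducerExample extends App {

  val props = new Properties()
  props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092")
  props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
    "org.apache.kafka.common.serialization.StringSerializer")
  props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
    "org.apache.kafka.common.serialization.StringSerializer")

  val producer = new KafkaProducer[String, String](props)
  val topic = "example-topic" // the topic used in the screencast

  // fire-and-forget sends; see the `send` section below for the callback variant
  for (k <- 1 to 10) {
    producer.send(new ProducerRecord[String, String](topic, s"key ${k}", "oh the value!"))
  }

  producer.close() // flushes any buffered records before exiting
}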

Let's start by looking at some of the optional Kafka Producer configuration settings that are commented out by default.

p.put(ProducerConfig.ACKS_CONFIG, "all")             // wait for all in-sync replicas to acknowledge
p.put(ProducerConfig.RETRIES_CONFIG, 0)              // how many times to retry a failed send
p.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384)       // max bytes per partition batch
p.put(ProducerConfig.LINGER_MS_CONFIG, 1)            // how long to wait for a batch to fill (ms)
p.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 33554432) // total memory for buffering unsent records

As we saw in the above screencast, these variables are instances of `static final String` and are provided for conveniently setting the underlying configuration key.  For example, the `ACKS_CONFIG` constant in the underlying Java code is:

public static final String ACKS_CONFIG = "acks";

As a Kafka developer, administrator, or architect, you have options on how to design the desired performance characteristics of your Kafka Producer.  These Kafka Producer configuration options are your first stop for tuning for both speed and safety.  (Note: we’re not going to cover all the configuration options; just the ones in this example.)

Kafka Producer `acks` configuration

`acks` specifies the number of replica acknowledgments required in order to consider a message committed.  Recall each topic partition in a Kafka cluster has a designated leader and possibly a set of replicas among the brokers.  All writes to a particular partition must go to the topic partition leader.  Then, replicas fetch from this leader in order to keep in sync.  With the `acks` setting, the partition leader can wait until the configured number of replicas have synced before sending a committed response back to the Producer.

The Kafka Producer `acks` configuration gives the producer developer control over message durability safety at the cost of throughput.

The strongest durability guarantee is setting `acks` to `all` as shown in the configuration code above.  With this setting, the write is only considered committed once the partition leader has accepted the request and it has been successfully replicated to all of the in-sync replicas.  Other options for `acks` include 0 and 1.  The default setting is 1.

Kafka Producer Batch Size Configuration

Kafka Producers may attempt to collect messages into batches before sending to leaders in an attempt to improve throughput. Use batch.size (or `BATCH_SIZE_CONFIG` as seen in this example; remember it's a convenience mapping) to control the max size in bytes of each message batch.  You can use `linger.ms` to give batches more time to fill.

You may be wondering, how do batch transmission and topic partitioning co-exist?

Producers maintain buffers (batches) of unsent records for each partition according to the `batch.size` setting. Increasing the value will result in more batching at the cost of requiring more memory.

Kafka Producer Compression

As described in the previous section, if the Kafka Producer is transmitting a batch of messages, is there an option to compress the batch payload?  Yes, with the `compression.type` setting.  The default is `none`, but it may be set to `gzip`, `snappy`, or `lz4`.
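To make this concrete, here is a sketch of throughput-oriented additions to the `p` Properties object from the earlier snippet.  The specific values are illustrative starting points for experimentation, not recommendations.

// illustrative throughput-oriented settings; tune against your own workload
p.put(ProducerConfig.BATCH_SIZE_CONFIG, 32768)          // allow larger per-partition batches (bytes)
p.put(ProducerConfig.LINGER_MS_CONFIG, 20)              // wait up to 20 ms for a batch to fill
p.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, "snappy") // compress each batch before sending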

Kafka Producer Asynchronous `send`

A few noteworthy items about the two versions of the `send` function shown here

producer.send(new ProducerRecord(topics, s"key ${k}", "oh the value!"))

// with example callback
// producer.send(new ProducerRecord(topics, s"key ${k}", "oh the value!"), callback)

First, the obvious item shown in the screencast: the `send` function can accept a `Callback` argument which will be invoked on message receipt acknowledgment.  The callback implementation receives two arguments in its overridden `onCompletion` method.  `metadata` includes the partition and offset of the sent message.  `exception` covers approximately 10 variations of exceptions which may be returned.
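For reference, a minimal `Callback` implementation might look like the following sketch; the log messages are illustrative and not taken from the tutorial's source code.

import org.apache.kafka.clients.producer.{Callback, RecordMetadata}

// a minimal Callback; exactly one of metadata/exception is meaningful per completion
val callback = new Callback {
  override def onCompletion(metadata: RecordMetadata, exception: Exception): Unit = {
    if (exception != null) {
      println(s"send failed: ${exception.getMessage}")
    } else {
      println(s"acknowledged at partition ${metadata.partition()} offset ${metadata.offset()}")
    }
  }
}

// producer.send(new ProducerRecord(topics, s"key ${k}", "oh the value!"), callback)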

Second, you may remember the internal mechanics of sending messages include leader determination of topic partitions.  Do you remember what this means?  When a Kafka Producer wants to send a message, it needs to determine the leader (a particular node) of the topic partition.  Any Kafka node in the cluster may answer the Producer’s question on which nodes are alive and who the leaders are for particular partitions in a given topic.  Answering these questions allows the Producer to route the message appropriately.

Noteworthy

Starting with Kafka 0.11, the Kafka Producer supports two additional modes beyond `send`: transactional and idempotent producers.  Let me know if we should cover these two additional capabilities in the future.

Kafka Producer Conclusion

In this tutorial, we created and executed a Kafka Producer in Scala.  Did it help?  Or would you prefer a Java example?  Let me know and let me know if you have any questions or suggestions for improvement.  Enjoy yourselves, people.  Try to be good to one another.

Kafka Producer References

Kafka Consumer

Kafka Consumer

In this Kafka Consumer tutorial, we’re going to demonstrate how to develop and run an example of Kafka Consumer, so you can gain the confidence to develop and deploy your own Kafka Consumer applications.  At the end of this Kafka Consumer tutorial, you’ll have both the source code and screencast of how to run and customize the Kafka Consumer example.

Kafka Consumer Background

As a quick background, recall Kafka Consumers are applications which read messages from Kafka topic partitions.  As previously described in Apache Kafka Overview, Kafka Consumers read messages from Kafka topics in sequential order through use of offsets.  Ordering is a key feature of Apache Kafka and it is what differentiates it from more traditional pub/sub messaging systems.

Kafka messages do not need to be in a particular format such as JSON, Avro, or plain text.  There is no required, formal agreement of data format between Kafka Consumers and Kafka Producers; instead, there is usually an informal agreement on what Producers and Consumers can expect in data formats for particular topics.  For those looking to implement more rigidity in data formats and schema validation, there is the option to implement the Confluent Schema Registry with Avro to ensure a data format contract, but we'll cover that at a later time.

Three key settings for Kafka Consumers related to how they process messages according to offsets are `auto.offset.reset`, `enable.auto.commit` and `auto.commit.interval.ms`.   Let’s explore this a bit now.

Kafka Consumers commit their current offset location back to Kafka every 5 seconds by default, as long as `enable.auto.commit` is set to true (again, the default).  You can change the frequency at which the commits happen by changing the `auto.commit.interval.ms` value.  We'll show examples and explore this further later in the tutorial.

Kafka Consumer Architecture

When considering the mechanics or flow of data from topics to Consumers, you may wonder if it is a pull or push action?  In other words, do Kafka Consumers pull messages from Kafka Topics or are messages pushed to Kafka Consumers?  The answer is "pull" based mechanics.  Because one of the design goals is allowing consumers to read messages at the best rate for themselves, Kafka Consumers are in control of pulling messages.  A push model would take that control away from the Kafka Consumer.

On the subject of Kafka Consumer mechanics, you should be aware of the differences between older and newer Kafka Consumer clients.  Older Kafka clients depended on ZooKeeper for Kafka Consumer group management, while new clients use a group protocol built into Kafka itself.  Using this group protocol, one of the brokers is designated as the Consumer group’s coordinator and is responsible for managing both the members of the group as well as their partition assignments.

As noted in the previous paragraph, Kafka Consumers may be deployed and configured as Kafka Consumer Groups which can be utilized to increase throughput.  Although you will see reference to Consumer Group ID settings in the source code below, we will not be covering the concept of grouping Kafka Consumers in any depth within this tutorial.

Kafka Consumer Example

In this Kafka Consumer tutorial, we’re going to demonstrate how to develop and run a Kafka Consumer.  We’ll use Scala in this example, but the concepts hold true regardless of which language you choose to use.   See the link for Kafka Clients in the Reference section below for alternative language options.

Some of the commands used to send test data to the `example-topic` in the screencast included:

`cat words.txt | ./bin/kafka-console-producer --broker-list localhost:9092 --topic example-topic`

`kafka-console-producer --broker-list localhost:9092 --topic example-topic --property parse.key=true --property key.separator=,`

And then we sent keys and values separated by a comma and ended with Ctrl-D

Next, we explored reading a topic from the beginning rather than the latest, so we changed some default properties and the consumer group id.

Kafka Consumer Example Source Code

Let’s examine the Kafka Consumer relevant source code.  To start, notice how we are setting various required properties as shown here

p.put(ConsumerConfig.GROUP_ID_CONFIG, groupId)
p.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers)
p.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer")
p.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer")

// if we want to adjust defaults
// p.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest") // default is latest
// p.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false) // default true - turn off automatic offset commits
// p.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, 10000) // default 5000 - change how often to commit offsets

We discussed and tried experimenting with most of these properties during the screencast.  If you have any questions, check out the screencast first.

Later, we spawn our Kafka Consumer, set the topic to subscribe, and start polling every second

val consumer = new KafkaConsumer[String, String](props)

consumer.subscribe(Collections.singletonList(this.topics))

while (true) {
  val records = consumer.poll(1000).asScala

  for (record <- records) {
    println("Message: (key: " +
      record.key() + ", with value: " + record.value() +
      ") on partition " + record.partition() + " at offset " + record.offset())
  }
}
We're not doing anything interesting with the messages other than displaying keys, values and metadata.  What do you notice about the partition and offset values?  Do you see examples of strict ordering in the partition and offset values?

Finally, we should quickly call out the commented out code towards the end of the `main` function

// if you don't want to auto commit offset which is default
// comment out below and adjust properties above
/*
try
  consumer.commitSync
catch {
  case e: CommitFailedException =>
   // possible rollback of processed records
   // which may have failed to be delivered to ultimate destination
}
*/

By default, the Kafka Consumer will commit its current offset back to Kafka every 5 seconds automatically.  But what if processing fails between the time messages were received and the time the offset is committed back?  Is there a chance to lose data?  Is there a chance to duplicate data?  This is covered in much more detail in the Kafka delivery guarantees post, so I recommend you check that out if you are interested in learning more.

For now, this commented code and properties such as `ENABLE_AUTO_COMMIT_CONFIG` shown above indicate the options you have to tune delivery vs. processing guarantees.
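To tie the pieces together, here is a minimal at-least-once style sketch, assuming String keys and values on the same `example-topic` used earlier.  Auto commit is disabled and offsets are committed only after the records from each poll have been processed; the group id is made up for illustration.

import java.util.{Collections, Properties}

import scala.collection.JavaConverters._

import org.apache.kafka.clients.consumer.{CommitFailedException, ConsumerConfig, KafkaConsumer}

object ManualCommitConsumer extends App {

  val props = new Properties()
  props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092")
  props.put(ConsumerConfig.GROUP_ID_CONFIG, "manual-commit-example") // hypothetical group id
  props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
    "org.apache.kafka.common.serialization.StringDeserializer")
  props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
    "org.apache.kafka.common.serialization.StringDeserializer")
  props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false") // we commit manually below

  val consumer = new KafkaConsumer[String, String](props)
  consumer.subscribe(Collections.singletonList("example-topic"))

  while (true) {
    val records = consumer.poll(1000).asScala

    for (record <- records) {
      // "processing" stands in for whatever your application actually does with each record
      println(s"processing key ${record.key()} with value ${record.value()}")
    }

    try {
      consumer.commitSync() // only commit once the polled records have been processed
    } catch {
      case e: CommitFailedException =>
        println(s"commit failed, records may be re-delivered: ${e.getMessage}")
    }
  }
}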

Kafka Consumer Conclusion

I hope you enjoyed and learned from this Kafka Consumer tutorial.  Do let me know if you have any questions or suggestions for improvement.  Thanks!  Next up, you may wish to try the Kafka Producer tutorial and example or move back to the list of Kafka Tutorials.

Kafka Consumer References

Kafka Consumer featured image https://pixabay.com/en/alcohol-drink-alkolismus-bottles-64164/