Kafka Streams – Transformations Examples

Kafka Streams Transformations

Kafka Streams Transformations provide the ability to perform actions on Kafka Streams such as filtering and updating values in the stream.  Kafka Streams transformations contain operations such as `filter`, `map`, `flatMap`, etc. and have similarities to functional combinators found in languages such as Scala.  And, if you are coming from Spark, you will also notice similarities to Spark Transformations.  But, even if you don’t have experience with combinators or Spark, we’ll cover enough examples of Kafka Streams Transformations in this post for you to feel comfortable and gain confidence through hands-on experience.  We’re going to cover examples in Scala, but I think the code would be readable and comprehensible for those of you with a Java preference as well.

Kafka Streams Transformations are available from `KTable` or `KStream` and will result in one or more `KTable`, `KStream` or `KGroupedTable`, depending on the transformation function.  We’ll cover examples of various inputs and outputs below.

Before we go into the source code examples, let’s cover a little background and also a screencast of running through the examples.

Kafka Streams Transformation Types

Kafka Streams Transformations are available in two types: Stateless and Stateful.

Stateless transformations do not require state for processing.  For example, let’s imagine you wish to filter a stream for all keys starting with a particular string in a stream processor.  In this case, Kafka Streams doesn’t require knowing the previous events in the stream.  It simply performs each filtering operation on the message and moves on.  Conversely, let’s say you wish to sum certain values in the stream.  In this case, you would need “state” to know what has been processed already in previous messages in the stream in order to keep a running tally of the sum result.

As previously mentioned, stateful transformations depend on maintaining the state of the processing.  To maintain the current state of processing the inputs and outputs, Kafka Streams introduces a construct called a State Store.  Aggregations, such as the previous sum example, and joins of Kafka streams are examples of stateful transformations.
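
To make the distinction concrete, here is a minimal, hypothetical sketch of a stateful running sum written in the same Scala DSL style as the examples below.  The `inputStream` and store names, the assumption that the values are numeric strings, and the presence of the required implicit Serdes are all assumptions for illustration; this is not code from the examples repo.

// hypothetical stateful sketch: keep a running sum per key, backed by a
// State Store named "running-sum-store" (names assumed for illustration)
val runningSums: KTable[String, Long] = inputStream
  .mapValues(v => v.toLong)      // parse each value into a number
  .groupByKey                    // stateful operations require a grouped stream
  .reduce(_ + _)(Materialized.as("running-sum-store"))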

Kafka Streams Transformations Source Code

All the source code is available from my Kafka Streams Examples repo on Github.

Kafka Streams Transformations Screencast

Before we begin going through the Kafka Streams Transformation examples, I’d recommend viewing the following short screencast where I demonstrate how to run the Scala source code examples in IntelliJ.

Kafka Streams Transformations Examples Scala Source Code

The following Kafka Streams transformation examples are primarily examples of stateless transformations.  Let me know if you want some stateful examples in a later post.  I do plan to cover aggregating and windowing in a future post.  Also, related to stateful Kafka Streams joins, you may wish to check out the previous Kafka Streams joins post.

It is recommended to watch the short screencast above, before diving into the examples.

Kafka Streams Transformation Examples


branch
filter
flatMap
map
groupBy

`branch`

The `branch` function is used to split a KStream by the supplied predicates into one or more KStream results.  In this Kafka Streams Transformations tutorial, the `branch` example had three predicates: two filters for key name and one default predicate for everything else.

This is the example implementation

val results: Array[KStream[String, String]] = inputStream.branch(
  (key, value) => key.startsWith(keyFilter1),
  (key, value) => key.startsWith(keyFilter2),
  (key, value) => true
)

and we tested the expected results for filters on “sensor-1” and “sensor-2” and a default.

storeOne.get("sensor-1") shouldBe "MN"
storeOne.get("sensor-11") shouldBe "IL"
storeTwo.get("sensor-2") shouldBe "WI"

`filter`

The `filter` function can filter either a KTable or a KStream to produce a new KTable or KStream, respectively.

For our example, we used a KStream

inputStream.filter(
  (key, value) => value == valFilter
).to(s"${valFilter}-topic")

In this example, we filter records based on their values in the KStream, using the `valFilter` passed into the example.

storeOne.get("sensor-1") shouldBe valFilter
storeOne.get("sensor-2") shouldBe null
storeOne.get("sensor-11") shouldBe null

`valFilter` is set to “MN” in the Spec class.
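
Since `filter` also works on a `KTable`, here is a hedged sketch of the table variant.  The `builder` reference, the `inputTable` name, the topic name, and the implicit Serdes in scope are assumptions for illustration rather than code from the examples repo.

// hypothetical KTable variant: keep only the rows whose value matches the filter
val inputTable: KTable[String, String] = builder.table("sensor-states-topic")
val filteredTable: KTable[String, String] = inputTable.filter(
  (key, value) => value == valFilter
)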

`flatMap`

`flatMap` performs as expected if you have used it before in Spark or Scala.  Use it to produce zero, one or more records from each input record processed.
From the Kafka Streams documentation, it’s important to note

Marks the stream for data re-partitioning:

 Applying a grouping or a join after flatMap will result in re-partitioning of the records. If possible use flatMapValues instead, which will not cause data re-partitioning.

In the example `flatMap` implementation

inputStream.flatMap {
  (key, value) => {
    expanderList.flatMap { s =>
      List((s"${s}-${value}", value))
    }
  }
}

we are using both `flatMap` from Kafka Streams as well as `flatMap` from Scala.  The intention is to show creating multiple new records for each input record.
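
As the documentation quote above suggests, `flatMapValues` is preferable when the keys do not need to change, because it does not mark the stream for re-partitioning.  A hedged variation of the same example, producing multiple values per input record while leaving keys untouched, might look like this.

// hypothetical flatMapValues variant: keys are unchanged,
// so the stream is not marked for re-partitioning
inputStream.flatMapValues { value =>
  expanderList.map(s => s"${s}-${value}")
}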

`map`

Where `flatMap` may produce multiple records from a single input record, `map` is used to produce a single output record from an input record.  I like to think of it as one-to-one vs the potential for `flatMap` to be one-to-many.

In the implementation example

val outputStream = inputStream.map {
  (key, value) => (key, s"${value}-new")
}.to(resultTopic)

Here we simply create a new key, value pair with the same key, but an updated value.

In the tests, we test for the new values from the result stream.

storeOne.get("sensor-1") shouldBe "MN-new"
storeOne.get("sensor-2") shouldBe "WI-new"
storeOne.get("sensor-11") shouldBe "IL-new"

`groupBy`

With `groupBy`, we deviate from stateless to stateful transformations in order to test the expected results.  In the implementation shown here

inputStream.groupBy {
  (key, value) => value
}.count()(Materialized.as(s"${storeName}"))

we are going to group by the values.  Notice in the test class that we are now passing two records with the value of “MN”.  This allows us to test the expected `count` results

storeOne.get("MN") shouldBe 2
storeOne.get("WI") shouldBe 1

`count` is a stateful operation which was only used to help test in this case.

Conclusion

Hope these examples helped. Do let me know if you have any questions, comments or ideas for improvement.

References

Kafka Streams Transformation Examples featured image: https://pixabay.com/en/dandelion-colorful-people-of-color-2817950/

Stream Processor Windows


When moving to stream processing architecture or building stream processors, you will soon face two choices.  Will you process streams on an individual, per event basis?  Or, will you collect and buffer multiple events/messages first, and then apply a function or join results to this collection of events?

Examples of single event processing might be reporting a current GPS location or temperature reading, removing PII, or enriching a record with address information.

Conversely, processing multiple events involves computing results across a collection of events.  Examples include analyzing weblog traffic for pages clicked during a session in order to make a recommendation, aggregating metrics from mobile or IoT devices such as temperature readings over the last 5 minutes, detecting fraud based on events over a set time period, and analyzing what is added to and removed from a shopping cart while visiting a particular website.  This is the opposite of single event processing, where any associated context of the event is irrelevant.

But, back to the idea of processing events as a group of events rather than individually.  This implies stream processor implementations provide the capability of 1) gathering multiple events and 2) performing some kind of computation function on the collection of events.  Different implementations provide variances around these two fundamental constructs.  This high-level capability when designing stream processors is called window-based operations or more succinctly, windowing.

Stream Processor Windows Overview

Window operations define boundaries to create finite sets of events and then perform functions against the set of events.  Sometimes these sets are called segments or collections or buckets.  The assignment of events into buckets is based on time or properties inherent in the event data.  Critical concept: There can be two notions of “time”: event time and processing time.

Event time is when the event occurred, while processing time is when the event was processed.  Do not overlook the differences between event time and processing time.  Ideally, these two values are the same, but in reality they drift apart with varying degrees of skew over time.  For a great explanation of these differences and how skew occurs, check out the Streaming 101 post.

Windowing needs set boundaries for bucketing and for how often the window produces a result.  Result functions may be aggregations, such as the total time spent in a particular location, or other common functions such as max, min, mean, median, standard deviation, etc.

Types of Stream Processor Windows

Four of the most commonly used windowing methods implemented in streaming engines are tumbling, hopping, sliding and session windows.

Tumbling Windows

Tumbling windows segment events into fixed sizes and then perform a function against the events in the collection.  Tumbling window segmentation may be based on a particular count of elements or a set period of time.   A key differentiator of Tumbling windows is that there is no overlap between windows.  In other words, unlike other types of windows, an event can only be part of one particular window.  This differs from Hopping windows, which is the next type of window described.
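
As a concrete point of reference, here is a hedged Kafka Streams sketch of a tumbling window in Scala.  The `groupedStream` name, the five-minute size, the implicit Serdes, and the use of `windowedBy` (Kafka Streams 1.0+) are assumptions for illustration only.

import java.util.concurrent.TimeUnit
import org.apache.kafka.streams.kstream.TimeWindows

// hypothetical tumbling window: fixed, non-overlapping 5-minute buckets,
// counting the events that fall into each bucket
groupedStream
  .windowedBy(TimeWindows.of(TimeUnit.MINUTES.toMillis(5)))
  .count()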

Hopping Windows

Hopping windows are based on fixed time period intervals.  Hopping window results may overlap with other windows, so events may belong to more than one window processing result.  Hopping windows are defined by a window time size (e.g. 5 minutes) and the advance interval or “hop” (e.g. 1 minute).   A Hopping window may be configured to behave the same as a Tumbling window by setting the hop size to the same as the window time size.  This will result in no overlaps and thus, all events will be part of only one bucket.
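
A hedged sketch of the hopping variant follows.  It differs from the tumbling sketch above only in the `advanceBy` hop; the stream name and sizes are again assumptions for illustration.

// hypothetical hopping window: 5-minute windows that advance ("hop") every minute,
// so a single event may be counted in up to five overlapping windows
groupedStream
  .windowedBy(
    TimeWindows
      .of(TimeUnit.MINUTES.toMillis(5))
      .advanceBy(TimeUnit.MINUTES.toMillis(1)))
  .count()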

Sliding Windows

From my research, sliding vs tumbling windows is a bit of a debate.

Sliding windows produce an output only when an event occurs, which is different from Tumbling or Hopping windows.  A good graphic comparing and contrasting Sliding vs Tumbling windows can be seen here.

In Kafka Streams, sliding windows are used only for `join` operations.  There doesn’t appear to be this distinction in Spark Streaming.

Similarities to the previously described Tumbling and Hopping windows include the notion that an event might belong to multiple windows and that windows are defined by size and hop size.
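
To illustrate the Kafka Streams case, here is a hedged sketch of a sliding window expressed through `JoinWindows` in a stream-stream join.  The two stream names, the value-combining function, the five-minute difference, and the implicit Serdes are assumptions for illustration.

import org.apache.kafka.streams.kstream.JoinWindows

// hypothetical stream-stream join: records from the two streams are joined
// when their timestamps are within 5 minutes of each other (the sliding window)
val joinedStream = leftStream.join(rightStream)(
  (leftValue, rightValue) => s"${leftValue}/${rightValue}",
  JoinWindows.of(TimeUnit.MINUTES.toMillis(5))
)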

Session Windows

Grouping of events originating from the same period of user activity or session is commonly used for behavior analysis.  User activities or “sessions” can be collected using a session window.  A session window starts when the first event occurs and remains open until a timeout value is reached. If another event occurs within the specified timeout from the previously ingested event, the session window extends to ingest the new event. If no events occur within the timeout, the session window is closed.

A common example of a session is a user browsing a website.  A user may enter a search term, compare different product pages, add one or more items to their shopping cart and eventually checkout or abandon their session without purchase.
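
In Kafka Streams terms, a hedged sketch of a session window for this kind of behavior analysis might look like the following; the `clickStream` name, the 30-minute inactivity gap, and the implicit Serdes are assumptions for illustration.

import org.apache.kafka.streams.kstream.SessionWindows

// hypothetical session window: a session stays open as long as events for a key
// keep arriving within a 30-minute inactivity gap; afterwards the session closes
clickStream
  .groupByKey
  .windowedBy(SessionWindows.`with`(TimeUnit.MINUTES.toMillis(30)))
  .count()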

Windowing Conclusion and Resources

You will face a variety of options when considering streaming applications and stream processing architectures.   If you need to process and produce results from a bucket of events rather than processing one event at a time, you will need stream processor windows.  Depending on your streaming implementation, you will have variances in what window operations are available to utilize.   The following links may help in learning more about stream processing windows from different perspectives.

Stream Processor Window Examples

Kafka Stream Joins has examples of setting windows.

Kafka Streams Windowing

Spark Streaming Windowing

Flink Windowing

Beam Windowing

Kafka Producer

Kafka Producer Example

Kafka Producers are one of the options to publish data events (messages) to Kafka topics.  Kafka Producers are custom coded in a variety of languages through the use of Kafka client libraries.  The Kafka Producer API allows messages to be sent to Kafka topics asynchronously, so they are built for speed, but also Kafka Producers have the ability to process receipt acknowledgments from the Kafka cluster, so they can be as safe as you desire as well.

For a refresher on this, please see What is Kafka?

Kafka Producer Objective

In this Kafka Producer tutorial, let’s examine a Kafka Producer example and highlight some of the key features and customization options.  It assumes the reader is already familiar with Kafka architectural components such as Producers, Consumers, and Topics.  For a list of other Kafka resources, see the Kafka Tutorials page.

On the surface, the idea behind a Kafka Producer is simple.  Send messages to topics.  In the Kafka Producer example in this tutorial, we’re going with an easy example of sending to a topic with a single partition.  However, in larger environments, the dynamics of optimized Kafka Producer performance change.  For example, in production deployments, we will want to experiment and test different batch sizes, compression and possibly even custom partitioners.  Factors which will affect these tests and experiments include the configured replication factor of the topic(s) in question.   We’ll touch upon some of these options and design decisions throughout this tutorial.

Kafka Producer Example Screencast

Before we begin analysis of the Kafka Producer example client source code, let’s show how to run the example in the following screencast.

Outside of running and debugging in IntelliJ, we also use the `kafka-console-consumer` command-line tool to inspect messages being sent to the topic in a few different ways including

`./bin/kafka-console-consumer --bootstrap-server localhost:9092 --topic example-topic`

and

`./bin/kafka-console-consumer --bootstrap-server localhost:9092 --topic example-topic --property print.key=true --property key.separator=:: --from-beginning`

Kafka Producer Source Code

Developing Kafka Producers is similar to developing Kafka Consumers in that a Kafka client library is made available to your source code project.   As we saw in the Kafka Consumer tutorial, if you are using a build tool like SBT or Maven, it’s just a matter of adding the library as a dependency, for example

val kafka = "0.10.2.1"
libraryDependencies += "org.apache.kafka" % "kafka-clients" % kafka

Let’s start by looking at some of the optional Kafka Producer configuration options that are commented out by default.

p.put(ProducerConfig.ACKS_CONFIG, "all")
p.put(ProducerConfig.RETRIES_CONFIG, 0)
p.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384)
p.put(ProducerConfig.LINGER_MS_CONFIG, 1)
p.put(ProducerConfig.RETRIES_CONFIG, "TODO")
p.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 33554432)

As we saw in the above screencast, these variables are instances of `static final String` and are provided for conveniently setting the underlying configuration key.  For example, the `ACKS_CONFIG` constant in the underlying Java code is defined as:

public static final String ACKS_CONFIG = "acks";

As a Kafka developer, administrator, or architect, you have options on how to design the desired performance characteristics of your Kafka Producer.  These Kafka Producer configuration options are your first stop for tuning for both speed and safety.  (Note: we’re not going to cover all the configuration options; just the ones in this example.)

Kafka Producer `acks` configuration

`acks` specifies the number of replicas which need to acknowledge a write in order to consider the message as committed.  Recall each topic partition in a Kafka cluster has a designated leader and possibly a set of replicas among the brokers.  All writes to a particular partition must go to the topic partition leader.  Then, replicas fetch from this leader in order to keep in sync.  With the `acks` setting, the partition leader can wait until the configured number of replicas have synced before sending a committed response back to the Producer.

The Kafka Producer `acks` configuration gives the producer developer control over message durability safety at the cost of throughput.

The strongest durability guarantee is setting `acks` to `all`, as shown in the commented-out code.  This setting guarantees not only that the partition leader accepted the write request, but also that it was successfully replicated to all of the in-sync replicas.  Other options for `acks` include 0 and 1.  The default setting is 1.

Kafka Producer Batch Size Configuration

Kafka Producers may attempt to collect messages into batches before sending them to partition leaders in an attempt to improve throughput.  Use `batch.size` (or `BATCH_SIZE_CONFIG`, as seen in this example; remember it’s a convenience mapping) to control the max size in bytes of each message batch.  You can use `linger.ms` to give more time for batches to fill.

You may be wondering, how do batch transmission and topic partitioning co-exist?

Producers maintain buffers (batches) of unsent records for each partition according to the `batch.size` setting. Increasing the value will result in more batching at the cost of requiring more memory.

Kafka Producer Compression

As described in the previous section, if the Kafka Producer is transmitting a batch of messages, is there an option to compress the batch payload?  Yes, with the `compression.type` setting.  The default is `none`, but it may be set to `gzip`, `snappy`, or `lz4`.
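
To tie this back to the configuration style used earlier in the example, a hedged one-liner for enabling compression might look like this; the choice of `gzip` is just an illustration.

// hypothetical setting: compress each producer batch with gzip
p.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, "gzip")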

Kafka Producer Asynchronous `send`

A few noteworthy items about the two versions of the `send` function shown here

producer.send(new ProducerRecord(topics, s"key ${k}", "oh the value!"))

// with example callback
// producer.send(new ProducerRecord(topics, s"key ${k}", "oh the value!"), callback)

First, the obvious, as shown in the screencast: the `send` function can accept a `Callback` argument which will be invoked on message receipt acknowledgment.  The callback implementation will receive two arguments in its overridden implementation of `onCompletion`.  `metadata` includes the partition and offset of the sent message.  `exception` includes approximately 10 variations of exceptions which may be sent.
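
For orientation, here is a minimal, hypothetical sketch of such a `Callback`; the logging choices are assumptions for illustration and not the implementation from the examples repo.

import org.apache.kafka.clients.producer.{Callback, RecordMetadata}

// hypothetical callback: onCompletion receives the metadata and exception described above
val callback = new Callback {
  override def onCompletion(metadata: RecordMetadata, exception: Exception): Unit = {
    if (exception != null) {
      // the send failed; log, retry, or route elsewhere as appropriate
      exception.printStackTrace()
    } else {
      // the send succeeded; metadata carries the partition and offset of the record
      println(s"acked partition=${metadata.partition()} offset=${metadata.offset()}")
    }
  }
}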

Second, you may remember the internal mechanics of sending messages include leader determination of topic partitions.  Do you remember what this means?  When a Kafka Producer wants to send a message, it needs to determine the leader (a particular node) of the topic partition.  Any Kafka node in the cluster may answer the Producer’s question on which nodes are alive and who the leaders are for particular partitions in a given topic.  Answering these questions allows the Producer to route the message appropriately.

Noteworthy

Starting with Kafka 0.11, the Kafka Producer supports two additional modes beyond `send`: the transactional producer and the idempotent producer.  Let me know if we should cover these two additional capabilities in a future post.
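
As a hedged hint of what enabling those modes looks like, the relevant producer configuration keys are shown below.  The transactional id value is an assumption, and the transactional producer additionally requires the `initTransactions`, `beginTransaction` and `commitTransaction` calls, which are not shown here.

// hypothetical configuration for the two modes mentioned above (Kafka 0.11+)
p.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, "true")                    // idempotent producer
p.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "example-transactional-id")  // transactional producer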

Kafka Producer Conclusion

In this tutorial, we created and executed a Kafka Producer in Scala.  Did it help?  Or would you prefer a Java example?  Let me know and let me know if you have any questions or suggestions for improvement.  Enjoy yourselves, people.  Try to be good to one another.


Kafka Consumer


In this Kafka Consumer tutorial, we’re going to demonstrate how to develop and run an example of a Kafka Consumer, so you can gain the confidence to develop and deploy your own Kafka Consumer applications.  At the end of this Kafka Consumer tutorial, you’ll have both the source code and a screencast of how to run and customize the Kafka Consumer example.

Kafka Consumer Background

As a quick background, recall Kafka Consumers are applications which read messages from Kafka topic partitions.  As previously described in Apache Kafka Overview, Kafka Consumers read messages from Kafka topics in sequential order through use of offsets.  Ordering is a key feature of Apache Kafka and it is what differentiates it from more traditional pub/sub messaging systems.

Kafka messages do not need to be in a particular format such as JSON, Avro, or plain text.  There is no required, formal agreement on data format between Kafka Consumers and Kafka Producers.  Instead, there is typically an informal agreement on what Producers and Consumers can expect in data formats for particular topics.  For those looking to implement more rigidity in data formats and schema validation, there is the option to implement the Confluent Schema Registry with Avro to ensure a data format contract, but we’ll cover that at a later time.

Three key settings for Kafka Consumers related to how they process messages according to offsets are `auto.offset.reset`, `enable.auto.commit` and `auto.commit.interval.ms`.   Let’s explore this a bit now.

Kafka Consumers will commit their current offset location back to Kafka every 5 seconds by default, as long as `enable.auto.commit` is set to true (again, the default).  You can change the frequency at which the commits happen by changing the `auto.commit.interval.ms` value.  We’ll show examples and explore this further later in the tutorial.

Kafka Consumer Architecture

When considering the mechanics or flow of data from topics to Consumers, you may wonder whether it is a pull or a push action.  In other words, do Kafka Consumers pull messages from Kafka Topics, or are messages pushed to Kafka Consumers?  The answer is “pull” based mechanics.  Because one of the design goals is allowing consumers to read messages at the best rate for themselves, Kafka Consumers are in control of pulling messages.  A push model would take control away from the Kafka Consumer.

On the subject of Kafka Consumer mechanics, you should be aware of the differences between older and newer Kafka Consumer clients.  Older Kafka clients depended on ZooKeeper for Kafka Consumer group management, while new clients use a group protocol built into Kafka itself.  Using this group protocol, one of the brokers is designated as the Consumer group’s coordinator and is responsible for managing both the members of the group as well as their partition assignments.

As noted in the previous paragraph, Kafka Consumers may be deployed and configured as Kafka Consumer Groups which can be utilized to increase throughput.  Although you will see reference to Consumer Group ID settings in the source code below, we will not be covering the concept of grouping Kafka Consumers in any depth within this tutorial.

Kafka Consumer Example

In this Kafka Consumer tutorial, we’re going to demonstrate how to develop and run a Kafka Consumer.  We’ll use Scala in this example, but the concepts hold true regardless of which language you choose to use.   See the link for Kafka Clients in the Reference section below for alternative language options.

Some of the commands used to send test data to the `example-topic` in the screencast included:

`cat words.txt | ./bin/kafka-console-producer --broker-list localhost:9092 --topic example-topic`

`kafka-console-producer --broker-list localhost:9092 --topic example-topic --property parse.key=true --property key.separator=,`

And then we sent keys and values separated by a comma and ended with Ctrl-D

Next, we explored reading a topic from the beginning rather than the latest, so we changed some default properties and the consumer group id.

Kafka Consumer Example Source Code

Let’s examine the Kafka Consumer relevant source code.  To start, notice how we are setting various required properties as shown here

p.put(ConsumerConfig.GROUP_ID_CONFIG, groupId)
p.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers)
p.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer")
p.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer")

// if we want to adjust defaults
// p.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest") // default is latest
// p.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false) // default is true - whether to auto-commit offsets
// p.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, 10000) // default 5000 - change how often to commit offsets

We discussed and tried experimenting with most of these properties during the screencast.  If you have any questions, check out the screencast first.

Later, we spawn our Kafka Consumer, set the topic to subscribe to, and start polling every second

val consumer = new KafkaConsumer[String, String](props)

consumer.subscribe(Collections.singletonList(this.topics))

while (true) {
  val records = consumer.poll(1000).asScala  // asScala requires scala.collection.JavaConverters._

  for (record <- records) {
    println("Message: (key: " +
      record.key() + ", with value: " + record.value() +
      ") on partition " + record.partition() + " at offset " + record.offset())
  }
}

We’re not doing anything interesting with the messages other than displaying keys, values and metadata.  What do you notice about the partition and offset values?  Do you see examples of strict ordering in the partition and offset values?

Finally, we should quickly call out the commented out code towards the end of the `main` function

// if you don't want to auto-commit offsets (which is the default behavior),
// adjust the properties above and uncomment the block below
/*
try {
  consumer.commitSync()
} catch {
  case e: CommitFailedException =>
    // possible rollback of processed records
    // which may have failed to be delivered to the ultimate destination
}
*/

By default, the Kafka Consumer will commit its current offset back to Kafka every 5 seconds automatically.  But what if processing fails between the time messages were received and the time the offset is committed back?  Is there a chance to lose data?  Is there a chance to duplicate data?  This is covered in much more detail in the Kafka delivery guarantees post, so I recommend you check that out if you are interested in learning more.

For now, this commented-out code and properties such as `ENABLE_AUTO_COMMIT_CONFIG` shown above indicate the options you have for tuning delivery vs processing guarantees.

Kafka Consumer Conclusion

I hope you enjoyed and learned from this Kafka Consumer tutorial.  Do let me know if you have any questions or suggestions for improvement.  Thanks!  Next up, you may wish to try the Kafka Producer tutorial and example or move back to the list of Kafka Tutorials.

Kafka Consumer References

Kafka Consumer featured image https://pixabay.com/en/alcohol-drink-alkolismus-bottles-64164/

Kafka Consumer Groups by Example

Kafka Consumer Group Example

Kafka Consumer Groups are the way to horizontally scale out event consumption from Kafka topics… with failover resiliency.  “With failover resiliency” you say!?  That sounds interesting.  Well, hold on, let’s leave out the resiliency part for now and just focus on scaling out.  We’ll come back to resiliency later.

When designing for horizontal scale-out, let’s assume you would like more than one Kafka Consumer to read in parallel with another.  Why?  Maybe you are trying to answer the question “How can we consume and process more quickly?”

Now, we’ve covered Kafka Consumers in a previous tutorial, so you may be wondering, how are Kafka Consumer Groups the same or different?   Well, don’t you wonder too long my Internet buddy, because you are about to find out, right here and right now.  Ready!?  Of course, you are ready, because you can read.  Also, if you like videos, there’s an example of Kafka Consumer Groups waiting for you below too.  The video should cover all the things described here.  Are you ready for a good time?  That should be a song.

Anyhow, first some quick history and assumption checking…

Like many things in Kafka’s past, Kafka Consumer Groups used to have a Zookeeper dependency.  But starting in 0.9, the Zookeeper dependency was removed.  So, if you are revisiting Kafka Consumer Groups from previous experience, this may be news to you.  There’s a link in the Reference section below which you might want to check out if you are interested in learning more about the dependency history.

Kafka Topic Fundamentals

First off, in order to understand Kafka Consumer Groups, let’s confirm our understanding of how Kafka topics are constructed.  Recall that Kafka topics consist of one or more partitions.  There is even a configuration setting for the default number of partitions created for each topic in the server.properties file called `num.partitions`, which is set to 1 by default.  More partitions allow more parallelism.  Or, put another way and as we shall see shortly, more partitions allow more than one Consumer to read from the topic.

To see partitions in topics visually, consider the following diagrams.

A Kafka topic with a single partition looks like this

Kafka Consumer Groups Example One: Single Partition Topic

A Kafka Topic with four partitions looks like this

Kafka Consumer Groups Example 2: Four Partitions in a Topic

And note, we are purposely not distinguishing whether or not the topic is being written from a Producer with particular keys.

Kafka Consumer Group Essentials

Now, if we visualize Consumers working independently (without Consumer Groups) compared to working in tandem in a Consumer Group, it can look like the following example diagrams.

Without Consumer Groups

Kafka Consumer Groups Example 3

With Consumer Groups

Kafka Consumer Groups Example 4

Rules of the road

So, to recap, it may be helpful to remember the following rules:

  • Each partition in a topic will be consumed by exactly one Consumer in a Consumer Group
  • Now, one Consumer may consume from more than one partition, if there are fewer Consumers in a Consumer Group than there are topic partitions
  • If you like deploying with efficient use of resources (and I highly suspect you do), then the number of consumers in a Consumer Group should be equal to or less than the number of partitions, but you may also want a standby Consumer as described in this post’s accompanying screencast.

A quick comment on that last bullet point: here’s the “resiliency” bit.  Deploying more Consumers than partitions might be for redundancy purposes and to avoid a single point of failure; what happens if my one consumer goes down!?  As we’ll see in the screencast, an idle Consumer in a Consumer Group will pick up the processing if another Consumer goes down.

If bullet points are not your thing, then here’s another way to describe the first two bullet points.  Let’s say you have N consumers; well then, you should have at least N partitions in the topic.  Or, put a different way, if the number of consumers is greater than the number of partitions, you may not be getting full value, because any additional consumers beyond the number of partitions will be sitting there idle.  We show an example of this in the video later.
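
In code, membership in a Consumer Group comes down to every consumer instance using the same `group.id`.  Here is a hedged one-liner in the property style of the earlier Kafka Consumer example; the group name is an assumption for illustration.

// hypothetical: every consumer instance started with this same group.id joins the
// same Consumer Group, and Kafka divides the topic's partitions among the members
p.put(ConsumerConfig.GROUP_ID_CONFIG, "example-consumer-group")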

 

Kafka Consumer Group Example

Alright, enough is enough, right.  Let’s get to some code.  In the Consumer Group screencast below, call me crazy, but we are going to use code from the previous examples of Kafka Consumer and Kafka Producer.

We are going to configure IntelliJ to allow us to run multiple instances of the Kafka Consumer.  This will allow us to run multiple Kafka Consumers in the Consumer Group and simplify the concepts described here.  I show how to configure this in IntelliJ in the screencast if you are interested.

And again, the source code may be downloaded from https://github.com/tmcgrath/kafka-examples

Kafka Consumer Groups Examples Pictures and Demo

In the following screencast, let’s cover Kafka Consumer Groups with diagrams and then run through a demo.

The link to the Github repo used in the demos is available below.  Here are the bullet points of running the demos yourself

  • Kafka and Zookeeper are running.  Run list topics to show everything running as expected.
  • Create an example topic with 2 partitions with bin/kafka-topics.sh --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 2 --topic example-topic
  • Run Consumer 1 (show how to run more than one instance in IntelliJ), Run Consumer 2 (after editing) with different group id and show the output.  Run Producer.  Both consume everything and notice how there is no sequential ordering because Consumers are consuming from both partitions.  Ordering is on a per partition basis.
  • What happens if we change these Consumers to be part of the same group?  Stop Consumers and producer, set new Group id, rerun both, which shows round robin results because the key is unique for each message.
  • What happens if we add a third consumer?  Fire up a third consumer and show how it’s idle; kill one consumer and then re-run the producer to show how consumer 3 is now engaged (note: `session.timeout.ms` and `heartbeat.interval.ms`)
  • Stop all running consumers and producers.  Restart the consumers and notice that, even though `auto.offset.reset` is set to earliest, they do not re-read earlier messages, because their offsets have already been committed and stored in `__consumer_offsets`.  Run the Producer with keys of “red” vs “blue” vs “green” and highlight how each key lands on a particular partition, so each consumer reads its keys in order.  Run it a few times.  Note: the default partitioner takes a murmur2 hash of the key mod 2 (based on the number of partitions).
  • Repeat the previous step but use a topic with 3 partitions
  • Repeat the previous step but use a new topic with 4 partitions

Why Kafka Consumer Groups?

In distributed computing frameworks, the capability to pool resources to work in collaboration isn’t new anymore, right?  In other words, you may be asking “why Kafka Consumer Groups?”  What makes Kafka Consumer Groups so special?  Good question, thanks for asking.

To me, the first reason is how the pooling of resources is coordinated amongst the “workers”.  I put “workers” in quotes because the naming may be different between frameworks.  But in this case, “workers” is essentially an individual process performing work in conjunction with other processes in a group or pool.  Multiple processes working together to “scale out”.  In Kafka Consumer Groups, this worker is called a Consumer.

The coordination of Consumers in Kafka Consumer Groups does NOT require an external resource manager such as YARN.  The capability is built into Kafka already.   This is an attractive differentiator for horizontal scaling with Kafka Consumer Groups.  Put another way, if you want to scale out with an alternative distributed cluster framework, you’re going to need to run another cluster of some kind and that may add unneeded complexity.

Now, another reason to invest in understanding Kafka Consumer Groups is if you are using other components in the Kafka ecosystem such as Kafka Connect or Kafka Streams.  Both Kafka Connect and Kafka Streams utilize Kafka Consumer Groups behind the scenes, but we’ll save that for another time.  Or if you have any specific questions or comments, let me know in the comments.

References

Kafka examples source code used in this post

What is a Kafka Consumer?

Kafka Consumer documentation

Introducing the Kafka Consumer: Getting Started with the New Apache Kafka 0.9 Consumer Client

 

Credits

Kafka Consumer Groups Post image by かねのり 三浦