Kafka Consumer in Scala

In this Kafka Consumer tutorial, we’re going to demonstrate how to develop and run an example Kafka Consumer in Scala, so you can gain the confidence to develop and deploy your own Kafka Consumer applications.  At the end of this tutorial, you’ll have both the source code and a screencast showing how to run and customize the Kafka Consumer example.

Kafka Consumer Background

As a quick background, recall that Kafka Consumers are applications that read messages from Kafka topic partitions.  As previously described in Apache Kafka Overview, Kafka Consumers read the messages in each partition in sequential order by tracking offsets.  This per-partition ordering is a key feature of Apache Kafka and is what differentiates it from more traditional pub/sub messaging systems.

Kafka messages do not need to be in a particular format such as JSON, Avro, or plain text.  There is no required, formal agreement on data format between Kafka Consumers and Kafka Producers.  Instead, there is usually an informal agreement on which data formats Producers and Consumers can expect for particular topics.  For those looking to enforce data formats and schema validation more rigidly, there is the option to use the Confluent Schema Registry with Avro to establish a data format contract, but we’ll cover that at a later time.
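To make the idea of an informal agreement concrete, here is a minimal plain-Scala sketch (no Kafka API involved) of the kind of encode and decode logic a producer and consumer might share for one topic. The `WordCount` record and its `word,count` UTF-8 text layout are hypothetical choices for illustration; Kafka itself only stores and delivers the resulting bytes.

```scala
import java.nio.charset.StandardCharsets

// Hypothetical record type that a producer and consumer agree to exchange.
final case class WordCount(word: String, count: Int)

// Sketch of an informal topic "contract": UTF-8 text in the form "word,count".
// Kafka never inspects this; it only sees the encoded bytes.
object WordCountFormat {
  def encode(wc: WordCount): Array[Byte] =
    s"${wc.word},${wc.count}".getBytes(StandardCharsets.UTF_8)

  // Returns None when the bytes do not follow the agreed format.
  def decode(bytes: Array[Byte]): Option[WordCount] = {
    val text = new String(bytes, StandardCharsets.UTF_8)
    // limit -1 keeps trailing empty fields, so "kafka," is rejected
    text.split(",", -1) match {
      case Array(word, count) => count.toIntOption.map(WordCount(word, _))
      case _                  => None
    }
  }
}
```

A message that breaks the agreement simply decodes to `None` here, which is one way a consumer might cope without a formal schema.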

Three key settings for Kafka Consumers related to how they process messages according to offsets are `auto.offset.reset`, `enable.auto.commit` and `auto.commit.interval.ms`.   Let’s explore this a bit now.

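When `enable.auto.commit` is set to true (the default), Kafka Consumers automatically commit their current offsets back to Kafka every 5 seconds.  You can change how often these commits happen with `auto.commit.interval.ms` (default 5000 ms).  The third setting, `auto.offset.reset`, decides where a consumer starts reading when its consumer group has no committed offset (or the committed offset no longer exists), such as when a new group ID is used: `latest` (the default) starts with newly arriving messages, while `earliest` starts from the oldest available message.  We’ll show examples and explore this further later in the tutorial.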
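As a rough sketch, the three settings could be assembled like this. The keys are the documented Kafka consumer config names, and the values are passed as strings, which the Kafka client parses into the expected types. The `OffsetSettings` object and its parameters are hypothetical helpers for illustration, and only `java.util.Properties` from the JDK is used.

```scala
import java.util.Properties

// Hypothetical helper that collects the offset-related consumer settings.
object OffsetSettings {
  def consumerProps(
      bootstrapServers: String,
      groupId: String,
      readFromBeginning: Boolean,
      autoCommit: Boolean,
      autoCommitIntervalMs: Int
  ): Properties = {
    val props = new Properties()
    props.put("bootstrap.servers", bootstrapServers)
    props.put("group.id", groupId)
    // Only used when the group has no valid committed offset (default: "latest")
    props.put("auto.offset.reset", if (readFromBeginning) "earliest" else "latest")
    // Automatic offset commits (default: true)
    props.put("enable.auto.commit", autoCommit.toString)
    // Commit frequency when auto commit is enabled (default: 5000 ms)
    props.put("auto.commit.interval.ms", autoCommitIntervalMs.toString)
    props
  }
}
```

For example, `OffsetSettings.consumerProps("localhost:9092", "example-group", readFromBeginning = true, autoCommit = true, autoCommitIntervalMs = 10000)` would read from the beginning for a new (hypothetical) `example-group` and commit every 10 seconds.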


Kafka Consumer Architecture

When considering the flow of data from topics to Consumers, you may wonder whether it is a pull or a push action.  In other words, do Kafka Consumers pull messages from Kafka topics, or are messages pushed to Kafka Consumers?  The answer is pull.  Because one of Kafka’s design goals is to let Consumers read messages at the rate that suits them best, Kafka Consumers control when and how many messages they pull.  A push model would take that control away from the Kafka Consumer.
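The pull model can be illustrated with a toy, in-memory sketch. This is not how the Kafka client is implemented, and `ToyPartition` and `ToyPullConsumer` are hypothetical names. The point is that the consumer decides when to fetch, how much to take (the real client caps this with `max.poll.records`), and which offset to continue from.

```scala
// Toy model of pull-based consumption, not the Kafka client's implementation.
// A partition is an append-only log whose records are identified by offsets.
final class ToyPartition(records: Vector[String]) {
  // Return up to maxRecords (offset, value) pairs starting at `offset`.
  def fetch(offset: Long, maxRecords: Int): Vector[(Long, String)] = {
    val from = offset.toInt
    records.slice(from, from + maxRecords).zipWithIndex.map {
      case (value, i) => ((from + i).toLong, value)
    }
  }
}

// The consumer controls the pace: it fetches only when it calls poll(),
// takes at most maxRecordsPerPoll records, and tracks its own position.
final class ToyPullConsumer(partition: ToyPartition, maxRecordsPerPoll: Int) {
  private var position: Long = 0L

  def currentPosition: Long = position

  def poll(): Vector[(Long, String)] = {
    val batch = partition.fetch(position, maxRecordsPerPoll)
    // Advance past the last record received (unchanged if nothing was returned)
    batch.lastOption.foreach { case (offset, _) => position = offset + 1 }
    batch
  }
}
```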

On the subject of Kafka Consumer mechanics, you should be aware of the differences between older and newer Kafka Consumer clients.  Older Kafka clients depended on ZooKeeper for Kafka Consumer group management, while newer clients use a group protocol built into Kafka itself.  Using this group protocol, one of the brokers is designated as the Consumer group’s coordinator and is responsible for managing both the group’s members and their partition assignments.

As noted in the previous paragraph, Kafka Consumers may be deployed and configured as Kafka Consumer Groups which can be utilized to increase throughput.  Although you will see reference to Consumer Group ID settings in the source code below, we will not be covering the concept of grouping Kafka Consumers in any depth within this tutorial.

Kafka Consumer Example

In this Kafka Consumer tutorial, we’re going to demonstrate how to develop and run a Kafka Consumer.  We’ll use Scala in this example, but the concepts hold true regardless of which language you choose to use.   See the link for Kafka Clients in the Reference section below for alternative language options.


The screencast used commands like the following to send test data to `example-topic`:

`cat words.txt | ./bin/kafka-console-producer --broker-list localhost:9092 --topic example-topic`

`kafka-console-producer --broker-list localhost:9092 --topic example-topic --property parse.key=true --property key.separator=,`

With the second command, we typed keys and values separated by a comma and ended input with Ctrl-D.
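For the keyed producer command, `parse.key=true` together with `key.separator=,` tells the console producer to split each input line into a key and a value. The sketch below assumes the split happens at the first separator, so a value may itself contain commas; treat that as an assumption about the console producer rather than a copy of its source code, and `KeyValueLines` as a hypothetical helper.

```scala
// Sketch of splitting console input lines into (key, value) pairs, assuming
// the line is split at the first occurrence of the separator. It mirrors the
// idea behind parse.key/key.separator, not the console producer's actual code.
object KeyValueLines {
  def parse(line: String, separator: String = ","): Either[String, (String, String)] = {
    val idx = line.indexOf(separator)
    if (idx < 0) Left(s"No key separator found in line: $line")
    else Right((line.substring(0, idx), line.substring(idx + separator.length)))
  }
}
```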

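Next, we explored reading a topic from the beginning rather than from the latest offset, so we changed some default properties as well as the consumer group ID.  Changing the group ID matters because `auto.offset.reset` only applies when the group has no committed offset.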
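The following simplified model, a sketch under stated assumptions rather than Kafka’s code, shows why the group ID matters when switching to reading from the beginning. A group’s valid committed offset always takes priority; `auto.offset.reset` only decides the starting point when there is no committed offset or it is out of range. Kafka also accepts `none`, which raises an error instead, and it is omitted here. `logStart` and `logEnd` are assumed stand-ins for the partition’s earliest offset and the offset of the next message to be written.

```scala
// Simplified model of the reset policy (the "none" option is omitted).
sealed trait ResetPolicy
case object Earliest extends ResetPolicy
case object Latest extends ResetPolicy

object StartingOffset {
  // Where a consumer in a group starts reading one partition.
  def resolve(committed: Option[Long], logStart: Long, logEnd: Long, policy: ResetPolicy): Long =
    committed match {
      // A valid committed offset for the group always wins
      case Some(offset) if offset >= logStart && offset <= logEnd => offset
      // No committed offset (e.g. a new group ID) or an out-of-range one
      case _ =>
        policy match {
          case Earliest => logStart
          case Latest   => logEnd
        }
    }
}
```

Reusing an old group ID with `Earliest` would therefore still resume from the committed offset, which is why the screencast switched to a new group ID.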

Kafka Consumer Example Source Code

Let’s examine the relevant Kafka Consumer source code.  To start, notice how we set the various required properties:

import java.util.Properties
import org.apache.kafka.clients.consumer.ConsumerConfig

val props = new Properties()
props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId)
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers)
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer")
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer")

// if we want to adjust defaults
// props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest") // default is latest
// props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false") // default true - disable automatic offset commits
// props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "10000") // default 5000 - change how often to commit offsets

We discussed and experimented with most of these properties during the screencast.  If you have any questions, check out the screencast first.

Later, we create our Kafka Consumer, subscribe it to the topic, and poll for new records in a loop, waiting up to one second on each call:

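import java.time.Duration
import java.util.Collections
import org.apache.kafka.clients.consumer.KafkaConsumer
import scala.jdk.CollectionConverters._ // Scala 2.13+; use scala.collection.JavaConverters._ on 2.12

val consumer = new KafkaConsumer[String, String](props)
// subscribe to the topic used in the screencast
consumer.subscribe(Collections.singletonList("example-topic"))

while (true) {
  // wait up to one second for new records
  val records = consumer.poll(Duration.ofMillis(1000)).asScala

  for (record <- records) {
    println("Message: (key: " +
      record.key() + ", with value: " + record.value() +
      ") on partition " + record.partition() + " at offset " + record.offset())
  }
}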

We’re not doing anything interesting with messages other than displaying keys, values and metadata.  What do you notice about the partition and offset values?  Do you see examples of strict ordering in the partition and offset values?
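One way to answer that question is to check the printed `(partition, offset)` pairs programmatically. In this small sketch, `OrderingCheck` is a hypothetical helper: it groups the observed pairs by partition and verifies that offsets only ever increase within each one. Offsets are not required to be consecutive, and records from different partitions may interleave, which is exactly what Kafka’s per-partition ordering allows.

```scala
// Sketch: verify per-partition ordering of observed (partition, offset) pairs,
// given in the order the consumer printed them.
object OrderingCheck {
  def offsetsIncreasePerPartition(observed: Seq[(Int, Long)]): Boolean =
    observed
      .groupBy { case (partition, _) => partition } // keeps arrival order per group
      .values
      .forall { pairs =>
        val offsets = pairs.map(_._2)
        // each offset must be strictly greater than the previous one
        offsets.zip(offsets.drop(1)).forall { case (a, b) => a < b }
      }
}
```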

Finally, let’s quickly call out the commented-out code toward the end of the `main` function:

// If you don't want the default auto commit of offsets,
// uncomment the code below and set enable.auto.commit to false above
// try {
//   consumer.commitSync()
// } catch {
//   case e: CommitFailedException =>
//     // possible rollback of processed records
//     // which may have failed to be delivered to ultimate destination
// }

By default, the consumer automatically commits its current offsets back to Kafka every 5 seconds.  But what if processing fails between the time messages are received and the time the offset is committed?  Is there a chance to lose data?  Is there a chance to duplicate data?  This is covered in much more detail in the Kafka delivery guarantees post, so I recommend checking that out if you’re interested in learning more.


For now, this commented-out code and properties such as `ENABLE_AUTO_COMMIT_CONFIG` shown above indicate the options you have for tuning delivery and processing guarantees.
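To make the loss-versus-duplication question concrete, here is a toy simulation (plain Scala, not Kafka code) comparing two hypothetical commit strategies for one batch of records when the consumer crashes partway through. Committing before processing can lose records (at-most-once), while committing after processing can re-deliver records (at-least-once). The `CommitTiming` helper and its parameters are illustrative assumptions.

```scala
// Toy simulation, not Kafka code: a consumer handles `batch` but crashes after
// processing `crashAfter` records; after a restart it resumes from the last
// committed offset.
object CommitTiming {
  // Returns every record processed, before the crash and after the restart.
  def run(batch: Vector[String], crashAfter: Int, commitBeforeProcessing: Boolean): Vector[String] = {
    val processedBeforeCrash = batch.take(crashAfter)
    val crashed = crashAfter < batch.length
    // Offset the consumer resumes from after a restart
    val committedOffset =
      if (commitBeforeProcessing) batch.length // committed up front, before processing
      else if (crashed) 0                      // crashed before the post-processing commit
      else batch.length                        // whole batch processed, then committed
    // Records re-read after the restart (none if everything was committed)
    processedBeforeCrash ++ batch.drop(committedOffset)
  }
}
```

With `Vector("a", "b", "c", "d")` and a crash after two records, committing first yields `Vector("a", "b")`, so `c` and `d` are lost, while committing afterwards yields `Vector("a", "b", "a", "b", "c", "d")`, so `a` and `b` are processed twice.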

Kafka Consumer Conclusion

I hope you enjoyed and learned from this Kafka Consumer tutorial.  Do let me know if you have any questions or suggestions for improvement.  Thanks!  Next up, you may wish to try the Kafka Producer tutorial and example or move back to the list of Kafka Tutorials.

Kafka Consumer References

Kafka Consumer featured image https://pixabay.com/en/alcohol-drink-alkolismus-bottles-64164/

About Todd M

Todd has held multiple software roles over his 20-year career. For the last 5 years, he has focused on helping organizations move from batch to data streaming. In addition to the free tutorials, he provides consulting and coaching for Data Engineers, Data Scientists, and Data Architects. Feel free to reach out directly or to connect on LinkedIn.
