Kafka Producer in Scala


Kafka Producers are one way to publish data events (messages) to Kafka topics. They are custom coded in a variety of languages using Kafka client libraries. The Kafka Producer API sends messages to Kafka topics asynchronously, so producers are built for speed. Producers can also process receipt acknowledgments from the Kafka cluster, so they can be as safe as you need them to be.

For a refresher on this, please see What is Kafka?

Objective

In this Kafka Producer tutorial, let’s examine a Kafka Producer example and highlight some of the key features and customization options. The tutorial assumes you are already familiar with Kafka architectural components such as Producers, Consumers, and Topics. For a list of other Kafka resources, see the Kafka Tutorials page.

On the surface, the idea behind a Kafka Producer is simple: send messages to topics. In the Kafka Producer example in this tutorial, we’re going with an easy case of sending to a topic with a single partition. In larger environments, however, the dynamics of optimized Kafka Producer performance change. For example, in production deployments, we will want to experiment with and test different batch sizes, compression settings, and possibly even custom partitioners. Factors which will affect these tests and experiments include the configured replication factor of the topic(s) in question. We’ll touch upon some of these options and design decisions throughout this tutorial.

Kafka Producer Example Screencast

Before we begin analyzing the Kafka Producer example client source code, let’s show how to run the example in the following screencast.

Outside of running and debugging in IntelliJ, we also use the `kafka-console-consumer` command-line tool to inspect messages being sent to the topic in a few different ways, including

./bin/kafka-console-consumer --bootstrap-server localhost:9092 --topic example-topic

and

./bin/kafka-console-consumer --bootstrap-server localhost:9092 --topic example-topic --property print.key=true --property key.separator=:: --from-beginning

Source Code

Developing Kafka Producers is similar to developing Kafka Consumers in that a Kafka client library must be made available to your source code project. As we saw in the Kafka Consumer tutorial, if you are using a build tool like SBT or Maven, it’s just a matter of adding the library as a dependency, for example

val kafka = "0.10.2.1"
libraryDependencies += "org.apache.kafka" % "kafka-clients" % kafka

Let’s start by looking at some of the optional Kafka Producer configuration settings that are commented out by default.

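// acks: wait for the write to be replicated before it counts as committed
p.put(ProducerConfig.ACKS_CONFIG, "all")
// retries: 0 means failed sends are not retried
p.put(ProducerConfig.RETRIES_CONFIG, 0)
// batch.size: maximum size in bytes of a per-partition batch
p.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384)
// linger.ms: how long to wait for a batch to fill before sending
p.put(ProducerConfig.LINGER_MS_CONFIG, 1)
// p.put(ProducerConfig.RETRIES_CONFIG, "TODO")  // placeholder; would overwrite retries with a non-numeric value
// buffer.memory: total bytes available for buffering unsent records
p.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 33554432)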

As we saw in the above screencast, these names are `static final String` constants on `ProducerConfig`, provided as a convenient way to reference the underlying configuration keys. For example, `ACKS_CONFIG` is defined in the underlying Java code as:

public static final String ACKS_CONFIG = "acks";
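To make the mapping concrete, here is a minimal sketch that builds a `java.util.Properties` using these constants. The `ProducerSettings` object and its `baseline` helper are hypothetical names, not part of the tutorial’s source code, and the sketch assumes String keys and values. Settings are written as strings, which the client parses.

```scala
import java.util.Properties
import org.apache.kafka.clients.producer.ProducerConfig
import org.apache.kafka.common.serialization.StringSerializer

// Hypothetical helper (an assumption for illustration, not the tutorial's code).
object ProducerSettings {
  // Builds the minimum configuration a String/String producer needs,
  // plus the acks setting discussed in this tutorial.
  def baseline(bootstrapServers: String): Properties = {
    val p = new Properties()
    // Each constant is just the string key, e.g. BOOTSTRAP_SERVERS_CONFIG == "bootstrap.servers"
    p.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers)
    p.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, classOf[StringSerializer].getName)
    p.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, classOf[StringSerializer].getName)
    p.put(ProducerConfig.ACKS_CONFIG, "all")
    p
  }
}
```

Calling `ProducerSettings.baseline("localhost:9092")` would target the same broker address used by the console consumer commands earlier in this tutorial.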

As a Kafka developer, administrator, or architect, you have options on how to design the desired performance characteristics of your Kafka Producer.  These Kafka Producer configuration options are your first stop for tuning for both speed and safety.  (Note: we’re not going to cover all the configuration options; just the ones in this example.)


Kafka Producer `acks` configuration

`acks` specifies how many broker acknowledgments the producer requires before it considers a message committed. Recall that each partition of a topic has a designated leader and possibly a set of follower replicas among the brokers. All writes to a particular partition must go to that partition’s leader. Then, replicas fetch from this leader in order to keep in sync. With the `acks` setting, the partition leader can wait until the required replicas have synced before sending a committed response back to the Producer.

The `acks` configuration gives the producer developer control over message durability at the cost of throughput.

The strongest durability guarantee is setting `acks` to `all` as shown in the commented out code. This setting guarantees not only that the partition leader accepted the write request, but also that the write was successfully replicated to all in-sync replicas. Other options for `acks` include `0` (do not wait for any acknowledgment) and `1` (wait for the leader only). The default setting is `1` in the client version used here; Kafka 3.0 and later default to `all`.
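The three settings can be sketched as named durability levels. Only the `acks` key and its values `0`, `1`, and `all` come from Kafka; the `AcksLevels` object, the level names, and the `withAcks` helper are assumptions made for illustration.

```scala
import java.util.Properties

// Hypothetical names for illustration; only the "acks" key and its values are Kafka's.
object AcksLevels {
  sealed abstract class Durability(val acks: String)
  case object FireAndForget extends Durability("0") // do not wait for any broker response
  case object LeaderOnly extends Durability("1")    // wait for the partition leader only
  case object AllInSync extends Durability("all")   // wait for the in-sync replicas as well

  // Returns a copy of `base` with the chosen acks value; `base` itself is not modified.
  def withAcks(base: Properties, level: Durability): Properties = {
    val p = base.clone().asInstanceOf[Properties]
    p.put("acks", level.acks)
    p
  }
}
```

Keeping the base configuration untouched makes it easy to run the same producer code under each level and compare throughput, which is the kind of experiment described earlier in this tutorial.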

Kafka Producer Batch Size Configuration

Kafka Producers may attempt to collect messages into batches before sending to leaders in an attempt to improve throughput. Use `batch.size` (or `BATCH_SIZE_CONFIG` as seen in this example; remember it’s a convenience mapping) to control the maximum size in bytes of each message batch. You can use `linger.ms` to give more time for batches to fill.

You may be wondering, how do batch transmission and topic partitioning co-exist?

Producers maintain buffers (batches) of unsent records for each partition according to the `batch.size` setting. Increasing the value will result in more batching at the cost of requiring more memory.
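As a rough illustration of the memory trade-off, the sketch below relates `batch.size` to the total `buffer.memory`, using the values from the configuration example earlier in this tutorial. The `BatchSettings` object and its helpers are hypothetical, and the arithmetic is only an upper bound that ignores per-record overhead and compression.

```scala
import java.util.Properties

// Hypothetical helper for illustration; the keys are Kafka's, the names are not.
object BatchSettings {
  val BatchSizeBytes: Int = 16384         // batch.size from the configuration example
  val BufferMemoryBytes: Long = 33554432L // buffer.memory from the configuration example

  // Upper bound on the number of completely full batches the buffer could hold.
  def maxFullBatches(bufferBytes: Long, batchBytes: Int): Long = bufferBytes / batchBytes

  // Batching-related settings, written as strings for the client to parse.
  def batching(batchBytes: Int, lingerMs: Int): Properties = {
    val p = new Properties()
    p.put("batch.size", batchBytes.toString)
    p.put("linger.ms", lingerMs.toString)
    p
  }
}
```

With these values, `BatchSettings.maxFullBatches(BatchSettings.BufferMemoryBytes, BatchSettings.BatchSizeBytes)` evaluates to 2048, so doubling `batch.size` halves the number of full batches that fit in the same buffer.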

Kafka Producer Compression

As described in the previous section, the Kafka Producer may transmit messages in batches. Is there an option to compress the batch payload? Yes, with the `compression.type` setting. The default is `none`, but it may be set to `gzip`, `snappy`, or `lz4` (newer client versions also accept `zstd`).
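To see why compressing a whole batch tends to pay off, the sketch below compares compressing 100 similar messages one at a time with compressing them as a single payload. It uses the JDK’s `GZIPOutputStream` directly and is not Kafka’s internal code path; the `BatchCompressionDemo` object and the message contents are assumptions made for illustration.

```scala
import java.io.ByteArrayOutputStream
import java.nio.charset.StandardCharsets.UTF_8
import java.util.zip.GZIPOutputStream

// Illustration only: plain JDK GZIP, not the Kafka client's compression code.
object BatchCompressionDemo {
  // Size in bytes of `data` after GZIP compression.
  def gzipSize(data: Array[Byte]): Int = {
    val bytes = new ByteArrayOutputStream()
    val gzip = new GZIPOutputStream(bytes)
    gzip.write(data)
    gzip.close() // writes the GZIP trailer so the measured size is complete
    bytes.size()
  }

  // 100 similar messages, shaped like the keys and values sent in this tutorial.
  val messages: Seq[Array[Byte]] =
    (1 to 100).map(k => s"key $k::oh the value!".getBytes(UTF_8))

  // Total size when each message is compressed on its own.
  val compressedIndividually: Int = messages.map(gzipSize).sum

  // Size when all messages are concatenated and compressed once.
  val compressedAsOneBatch: Int = gzipSize(Array.concat(messages: _*))
}
```

Because the messages share most of their content, the single compressed payload comes out smaller than the sum of the individually compressed messages. In a real producer you would simply set `compression.type` and let the client compress each batch.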


Kafka Producer Asynchronous `send`

A few noteworthy items about the two versions of the `send` function shown here:

producer.send(new ProducerRecord(topics, s"key ${k}", "oh the value!"))

// alternative example with callback
// producer.send(new ProducerRecord(topics,
//                                  s"key ${k}",
//                                  "oh the value!"),
//               callback)

First, as shown in the screencast, the `send` function can accept a `Callback` argument which is invoked when the send is acknowledged or fails. The callback implementation receives two arguments in its overridden `onCompletion` method. `metadata` includes the partition and offset of the sent message. `exception` is null on success; on failure, it holds one of approximately 10 exception types the client may report.
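A callback along these lines might look like the following sketch. The `CallbackExample` object, the `failures` counter, and the `sendWithCallback` helper are hypothetical names, and the screencast’s actual callback may differ. The helper accepts the `Producer` interface so that either a real `KafkaProducer` or a test double can be passed in.

```scala
import java.util.concurrent.atomic.AtomicInteger
import org.apache.kafka.clients.producer.{Callback, Producer, ProducerRecord, RecordMetadata}

// Hypothetical example; not the callback used in the screencast.
object CallbackExample {
  // Counts sends that completed with an error.
  val failures = new AtomicInteger(0)

  // Invoked by the producer once a send is acknowledged or has failed.
  val loggingCallback: Callback = new Callback {
    override def onCompletion(metadata: RecordMetadata, exception: Exception): Unit =
      if (exception != null) {
        failures.incrementAndGet()
        println(s"send failed: ${exception.getMessage}")
      } else {
        println(s"sent to partition ${metadata.partition()} at offset ${metadata.offset()}")
      }
  }

  // Sends one record asynchronously and registers the callback for its result.
  def sendWithCallback(producer: Producer[String, String], topic: String, k: Int): Unit = {
    producer.send(new ProducerRecord[String, String](topic, s"key $k", "oh the value!"), loggingCallback)
    ()
  }
}
```

The callback runs on the producer’s background I/O thread, so it is best kept short; anything slow inside it can delay other sends.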

Second, you may remember the internal mechanics of sending messages include leader determination of topic partitions.  Do you remember what this means?  When a Kafka Producer wants to send a message, it needs to determine the leader (a particular node) of the topic partition.  Any Kafka node in the cluster may answer the Producer’s question on which nodes are alive and who the leaders are for particular partitions in a given topic.  Answering these questions allows the Producer to route the message appropriately.
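The producer exposes this metadata through `partitionsFor`, so you can inspect which node currently leads each partition. The sketch below is a minimal illustration; the `PartitionLeaders` object and its `leaders` helper are hypothetical names, and the null checks are defensive assumptions for partitions without a current leader.

```scala
import org.apache.kafka.clients.producer.Producer

// Hypothetical helper for illustration.
object PartitionLeaders {
  // Maps partition number -> "host:port" of its current leader,
  // as reported by the cluster metadata the producer fetched.
  def leaders(producer: Producer[_, _], topic: String): Map[Int, String] = {
    var result = Map.empty[Int, String]
    Option(producer.partitionsFor(topic)).foreach { infos =>
      val it = infos.iterator()
      while (it.hasNext) {
        val info = it.next()
        val leader = info.leader() // may be null while a partition has no leader
        if (leader != null) result += (info.partition() -> s"${leader.host()}:${leader.port()}")
      }
    }
    result
  }
}
```

For the single-partition topic in this tutorial, the result would contain at most one entry, for partition 0.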

Noteworthy

Starting with Kafka 0.11, the Kafka Producer supports two additional modes beyond the basic `send` shown here: idempotent and transactional producers. Let me know if we should cover these two capabilities in the future.

Kafka Producer Conclusion

In this tutorial, we created and executed a Kafka Producer in Scala.  Did it help?  Or would you prefer a Java example?  Let me know and let me know if you have any questions or suggestions for improvement.  Enjoy yourselves, people.  Try to be good to one another.



About Todd M

Todd has held multiple software roles over his 20-year career. For the last 5 years, he has focused on helping organizations move from batch to data streaming. In addition to the free tutorials, he provides consulting and coaching for Data Engineers, Data Scientists, and Data Architects. Feel free to reach out directly or to connect on LinkedIn.
