Kafka Producer

Kafka Producer Example

Kafka Producers are one way to publish data events (messages) to Kafka topics.  Kafka Producers are custom coded in a variety of languages through the use of Kafka client libraries.  The Kafka Producer API sends messages to Kafka topics asynchronously, so Producers are built for speed, but Producers can also process receipt acknowledgments from the Kafka cluster, so they can be as safe as you desire as well.

In this Kafka Producer tutorial, let’s examine a Kafka Producer example and highlight some of the key features and customization options.  This tutorial assumes the reader is already familiar with Kafka architectural components such as Producers, Consumers, and Topics.  For a quick overview of both Kafka Producer and Consumer architecture, see Apache Kafka Overview.

On the surface, the idea behind a Kafka Producer is simple: send messages to topics.  In the Kafka Producer example in this tutorial, we’re going with an easy case of sending to a topic with a single partition.  However, in larger environments, the dynamics of optimizing Kafka Producer performance change.  For example, in production deployments we will want to experiment with and test different batch sizes, compression, and possibly even custom partitioners.  Factors which will affect these tests and experiments include the configured replication factor of the topic(s) in question.  We’ll touch upon some of these options and design decisions throughout this tutorial.

Kafka Producer Example Screencast

Before we begin analyzing the Kafka Producer example client source code, let’s see how to run the example in the following screencast.

Kafka Producer Example


Outside of running and debugging in IntelliJ, we also use the `kafka-console-consumer` command-line tool to inspect messages being sent to the topic in a few different ways, including:

`./bin/kafka-console-consumer --bootstrap-server localhost:9092 --topic example-topic`

and

`./bin/kafka-console-consumer --bootstrap-server localhost:9092 --topic example-topic --property print.key=true --property key.separator=:: --from-beginning`

Kafka Producer Source Code

Developing Kafka Producers is similar to developing Kafka Consumers in that a Kafka client library is made available to your source code project.   As we saw in the Kafka Consumer tutorial, if you are using a build tool like SBT or Maven, it’s just a matter of adding the client library as a dependency, for example:

val kafka = "0.10.2.1"
libraryDependencies += "org.apache.kafka" % "kafka-clients" % kafka

Let’s start by looking at some of the optional Kafka Producer configuration settings that are commented out by default.

p.put(ProducerConfig.ACKS_CONFIG, "all")
p.put(ProducerConfig.RETRIES_CONFIG, 0)
p.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384)
p.put(ProducerConfig.LINGER_MS_CONFIG, 1)
p.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 33554432)

As we saw in the above screencast, these variables are `static final String` constants provided as a convenient way to set the underlying configuration keys.  For example, `ACKS_CONFIG` is defined in the underlying Java client as:

public static final String ACKS_CONFIG = "acks";

As a Kafka developer, administrator, or architect, you have options for designing the desired performance characteristics of your Kafka Producer.  These Kafka Producer configuration options are your first stop when tuning for both speed and safety.  (Note: we’re not going to cover all the configuration options; just the ones used in this example.)
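
For context, here is a minimal sketch of constructing a `KafkaProducer` in Scala.  The broker address, topic name, and sample key/value below are assumptions for illustration; the example project in the screencast may organize this differently.

// minimal sketch: assumes a broker at localhost:9092 and a topic named "example-topic"
import java.util.Properties
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig, ProducerRecord}

val p = new Properties()
p.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092")
p.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer")
p.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer")
// optional tuning settings discussed below, e.g.
// p.put(ProducerConfig.ACKS_CONFIG, "all")

val producer = new KafkaProducer[String, String](p)
producer.send(new ProducerRecord[String, String]("example-topic", "some key", "some value"))
producer.close()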

Kafka Producer `acks` Configuration

`acks` specifies how many acknowledgments the Producer requires before a message is considered committed.  Recall each topic partition in a Kafka cluster has a designated leader and possibly a set of replicas among the brokers.  All writes to a particular partition must go to the partition leader, and replicas then fetch from this leader in order to keep in sync.  With the `acks` setting, the Producer can wait until the configured number of replicas have acknowledged the write before the message is considered committed.

The Kafka Producer `acks` configuration gives the producer developer control over message durability at the cost of throughput.

The strongest durability guarantee is setting `acks` to `all`, as shown in the commented-out code.  This setting guarantees not only that the partition leader accepted the write request, but also that it was successfully replicated to all of the in-sync replicas.  Other options for `acks` are `0` and `1`.  The default setting is `1`.

Kafka Producer Batch Size Configuration

Kafka Producers may collect messages into batches before sending them to partition leaders in an attempt to improve throughput. Use `batch.size` (or `BATCH_SIZE_CONFIG` as seen in this example; remember it’s a convenience constant) to control the maximum size in bytes of each message batch.  You can use `linger.ms` to give batches more time to fill.

You may be wondering, how do batch transmission and topic partitioning co-exist?

Producers maintain buffers (batches) of unsent records for each partition according to the `batch.size` setting. Increasing the value will result in more batching at the cost of requiring more memory.
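
As a rough illustration of the trade-off, a more throughput-oriented configuration might bump these values.  The numbers below are illustrative assumptions only, not recommendations:

// illustrative, throughput-oriented batching settings (example values only)
p.put(ProducerConfig.BATCH_SIZE_CONFIG, 65536)       // allow larger batches per partition (bytes)
p.put(ProducerConfig.LINGER_MS_CONFIG, 20)           // wait up to 20 ms for batches to fill
p.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 67108864) // total memory available for unsent records (bytes)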

Kafka Producer Compression

As described in the previous section, the Kafka Producer transmits messages in batches, so is there an option to compress the batch payload?  Yes, with the `compression.type` setting.  The default is `none`, but it may be set to `gzip`, `snappy`, or `lz4`.
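
For example, enabling gzip compression of batches is a one-line addition to the properties shown earlier:

// compress each batch with gzip; "snappy" and "lz4" are the other supported values
p.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, "gzip")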

Kafka Producer Asynchronous `send`

A few noteworthy items about the two versions of the `send` function shown here:

producer.send(new ProducerRecord(topics, s"key ${k}", "oh the value!"))

// with example callback
// producer.send(new ProducerRecord(topics, s"key ${k}", "oh the value!"), callback)

First, the obvious, as shown in the screencast: the `send` function can accept a `Callback` argument which will be invoked on message receipt acknowledgment.  The callback’s overridden `onCompletion` implementation receives two arguments: `metadata`, which includes the partition and offset of the sent message, and `exception`, which may be one of roughly ten exception variations if the send failed.
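
If you are wondering what the `callback` value referenced in the commented-out code might look like, here is a minimal sketch; the `println` logging is illustrative only and not necessarily what the example project does:

import org.apache.kafka.clients.producer.{Callback, RecordMetadata}

// minimal example Callback; what you do in onCompletion is up to you
val callback = new Callback {
  override def onCompletion(metadata: RecordMetadata, exception: Exception): Unit = {
    if (exception != null) {
      // the send failed; exception describes why
      println(s"send failed: ${exception.getMessage}")
    } else {
      // the send succeeded; metadata tells us where the record landed
      println(s"sent to partition ${metadata.partition()} at offset ${metadata.offset()}")
    }
  }
}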

Second, you may remember the internal mechanics of sending messages include leader determination of topic partitions.  Do you remember what this means?  When a Kafka Producer wants to send a message, it needs to determine the leader (a particular node) of the topic partition.  Any Kafka node in the cluster may answer the Producer’s question on which nodes are alive and who the leaders are for particular partitions in a given topic.  Answering these questions allows the Producer to route the message appropriately.

Noteworthy

Starting with Kafka 0.11, the Kafka Producer supports two additional capabilities beyond the plain `send` shown above: the idempotent producer and the transactional producer.  Let me know if we should cover these two additional capabilities in a future tutorial.
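
As a quick preview, enabling the idempotent producer is largely a configuration change.  This is a hedged sketch assuming a 0.11+ client; idempotence also constrains `retries` and `max.in.flight.requests.per.connection`, so check the producer documentation for your client version:

// enable the idempotent producer (Kafka 0.11+); retries no longer introduce duplicates
p.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, "true")
// idempotence expects acks=all
p.put(ProducerConfig.ACKS_CONFIG, "all")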

Kafka Producer Conclusion

In this tutorial, we created and executed a Kafka Producer in Scala.  Did it help?  Or would you prefer a Java example?  Let me know, and share any questions or suggestions for improvement.  Enjoy yourselves out there, people.  Try to be good to one another.

Kafka Producer References


Kafka Consumer


In this Kafka Consumer tutorial, we’re going to demonstrate how to develop and run an example Kafka Consumer, so you can gain confidence to develop and deploy your own Kafka Consumer applications.  At the end of this Kafka Consumer tutorial, you’ll have both the source code and a screencast of how to run and customize the Kafka Consumer example.

As a quick background, recall Kafka Consumers are applications which read messages from Kafka topic partitions.  As previously described in Apache Kafka Overview, Kafka Consumers read messages from Kafka topics in sequential order through use of offsets.  Ordering is a key feature of Apache Kafka and it is what differentiates it from more traditional pub/sub messaging systems.

Kafka messages do not need to be in a particular format such as JSON, Avro, or plain text.  There is no required, formal agreement on data format between Kafka Consumers and Kafka Producers; instead, there is typically an informal agreement on what Producers and Consumers can expect in the data formats of particular topics.  For those looking to implement more rigidity in data formats and schema validation, there is the option to implement the Confluent Schema Registry with Avro to enforce a data format contract, but we’ll cover that at a later time.

Three key settings for Kafka Consumers related to how they process messages according to offsets are `auto.offset.reset`, `enable.auto.commit` and `auto.commit.interval.ms`.   Let’s explore these a bit now.

Kafka Consumers will commit their current offset location back to Kafka every 5 seconds by default and when `enable.auto.commit` is set to true (again, the default).  You can change the frequency at which the commits happen by changing the `auto.commit.interval.ms` value.  We’ll show examples and explore this further later in the tutorial.

Kafka Consumer Architecture

When considering the mechanics or flow of data from topics to Consumers, you may wonder whether it is a pull or a push action.  In other words, do Kafka Consumers pull messages from Kafka Topics, or are messages pushed to Kafka Consumers?  The answer is pull-based mechanics.  Because one of the design goals is allowing consumers to read messages at the best rate for themselves, Kafka Consumers are in control of pulling messages.  A push model would take that control away from the Kafka Consumer.

On the subject of Kafka Consumer mechanics, you should be aware of the differences between older and newer Kafka Consumer clients.  Older Kafka clients depended on ZooKeeper for Kafka Consumer group management, while new clients use a group protocol built into Kafka itself.  Using this group protocol, one of the brokers is designated as the Consumer group’s coordinator and is responsible for managing both the members of the group as well as their partition assignments.

As noted in the previous paragraph, Kafka Consumers may be deployed and configured as Kafka Consumer Groups which can be utilized to increase throughput.  Although you will see reference to Consumer Group ID settings in the source code below, we will not be covering the concept of grouping Kafka Consumers in any depth within this tutorial.

Kafka Consumer Example

In this Kafka Consumer tutorial, we’re going to demonstrate how to develop and run a Kafka Consumer.  We’ll use Scala in this example, but the concepts hold true regardless of which language you choose to use.   See the link for Kafka Clients in the Reference section below for alternative language options.

Kafka Consumer Example

Some of the commands used to send test data to the `example-topic` in the screencast included:

`cat words.txt | ./bin/kafka-console-producer --broker-list localhost:9092 --topic example-topic`

`kafka-console-producer --broker-list localhost:9092 --topic example-topic --property parse.key=true --property key.separator=,`

And then we sent keys and values separated by a comma and ended with Ctrl-D

Next, we explored reading a topic from the beginning rather than from the latest offset, so we changed some default properties and the consumer group id.

Kafka Consumer Example Source Code

Let’s examine the relevant Kafka Consumer source code.  To start, notice how we are setting various required properties, as shown here:

p.put(ConsumerConfig.GROUP_ID_CONFIG, groupId)
p.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers)
p.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer")
p.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer")

// if we want to adjust defaults
// p.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest") // default is latest
// p.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false) // default true - set to false to commit offsets manually
// p.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, 10000) // default 5000 - change how often to commit offsets

We discussed and tried experimenting with most of these properties during the screencast.  If you have any questions, check out the screencast first.

Later, we create our Kafka Consumer, subscribe to the topic, and start polling every second:

// assumes: import java.util.Collections and import scala.collection.JavaConverters._
val consumer = new KafkaConsumer[String, String](props)

consumer.subscribe(Collections.singletonList(this.topics))

while (true) {
  val records = consumer.poll(1000).asScala

  for (record <- records) {
    println("Message: (key: " +
      record.key() + ", with value: " + record.value() +
      ") on partition " + record.partition() + " at offset " + record.offset())
  }
}

We’re not doing anything interesting with the messages other than displaying keys, values and metadata.  What do you notice about the partition and offset values?  Do you see evidence of strict ordering in the partition and offset values?

Finally, we should quickly call out the commented-out code towards the end of the `main` function:

// if you don't want to auto commit offset which is default
// comment out below and adjust properties above
/*
try
  consumer.commitSync
catch {
  case e: CommitFailedException =>
   // possible rollback of processed records
   // which may have failed to be delivered to ultimate destination
}
*/

By default, the consumer will automatically commit its current offset back to Kafka every 5 seconds.  But what if processing fails between the time messages were received and the time the offset is committed back?  Is there a chance to lose data?  Is there a chance to duplicate data?  This is covered in much more detail in the Kafka delivery guarantees post, so I recommend you check that out if you are interested in learning more.

For now, this commented-out code and properties such as `ENABLE_AUTO_COMMIT_CONFIG` shown above indicate the options you have for tuning delivery vs. processing guarantees.


Kafka Consumer Conclusion

I hope you enjoyed and learned from this Kafka Consumer tutorial.  Do let me know if you have any questions or suggestions for improvement.  Thanks!


Kafka Consumer References



Kafka Consumer featured image https://pixabay.com/en/alcohol-drink-alkolismus-bottles-64164/


Apache Kafka Architecture – Delivery Semantics


Apache Kafka offers message delivery guarantees between producers and consumers. These delivery guarantees can be divided into three groups which include “at most once”, “at least once” and “exactly once”.

  • at most once which can lead to messages being lost, but they cannot be redelivered or duplicated
  • at least once ensures messages are never lost but may be duplicated
  • exactly once guarantees each message processing (not delivery) happens once and only once


Which option sounds the most appealing?  The choice is obvious, right?

On the surface, everyone wants “exactly once”.  But what if your overall equation includes more than just Kafka processing?  For example, what if the architecture plan includes consuming from Kafka and writing to a data lake?  Now things become more complicated.

This is the point where I pause, reflect and smile and think of the old Jimmy Cliff song “You can get it if you really want it”.

Simply put, there are costs and considerations with each of the processing options.  Considerations include additional performance burdens which increase latency, the additional complexity of your implementation which may make your design more brittle, etc.

When considering delivery semantics, it is important to break things down into two different aspects: the durability guarantees provided for the message producer and those provided for the message consumer.

Also, although software stacks may promise to deliver each message once and only once, or in other words exactly once, the devil is in the details. Exactly-once delivery semantics do not necessarily cover cases where a consumer writes to disk or to external systems which fail outside of Kafka.  You need to consider your entire data flow use case before assuming exactly once.

Kafka Architecture – Delivery Semantics from a Producer’s Perspective

When a message has been published to Kafka, it is essentially stored in a log.  Kafka has the option to configure replication of messages through replication factor settings, but that will not be covered here.

Since Kafka 0.11, the Kafka producer has included two options for tuning your desired delivery guarantee: the idempotent producer and the transactional producer. The idempotent option ensures that retries will no longer introduce duplicates.  This option essentially changes Kafka’s delivery semantics on the produce path from `at least once` to `exactly once`.

The transactional producer allows an application to send messages to multiple partitions atomically.  Topics which are included in transactions should be configured for durability via `replication.factor` settings.  On the other side, consumers should be configured to read only committed messages.
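
To make this more concrete, here is a hedged sketch of the transactional producer API introduced in Kafka 0.11.  The broker address, topic names, and transactional id are assumptions for illustration:

import java.util.Properties
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig, ProducerRecord}

// sketch only: broker address, topic names and transactional id are illustrative assumptions
val p = new Properties()
p.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092")
p.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer")
p.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer")
p.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "example-transactional-id")

val producer = new KafkaProducer[String, String](p)
producer.initTransactions()
try {
  producer.beginTransaction()
  producer.send(new ProducerRecord[String, String]("example-topic-a", "key", "value"))
  producer.send(new ProducerRecord[String, String]("example-topic-b", "key", "value"))
  producer.commitTransaction() // both sends become visible atomically
} catch {
  case e: Exception =>
    // in real code, fatal errors such as ProducerFencedException require closing the producer instead
    producer.abortTransaction()
    throw e
}
// consumers that should only see committed messages set isolation.level=read_committed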

These strong guarantees are not required by all use cases. For latency-sensitive cases, Kafka allows the producer to specify the durability level it desires. If the producer waits for the message to be fully committed, this can take on the order of 10 ms. Alternatively, the producer can send completely asynchronously, or it can wait only until the leader (but not necessarily the followers) has the message.

Kafka Architecture – Delivery Semantics from a Consumer’s Perspective

Recall Kafka maintains a numerical offset for each record in a partition. This offset denotes the location of the consumer in the partition. For example, a consumer which is at position 5 has consumed records with offsets 0 through 4.  Consumers may have their offsets committed back to Kafka automatically via the `enable.auto.commit` configuration property, or they may manage offsets manually and even store them outside of Kafka.

Instead of relying on the consumer to periodically commit consumed offsets back to Kafka automatically, users of the Kafka Consumer API can manually control their offset positions. This is especially useful when the consumption of messages is coupled with activities outside of Kafka, such as writing to a data lake or warehouse.  In these cases, the consumer may choose to update its offset position only after a successful write to the data lake.  The alternative of automatic offset updates is easier to implement, but it doesn’t account for downstream failures.  For example, if a write to the data lake fails but the consumer offset position has already been updated after reading from the Kafka topic, data loss in the data lake would occur.

Let’s dive into this a bit more because it is critical in your understanding of delivery guarantee semantics.

If we enable auto commit of offsets back to Kafka, consuming from Kafka and writing to the data lake may look similar to the following pseudo code:

ConsumerRecords<String, String> records = consumer.poll(100);
for (ConsumerRecord<String, String> record : records)
    insertDataLake(record)

 Again, pseudo-code here for illustrative purposes only.

If a failure occurs in the `insertDataLake` function above and the consumer is restarted with offset commits set to automatic, there is a chance of data loss in the data lake, because the committed offset will be ahead of the last record actually inserted into the data lake.  This is an example of `at-most-once` delivery.  There are no duplicates in the data lake, but there is the possibility of missing records.

It might be completely fine to deploy this way if a slight chance of data loss is acceptable.

Conversely, if you choose to manually commit offset locations, you move towards an `at-least-once` delivery guarantee.  Each record will likely be delivered one time, but in the event of a failure between writing to the data lake and committing the offset back to Kafka, there is the possibility of storing duplicates in the data lake.

props.put("enable.auto.commit", "false")
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)
ConsumerRecords<String, String> records = consumer.poll(100);
for (ConsumerRecord<String, String> record : records) {
    insertDataLake(record)
    // what happens if a failure occurs after the insert above
    // but before the following commit completes?
    consumer.commitSync()
}

In this case, the consumer process that takes over after the failure will consume from the last committed offset and will repeat some of the inserts. Now Kafka provides “at-least-once” delivery guarantees: each record will likely be delivered one time, but in a failure case, data could be duplicated.

What hasn’t been covered until now is the choice of granularity when committing offset locations.  First, we should note that processing in batches of records can be much more performant (higher throughput) than processing individual records.  This is similar to batching in earlier data access technologies such as JDBC.  Processing in batches of records is available in Kafka as well.   This means you have a choice of granularity: consuming and committing offsets per batch or per individual record.

As you might expect, your choice of batch vs. individual commits has consequences for performance.  There is much more overhead required to commit per record rather than per batch.  But if we require “exactly once” delivery to our data lake, we may be forced to choose the individual record approach.  Alternatively, we may explore implementing a data lake with upsert support and a Kafka consumer with “at-least-once” processing.

To recap:

Kafka At-Most-Once

The consumer reads the message and saves its offset position before fully processing the record, i.e. before saving it to the data lake. In case of failure, the consumer recovery process resumes from the previously saved offset position, which may be beyond the last record actually saved to the data lake. This demonstrates at-most-once semantics; data loss in the data lake, for example, is possible.

Kafka At-Least-Once

The consumer can also read the messages, process them and save to the data lake, and only afterward save its offset position.  If a failure occurs after the data lake write but before the offset location update, the Kafka consumer taking over for the failed process could duplicate data. This forms the “at-least-once” semantics in case of consumer failure; duplicates in the data lake could happen.

Kafka Exactly Once

When it comes to exactly-once semantics with Kafka and external systems, the restriction is not necessarily a feature of the messaging system itself, but rather the need to coordinate the consumer’s position with what is stored as output, such as the data lake. One of the most common ways to do this is to introduce a two-phase commit between the storage of the consumer’s output and the storage of the consumer’s position.  Or, in other words, take control of offset location commits in relation to data lake storage on a per-record basis.
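
As a rough sketch of that second idea (controlling offset commits in relation to the external store), a consumer can disable auto commit, write each record and its offset to the data lake in one atomic operation, and seek to the stored offset on startup.  The `saveToDataLakeWithOffset` and `loadStoredOffset` helpers below are hypothetical placeholders for whatever your storage layer provides, and the broker, group, and topic names are assumptions:

import java.util.{Collections, Properties}
import org.apache.kafka.clients.consumer.{ConsumerConfig, KafkaConsumer}
import org.apache.kafka.common.TopicPartition
import scala.collection.JavaConverters._

// hypothetical helpers: your data lake / warehouse layer would provide these
def saveToDataLakeWithOffset(partition: Int, nextOffset: Long, value: String): Unit = ???
def loadStoredOffset(tp: TopicPartition): Long = ???

val props = new Properties()
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092")
props.put(ConsumerConfig.GROUP_ID_CONFIG, "example-group")
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false") // offsets live with the output, not in Kafka
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer")
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer")

val consumer = new KafkaConsumer[String, String](props)
val tp = new TopicPartition("example-topic", 0)
consumer.assign(Collections.singletonList(tp))
consumer.seek(tp, loadStoredOffset(tp)) // resume exactly where the data lake says we left off

while (true) {
  for (record <- consumer.poll(1000).asScala) {
    // the record value and the next offset to consume are stored together, atomically
    saveToDataLakeWithOffset(record.partition(), record.offset() + 1, record.value())
  }
}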

Conclusion

By default, Kafka provides at-least-once delivery guarantees.  Users may implement at-most-once and exactly-once semantics by customizing how and when offset locations are recorded.  In any case, Kafka offers you the option of choosing which delivery semantics you strive to achieve.

Apache Kafka Overview


What is Apache Kafka?

Apache Kafka is a messaging system which can provide the foundation for data to be moved between systems without tight coupling. Apache Kafka is often defined as a distributed log service that is partitioned and possibly replicated. It provides a messaging system that is fast, scalable, durable, distributed by design, and fault-tolerant. For large-scale data systems, it is the preferred choice of many for communication between systems.

To have a better understanding of Kafka, let us first have a glimpse of some of the most common Apache Kafka terminologies.

First off, let’s discuss the concept of “topics”. To send a message to Kafka from a Producer, you have to send it to a specific topic. The same thing holds true if you want to read a message: you need to read from one or more specific topics with a Consumer.

You can think of it like this:

Kafka Topics

Messages are published to topics by Producers. Those who subscribe to topics and read the messages, on the other hand, are called Consumers.

Producers and Consumers communicate with Apache Kafka over a simple, high-performance, language-agnostic TCP protocol, which facilitates communication between clients and servers.

Topics and Logs

A topic is considered a key abstraction in Kafka. A topic can be considered a feed name or category to which messages will be published. Each topic is further subdivided into partitions, which allow a topic’s data to be split across different Kafka brokers (nodes). Each message within a partition is assigned an ID number, which is technically known as the offset.

Kafka Topic Partitions

All published messages can be retained in the cluster, regardless of whether they have been consumed or not. The retention period is configurable: for instance, if the configuration states that messages are available for one week, they are retained for that period only. Once the period has lapsed, they are automatically deleted, which in turn provides the system with additional space.

On a per-consumer basis, only one piece of metadata is retained: the consumer’s position, technically known as the offset. The consumer has complete control of this position. Topics are often consumed linearly, but the consumer can go back and reprocess older messages, precisely because it is in control.

Kafka consumers are lightweight and cheap, and they do not have a significant impact on cluster performance.  This differs from more traditional messaging systems. Every consumer maintains its own offset, and hence its actions do not affect other consumers.

Kafka Partitions

Why partitions?  Partitions are leveraged for two purposes. The first relates to size: each individual partition must fit on a single node, but a topic composed of many partitions can grow beyond what a single node can hold. The second purpose of partitioning is parallelism and performance tuning: it makes it possible for messages to be consumed concurrently by multiple consumers or threads. This will be further discussed later on.

Distribution

Every partition may be replicated for fault tolerance. For each partition there is a single server labeled the “Leader”, which is responsible for handling read and write requests for that partition. There can also be zero or more “Followers”, nodes in the Kafka cluster which replicate the actions undertaken by the leader. In the case of the failure of the leader, one of the followers will automatically assume the leadership position.  We’ll cover more on Kafka fault tolerance in later tutorials.

So, Why Kafka?  When?

When compared to a conventional messaging system, one of the benefits of Kafka is its ordering guarantees. In a traditional queue, messages are kept on the server in the order they arrive and are pushed out in that same order, but because they are delivered to consumers asynchronously, their arrival order at different consumers could be random.  Hold the phone.  Did you just write “could be random”?  Think about that for 5 seconds.  That may be fine in some use cases, but not others.

Kafka is often deployed as the foundational element in Streaming Architectures because it can facilitate loose coupling between various components in architectures such as databases, microservices, Hadoop, data warehouses, distributed file systems, search applications, etc.

Guarantees

In a Kafka messaging system, there are essentially three guarantees:

  1. A message sent by a producer to a topic partition is appended to the Kafka log based on the order sent.
  2. Consumers see messages in the order of their log storage.
  3. For topics with replication factor N, Kafka will tolerate up to N-1 server failures without losing messages previously committed to the log.

Conclusion

Hopefully, this article provides more high-level insight into Apache Kafka architecture and Kafka benefits over traditional messaging systems.  Stay tuned for additional forthcoming tutorials on Apache Kafka.

And if there are any particular “topics” 😉 you would like to see, let us know.



Featured image based on https://flic.kr/p/eEQpPj


Apache Kafka and Amazon Kinesis – How do they compare?


Apache Kafka vs. Amazon Kinesis

Like many of the offerings from Amazon Web Services, Amazon Kinesis software is modeled after an existing Open Source system.  In this case, Kinesis is modeled after Apache Kafka.

Kinesis is known to be incredibly fast, reliable and easy to operate.  Similar to Kafka, there are plenty of language specific clients available including Java, Scala, Ruby, Javascript (Node), etc.

Amazon Kinesis has built-in cross-replication, while with Kafka you have to configure it on your own. Cross-replication is the idea of syncing data across logical or physical data centers.  Cross-replication is not mandatory, and you should consider it only if you need it.

Engineers sold on the value proposition of Kafka and Software-as-a-Service or perhaps more specifically Platform-as-a-Service have options besides Kinesis or Amazon Web Services.  Keep an eye on http://confluent.io.

When to use Kafka or Kinesis?

Kafka or Kinesis are often chosen as an integration system in enterprise environments similar to traditional message brokering systems such as ActiveMQ or RabbitMQ.   Integration between systems is assisted by Kafka clients in a variety of languages including Java, Scala, Ruby, Python, Go, Rust, Node.js, etc.

Other use cases include website activity tracking for a range of use cases including real-time processing or loading into Hadoop or analytic data warehousing systems for offline processing and reporting.

An interesting aspect of Kafka and Kinesis lately is the use in streaming processing.  More and more applications and enterprises are building architectures which include processing pipelines consisting of multiple stages.  For example, a multi-stage design might include raw input data consumed from Kafka topics in stage 1.  In stage 2, data is consumed and then aggregated, enriched, or otherwise transformed. Then, in stage 3, the data is published to new topics for further consumption or follow-up processing during a later stage.

Conclusion

Keep an eye on supergloo.com for more articles and tutorials on Kafka, Kinesis and other stacks used in data processing and pipelines using streams.

References



Featured image credit https://flic.kr/p/7XWaia