Kafka Streams – Transformations Examples

Kafka Streams Transformations

Kafka Streams transformations provide the ability to perform actions on Kafka Streams, such as filtering and updating values in the stream.  Kafka Streams transformations include operations such as `filter`, `map`, `flatMap`, etc. and have similarities to functional combinators found in languages such as Scala.  And, if you are coming from Spark, you will also notice similarities to Spark transformations.  But, even if you don’t have experience with combinators or Spark, we’ll cover enough examples of Kafka Streams transformations in this post for you to feel comfortable and gain confidence through hands-on experience.  We’re going to cover examples in Scala, but I think the code would be readable and comprehensible for those of you with a Java preference as well.

Kafka Streams transformations are available from `KTable` or `KStream` and will result in one or more `KTable`, `KStream` or `KGroupedTable`, depending on the transformation function.  We’ll cover examples of various inputs and outputs below.

Before we go into the source code examples, let’s cover a little background and also a screencast of running through the examples.

Kafka Streams Transformation Types

Kafka Streams Transformations are available in two types: Stateless and Stateful.

Stateless transformations do not require state for processing.  For example, let’s imagine you wish to filter a stream for all keys starting with a particular string in a stream processor.  In this case, Kafka Streams doesn’t require knowing the previous events in the stream.  It simply performs each filtering operation on the message and moves on.  Conversely, let’s say you wish to sum certain values in the stream.  In this case, you would need “state” to know what has been processed already in previous messages in the stream in order to keep a running tally of the sum result.

As previously mentioned, stateful transformations depend on maintaining the state of the processing.  To maintain the current state of processing the inputs and outputs, Kafka Streams introduces a construct called a State Store.  Aggregations, such as the previous sum example, and joins of Kafka streams are examples of stateful transformations.
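To make the distinction concrete, here is a minimal sketch of both kinds of transformations using the Kafka Streams Scala DSL.  This is illustrative only; the topic name is made up, and the implicits come from the DSL imports shown.

// A minimal sketch (not from the repo) contrasting stateless and stateful
// transformations. The "sensor-values" topic is hypothetical.
import org.apache.kafka.streams.scala.StreamsBuilder
import org.apache.kafka.streams.scala.ImplicitConversions._
import org.apache.kafka.streams.scala.Serdes._

val builder = new StreamsBuilder
val sensorValues = builder.stream[String, Long]("sensor-values")

// Stateless: each record is evaluated on its own; no state store is required.
val filtered = sensorValues.filter((key, _) => key.startsWith("sensor-"))

// Stateful: a running sum per key requires Kafka Streams to maintain a state store.
val runningSums = filtered.groupByKey.reduce(_ + _)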

Kafka Streams Transformations Source Code

All the source code is available from my Kafka Streams Examples repo on GitHub.

Kafka Streams Transformations Screencast

Before we begin going through the Kafka Streams Transformation examples, I’d recommend viewing the following short screencast where I demonstrate how to run the Scala source code examples in IntelliJ.

Kafka Streams Transformations

Kafka Streams Transformations Examples Scala Source Code

The following Kafka Streams transformation examples are primarily examples of stateless transformations.  Let me know if you want some stateful examples in a later post.  I do plan to cover aggregating and windowing in a future post.  Also, related to stateful Kafka Streams joins, you may wish to check out the previous Kafka Streams joins post.

It is recommended to watch the short screencast above before diving into the examples.

Kafka Streams Transformation Examples


  • `branch`
  • `filter`
  • `flatMap`
  • `map`
  • `groupBy`

`branch`

The `branch` function is used to split a KStream by the supplied predicates into one or more KStream results.  In this Kafka Streams Transformations tutorial, the `branch` example has three predicates: two filters for key name and one default predicate for everything else.

This is the example implementation

val results: Array[KStream[String, String]] = inputStream.branch(
  (key, value) => key.startsWith(keyFilter1),
  (key, value) => key.startsWith(keyFilter2),
  (key, value) => true
)

and we tested the expected results for filters on “sensor-1” and “sensor-2” and a default.

storeOne.get("sensor-1") shouldBe "MN"
storeOne.get("sensor-11") shouldBe "IL"
storeTwo.get("sensor-2") shouldBe "WI"

`filter`

The `filter` function can filter either a KTable or KStream to produce a new KTable or KStream, respectively.

For our example, we used a KStream

inputStream.filter(
  (key, value) => value == keyFilter
).to(s"${keyFilter}-topic")

In this example, we use the passed-in filter to match on the values in the KStream.

storeOne.get("sensor-1") shouldBe valFilter
storeOne.get("sensor-2") shouldBe null
storeOne.get("sensor-11") shouldBe null

`valFilter` is set to “MN” in the Spec class.

`flatMap`

`flatMap` performs as expected if you have used it before in Spark or Scala.  Use it to produce zero, one or more records from each input record processed.
From the Kafka Streams documentation, it’s important to note

Marks the stream for data re-partitioning: Applying a grouping or a join after flatMap will result in re-partitioning of the records. If possible use flatMapValues instead, which will not cause data re-partitioning.

In the example `flatMap` implementation

inputStream.flatMap {
  (key, value) => {
    expanderList.flatMap { s =>
      List((s"${s}-${value}", value))
    }
  }
}

we are using both `flatMap` from Kafka Streams as well as `flatMap` from Scala.  The intention is to show creating multiple new records for each input record.
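Since the documentation quoted above recommends `flatMapValues` when the keys are left untouched, here is a hedged sketch of a similar expansion that does not trigger re-partitioning; because `flatMapValues` cannot change the key, the expansion moves into the value instead (it reuses the `inputStream` and `expanderList` from the example above).

// Sketch only: the keys are unchanged, so the stream is not marked for re-partitioning.
inputStream.flatMapValues { value =>
  expanderList.map(s => s"${s}-${value}")
}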

 

`map`

Where `flatMap` may produce multiple records from a single input record, `map` is used to produce a single output record from an input record.  I like to think of it as one-to-one vs the potential for `flatMap` to be one-to-many.

In the implementation example

inputStream.map {
  (key, value) => (key, s"${value}-new")
}.to(resultTopic)

Here we simply create a new key, value pair with the same key, but an updated value.

In the tests, we verify the new values from the result stream.

storeOne.get("sensor-1") shouldBe "MN-new"
storeOne.get("sensor-2") shouldBe "WI-new"
storeOne.get("sensor-11") shouldBe "IL-new"

`groupBy`

With `groupBy`, we deviate from stateless to stateful transformations in order to test the expected results.  In the implementation shown here

inputStream.groupBy {
  (key, value) => value
}.count()(Materialized.as(s"${storeName}"))

we are grouping by the values.  Notice in the test class that we are now passing two records with the value of “MN”.  This allows us to test the expected `count` results

storeOne.get("MN") shouldBe 2
storeOne.get("WI") shouldBe 1

`count` is a stateful operation which was only used to help test in this case.

Conclusion

Hope these examples helped. Do let me know if you have any questions, comments or ideas for improvement.


Kafka Streams Transformation Examples featured image: https://pixabay.com/en/dandelion-colorful-people-of-color-2817950/

Kafka Streams – Why Should You Care?

Why Kafka Streams?

Kafka Streams is another entry in the stream processing framework category, with options to leverage it from either Java or Scala.  In this post, we’ll describe what Kafka Streams is, its features and benefits, when to consider it, how-to Kafka Streams tutorials, and external references.  Ultimately, the goal of this post is to answer the question, why should you care?

What is Kafka Streams?

Kafka Streams is a client library for processing and analyzing data stored in Kafka. Developers use the Kafka Streams library to build stream processor applications when both the stream input and stream output are Kafka topic(s).  We’ll cover stream processors and stream architectures throughout this tutorial.

Some of you may be wondering, why Kafka Streams over Kafka Connect or writing your own Kafka Consumer or Kafka Producer?  What makes Kafka Streams different?

Well, to answer those questions, we should note one key difference right from the start.

As mentioned, Kafka Streams is used to write stream processors where the input and output are Kafka topics.  Visually, an example of a Kafka Streams architecture may look like the following.

Kafka Streams Example

As seen above, both the input and output of Kafka Streams applications are Kafka topics.

From this image, it appears Kafka Consumer and Kafka Producer APIs are being used.  Is either of these APIs used in Kafka Streams based apps?  Well, the answer is yes and no.  Kafka Streams builds upon Kafka Producer and Consumer libraries, so it is able to leverage the native capabilities of Kafka such as fault tolerance, distributed coordination, and parallelism.  But, it doesn’t expose the APIs directly.  Instead, input and output to Kafka topics in Kafka Streams apps are available via Consumer and Producer abstractions such as the Kafka Streams DSL and Processor API.  You’ll learn more about these in the Kafka Streams tutorial section below.
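To make that concrete, here is a bare-bones sketch of a Kafka Streams app built with the DSL.  The topic names and the transformation are made up purely for illustration, and the imports assume the Kafka Streams Scala DSL.

// A minimal, illustrative sketch: read from one Kafka topic, transform, write to another.
import java.util.Properties
import org.apache.kafka.streams.{KafkaStreams, StreamsConfig}
import org.apache.kafka.streams.scala.StreamsBuilder
import org.apache.kafka.streams.scala.ImplicitConversions._
import org.apache.kafka.streams.scala.Serdes._

val props = new Properties()
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "example-stream-processor")
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092")

val builder = new StreamsBuilder
builder.stream[String, String]("input-topic")   // input is a Kafka topic
  .mapValues(_.toUpperCase)                     // some transformation
  .to("output-topic")                           // output is also a Kafka topic

val streams = new KafkaStreams(builder.build(), props)
streams.start()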

Ok, but why not configure and run a Kafka Connect based app?  Good question, but as I understand it, Kafka Connect is essentially the E and L in ETL.  Support for transformations is minimal, as seen in the Kafka Connect Transformations documentation.  In addition, the transformations are on a per-message basis, or single message transforms, rather than across a stream.  Finally, and somebody please correct me if I’m wrong here, but are there options for using Kafka topics as both the source and the sink in Kafka Connect?

Kafka Streams Features and Benefits

Some of the factors I find appealing about Kafka Streams include:

  • Lightweight – no external dependencies other than Kafka itself of course, yet you can still scale out horizontally
  • Testable – it’s easy to write Kafka Streams tests (see the tutorial section for links on how to test Kafka Streams)
  • JVM – I don’t have to learn a new language.  I can continue to use Scala, as I do in Spark.
  • Exactly-once processing guarantees.  This is really powerful and shows how Kafka has evolved over the years.  I’m a big fan but believe we need to be careful when considering processing vs delivery.  For more, see my thoughts on exactly once in Kafka
  • Abstraction DSL – as you can see from the Kafka tutorials, the code is very readable.
  • Ability to perform both stateless transformations and stateful aggregations and joins
  • Supports common stream processing constructs such as the varying meanings of time, and windowing
  • API is familiar coming from Apache Spark, so the learning curve is low

Why Kafka Streams?

To answer the question of why Kafka Streams, I believe we need to understand the role of Kafka Streams in a larger software architecture paradigm change from batch-oriented to stream-oriented.  We need to speak of the emergence of stream processors and streaming architectures.  For me, the person who influenced my thinking most on this topic was Martin Kleppmann.  When I downloaded the freely available “Making Sense of Stream Processing” book, I had already experimented with and deployed to production Kafka, Lambda and Kappa architectures, and microservices, and had studied the CAP theorem, so I wasn’t expecting the book to be that impactful.  But it was.

Making Sense of Stream Processing

At the time of this writing, if you search for “Making Sense of Stream Processing”, you’ll find numerous sources where you can download this book for free.

If I could attempt to summarize visually, I would try to show that the older architecture of tightly coupled components in an overall stack has more drawbacks than advantages.  A tightly coupled architecture such as this

Streaming Architecture Before

should be stopped

Replace with Streaming Architecture

and a distributed, ordered log like Kafka should be introduced to provide the backbone of streaming architectures and processors.  It may look something like this

Kafka Streams Ready Streaming Architecture

Now, what this last image does not show is the rise of stream processors.  Stream processors provide value by delivering curated results to Kafka topics.  For example, a stream processor may pull from one or more distributed log inputs (i.e. Kafka topics), perform some transformations, aggregations, filtering, etc. across messages in the stream and then write the results back to a distributed log output (i.e. a Kafka topic).  The results a stream processor writes back into the stream are intended for consumption somewhere downstream.  In other words, this is an example of developing and deploying a stream processor.  And that, girls and boys, is why-we-are-here as they say in the business.  Kafka Streams is one option for creating stream processors when the input and output are both Kafka topics.

As noted on the Stream Processors and Streaming Architecture overview page, stream processors build upon key concepts such as the meaning of time, aggregations and windowing, stateless vs stateful processing, and processing vs delivery guarantees.  Kafka Streams is no exception to these requirements and provides varying support for each one.

When to Use Kafka Streams?

Hopefully, it’s obvious by now, but the appropriate time to consider Kafka Streams is when you are building stream processors where both the input and output are Kafka topics.

Kafka Streams Tutorials

This Kafka Streams overview will be fine for those of you looking to obtain a high-level understanding of Kafka Streams.  But, for developers looking to gain hands-on experience with Kafka Streams, be sure to check out the Kafka Streams tutorials section of this site.  There you will be able to experiment with all kinds of Kafka Streams use cases such as quick starts, automated testing, joining streams, etc.

For the most recent list, check out the Kafka Streams Tutorials section of the Kafka tutorial page.

Comparisons or Alternatives to Kafka Streams

Remember, Kafka Streams is designed for building Kafka based stream processors where the stream input is a Kafka topic and the stream processor output is a Kafka topic.  This distinction is simply a requirement to keep in mind when considering other mechanisms for producing to and consuming from Kafka.  If your use case is only producing messages to Kafka, or only consuming messages from Kafka, then a Kafka Streams based stream processor may not be the right choice.  However, if you need to build stream processors for more than just Kafka, such as Kinesis, Pulsar or Google Pub/Sub, you may wish to consider alternatives such as Spark Streaming, Apache Flink or Apache Beam.


Kafka Streams Joins Examples

Kafka Join Examples

Performing Kafka Streams Joins presents interesting design options when implementing streaming architecture patterns.

There are numerous applicable scenarios, but let’s consider an application that might need to access multiple database tables or REST APIs in order to enrich a topic’s event records with context information.  For example, perhaps we could augment records in a topic of sensor events, containing a location and temperature, with the most current weather information for that location.  Furthermore, let’s say we require these weather lookups, based on a sensor’s location, to have extremely low processing latency, which we cannot achieve with a database or REST API lookup.  In this case, we may wish to leverage the Kafka Streams API to perform joins of such topics (sensor events and weather data events), rather than requiring lookups to remote databases or REST APIs.  This could result in improved processing latency.  (If not entirely obvious, this previous example assumes we are piping sensor and weather events into Kafka topics.)
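To sketch that scenario, a stream of sensor events could be joined against a `GlobalKTable` of weather keyed by location.  Everything below is illustrative only: the topic names are hypothetical, sensor events are assumed to be keyed by sensor id with the location as the value, and weather is keyed by location.

import org.apache.kafka.streams.kstream.GlobalKTable
import org.apache.kafka.streams.scala.StreamsBuilder
import org.apache.kafka.streams.scala.kstream.KStream
import org.apache.kafka.streams.scala.ImplicitConversions._
import org.apache.kafka.streams.scala.Serdes._

val builder = new StreamsBuilder

val sensorEvents: KStream[String, String] = builder.stream[String, String]("sensor-events")
val weatherByLocation: GlobalKTable[String, String] =
  builder.globalTable[String, String]("weather-by-location")

// Enrich each sensor event with the current weather for its location -- no
// remote database or REST call in the processing path.
sensorEvents.join(weatherByLocation)(
  (_, location) => location,                       // map each event to the weather key
  (location, weather) => s"${location}/${weather}" // combine the stream and table values
).to("enriched-sensor-events")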

In this Kafka Streams Joins examples tutorial, we’ll create and review sample code of various types of Kafka joins.  In addition, let’s demonstrate how to run each example.  The intention is a deeper dive into Kafka Streams joins to highlight possibilities for your use cases.

Kafka Streams Joins Operators

When going through the Kafka Streams join examples below, it may be helpful to start with a visual representation of the expected results for each join operator.

Kafka Joins Operand Expected Results

When we go through the examples of Kafka joins, it may be helpful to keep the above diagram in mind.  The color blue represents the expected results when performing the Kafka based joins.

Kafka Streams Joins Code Overview

We can implement Kafka joins in different ways.  In the following examples, we’ll cover the Kafka Streams DSL perspective.  With this approach, we’ll use the DSL for abstractions such as `KTable`, `KStream` and `GlobalKTable`.  We’ll cover various usage examples of these abstractions, but it’s important to note that, regardless of abstraction, joining streams involves:

  1. Specifying at least two input streams which are read from Kafka topics
  2. Performing transformations on the joined streams to produce results
  3. Writing the results back to Kafka topics

In essence, we will be creating miniature stream processing applications for each one of the join examples.

But first, how should we think about our choices of `KTable` vs `KStream` vs `GlobalKTable`?

`KTable` represents each data record as an upsert.  If the key already exists in the stream, the value is updated; if it does not exist, it is inserted.  For those of you coming from relational databases, I like to think of `KTable` as a form of reference table.  In my experience, reference tables were concerned with looking up the most recent value for a particular key, rather than the entire history of values for that key.

`KStream`, on the other hand, is designed for when you are concerned with the entire history of data events for particular keys.  Each data record is considered an insert (rather than an update or upsert as in `KTable`).  For example, a KStream would be utilized to process each sensor temperature reading in order to produce an average temperature over a period of time.  All the historical records are required to produce a reasonable average.  This is in contrast to `KTable`, where you might wish to know only the most recent average temperature of each sensor in a particular region.  You wouldn’t use a `KTable` to calculate an average, because a KTable only retains the most recent value for a key and is not concerned with each individual event the way a `KStream` is.

`GlobalKTable`, as the name implies, is a form of `KTable`.  Unlike a regular `KTable`, which represents one partition of the topic from which it is composed, a `GlobalKTable` accounts for all partitions in the underlying topic.  As you can imagine, this has advantages, but also performance related considerations such as increased storage and increased network transmission requirements.  Other benefits of `GlobalKTable` include no co-partitioning requirement for joins, the ability to broadcast to all running instances of an application, and more join operations, which we won’t cover in any detail here because of the introductory nature of this tutorial.

We do not cover co-partitioning in this tutorial but let me know if you’d like to explore further.
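As a quick reference before the join examples, here is how the three abstractions are constructed from a `StreamsBuilder`.  This is only a sketch; the topic names are illustrative and the imports assume the Scala DSL.

import org.apache.kafka.streams.kstream.GlobalKTable
import org.apache.kafka.streams.scala.StreamsBuilder
import org.apache.kafka.streams.scala.kstream.{KStream, KTable}
import org.apache.kafka.streams.scala.ImplicitConversions._
import org.apache.kafka.streams.scala.Serdes._

val builder = new StreamsBuilder

// KStream: every record is an insert; the full history of events flows through.
val eventStream: KStream[String, String] = builder.stream[String, String]("events-topic")

// KTable: every record is an upsert; only the latest value per key is retained.
val latestByKey: KTable[String, String] = builder.table[String, String]("reference-topic")

// GlobalKTable: like a KTable, but materialized from all partitions of the topic.
val globalReference: GlobalKTable[String, String] =
  builder.globalTable[String, String]("reference-topic")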

Types of Kafka Streams Joins

Keep in mind there are essentially two types of joins: windowed and non-windowed.  Windowing allows us to control how to group records which have the same key.

In joins, a windowing state store is used to retain all the records within a defined window boundary.  Old records in the state store are purged after a defined retention period. The default window retention period is one day.  I’ll add relevant windowing where applicable in join examples below.

Kafka Streams Join Examples

Before we get into the Kafka Streams join source code examples, I’d like to show a quick screencast of running the examples to help set some overall context and put you in a position to succeed.  As you’ll see, the examples are in Scala; let me know if you’d like to see them converted to Java.

Kafka Streams – Examples of Joins

As you see in the screencast, we’re going to run all the Kafka Streams Joins examples through Scala tests.  If you want some background on this approach, it may be helpful to check out the previous Kafka Streams Testing post.

Code Examples

Let’s start with 3 examples of `KTable` to `KTable` joins.  These examples and other examples of Kafka Joins are contained in the `com.supergloo.KafkaStreamsJoins` class.  (All source code is available for download.  See link to it in the Reference section below and Screencast above for further reference.)

Each of the `KTable` to `KTable` join examples is within a function whose name starts with `kTableToKTable`.  For example, an inner join example is within the `kTableToKTableJoin` function

def kTableToKTableJoin(inputTopic1: String,
                       inputTopic2: String,
                       storeName: String): Topology = {

  val builder: StreamsBuilder = new StreamsBuilder

  val userRegions: KTable[String, String] = builder.table(inputTopic1)
  val regionMetrics: KTable[String, Long] = builder.table(inputTopic2)

  userRegions.join(regionMetrics,
    Materialized.as(storeName))((regionValue, metricValue) => regionValue + "/" + metricValue)

  builder.build()
}
What’s going on in the code above?

Using a `StreamsBuilder`, we construct two `KTable`s and perform the inner join.  In the args we provide to the `join` function, we pass a specific instance of a state store via `Materialized.as(storeName)`.  In essence, this `StateStore` is another `KTable`, which is based on a Kafka topic.  I find it helps when I attempt to simplify the constructs behind the API.  In this case, we’re simply joining two topics based on keys at particular moments in time (message ordering in the topic).  The results of this join are stored to another Kafka topic for a period of time.  Finally, in the last portion of the call

((regionValue, metricValue) => regionValue + "/" + metricValue)

we’re providing an implementation of what to do with the values of each topic based on the join of keys.  If this is confusing, it will make sense when you see the results we are testing for next.  (Also, to really drive it home, try changing “/” to “-” for example and re-run the tests to see the failures.)

How to run the Kafka join examples?

To run the Kafka join examples, check out the `com.supergloo.KafkaStreamsJoinsSpec` test class as shown in the Screencast above.  Running this class will run all of the Kafka join examples.  For example, the following test will run this inner join test described above.

// -------  KTable to KTable Joins ------------ //
"KTable to KTable Inner join" should "save expected results to state store" in {

  val driver = new TopologyTestDriver(
    KafkaStreamsJoins.kTableToKTableJoin(inputTopicOne, inputTopicTwo, stateStore),
    config
  )

  driver.pipeInput(recordFactory.create(inputTopicOne, userRegions))
  driver.pipeInput(recordFactoryTwo.create(inputTopicTwo, sensorMetric))

  // Perform tests
  val store: KeyValueStore[String, String] = driver.getKeyValueStore(stateStore)

  store.get("sensor-1") shouldBe "MN/99"
  store.get("sensor-3-in-topic-one") shouldBe null
  store.get("sensor-99-in-topic-two") shouldBe null

  driver.close()
}

The expected results specific to Kafka Joins will be in the tests

  store.get("sensor-1") shouldBe "MN/99"
  store.get("sensor-3-in-topic-one") shouldBe null
  store.get("sensor-99-in-topic-two") shouldBe null

Pay attention to how these tests differ from the other `KTable` to `KTable` join tests later in the test code.

Windowing note: As you might expect, `KTable` to `KTable` joins are non-windowed because of the nature of `KTable`, where only the most recent value for each key is considered.

Next, let’s move on to `KStream` to `KTable` join examples.  Following the overall code organization of join implementations and test examples described above, we can find three examples of these joins in functions starting with the name “kStreamToKTable” in `KafkaStreamsJoins`.  Similarly, we can find examples of how to run the examples and differences in their tests in the `KafkaStreamsJoinsSpec` class.

As you’ll see in the implementation of the `KStream` to `KTable` examples, the API use is slightly different.  For example, in the inner join example

val userRegions: KTable[String, String] = builder.table(inputTopic1)
val regionMetrics: KStream[String, Long] = builder.stream(inputTopic2)

regionMetrics.join(userRegions){(regionValue, metricValue) =>
  regionValue + "/" + metricValue
}.to(outputTopicName)

val outputTopic: KTable[String, String] =
  builder.table(
    outputTopicName,
    Materialized.as(storeName)
  )

In the example above, we don’t have the option to provide a `StateStore` in the join.  So, instead, we use the `to` function to pipe results to a new topic directly.  Then, we create the `StateStore` by building a `KTable` from the previously mentioned topic, so we can reference it in the tests.

When moving to the `KStream` to `KStream` examples, with function names starting with “kStreamToKStream”, notice we need to provide a `JoinWindows` now.  For example

regionMetrics.join(userRegions)(
  ((regionValue, metricValue) => regionValue + "/" + metricValue),
  JoinWindows.of(Duration.ofMinutes(5L))
).to(outputTopicName)

The final two examples are `KStream` to `GlobalKTable` joins.  Again, the code is similar, but key differences include how to create a GlobalKTable and the `join` function signature as seen in the following.

val userRegions: GlobalKTable[String, String] = builder.globalTable(inputTopic1)
val regionMetrics: KStream[String, Long] = builder.stream(inputTopic2)

regionMetrics.join(userRegions)(
  (lk, rk) => lk,
  ((regionValue, metricValue) => regionValue + "/" + metricValue)
).to(outputTopicName)

Constructing a `GlobalKTable` is simple enough that it doesn’t require elaboration.

The `join` function signature changes to require a keyValueMapper: `(lk, rk) => lk`.  This keyValueMapper is a function used to map the key/value pair from the `KStream` to the key of the `GlobalKTable`.  In this implementation, nothing fancy: we simply want the key of the `KStream` (represented as `lk`) to match the key of the `GlobalKTable`.

As you might expect based on the aforementioned description of `KTable` vs `GlobalKTable`, the tests in `KStream` to `GlobalKTable` joins are nearly identical to `KStream` to `KTable` examples.

Kafka Joins Examples Conclusion

Hopefully, you found these Kafka join examples helpful and useful.  If you have any questions or, even better, suggestions on how to improve, please let me know.

Noteworthy

As shown in the screencast above, the path is not smooth when a test fails.  If you run a test which fails and then attempt to rerun the tests, an exception occurs and none of the tests pass.  The exception is

org.apache.kafka.streams.errors.LockException: task [0_0] Failed to lock the state directory for task 0_0

The only way I’ve found to resolve it is `rm -rf /tmp/kafka-streams/testing/0_0/`

This happens when running tests in both IntelliJ and the SBT REPL.
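One way to automate that same cleanup inside the Spec classes might be a small ScalaTest mixin that deletes the state directory before each test.  This is only a sketch; the path below is an assumption based on the `rm -rf` workaround above, and the trait name is made up.

import java.io.File
import org.scalatest.{BeforeAndAfterEach, Suite}

// Sketch only: wipe the Kafka Streams state directory before each test so a
// previously failed run cannot leave a stale lock behind. The path is assumed
// to match the rm -rf workaround above.
trait StateDirCleanup extends BeforeAndAfterEach { this: Suite =>

  private val stateDir = new File("/tmp/kafka-streams/testing")

  private def deleteRecursively(f: File): Unit = {
    if (f.isDirectory) Option(f.listFiles()).getOrElse(Array.empty[File]).foreach(deleteRecursively)
    f.delete()
  }

  override def beforeEach(): Unit = {
    if (stateDir.exists()) deleteRecursively(stateDir)
    super.beforeEach()
  }
}

Mixing a trait like this into the Spec should make the manual `rm -rf` step unnecessary between runs.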

Let me know any suggestions to resolve.

References

Kafka Streams Joins Examples image credit: https://pixabay.com/en/network-networking-rope-connection-1246209/

 

Kafka Streams Testing with Scala Part 1

Kafka Streams Testing in Scala Part 1

After experimenting with Kafka Streams with Scala in the first post [1], I started to wonder how one goes about Kafka Streams testing in Java or Scala.  How does one create and run automated tests for Kafka Streams applications?  In this post, I’ll describe what I’ve learned so far.  Also, if you have any questions or ideas on how to improve, let me know your feedback.  All source code is available from the “kafka-streams” GitHub repo; link below.

Kafka Streams Testing with Scala Tutorial Objectives

First, let’s consider the following as just one way to create automated tests, using the ScalaTest framework and `kafka-streams-test-utils`.  As you know, in Scala and Java there are many ways to build automated tests.  So, if this Kafka Streams testing tutorial is one of many possibilities, you may be wondering, is it the best way?  And the answer is… Yes, of course, it is.  My way is always the best way.  I live in a bubble, jackass.

As the saying goes, “it’s my way or the highway”.  Speaking of a highway, according to Tom Cochrane, “life is a highway” [2].  Remember that next time you’re feeling blue.  And I mean Tom Cochrane ripping on that harmonica like he just doesn’t care kind of highway, and definitely NOT the Rascal Flatts cover of the same song for the movie Cars.  Sorry, not sorry, if you are a fan of Rascal Flatts or the movie “Cars”.  Oh man, I’m getting way off track here.  Where were we, jackasses?  That’s right, I called you a jackass again.  It’s my tutorial.

Brief History

When beginning my search for examples of how to create automated Scala tests for Kafka Streams, I came across an approach which used `EmbeddedSingleNodeKafkaCluster` from the Confluent GitHub examples [3].  Well, long story short, this didn’t turn out so well.  So after some more searching, I found the Kafka Streams Test Utils page [4] and went from there.  I’m fairly certain the `TopologyTestDriver` provided in Kafka Streams Testing Utils will be the recommended approach going forward, but correct me if I’m wrong.

Kafka Streams Testing with Scala Example

Ok, here we go.  I needed to refactor the original WordCount Kafka Streams in Scala example to be more testable. I just added this new version of the code to the Kafka Streams repo [5].  For this tutorial, there is a Scala class and companion object with refactored logic into more testable functions.  See `src/main/scala/com/supergloo/WordCountTestable.scala`

Screencast

Here’s a quick screencast of Kafka Streams Scala test example to set the context.

Kafka Streams Testing with Scala Part 1

Kafka Streams Code Highlights

So what are the takeaways?  What stood out?   Let’s consider one of the Kafka Streams tests

"WordCountTestable" should "count number of words" in {
  val driver = new TopologyTestDriver(wordCountApplication.countNumberOfWords("input-topic", "output-topic", "counts-store"), config)
  val recordFactory = new ConsumerRecordFactory("input-topic", new StringSerializer(), new StringSerializer())
  val words = "Hello Kafka Streams, All streams lead to Kafka"
  driver.pipeInput(recordFactory.create(words))
  val store: KeyValueStore[String, java.lang.Long] = driver.getKeyValueStore("counts-store")
  store.get("hello") shouldBe 1
  store.get("kafka") shouldBe 2
  store.get("streams") shouldBe 2
  store.get("lead") shouldBe 1
  store.get("to") shouldBe 1
  driver.close()
}
TopologyTestDriver

According to the source code comments, this class is designed to make it easier to write tests for Kafka Streams topologies created with either `Topology` or `StreamsBuilder`, without the need for a “real” Kafka broker.  Ok, I’m sold.  Of course, I don’t want a dependency on an actual running Kafka cluster in order to execute Kafka Streams tests.  Anything we can do to simplify is valued, right?

Now, I’m not entirely certain of the differences between the `Topology` and `StreamsBuilder` approaches yet.  Maybe we’ll explore further in the future.  Let me know if you have ideas or suggestions.

In this example, the approach was to use a `Topology` created from a `StreamsBuilder`, as seen in the constructor call to `TopologyTestDriver`:

new TopologyTestDriver(wordCountApplication.countNumberOfWords("input-topic", "output-topic", "counts-store"), config)

When you review the `countNumberOfWords` function, you’ll see the return type of `Topology`, which is created by using a `StreamsBuilder`.
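For context, here is a rough sketch of what a function like `countNumberOfWords` might look like.  This is illustrative only; the actual implementation lives in `WordCountTestable.scala` in the repo, and the exact imports may differ slightly by Kafka version.

// A rough sketch (not the repo's exact code) of a testable word-count topology:
// the input topic, output topic and state store name are parameters so tests can control them.
import org.apache.kafka.streams.Topology
import org.apache.kafka.streams.scala.StreamsBuilder
import org.apache.kafka.streams.scala.kstream.Materialized
import org.apache.kafka.streams.scala.ImplicitConversions._
import org.apache.kafka.streams.scala.Serdes._

def countNumberOfWords(inputTopic: String,
                       outputTopic: String,
                       storeName: String): Topology = {
  val builder = new StreamsBuilder

  val wordCounts = builder.stream[String, String](inputTopic)
    .flatMapValues(line => line.toLowerCase.split("\\W+").toList) // split lines into words
    .groupBy((_, word) => word)                                   // re-key by word
    .count()(Materialized.as(storeName))                          // stateful count, materialized so tests can read it

  wordCounts.toStream.to(outputTopic)
  builder.build()
}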

ConsumerRecordFactory

Next up, `ConsumerRecordFactory`.  From looking at the source, we see its package is `org.apache.kafka.streams.test`, so that’s an indicator of its function: it must be used for testing only.  Further, the code comments go on to describe that this class should be used to create `ConsumerRecord`s for a single partitioned topic.  Ok, great.  I’m not sure if I’ll need or want to test multi-partitioned topics someday, but I’m guessing that would be more of a load testing or stress testing exercise which won’t be part of automated tests.

TopologyTestDriver and ConsumerRecordFactory – Well, It’s a Story

I was thinking about calling this section a “Love Story”, but it just didn’t feel right.  Some folks might enjoy a love story or a romantic comedy, but others may not.  I don’t want to be polarizing here, you know.  It’s all about the sweet, sweet code music.

Everything comes together with `TopologyTestDriver` and `ConsumerRecordFactory` when they create their sweet, sweet love child `KeyValueStore`

driver.pipeInput(recordFactory.create(words))
val store: KeyValueStore[String, java.lang.Long] = driver.getKeyValueStore("counts-store")

What’s going on here?

The `pipeInput` function sends a given key, value and timestamp to the specified topic.  It accepts a `ConsumerRecord` we create with the `ConsumerRecordFactory` `create` function.

Next, the `KeyValueStore` is retrieved from the `TopologyTestDriver` for a specific `StateStore`.  (For more info, see `Materialized.as` in the `countNumberOfWords` implementation.)  With this `KeyValueStore`, we’re able to test the values of certain keys in the store; e.g.

store.get("hello") shouldBe 1

 

 

Kafka Streams Testing in Scala References

[1] Kafka Streams with Scala Quick Start

[2] According to Tom Cochrane, life is a highway.  Now just stop for 20 seconds and watch the beginning of this video.  The guy is nuts.  He nearly gets run over while doing his whole harmonica-in-the-middle-of-the-road spiel.  Do NOT try this at home or on a highway near you.

[3] https://github.com/confluentinc/kafka-streams-examples/blob/5.1.0-post/src/test/scala/io/confluent/examples/streams/WordCountScalaIntegrationTest.scala <- As mentioned, didn’t have much luck with this approach.  Trouble figuring out how to load and make available the  `EmbeddedSingleNodeKafkaCluster` class in SBT

[4] Testing a Streams Application

[5] Kafka Streams Examples at https://github.com/tmcgrath/kafka-streams

[6] Thank you to http://learnscala.co/kafka-stream-testing/  for the examples.  Check it out for more.

 

Image credit https://pixabay.com/en/color-chalk-india-colorful-color-106692/

Kafka Streams Tutorial with Scala for Beginners Example

Kafka Streams Tutorial with Scala for Beginners

If you’re new to Kafka Streams, here’s a Kafka Streams with Scala tutorial which may help jumpstart your efforts.  My plan is to keep updating the sample project, so let me know if you would like to see anything in particular with Kafka Streams with Scala.  In this example, the intention is to 1) provide an SBT project you can pull, build and run and 2) describe the interesting lines in the source code.

The project is available to clone at https://github.com/tmcgrath/kafka-streams

Assumptions

This example assumes you’ve already downloaded Open Source or Confluent Kafka.  It’s run on a Mac in a bash shell, so translate as necessary.

The screencast below also assumes some familiarity with IntelliJ.

Kafka Streams Tutorial with Scala Quick Start

Let’s run the example first and then describe it in a bit more detail.

1. Start up Zookeeper

<KAFKA_HOME>/bin/zookeeper-server-start ./etc/kafka/zookeeper.properties

For example ~/dev/confluent-5.0.0/bin/zookeeper-server-start ./etc/kafka/zookeeper.properties

2. Start Kafka

<KAFKA_HOME>/bin/kafka-server-start ./etc/kafka/server.properties

3. Create a topic

<KAFKA_HOME>/bin/kafka-topics --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic text_lines

4. Create some sample content

echo -e "doo dooey do dodah\ndoo dooey do dodah\ndoo dooey do dodah" > words.txt

5. Send the content to a Kafka topic

cat words.txt | ./bin/kafka-console-producer --broker-list localhost:9092 --topic text_lines

6. Run it like you mean it.  I mean put some real effort into it now.  In the screencast (below), I run it from IntelliJ, but no one tells you what to do.  You do it the way you want to… in SBT or via `kafka-run-class`

7. Verify the output like you just don’t care.  Yeah.

bin/kafka-console-consumer --bootstrap-server localhost:9092 \
        --topic word_count_results \
        --from-beginning \
        --formatter kafka.tools.DefaultMessageFormatter \
        --property print.key=true \
        --property key.deserializer=org.apache.kafka.common.serialization.StringDeserializer \
        --property value.deserializer=org.apache.kafka.common.serialization.LongDeserializer

 

Screencast

If the steps above left you feeling somewhat unsatisfied and put you in a wanting-more kind of groove, a screencast is next.  Let’s run through the steps above in the following Kafka Streams Scala with IntelliJ example.  Prepare yourself.

Kafka Streams with Scala for Beginners Example

 

Kafka Streams

So, why Kafka Streams?  My first thought was it looks like Apache Spark.  The code itself doesn’t really offer me any compelling reason to switch.

But it is cool that Kafka Streams apps can be packaged, deployed, etc. without a need for a separate processing cluster.  Also, it was nice to be able to simply run in a debugger without any setup ceremony required when running cluster based code.

I’m intrigued by the idea of being able to scale out by adding more instances of the app.  In other words, this example could horizontally scale out by simply running more than one instance of `WordCount`.  Maybe I’ll explore that in a later post.

 

Kafka Streams Tutorial with Scala Source Code Breakout

When I started exploring Kafka Streams, there were two areas of the Scala code that stood out: the SerDes import and the use of KTable vs KStreams.

Kafka SerDes with Scala

This sample utilizes implicit parameter support in Scala, which makes the code easier to read and more concise.  The ramifications of not importing are shown in the screencast above.  The import is part of the Kafka Streams Scala library, which we set as a dependency in the SBT build.sbt file.  The serdes brought into scope by `Serdes._`, together with the DSL’s implicit conversions, provide the `Grouped`, `Produced`, `Consumed` and `Joined` instances the DSL needs.

import Serdes._
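For reference, `Serdes._` above comes from the Kafka Streams Scala DSL.  A typical set of imports might look like the following; exact packages can vary slightly by Kafka version.

import org.apache.kafka.streams.scala.StreamsBuilder
import org.apache.kafka.streams.scala.kstream._             // KStream, KTable, ...
import org.apache.kafka.streams.scala.ImplicitConversions._ // implicit Consumed, Produced, Grouped, Joined
import org.apache.kafka.streams.scala.Serdes._              // implicit default serdes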
KTable and KStreams

The second portion of the Scala Kafka Streams code that stood out was the use of KTable and KStream.

I wondered what’s the difference between KStreams vs KTable?  Why would I use one vs the other?

KStreams are useful when you wish to consume records as independent, append-only inserts.  Think of records such as page views or in this case, individual words in text. Each word, regardless of past or future, can be thought of as an insert.

KStream has operators that should look familiar to anyone who knows the functional combinators in Apache Spark – map, filter, etc.

KTable, on the other hand, represents each data record as an update rather than an insert.  In our example, we want an update on the count of words.  If a word has been previously counted to 2 and it appears again, we want to update the count to 3.

KTable operators will look familiar to SQL constructs… groupBy, various joins, etc.
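Putting the two together for this word count example, a minimal sketch (assuming the Scala DSL imports shown in the SerDes section above) might look like the following; `text_lines` and `word_count_results` are the topics used earlier in this post.

val builder = new StreamsBuilder

// KStream: each line of text is an independent, append-only record (an insert).
val textLines: KStream[String, String] = builder.stream[String, String]("text_lines")

// KTable: the running count per word; each new occurrence is an update (upsert).
val wordCounts: KTable[String, Long] = textLines
  .flatMapValues(_.toLowerCase.split("\\W+").toList)
  .groupBy((_, word) => word)
  .count()

wordCounts.toStream.to("word_count_results")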

——-

References

https://docs.confluent.io/3.1.1/streams/concepts.html#duality-of-streams-and-tables

https://docs.confluent.io/current/streams/developer-guide/dsl-api.html#kafka-streams-dsl-for-scala

https://github.com/tmcgrath/kafka-streams

Kafka Streams with Scala post image credit https://pixabay.com/en/whiskey-bar-alcohol-glass-scotch-315178/