Kafka Streams – Transformations Examples


Kafka Streams Transformations provide the ability to perform actions on Kafka Streams such as filtering and updating values in the stream.  Kafka Stream’s transformations contain operations such as `filter`, `map`, `flatMap`, etc. and have similarities to functional combinators found in languages such as Scala.  And, if you are coming from Spark, you will also notice similarities to Spark Transformations.  But, even if you don’t have experience with combinators or Spark, we’ll cover enough examples of Kafka Streams Transformations in this post for you to feel comfortable and gain confidence through hands-on experience.  We’re going to cover examples in Scala, but I think the code would readable and comprehensible for those of you with a Java preference as well.

Table of Contents

Kafka Stream Transformations are available from `KTable` or `KStream` and will result in one or more `KTable`, `KStream` or `KGroupedTable` depending on the transformation function.  We’ll cover examples of various inputs and outputs below.

Before we go into the source code examples, let’s cover a little background and also a screencast of running through the examples.

Kafka Streams Transformation Types

Kafka Streams Transformations are available in two types: Stateless and Stateful.

Stateless transformations do not require state for processing.  For example, let’s imagine you wish to filter a stream for all keys starting with a particular string in a stream processor.  In this case, Kafka Streams doesn’t require knowing the previous events in the stream.  It simply performs each filtering operation on the message and moves on.  Conversely, let’s say you wish to sum certain values in the stream.  In this case, you would need “state” to know what has been processed already in previous messages in the stream in order to keep a running tally of the sum result.

As previously mentioned, stateful transformations depend on maintaining the state of the processing.  To maintain the current state of processing the input and outputs, Kafka Streams introduces a construct called a State Store.  Operations such as aggregations such as the previous sum example and joining Kafka streams are examples of stateful transformations.

Kafka Streams Transformations Source Code

All the source code is available from my Kafka Streams Examples repo on Github.

Kafka Streams Transformations Screencast

Before we begin going through the Kafka Streams Transformation examples, I’d recommend viewing the following short screencast where I demonstrate how to run the Scala source code examples in IntelliJ.

Kafka Streams Transformations Examples Scala Source Code

The following Kafka Streams transformation examples are primarily examples of stateless transformations.  Let me know if you want some stateful examples in a later post.  I do plan to cover aggregating and windowing in a future post.  Also, related to stateful Kafka Streams joins, you may wish to check out the previous Kafka Streams joins post.

It is recommended to watch the short screencast above, before diving into the examples.

 

Kafka Streams Transformation Examples

 

branch

The `branch` function is used to split a KStream by the supplied predicates into one of more KStream results.  In this Kafka Streams Transformations tutorial, the `branch` example had three predicates: two filters for key name and one default predicate for everything else.

This is the example implementation

val results: Array[KStream[String, String]] = inputStream.branch(
  (key, value) => key.startsWith(keyFilter1),
  (key, value) => key.startsWith(keyFilter2),
  (key, value) => true
)

and we tested the expected results for filters on “sensor-1” and “sensor-2” and a default.

storeOne.get("sensor-1") shouldBe "MN"
storeOne.get("sensor-11") shouldBe "IL"
storeTwo.get("sensor-2") shouldBe "WI"

filter

The ‘filter` function can filter either a KTable or KStream to produce a new KTable or KStream respectively.

For our example, we used a KStream

inputStream.filter(
  (key, value) => value == keyFilter
).to(s"${keyFilter}-topic")

In this example, we use the passed in filter based on values in the KStream.

storeOne.get("sensor-1") shouldBe valFilter
storeOne.get("sensor-2") shouldBe null
storeOne.get("sensor-11") shouldBe null

`valFilter` is set to “MN” in the Spec class.

flatmap

`flatMap` performs as expected if you have used it before in Spark or Scala.  Use it to produce zero, one or more records from each input record processed.
From the Kafka Streams documentation, it’s important to note

Marks the stream for data re-partitioning:

 Applying a grouping or a join after flatMap will result in re-partitioning of the records. If possible use flatMapValues instead, which will not cause data re-partitioning.

In the example `flatMap` implementation

inputStream.flatMap {
  (key, value) => {
    expanderList.flatMap { s =>
      List((s"${s}-${value}", value))
    }
  }
}

we are using both `flatMap` from Kafka Streams as well as `flatMap` from Scala.  The intention is to show creating multiple new records for each input record.

map

Where `flatMap` may produce multiple records from a single input record, `map` is used to produce a single output record from an input record.  I like to think of it as one-to-one vs the potential for `flatMap` to be one-to-many.

In the implementation example

val outputStream = inputStream.map {
  (key, value) => (key, s"${value}-new")
  }.to(resultTopic)

Here we simply create a new key, value pair with the same key, but an updated value.

In the tests, we test for the new values from the result stream.

storeOne.get("sensor-1") shouldBe "MN-new"
storeOne.get("sensor-2") shouldBe "WI-new"
storeOne.get("sensor-11") shouldBe "IL-new"

groupby

In `groupBy` we deviate from stateless to stateful transformation here in order to test expected results.  In the implementation shown here

inputStream.groupBy {
  (key, value) => value
}.count()(Materialized.as(s"${storeName}"))

we are going to group by the values. Notice in the test class we are passing two records with the value of “MN” now. This will allow us to test the expected `count` results

storeOne.get("MN") shouldBe 2
storeOne.get("WI") shouldBe 1

`count` is a stateful operation which was only used to help test in this case.

Conclusion

Hope these examples helped. Do let me know if you have any questions, comments or ideas for improvement.

References

Kafka Streams Transformation Examples featured image: https://pixabay.com/en/dandelion-colorful-people-of-color-2817950/

See also  Kafka Streams Testing with Scala Part 1
About Todd M

Todd has held multiple software roles over his 20 year career. For the last 5 years, he has focused on helping organizations move from batch to data streaming. In addition to the free tutorials, he provides consulting, coaching for Data Engineers, Data Scientists, and Data Architects. Feel free to reach out directly or to connect on LinkedIn

1 thought on “Kafka Streams – Transformations Examples”

Leave a Comment