GlobalKTable vs KTable in Kafka Streams

KTable vs GlobalKTable

Kafka Streams offers two options for materialized views: GlobalKTable and KTable.  We will describe the meaning of “materialized views” in a moment, but for now, let’s just agree there are pros and cons to each.

The three essential factors in your decision between a GlobalKTable and a KTable come down to 1) the number of nodes in your Kafka Streams application, 2) the number of partitions in your underlying topics, and 3) how you plan to join streams.  For the last point on joins in particular, the choice of GlobalKTable vs KTable becomes most interesting when you are designing and/or implementing Kafka Streams joins deployed across multiple nodes with topics having multiple partitions.

To help me understand this further and hopefully you as well, let’s explore GlobalKTable and KTables a bit more.  Also, if the idea of “materialized view” is not clear to you, I think this post will help as well.

Let’s start with “Why KTable?”

KTable is an abstraction over a Kafka topic that represents the latest state of each key/value pair.  The underlying Kafka topic is likely configured with log compaction.  When I was first learning about KTables, the idea of UPSERTS immediately came to mind.  Why?  Well, the processing characteristics for inserts and updates will be familiar to anyone who has used an upsert-capable system.  For example, an attempt to append a key/value pair whose key does not yet exist in a KTable results in an INSERT, while an attempt to append a key/value pair with an existing key results in an UPDATE.

KTable Example

For a simple example of a “materialized view” through KTables, consider an event arriving at the KTable with a key of tommy and a value of 3.  If the tommy key does not exist in the KTable, it will be appended as an INSERT.  On the other hand, when a subsequent event with a key of tommy arrives, the existing tommy key event will be updated.  If the next tommy event has a value of 2, the KTable value for the tommy key will be 2.

Another key takeaway is that the update is a simple replacement and not a calculation of any sort.  For example, the values of 3 and 2 do not result in a sum of 5.
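To make that upsert behavior concrete, here is a minimal sketch, assuming the Kafka Streams Scala DSL with implicit String serdes; the topic and store names are made up for illustration and are not from this post.

import org.apache.kafka.streams.scala.StreamsBuilder
import org.apache.kafka.streams.scala.ImplicitConversions._
import org.apache.kafka.streams.scala.Serdes._
import org.apache.kafka.streams.scala.kstream.Materialized

val builder = new StreamsBuilder()

// Materializes the latest value per key into a queryable state store.
// If the underlying topic receives ("tommy", "3") and later ("tommy", "2"),
// the store ends up holding "2" for the tommy key -- a replace, not a sum.
builder.table[String, String]("current-status-topic", Materialized.as("current-status-store"))

val topology = builder.build()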

KTable vs GlobalKTable Considerations

That’s great and simple to understand in isolation.  But we need to take it further in order to understand the KTable vs GlobalKTable question.  For that, we need to explore two more constructs before we get to the results.

The first construct involves the effect of operating Kafka Streams applications across multiple nodes with topics containing multiple partitions.

The second construct involves the ramifications of KTables with multiple underlying topic partitions and multiple Kafka Streams nodes when performing JOINS with other event streams.

Let’s start with KTables operations and then move to joins.

Let’s consider a Kafka Streams application deployed across three nodes with an application.id of ABC.  These three Kafka Streams nodes are interacting with a Kafka Cluster with 3 Broker nodes.  That keeps it simple right?

Now, let’s assume a KTable with an underlying topic that contains 3 partitions.  On any node running your Kafka Streams application, this example KTable will only be populated with 1 partition worth of data.  To illustrate this point, the key/value pair with the tommy key may or may not be present in your KTable.

 

KTable Simple Mapping Diagram

 

This shouldn’t come as a shock when you consider how Kafka Streams and Kafka Connect leverage the capabilities of the Kafka Producer and Consumer APIs.  This 3-node, 3-partition KTable example, where the tommy key event is present on only one node, is similar to how consumers in a Kafka Consumer Group attach 1-to-1 to particular partitions.

Ok, so tommy is present in one KTable on a particular Kafka Streams node.  So what?  What’s the big deal?  Let’s cover that next when we consider the mechanics of performing join operations.  First, let’s set up our example.

KTable and KStream Join Example

Let’s assume we have a stream of data which represents tommy’s location.  Perhaps the key/value pair is key = tommy and value = {longitude: 68.9063° W, latitude: 41.8101° S}.  Now, imagine this stream of tommy’s location is arriving every 10 minutes or so, landing in a KStream with an underlying topic with 6 partitions.  Let’s call this stream the locations stream.

At this point, we also have a KTable in which the tommy key has a materialized view value of 25.  Maybe this is tommy’s current age, or his jersey number, or his favorite number.  It doesn’t really matter.  But, let’s call this KTable current_status.

What happens if we want to join locations with current_status in this example?  Should be simple, right?  The tommy keys align for performing the joins, but is it that simple?  (hint: remember the 3 Kafka Streams app nodes interacting with a Kafka Cluster with 3 brokers, where there are a different number of partitions in the underlying topics of the locations KStream and the current_status KTable.)

Answer: there’s a chance the join will fail.  Why?  Because the Kafka Streams node performing the join may not have the tommy key from both locations and current_status, as shown in the following diagram.

 

KTable to KStream Not Co-partitioned Diagram

 

Another way to describe this scenario is to flip-the-script and ask “are there any requirements when performing joins in Kafka Streams?”.  The answer, of course, is yes.  And in this particular example of a KStream to KTable join, the requirement is that the data to be joined must be “co-partitioned”.  More on this is available in the Resources and References section below.

How to Solve this Example Challenge?

Well, we have two options to ensure the joins will succeed regardless of which node performs the join.  One of the options involves using a GlobalKTable for current_status instead of a KTable.

GlobalKTables replicate all underlying topic partitions on each instance of Kafka Streams.  So, when we modify our example to use a GlobalKTable, the tommy key event will be present on all 3 Kafka Streams nodes.  This means a join to the locations stream will succeed regardless of which node performs the join.
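As a rough sketch of that modification, assuming the Kafka Streams Scala DSL with implicit String serdes and topic names that mirror this example (the joiner simply concatenates the two values):

import org.apache.kafka.streams.kstream.GlobalKTable
import org.apache.kafka.streams.scala.StreamsBuilder
import org.apache.kafka.streams.scala.ImplicitConversions._
import org.apache.kafka.streams.scala.Serdes._
import org.apache.kafka.streams.scala.kstream.KStream

val builder = new StreamsBuilder()

// Every Kafka Streams instance holds ALL partitions of current_status,
// so the tommy key is available on whichever node receives the location event.
val currentStatus: GlobalKTable[String, String] = builder.globalTable[String, String]("current_status")
val locations: KStream[String, String] = builder.stream[String, String]("locations")

locations.join(currentStatus)(
  (key, _) => key,                              // map each location record to the GlobalKTable key
  (location, status) => s"$location/$status"    // combine the joined values
).to("located_status")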

Simple right? Well, like many things in software there are trade-offs.

This example doesn’t consider the overall size and mutation velocity of the underlying 3-partition topic for current_status.  It’s not difficult to imagine this example becoming complicated really quickly if this topic is over 10GB.  That’s a lot of synchronization across nodes.  That’s potentially a lot to keep in memory to avoid disk I/O.

So, in essence, whether or not GlobalKTable is a good solution depends on additional factors.  But, hopefully, this post helps describe KTable vs GlobalKTable and when and why you should consider one vs the other.

By the way, the other option, instead of a GlobalKTable, is to manually ensure co-partitioning: create a new underlying topic(s) for either side of the join with the same number of partitions AND the same partitioning strategy.  More on this approach can be found in the Resources and References section below.
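As a rough sketch of that alternative, assuming the 2.x Scala DSL (which still offers `through`) and a hypothetical locations-repartitioned topic that has already been created with the same partition count (3) and default partitioner as current_status:

import org.apache.kafka.streams.scala.StreamsBuilder
import org.apache.kafka.streams.scala.ImplicitConversions._
import org.apache.kafka.streams.scala.Serdes._
import org.apache.kafka.streams.scala.kstream.KStream

val builder = new StreamsBuilder()

// Writing through the intermediate topic redistributes the records so that
// the tommy key lands on the same partition number on both sides of the join.
val locations: KStream[String, String] = builder.stream[String, String]("locations")
val coPartitioned: KStream[String, String] = locations.through("locations-repartitioned")
// coPartitioned can now be safely joined against the current_status KTable.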

Hopefully, this GlobalKTable vs KTable analysis helps.  Let me know in the comments section below.

Resources and References

  • More information on co-partitioning requirements https://docs.confluent.io/current/streams/developer-guide/dsl-api.html#join-co-partitioning-requirements
  • Nice write up on GlobalKTables http://timvanlaer.be/2017-06-28/working-with-globalktables/
  • Historical Perspective on adding GlobalKTables https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=67633649
  • To consider manually ensuring co-partitioning, see “Ensuring data co-partitioning:” section under https://kafka.apache.org/20/documentation/streams/developer-guide/dsl-api.html#join-co-partitioning-requirements

 

 

Featured Image credit https://pixabay.com/photos/back-to-nature-climate-4536617/

Kafka Streams – Transformations Examples

Kafka Streams Transformations

Kafka Streams Transformations provide the ability to perform actions on Kafka Streams such as filtering and updating values in the stream.  Kafka Streams transformations contain operations such as `filter`, `map`, `flatMap`, etc. and have similarities to functional combinators found in languages such as Scala.  And, if you are coming from Spark, you will also notice similarities to Spark Transformations.  But, even if you don’t have experience with combinators or Spark, we’ll cover enough examples of Kafka Streams Transformations in this post for you to feel comfortable and gain confidence through hands-on experience.  We’re going to cover examples in Scala, but I think the code would be readable and comprehensible for those of you with a Java preference as well.

Kafka Streams transformations are available from `KTable` or `KStream` and will result in one or more `KTable`, `KStream`, or `KGroupedTable` instances, depending on the transformation function.  We’ll cover examples of various inputs and outputs below.

Before we go into the source code examples, let’s cover a little background and also a screencast of running through the examples.

Kafka Streams Transformation Types

Kafka Streams Transformations are available in two types: Stateless and Stateful.

Stateless transformations do not require state for processing.  For example, let’s imagine you wish to filter a stream for all keys starting with a particular string in a stream processor.  In this case, Kafka Streams doesn’t require knowing the previous events in the stream.  It simply performs each filtering operation on the message and moves on.  Conversely, let’s say you wish to sum certain values in the stream.  In this case, you would need “state” to know what has been processed already in previous messages in the stream in order to keep a running tally of the sum result.

As previously mentioned, stateful transformations depend on maintaining the state of the processing.  To maintain the current state of processing the inputs and outputs, Kafka Streams introduces a construct called a State Store.  Aggregations, such as the previous sum example, and joins of Kafka streams are examples of stateful transformations.
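As a rough sketch of that contrast, assuming the Kafka Streams Scala DSL with implicit serdes and made-up topic and store names:

import org.apache.kafka.streams.scala.StreamsBuilder
import org.apache.kafka.streams.scala.ImplicitConversions._
import org.apache.kafka.streams.scala.Serdes._
import org.apache.kafka.streams.scala.kstream.Materialized

val builder = new StreamsBuilder()
val readings = builder.stream[String, Long]("sensor-readings")

// Stateless: each record is evaluated on its own; no state store is needed.
readings.filter((key, _) => key.startsWith("sensor-")).to("filtered-readings")

// Stateful: the running sum per key is kept in a state store between records.
readings
  .groupByKey
  .reduce(_ + _)(Materialized.as("running-sum-store"))
  .toStream
  .to("summed-readings")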

Kafka Streams Transformations Source Code

All the source code is available from my Kafka Streams Examples repo on Github.

Kafka Streams Transformations Screencast

Before we begin going through the Kafka Streams Transformation examples, I’d recommend viewing the following short screencast where I demonstrate how to run the Scala source code examples in IntelliJ.

Kafka Streams Transformations Examples Scala Source Code

The following Kafka Streams transformation examples are primarily examples of stateless transformations.  Let me know if you want some stateful examples in a later post.  I do plan to cover aggregating and windowing in a future post.  Also, related to stateful Kafka Streams joins, you may wish to check out the previous Kafka Streams joins post.

It is recommended to watch the short screencast above, before diving into the examples.

Kafka Streams Transformation Examples


branch
filter
flatMap
map
groupBy

`branch`

The `branch` function is used to split a KStream by the supplied predicates into one or more KStream results.  In this Kafka Streams Transformations tutorial, the `branch` example had three predicates: two filters for key name and one default predicate for everything else.

This is the example implementation

val results: Array[KStream[String, String]] = inputStream.branch(
  (key, value) => key.startsWith(keyFilter1),
  (key, value) => key.startsWith(keyFilter2),
  (key, value) => true
)

and we tested the expected results for filters on “sensor-1” and “sensor-2” and a default.

storeOne.get("sensor-1") shouldBe "MN"
storeOne.get("sensor-11") shouldBe "IL"
storeTwo.get("sensor-2") shouldBe "WI"

`filter`

The `filter` function can filter either a KTable or KStream to produce a new KTable or KStream, respectively.

For our example, we used a KStream

inputStream.filter(
  (key, value) => value == keyFilter
).to(s"${keyFilter}-topic")

In this example, we use the passed-in filter based on values in the KStream.

storeOne.get("sensor-1") shouldBe valFilter
storeOne.get("sensor-2") shouldBe null
storeOne.get("sensor-11") shouldBe null

`valFilter` is set to “MN” in the Spec class.

`flatMap`

`flatMap` performs as expected if you have used it before in Spark or Scala.  Use it to produce zero, one or more records from each input record processed.
From the Kafka Streams documentation, it’s important to note

Marks the stream for data re-partitioning:

 Applying a grouping or a join after flatMap will result in re-partitioning of the records. If possible use flatMapValues instead, which will not cause data re-partitioning.

In the example `flatMap` implementation

inputStream.flatMap {
  (key, value) => {
    expanderList.flatMap { s =>
      List((s"${s}-${value}", value))
    }
  }
}

we are using both `flatMap` from Kafka Streams as well as `flatMap` from Scala.  The intention is to show creating multiple new records for each input record.
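For comparison, here is a minimal `flatMapValues` sketch, assuming a KStream[String, String] built from a made-up topic and a comma-separated value format; because the keys are untouched, the stream is not marked for repartitioning:

import org.apache.kafka.streams.scala.StreamsBuilder
import org.apache.kafka.streams.scala.ImplicitConversions._
import org.apache.kafka.streams.scala.Serdes._

val builder = new StreamsBuilder()
val inputStream = builder.stream[String, String]("input-topic")

// Each value is expanded into multiple records, but the keys are left alone,
// so a later groupBy or join does not force a repartition of this stream.
inputStream
  .flatMapValues(value => value.split(",").toList)
  .to("output-topic")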

`map`

Where `flatMap` may produce multiple records from a single input record, `map` is used to produce a single output record from an input record.  I like to think of it as one-to-one vs the potential for `flatMap` to be one-to-many.

In the implementation example

val outputStream = inputStream.map {
  (key, value) => (key, s"${value}-new")
}.to(resultTopic)

Here we simply create a new key, value pair with the same key, but an updated value.

In the tests, we test for the new values from the result stream.

storeOne.get("sensor-1") shouldBe "MN-new"
storeOne.get("sensor-2") shouldBe "WI-new"
storeOne.get("sensor-11") shouldBe "IL-new"

`groupBy`

With `groupBy`, we deviate from stateless to stateful transformations in order to test expected results.  In the implementation shown here

inputStream.groupBy {
  (key, value) => value
}.count()(Materialized.as(s"${storeName}"))

we are going to group by the values. Notice in the test class we are passing two records with the value of “MN” now. This will allow us to test the expected `count` results

storeOne.get("MN") shouldBe 2
storeOne.get("WI") shouldBe 1

`count` is a stateful operation which was only used to help test in this case.

Conclusion

Hope these examples helped. Do let me know if you have any questions, comments or ideas for improvement.

References

Kafka Streams Transformation Examples featured image: https://pixabay.com/en/dandelion-colorful-people-of-color-2817950/

Kafka Streams – Why Should You Care?

Why Kafka Streams?

Kafka Streams is another entry in the stream processing framework category, with options to use it from either Java or Scala.  In this post, we’ll describe what Kafka Streams is, its features and benefits, when to consider it, how-to Kafka Streams tutorials, and external references.  Ultimately, the goal of this post is to answer the question: why should you care?

What is Kafka Streams?

Kafka Streams is a client library for processing and analyzing data stored in Kafka. Developers use the Kafka Streams library to build stream processor applications when both the stream input and stream output are Kafka topic(s).  We’ll cover stream processors and stream architectures throughout this tutorial.

Some of you may be wondering, why Kafka Streams over Kafka Connect or writing your own Kafka Consumer or Kafka Producer?  What makes Kafka Streams different?

Well, to answer those questions, we should note one key difference right from the start.

As mentioned, Kafka Streams is used to write stream processors where the input and output are Kafka topics.  Visually, an example of a Kafka Streams architecture may look like the following.

Kafka Streams Example

As seen above, both the input and output of Kafka Streams applications are Kafka topics.

From this image, it appears Kafka Consumer and Kafka Producer APIs are being used.  Is either of these APIs used in Kafka Streams based apps?  Well, the answer is yes and no.  Kafka Streams builds upon Kafka Producer and Consumer libraries, so it is able to leverage the native capabilities of Kafka such as fault tolerance, distributed coordination, and parallelism.  But, it doesn’t expose the APIs directly.  Instead, input and output to Kafka topics in Kafka Streams apps are available via Consumer and Producer abstractions such as the Kafka Streams DSL and Processor API.  You’ll learn more about these in the Kafka Streams tutorial section below.
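To make that concrete, here is a minimal, hypothetical Kafka Streams application skeleton using the Scala DSL; the application id, broker address, and topic names are placeholders rather than anything from this post.

import java.util.Properties
import org.apache.kafka.streams.{KafkaStreams, StreamsConfig}
import org.apache.kafka.streams.scala.StreamsBuilder
import org.apache.kafka.streams.scala.ImplicitConversions._
import org.apache.kafka.streams.scala.Serdes._

object MinimalStreamsApp extends App {
  val props = new Properties()
  props.put(StreamsConfig.APPLICATION_ID_CONFIG, "minimal-streams-app")  // placeholder id
  props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092")    // placeholder broker

  val builder = new StreamsBuilder()
  // Both the input and the output are Kafka topics; the DSL hides the
  // underlying Producer and Consumer API usage.
  builder.stream[String, String]("input-topic")
    .mapValues(_.toUpperCase)
    .to("output-topic")

  val streams = new KafkaStreams(builder.build(), props)
  streams.start()
  sys.addShutdownHook(streams.close())
}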

Ok, but why not configure and run a Kafka Connect based app?  Good question, but as I understand it, Kafka Connect is essentially the E and L in ETL.  Support for transformations is minimal, as seen in the Kafka Connect Transformations documentation.  In addition, the transformations are on a per-message basis, or single message transforms, rather than across a stream.  Finally, and somebody please correct me if I’m wrong here, but are there options for using a Kafka topic as both the source and the sink in Kafka Connect?

Kafka Streams Features and Benefits

Some of the factors I find appealing about Kafka Streams include:

  • Lightweight – no external dependencies other than Kafka itself of course, yet you can still scale out horizontally
  • Testable – it’s easy to write Kafka Streams test (see the tutorial section for links on how to test Kafka Streams)
  • JVM – I don’t have to learn a new language.  I can continue to use Scala as I do in Spark.
  • Exactly-once processing guarantees.  This is really powerful and shows how Kafka has evolved over the years.  I’m a big fan but believe we need to be careful when considering processing vs delivery.  For more, see my thoughts on exactly once in Kafka
  • Abstraction DSL – as you can see from the Kafka tutorials, the code is very readable.
  • Ability to perform both stateless transformations and stateful aggregations and joins
  • Supports common stream processing constructs such as the different meanings of time and windowing
  • API is familiar coming from Apache Spark, so the learning curve is low

Why Kafka Streams?

To answer the question of why Kafka Streams, I believe we need to understand the role of Kafka Streams in a larger software architecture paradigm change from batch-oriented to stream-oriented.  We need to speak of the emergence of stream processors and streaming architectures.  For me, the person who influenced my thinking most on this topic was Martin Kleppmann.  When I downloaded the freely available “Making Sense of Stream Processing” book, I had already experimented with and deployed to production Kafka, Lambda and Kappa architectures, CAP theorem, and microservices, so I wasn’t expecting the book to be that impactful.  But it was.

Making Sense of Stream Processing

At the time of this writing, if you search for “Making Sense of Stream Processing”, you’ll find numerous sources where you can download this book for free.

If I could attempt to summarize visually, I would try to show that the older approach of tightly coupling components in an overall architectural stack has more drawbacks than advantages.  A tightly coupled architecture such as this

Streaming Architecture Before

should be stopped

Replace with Streaming Architecture

and a distributed, ordered log like Kafka should be introduced to provide the backbone of streaming architectures and processors.  It may look something like this

Kafka Streams Ready Streaming Architecture

Now, what this last image does not show is the rise of stream processors.  Stream processors provide value by providing curated results to Kafka topics.  For example, a stream processor may pull from one or more distributed log inputs (i.e. Kafka topics), perform some transformations, aggregations, filtering, etc. across messages in the stream and then write the results back to the distributed log output (i.e. Kafka topic).  The results a stream processor writes back into the stream are intended for consumption somewhere downstream.  In other words, this is an example of developing and deploying a stream processor.  And that, girls and boys, is why-we-are-here as they say in the business.  Kafka Streams is one option for creating stream processors when the input and output are both Kafka topics.

As noted on the Stream Processors and Streaming Architecture overview page, stream processors build upon key concepts such as the meaning of time, aggregations and windowing, stateless vs stateful, and processing vs delivery guarantees.  Kafka Streams is no exception to these requirements and provides varying support for each one.

When to Use Kafka Streams?

Hopefully, it’s obvious by now, but the appropriate time to consider Kafka Streams is when you are building streaming processors where both the input and output are Kafka Topics.

Kafka Streams Tutorials

This Kafka Streams overview will be fine for those of you looking to obtain a high-level understanding of Kafka Streams.  But, for developers looking to gain hands-on experience with Kafka Streams, be sure to check out the Kafka Streams tutorials section of this site.  There you will be able to experiment with all kinds of Kafka Streams use cases such as quick starts, automated testing, joining streams, etc.

For the most recent list, check out the Kafka Streams Tutorials section of the Kafka tutorial page.

Comparisons or Alternatives to Kafka Streams

Remember, Kafka Streams is designed for building Kafka-based stream processors where a stream input is a Kafka topic and the stream processor output is a Kafka topic.  This distinction is simply something to keep in mind when considering other mechanisms for producing to and consuming from Kafka.  For example, you could also build a stream processor with Spark Streaming and Kafka.

If your use case involves only producing messages to and consuming messages from Kafka, then a Kafka Streams based stream processor may be the right choice.  However, if you need to write your own code to build stream processors for more than just Kafka, such as Kinesis, Pulsar, or Google Pub/Sub, you may wish to consider alternatives such as Spark Streaming, Apache Flink or Apache Beam.

Kafka Streams Joins Examples

Kafka Join Examples

Performing Kafka Streams Joins presents interesting design options when implementing streaming processor architecture patterns.

There are numerous applicable scenarios, but let’s consider an application that might need to access multiple database tables or REST APIs in order to enrich a topic’s event records with context information. For example, perhaps we could augment records in a topic of sensor events, containing location and temperature, with the most current weather information for that location.  Furthermore, let’s say we require these weather lookups, based on a sensor’s location, to have extremely low processing latency, which we cannot achieve with a database or REST API lookup.  In this case, we may wish to leverage the Kafka Streams API to perform joins of such topics (sensor events and weather data events), rather than requiring lookups to remote databases or REST APIs.  This could result in improved processing latency.  (If not entirely obvious, this previous example assumes we are piping sensor and weather events into Kafka topics.)

In this Kafka Streams Joins examples tutorial, we’ll create and review the sample code of various types of Kafka joins.  In addition, let’s demonstrate how to run each example.  The intention is a deeper dive into Kafka Streams joins to highlight possibilities for your use cases.

These examples below are in Scala, but the Java version is also available at https://github.com/tmcgrath/kafka-streams-java.

Kafka Streams Joins Operators

When going through the Kafka Streams join examples below, it may be helpful to start with a visual representation of the expected results of the join operands.

Kafka Joins Operand Expected Results

When we go through examples of Kafka joins, it may be helpful to keep the above diagram in mind.  The color blue represents the expected results when performing the Kafka-based joins.

Kafka Streams Joins Code Overview

We can implement Kafka joins in different ways.  In the following examples, we’ll cover the Kafka Streams DSL perspective.  From this approach, we’ll use the DSL for abstractions such as `KTable`, `KStream`, and `GlobalKTable`.  We’ll cover various usage examples of these abstractions, but it’s important to note that regardless of the abstraction, joining streams involves:

  1. Specifying at least two input streams which are read from Kafka topics
  2. Performing transformations on the joined streams to produce results
  3. Writing the results back to Kafka topics

In essence, we will be creating miniature stream processing applications for each one of the join examples.

But first, how should we think about our choices of `KTable` vs `KStream` vs `GlobalKTable`?

`KTable` represents each data record as an upsert.  If the key already exists in the stream, the value will be updated.  If the key does not exist, it will be inserted.  For those of you coming from relational databases, I like to think of `KTable` as a form of a reference table.  In my experience, reference tables were concerned with looking up the most recent value for a particular key, rather than the entire history of values for that key.

`KStream`, on the other hand, is designed for when you are concerned with the entire history of data events for particular keys.  This is often described as each data record being considered an insert (rather than an update or upsert as in `KTable`).  For example, a KStream would be utilized to process each sensor temperature reading in order to produce an average temperature over a period of time.  All the historical records are required to produce a reasonable average.  This is in contrast to `KTable`, where you might wish to know only the most recent average temperatures of all sensors in a particular region.  You wouldn’t use a `KTable` to calculate an average because a KTable always returns the most recent individual temperature and is not concerned with each individual event the way a `KStream` is.

`GlobalKTable`, as the name implies, is a form of `KTable`.  Unlike a regular `KTable`, which represents one partition of the topic it is built from, a `GlobalKTable` accounts for all partitions in the underlying topic.  As you can imagine, this has advantages but also performance-related considerations, including increased storage and increased network transmission requirements.  Other benefits of `GlobalKTable` include no requirement for co-partitioning in joins, the ability to broadcast to all running instances of an application, and more join operations, which we won’t cover in any detail here because of the introductory nature of this tutorial.

We do not cover co-partitioning in this tutorial but let me know if you’d like to explore further.

Types of Kafka Streams Joins

Keep in mind there are essentially two types of joins: windowed and non-windowed.  Windowing allows us to control how to group records that have the same key.

In joins, a windowing state store is used to retain all the records within a defined window boundary.  Old records in the state store are purged after a defined retention period. The default window retention period is one day.  I’ll add relevant windowing where applicable in the join examples below.

Kafka Streams Join Examples

Before we get into the Kafka Streams Join source code examples, I’d like to show a quick screencast of running the examples to help set some overall context and put you in a position to succeed.  As you’ll see, the examples are in Scala, but let me know if you’d like to see them converted to Java.

As you see in the screencast, we’re going to run all the Kafka Streams Joins examples through Scala tests.  If you want some background on this approach, it may be helpful to check out the previous Kafka Streams Testing post.

Code Examples

Let’s start with 3 examples of `KTable` to `KTable` joins.  These examples and other examples of Kafka Joins are contained in the `com.supergloo.KafkaStreamsJoins` class.  (All source code is available for download.  See link to it in the Reference section below and Screencast above for further reference.)

Each of the `KTable` to `KTable` join examples are within functions starting with the name `kTableToKTable`.  For example, an inner join example is within the `kTableToKTableJoin` function

def kTableToKTableJoin(inputTopic1: String,
                       inputTopic2: String,
                       storeName: String): Topology = {

  val builder: StreamsBuilder = new StreamsBuilder

  val userRegions: KTable[String, String] = builder.table(inputTopic1)
  val regionMetrics: KTable[String, Long] = builder.table(inputTopic2)

  userRegions.join(regionMetrics,
    Materialized.as(storeName))((regionValue, metricValue) => regionValue + "/" + metricValue)

  builder.build()
}
What’s going on in the code above?

Using a `StreamsBuilder`, we construct two `KTable`s and perform the inner join.  In the args we are providing to the `join` function, we are providing a specific instance of a `StateStore` in `Materialized.as(storeName)`.   In essence, this `StateStore` is another `KTable` which is based on a Kafka topic.  I find it helps when I attempt to simplify the constructs behind the API.  In this case, we’re simply joining two topics based on keys at particular moments in time (message ordering in the topic).  The results of this join are stored to another Kafka topic for a period of time.  Finally, in the last portion of the call

((regionValue, metricValue) => regionValue + "/" + metricValue)

we’re providing an implementation of what to do with the values of each topic based on the join of keys.  If this is confusing, it will make sense when you see the results we are testing for next.  (Also, to really drive it home, try changing “/” to “-” for example and re-run the tests to see the failures.)

How to run the Kafka join examples?

To run the Kafka join examples, check out the `com.supergloo.KafkaStreamsJoinsSpec` test class as shown in the Screencast above.  Running this class will run all of the Kafka join examples.  For example, the following test will run this inner join test described above.

// ---------- KTable to KTable Joins ---------- //
"KTable to KTable Inner join" should "save expected results to state store" in {

  val driver = new TopologyTestDriver(
    KafkaStreamsJoins.kTableToKTableJoin(inputTopicOne, inputTopicTwo, stateStore),
    config
  )

  driver.pipeInput(recordFactory.create(inputTopicOne, userRegions))
  driver.pipeInput(recordFactoryTwo.create(inputTopicTwo, sensorMetric))

  // Perform tests
  val store: KeyValueStore[String, String] = driver.getKeyValueStore(stateStore)

  store.get("sensor-1") shouldBe "MN/99"
  store.get("sensor-3-in-topic-one") shouldBe null
  store.get("sensor-99-in-topic-two") shouldBe null

  driver.close()
}

The expected results specific to Kafka Joins will be in the tests

  store.get("sensor-1") shouldBe "MN/99"
  store.get("sensor-3-in-topic-one") shouldBe null
  store.get("sensor-99-in-topic-two") shouldBe null

Pay attention to how these tests differ from the other `KTable` to `KTable` join tests later in the test code.

Windowing note: As you might expect, `KTable` to `KTable` joins are non-windowed because of the nature of `KTable`, where only the most recent value for each key is considered.

Next, let’s move on to `KStream` to `KTable` join examples.  Following the overall code organization of join implementations and test examples described above, we can find three examples of these joins in functions starting with the name “kStreamToKTable” in `KafkaStreamsJoins`.  Similarly, we can find examples of how to run the examples and differences in their tests in the `KafkaStreamsJoinsSpec` class.

As you’ll see in the implementation of the `KStream` to `KTable` examples, the API use is slightly different.  For example, in the inner join example

val userRegions: KTable[String, String] = builder.table(inputTopic1)
val regionMetrics: KStream[String, Long] = builder.stream(inputTopic2)

regionMetrics.join(userRegions){(regionValue, metricValue) =>
  regionValue + "/" + metricValue
}.to(outputTopicName)

val outputTopic: KTable[String, String] =
  builder.table(
    outputTopicName,
    Materialized.as(storeName)
  )

In this example above, we don’t have the option to provide a `StateStore` in the join.  So, instead, we use the `to` function to pipe results to a new topic directly.  Then, we create the `StateStore` by building a `KTable` from the previously mentioned topic, so we can reference it in the tests.

When moving to the `KStream` to `KStream` examples, with function names starting with “kStreamToKStream”, notice we need to provide a `JoinWindows` now.  For example

regionMetrics.join(userRegions)(
  ((regionValue, metricValue) => regionValue + "/" + metricValue),
  JoinWindows.of(Duration.ofMinutes(5L))
).to(outputTopicName)

The final two examples are `KStream` to `GlobalKTable` joins.  Again, the code is similar, but key differences include how to create a GlobalKTable and the `join` function signature as seen in the following.

val userRegions: GlobalKTable[String, String] = builder.globalTable(inputTopic1)
val regionMetrics: KStream[String, Long] = builder.stream(inputTopic2)

regionMetrics.join(userRegions)(
  (lk, rk) => lk,
  ((regionValue, metricValue) => regionValue + "/" + metricValue)
).to(outputTopicName)

Constructing a `GlobalKTable` is simple enough that it doesn’t require elaboration.

The `join` function signature changes to require a keyValueMapper: `(lk, rk) => lk`.  This keyValueMapper is a function used to map the key/value pair from the KStream to the key of the `GlobalKTable`.  In this implementation, nothing fancy.  We simply want the key of the `KStream` (represented as “lk”) to match the key of the `GlobalKTable`.

As you might expect based on the aforementioned description of `KTable` vs `GlobalKTable`, the tests in `KStream` to `GlobalKTable` joins are nearly identical to `KStream` to `KTable` examples.

Kafka Joins Examples Conclusion

Hopefully, you found these Kafka join examples helpful and useful.  If you have any questions or even better, suggestions on how to improve, please let me know.

Noteworthy

As shown in the screencast above, the path is not smooth when a test fails. If you run a test which fails and then attempt to rerun the tests, an Exception occurs and none of the tests pass.  The Exception is

org.apache.kafka.streams.errors.LockException: task [0_0] Failed to lock the state directory for task 0_0

The only way I’ve found to resolve it is `rm -rf /tmp/kafka-streams/testing/0_0/`

This happens when running tests in both IntelliJ and the SBT REPL.

Let me know if you have any suggestions to resolve it.

References

Kafka Streams Joins Examples image credit: https://pixabay.com/en/network-networking-rope-connection-1246209/

Kafka Streams Testing with Scala Part 1

Kafka Streams Testing in Scala Part 1

After experimenting with Kafka Streams with Scala, I started to wonder how one goes about Kafka Streams testing in Java or Scala.  How does one create and run automated tests for Kafka Streams applications?  How does it compare to Spark Streaming testing?

In this tutorial, I’ll describe what I’ve learned so far.  Also, if you have any questions or ideas on how to improve, let me know your feedback.  All source is available from “kafka-streams” github repo.  Link below.

Kafka Streams Testing with Scala Objectives

First, let’s consider the following as just one way to create automated tests, using the ScalaTest framework and `kafka-streams-test-utils`.  As you know, in Scala and Java there are many ways to build automated tests.  So, if this Kafka Streams testing tutorial is one of many possibilities, you may be wondering, is it the best way?  And the answer is… Yes, of course, it is.  My way is always the best way.  I live in a bubble, jackass.

As the saying goes, “it’s my way or the highway”.  Speaking of a highway, according to Tom Cochrane, “life is a highway” [2].  Remember that next time you’re feeling blue.  And I mean, Tom Cochrane ripping on that harmonica like he just doesn’t care kind of highway and definitely NOT the Rascal Flatts cover of the same song for the movie Cars.  Sorry, not sorry, if you are a Rascal Flatts or the movie “Cars” fans.  Oh man, I’m getting way off track here.  Where were we jackasses?  That’s right, I called you a jackass again.  It’s my tutorial.

Brief History

When beginning my search for examples of how to create automated Scala tests for Kafka Streams, I came across an approach which used `EmbeddedSingleNodeKafkaCluster` from the Confluent GitHub examples [3].  Well, long story short, this didn’t turn out so well.  So after some more searching, I found the Kafka Streams Test Utils page [4] and went from there.  I’m fairly certain the `TopologyTestDriver` provided in Kafka Streams Testing Utils will be the recommended approach going forward, but correct me if I’m wrong.

Kafka Streams Testing with Scala Example

Ok, here we go.  I needed to refactor the original WordCount Kafka Streams in Scala example to be more testable. I just added this new version of the code to the Kafka Streams repo [5].  For this tutorial, there is a Scala class and companion object with refactored logic into more testable functions.  See `src/main/scala/com/supergloo/WordCountTestable.scala`

Screencast

Here’s a quick screencast of Kafka Streams Scala test example to set the context.

Kafka Streams Code Highlights

So what are the takeaways?  What stood out?   Let’s consider one of the Kafka Streams tests

"WordCountTestable" should "count number of words" in {
  val driver = new TopologyTestDriver(wordCountApplication.countNumberOfWords("input-topic", "output-topic", "counts-store"), config)
  val recordFactory = new ConsumerRecordFactory("input-topic", new StringSerializer(), new StringSerializer())
  val words = "Hello Kafka Streams, All streams lead to Kafka"
  driver.pipeInput(recordFactory.create(words))
  val store: KeyValueStore[String, java.lang.Long] = driver.getKeyValueStore("counts-store")
  store.get("hello") shouldBe 1
  store.get("kafka") shouldBe 2
  store.get("streams") shouldBe 2
  store.get("lead") shouldBe 1
  store.get("to") shouldBe 1
  driver.close()
}
TopologyTestDriver

According to the source code comments, this class is designed to make it easier to test Kafka Streams topologies created with either `Topology` or `StreamsBuilder`, without the need for a “real” Kafka broker.  Ok, I’m sold.  Of course, I don’t want a dependency on an actual running Kafka cluster in order to execute Kafka Streams tests.  Anything we can do to simplify is valued, right?

Now, I’m not entirely certain on the differences between the `Topology` and `StreamsBuilder` approaches yet.  Maybe we’ll explore further in the future.  Let me know if you have ideas or suggestions.

In this example, the approach was to use a `Topology` created from a `StreamsBuilder`, as seen in the constructor call to `TopologyTestDriver`:

new TopologyTestDriver(wordCountApplication.countNumberOfWords("input-topic", "output-topic", "counts-store"), config)

When you review the `countNumberOfWords` function, you’ll see the return type of `Topology`, which is created by using a `StreamsBuilder`.
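For reference, here is a hypothetical sketch of what a `countNumberOfWords` function returning a `Topology` could look like, assuming the Scala DSL with implicit serdes; see `WordCountTestable.scala` in the repo for the actual implementation.

import org.apache.kafka.streams.Topology
import org.apache.kafka.streams.scala.StreamsBuilder
import org.apache.kafka.streams.scala.ImplicitConversions._
import org.apache.kafka.streams.scala.Serdes._
import org.apache.kafka.streams.scala.kstream.Materialized

// Hypothetical sketch only; the real function lives in WordCountTestable.scala.
def countNumberOfWords(inputTopic: String, outputTopic: String, storeName: String): Topology = {
  val builder = new StreamsBuilder()
  builder.stream[String, String](inputTopic)
    .flatMapValues(line => line.toLowerCase.split("\\W+").toList)  // split lines into words
    .groupBy((_, word) => word)                                    // re-key by word
    .count()(Materialized.as(storeName))                           // materialize counts in a state store
    .toStream
    .to(outputTopic)
  builder.build()
}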

ConsumerRecordFactory

Next up, `ConsumerRecordFactory`.  From looking at the source, we see its package is `org.apache.kafka.streams.test`, so that’s an indicator of its function.  It must be used for testing only.  Further, the code comments go on to describe that this class should be used to create `ConsumerRecord`s for a single-partitioned topic.  Ok, great.  I’m not sure if I’ll need or want to test multi-partitioned topics someday, but I’m guessing this will be more of a load testing or stress testing exercise which won’t be part of automated tests.

TopologyTestDriver and ConsumerRecordFactory –  Well, It’s a Story

I was thinking about calling this section a “Love Story”, but it just didn’t feel right.  Some folks might enjoy a love story or a romantic comedy, but others may not.  I don’t want to be polarizing here, you know.  It’s all about the sweet, sweet code music.

Everything comes together with `TopologyTestDriver` and `ConsumerRecordFactory` when they create their sweet, sweet love child `KeyValueStore`

driver.pipeInput(recordFactory.create(words))
val store: KeyValueStore[String, java.lang.Long] = driver.getKeyValueStore("counts-store")

What’s going on here?

The `pipeInput` function sends a given key, value and timestamp to the specified topic.  It accepts a `ConsumerRecord` we create from the `ConsumerRecordFactory` `create` function.

Next, the `KeyValueStore` is retrieved from the `TopologyTestDriver` for a specific `StateStore`.  (For more info, see `Materialized.as` in the `countNumberOfWords` implementation.)  With this `KeyValueStore` we’re able to test the values of certain keys in the store; e.g.

store.get("hello") shouldBe 1

Kafka Streams Testing in Scala References

[1] Kafka Streams with Scala Quick Start

[2] According to Tom Cochrane, life is a highway.  Now just stop for 20 seconds and watch the beginning of this video.  The guy is nuts.  He nearly gets run over while doing his whole harmonica-in-the-middle-of-the-road spiel.  Do NOT try this at home or on a highway near you.

[3] https://github.com/confluentinc/kafka-streams-examples/blob/5.1.0-post/src/test/scala/io/confluent/examples/streams/WordCountScalaIntegrationTest.scala <- As mentioned, I didn’t have much luck with this approach.  I had trouble figuring out how to load and make available the `EmbeddedSingleNodeKafkaCluster` class in SBT.

[4] Testing a Streams Application

[5] Kafka Streams Examples at https://github.com/tmcgrath/kafka-streams and for additional Kafka Streams Tutorials

[6] Thank you to http://learnscala.co/kafka-stream-testing/  for the examples.  Check it out for more.

Image credit https://pixabay.com/en/color-chalk-india-colorful-color-106692/