Kafka Streams Testing with Scala Part 1


After experimenting with Kafka Streams with Scala [1], I started to wonder how one goes about Kafka Streams testing in Java or Scala. How does one create and run automated tests for Kafka Streams applications? How does it compare to Spark Streaming testing?

In this tutorial, I’ll describe what I’ve learned so far. If you have any questions or ideas for improvement, let me know. All source code is available in the “kafka-streams” GitHub repo; the link is in the references below [5].

Kafka Streams Testing with Scala Objectives

First, consider what follows as just one way to create automated tests using the ScalaTest framework and `kafka-streams-test-utils`. As you know, in Scala and Java there are many ways to build automated tests. So, if this Kafka Streams testing tutorial is one of many possibilities, you may be wondering: is it the best way? And the answer is… Yes, of course it is. My way is always the best way. I live in a bubble, jackass.

As the saying goes, “it’s my way or the highway”. Speaking of highways, according to Tom Cochrane, “life is a highway” [2]. Remember that next time you’re feeling blue. And I mean the Tom Cochrane, ripping-on-that-harmonica-like-he-just-doesn’t-care kind of highway, and definitely NOT the Rascal Flatts cover of the same song for the movie “Cars”. Sorry, not sorry, if you are a fan of Rascal Flatts or the movie “Cars”. Oh man, I’m getting way off track here. Where were we, jackasses? That’s right, I called you a jackass again. It’s my tutorial.

Brief History

When beginning my search for examples of how to create automated Scala tests for Kafka Streams, I came across an approach that used `EmbeddedSingleNodeKafkaCluster` from the Confluent GitHub examples [3]. Well, long story short, this didn’t turn out so well. So after some more searching, I found the Kafka Streams Test Utils page [4] and went from there. I’m fairly certain the `TopologyTestDriver` provided in Kafka Streams Test Utils will be the recommended approach going forward, but correct me if I’m wrong.


Kafka Streams Testing with Scala Example

Ok, here we go. I needed to refactor the original WordCount Kafka Streams in Scala example to be more testable, and I added this new version of the code to the kafka-streams repo [5]. For this tutorial, there is a Scala class and companion object with the logic refactored into more testable functions. See `src/main/scala/com/supergloo/WordCountTestable.scala`.

Screencast

Here’s a quick screencast of the Kafka Streams Scala test example to set the context.

Kafka Streams Code Highlights

So what are the takeaways? What stood out? Let’s consider one of the Kafka Streams tests:

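// Imports used by this test; `wordCountApplication`, `config` and Matchers come from the enclosing spec class
import org.apache.kafka.common.serialization.StringSerializer
import org.apache.kafka.streams.TopologyTestDriver
import org.apache.kafka.streams.state.KeyValueStore
import org.apache.kafka.streams.test.ConsumerRecordFactory

"WordCountTestable" should "count number of words" in {
  // Run the topology in-process; no Kafka broker required
  val driver = new TopologyTestDriver(wordCountApplication.countNumberOfWords("input-topic", "output-topic", "counts-store"), config)
  try {
    // Serializes String keys/values into ConsumerRecords for "input-topic"
    val recordFactory = new ConsumerRecordFactory("input-topic", new StringSerializer(), new StringSerializer())
    val words = "Hello Kafka Streams, All streams lead to Kafka"
    driver.pipeInput(recordFactory.create(words))
    // Query the state store the topology materialized as "counts-store"
    val store: KeyValueStore[String, java.lang.Long] = driver.getKeyValueStore("counts-store")
    store.get("hello") shouldBe 1
    store.get("kafka") shouldBe 2
    store.get("streams") shouldBe 2
    store.get("lead") shouldBe 1
    store.get("to") shouldBe 1
  } finally driver.close() // release the driver's state even if an assertion fails
}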

TopologyTestDriver

According to the source code comments, this class is designed to help write tests for Kafka Streams topologies created with either `Topology` or `StreamsBuilder`, without the need for a “real” Kafka broker. Ok, I’m sold. Of course, I don’t want a dependency on an actual running Kafka cluster in order to execute Kafka Streams tests. Anything we can do to simplify is valued, right?
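The test passes a `config` that the snippet doesn't show. Even without a broker, `TopologyTestDriver` still validates a `StreamsConfig`, so it needs an `application.id` and a `bootstrap.servers` value, although that address is never contacted. Here is a minimal sketch of such a config, assuming String keys and values; the object name `WordCountTestConfig` and the property values are illustrative, not taken from the repo:

```scala
import java.util.Properties

import org.apache.kafka.common.serialization.Serdes
import org.apache.kafka.streams.StreamsConfig

// Hypothetical helper; the repo's test may build its config differently
object WordCountTestConfig {
  def config: Properties = {
    val props = new Properties()
    // Required by StreamsConfig validation, even for TopologyTestDriver
    props.setProperty(StreamsConfig.APPLICATION_ID_CONFIG, "wordcount-test")
    // Never contacted by TopologyTestDriver; any host:port string will do
    props.setProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "dummy:1234")
    // Default serdes, used wherever the topology doesn't specify its own
    props.setProperty(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass.getName)
    props.setProperty(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass.getName)
    props
  }
}
```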

Now, I’m not entirely certain of the differences between the `Topology` and `StreamsBuilder` approaches yet. Maybe we’ll explore further in the future. Let me know if you have ideas or suggestions.

In this example, the approach was to use a `Topology` created from a `StreamsBuilder`, as seen in the constructor call to `TopologyTestDriver`:

new TopologyTestDriver(wordCountApplication.countNumberOfWords("input-topic", "output-topic", "counts-store"), config)

When you review the `countNumberOfWords` function, you’ll see that it returns a `Topology`, which is built with a `StreamsBuilder`.
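If you don't have the repo handy, here is a minimal sketch of what a function like `countNumberOfWords` can look like, modeled on the Scala DSL word count in the Apache Kafka documentation. The object name `WordCountSketch` is hypothetical. The tokenization (lowercase, then split on runs of non-word characters) is an assumption inferred from the test's expectations, where `"Streams,"` is counted as `"streams"`. It also assumes a 2.x `kafka-streams-scala` (2.1 or later), where `count()` takes its `Materialized` as an implicit parameter list:

```scala
import org.apache.kafka.streams.Topology
import org.apache.kafka.streams.scala.ImplicitConversions._
import org.apache.kafka.streams.scala._
import org.apache.kafka.streams.scala.kstream._

// Hypothetical stand-in for the repo's WordCountTestable logic
object WordCountSketch {
  import Serdes._ // implicit String and Long serdes for the Scala DSL

  def countNumberOfWords(inputTopic: String, outputTopic: String, storeName: String): Topology = {
    val builder = new StreamsBuilder
    val textLines: KStream[String, String] = builder.stream[String, String](inputTopic)
    val wordCounts: KTable[String, Long] = textLines
      // Assumed tokenization: lowercase, then split on non-word characters
      .flatMapValues(textLine => textLine.toLowerCase.split("\\W+"))
      .groupBy((_, word) => word)
      // Materialize the counts in a named store so tests can query it
      .count()(Materialized.as(storeName))
    wordCounts.toStream.to(outputTopic)
    // StreamsBuilder assembles the DSL calls into a Topology
    builder.build()
  }
}
```

Returning the `Topology` instead of starting a `KafkaStreams` instance is what makes the function testable: the same value can go to `new KafkaStreams(...)` in production or to `new TopologyTestDriver(...)` in a test.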


ConsumerRecordFactory

Next up, `ConsumerRecordFactory`. From looking at the source, we see its package is `org.apache.kafka.streams.test`, which hints at its purpose: it is meant for testing only. Further, the code comments go on to say that this class should be used to create `ConsumerRecord`s for a single-partitioned topic. Ok, great. I’m not sure if I’ll need or want to test multi-partitioned topics someday, but I’m guessing that will be more of a load testing or stress testing exercise, which won’t be part of the automated tests.
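To make the factory's behavior concrete, here is a small sketch; the object name `RecordFactorySketch` and the key and value strings are illustrative. It shows that `create` returns an already-serialized `ConsumerRecord[Array[Byte], Array[Byte]]` addressed to the factory's topic. For the keyed record it uses the overload that names the topic explicitly, because with String keys and values `create(key, value)` and `create(topicName, value)` have the same parameter types, and the compiler may reject such a call as ambiguous. Also note that Kafka 2.4 deprecated `ConsumerRecordFactory` in favor of `TestInputTopic`, so newer test code may look different.

```scala
import java.nio.charset.StandardCharsets.UTF_8

import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.kafka.common.serialization.StringSerializer
import org.apache.kafka.streams.test.ConsumerRecordFactory

object RecordFactorySketch {
  // Default topic for records created without an explicit topic name
  val recordFactory = new ConsumerRecordFactory("input-topic", new StringSerializer(), new StringSerializer())

  // Value-only record, as in the word count test: the key is null
  val unkeyed: ConsumerRecord[Array[Byte], Array[Byte]] =
    recordFactory.create("Hello Kafka Streams")

  // Topic, key, value and an explicit event timestamp in milliseconds
  val keyed: ConsumerRecord[Array[Byte], Array[Byte]] =
    recordFactory.create("input-topic", "greeting", "Hello Kafka Streams", 42L)

  // Key and value were serialized with StringSerializer (UTF-8), so decode the same way
  def decode(bytes: Array[Byte]): String = new String(bytes, UTF_8)
}
```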

TopologyTestDriver and ConsumerRecordFactory – Well, It’s a Story

I was thinking about calling this section a “Love Story”, but it just didn’t feel right.  Some folks might enjoy a love story or a romantic comedy, but others may not.  I don’t want to be polarizing here, you know.  It’s all about the sweet, sweet code music.

Everything comes together with `TopologyTestDriver` and `ConsumerRecordFactory` when they create their sweet, sweet love child, `KeyValueStore`:

driver.pipeInput(recordFactory.create(words))
val store: KeyValueStore[String, java.lang.Long] = driver.getKeyValueStore("counts-store")

What’s going on here?

The `pipeInput` function sends a given key, value, and timestamp to the specified topic and processes the record through the topology before returning. It accepts the `ConsumerRecord` we create with the `ConsumerRecordFactory` `create` function.
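`pipeInput` is only half of the round trip: `TopologyTestDriver` also has `readOutput`, which returns the next record the topology wrote to a given topic, or `null` if there is none. Here is a minimal, self-contained sketch. It deliberately uses a hypothetical stateless topology that upper-cases each value instead of the word count, because a stateless topology emits exactly one output record per input, which keeps the assertion simple. `ReadOutputSketch`, `upperCaseTopology` and `roundTrip` are illustrative names, and the Scala DSL imports assume a 2.x `kafka-streams-scala`:

```scala
import java.util.Properties

import org.apache.kafka.clients.producer.ProducerRecord
import org.apache.kafka.common.serialization.{StringDeserializer, StringSerializer}
import org.apache.kafka.streams.{StreamsConfig, Topology, TopologyTestDriver}
import org.apache.kafka.streams.scala.ImplicitConversions._
import org.apache.kafka.streams.scala.{Serdes, StreamsBuilder}
import org.apache.kafka.streams.test.ConsumerRecordFactory

object ReadOutputSketch {
  import Serdes._ // implicit String serdes for the Scala DSL

  // Hypothetical stateless topology: upper-case every value
  def upperCaseTopology(in: String, out: String): Topology = {
    val builder = new StreamsBuilder
    builder.stream[String, String](in).mapValues((v: String) => v.toUpperCase).to(out)
    builder.build()
  }

  def roundTrip(input: String): String = {
    val props = new Properties()
    props.setProperty(StreamsConfig.APPLICATION_ID_CONFIG, "read-output-sketch")
    props.setProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "dummy:1234") // never contacted
    val driver = new TopologyTestDriver(upperCaseTopology("input-topic", "output-topic"), props)
    try {
      val factory = new ConsumerRecordFactory("input-topic", new StringSerializer(), new StringSerializer())
      driver.pipeInput(factory.create(input))
      // Read back what the topology produced on the output topic
      val record: ProducerRecord[String, String] =
        driver.readOutput("output-topic", new StringDeserializer(), new StringDeserializer())
      record.value
    } finally driver.close()
  }
}
```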

Next, we retrieve the `KeyValueStore` from the `TopologyTestDriver` by the name of its state store, "counts-store". (For more info, see `Materialized.as` in the `countNumberOfWords` implementation.) With this `KeyValueStore`, we can test the values of certain keys in the store; e.g.

store.get("hello") shouldBe 1
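Where do the expected numbers in these assertions come from? Assuming the topology lowercases each line and splits it on runs of non-word characters (an assumption inferred from the test, where `"Streams,"` and `"streams"` land on the same key), the counts can be cross-checked with plain Scala collections and no Kafka at all. `ExpectedWordCounts` is a hypothetical helper:

```scala
// Hypothetical cross-check of the test's expected counts; no Kafka involved
object ExpectedWordCounts {
  // Same assumed tokenization as the topology: lowercase, split on non-word characters
  def count(line: String): Map[String, Long] =
    line.toLowerCase
      .split("\\W+")
      .filter(_.nonEmpty) // drop the empty leading token a line like " hello" would produce
      .groupBy(identity)
      .map { case (word, occurrences) => word -> occurrences.length.toLong }

  def main(args: Array[String]): Unit = {
    val counts = count("Hello Kafka Streams, All streams lead to Kafka")
    // Prints one "word -> count" line per distinct word, sorted alphabetically
    counts.toSeq.sortBy(_._1).foreach { case (w, n) => println(s"$w -> $n") }
  }
}
```

Running `main` prints `all -> 1`, `hello -> 1`, `kafka -> 2`, `lead -> 1`, `streams -> 2` and `to -> 1`, matching the values the test expects from the store.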

Kafka Streams Testing in Scala References

[1] Kafka Streams with Scala Quick Start

[2] According to Tom Cochrane, life is a highway. Now just stop for 20 seconds and watch the beginning of this video. The guy is nuts. He nearly gets run over while doing his whole harmonica-in-the-middle-of-the-road spiel. Do NOT try this at home or on a highway near you.


[3] https://github.com/confluentinc/kafka-streams-examples/blob/5.1.0-post/src/test/scala/io/confluent/examples/streams/WordCountScalaIntegrationTest.scala As mentioned, I didn’t have much luck with this approach; I had trouble figuring out how to load the `EmbeddedSingleNodeKafkaCluster` class and make it available in SBT.

[4] Testing a Streams Application

[5] Kafka Streams examples at https://github.com/tmcgrath/kafka-streams (see also the additional Kafka Streams Tutorials)

Image credit https://pixabay.com/en/color-chalk-india-colorful-color-106692/

About Todd M

Todd has held multiple software roles over his 20-year career. For the last 5 years, he has focused on helping organizations move from batch to data streaming. In addition to the free tutorials, he provides consulting and coaching for Data Engineers, Data Scientists, and Data Architects. Feel free to reach out directly or to connect on LinkedIn.
