How To Generate Kafka Streaming Join Test Data By Example

Kafka Joins Test Data

Why “Joinable” Streaming Test Data for Kafka?

When creating streaming join applications in KStreams, ksqlDB, Spark, Flink, etc. with source data in Kafka, it is convenient to generate fake data with cross-topic relationships; i.e. a customer topic and an order topic whose values contain a customer_id attribute. In this example, we might want to hydrate or enrich events in the order topic stream with customer topic data via joins.

This post will show how to generate cross-topic related events in Kafka, so you can write and test stream join code.  Let’s consider it an expansion to the previous post on generating test data in Kafka.  I’m going to show how to generate both Avro and JSON.  You’ll have choices.

At the end of this post, you will have a Kafka cluster running locally in Docker with test data being delivered to multiple topics. You’ll be able to choose between Avro or JSON for the payload of the test messages, and you’ll have a convenient environment in which to run and test your streaming join code. Let the good times begin.

System Requirements

  • Docker installed and running
  • docker-compose
  • git
  • curl
  • jq
  • A fun lovin, do-not-take-yourself-too-seriously, good times attitude


Before we begin, you should know this post assumes this isn’t your first time using Docker, docker-compose, and Git.  This means you should be comfortable starting and stopping containers in your environment.  Now, you may be wondering, how can I make these assumptions?  Well, it’s simple, I can make these assumptions because it’s my blog and I’m the big-time boss around here.


Here’s how we’re going to do it. We’re going to run a Kafka cluster and a Kafka Connect node in Docker containers with docker-compose.  We’re going to use a 3rd party Kafka Connect connector called Voluble for generating the joinable test data. I’ll show how to install, configure, and run it in our Docker containers.  I’ve written about Voluble in the past.  It’s awesome.

Kafka Join Test Data Setup Steps

  1. Start Docker if it is not already running
  2. Download the Voluble connector zip file and extract it.  Depending on the version you downloaded, you should have a directory such as `mdrogalis-voluble-0.3.1`.  In this case, I downloaded version 0.3.1.
  3. git clone the kafka-stack-docker-compose repository
  4. cd kafka-stack-docker-compose
  5. mkdir connectors && cd connectors
  6. Copy the directory from Step 2, i.e. `mdrogalis-voluble-0.3.1`, into the connectors directory.  (Remember, you are in the kafka-stack-docker-compose directory, which contains this connectors directory.  For example, the connectors directory will have the mdrogalis-voluble-0.3.1/ directory in it now.)
  7. cd .. (so you are back to the kafka-stack-docker-compose directory)
  8. docker-compose -f full-stack.yml up -d (this will start up the Docker containers.  Give them 30 seconds or so after starting up.)
  9. curl http://localhost:8083/connector-plugins | jq '.' (This is to confirm you can list the Kafka Connect plugins available.  You should see VolubleSourceConnector in the list.  You need this step to complete successfully before continuing.)
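If you prefer to script the check in step 9, here is a minimal sketch (Python 3 standard library only; the function name is mine, not part of any official client) that returns True once the Connect worker’s plugin list includes Voluble:

```python
import json
import urllib.request

def voluble_ready(connect_url: str) -> bool:
    """Return True once GET /connector-plugins lists the Voluble source connector."""
    with urllib.request.urlopen(f"{connect_url}/connector-plugins") as resp:
        plugins = json.load(resp)
    return any("VolubleSourceConnector" in p.get("class", "") for p in plugins)

# Against the full-stack deploy above, you would call:
# voluble_ready("http://localhost:8083")
```

You could loop on this with a short sleep to wait for the worker to finish starting up before POSTing the connector config.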

Ok, at this point, you are ready to start the Voluble Kafka Connect connector with test data.

Streaming Join Test Configuration and Start

In this example, I’m going to use an example config file that generates some joinable data.  Download this file (or copy-and-paste the contents into a new file) to your environment as joinable.json.

Now, in the same directory as the joinable.json file, start the Voluble connector with

curl -X POST -H "Accept: application/json" -H "Content-Type: application/json" --data @joinable.json http://localhost:8083/connectors | jq '.'

As you can see, this assumes you have the joinable.json file in your current directory.

If the command was successful, you should see output similar to the following:

{
  "name": "joinable",
  "config": {
    "connector.class": "io.mdrogalis.voluble.VolubleSourceConnector",
    "genkp.inventory.sometimes.with": "#{Code.asin}",
    "genkp.inventory.sometimes.matching": "inventory.key",
    "genv.inventory.amount_in_stock.with": "#{number.number_between '5','15'}",
    "genv.inventory.product_name.with": "#{Commerce.product_name}",
    "genv.inventory.last_updated.with": "#{date.past '10','SECONDS'}",
    "genkp.customer.with": "#{Code.isbn10}",
    "": "#{Name.full_name}",
    "genv.customer.gender.with": "#{}",
    "genv.customer.favorite_beer.with": "#{}",
    "genv.customer.state.with": "#{Address.state}",
    "genkp.order.matching": "inventory.key",
    "genv.order.quantity.with": "#{number.number_between '1','5'}",
    "genv.order.customer_id.matching": "customer.key",
    "": "1000",
    "global.history.records.max": "10000",
    "name": "joinable"
  },
  "tasks": [],
  "type": "source"
}

You should now be generating test data in Avro format.
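Voluble’s property names follow a small grammar: `genkp.*` entries generate keys, `genv.*` entries generate value attributes, and a trailing `.matching` ties one topic’s data to another’s. A quick sketch in plain Python (using a subset of the config keys shown above) that extracts the topics and the cross-topic relationships:

```python
# Subset of the connector config above: genkp.* generate keys, genv.* generate
# value attributes, and a trailing ".matching" references another topic's data.
config = {
    "genkp.inventory.sometimes.with": "#{Code.asin}",
    "genkp.inventory.sometimes.matching": "inventory.key",
    "genkp.customer.with": "#{Code.isbn10}",
    "genkp.order.matching": "inventory.key",
    "genv.order.customer_id.matching": "customer.key",
}

# The topic name is always the second dotted segment of the property name.
topics = sorted({k.split(".")[1] for k in config if k.startswith(("genkp.", "genv."))})

# The ".matching" entries are the cross-topic (joinable) relationships.
relations = {k: v for k, v in config.items() if k.endswith(".matching")}

print(topics)  # ['customer', 'inventory', 'order']
```

So from this one config, Voluble writes three topics, with order keys drawn from inventory keys and order values referencing customer keys.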

There are multiple ways to view and confirm what we just did, so I’ll briefly show a couple of different ways.  At some point, I’ll probably use this example to demonstrate an Apache Flink app with Kafka Join.  Let me know if you’d like to see that.

I have Kafdrop running and configured to use the Confluent Schema Registry, which is running at port 8081 in the full-stack deploy we ran earlier.  From Kafdrop, I can see that order messages always reference a customer topic message key.

Order topic with customer_id set

kafka joinable test data generation verification of order

and if we compare it to the customer topic message key, we’ll see the reference.  For example, a customer_id of 0860762270 matches the customer key.

kafka joinable test data verification 2


I know many of you will not have Kafdrop installed, so you can also test with something like kcat.  For example, if you run:

kcat -b localhost:9092 -t customer -s avro -r http://localhost:8081

and compare the output to

kcat -b localhost:9092 -t order -s avro -r http://localhost:8081

Both of these verification methods are a bit beyond the primary focus of this post, but if you’d like me to make a screencast video to demonstrate any of this, just let me know in the comments below.

Avro and JSON Test Data Generation Examples

By default, what we did in the above steps will produce/consume messages in Avro format.  This may be exactly what you are looking for.  But, just in case, let’s quickly explore how we would generate a JSON payload instead of Avro.  There is nothing to change in Voluble itself; we just need to make changes to the Kafka Connect container configuration.

To generate JSON instead of Avro, open the full-stack.yml file and change the lines as shown in the following snippet

# CONNECT_KEY_CONVERTER: io.confluent.connect.avro.AvroConverter
# CONNECT_KEY_CONVERTER_SCHEMA_REGISTRY_URL: 'http://kafka-schema-registry:8081'
CONNECT_KEY_CONVERTER: org.apache.kafka.connect.storage.StringConverter
# CONNECT_VALUE_CONVERTER: io.confluent.connect.avro.AvroConverter
CONNECT_VALUE_CONVERTER: org.apache.kafka.connect.json.JsonConverter
CONNECT_VALUE_CONVERTER_SCHEMAS_ENABLE: 'false'
# CONNECT_VALUE_CONVERTER_SCHEMA_REGISTRY_URL: 'http://kafka-schema-registry:8081'


For those of you wondering what changed, there are new values for KEY and VALUE converters as well as a new line for CONNECT_VALUE_CONVERTER_SCHEMAS_ENABLE.

To see this in action, stop the existing containers with docker-compose -f full-stack.yml down.  The previously created topics and data will not be retained.

Now, if you start at step 8 from the original setup instructions provided above and then re-POST the exact same Voluble config, you will now produce JSON value payloads with a String key.
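One behavior worth knowing about JsonConverter: the schemas.enable setting controls whether each message carries an inline schema envelope. A small illustration (the record fields here are hypothetical, not from the Voluble config) of the two payload shapes you can expect to consume:

```python
import json

# With schemas enabled (JsonConverter's default), each value is wrapped in a
# schema/payload envelope:
enveloped = json.loads(
    '{"schema": {"type": "struct", "fields": [{"field": "quantity", "type": "string"}]},'
    ' "payload": {"quantity": "3"}}'
)

# With CONNECT_VALUE_CONVERTER_SCHEMAS_ENABLE: 'false', only the bare payload
# is written to the topic:
bare = json.loads('{"quantity": "3"}')

assert enveloped["payload"] == bare
```

This is why the snippet above sets the schemas-enable variable to 'false': downstream consumers get plain JSON objects rather than the envelope.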

kafka joinable test data in JSON instead of Avro now


Looking Ahead

As mentioned in the excellent Voluble documentation, notice the references to various .matching settings in the source JSON config file for cross-topic relationships; i.e.

 "genv.order.customer_id.matching": "customer.key"

For even further examples of what you can do, check out the Java Faker GitHub repo.  Link below.

You are now ready to point your Flink, Spark, KStreams, or ksqlDB code at the topics generated and hydrated with Voluble.
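Whichever engine you choose, the enrichment this data supports is conceptually a key lookup: each order’s customer_id matches a customer key. A minimal in-memory sketch (illustrative values, not real generated data) of what a stream-table join will do with these topics:

```python
# customer topic, keyed by the generated customer key
customers = {"0860762270": {"name": "Some Customer", "state": "MN"}}

# order topic values reference customer keys via customer_id
orders = [{"customer_id": "0860762270", "quantity": 2}]

# The stream-table join: hydrate each order with its customer's attributes.
enriched = [{**order, "customer": customers.get(order["customer_id"])} for order in orders]

print(enriched[0]["customer"]["state"])  # MN
```

Your streaming framework performs this same lookup continuously, with the customer side maintained as a table (e.g. a KTable) rather than a plain dict.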

Hope this helps!

Streaming Join Test Data References


Image credit

Spark Structured Streaming with Kafka Example – Part 1

Spark Structured Streaming with Kafka Examples

In this post, let’s explore an example of updating an existing Spark Streaming application to the newer Spark Structured Streaming.  We will start simple and then move to more advanced Kafka Spark Structured Streaming examples.

My original Kafka Spark Streaming post is three years old now.  On the Spark side, the data abstractions have evolved from RDDs to DataFrames and DataSets. RDDs are not the preferred abstraction layer anymore and the previous Spark Streaming with Kafka example utilized DStreams which was the Spark Streaming abstraction over streams of data at the time.  Some of you might recall that DStreams was built on the foundation of RDDs.

From the Spark DStream API docs

“A Discretized Stream (DStream), the basic abstraction in Spark Streaming, is a continuous sequence of RDDs (of the same type) representing a continuous stream of data (see org.apache.spark.rdd.RDD in the Spark core documentation for more details on RDDs). DStreams can either be created from live data (such as, data from TCP sockets, Kafka, Flume, etc.) using a StreamingContext or it can be generated by transforming existing DStreams using operations such as map, window and reduceByKeyAndWindow. While a Spark Streaming program is running, each DStream periodically generates a RDD, either from live data or by transforming the RDD generated by a parent DStream.”

Because we try not to use RDDs anymore, it can be confusing when there are still Spark tutorials, documentation, and code examples that show RDD examples.  I’ve updated the previous Spark Streaming with Kafka example to point to this new Spark Structured Streaming with Kafka example to try to help clarify.

Ok, let’s show a demo and look at some code.

All source code is available on Github.  See link below.

Spark Structured Streaming with Kafka Examples Overview

We are going to show a couple of demos with Spark Structured Streaming code in Scala reading and writing to Kafka.  The Kafka cluster will consist of three brokers (nodes), a schema registry, and Zookeeper, all wrapped in a convenient docker-compose example.  Let me know if you have any ideas to make things easier or more efficient.

The Scala code examples will be shown running within IntelliJ as well as deploying to a Spark cluster.

The source code and docker-compose file are available on Github.  See the Resources section below for links.


If you want to run these Kafka Spark Structured Streaming examples exactly as shown below, you will need:

Structured Streaming with Kafka Demo

Let’s take a look at a demo.


The following items or concepts were shown in the demo:

  • Start up the Kafka cluster with docker-compose up
  • Install kafkacat as described in Generate Test Data in Kafka Cluster (used an example from a previous tutorial)
  • Run the Spark Kafka example in IntelliJ
  • Build a jar and deploy the Spark Structured Streaming example to a Spark cluster with spark-submit

This demo assumes you are already familiar with the basics of Spark, so I don’t cover it.

Spark Structured Streaming with Kafka CSV Example

For reading CSV data from Kafka with Spark Structured streaming, these are the steps to perform.

  1. Loaded CSV data into Kafka with cat data/cricket.csv | kafkacat -b localhost:19092 -t cricket_csv
  2. Ran the example in IntelliJ
  3. The code to highlight is the inputDF DataFrame and the use of the selectExpr function, where we utilized the CAST built-in SparkSQL function to deserialize the Kafka key and value from the INPUT_CSV topic into a new DataFrame called inputCSV
  4. We output inputCSV to the console with writeStream.
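Conceptually, the CAST in step 3 is just byte decoding: Spark reads Kafka keys and values as binary, and `CAST(value AS STRING)` turns them into text before any parsing happens. In plain Python terms (the record contents here are illustrative):

```python
# Kafka hands Spark each record's key and value as raw bytes.
record = {"key": b"cricket_csv", "value": b"player,runs\nSachin,100"}

# CAST(key AS STRING) / CAST(value AS STRING) amounts to a UTF-8 decode.
decoded = {name: raw.decode("utf-8") for name, raw in record.items()}

print(decoded["value"].splitlines()[0])  # player,runs
```

Only after this decode can you split the CSV text into typed columns.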

Spark Structured Streaming with Kafka JSON Example

For reading JSON values from Kafka, it is similar to the previous CSV example with a few differences noted in the following steps.

  1. Load JSON example data into Kafka with cat data/cricket.json | kafkacat -b localhost:19092 -t cricket_json -J
  2. Notice the inputJsonDF DataFrame creation.  A couple of noteworthy items are casting to String before using the from_json function.  We pass the from_json deserializer a StructType as defined in structCricket.
  3. Next, we create a filtered DataFrame called selectDF and output it to the console.
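The from_json step behaves like this two-step sketch: cast the raw bytes to a String, then parse that string into typed fields described by the schema. (Field names here are illustrative, not the actual structCricket definition.)

```python
import json

raw_value = b'{"player": "Sachin", "runs": 100}'

# Step 1: cast to String (a UTF-8 decode), as in the CSV example.
as_string = raw_value.decode("utf-8")

# Step 2: parse into fields -- the role from_json plays given a StructType schema.
row = json.loads(as_string)

print(row["player"], row["runs"])  # Sachin 100
```

In Spark, step 2 additionally enforces the schema’s types, returning null for fields that do not match.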

Spark Structured Streaming with Kafka Avro

Reading Avro serialized data from Kafka in Spark Structured Streaming is a bit more involved.

  1. First, load some example Avro data into Kafka with cat data/cricket.json | kafka-avro-console-producer --broker-list localhost:19092 --topic cricket_avro --property value.schema="$(jq -r tostring data/cricket.avsc)"
  2. In the Scala code, we create and register a custom UDF called deserialize and use it in two different ways: once in the creation of valueDataFrame and again in the creation of jsonDf.  This custom UDF uses a simple implementation of the Confluent AbstractKafkaAvroDeserializer.
  3. To make the data more useful, we convert it to a DataFrame using the Confluent Kafka Schema Registry.  In particular, check out the creation of avroDf.  A couple of things to note here.  We seem to have to perform two conversions: 1) deserialize from Avro to JSON and then 2) convert from JSON with the from_json function, similar to the previous JSON example but using a DataType from the spark-avro library this time.

I honestly don’t know if this is the most efficient or most straightforward way to use Avro formatted data with Kafka and Spark Structured Streaming, but I definitely want/need to use the Schema Registry.  If you have some suggestions, please let me know.

Also, as noted in the source code, it appears there might be a different option available in Databricks’ version of the from_avro function.  I’ll try it out in the next post.

Spark Structured Streaming Kafka Deploy Example

The build.sbt and project/assembly.sbt files are set to build and deploy to an external Spark cluster.  As shown in the demo, just run assembly and then deploy the jar.

Spark Structured Streaming Kafka Example Conclusion

As mentioned above, RDDs have evolved quite a bit in the last few years.  Kafka has evolved quite a bit as well.  However, one aspect which doesn’t seem to have evolved much is the Spark Kafka integration.  As you see in the SBT file, the integration is still using the 0.10 Kafka API.  It doesn’t matter for this example, but it does prevent us from using more advanced Kafka constructs like the transaction support introduced in 0.11.  In other words, it doesn’t appear we can effectively set `isolation.level` to `read_committed` from the Spark Kafka consumer.

The Bigger Picture

Hopefully, these examples are helpful for your particular use case(s).  I’d be curious to hear more about what you are attempting to do with Spark reading from Kafka.  Do you plan to build a Stream Processor where you will be writing results back to Kafka?  Or, will you be writing results to an object store or data warehouse and not back to Kafka?

My definition of a Stream Processor in this case is taking source data from an Event Log (Kafka in this case), performing some processing on it, and then writing the results back to Kafka.  These results could be utilized downstream from Microservice or used in Kafka Connect to sink the results into an analytic data store.

While I’m obviously a fan of Spark, I’m curious to hear your reasons to use Spark with Kafka.  You have other options, so I’m interested in hearing from you. Let me know in the comments below.




Featured image credit

Running Kafka Connect – Standalone vs Distributed Mode Examples

Kafka Connect Distributed Standalone Modes Examples

One of the many benefits of running Kafka Connect is the ability to run single or multiple workers in tandem.  Running multiple workers provides a way for horizontal scale-out, which leads to increased capacity and/or automated resiliency.  For resiliency, this means answering the question, “what happens if a particular worker goes offline for any reason?”.  Horizontal scale and failover resiliency are available out-of-the-box without a requirement to run another cluster.

In this post, we’ll go through examples of running Kafka Connect in both Standalone and Distributed mode.

Distributed mode is recommended when running Kafka Connect in production.

Have you heard? We just released a Kafka Connect “X-Course”!  It covers Standalone vs. Distributed mode with Apache Kafka; not just Confluent.  Learn about Distributed mode and much much more conveniently packed into under 2 hours of expert level deep-dive training. Check it out at Kafka Connect course.


Standalone and Distributed Mode Overview

To review, Kafka connectors, whether sources or sinks, run as their own JVM processes called “workers”.  As mentioned, there are two ways workers may be configured to run: Standalone and Distributed.

Now, regardless of mode, Kafka connectors may be configured to run one or more tasks within their individual processes. For example, a Kafka Connect source may be configured to run 10 tasks as shown in the JDBC source example here.  I wanted to make note of tasks vs. Distributed mode to avoid possible confusion.  Multiple tasks do provide some parallelism or scaling out, but it is a different construct than running in Distributed mode.  Ok, good, that’s out of the way.

Kafka Connect Standalone Mode

Running Kafka Connect in Standalone makes things really easy to get started.  Most of the examples on this site and others so far show running connectors in Standalone. In Standalone mode, a single process executes all connectors and their associated tasks.

There are cases when Standalone mode might make sense in Production.  For example, if you are log shipping from a particular host, it could make sense to run your log source in standalone mode on the host with the log(s) you are interested in ingesting into Kafka.  But generally speaking, you’ll probably want to run in Distributed mode in production.

As you can imagine, Standalone scalability is limited.  Also, there is no automated fault-tolerance out-of-the-box when a connector goes offline.  (Well, you could build an external automated monitoring process to restart failed Standalone connectors I guess, but that’s outside of scope here.  And also, why?  I mean, if you want automated failover, just utilize running in Distributed mode out-of-the-box.)

Kafka Connect Distributed Mode

Running Kafka Connect in Distributed mode runs Connect workers on one or multiple nodes.  When running on multiple nodes, the coordination mechanics for working in parallel do not require an orchestration manager such as YARN.  Let me stop here because this is an important point.  In other words, when running in a “cluster” of multiple nodes, the question of “which node is doing what?” still needs to be coordinated, and Kafka Connect handles that coordination itself.  This may or may not be relevant to you.  For me personally, I came to this after Apache Spark, so no requirement for an orchestration manager interested me.

The management of Connect nodes coordination is built upon Kafka Consumer Group functionality which was covered earlier on this site.  If Consumer Groups are new to you, check out that link first before proceeding here.

As you would expect with Consumer Groups, Connect nodes running in Distributed mode can evolve by adding or removing more nodes.  Like Consumer Group Consumers, Kafka Connect nodes will be rebalanced if nodes are added or removed.  As touched upon earlier in this post, this ability to expand or contract the number of worker nodes provides both the horizontal scale and fault tolerance inherent in Distributed mode.  Again, this is the same expectation you have when running Kafka Consumers in Consumer Groups.

If this is new to you, my suggestion is to try to keep-it-simple and go back to the foundational principles.  To understand Kafka Connect Distributed mode, spend time exploring Kafka Consumer Groups.

Kafka Connect Standalone and Distributed Mode Examples Overview

Let’s run examples of a connector in Standalone and Distributed mode.  To run in these modes, we are going to run a multi-node Kafka cluster in Docker.  When we run the example of Standalone, we will configure the Standalone connector to use this multi-node Kafka cluster.  And, when we run a connector in Distributed mode, yep, you guessed it, we’ll use this same cluster.

This might seem random, but do you watch TV shows?  Me too.  Why do I ask?

Well, I made a TV show running through the examples here.  I’m hoping it’s helpful for you to watch someone else run these examples if you are attempting to run the examples in your environment.

Here’s me running through the examples in the following “screencast” 🙁

By the way, yes, I know, you are right, most folks call these screencasts and not TV shows.  I get it.  But, it’s more fun to call it a Big Time TV show.

Need to learn more about with Kafka Connect?  Check out my Kafka Connect course.

Kafka Connect Standalone and Distributed Mode Example Requirements

To run these examples in your environment, the following are required to be installed and/or downloaded.

  1. Docker installed
  2. Docker-compose installed
  3. Confluent Platform or Apache Kafka downloaded and extracted (so we have access to CLI scripts like kafka-topics)

Kafka Cluster Setup

To run these Standalone and Distributed examples, we need access to a Kafka cluster.  It can be Apache Kafka or Confluent Platform.  I’m going to use a docker-compose example I created for the Confluent Platform.

As previously mentioned and shown in the Big Time TV show above, the Kafka cluster I’m using for these examples is a multi-broker Kafka cluster in Docker.  I’m using Confluent based images.  The intention is to represent a reasonable but lightweight production Kafka cluster: multiple brokers, but not so heavy as to require multiple Zookeeper nodes.  If you are not using this cluster, you’ll just have to make configuration adjustments for your environment in the steps below.

  1. Download or clone the repo with the docker-compose.yml file available here
  2. Run docker-compose up in the directory which contains this docker-compose.yml file.
  3. Confirm you have external access to the cluster by running kafka-topics --list --bootstrap-server localhost:29092 or kafkacat -b localhost:19092 -L.  Again, running either of these examples presumes you have downloaded and expanded a distribution of Apache Kafka or Confluent Platform as described in the Requirements section above.  (A note on my environment: I have CONFLUENT_HOME set as an environment variable and $CONFLUENT_HOME/bin in my path, and have kafkacat installed.  Your environment may vary.  For example, if you are using Apache Kafka for CLI scripts, use kafka-topics.sh instead of kafka-topics.)

Again, see the…. screencast, pfft, I mean, Big Time TV show, above if you’d like to see a working example of these steps.

If you want to run an Apache Kafka cluster instead of Confluent Platform, you might want to check out or let me know if you have an alternative suggestion.

Kafka Connect Standalone Example

Both Confluent Platform and Apache Kafka include Kafka Connect sinks and source examples for both reading and writing to files.  For our first Standalone example, let’s use a File Source connector.  Again, I’m going to run through using the Confluent Platform, but I will note how to translate the examples to Apache Kafka.

If your external cluster is running as described above, go to the Confluent root directory in a terminal.  (You may have already set this to CONFLUENT_HOME environment variable).

  1. Kafka cluster is running.  See above.
  2. In a terminal window, cd to where you extracted Confluent Platform.  For my environment, I have this set to a CONFLUENT_HOME environment variable.
  3. Copy etc/kafka/ to the local directory; i.e. cp etc/kafka/ . (We are going to use a new copy, so we can customize it for this example only.  This isn’t a requirement.  We could use the default under etc/kafka/, but I want to leave that untouched so I can run confluent local commands, as described in other Kafka Connect examples on this site.)
  4. Open this new file in your favorite editor and change bootstrap.servers value to localhost:19092
  5. Also, make sure the plugin.path variable in this file includes the directory where the File Source connector jar file resides. For my environment, it is set to /Users/todd.mcgrath/dev/confluent-5.4.1/share/java/kafka.  By the way, my CONFLUENT_HOME var is set to /Users/todd.mcgrath/dev/confluent-5.4.1
  6. Create a test.txt file with some sample data
  7. Run bin/connect-standalone ./ etc/kafka/ to start the File Source connector.
  8. If all went well, you should have a connect-test topic now.  You can verify with kafka-topics --list --bootstrap-server localhost:19092
  9. Confirm events are flowing with the console consumer; i.e. bin/kafka-console-consumer --bootstrap-server localhost:19092 --topic connect-test


Kafka Connect Standalone Configuration

A fundamental difference between Standalone and Distributed mode appears in this example: where and how offsets are stored in the two modes is completely different.  As we’ll see later on in the Distributed mode example, Distributed mode uses Kafka for offset storage, but in Standalone mode, offsets are stored in a local file.

Notice the following configuration in particular:

Apache Kafka Differences

If you were to run these examples on Apache Kafka instead of Confluent, you’d need to run the .sh variants of the scripts (e.g. connect-standalone.sh instead of connect-standalone), and the default locations of the properties files and the File Source connector jar (for setting plugin.path) will be different.  You’ll need to adjust accordingly.  Let me know if you have any questions or concerns.

Kafka Connect Distributed Example

You’ll notice differences running connectors in Distributed mode right away.  To start, you don’t pass configuration files for each connector to startup.  Rather, you start up the Kafka Connect Distributed process and then manage via REST calls.  You’ll see in the example, but first let’s make sure you are setup and ready to go.

Kafka Connect Distributed Example – Part 1 – Setup

The following steps presume you are in a terminal at the root of your preferred Kafka distribution.  Again, I’m going to use Confluent, so my CLI scripts do not have .sh at the end.  Adjust yours as necessary.

First, pre-create 3 topics in the Dockerized cluster for Distributed mode, as recommended in the documentation.  I’ll explain why afterward. If you are running the Dockerized 3 node cluster described above, change the port from 9092 to 19092 such as:

  • bin/kafka-topics --create --bootstrap-server localhost:19092 --topic connect-configs --replication-factor 3 --partitions 1 --config cleanup.policy=compact
  • bin/kafka-topics --create --bootstrap-server localhost:19092 --topic connect-offsets --replication-factor 3 --partitions 50 --config cleanup.policy=compact
  • bin/kafka-topics --create --bootstrap-server localhost:19092 --topic connect-status --replication-factor 3 --partitions 10 --config cleanup.policy=compact
  • Verify all three topics are listed with kafka-topics --list --bootstrap-server localhost:19092

Next, cp over the example properties file for Distributed mode so we can customize for this example.  This is similar to what we did above in Standalone mode.

cp etc/kafka/ ./

Edit this in your favorite editor.

  • Change bootstrap.servers=localhost:9092 to bootstrap.servers=localhost:19092
  • Ensure your plugin.path is set to a path that contains the connector JAR you want to run.  Similar to the above, we are going to use the File Source connector, which is included by default in Confluent.

Ensure you have a test.txt file in your local directory.  The same one from above is fine.

Finally, we need a JSON file with our connector configuration we wish to run in Distributed mode.  Create a connect-file-source.json file and cut-and-paste content into the file from here

So, you should have a connect-file-source.json file.

Ok, to review the Setup, at this point you should have

  • 3 topics created
  • file
  • test.txt file
  • connect-file-source.json file

If you have these 4 things, you, my good-times Internet buddy, are ready to roll.

Kafka Connect Distributed Example – Part 2 – Running a Simple Example

  • Start up Kafka Connect in Distributed mode with bin/connect-distributed
  • Ensure this Distributed mode process you just started is ready to accept requests for connector management via the Kafka Connect REST interface.  I’m going to use curl, but you can use whatever REST client you’d prefer.  The output from the command curl -s -o /dev/null -w %{http_code} http://localhost:8083/connectors should be 200.
  • Make a REST call to start a new File Source connector with curl -s -X POST -H 'Content-Type: application/json' --data @connect-file-source.json http://localhost:8083/connectors

Should be good-to-go now, but let’s verify.

  • Is the “local-file-source” connector listed when issuing curl -s http://localhost:8083/connectors?
  • In a separate terminal window, is the connect-test topic now listed; i.e. bin/kafka-topics --list --bootstrap-server localhost:19092
  • And can we consume from it?  bin/kafka-console-consumer --bootstrap-server localhost:19092 --topic connect-test.  Add more data to test.txt to watch it in real-time, as I did in the screencast shown above.

If verification is successful, let’s shut the connector down with

  • curl -s -X PUT http://localhost:8083/connectors/local-file-source/pause
  • curl -s -X GET http://localhost:8083/connectors/local-file-source/status
  • curl -s -X DELETE http://localhost:8083/connectors/local-file-source
  • curl -s http://localhost:8083/connectors should be empty
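The same REST management can be scripted.  Here is a minimal sketch (Python 3 standard library only; the function name is mine) that fetches a connector’s state the way the status call above does:

```python
import json
import urllib.request

def connector_state(connect_url: str, name: str) -> str:
    """Return a connector's state (e.g. RUNNING or PAUSED) from GET /connectors/<name>/status."""
    with urllib.request.urlopen(f"{connect_url}/connectors/{name}/status") as resp:
        return json.load(resp)["connector"]["state"]

# Against the worker above, you would call:
# connector_state("http://localhost:8083", "local-file-source")
```

A check like this is handy in automation, e.g. confirming the connector reports PAUSED after the pause call before issuing the DELETE.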

Kill the Distributed worker process if you’d like.

So, that’s it!  I hope you did it.  You now know how to run Kafka Connect in Distributed mode.

To recap, here’s what we learned in this section

  • Unlike Standalone, running Kafka Connect in Distributed mode stores the offsets, configurations, and task statuses in Kafka topics.  As recommended, we pre-created these topics rather than relying on auto-create.  In the Dockerized cluster used above, you may have noticed it allows auto-create of topics.  We didn’t do that.  No way.  Not us.
  • As described, Distributed mode builds on the mechanics of Consumer Groups, so it is no surprise to see a group.id variable in the file.
  • To manage connectors in Distributed mode, we use the REST API interface.  This differs from Standalone, where we can pass a configuration properties file from the CLI.

Kafka Connect Tutorial Updates

Writing this post inspired me to add resources for running in Distributed mode.

For the Kafka MySQL JDBC tutorial, I added JSON examples to GitHub in case you want to run in Distributed mode.  See the GitHub repo for access.

For the Kafka S3 examples, I also added JSON examples to GitHub in case you want to run in Distributed mode.  See the GitHub repo for access.

For the Kafka Azure tutorial, there is a JSON example for Blob Storage Source available on the Confluent site, which might be helpful.

For providing JSON for the other Kafka Connect Examples listed on GitHub, I will gladly accept PRs.



Featured image

GlobalKTable vs KTable in Kafka Streams

KTable vs GlobalKTable

Kafka Streams presents two options for materialized views in the form of GlobalKTable and KTable.  We will describe the meaning of “materialized views” in a moment, but for now, let’s just agree there are pros and cons to GlobalKTable vs KTable.

Need to learn more about Kafka Streams in Java? Here’s a pretty good option Kafka Streams course on Udemy.

The three essential factors in your decision of when to use a GlobalKTable vs KTable come down to 1) the number of nodes in your Kafka Streams application, 2) the number of partitions in your underlying topics, and 3) how you plan to join streams.  For the last point on joins, in particular, you will find the choice of GlobalKTable vs KTable most interesting when you are designing and/or implementing Kafka Streams Joins deployed across multiple nodes using topics having multiple partitions.

To help me understand this further and hopefully you as well, let’s explore GlobalKTable and KTables a bit more.  Also, if the idea of “materialized view” is not clear to you, I think this post will help as well.

Let’s start with “Why KTable?”

KTable is an abstraction on a Kafka topic that can represent the latest state of a key/value pair.  The underlying Kafka topic is likely enabled with log compaction.  When I was first learning about KTables, the idea of UPSERTS immediately came to mind.  Why?  Well, the processing characteristics when attempting inserts or updates are familiar from upsert-capable systems.  For example, an attempt to append a key/value pair without an existing key in a KTable will result in an INSERT, while an attempt to append a key/value pair with an existing key will result in an UPDATE.

KTable Example

For a simple example of a “materialized view” through KTables, consider an event arriving at the KTable with a key tommy and value of 3. If the tommy key does not exist in the KTable, it will be appended as an INSERT.  On the other hand, when a subsequent event with a key of tommy arrives, the existing tommy key event will be updated.  If the next tommy event has a value of 2, the KTable value for the tommy key will be 2.

Another key takeaway is the update is a simple replace and not a calculation of any sort.  For example, the values of 3 and 2 do not result in a sum of 5.
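To make this concrete, here is a minimal sketch in Python, modeling the KTable as a plain dict.  This is not the Kafka Streams API, just the INSERT/UPDATE semantics described above:

```python
# A minimal sketch of KTable upsert semantics using a plain dict.
# This models the insert/update behavior only, not Kafka Streams itself.

def apply_event(table, key, value):
    """Append a key/value event to the 'KTable': INSERT if the key is
    new, UPDATE (a simple replace, not an aggregation) if it exists."""
    action = "INSERT" if key not in table else "UPDATE"
    table[key] = value
    return action

table = {}
print(apply_event(table, "tommy", 3))  # INSERT
print(apply_event(table, "tommy", 2))  # UPDATE
print(table["tommy"])                  # 2 -- a replace, not 3 + 2 = 5
```

Note the final value is 2: a simple replace, not an aggregate.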

KTable vs GlobalKTable Considerations

That’s great and simple to understand in isolation.  But, we need to take it further in order to understand why KTable vs GlobalKTable?  For that, we need to explore two more constructs before we get to the results.

The first construct involves the effect of operating Kafka Streams applications across multiple nodes with topics containing multiple partitions.

The second construct involves the ramifications of KTables with multiple underlying topic partitions and multiple Kafka Streams nodes when performing JOINS with other event streams.

Let’s start with KTables operations and then move to joins.

Let’s consider a Kafka Streams application deployed across three nodes with an application.id of ABC.  These three Kafka Streams nodes are interacting with a Kafka Cluster with 3 Broker nodes.  That keeps it simple, right?

Now, let’s assume a KTable with an underlying topic that contains 3 partitions.  On any node running your Kafka Streams application, this example KTable will only be populated with 1 partition’s worth of data.  To illustrate this point, the key/value pair with the tommy key may or may not be present in your KTable.


KTable Simple Mapping Diagram


This shouldn’t come as a shock when you consider how Kafka Streams and Kafka Connect often leverage the capabilities found in the Producer and Consumer Kafka APIs.  This 3-node, 3-partition KTable example, where the tommy key event is present on only one node, is similar to how Consumers attach 1-to-1 to particular partitions when configured in a Kafka Consumer Group.
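Here is a tiny Python sketch of that 1-to-1 partition assignment.  The byte-sum hash and node layout are illustrative assumptions, not Kafka internals (Kafka’s default partitioner actually uses murmur2):

```python
# 3 partitions, 3 Kafka Streams nodes, node i assigned partition i.
NUM_PARTITIONS = 3
nodes = {0: {}, 1: {}, 2: {}}  # each node's local slice of the KTable

def toy_hash(key: str) -> int:
    # Toy stand-in for Kafka's murmur2-based default partitioner.
    return sum(key.encode())

def partition_for(key: str) -> int:
    return toy_hash(key) % NUM_PARTITIONS

p = partition_for("tommy")
nodes[p]["tommy"] = 3  # only this node's local KTable ever sees the key

holders = [n for n, table in nodes.items() if "tommy" in table]
print(holders)  # exactly one node holds the tommy key
```

However the key hashes, it lands on exactly one partition, and therefore on exactly one node’s local KTable.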

Ok, so tommy is present in one KTable on a particular Kafka Streams node.  So what?  What’s the big deal? Let’s cover that next when we consider the mechanics of performing join operations.  First, let’s set up our example.

KTable and KStream Join Example

Let’s assume we have a stream of data which represents tommy’s location.  Perhaps the key/value pair is key = tommy and value = {longitude: 68.9063° W, latitude: 41.8101° S}.  Now, imagine this stream of tommy’s location arriving every 10 minutes or so, landing in a KStream with an underlying topic with 6 partitions.  Let’s call this stream the locations stream.

At this point, we have a KTable containing the tommy key with a materialized view value of 25.  Maybe this is tommy’s current age or his jersey number or his favorite number.  It doesn’t really matter.  But, let’s call this KTable current_status.

What happens if we want to join locations with current_status in this example?  Should be simple, right?  The tommy keys align for performing the joins, but is it that simple?  (hint: remember the 3 Kafka Streams app nodes interacting with a Kafka Cluster with 3 brokers, where there are a different number of partitions in the underlying topics of both the locations KStream and the current_status KTable.)

Answer: there’s a chance the join will fail.  Why?  Because the Kafka Streams node performing the join may not have the tommy key from both locations and current_status, as shown in the following diagram.


KTable to KStream Not Co-partitioned Diagram


Another way to describe this scenario is to flip-the-script and ask “are there any requirements when performing joins in Kafka Streams?”.  The answer, of course, is yes.  And in this particular example of a KStream to KTable join, the requirement is that the data which should be joined must be “co-partitioned”.  More on this available in the Resources and References section below.
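Here is a small Python sketch of why co-partitioning matters.  The byte-sum hash and the keys are illustrative assumptions, standing in for Kafka’s real partitioner:

```python
def toy_hash(key: str) -> int:
    # Toy stand-in for Kafka's murmur2-based default partitioner.
    return sum(key.encode())

def partition_for(key: str, num_partitions: int) -> int:
    return toy_hash(key) % num_partitions

keys = [f"user-{i}" for i in range(10)]

# Co-partitioned: same partitioner, same partition count -> the same key
# always lands on the same partition number on both sides of the join.
aligned = all(partition_for(k, 3) == partition_for(k, 3) for k in keys)

# Not co-partitioned: a 6-partition KStream vs a 3-partition KTable.
# Some keys land on partition numbers that no longer pair up 1-to-1.
mismatched = [k for k in keys
              if partition_for(k, 6) != partition_for(k, 3)]

print(aligned)     # True -- co-partitioned sides always agree
print(mismatched)  # some keys disagree on partition number
```

Any key in the mismatched list is a join that could silently find nothing, because the node assigned KTable partition i is not the node receiving the matching KStream records.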

How to Solve this Example Challenge?

Well, we have two options to ensure the joins will succeed regardless of which node performs the join.  One of the options involves using a GlobalKTable for current_status instead of a KTable.

GlobalKTables replicate all underlying topic partitions to each instance of the Kafka Streams application.  So, when we modify our example to use a GlobalKTable, the tommy key event will be present on all 3 Kafka Streams nodes.  This means a join to the locations stream will succeed regardless of which node performs the join.

Simple right? Well, like many things in software there are trade-offs.

This example doesn’t consider the overall size and mutation velocity of the underlying 3-partition topic for current_status.  It’s not difficult to imagine this example becoming complicated really quickly if this topic is over 10GB.  That’s a lot of synchronization across nodes.  That’s potentially a lot to keep in memory to avoid disk I/O.

So, in essence, whether or not GlobalKTable is a good solution depends on additional factors.  But, hopefully, this post helps describe KTable vs GlobalKTable and when and why you should consider one vs the other.

By the way, the other option, instead of a GlobalKTable, is manually ensuring co-partitioning: create a new underlying topic (or topics) for either side of the join, ensuring the same number of underlying partitions AND the same partitioning strategy.   More on this approach can be found in the Resources and References section below.
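Both fixes can be sketched in Python with a toy byte-sum hash as a stand-in partitioner.  Everything here is an illustrative model, not the Kafka Streams API:

```python
# Option 1 (GlobalKTable): replicate the full table to every node, so
# any node can serve the join locally.
# Option 2 (repartition): rewrite one side into a new topic with the
# same partition count and partitioner, restoring co-partitioning.

def toy_hash(key: str) -> int:
    # Toy stand-in for Kafka's murmur2-based default partitioner.
    return sum(key.encode())

# Option 1: a GlobalKTable is the same full dict on every node.
full_table = {"tommy": 25}
nodes = [dict(full_table) for _ in range(3)]  # replicated everywhere

# Option 2: repartition the 6-partition stream down to 3 partitions
# so stream partition i pairs 1-to-1 with table partition i again.
def repartition(records, num_partitions):
    topic = [[] for _ in range(num_partitions)]
    for key, value in records:
        topic[toy_hash(key) % num_partitions].append((key, value))
    return topic

stream = [("tommy", {"longitude": -68.9063, "latitude": -41.8101})]
restreamed = repartition(stream, 3)      # now 3 partitions, like the table
table_partition = toy_hash("tommy") % 3  # the partition holding tommy
print(restreamed[table_partition])       # the join sides now line up
```

With Option 1 the join works because every node has every key; with Option 2 it works because both sides of the join land on the same partition number again.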

Hopefully, this GlobalKTable vs KTable analysis helps.  Let me know in the comments section below.

Need to learn more about Kafka Streams in Java? Here’s a pretty good option: a Kafka Streams course on Udemy.

Resources and References

  • More information on co-partitioning requirements
  • Nice write up on GlobalKTables
  • Historical Perspective on adding GlobalKTables
  • To consider manually ensuring co-partitioning, see “Ensuring data co-partitioning:” section under



Featured Image credit

What and Why Event Logs for Data Engineers?

Why Event Logs diagram

“The idea of structuring data as a stream of events is nothing new, and it is used in many different
fields. Even though the underlying principles are often similar, the terminology is frequently
inconsistent across different fields, which can be quite confusing. Although the jargon can be
intimidating when you first encounter it, don’t let that put you off; many of the ideas are
quite simple when you get down to the core.” –Martin Kleppmann

And so begins Chapter 1 in the book “Making Sense of Stream Processing” by Martin Kleppmann.

This book can be found from numerous different sources for free, and it is the most efficient way to understand why we need Event Logs.

This amazingly succinct ~170-page book is the best resource I’ve found for answering the questions I had, such as “Why do we need Event Logs?”, “How do we utilize Event Logs when designing Real-time Applications?”, “Are there different kinds of Event Logs?”, “Are Event Logs the same as a Message Queue or Message Bus?”, etc.

A Personal Perspective on Event Logs

As a personal note, when I read this book the first time, I was challenged in designing a system that could handle a fairly large number of concurrent health-related measurement transactions.  Think measurements such as weight, food intake, exercise completed, etc.  On one hand, the system needed to handle these transactions as fast and as efficiently as possible.  On the other hand, the system needed to aggregate and filter these transactions into composite views such as “How many individuals, with one or more kinds of certain attributes, have recorded their weight in the last 30 minutes?”.  For example, “how many individuals who live in Kentucky and work at a company called Bourbon, Inc. have eaten more than 2000 calories today?”

Answering these types of questions by querying the database where these transactions were recorded wasn’t fast enough.  Extracting this transactional data from the operational data store and loading it into an analytic store wasn’t fast enough either.

But, back to the book.  This book completely opened my eyes to ways I could have designed the previously mentioned architecture.  If you have struggled with anything similar, then this book is the place to start.  As mentioned, it can be found for free.  Go get it and read it.  Heck, you only have to skim Chapters 1 and 2 to get value and determine if you want to go deeper.  Chapter 3 provides more information on Change Data Capture, which we covered here before.

Types of Event Logs?

Now, I know some of you are asking: Are Event Logs and Message Queues the same thing?  What about the differences between an Event Log and a Message Bus?  Is he thinking of something like Apache Kafka, Amazon Kinesis, Google Pub/Sub, or Azure Event Hubs?

If you are asking these questions, the short answer is yes-and-no.  For this post, the differences are not as important as the concept of an Event Log.  Let’s cover differences in other posts.  If there are any particular areas you’d like to cover, let me know in the comments below.

Even the “Making Sense of Stream Processing” book deflects questions such as these when Martin writes, “If you want to get a bit more sophisticated, you can introduce an event stream, or a message queue, or an event log (or whatever you want to call it).”

As we get deeper into Data Engineering use cases, we can cover more of the differences between Event Logs in other places on this site.  For example, keep this in mind from the book’s Foreword.

“Whenever people are excited about an idea or technology, they come up with buzzwords to describe it. Perhaps you have come across some of the following terms, and wondered what they are about: “stream processing”, “event sourcing”, “CQRS”, “reactive”, and “complex event processing”.

Sometimes, such self-important buzzwords are just smoke and mirrors, invented by companies that want to sell you their solutions. But sometimes, they contain a kernel of wisdom that can really help us design better systems.” -Neha Narkhede

Where Event Logs?

For discussion purposes and to help engrain it in my brain as well, let’s explore some diagrams.

An Application Before Event Logs (Micro Level)

Without Event Log Diagram

Now, “application” (App) is intentionally left vague here.  At this point, it’s perfectly fine to consider this application to be a Web application, a Microservice, a Log Collection agent, or an IoT device.

An Application After Event Logs (again, Micro Level)

With Event Log Diagram

What are your initial reactions to these two diagrams now?

Does the “Without Event Log” diagram look faster and more straightforward?  The “With Event Log” diagram looks more complicated, and we’re not sure of the added benefits at this point, right?

There was a time when I couldn’t disagree with you, but then I faced the experience described above.  So, let’s take a look at what happens to these diagrams when we move to less isolated scenarios.  (I could have said, “let’s look at the big picture” here, but I didn’t feel like saying it that way, and this is my blog.  I get to call the shots around here.  I’m the Big blog bossman.)

Anyhow, let’s get back to the diagrams.

Over time, when more components require data integration, these diagrams evolve to:

Without Event Log

Without an Event Log 2 Diagram


With Event Log

With an Event Log 2 Diagram

Now, which looks better?

“With an Event Log 2” diagram looks cleaner to me.

But, let’s continue looking ahead to the inevitable time when our architecture must change.

For example, what if I want to perform calculations such as aggregation, categorization, filtering, or alerting as transaction Events occur within the system?  Well, with an event log in place and the integration components decoupled, we can add a stream processor.

Adaptability to change is another benefit of utilizing the Event Log.  In addition to adding Stream Processors, what happens if we want to integrate a SaaS application’s data down the line?  Well, we could simply hang it off our Event Log.

For example, notice Stream Processing and an additional SaaS component integration in the following diagram.

Event Logs Provide Flexibility Diagram

(By the way, the why of stream processors is covered in a different post.)

At this point, I’ll assume you are visually sold on the concept of the Event Log, or at least interested in exploring more. Let’s list further considerations, such as Benefits, Disadvantages, Types, and Technical Differences.  (Let me know if you have something to add in the following lists, by the way.)

Benefits of Event Logs in your Architecture

  • Loosely Coupled Integration
  • Flexible, adaptable, and resilient to changes in architecture and requirements
  • Planning for Failure — some Event Logs are configurable to handle failures in nodes and networks gracefully
  • Events are replayable; e.g., replay events to retrain a model
  • Events are processed in a guaranteed order (per partition, in the case of Kafka)
  • Foundation for real-time processing (or as close as possible to it) with Stream Processors
  • Consistency

Disadvantages of Event Logs

  • May seem more complicated and overkill at first
  • Likely introduces a change into your architecture and way of thinking, and change can be challenging

Types of Event Logs

  • Apache Kafka, Apache Pulsar
  • Message Queues (RabbitMQ, ActiveMQ, MQSeries, etc.)
  • Cloud-Native (Kinesis, Pub/Sub, Event Hubs, Confluent)

Technical Differences between Event Log implementations

There are technical trade-offs in your choice of Event Logs, including:

  • Producer / Consumer decoupling; i.e., multiple consumers of the same event at different points in time
  • Scalability; how many and how fast can events be processed?
  • Replayability (ability to replay events for a particular point in time)
  • Support for Transactions
  • Exactly Once Processing
  • Resiliency — Ability to set the replication factor of events and the level of acknowledgment required for appends to be considered successful.
  • Out-of-the-box connectors for ingress and egress


Event Log Recommended Resources

  • Start with “Making Sense of Stream Processing” by Martin Kleppmann.  Search for it.  You can find it for free from multiple sources at the time of this writing
  • Then, move to Martin’s next book “Designing Data-Intensive Applications”


