Spark Structured Streaming with Kafka Example – Part 1

Spark Structured Streaming with Kafka Examples

In this post, let’s explore an example of updating an existing Spark Streaming application to the newer Spark Structured Streaming.  We will start simple and then move to more advanced Kafka Spark Structured Streaming examples.

My original Kafka Spark Streaming post is three years old now.  On the Spark side, the data abstractions have evolved from RDDs to DataFrames and Datasets.  RDDs are no longer the preferred abstraction layer, and the previous Spark Streaming with Kafka example utilized DStreams, which was the Spark Streaming abstraction over streams of data at the time.  Some of you might recall that DStreams were built on the foundation of RDDs.

From the Spark DStream API docs

“A Discretized Stream (DStream), the basic abstraction in Spark Streaming, is a continuous sequence of RDDs (of the same type) representing a continuous stream of data (see org.apache.spark.rdd.RDD in the Spark core documentation for more details on RDDs). DStreams can either be created from live data (such as, data from TCP sockets, Kafka, Flume, etc.) using a StreamingContext or it can be generated by transforming existing DStreams using operations such as map, window and reduceByKeyAndWindow. While a Spark Streaming program is running, each DStream periodically generates a RDD, either from live data or by transforming the RDD generated by a parent DStream.”

Because we try not to use RDDs anymore, it can be confusing when there are still Spark tutorials, documentation, and code examples that show RDDs.  I’ve updated the previous Spark Streaming with Kafka example to point to this new Spark Structured Streaming with Kafka example to help clarify.

Ok, let’s show a demo and look at some code.

All source code is available on Github.  See link below.

Spark Structured Streaming with Kafka Examples Overview

We are going to show a couple of demos with Spark Structured Streaming code in Scala reading from and writing to Kafka.  The Kafka cluster will consist of three brokers (nodes), a schema registry, and Zookeeper, all wrapped in a convenient docker-compose example.  Let me know if you have any ideas to make things easier or more efficient.

The Scala code examples will be shown running within IntelliJ as well as deploying to a Spark cluster.

The source code and docker-compose file are available on Github.  See the Resources section below for links.

Requirements

If you want to run these Kafka Spark Structured Streaming examples exactly as shown below, you will need:

Structured Streaming with Kafka Demo

Let’s take a look at a demo.

 

The following items or concepts were shown in the demo:

  • Start up the Kafka cluster with docker-compose up
  • Install kafkacat as described in Generate Test Data in Kafka Cluster (I used an example from a previous tutorial)
  • Run the Spark Kafka example in IntelliJ
  • Build a jar and deploy the Spark Structured Streaming example to a Spark cluster with spark-submit

This demo assumes you are already familiar with the basics of Spark, so I don’t cover them here.

Spark Structured Streaming with Kafka CSV Example

For reading CSV data from Kafka with Spark Structured Streaming, these are the steps to perform.

  1. Load CSV data into Kafka with cat data/cricket.csv | kafkacat -b localhost:19092 -t cricket_csv
  2. Run the example in IntelliJ
  3. The code to highlight is the inputDF DataFrame and the use of the selectExpr function, where we utilize the CAST built-in Spark SQL function to deserialize the Kafka key and value from the INPUT_CSV topic into a new DataFrame called inputCSV (see the sketch below)
  4. We output inputCSV to the console with writeStream.
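
Here is a minimal sketch of what that CSV read might look like.  This is not the exact code from the repo; the broker address, topic name, and object name are assumptions based on the commands above.

import org.apache.spark.sql.SparkSession

object SparkKafkaCsvSketch extends App {

  // Assumes Spark running locally and the Dockerized Kafka broker at localhost:19092
  val spark = SparkSession.builder()
    .master("local[*]")
    .appName("kafka-csv-sketch")
    .getOrCreate()

  // Kafka keys and values arrive as binary columns
  val inputDF = spark.readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", "localhost:19092")
    .option("subscribe", "cricket_csv")
    .load()

  // CAST the binary key and value to STRING with selectExpr
  val inputCSV = inputDF.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")

  // Output to the console sink
  inputCSV.writeStream
    .format("console")
    .option("truncate", "false")
    .start()
    .awaitTermination()
}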

Spark Structured Streaming with Kafka JSON Example

Reading JSON values from Kafka is similar to the previous CSV example, with a few differences noted in the following steps.

  1. Load JSON example data into Kafka with cat data/cricket.json | kafkacat -b localhost:19092 -t cricket_json -J
  2. Notice the inputJsonDF DataFrame creation.  A noteworthy item is casting the value to String before using the from_json function.  We pass the from_json deserializer a StructType as defined in structCricket.
  3. Next, we create a filtered DataFrame called selectDF and output it to the console (see the sketch below).
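
Below is a sketch of the JSON variation.  The schema fields in structCricket are hypothetical (the real schema lives in the repo), as are the topic and DataFrame names.

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.from_json
import org.apache.spark.sql.types.{StringType, StructField, StructType}

object SparkKafkaJsonSketch extends App {

  val spark = SparkSession.builder().master("local[*]").appName("kafka-json-sketch").getOrCreate()
  import spark.implicits._

  // Hypothetical schema; substitute the fields actually defined in structCricket
  val structCricket = StructType(Seq(
    StructField("player", StringType),
    StructField("team", StringType),
    StructField("runs", StringType)
  ))

  val inputJsonDF = spark.readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", "localhost:19092")
    .option("subscribe", "cricket_json")
    .load()
    // Cast the binary Kafka value to String, then parse it with from_json
    .selectExpr("CAST(value AS STRING) AS json")
    .select(from_json($"json", structCricket).as("data"))
    .select("data.*")

  // Filter and write the results to the console sink
  val selectDF = inputJsonDF.filter($"player".isNotNull)
  selectDF.writeStream.format("console").start().awaitTermination()
}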

Spark Structured Streaming with Kafka Avro

Reading Avro serialized data from Kafka in Spark Structured Streaming is a bit more involved.

  1. First, load some example Avro data into Kafka with cat data/cricket.json | kafka-avro-console-producer --broker-list localhost:19092 --topic cricket_avro --property value.schema="$(jq -r tostring data/cricket.avsc)"
  2. In the Scala code, we create and register a custom UDF called deserialize and use it in two different ways: once in the creation of valueDataFrame and again in the creation of jsonDf.  This custom UDF uses a simple implementation of the Confluent AbstractKafkaAvroDeserializer.
  3. To make the data more useful, we convert it to a DataFrame by using the Confluent Kafka Schema Registry.  In particular, check out the creation of avroDf.  A couple of things to note here: we seem to need two conversions, 1) deserialize from Avro to JSON and then 2) convert from JSON with the from_json function, similar to the previous JSON example, but using a DataType from the spark-avro library this time.  A hedged sketch follows this list.
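
The following is a rough sketch of the pattern described above, not the exact code in the repo.  The Schema Registry URL, topic name, and schema file path are assumptions, and the deserializer wrapper is a common community approach to calling the Confluent AbstractKafkaAvroDeserializer from Spark.

import java.io.File
import io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient
import io.confluent.kafka.serializers.AbstractKafkaAvroDeserializer
import org.apache.avro.Schema
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.avro.SchemaConverters
import org.apache.spark.sql.functions.{from_json, udf}
import org.apache.spark.sql.types.StructType

// Wraps the Confluent deserializer so Confluent-framed Avro bytes become a JSON string
class AvroDeserializer(schemaRegistryUrl: String) extends AbstractKafkaAvroDeserializer {
  this.schemaRegistry = new CachedSchemaRegistryClient(schemaRegistryUrl, 128)
  def toJson(bytes: Array[Byte]): String = this.deserialize(bytes).toString
}

object AvroDeserializer {
  // Assumes the Schema Registry from the docker-compose example at localhost:8081
  @transient lazy val instance = new AvroDeserializer("http://localhost:8081")
}

object SparkKafkaAvroSketch extends App {

  val spark = SparkSession.builder().master("local[*]").appName("kafka-avro-sketch").getOrCreate()
  import spark.implicits._

  // Register the custom UDF
  val deserialize = udf { bytes: Array[Byte] => AvroDeserializer.instance.toJson(bytes) }

  // First conversion: Avro bytes from Kafka to JSON strings
  val valueDataFrame = spark.readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", "localhost:19092")
    .option("subscribe", "cricket_avro")
    .load()
    .select(deserialize($"value").as("json"))

  // Second conversion: JSON to columns, using a DataType derived from the Avro schema via spark-avro
  val avroSchema = new Schema.Parser().parse(new File("data/cricket.avsc"))
  val sparkType = SchemaConverters.toSqlType(avroSchema).dataType.asInstanceOf[StructType]
  val avroDf = valueDataFrame
    .select(from_json($"json", sparkType).as("data"))
    .select("data.*")

  avroDf.writeStream.format("console").start().awaitTermination()
}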

I don’t honestly know if this is the most efficient or straightforward way to use Avro formatted data with Kafka and Spark Structured Streaming, but I definitely want/need to use the Schema Registry.  If you have some suggestions, please let me know.

Also, as noted in the source code, it appears there might be a different option available in Databricks’ version of the from_avro function.  I’ll try it out in the next post.

Spark Structured Streaming Kafka Deploy Example

The build.sbt and project/assembly.sbt files are set to build and deploy to an external Spark cluster.  As shown in the demo, just run assembly and then deploy the jar.

Spark Structured Streaming Kafka Example Conclusion

As mentioned above, Spark’s data abstractions have evolved quite a bit in the last few years.  Kafka has evolved quite a bit as well.  However, one aspect which doesn’t seem to have evolved much is the Spark Kafka integration.  As you see in the SBT file, the integration is still using 0.10 of the Kafka API.  It doesn’t matter for this example, but it does prevent us from using more advanced Kafka constructs like the Transaction support introduced in 0.11.  In other words, it doesn’t appear we can effectively set the isolation.level to read_committed from the Spark Kafka consumer.

The Bigger Picture

Hopefully, these examples are helpful for your particular use case(s).  I’d be curious to hear more about what you are attempting to do with Spark reading from Kafka.  Do you plan to build a Stream Processor where you will be writing results back to Kafka?  Or, will you be writing results to an object store or data warehouse and not back to Kafka?

My definition of a Stream Processor in this case is taking source data from an Event Log (Kafka in this case), performing some processing on it, and then writing the results back to Kafka.  These results could be utilized downstream by a microservice or used in Kafka Connect to sink the results into an analytic data store.

While I’m obviously a fan of Spark, I’m curious to hear your reasons to use Spark with Kafka.  You have other options, so I’m interested in hearing from you. Let me know in the comments below.

Resources

 

 

Featured image credit https://pixabay.com/photos/water-rapids-stream-cascade-872016/

Running Kafka Connect – Standalone vs Distributed Mode Examples

Kafka Connect Distributed Standalone Modes Examples

One of the many benefits of running Kafka Connect is the ability to run single or multiple workers in tandem.  Running multiple workers provides a way for horizontal scale-out, which leads to increased capacity and/or automated resiliency.  For resiliency, this means answering the question, “what happens if a particular worker goes offline for any reason?”.  Horizontal scale and failover resiliency are available out-of-the-box without a requirement to run another cluster.

In this post, we’ll go through examples of running Kafka Connect in both Standalone and Distributed mode.

Distributed mode is recommended when running Kafka Connect in production.

Standalone and Distributed Mode Overview

To review, Kafka connectors, whether sources or sinks, run as their own JVM processes called “workers”.  As mentioned, there are two ways workers may be configured to run: Standalone and Distributed.

Now, regardless of mode, Kafka connectors may be configured to run one or more tasks within their individual processes. For example, a Kafka Connect Source may be configured to run 10 tasks as shown in the JDBC source example here https://github.com/tmcgrath/kafka-connect-examples/blob/master/mysql/mysql-bulk-source.properties. I wanted to make note of tasks vs. Distributed mode to avoid possible confusion.  Multiple tasks do provide some parallelism or scaling out, but it is a different construct than running in Distributed mode.  Ok, good, that’s out of the way.

Kafka Connect Standalone Mode

Running Kafka Connect in Standalone makes things really easy to get started.  Most of the examples on this site and others so far show running connectors in Standalone. In Standalone mode, a single process executes all connectors and their associated tasks.

There are cases when Standalone mode might make sense in Production.  For example, if you are log shipping from a particular host, it could make sense to run your log source in standalone mode on the host with the log(s) you are interested in ingesting into Kafka.  But generally speaking, you’ll probably want to run in Distributed mode in production.

As you can imagine, Standalone scalability is limited.  Also, there is no automated fault-tolerance out-of-the-box when a connector goes offline.  (Well, you could build an external automated monitoring process to restart failed Standalone connectors I guess, but that’s outside of scope here.  And also, why?  I mean, if you want automated failover, just utilize running in Distributed mode out-of-the-box.)

Kafka Connect Distributed Mode

Running Kafka Connect in Distributed mode runs Connect workers on one or multiple nodes.  When running on multiple nodes, the coordination mechanics for working in parallel do not require an orchestration manager such as YARN.  Let me stop here, because this is an important point.  In other words, when running in a “cluster” of multiple nodes, coordinating “which node is doing what?” is still required, but Kafka Connect handles it without an external manager.  This may or may not be relevant to you.  For me personally, I came to this after Apache Spark, so not requiring an orchestration manager interested me.

The coordination of Connect nodes is built upon Kafka Consumer Group functionality, which was covered earlier on this site.  If Consumer Groups are new to you, check out that link first before proceeding here.

As you would expect with Consumer Groups, Connect nodes running in Distributed mode can evolve by adding or removing more nodes.  Like Consumer Group Consumers, Kafka Connect nodes will be rebalanced if nodes are added or removed.  As touched upon earlier in this post, this ability to expand or contract the number of worker nodes provides both the horizontal scale and fault tolerance inherent in Distributed mode.  Again, this is the same expectation you have when running Kafka Consumers in Consumer Groups.

If this is new to you, my suggestion is to try to keep-it-simple and go back to the foundational principles.  To understand Kafka Connect Distributed mode, spend time exploring Kafka Consumer Groups.

Kafka Connect Standalone and Distributed Mode Examples Overview

Let’s run examples of a connector in Standalone and Distributed mode.  To run in these modes, we are going to run a multi-node Kafka cluster in Docker.  When we run the example of Standalone, we will configure the Standalone connector to use this multi-node Kafka cluster.  And, when we run a connector in Distributed mode, yep, you guessed it, we’ll use this same cluster.

This might seem random, but do you watch TV shows?  Me too.  Why do I ask?

Well, I made a TV show running through the examples here.  I’m hoping it’s helpful for you to watch someone else run these examples if you are attempting to run the examples in your environment.

Here’s me running through the examples in the following “screencast” 🙁

By the way, yes, I know, you are right, most folks call these screencasts and not TV shows.  I get it.  But, it’s more fun to call it a Big Time TV show.

Kafka Connect Standalone and Distributed Mode Example Requirements

To run these examples in your environment, the following are required to be installed and/or downloaded.

  1. Docker installed
  2. Docker-compose installed
  3. Confluent Platform or Apache Kafka downloaded and extracted (so we have access to the CLI scripts like kafka-topics or kafka-topics.sh)

Kafka Cluster Setup

To run these Standalone and Distributed examples, we need access to a Kafka cluster.  It can be Apache Kafka or Confluent Platform.  I’m going to use a docker-compose example I created for the Confluent Platform.

As previously mentioned and shown in the Big Time TV show above, the Kafka cluster I’m using for these examples is a multi-broker Kafka cluster in Docker.  I’m using Confluent based images.  The intention is to represent a reasonable, but lightweight, production Kafka cluster: multiple brokers, but not so heavy that it requires multiple Zookeeper nodes.  If you are not using this cluster, you’ll just have to make configuration adjustments for your environment in the steps below.

  1. Download or clone the repo with the docker-compose.yml file available here https://github.com/tmcgrath/docker-for-demos/tree/master/confluent-3-broker-cluster
  2. Run docker-compose up in the directory which contains this docker-compose.yml file.
  3. Confirm you have external access to the cluster by running kafka-topics --list --bootstrap-server localhost:29092 or kafkacat -b localhost:19092 -L.  Again, running either of these examples presumes you have downloaded and expanded a distribution of Apache Kafka or Confluent Platform as described in the Requirements section above.  (A note on my environment: I have CONFLUENT_HOME set as an environment variable and $CONFLUENT_HOME/bin in my path, and I have kafkacat installed.  Your environment may vary.  For example, if you are using Apache Kafka for CLI scripts, use kafka-topics.sh instead of kafka-topics.)

Again, see the…. screencast, pfft, I mean, Big Time TV show, above if you’d like to see a working example of these steps.

If you want to run an Apache Kafka cluster instead of Confluent Platform, you might want to check out https://github.com/wurstmeister/kafka-docker or let me know if you have an alternative suggestion.

Kafka Connect Standalone Example

Both Confluent Platform and Apache Kafka include Kafka Connect sinks and source examples for both reading and writing to files.  For our first Standalone example, let’s use a File Source connector.  Again, I’m going to run through using the Confluent Platform, but I will note how to translate the examples to Apache Kafka.

If your external cluster is running as described above, go to the Confluent root directory in a terminal.  (You may have already set this as the CONFLUENT_HOME environment variable.)

  1. Kafka cluster is running.  See above.
  2. In a terminal window, cd to where you extracted Confluent Platform.  For my environment, I have this set to a CONFLUENT_HOME environment variable.
  3. Copy etc/kafka/connect-standalone.properties to the local directory; i.e. cp etc/kafka/connect-standalone.properties .  (We are going to use a new connect-standalone.properties so we can customize it for this example only.  This isn’t a requirement.  We could use the default etc/kafka/connect-standalone.properties, but I want to leave it alone so I can still run confluent local commands, as described in other Kafka Connect examples on this site.)
  4. Open this new connect-standalone.properties file in your favorite editor and change bootstrap.servers value to localhost:19092
  5. Also, make sure the plugin.path variable in this connect-standalone.properties file includes the directory where the File Source connector jar file resides.  For my environment, it is set to /Users/todd.mcgrath/dev/confluent-5.4.1/share/java/kafka.  By the way, my CONFLUENT_HOME var is set to /Users/todd.mcgrath/dev/confluent-5.4.1
  6. Create a test.txt file with some sample data
  7. Run bin/connect-standalone ./connect-standalone.properties etc/kafka/connect-file-source.properties to start the File Source connector.
  8. If all went well, you should have a connect-test topic now.  You can verify with kafka-topics --list --bootstrap-server localhost:19092
  9. Confirm events are flowing with the console consumer; i.e bin/kafka-console-consumer --bootstrap-server localhost:19092 --topic connect-test

Cool.

Kafka Connect Standalone Configuration

A fundamental difference between Standalone and Distributed mode appears in this example.  Where and how offsets are stored in the two modes is completely different.  As we’ll see later on in the Distributed mode example, Distributed mode uses Kafka topics for offset storage, while in Standalone mode, the connect-standalone.properties file shows that offsets are stored in a local file.

Notice the following configuration in particular:

offset.storage.file.filename=/tmp/connect.offsets

Apache Kafka Differences

If you were to run these examples on Apache Kafka instead of Confluent, you’d need to run connect-standalone.sh instead of connect-standalone, and the default locations of connect-standalone.properties, connect-file-source.properties, and the File Source connector jar (for setting plugin.path) will be different.  You’ll need to adjust accordingly.  Let me know if you have any questions or concerns.

Kafka Connect Distributed Example

You’ll notice differences running connectors in Distributed mode right away.  To start, you don’t pass configuration files for each connector on startup.  Rather, you start up the Kafka Connect Distributed process and then manage connectors via REST calls.  You’ll see this in the example, but first let’s make sure you are set up and ready to go.

Kafka Connect Distributed Example -- Part 1 -- Setup

The following steps presume you are in a terminal at the root directory of your preferred Kafka distribution.  Again, I’m going to use Confluent, so my CLI scripts do not have .sh at the end.  Adjust yours as necessary.

First, pre-create 3 topics in the Dockerized cluster for Distributed mode, as recommended in the documentation.  I’ll explain why afterward.  If you are running the Dockerized 3 node cluster described above, change the port from 9092 to 19092 such as:

  • bin/kafka-topics --create --bootstrap-server localhost:19092 --topic connect-configs --replication-factor 3 --partitions 1 --config cleanup.policy=compact
  • bin/kafka-topics --create --bootstrap-server localhost:19092 --topic connect-offsets --replication-factor 3 --partitions 50 --config cleanup.policy=compact
  • bin/kafka-topics --create --bootstrap-server localhost:19092 --topic connect-status --replication-factor 3 --partitions 10 --config cleanup.policy=compact
  • Verify all three topics are listed with-- kafka-topics --list --bootstrap-server localhost:19092

Next, cp over the example properties file for Distributed mode so we can customize for this example.  This is similar to what we did above in Standalone mode.

cp etc/kafka/connect-distributed.properties ./connect-distributed-example.properties

Edit this connect-distributed-example.properties in your favorite editor.

  • Change bootstrap.servers=localhost:9092 to bootstrap.servers=localhost:19092
  • Ensure your plugin.path is set to a path that contains the connector JAR you want to run.  Similar to the above, we are going to use the File Source connector, which is included by default in Confluent.

Ensure you have a test.txt file in your local directory.  The same one from above is fine.

Finally, we need a JSON file with our connector configuration we wish to run in Distributed mode.  Create a connect-file-source.json file and cut-and-paste content into the file from here https://gist.github.com/tmcgrath/794ff6c4922251f2859264abf39866ae

So, you should have a connect-file-source.json file.

Ok, to review the Setup, at this point you should have

  • 3 topics created
  • connect-distributed-example.properties file
  • test.txt file
  • connect-file-source.json file

If you have these 4 things, you, my good-times Internet buddy, are ready to roll.

Kafka Connect Distributed Example -- Part 2 -- Running a Simple Example

  • Startup Kafka Connect in Distributed — bin/connect-distributed connect-distributed-example.properties
  • Ensure this Distributed mode process you just started is ready to accept requests for connector management via the Kafka Connect REST interface.  I’m going to use curl, but you can use whatever REST client you’d prefer.  The output from the command curl -s -o /dev/null -w %{http_code} http://localhost:8083/connectors should be 200.
  • Make REST call to start a new File Source Connector with curl -s -X POST -H 'Content-Type: application/json' --data @connect-file-source.json http://localhost:8083/connectors

Should be good-to-go now, but let’s verify.

  • Is the “local-file-source” connector listed when issuing curl -s http://localhost:8083/connectors
  • In a separate terminal window, is the connect-test topic now listed; i.e. bin/kafka-topics --list --bootstrap-server localhost:19092
  • And can we consume from it?  bin/kafka-console-consumer --bootstrap-server localhost:19092 --topic connect-test.  Add more data to test.txt to watch it in real-time as I did in the screencast shown above.

If verification is successful, let’s shut the connector down with

  • curl -s -X PUT http://localhost:8083/connectors/local-file-source/pause
  • curl -s -X GET http://localhost:8083/connectors/local-file-source/status
  • curl -s -X DELETE http://localhost:8083/connectors/local-file-source
  • curl -s http://localhost:8083/connectors should be empty

Kill the Distributed worker process if you’d like.

So, that’s it!  I hope you did it.  You now know how to run Kafka Connect in Distributed mode.

To recap, here’s what we learned in this section

  • Unlike Standalone, running Kafka Connect in Distributed mode stores the offsets, configurations, and task statuses in Kafka topics.  As recommended, we pre-created these topics rather than relying on auto-create.  In the Dockerized cluster used above, you may have noticed it allows auto-create of topics.  We didn’t do that.  No way.  Not us.
  • As described, Distributed mode builds on the mechanics of Consumer Groups, so it is no surprise to see a group.id variable in the connect-distributed-example.properties file.
  • To manage connectors in Distributed mode, we use the REST API interface.  This differs from Standalone mode, where we can pass in a connector configuration properties file from the CLI.

Kafka Connect Tutorial Updates

Writing this post inspired me to add resources for running in Distributed mode.

For the Kafka MySQL JDBC tutorial, I added JSON examples to GitHub in case you want to run in Distributed mode.  See https://github.com/tmcgrath/kafka-connect-examples/tree/master/mysql for access.

For the Kafka S3 examples, I also added JSON examples to GitHub in case you want to run in Distributed mode.  See https://github.com/tmcgrath/kafka-connect-examples/tree/master/s3 for access.

For the Kafka Azure tutorial, there is a JSON example for Blob Storage Source available on the Confluent site at https://docs.confluent.io/current/connect/kafka-connect-azure-blob-storage/source/index.html#azure-blob-storage-source-connector-rest-example which might be helpful.

For providing JSON for the other Kafka Connect Examples listed on GitHub, I will gladly accept PRs.

References

 

Featured image https://pixabay.com/photos/house-unit-standalone-architecture-3420617/

GlobalKTable vs KTable in Kafka Streams

KTable vs GlobalKTable

Kafka Streams presents two options for materialized views in the forms of GlobalKTables and KTables.  We will describe the meaning of “materialized views” in a moment, but for now, let’s just agree there are pros and cons to GlobalKTable vs KTable.

The three essential factors in your decision of when to use a GlobalKTable vs KTable come down to 1) the number of nodes in your Kafka Streams application, 2) the number of partitions in your underlying topics, and 3) how you plan to join streams.  For the last point on joins in particular, you will find the choice of GlobalKTable vs KTable most interesting when you are designing and/or implementing Kafka Streams joins deployed across multiple nodes using topics with multiple partitions.

To help me understand this further and hopefully you as well, let’s explore GlobalKTable and KTables a bit more.  Also, if the idea of “materialized view” is not clear to you, I think this post will help as well.

Let’s start with “Why KTable?”

KTable is an abstraction on a Kafka topic that can represent the latest state of a key/value pair.  The underlying Kafka topic is likely enabled with log compaction.  When I was first learning about KTables, the idea of UPSERTS immediately came to mind.  Why?  Well, the processing characteristics when attempting inserts or updates are familiar to upsert capable systems.  For example, an attempt to append a key/value pair without an existing key in a KTable will result in an INSERT while an attempt to append a key/value pair with an existing key will result in an UPDATE.

KTable Example

For a simple example of a “materialized view” through KTables, consider an event arriving at the KTable with a key of tommy and a value of 3.  If the tommy key does not exist in the KTable, it will be appended as an INSERT.  On the other hand, when a subsequent event with a key of tommy arrives, the existing tommy key event will be updated.  If the next tommy event has a value of 2, the KTable value for the tommy key will be 2.

Another key takeaway is the update is a simple replace and not a calculation of any sort.  For example, the values of 3 and 2 do not result in a sum of 5.
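
For reference, here is a minimal sketch of building a KTable with the Kafka Streams Scala DSL.  The topic name, serdes, and broker address are illustrative assumptions, not code from a particular repo.

import java.util.Properties
import org.apache.kafka.streams.scala.ImplicitConversions._
import org.apache.kafka.streams.scala.Serdes._
import org.apache.kafka.streams.scala.StreamsBuilder
import org.apache.kafka.streams.{KafkaStreams, StreamsConfig}

object KTableUpsertSketch extends App {

  val builder = new StreamsBuilder()

  // A KTable over a (likely compacted) topic of String keys and values
  val currentStatus = builder.table[String, String]("current_status")

  // Print the latest value per key; a new value for "tommy" simply replaces the old one
  currentStatus.toStream.foreach((key, value) => println(s"$key -> $value"))

  val props = new Properties()
  props.put(StreamsConfig.APPLICATION_ID_CONFIG, "ktable-upsert-sketch")
  props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:19092")

  new KafkaStreams(builder.build(), props).start()
}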

KTable vs GlobalKTable Considerations

That’s great and simple to understand in isolation.  But, we need to take it further in order to understand the KTable vs GlobalKTable question.  For that, we need to explore two more constructs before we get to the results.

The first construct involves the effect of operating Kafka Streams applications across multiple nodes with topics containing multiple partitions.

The second construct involves the ramifications of KTables with multiple underlying topic partitions and multiple Kafka Streams nodes when performing JOINS with other event streams.

Let’s start with KTables operations and then move to joins.

Let’s consider a Kafka Streams application deployed across three nodes with an application.id of ABC.  These three Kafka Streams nodes are interacting with a Kafka Cluster with 3 Broker nodes.  That keeps it simple right?

Now, let’s assume a KTable with an underlying topic that contains 3 partitions.  On any one node running your Kafka Streams application, this example KTable will only be populated with 1 partition’s worth of data.  To illustrate this point, the key/value pair with the tommy key may or may not be present in your KTable on a given node.

 

KTable Simple Mapping Diagram

 

This shouldn’t come as a shock when you consider how Kafka Streams and Kafka Connect often leverage the capabilities found in the Producer and Consumer Kafka APIs.  This 3 node, 3 partition KTable example, where the tommy key event is present on only one node, is similar to how Consumers attach 1-to-1 to particular partitions when configured in a Kafka Consumer Group.

Ok, so tommy is present in one KTable on a particular Kafka Streams node.  So what?  What’s the big deal?  Let’s cover that next when we consider the mechanics of performing join operations.  First, let’s set up our example.

KTable and KStream Join Example

Let’s assume we have a stream of data which represents tommy’s location.  Perhaps the key/value pair is key = tommy and value = {longitude: 68.9063° W, latitude: 41.8101° S}.  Now, imagine this stream of tommy’s location is arriving every 10 minutes or so, landing in a KStream with an underlying topic with 6 partitions.  Let’s call this stream the locations stream.

At this point, we have a KTable containing the tommy key with a materialized view value of 25.  Maybe this is tommy’s current age or his jersey number or his favorite number.  It doesn’t really matter.  But, let’s call this KTable current_status.

What happens if we want to join locations with current_status in this example?  Should be simple, right?  The tommy keys align for performing the join, but is it that simple?  (hint: remember the 3 Kafka Streams app nodes interacting with a Kafka Cluster with 3 brokers, where there are a different number of partitions in the underlying topics of the locations KStream and the current_status KTable.)

Answer: there’s a chance the join will fail.  Why?  Because the Kafka Streams node performing the join may not have the tommy key from both locations and current_status, as shown in the following diagram.

 

KTable to KStream Not Co-partitioned Diagram

 

Another way to describe this scenario is to flip-the-script and ask “are there any requirements when performing joins in Kafka Streams?”.  The answer, of course, is yes.  And in this particular example of a KStream to KTable join, the requirement is that the data to be joined must be “co-partitioned”.  More on this is available in the Resources and References section below.

How to Solve this Example Challenge?

Well, we have two options to ensure the join will succeed regardless of which node performs it.  One of the options involves using a GlobalKTable for current_status instead of a KTable.

GlobalKTables replicate all underlying topic partitions on each instance of Kafka Streams.  So, when we modify our example to use a GlobalKTable, the tommy key event will be present on all 3 Kafka Streams nodes.  This means a join to the locations stream will succeed regardless of which node performs the join.
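
To make the two options concrete, here is a hedged sketch in the Kafka Streams Scala DSL.  The topic names, serdes, and value-combining logic are illustrative assumptions; wiring up and starting the KafkaStreams instance works the same as in the earlier KTable sketch.

import org.apache.kafka.streams.scala.ImplicitConversions._
import org.apache.kafka.streams.scala.Serdes._
import org.apache.kafka.streams.scala.StreamsBuilder

val builder = new StreamsBuilder()

// The locations stream, keyed by name (e.g. "tommy")
val locations = builder.stream[String, String]("locations")

// Option 1: KStream-KTable join -- requires locations and current_status to be co-partitioned
val currentStatusTable = builder.table[String, String]("current_status")
val joinedViaKTable =
  locations.join(currentStatusTable)((location, status) => s"$location / $status")
joinedViaKTable.to("joined_via_ktable")

// Option 2: KStream-GlobalKTable join -- every Streams instance holds all partitions,
// so co-partitioning is not required
val currentStatusGlobal = builder.globalTable[String, String]("current_status_global")
val joinedViaGlobalKTable = locations.join(currentStatusGlobal)(
  (streamKey, _) => streamKey,                   // map each stream record to the global table's key
  (location, status) => s"$location / $status"   // combine the joined values
)
joinedViaGlobalKTable.to("joined_via_globalktable")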

Simple right? Well, like many things in software there are trade-offs.

This example doesn’t consider the overall size and mutation velocity of the underlying 3 partition topic for current_status.  It’s not difficult to imagine this example becoming complicated really quickly if this topic is over 10GB.  That’s a lot of synchronization across nodes.  That’s potentially a lot to keep in memory to avoid disk I/O.

So, in essence, whether or not GlobalKTable is a good solution depends on additional factors.  But, hopefully, this post helps describe KTable vs GlobalKTable and when and why you should consider one vs the other.

By the way, the other option besides GlobalKTable would be manually ensuring co-partitioning: create new underlying topic(s) for either side of the join and ensure both sides have the same number of partitions AND use the same partitioning strategy.  More on this approach can be found in the Resources and References section below.

Hopefully, this GlobalKTable vs KTable analysis helps.  Let me know in the comments section below.

Resources and References

  • More information on co-partitioning requirements https://docs.confluent.io/current/streams/developer-guide/dsl-api.html#join-co-partitioning-requirements
  • Nice write up on GlobalKTables http://timvanlaer.be/2017-06-28/working-with-globalktables/
  • Historical Perspective on adding GlobalKTables https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=67633649
  • To consider manually ensuring co-partitioning, see “Ensuring data co-partitioning:” section under https://kafka.apache.org/20/documentation/streams/developer-guide/dsl-api.html#join-co-partitioning-requirements

 

 

Featured Image credit https://pixabay.com/photos/back-to-nature-climate-4536617/

What and Why Event Logs for Data Engineers?

Why Event Logs diagram

“The idea of structuring data as a stream of events is nothing new, and it is used in many different fields.  Even though the underlying principles are often similar, the terminology is frequently inconsistent across different fields, which can be quite confusing.  Although the jargon can be intimidating when you first encounter it, don’t let that put you off; many of the ideas are quite simple when you get down to the core.” –Martin Kleppmann

And so begins Chapter 1 in the book “Making Sense of Stream Processing” by Martin Kleppmann.

This book can be found from numerous different sources for free, and it is the most efficient way to understand why we need Event Logs.

This amazingly succinct ~170-page book is the best resource I’ve found for understanding the questions I had, such as “Why do we need Event Logs?”, “How do we utilize Event Logs when designing real-time applications?”, “Are there different kinds of Event Logs?”, “Are Event Logs the same as a Message Queue or Message Bus?”, etc.

A Personal Perspective on Event Logs

As a personal note, when I read this book the first time, I was challenged with designing a system that could handle a fairly large number of concurrent health-related measurement transactions.  Think measurements such as weight, food intake, exercise completed, etc.  On one hand, the system needed to handle these transactions as fast and as efficiently as possible.  On the other hand, the system needed to aggregate and filter these transactions into composite views such as “How many individuals, with one or more kinds of certain attributes, have recorded their weight in the last 30 minutes?”  For example, “how many individuals who live in Kentucky and work at a company called Bourbon, Inc. have eaten more than 2000 calories today?”

Answering these types of questions by querying the database where these transactions were recorded wasn’t fast enough.  Extracting this transactional data from the operational data store and loading it into an analytic store wasn’t fast enough either.

But, back to the book.  This book completely opened my eyes to ways I could have designed the previously mentioned architecture.  If you have struggled with anything similar, then this book is the place to start.  As mentioned, it can be found for free.  Go get it and read it.  Heck, you only have to skim Chapters 1 and 2 to get value and determine if you want to go deeper.  Chapter 3 provides more information on Change Data Capture, which we covered here before.

Types of Event Logs?

Now, I know some of you are asking.  Are Event Logs and Message Queue the same thing?  What about the differences between an Event Log and Message Bus?  Is he thinking something like Apache Kafka, Amazon Kinesis, Google Pub/Sub, Azure Event Hubs?

If you are asking these questions, the short answer is yes-and-no.  For this post, the differences are not as important as the concept of an Event Log.  Let’s cover differences in other posts.  If there are any particular areas you’d like to cover, let me know in the comments below.

Even the “Making Sense of Stream Processing” book deflects questions such as these when Martin writes, “If you want to get a bit more sophisticated, you can introduce an event stream, or a message queue, or an event log (or whatever you want to call it).”

As we get deeper into Data Engineering use cases, we can cover more of the differences between Event Logs in other places on this site.  For example, keep this in mind from the book’s Foreword.

“Whenever people are excited about an idea or technology, they come up with buzzwords to describe it. Perhaps you have come across some of the following terms, and wondered what they are about: “stream processing”, “event sourcing”, “CQRS”, “reactive”, and “complex event processing”.

Sometimes, such self-important buzzwords are just smoke and mirrors, invented by companies that want to sell you their solutions. But sometimes, they contain a kernel of wisdom that can really help us design better systems.” -Neha Narkhede

Where Event Logs?

For discussion purposes and to help engrain it in my brain as well, let’s explore some diagrams.

An Application Before Event Logs (Micro Level)

Without Event Log Diagram

Now, “application” (App) is intentionally left vague here.  At this point, it’s perfectly fine to consider this application to be a Web application or a Microservice or Log Collection agent or an IoT device.

An Application After Event Logs (again, Micro Level)

With Event Log Diagram

What are your initial reactions to these two diagrams now?

Does the “Without Event Log” diagram look faster and more straightforward?  The “With Event Log” diagram looks more complicated, and we’re not sure of the added benefits at this point, right?

There was a time when I couldn’t disagree with you, but then I faced the experience described above.  So, let’s take a look at what happens to these diagrams when we move to less isolated scenarios.  (I could have said, “let’s look at the big picture” here, but I didn’t feel like saying it that way, and this is my blog.  I get to call the shots around here.  I’m the Big blog bossman.)

Anyhow, let’s get back to the diagrams.

Over time, when more components require data integration, these diagrams evolve to:

Without Event Log

Without an Event Log 2 Diagram

 

With Event Log

With an Event Log 2 Diagram

Now, which looks better?

“With an Event Log 2” diagram looks cleaner to me.

But, let’s continue looking ahead to the inevitable time when our architecture must change.

For example, what if I want to perform some calculations such as aggregating, categorizing, filtering, or alerting as transaction events occur within the system?  Well, with an event log in place and the integration components decoupled, we can add a stream processor.

Adaptability to change is another benefit of utilizing the Event Log.  In addition to adding Stream Processors, what happens if we want to integrate a SaaS application’s data down the line?  Well, we could simply hang it off our Event Log.

For example, notice Stream Processing and an additional SaaS component integration in the following diagram.

Event Logs Provide Flexibility Diagram

(By the way, the “why” of stream processors is covered in a different post.)

At this point, I’ll assume you are visually sold on the concept of the Event Log, or at least interested in exploring more. Let’s list further considerations, such as Benefits, Disadvantages, Types, and Technical Differences.  (Let me know if you have something to add in the following lists, by the way.)

Benefits of Event Logs in your Architecture

  • Loosely Coupled Integration
  • Flexible, adaptable, resilient to architectural change of requirements
  • Planning for Failure — some Event Logs are configurable to handle failures in nodes and networks gracefully
  • Events are replayable; e.g. if you want to retrain a model
  • Events are processed in a guaranteed order
  • Foundation for real-time processing (or as close as possible to it) with Stream Processors
  • Consistency

Disadvantages of Event Logs

  • May seem more complicated and overkill at first
  • Likely introduces a change into your architecture and way of thinking and change can be challenging

Types of Event Logs

  • Apache Kafka, Apache Pulsar
  • Message Queues (RabbitMQ, ActiveMQ, MQSeries, etc.)
  • Cloud-Native (Kinesis, Pub/Sub, Event Hubs, Confluent)

Technical Differences between Event Log implementations

There are technical trade-offs in your choice of Event Logs, including:

  • Producer / Consumer decoupling; i.e., multiple consumers of the same event and different points of time
  • Scalability; how many and how fast can events be processed?
  • Replayability (ability to replay events for a particular point in time)
  • Support for Transactions
  • Exactly Once Processing
  • Resiliency — Ability to set the replication factor of events and the level of acknowledgment required for appends to be considered successful.
  • Out-of-the-box connectors for ingress and egress

 

Event Log Recommended Resources

  • Start with “Making Sense of Stream Processing” by Martin Kleppmann.  Search for it.  You can find it for free from multiple sources at the time of this writing
  • Then, move to Martin’s next book “Designing Data-Intensive Applications”

 

 

Featured Image https://pixabay.com/photos/batch-dry-firewood-forestry-logs-1868104/

Azure Kafka Connect Example – Blob Storage

Azure Kafka Blob Storage Example

In this Azure Kafka tutorial, let’s describe and demonstrate how to integrate Kafka with Azure’s Blob Storage with existing Kafka Connect connectors.  Let’s get a little wacky and cover writing to Azure Blob Storage from Kafka as well as reading from Azure Blob Storage to Kafka.  In this case, “wacky” is a good thing, I hope.

Two types of references are available for your pleasure.  Well, let’s admit the phrase “for your pleasure” is just a saying, you know, and people don’t often mean it when they say it.  Well, I mean it and I hope you find this Kafka with Azure Blob Storage tutorial valuable.

This pleasurable Azure Kafka tutorial contains step-by-step command references, sample configuration file examples for sink and source connectors, as well as screencast videos of me demonstrating the setup and execution of the examples.

If you have questions, comments or suggestions for additional content, let me know in the comments below.

Note: It is expected that you have at least some working knowledge of Apache Kafka at this point, but you may not be an expert yet.

The overall goal here is to be focused on Azure Blob Storage Kafka integration through simple-as-possible examples.

Lastly, we are going to demonstrate the examples using Apache Kafka included in Confluent Platform instead of standalone Apache Kafka because the Azure Blob Storage sink and source connectors are commercial offerings from Confluent.

Here we go, let’s boogie.

Requirements

  1. An Azure account with enough permissions to be able to create storage accounts and containers (more on this below)
  2. Azure CLI installed (Link in the Resources section below)
  3. Apache Kafka
  4. Download and install the Sink and Source Connectors into your Apache Kafka cluster (Links in the Resources section below)

Azure Blob Storage Setup

I’m going to paste the commands I ran to set up the Storage Container in Azure.  You will need to update the command variable values for your environment wherever appropriate.  Here’s a hint: at a minimum, you need to change tmcgrathstorageaccount and todd.  Those values are mine.  You may wish to change other settings, like the location variable, as well.

1. az login

2. Create a resource group
az group create \
--name todd \
--location centralus

3. Create a storage account
az storage account create \
--name tmcgrathstorageaccount \
--resource-group todd \
--location centralus \
--sku Standard_LRS

For more on SKU types, https://docs.microsoft.com/en-us/rest/api/storagerp/srp_sku_types

4. Create a container
az storage container create \
--account-name tmcgrathstorageaccount \
--name kafka-connect-example \
--auth-mode login

5. For our Kafka Connect examples shown below, we need one of the two keys from the following command’s output.
az storage account keys list \
--account-name tmcgrathstorageaccount \
--resource-group todd \
--output table

Azure Blob Storage with Kafka Overview

When showing examples of connecting Kafka with Blob Storage, this tutorial assumes some familiarity with Apache Kafka, Kafka Connect, and Azure, as previously mentioned, but if you have any questions, just let me know.

Because both the Azure Blob Storage Sink and Source connectors are only available with a Confluent subscription or Confluent Cloud account, demonstrations will be conducted using Confluent Platform running on my laptop.  The goal of this tutorial is to keep things as simple as possible and provide a working example with the least amount of work for you.

Again, we will cover two types of Azure Kafka Blob Storage examples, so this tutorial is organized into two sections.  Section One is writing to Azure Blob Storage from Kafka with the Azure Blob Storage Sink Kafka Connector and the second section is an example of reading from Azure Blob Storage to Kafka.

Kafka Connect Azure Blob Storage Examples

Let’s kick things off with a demo.  In this demo, I’ll run through both the Sink and Source examples.

Now that we’ve seen working examples, let’s go through the commands that were run and the configurations described.

Kafka Connect Azure Blob Storage Sink Example

In the screencast, I showed how to configure and run Kafka Connect with the Confluent distribution of Apache Kafka, as mentioned above.  After showing a working example, I’ll document each of the steps in case you would like to try them yourself.

As you saw if you watched the video, the demo assumes you’ve downloaded the Confluent Platform already. I downloaded the tarball and have my $CONFLUENT_HOME variable set to /Users/todd.mcgrath/dev/confluent-5.4.1

The demo uses an environment variable called AZURE_ACCOUNT_KEY for the Azure Blob Storage Key when using the Azure CLI.

You will need the key1 or key2 value from Step 5 in the Azure Blob Storage setup section above and need to set it in your .properties files.

Steps in screencast

  1. confluent local start
  2. Show sink connector already installed (I previously installed with confluent-hub install confluentinc/kafka-connect-azure-blob-storage:1.3.2)
  3. Note how I copied over the azure-blob-storage-sink.properties file from my Github repo.  The link to the Github repo can be found below.
  4. Show updates needed for this file
  5. Show empty Azure Blob Storage container named kafka-connect-example with a command adjusted for your key az storage blob list --account-name tmcgrathstorageaccount --container-name kafka-connect-example --output table --account-key $AZURE_ACCOUNT_KEY
  6. Generate 10 events of Avro test data with ksql-datagen quickstart=orders format=avro topic=orders maxInterval=100 iterations=10.  See the previous post on test data in Kafka for reference on ways to generate test data into Kafka.
  7. confluent local load azure-bs-sink -- -d azure-blob-storage-sink.properties
  8. az storage blob list --account-name tmcgrathstorageaccount --container-name kafka-connect-example --output table --account-key $AZURE_ACCOUNT_KEY
  9. confluent local unload azure-bs-sink
  10. The second example is JSON output, so edit azure-blob-storage-sink.properties file
  11. Generate some different test data with confluent local config datagen-pageviews -- -d ./share/confluent-hub-components/confluentinc-kafka-connect-datagen/etc/connector_pageviews.config (Again, see link in References section below for the previous generation of test data in Kafka post)
  12. Start the sink connector back up with confluent local load azure-bs-sink -- -d azure-blob-storage-sink.properties
  13. List out the new JSON objects landed into Azure with `az storage blob list --account-name tmcgrathstorageaccount --container-name kafka-connect-example --output table --account-key $AZURE_ACCOUNT_KEY`

 

Azure Kafka Connect Blob Storage Source Example

If you made it through the Blob Storage Sink example above, you may be thinking the Source example will be pretty easy.  And depending on what time you are reading this, that might be true.  However, if you are reading this in Spring 2020 or so, it’s not exactly straightforward, but it’s not a huge deal either.  I’ll show you what to do.

First, the Azure Blob Storage Source connector is similar to the other source examples in Amazon Kafka S3 as well as GCP Kafka Cloud Storage.  They are similar in a couple of ways.  One, if you are also using the associated sink connector to write from Kafka to S3 or GCS and you attempt to read this data back into Kafka, you may run into an infinite loop where what is written back to Kafka is written to the cloud storage and back to Kafka and so on.  This means you should use the Azure Kafka Blob Storage Source connector independently of the sink connector, or use an SMT to transform the records when writing back to Kafka.  I’ll cover both of these below.

Another similarity is that the Azure Kafka Connector for Blob Storage requires a Confluent license after 30 days.  This means we will use the Confluent Platform in the following demo.

Please note: as warned above, at the time of this writing, I needed to remove some jar files from the source connector in order to proceed.  See “Workaround” section below.

Steps in screencast

  1. confluent local start (I had already installed the Source connector and made the updates described in “Workaround” section below)
  2. I copied over the azure-blob-storage-source.properties file from my Github repo.  Link below.
  3. Show no existing topics with kafka-topics --list --bootstrap-server localhost:9092
  4. az storage blob list --account-name tmcgrathstorageaccount --container-name kafka-connect-example --output table --account-key $AZURE_ACCOUNT_KEY which shows existing data on Azure Blob Storage from the previous Sink tutorial
  5. Load the source connector confluent local load azure-bs-source -- -d azure-blob-storage-source.properties
  6. kafkacat -b localhost:9092 -t orders -s avro -r http://localhost:8081
  7. In this example, the destination topic did not exist, so let’s simulate the opposite.  What would we do if the destination topic does exist?
  8. Modify azure-blob-storage-source.properties file.  Uncomment SMT transformation section.

Workaround (Spring 2020)

When attempting to use kafka-connect-azure-blob-storage-source:1.2.2 connector
with Confluent 5.4.1, the connector fails with the following

Caused by: java.lang.ClassCastException: io.netty.channel.kqueue.KQueueEventLoopGroup cannot be cast to io.netty.channel.EventLoopGroup

It can be resolved if the Azure Blob Storage Source’s Netty libs are removed; i.e.
rm -rf ./share/confluent-hub-components/confluentinc-kafka-connect-azure-blob-storage-source/lib/netty-*

Kafka Connect Azure Blob Storage Source Example with Apache Kafka

The Azure Blob Storage Kafka Connect Source is a commercial offering from Confluent, as described above, so let me know in the comments below if you find something more suitable for self-managed Kafka.  Thanks.

 

Azure Kafka Examples with Azure Blob Storage Helpful Resources

 

Featured image https://pixabay.com/photos/barrel-kegs-wooden-heritage-cask-52934/