Spark Structured Streaming with Kafka Example – Part 1


In this post, let’s explore an example of updating an existing Spark Streaming application to the newer Spark Structured Streaming.  We will start simple and then move on to more advanced Kafka Spark Structured Streaming examples.

My original Kafka Spark Streaming post is three years old now.  On the Spark side, the data abstractions have evolved from RDDs to DataFrames and Datasets.  RDDs are not the preferred abstraction layer anymore, and the previous Spark Streaming with Kafka example utilized DStreams, which were the Spark Streaming abstraction over streams of data at the time.  Some of you might recall that DStreams were built on the foundation of RDDs.

From the Spark DStream API docs:

“A Discretized Stream (DStream), the basic abstraction in Spark Streaming, is a continuous sequence of RDDs (of the same type) representing a continuous stream of data (see org.apache.spark.rdd.RDD in the Spark core documentation for more details on RDDs). DStreams can either be created from live data (such as, data from TCP sockets, Kafka, Flume, etc.) using a StreamingContext or it can be generated by transforming existing DStreams using operations such as map, window and reduceByKeyAndWindow. While a Spark Streaming program is running, each DStream periodically generates a RDD, either from live data or by transforming the RDD generated by a parent DStream.”

Because we try not to use RDDs anymore, it can be confusing that many Spark tutorials, documentation pages, and code examples still show RDD examples.  To help clarify, I’ve updated the previous Spark Streaming with Kafka example to point to this new Spark Structured Streaming with Kafka example.

Ok, let’s show a demo and look at some code.

All source code is available on GitHub.  See the Resources section below.

Spark Structured Streaming with Kafka Examples Overview

We are going to show a couple of demos with Spark Structured Streaming code in Scala reading from and writing to Kafka.  The Kafka cluster will consist of three brokers (nodes), Schema Registry, and Zookeeper, all wrapped in a convenient docker-compose example.  Let me know if you have any ideas to make things easier or more efficient.

The Scala code examples will be shown running within IntelliJ as well as deploying to a Spark cluster.

The source code and docker-compose file are available on GitHub.  See the Resources section below for links.

Requirements

If you want to run these Kafka Spark Structured Streaming examples exactly as shown below, you will need:

Structured Streaming with Kafka Demo

Let’s take a look at a demo.

 

The following items or concepts were shown in the demo:

  • Start up the Kafka cluster with docker-compose up
  • Need kafkacat, as described in Generate Test Data in Kafka Cluster (used an example from a previous tutorial)
  • Run the Spark Kafka example in IntelliJ
  • Build a Jar and deploy the Spark Structured Streaming example in a Spark cluster with spark-submit

This demo assumes you are already familiar with the basics of Spark, so I don’t cover it.

Spark Structured Streaming with Kafka CSV Example

To read CSV data from Kafka with Spark Structured Streaming, these are the steps to perform.

  1. Load CSV data into Kafka with cat data/cricket.csv | kafkacat -b localhost:19092 -t cricket_csv
  2. Run the example in IntelliJ
  3. The code to highlight is the inputDF DataFrame and the use of the selectExpr function, where the built-in Spark SQL CAST function deserializes the Kafka key and value from the INPUT_CSV topic into a new DataFrame called inputCSV
  4. Output inputCSV to the console with writeStream.
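The steps above can be sketched roughly as follows.  This is a minimal sketch rather than the code from the repository: the broker address and topic name come from the kafkacat command in step 1, while the object name, the castKeyValue helper, the application name, and the local master setting are assumptions made for illustration.

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}

object KafkaCsvSketch {

  // The Kafka source delivers key and value as binary columns;
  // CAST turns the raw bytes into strings.
  // (Hypothetical helper name, not from the original source code.)
  def castKeyValue(inputDF: DataFrame): DataFrame =
    inputDF.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("kafka-csv-sketch")
      .master("local[*]") // assumption: running locally, e.g. from an IDE
      .getOrCreate()

    // Subscribe to the topic that was loaded with kafkacat
    val inputDF: DataFrame = spark.readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "localhost:19092")
      .option("subscribe", "cricket_csv")
      .option("startingOffsets", "earliest")
      .load()

    val inputCSV = castKeyValue(inputDF)

    // Print each micro-batch to the console
    val query = inputCSV.writeStream
      .format("console")
      .option("truncate", "false")
      .outputMode("append")
      .start()

    query.awaitTermination()
  }
}
```

Note that the value column still holds each CSV line as a single string here; splitting it into typed columns would be a separate step.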

Spark Structured Streaming with Kafka JSON Example

For reading JSON values from Kafka, it is similar to the previous CSV example with a few differences noted in the following steps.

  1. Load JSON example data into Kafka with cat data/cricket.json | kafkacat -b localhost:19092 -t cricket_json -J
  2. Notice the inputJsonDF DataFrame creation.  A couple of noteworthy items are casting to String before using the from_json function.  We pass the from_json deserializer a StructType as defined in structCricket.
  3. Next, we create a filtered DataFrame called selectDF and output to the console.
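As a rough illustration of steps 2 and 3, the sketch below casts the Kafka value to a string, parses it with from_json, and then filters the result.  The field names in structCricket (player, runs) and the filter threshold are hypothetical, since the actual layout of cricket.json is not shown in this post; the helper names are also assumptions.

```scala
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions.{col, from_json}
import org.apache.spark.sql.types.{IntegerType, StringType, StructType}

object KafkaJsonSketch {

  // Hypothetical schema: the real fields of cricket.json may differ.
  val structCricket: StructType = new StructType()
    .add("player", StringType)
    .add("runs", IntegerType)

  // Cast the binary Kafka value to a string, then parse it with from_json
  // and flatten the resulting struct into top-level columns.
  def parseJson(kafkaDF: DataFrame): DataFrame =
    kafkaDF
      .selectExpr("CAST(value AS STRING) AS json")
      .select(from_json(col("json"), structCricket).as("cricket"))
      .select("cricket.*")

  // A filtered DataFrame in the spirit of selectDF (threshold is arbitrary).
  def selectDF(inputJsonDF: DataFrame): DataFrame =
    inputJsonDF.select("player", "runs").where(col("runs") > 50)
}
```

With a streaming DataFrame loaded from the cricket_json topic, the result of selectDF(parseJson(...)) can be written to the console with writeStream in the same way as in the CSV example.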

Spark Structured Streaming with Kafka Avro

Reading Avro serialized data from Kafka in Spark Structured Streaming is a bit more involved.

  1. First, load some example Avro data into Kafka with cat data/cricket.json | kafka-avro-console-producer --broker-list localhost:19092 --topic cricket_avro --property value.schema="$(jq -r tostring data/cricket.avsc)"
  2. In the Scala code, we create and register a custom UDF called deserialize and use it in two different ways: once in the creation of valueDataFrame and again in the creation of jsonDf.  This custom UDF uses a simple implementation of the Confluent AbstractKafkaAvroDeserializer.
  3. To make the data more useful, we convert to a DataFrame by using the Confluent Kafka Schema Registry.  In particular, check out the creation of avroDf.  A couple of things to note here.  We seem to have to perform two conversions: 1) deserialize from Avro to JSON and then 2) convert from JSON with the from_json function, similar to the previous JSON example, but using a DataType from the spark-avro library this time.
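The second conversion in step 3 can be sketched as below.  This is only an illustration under assumptions: the Avro schema is a hypothetical inline string (in the post it comes from the Schema Registry), the input is assumed to be a DataFrame with a string column named json that the deserialize UDF has already produced, and the spark-avro module is assumed to be on the classpath.

```scala
import org.apache.avro.Schema
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.avro.SchemaConverters
import org.apache.spark.sql.functions.{col, from_json}
import org.apache.spark.sql.types.DataType

object AvroJsonSketch {

  // Hypothetical Avro schema; the real one is defined in data/cricket.avsc
  // and would normally be fetched from the Schema Registry.
  val avroSchemaJson: String =
    """{"type":"record","name":"Cricket","fields":[
      |  {"name":"player","type":"string"},
      |  {"name":"runs","type":"int"}
      |]}""".stripMargin

  // spark-avro maps an Avro schema to the equivalent Spark SQL DataType
  val sparkType: DataType =
    SchemaConverters.toSqlType(new Schema.Parser().parse(avroSchemaJson)).dataType

  // Assumes a string column named "json" holding one JSON record per row.
  def toColumns(jsonDf: DataFrame): DataFrame =
    jsonDf
      .select(from_json(col("json"), sparkType).as("cricket"))
      .select("cricket.*")
}
```

Deriving the DataType from the Avro schema avoids maintaining a hand-written StructType alongside the schema stored in the registry.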

I honestly don’t know if this is the most efficient or straightforward way to use Avro formatted data with Kafka and Spark Structured Streaming, but I definitely want/need to use the Schema Registry.  If you have some suggestions, please let me know.

Also, as noted in the source code, it appears there might be a different option available: the version of the from_avro function available from Databricks.  I’ll try it out in the next post.

Spark Structured Streaming Kafka Deploy Example

The build.sbt and project/assembly.sbt files are set to build and deploy to an external Spark cluster.  As shown in the demo, just run assembly and then deploy the jar.
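For readers who have not set up sbt-assembly before, a build along these lines is one way to produce a deployable jar.  This is a sketch, not the repository's actual build: the project name, all version numbers, and the merge strategy are assumptions and should be matched to your own Spark cluster.

```scala
// build.sbt (sketch; name and versions are placeholders, not from the repository)
// project/assembly.sbt would contain a single line such as:
//   addSbtPlugin("com.eed3si9n" % "sbt-assembly" % "1.2.0")

name := "spark-kafka-examples"
scalaVersion := "2.12.10"

val sparkVersion = "2.4.8"

libraryDependencies ++= Seq(
  // "provided": the Spark cluster supplies these classes at runtime
  "org.apache.spark" %% "spark-sql" % sparkVersion % "provided",
  // The Kafka source/sink and spark-avro are not part of the Spark
  // distribution, so they are bundled into the assembly jar
  "org.apache.spark" %% "spark-sql-kafka-0-10" % sparkVersion,
  "org.apache.spark" %% "spark-avro" % sparkVersion
)

assembly / assemblyMergeStrategy := {
  // Keep service registrations so Spark can still find the "kafka" format
  case PathList("META-INF", "services", _*) => MergeStrategy.concat
  case PathList("META-INF", _*)             => MergeStrategy.discard
  case _                                    => MergeStrategy.first
}
```

Running the assembly task in sbt then writes a single jar under target/, which can be passed to spark-submit together with the --class and --master options for your application and cluster.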

Spark Structured Streaming Kafka Example Conclusion

As mentioned above, Spark’s data abstractions have evolved quite a bit in the last few years.  Kafka has evolved quite a bit as well.  However, one aspect which doesn’t seem to have evolved much is the Spark Kafka integration.  As you can see in the SBT file, the integration is still using 0.10 of the Kafka API.  It doesn’t matter for this example, but it does appear to prevent us from using more advanced Kafka constructs like the transaction support introduced in 0.11.  In other words, it doesn’t appear we can effectively set the `isolation level` to `read_committed` from the Spark Kafka consumer.

The Bigger Picture

Hopefully, these examples are helpful for your particular use case(s).  I’d be curious to hear more about what you are attempting to do with Spark reading from Kafka.  Do you plan to build a Stream Processor where you will be writing results back to Kafka?  Or, will you be writing results to an object store or data warehouse and not back to Kafka?

My definition of a Stream Processor in this case is taking source data from an Event Log (Kafka in this case), performing some processing on it, and then writing the results back to Kafka.  These results could be consumed downstream by a microservice or used in Kafka Connect to sink the results into an analytic data store.
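To make that definition concrete, here is a minimal sketch of such a Stream Processor in Spark Structured Streaming.  The processing step (upper-casing the value), the output topic name, and the checkpoint path are all hypothetical placeholders; only the broker address and input topic are borrowed from the earlier examples.

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.functions.{col, upper}

object StreamProcessorSketch {

  // Placeholder processing step: decode key/value and upper-case the value.
  def process(kafkaDF: DataFrame): DataFrame =
    kafkaDF
      .selectExpr("CAST(key AS STRING) AS key", "CAST(value AS STRING) AS value")
      .withColumn("value", upper(col("value")))

  def main(args: Array[String]): Unit = {
    // No master is set here; it is expected to come from spark-submit
    val spark = SparkSession.builder()
      .appName("stream-processor-sketch")
      .getOrCreate()

    // Source: the event log (Kafka)
    val source = spark.readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "localhost:19092")
      .option("subscribe", "cricket_csv")
      .load()

    // Sink: write the processed records back to Kafka.
    // The Kafka sink expects a "value" column and, optionally, a "key" column.
    val query = process(source).writeStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "localhost:19092")
      .option("topic", "cricket_processed") // hypothetical output topic
      .option("checkpointLocation", "/tmp/checkpoints/stream-processor-sketch") // hypothetical path
      .start()

    query.awaitTermination()
  }
}
```

The checkpointLocation option is required for the Kafka sink; Spark uses it to track progress so the query can resume after a restart.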

While I’m obviously a fan of Spark, I’m curious to hear your reasons to use Spark with Kafka.  You have other options, so I’m interested in hearing from you. Let me know in the comments below.

Resources

 

 

Featured image credit https://pixabay.com/photos/water-rapids-stream-cascade-872016/
