Spark Streaming with Kafka Example


Spark Streaming with Kafka is becoming so common in data pipelines these days that it's difficult to find one without the other.   This tutorial presents an example of streaming data from Kafka with Spark.  In this example, we'll feed weather data into Kafka and then process it with Spark Streaming in Scala.  As the data is processed, we will save the results to Cassandra.

(Note: this Spark Streaming Kafka tutorial assumes some familiarity with Spark and Kafka.  For further information, you may wish to reference the Kafka tutorial section of this site or the Spark Tutorials with Scala, in particular the Spark Streaming tutorials.)

*** UPDATED 2020 ***
Updated Spark Streaming with Kafka example

Also, for more on Structured Spark Streaming, see:

Structured Spark Streaming examples with CSV, JSON, Avro, and Schema Registry 


Before we dive into the example, let’s look at a little background on Spark Kafka integration because there are multiple ways to integrate and it may be confusing.

Kafka and Spark Background

There are two ways to use Spark Streaming with Kafka: Receiver and Direct.  The Receiver option is similar to other unreliable sources such as text files and sockets.  As with those receivers, data received from Kafka is stored in Spark executors and processed by jobs launched by the Spark Streaming context.

This approach can lose data under failures, so it's recommended to enable Write Ahead Logs (WAL) in Spark Streaming (introduced in Spark 1.2).  The WAL synchronously saves all received Kafka data to logs on a distributed file system (e.g. HDFS, S3, DSEFS), so that everything can be recovered after a failure.  Another way of saying this is duplication: we duplicate the data in order to be resilient.  If that doesn't sound appealing, keep reading.
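For reference, here's a minimal sketch of how the WAL is typically enabled for the Receiver-based approach (not the path we take in this tutorial); the app name, master, and checkpoint path below are placeholder values:

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

// Sketch only: Receiver-based approach with the Write Ahead Log enabled
val conf = new SparkConf()
  .setAppName("receiver-wal-example")   // placeholder app name
  .setMaster("local[2]")                // placeholder master for a local test
  .set("spark.streaming.receiver.writeAheadLog.enable", "true")

val ssc = new StreamingContext(conf, Seconds(10))

// The WAL needs a checkpoint directory on a reliable file system (placeholder path)
ssc.checkpoint("hdfs:///spark/checkpoints")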

In our example, we want zero data loss, but not the overhead of write-ahead logs.  We are going to go with an approach referred to as “Direct”.

Direct Spark Streaming from Kafka was introduced in Spark 1.3.

This approach periodically queries Kafka for the latest offsets in each topic + partition and subsequently defines the offset ranges to process in each batch.

This approach has the following advantages:

  • Parallelism: No need to create multiple input Kafka streams and union them as was often done with the Receiver approach.
  • Efficiency: No need for a Write Ahead Log (WAL), which added processing overhead and duplicated the data.  As long as you have a sufficient retention window in Kafka, messages can simply be re-read from Kafka on recovery.
  • Exactly-once semantics: We use the Kafka API rather than Zookeeper; offsets are tracked within Spark Streaming checkpoints (if enabled).

Even with these three advantages, you might be wondering whether there are any disadvantages to the Kafka direct approach.  Well, yes, there is one.

Because the direct approach does not update offsets in Zookeeper, Kafka monitoring tools based on Zookeeper will not show progress.  As a possible workaround, you can access the offsets processed by this approach in each batch and update Zookeeper yourself, as sketched below.
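Here's a rough sketch of how you could get at those offsets with the direct stream.  The `directKafkaStream` value stands in for the stream we create in Step 2 below, and the Zookeeper write itself is only indicated with a comment:

import org.apache.spark.streaming.kafka.HasOffsetRanges

// Sketch only: each RDD from the direct stream knows which offset ranges it covers
directKafkaStream.foreachRDD { rdd =>
  val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges

  offsetRanges.foreach { o =>
    // Here you could push o.untilOffset for (o.topic, o.partition) to Zookeeper
    println(s"${o.topic} ${o.partition} ${o.fromOffset} -> ${o.untilOffset}")
  }
}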

In sum, I believe it's important to note that you may see examples of the Receiver-based approach in code examples and documentation, but it is not the recommended approach for Kafka Spark integration.

Ok, with this background in mind, let’s dive into the example.

Spark Streaming with Kafka Example

With this history of Kafka Spark Streaming integration in mind, it should be no surprise we are going to go with the direct integration approach.

All of the following code is available for download from GitHub; see the Resources section below.  We're going to move quickly through these steps.  There is also a screencast demo of this tutorial listed in the Resources section.

Step 1 Spark Streaming with Kafka Build Setup

Update your build file to include the required Spark Kafka library.  In the provided example, we’re going to use SBT and add the following line:

"org.apache.spark" %% "spark-streaming-kafka" % "1.6.2"

As you’ll notice, the build.sbt file also includes other libraries and configuration related to the `assembly` plugin.  We use the `assembly` plugin to help build fat jars for deployment.
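For reference, here's a minimal `build.sbt` sketch showing where that dependency fits.  The project name, Scala version, and the other dependency versions are assumptions (kept consistent with the Scala 2.10 assembly jar used in Step 6), so check the build file in the GitHub repo for the exact values:

// build.sbt (sketch; only the spark-streaming-kafka version comes from this tutorial)
name := "kafka-streaming"

version := "1.0"

scalaVersion := "2.10.6"

libraryDependencies ++= Seq(
  "org.apache.spark"   %% "spark-core"                % "1.6.2" % "provided",
  "org.apache.spark"   %% "spark-streaming"           % "1.6.2" % "provided",
  "org.apache.spark"   %% "spark-streaming-kafka"     % "1.6.2",
  "com.datastax.spark" %% "spark-cassandra-connector" % "1.6.0"
)

The `assembly` plugin itself is typically declared in `project/plugins.sbt`.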

Step 2 Spark Streaming with Kafka Scala Code

Next, we’re going to write the Scala code.  The entire Scala code is found in com.supergloo.WeatherDataStream.  

We won't go over it line by line here.  Instead, let's focus on the highlights.

import kafka.serializer.StringDecoder
import org.apache.spark.streaming.kafka.KafkaUtils

// Topics to subscribe to and the Kafka broker(s) to connect to
val topics: Set[String] = kafkaTopicRaw.split(",").map(_.trim).toSet
val kafkaParams = Map[String, String]("metadata.broker.list" -> kafkaBroker)

// Direct stream: no receiver; Spark tracks the Kafka offsets itself
val rawWeatherStream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
  ssc, kafkaParams, topics)

We pass our StreamingContext, the Kafka config map, and the topics to query to the `createDirectStream` function.  The type parameters are the Kafka message key type (String), the message value type (String), and the key and value message decoders (StringDecoder).

The `createDirectStream` function returns a DStream of (Kafka message key, message value) pairs.
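From there, the stream can be parsed and written to Cassandra with the spark-cassandra-connector.  Here's a rough sketch of that flow; the `RawWeatherData` case class, the column split, and the keyspace/table names below are simplified placeholders, so see `com.supergloo.WeatherDataStream` and the CQL schema in the repo for the real thing:

import com.datastax.spark.connector.streaming._

// Placeholder case class; the real fields follow the weather CSV schema
case class RawWeatherData(wsid: String, year: Int, month: Int, day: Int,
                          hour: Int, temperature: Double)

val parsedWeatherStream = rawWeatherStream
  .map(_._2)          // keep the Kafka message value (the raw CSV line)
  .map(_.split(","))
  .map(cols => RawWeatherData(cols(0), cols(1).toInt, cols(2).toInt,
                              cols(3).toInt, cols(4).toInt, cols(5).toDouble))

// saveToCassandra comes from the connector's streaming package;
// keyspace and table names here are assumptions for this sketch
parsedWeatherStream.saveToCassandra("isd_weather_data", "raw_weather_data")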

Step 3 Spark Streaming with Kafka Build Fat Jar

In SBT, build the fat jar with `sbt assembly` or just `assembly` if in the SBT REPL.

Step 4 Spark Streaming with Kafka Download and Start Kafka

Next, let's download and install a bare-bones Kafka to use for this example.  We can follow the quickstart guide found here: https://kafka.apache.org/quickstart

You'll need to adjust the paths in the following commands depending on where Kafka is installed, i.e. where your Kafka `bin` dir is.  For example, my Kafka `bin` dir is `/Users/toddmcgrath/Development/kafka_2.11-0.10.1.1/bin`

a) Start Zookeeper `bin/zookeeper-server-start.sh config/zookeeper.properties`

b) Start Kafka `bin/kafka-server-start.sh config/server.properties`

c) Create Kafka topic `bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic raw_weather`

Again, make note of the path for Kafka `bin` as it is needed in later steps.

Step 5 Cassandra Setup

Make sure you have a Cassandra instance running and the schema has been created.  The schema is included in the source of this tutorial in the `cql` directory.   To create, just start up `cqlsh` and source the `create-timeseries.cql` file.  For example

tm@tmcgrath-rmbp15 spark-1.6.3-bin-hadoop2.6 $ cqlsh
Connected to Test Cluster at 127.0.0.1:9042.
[cqlsh 5.0.1 | Cassandra 3.0.9.1346 | DSE 5.0.3 | CQL spec 3.4.0 | Native protocol v4]
Use HELP for help.
cqlsh> source 'create-timeseries.cql';

Of course, you’ll need to adjust for the location of your `create-timeseries.cql` file.
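On the Spark side, the application also needs to know where Cassandra is running.  Here's a minimal sketch of how that's typically set with the spark-cassandra-connector; the host and app name are placeholder values, and the example project pulls these from its own config:

import org.apache.spark.SparkConf

// The connector reads the Cassandra contact point from this setting
val conf = new SparkConf()
  .setAppName("WeatherDataStream")                      // placeholder app name
  .set("spark.cassandra.connection.host", "127.0.0.1")  // placeholder local Cassandra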

Step 6 Spark Streaming with Kafka Deploy

Make sure Spark master is running and has available worker resources.  Then, deploy with `spark-submit`.  I assume you are familiar with this already, but here's an example:

~/Development/spark-1.6.3-bin-hadoop2.6/bin/spark-submit --class "com.supergloo.WeatherDataStream" --master spark://todd-mcgraths-macbook-pro.local:7077 target/scala-2.10/kafka-streaming-assembly-1.0.jar

Step 7 Spark Streaming with Kafka Send Data, Watch Processing, Be Merry

Final step, let’s see this baby in action.  Let’s load some data into the appropriate Kafka topic.  There is a CSV file available in the project’s `data/load/` directory.

~/Development/kafka_2.11-0.10.1.1/bin/kafka-console-producer.sh --broker-list localhost:9092 --topic raw_weather < ny-2008.csv

Note, this assumes I'm running the `kafka-console-producer.sh` script from the `data/load/` directory, because there is no explicit path given for the `ny-2008.csv` file.

If you go back to where you started the driver, you should see the data flowing through.  Also, if you check Cassandra, you'll see the data saved.  If you don't believe me, check out the screencast below where I demo most of these steps.

Conclusion

This Spark Kafka tutorial provided an example of streaming data from Kafka through Spark and saving it to Cassandra.  I hope you found it helpful.  Let me know if you have any questions or suggestions in the comments below.

References and Resources

Much thanks to the Killrweather application.  Inspiration and portions of the app's source code were used for this tutorial.  https://github.com/killrweather/killrweather

All the source code, the SBT build file, the whole shebang can be found here: https://github.com/tmcgrath/spark-scala.  Use the `kafka-streaming` directory.

Latest Spark Kafka documentation starting point

I also recorded a screencast of this tutorial seen here

Featured image credit https://flic.kr/p/7867Jz

See also  Spark Streaming Testing with Scala by Example
About Todd M

Todd has held multiple software roles over his 20-year career. For the last 5 years, he has focused on helping organizations move from batch to data streaming. In addition to the free tutorials, he provides consulting and coaching for Data Engineers, Data Scientists, and Data Architects. Feel free to reach out directly or to connect on LinkedIn.

2 thoughts on “Spark Streaming with Kafka Example”

  1. Hi there. Great article. I wonder if you’ve managed to get this to work with a newer version of Spark > 2.0. I’ve been unable to find example code that compiles with
    Scala 2.11, Kafka 0.10 and Spark 2.1.1.

    It looks like Python has been dropped from support for
    Kafka 0.10 so I’m trying the Scala code example at https://spark.apache.org/docs/latest/streaming-kafka-0-10-integration.html. However it gives me errors about not being in a class so I don’t think it’s a complete example.

    If you have any pointers I’d appreciate it. (I’m new these products but have been coding a long time).

    Thanks,
    DJ

  2. Hi,

    Thank you for this excellent post. with help of this post, I have managed to create a jar file and submit job to spark, which processed stream and write it HDFS.

    However I have been trying to run commands in spark shell to create spark stream from kafka ingestion.When I tried to import the jar file “spark-streaming-kafka” % “1.6.2” I get below mentioned error

    The path ‘spark-streaming-kafka-XX.jar’ cannot be loaded, because existing classpath entries conflict.

    could you please let me know how to solve this ?

    Thanks in advance.

    Regards,
    Reddy

