Spark Streaming with Kafka is becoming so common in data pipelines these days, it’s difficult to find one without the other. This tutorial will present an example of streaming Kafka from Spark. In this example, we’ll be feeding weather data into Kafka and then processing this data from Spark Streaming in Scala. As the data is processed, we will save the results to Cassandra.
Before we dive into the example, let’s look at little background on Spark Kafka integration because there are multiple ways to integrate and it may be confusing.
Kafka and Spark Background
There are the two ways to use Spark Streaming with Kafka: Receiver and Direct. The receiver option is similar to other unreliable sources such text files and socket. Similar to these receivers, data received from Kafka is stored in Spark executors and processed by jobs launched by Spark Streaming context.
This approach can lose data under failures, so it’s recommended to enable Write Ahead Logs (WAL) in Spark Streaming (introduced in Spark 1.2). WAL synchronously saves all the received Kafka data into logs on a distributed file system (e.g HDFS, S3, DSEFS), so that all data can be recovered on possible failure. Another way of saying this is duplication. We duplicate the data in order to be resilient. If you do not like the sound of this then, please keep reading.
In our example, we want zero-data loss, but not the overhead of write ahead logs. We are going to go with an approach referred to as “Direct”.
Direct Spark Streaming from Kafka was introduced in Spark 1.3.
This approach periodically queries Kafka for the latest offsets in each topic + partition and subsequently defines the offset ranges to process in each batch.
This approach has the following advantages:
- Parallelism: No need to create multiple input Kafka streams and union them as was often done with the Receiver approach.
- Efficiency: No need for Write Ahead Log (WAL) which was caused processing overhead and duplication. As long as you have sufficient retention time windows in Kafka, the messages from Spark Streaming can be recovered from Kafka.
- Exactly-once semantics: We use Kafka API and not Zookeeper. Offsets are tracked within Spark Streaming checkpoints (if enabled).
Even with these three advantages, you might be wondering if there are any disadvantages with the Kafka direct approach? Well, yes, there is one.
Because the direct approach does not update offsets in Zookeeper, Kafka monitoring tools based on Zookeeper will not show progress. As a possible workaround, you can access the offsets processed by this approach in each batch and update Zookeeper yourself.
In sum, I believe it’s important to note you may see examples of the Receiver based approach in code examples and documentation, but it is not the recommended approach for Kafka Spark integration.
Ok, with this background in mind, let’s dive into the example.
Spark Streaming with Kafka Example
With this history of Kafka Spark Streaming integration in mind, it should be no surprise we are going to go with the direct integration approach.
All the following code is available for download from Github listed in the Resources section below. We’re going to go fast through these steps. Also, there is a screencast demo of this tutorial also listed in the Resources section below.
Step 1 Spark Streaming with Kafka Build Setup
Update your build file to include the required Spark Kafka library. In the provided example, we’re going to use SBT and add the following line:
"org.apache.spark" %% "spark-streaming-kafka" % "1.6.2"
As you’ll notice, the build.sbt file also includes other libraries and configuration related to the `assembly` plugin. We use the `assembly` plugin to help build fat jars for deployment.
Step 2 Spark Streaming with Kafka Scala Code
Next, we’re going to write the Scala code. The entire Scala code is found in `com.supergloo.WeatherDataStream`. We won’t go over line by line here. Instead, let’s focus on the highlights.
val topics: Set[String] = kafkaTopicRaw.split(",").map(_.trim).toSet val kafkaParams = Map[String, String]("metadata.broker.list" -> kafkaBroker) val rawWeatherStream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder] (ssc, kafkaParams, topics)
We pass our StreamingContext, Kafka config map and the topics to query to the `createDirectStream` function. The type hints are the type of Kafka message key (String), the type of Kafka message value (String) and key and value message decoders (StringDecoders)
The `createDirectStream` function returns a DStream of (Kafka message key, message value)
Step 3 Spark Streaming with Kafka Build Fat Jar
In SBT, build the fat jar with `sbt assembly` or just `assembly` if in the SBT REPL.
Step 4 Spark Streaming with Kafka Download and Start Kafka
Next, let’s download and install bare-bones Kafka to use for this example. We can follow the quick step guide found here https://kafka.apache.org/quickstart
You’ll need to update your path appropriately for the following commands
depending on where Kafka; i.e. where is Kafka `bin` dir. For example, my Kafka `bin` dir is `/Users/toddmcgrath/Development/kafka_2.11-0.10.1.1\bin`
a) Start Zookeeper `bin/zookeeper-server-start.sh config/zookeeper.properties`
b) Start Kafka `bin/kafka-server-start.sh config/server.properties`
c) Create Kafka topic `bin/kafka-topics.sh –create –zookeeper localhost:2181 –replication-factor 1 –partitions 1 –topic raw_weather`
Again, make note of the path for Kafka `bin` as it is needed in later steps.
Step 5 Cassandra Setup
Make sure you have a Cassandra instance running and the schema has been created. The schema is included in the source of this tutorial in the `cql` directory. To create, just start up `cqlsh` and source the `create-timeseries.cql` file. For example
tm@tmcgrath-rmbp15 spark-1.6.3-bin-hadoop2.6 $ cqlsh Connected to Test Cluster at 127.0.0.1:9042. [cqlsh 5.0.1 | Cassandra 188.8.131.526 | DSE 5.0.3 | CQL spec 3.4.0 | Native protocol v4] Use HELP for help. cqlsh> source 'create-timeseries.cql';
Of course, you’ll need to adjust for the location of your `create-timeseries.cql` file.
Step 6 Spark Streaming with Kafka Deploy
Make sure Spark master is running and has available worker resources. Then, deploy with `spark-submit`. I assume you are familiar with this already, but here’s an example
~/Development/spark-1.6.3-bin-hadoop2.6/bin/spark-submit --class "com.supergloo.WeatherDataStream" --master spark://todd-mcgraths-macbook-pro.local:7077 target/scala-2.10/kafka-streaming-assembly-1.0.jar
Step 7 Spark Streaming with Kafka Send Data, Watch Processing, Be Merry
Final step, let’s see this baby in action. Let’s load some data into the appropriate Kafka topic. There is a CSV file available in the project’s `data/load/` directory.
~/Development/kafka_2.11-0.10.1.1/bin/kafka-console-producer.sh --broker-list localhost:9092 --topic raw_weather < ny-2008.csv
So, this assumes I’m running the `kafka-console-producer.sh` script from the `data/load/` directory because there is in explicit path location for the `ny-2008.csv` file.
If you go back to where you started the driver, you should see the data flowing through. Also, if check Cassandra, you see data saved. If you don’t believe me, check out the screencast below where I demo most of these steps.
This Spark Kafka tutorial provided an example of streaming data from Kafka through Spark and saving to Cassandra. I hope you found it helpful. Let me know if you have any questions or suggestions in comments below.
References and Resources
Much thanks to the Killrweather application. Inspiration and portions of the app’s source code was used for this tutorial. https://github.com/killrweather/killrweather
All the source code, SBT build file, the whole shebang can be found here Use the `kafka-streaming` directory. https://github.com/tmcgrath/spark-scala
Latest Spark Kafka documentation starting point
I also recorded a screencast of this tutorial seen here
Look Ma, I’m on YouTube.
Featured image credit https://flic.kr/p/7867Jz