Spark Structured Streaming with Kafka Example – Part 1

Spark Structured Streaming with Kafka Examples

In this post, let’s explore an example of updating an existing Spark Streaming application to the newer Spark Structured Streaming.  We will start simple and then move to more advanced Kafka Spark Structured Streaming examples.

My original Kafka Spark Streaming post is three years old now.  On the Spark side, the data abstractions have evolved from RDDs to DataFrames and Datasets. RDDs are no longer the preferred abstraction layer, and the previous Spark Streaming with Kafka example utilized DStreams, which were the Spark Streaming abstraction over streams of data at the time.  Some of you might recall that DStreams were built on the foundation of RDDs.

From the Spark DStream API docs

“A Discretized Stream (DStream), the basic abstraction in Spark Streaming, is a continuous sequence of RDDs (of the same type) representing a continuous stream of data (see org.apache.spark.rdd.RDD in the Spark core documentation for more details on RDDs). DStreams can either be created from live data (such as, data from TCP sockets, Kafka, Flume, etc.) using a StreamingContext or it can be generated by transforming existing DStreams using operations such as map, window and reduceByKeyAndWindow. While a Spark Streaming program is running, each DStream periodically generates a RDD, either from live data or by transforming the RDD generated by a parent DStream.”
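To make the “continuous sequence of RDDs” idea concrete, here is a toy model in plain Scala — no Spark involved, with ordinary collections standing in for RDDs. A DStream transformation such as `map` simply applies to every micro-batch as it is generated:

```scala
// Toy model: a "DStream" is a sequence of micro-batches, where each
// batch is a collection of records (a Vector stands in for an RDD).
type MicroBatch = Vector[String]

val dstream: Seq[MicroBatch] = Seq(
  Vector("a", "b"),      // batch generated at t = 0s
  Vector("c"),           // batch generated at t = 5s
  Vector("d", "e", "f")  // batch generated at t = 10s
)

// A DStream transformation (like map) is applied to every batch independently.
val upper: Seq[MicroBatch] = dstream.map(batch => batch.map(_.toUpperCase))

println(upper) // List(Vector(A, B), Vector(C), Vector(D, E, F))
```

This is only a mental model; a real DStream is lazy and unbounded, but the batch-by-batch application of transformations is exactly what the quote above describes.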

Because we try not to use RDDs anymore, it can be confusing when Spark tutorials, documentation, and code examples still show RDDs.  I’ve updated the previous Spark Streaming with Kafka example to point to this new Spark Structured Streaming with Kafka example to try to help clarify.

Ok, let’s show a demo and look at some code.

All source code is available on Github.  See link below.

Spark Structured Streaming with Kafka Examples Overview

We are going to show a couple of demos with Spark Structured Streaming code in Scala reading and writing to Kafka.  The Kafka cluster will consist of three brokers (nodes), a schema registry, and Zookeeper, all wrapped in a convenient docker-compose example.  Let me know if you have any ideas to make things easier or more efficient.

The Scala code examples will be shown running within IntelliJ as well as deploying to a Spark cluster.

The source code and docker-compose file are available on Github.  See the Resources section below for links.


If you want to run these Kafka Spark Structured Streaming examples exactly as shown below, you will need:

Structured Streaming with Kafka Demo

Let’s take a look at a demo.


The following items or concepts were shown in the demo:

  • Start the Kafka cluster with `docker-compose up`
  • Install `kafkacat` as described in Generate Test Data in Kafka Cluster (used an example from a previous tutorial)
  • Run the Spark Kafka example in IntelliJ
  • Build a Jar and deploy the Spark Structured Streaming example in a Spark cluster with spark-submit

This demo assumes you are already familiar with the basics of Spark, so I don’t cover them here.

Spark Structured Streaming with Kafka CSV Example

To read CSV data from Kafka with Spark Structured Streaming, these are the steps to perform.

  1. Loaded CSV data into Kafka with cat data/cricket.csv | kafkacat -b localhost:19092 -t cricket_csv
  2. Ran the example in IntelliJ
  3. The code to highlight is the `inputDF` DataFrame and the use of the `selectExpr` function, where we utilized the `CAST` built-in Spark SQL function to deserialize the Kafka key and value from the INPUT_CSV topic into a new DataFrame called `inputCSV`
  4. We output `inputCSV` to the console with `writeStream`.
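To see what the `CAST` step is actually doing: Kafka delivers the key and value to Spark as raw bytes, and `CAST(value AS STRING)` is essentially a UTF-8 decode. Here is a plain-Scala sketch of that deserialization step — the `Cricket` field names are invented for illustration and may not match the real `cricket.csv` columns:

```scala
import java.nio.charset.StandardCharsets.UTF_8

// Hypothetical record shape; the actual cricket.csv columns may differ.
case class Cricket(player: String, runs: Int)

// What CAST(value AS STRING) does: decode the Kafka value bytes as UTF-8.
def castToString(value: Array[Byte]): String = new String(value, UTF_8)

// Then each CSV line can be split into typed fields.
def parseCsv(line: String): Cricket = {
  val Array(player, runs) = line.split(",").map(_.trim)
  Cricket(player, runs.toInt)
}

val kafkaValue: Array[Byte] = "Tendulkar,15921".getBytes(UTF_8)
val record = parseCsv(castToString(kafkaValue))
println(record) // Cricket(Tendulkar,15921)
```

In the actual example, Spark performs this per row across the streaming DataFrame; the sketch only shows the transformation applied to a single Kafka value.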

Spark Structured Streaming with Kafka JSON Example

Reading JSON values from Kafka is similar to the previous CSV example, with a few differences noted in the following steps.

  1. Load JSON example data into Kafka with cat data/cricket.json | kafkacat -b localhost:19092 -t cricket_json -J
  2. Notice the `inputJsonDF` DataFrame creation.  A couple of noteworthy items: casting to String before using the `from_json` function, and passing the `from_json` deserializer a `StructType` as defined in `structCricket`.
  3. Next, we create a filtered DataFrame called selectDF and output to the console.
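Conceptually, `from_json` plus a `StructType` turns a JSON string into a typed row, yielding null for malformed input. Here is a toy stand-in in plain Scala — a naive regex extractor, not Spark’s real JSON parser, and the field names are invented for illustration:

```scala
// Hypothetical schema; stands in for the StructType passed to from_json.
case class Cricket(player: String, runs: Int)

// Toy from_json: pull named fields out of a flat JSON object with a regex.
// (Spark uses a real JSON parser; this only shows the shape of the
// operation: String => typed record, with None for missing fields.)
def fromJson(json: String): Option[Cricket] = {
  def field(name: String): Option[String] =
    ("\"" + name + "\"\\s*:\\s*\"?([^\",}]+)\"?").r
      .findFirstMatchIn(json)
      .map(m => m.group(1))
  for {
    p <- field("player")
    r <- field("runs")
  } yield Cricket(p, r.toInt)
}

println(fromJson("""{"player": "Lara", "runs": 11953}""")) // Some(Cricket(Lara,11953))
println(fromJson("""{"unrelated": true}"""))               // None
```

The Option-returning shape mirrors how `from_json` produces a null struct when the input does not match the schema.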

Spark Structured Streaming with Kafka Avro

Reading Avro serialized data from Kafka in Spark Structured Streaming is a bit more involved.

  1. First, load some example Avro data into Kafka with cat data/cricket.json | kafka-avro-console-producer --broker-list localhost:19092 --topic cricket_avro --property value.schema="$(jq -r tostring data/cricket.avsc)"
  2. In the Scala code, we create and register a custom UDF called `deserialize` and use it in two different ways: once in the creation of `valueDataFrame` and again in the creation of `jsonDf`.  This custom UDF is using a simple implementation of the Confluent AbstractKafkaAvroDeserializer.
  3. To make the data more useful, we convert it to a DataFrame by using the Confluent Kafka Schema Registry.  In particular, check out the creation of `avroDf`.  A couple of things to note here.  We seem to have to perform two conversions: 1) deserialize from Avro to JSON, and then 2) convert from JSON with the `from_json` function, similar to the previous JSON example but using a DataType from the spark-avro library this time.
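Part of why a custom deserializer is needed at all: Confluent-serialized Avro is not bare Avro. Each Kafka value is framed as a magic byte (0), a 4-byte big-endian schema ID used for the Schema Registry lookup, and then the Avro payload. The AbstractKafkaAvroDeserializer handles this framing (plus the registry call) for you; here is a plain-Scala sketch of unpacking the frame:

```scala
import java.nio.ByteBuffer

// Confluent wire format: [magic byte 0][4-byte schema id][avro payload].
// The schema id is what the deserializer uses to fetch the writer
// schema from the Schema Registry.
def unpack(value: Array[Byte]): (Int, Array[Byte]) = {
  require(value.length >= 5 && value(0) == 0, "not Confluent wire format")
  val buf = ByteBuffer.wrap(value)
  buf.get()                    // skip the magic byte
  val schemaId = buf.getInt()  // big-endian schema id
  val payload = new Array[Byte](buf.remaining())
  buf.get(payload)
  (schemaId, payload)
}

// A fabricated frame: schema id 42 followed by a 3-byte payload.
val framed = ByteBuffer.allocate(8).put(0: Byte).putInt(42)
  .put(Array[Byte](1, 2, 3)).array()
println(unpack(framed)._1) // 42
```

The payload bytes still need an Avro decoder plus the fetched schema to become a record; this sketch only covers the framing the custom UDF has to deal with before Avro decoding can even start.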

I honestly don’t know if this is the most efficient or most straightforward way to use Avro-formatted data with Kafka and Spark Structured Streaming, but I definitely want/need to use the Schema Registry.  If you have some suggestions, please let me know.

Also, as noted in the source code, it appears there might be a different option available in Databricks’ version of the `from_avro` function.  I’ll try it out in the next post.

Spark Structured Streaming Kafka Deploy Example

The build.sbt and project/assembly.sbt files are set to build and deploy to an external Spark cluster.  As shown in the demo, just run assembly and then deploy the jar.

Spark Structured Streaming Kafka Example Conclusion

As mentioned above, RDDs have evolved quite a bit in the last few years.  Kafka has evolved quite a bit as well.  However, one aspect which doesn’t seem to have evolved much is the Spark Kafka integration.  As you see in the SBT file, the integration still uses 0.10 of the Kafka API.  It doesn’t matter for this example, but it does prevent us from using more advanced Kafka constructs like the transaction support introduced in 0.11.  In other words, it doesn’t appear we can effectively set `isolation.level` to `read_committed` from the Spark Kafka consumer.
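For reference, the consumer setting in question is a plain Kafka consumer property (honored by native Kafka clients on 0.11+ brokers), which the 0.10-based Spark integration does not effectively support:

```properties
isolation.level=read_committed
```

With a native consumer this ensures only committed transactional messages are returned; without it, aborted transactional writes would also be visible.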

The Bigger Picture

Hopefully, these examples are helpful for your particular use case(s).  I’d be curious to hear more about what you are attempting to do with Spark reading from Kafka.  Do you plan to build a Stream Processor where you will be writing results back to Kafka?  Or, will you be writing results to an object store or data warehouse and not back to Kafka?

My definition of a Stream Processor in this case is taking source data from an Event Log (Kafka in this case), performing some processing on it, and then writing the results back to Kafka.  These results could be utilized downstream by a microservice, or used in Kafka Connect to sink the results into an analytic data store.

While I’m obviously a fan of Spark, I’m curious to hear your reasons to use Spark with Kafka.  You have other options, so I’m interested in hearing from you. Let me know in the comments below.




Featured image credit

Spark Kinesis Example – Moving Beyond Word Count

spark kinesis

If you are looking for a Spark with Kinesis example, you are in the right place.  This Spark Streaming with Kinesis tutorial intends to help you become better at integrating the two.

In this tutorial, we’ll examine some custom Spark Kinesis code and also show a screencast of running it.  In addition, we’re going to cover running, configuring, sending sample data and AWS setup.  Finally, I’m going to list out some links for the content which helped me become more comfortable in Spark Kinesis code and configuration.

Ready?  Here we go.

If you have questions or suggestions, please let me know in the comment form below.

Spark Kinesis Tutorial Example Overview

In this example, we’re going to simulate sensor devices recording their temperature to a Kinesis stream.  This Kinesis stream will be read by our Spark Scala program every 2 seconds, which will notify us of two things:

  1. If a sensor’s temperature is above 100
  2. The top two sensors’ temps over the previous 20 seconds

So, nothing too complicated, but close enough to a possible real-world scenario of reading and analyzing stream(s) of data and acting on certain results.

In this tutorial, here’s how we’re going to cover things, in the following order:

  1. Check assumptions
  2. Present the code (Scala) and configuration example
  3. Go through AWS Kinesis setup and also Amazon Kinesis Data Generator
  4. Run in IntelliJ
  5. How to build and deploy outside of IntelliJ
  6. Rejoice, savor the moment, and thank the author of this tutorial with $150 PayPal donation

Sound good? I hope so, let’s begin.

Spark with Kinesis Tutorial Assumptions

I’m making the following assumptions about you when writing this tutorial.

  1. You have a basic understanding of Amazon Kinesis
  2. You have experience writing and deploying Apache Spark programs
  3. You have an AWS account and understand using AWS costs money.  (AKA: moolah, cash money).  It costs you or your company money to run in AWS.
  4. You have set your AWS access id and key appropriately for your environment.

If any of these assumptions are incorrect, you are probably going to struggle with this Spark Kinesis integration tutorial.  See Spark Tutorials in Scala or Spark Tutorial in Python and What is Apache Spark for background information.

Spark Kinesis Example Scala Code

Let’s start with the code…

object SparkKinesisExample extends Logging {

  def main(args: Array[String]) {

    val conf = new SparkConf().setAppName("Kinesis Read Sensor Data")
    conf.setIfMissing("spark.master", "local[*]")

    // Typesafe config - load external config from src/main/resources/application.conf
    val kinesisConf = ConfigFactory.load.getConfig("kinesis")

    val appName = kinesisConf.getString("appName")
    val streamName = kinesisConf.getString("streamName")
    val endpointUrl = kinesisConf.getString("endpointUrl")

    val credentials = new DefaultAWSCredentialsProviderChain().getCredentials()
    require(credentials != null,
      "No AWS credentials found. See the Resources section below.")
    val kinesisClient = new AmazonKinesisClient(credentials)
    val numShards = kinesisClient.describeStream(streamName).getStreamDescription().getShards().size

    val numStreams = numShards
    val batchInterval = Milliseconds(2000)
    val kinesisCheckpointInterval = batchInterval

    // Get the region name from the endpoint URL to save Kinesis Client Library metadata in
    // DynamoDB of the same region as the Kinesis stream
    val regionName = RegionUtils.getRegionByEndpoint(endpointUrl).getName()

    val ssc = new StreamingContext(conf, batchInterval)

    // Create the Kinesis DStreams
    val kinesisStreams = (0 until numStreams).map { i =>
      KinesisUtils.createStream(ssc, appName, streamName, endpointUrl, regionName,
        InitialPositionInStream.LATEST, kinesisCheckpointInterval, StorageLevel.MEMORY_AND_DISK_2)
    }

    // Union all the streams (in case numStreams > 1)
    val unionStreams = ssc.union(kinesisStreams)

    val sensorData = unionStreams.map { byteArray =>
      val Array(sensorId, temp, status) = new String(byteArray).split(",")
      SensorData(sensorId, temp.toInt, status)
    }

    val hotSensors: DStream[SensorData] = sensorData.filter(_.currentTemp > 100)

    hotSensors.print(1) // remove me if you want... this is just to spit out timestamps

    hotSensors.foreachRDD { rdd =>
      println("Sensors with Temp > 100")
      // collect to the driver so the println output shows up locally
      rdd.collect().foreach { sd =>
        println(s"Sensor id ${sd.id} has temp of ${sd.currentTemp}")
      }
    }

    // Hottest sensors over the last 20 seconds
    hotSensors.window(Seconds(20)).foreachRDD { rdd =>
      val spark = SparkSession.builder.config(rdd.sparkContext.getConf).getOrCreate()
      import spark.implicits._

      val hotSensorDF = rdd.toDF()
      hotSensorDF.createOrReplaceTempView("hot_sensors")

      val hottestOverTime = spark.sql("select * from hot_sensors order by currentTemp desc limit 5")
      hottestOverTime.show(2)
    }

    // To make sure data is not deleted by the time we query it interactively
    ssc.remember(Minutes(1))

    ssc.start()
    ssc.awaitTermination()
  }
}

case class SensorData(id: String, currentTemp: Int, status: String)

(Note: The entire code is available from my Github repo.  See links in the Resources section below.)

The first 20 or so lines of the `main` function are just setting things up.  For example, we are reading the particulars of the Kinesis stream (streamName, endpointURL, etc.) from a config file.  You will need to change the config variables in the file `src/main/resources/application.conf` to values appropriate for your Kinesis setup.

Then, we start utilizing the code provided by Amazon.  You’ll see a reference to `spark-streaming-kinesis-asl` in the build.sbt file in the Github repo.

Next, we create a dynamic number of streams `kinesisStreams` based on the number of shards configured in our Kinesis stream.  Make sure to view the screencast below for further insight on this subject.  In the screencast, I go over setting up Kinesis and running this program.

We utilize the AWS SDK `KinesisUtils` object’s `createStream` method to register each stream.  We set the initial position to start in the stream as the very latest record.  In other words, don’t worry about anything that has already been added to the stream.

(Some of you might be wondering at this point… this doesn’t look like Spark Structured Streaming?  And then you might be thinking, we should be using Structured Streaming for anything new, right?  I’d say you’re right.  But, at the time of this writing, Structured Streaming for Kinesis is not available in Spark outside of Databricks.  Relevant links on this subject are below in the Resources section.)

After we create the streams, we perform a `union` so we can analyze each stream as one.  After the union, we convert our stream from an `Array[Byte]` to `DStream[SensorData]`.  If you set up your Kinesis stream with 1 shard as I did, there will only be one stream.

From here, we perform our pseudo business logic.  We are looking for sensors which might be running hot (e.g. over 100).  If this was a real application, our code might trigger an event based on this temperature.

This example also gives us the opportunity to perform some Spark Streaming windowing.  Windowing allows us to analyze and consider data that previously arrived in the stream, and not only the data present at compute time (the current iteration of the micro-batch).  For example, we may want to determine the hottest two sensors over the past 20 seconds and not just the sensor data in a particular batch.  To see this, consider the code starting at the `hotSensors.window` block.  Within the block, notice the import for implicits.  This allows us to convert our RDDs of `SensorData` to DataFrames later in the block.
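The windowed “hottest two sensors” logic can be sketched in plain Scala — no Spark here; a 20-second window over event time is modeled as a simple filter on timestamps, whereas Spark maintains the window incrementally across micro-batches:

```scala
case class SensorData(id: String, currentTemp: Int, status: String)

// (arrival time in seconds, reading) pairs standing in for the stream.
val arrivals: Seq[(Int, SensorData)] = Seq(
  (1,  SensorData("s-1", 101, "OK")),
  (5,  SensorData("s-2", 95,  "OK")),
  (12, SensorData("s-3", 120, "OK")),
  (18, SensorData("s-1", 104, "OK")),
  (31, SensorData("s-2", 99,  "OK"))
)

// Hottest two readings over the 20 seconds ending at `now` -- the shape of
// what the hotSensors.window(Seconds(20)) block computes on each trigger.
def topTwo(now: Int): Seq[SensorData] =
  arrivals.collect { case (t, sd) if t > now - 20 && t <= now => sd }
    .sortBy(-_.currentTemp)
    .take(2)

println(topTwo(20).map(_.id)) // List(s-3, s-1)
```

The point of the sketch: a window is a question about a time range, not about a single batch, which is why the batch-at-a-time `foreachRDD` alone would not be enough.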

That’s it.  I think the code speaks for itself, but as I mentioned above, let me know if you have any questions or suggestions for improvement.


AWS Kinesis Setup

Setting up Kinesis requires an AWS account.  In this example, it’s nothing fancy.  I created the stream in the us-west-2 region because that’s where the Kinesis Data Generator is, but I don’t think it really matters.  The Kinesis stream is just 1 shard (aka partition) with default settings otherwise.  I did need to modify security policies to make things work, which I show in the screencast.

Amazon Kinesis Generator Setup

I like being lazy sometimes.  So, when I found that Amazon provides a service to send fake data to Kinesis, I jumped all over it.  I mean, sure, I could write my own Java, Python or Scala program to do it, but using this Kinesis Generator was easier and faster.  I like how it has integrated Faker in order to provide dynamic data.

Running Spark Kinesis Scala code in IntelliJ

I’m going to run this in IntelliJ because it simulates how I work.  I like to develop and test in IntelliJ first before building and deploying a jar.  So, check out the screencast for some running-in-intellij-fun. In order to run in IntelliJ, I’ve customized my `build.sbt` file and updated the Run/Debug window in IntelliJ to use the `intellijRunner` classpath.  Also, as previously mentioned, you need to update the configuration settings in the `src/main/resources/application.conf` file.

(Might be interested in Debugging Spark in IntelliJ here)

Finally, as previously mentioned, I assume you have your AWS security (id and key) set up and confirmed working.  I don’t show how I set mine up in the screencast, but I include a link for more information in Resources below.

Build and Deploy Spark Kinesis Example

The entire project is configured with the SBT assembly plugin.  See `project/assembly.sbt`.  To build a deployable jar, run the `assembly` sbt task.  Nothing fancy in the deploy then… just deploy with `spark-submit` and reference the `com.supergloo.SparkKinesisExample` class.

(For more info, you might be interested in the Spark Deploy tutorial or Spark EC2 Deploy Tutorial)

Cash Money Time

I like money.  This is the part where you send me $150.   I’ll wait here until you send it.  Thanks in advance.  🙂

Spark Kinesis Screencast

While I’m waiting for you to send me money, check out the screencast

I’m still waiting for you to send the cash $$$ by the way.



Spark Streaming with Kafka Example

Spark Streaming with Kafka

Spark Streaming with Kafka is becoming so common in data pipelines these days, it’s difficult to find one without the other.   This tutorial will present an example of streaming Kafka from Spark.  In this example, we’ll be feeding weather data into Kafka and then processing this data from Spark Streaming in Scala.  As the data is processed, we will save the results to Cassandra.

(Note: this Spark Streaming Kafka tutorial assumes some familiarity with Spark and Kafka.  For further information, you may wish to reference Kafka tutorial section of this site or Spark Tutorials with Scala and in particular Spark Streaming tutorials)

*** UPDATED 2020 ***
Updated tutorial:

Structured Spark Streaming examples with CSV, JSON, Avro, and Schema Registry 

Before we dive into the example, let’s look at a little background on Spark Kafka integration because there are multiple ways to integrate and it may be confusing.

Kafka and Spark Background

There are two ways to use Spark Streaming with Kafka: Receiver and Direct.  The receiver option is similar to other unreliable sources such as text files and sockets.   Similar to those receivers, data received from Kafka is stored in Spark executors and processed by jobs launched by the Spark Streaming context.

This approach can lose data under failures, so it’s recommended to enable Write Ahead Logs (WAL) in Spark Streaming (introduced in Spark 1.2).  The WAL synchronously saves all the received Kafka data into logs on a distributed file system (e.g. HDFS, S3, DSEFS), so that all data can be recovered after a failure.  Another way of saying this is duplication.  We duplicate the data in order to be resilient.  If you do not like the sound of this, then please keep reading.

In our example, we want zero data loss, but not the overhead of write-ahead logs.  We are going to go with an approach referred to as “Direct”.

Direct Spark Streaming from Kafka was introduced in Spark 1.3.

This approach periodically queries Kafka for the latest offsets in each topic + partition and subsequently defines the offset ranges to process in each batch.
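The bookkeeping behind this can be sketched in plain Scala: given the offsets already processed per partition and the latest offsets Kafka reports, a batch is just a set of (partition, fromOffset, untilOffset) ranges. (This `OffsetRange` is a stand-in for illustration; the real integration exposes a similarly-shaped class.)

```scala
// Direct-approach sketch: one offset range per topic-partition per batch.
case class OffsetRange(partition: Int, fromOffset: Long, untilOffset: Long)

// committed = last offset processed per partition; latest = what Kafka
// reports when polled at the start of the batch.
def nextBatch(committed: Map[Int, Long], latest: Map[Int, Long]): Seq[OffsetRange] =
  latest.toSeq.sortBy(_._1).collect {
    case (p, until) if until > committed.getOrElse(p, 0L) =>
      OffsetRange(p, committed.getOrElse(p, 0L), until)
  }

val ranges = nextBatch(
  committed = Map(0 -> 100L, 1 -> 40L),
  latest    = Map(0 -> 150L, 1 -> 40L, 2 -> 7L)
)
ranges.foreach(println)
// OffsetRange(0,100,150)
// OffsetRange(2,0,7)
```

Because the ranges pin down exactly which messages belong to which batch, a failed batch can simply be re-read from Kafka — which is what makes the WAL unnecessary in this approach.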

This approach has the following advantages:

  • Parallelism: No need to create multiple input Kafka streams and union them as was often done with the Receiver approach.
  • Efficiency: No need for a Write Ahead Log (WAL), which brought processing overhead and duplication.  As long as you have a sufficient retention window in Kafka, the messages processed by Spark Streaming can be recovered from Kafka.
  • Exactly-once semantics: We use the Kafka API and not Zookeeper.  Offsets are tracked within Spark Streaming checkpoints (if enabled).

Even with these three advantages, you might be wondering if there are any disadvantages with the Kafka direct approach?  Well, yes, there is one.

Because the direct approach does not update offsets in Zookeeper, Kafka monitoring tools based on Zookeeper will not show progress.  As a possible workaround, you can access the offsets processed by this approach in each batch and update Zookeeper yourself.

In sum, I believe it’s important to note you may see examples of the Receiver based approach in code examples and documentation, but it is not the recommended approach for Kafka Spark integration.

Ok, with this background in mind, let’s dive into the example.

Spark Streaming with Kafka Example

With this history of Kafka Spark Streaming integration in mind, it should be no surprise we are going to go with the direct integration approach.

All the following code is available for download from Github, linked in the Resources section below.  We’re going to go fast through these steps.  There is also a screencast demo of this tutorial listed in the Resources section below.

Step 1 Spark Streaming with Kafka Build Setup

Update your build file to include the required Spark Kafka library.  In the provided example, we’re going to use SBT and add the following line:

"org.apache.spark" %% "spark-streaming-kafka" % "1.6.2"

As you’ll notice, the build.sbt file also includes other libraries and configuration related to the `assembly` plugin.  We use the `assembly` plugin to help build fat jars for deployment.

Step 2 Spark Streaming with Kafka Scala Code

Next, we’re going to write the Scala code.  The entire Scala code is found in `com.supergloo.WeatherDataStream`.  We won’t go over line by line here.  Instead, let’s focus on the highlights.

val topics: Set[String] = kafkaTopicRaw.split(",").map(_.trim).toSet
val kafkaParams = Map[String, String]("metadata.broker.list" -> kafkaBroker)

val rawWeatherStream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
  ssc, kafkaParams, topics)

We pass our StreamingContext, the Kafka config map, and the topics to query to the `createDirectStream` function.  The type hints are the type of the Kafka message key (String), the type of the Kafka message value (String), and the key and value message decoders (StringDecoder).

The `createDirectStream` function returns a DStream of (Kafka message key, message value) pairs.

Step 3 Spark Streaming with Kafka Build Fat Jar

In SBT, build the fat jar with `sbt assembly` or just `assembly` if in the SBT REPL.

Step 4 Spark Streaming with Kafka Download and Start Kafka

Next, let’s download and install bare-bones Kafka to use for this example.  We can follow the quick start guide found here.

You’ll need to update your path appropriately for the following commands,
depending on where Kafka is installed; i.e. where the Kafka `bin` dir is.  For example, my Kafka `bin` dir is `/Users/toddmcgrath/Development/kafka_2.11-/bin`

a) Start Zookeeper `bin/zookeeper-server-start.sh config/zookeeper.properties`

b) Start Kafka `bin/kafka-server-start.sh config/server.properties`

c) Create Kafka topic `bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic raw_weather`

Again, make note of the path for Kafka `bin` as it is needed in later steps.

Step 5 Cassandra Setup

Make sure you have a Cassandra instance running and the schema has been created.  The schema is included in the source of this tutorial in the `cql` directory.   To create, just start up `cqlsh` and source the `create-timeseries.cql` file.  For example

tm@tmcgrath-rmbp15 spark-1.6.3-bin-hadoop2.6 $ cqlsh
Connected to Test Cluster at
[cqlsh 5.0.1 | Cassandra | DSE 5.0.3 | CQL spec 3.4.0 | Native protocol v4]
Use HELP for help.
cqlsh> source 'create-timeseries.cql';

Of course, you’ll need to adjust for the location of your `create-timeseries.cql` file.

Step 6 Spark Streaming with Kafka Deploy

Make sure Spark master is running and has available worker resources.  Then, deploy with `spark-submit`.  I assume you are familiar with this already, but here’s an example

~/Development/spark-1.6.3-bin-hadoop2.6/bin/spark-submit --class "com.supergloo.WeatherDataStream" --master spark://todd-mcgraths-macbook-pro.local:7077 target/scala-2.10/kafka-streaming-assembly-1.0.jar

Step 7 Spark Streaming with Kafka Send Data, Watch Processing, Be Merry

Final step, let’s see this baby in action.  Let’s load some data into the appropriate Kafka topic.  There is a CSV file available in the project’s `data/load/` directory.

~/Development/kafka_2.11-/bin/kafka-console-producer.sh --broker-list localhost:9092 --topic raw_weather < ny-2008.csv

So, this assumes I’m running the `kafka-console-producer.sh` script from the `data/load/` directory, because there is no explicit path given for the `ny-2008.csv` file.

If you go back to where you started the driver, you should see the data flowing through.  Also, if you check Cassandra, you’ll see the data saved.  If you don’t believe me, check out the screencast below where I demo most of these steps.


This Spark Kafka tutorial provided an example of streaming data from Kafka through Spark and saving to Cassandra.  I hope you found it helpful.  Let me know if you have any questions or suggestions in comments below.

References and Resources

Much thanks to the Killrweather application.  Inspiration and portions of the app’s source code were used for this tutorial.

All the source code, SBT build file, the whole shebang can be found here.  Use the `kafka-streaming` directory.

Latest Spark Kafka documentation starting point

I also recorded a screencast of this tutorial seen here

Look Ma, I’m on YouTube.


Spark Streaming Testing with Scala Example

Spark Streaming Testing


How do you create and automate tests of Spark Streaming applications?  In this tutorial, we’ll show an example of one way in Scala.  This post is heavy on code examples and has the added bonus of using a code coverage plugin.

Are the tests in this tutorial unit tests?  Or are they integration tests?  Functional tests?   I don’t know; you tell me in the comments below if you have an opinion.  If I had to choose, I’d say unit tests, because we are stubbing the streaming provider.


As I’m sure you can guess, you will need some Spark Streaming Scala code to test.  We’re going to use our Spark Streaming example from Slack code in this post.  So, check that out first if you need some streaming Scala code to use.  It’s not required to use that code though.  You should be able to get the concepts presented and apply them to your own code if desired.  All the testing code and the Spark Streaming example code is available to pull from Github anyhow.

We’re going to use `sbt` to build and run tests and create coverage reports.  So, if you are not using `sbt` please translate to your build tool accordingly.

Spark Streaming Testing Overview

In order to write automated tests for Spark Streaming, we’re going to use a third party library called scalatest.  Also, we’re going to add an sbt plugin called “sbt-coverage”.  Then, with these tools in hand, we can write some Scala test code and create test coverage reports.


  1. Pull Spark Streaming code example from github
  2. Describe Updates to build.sbt
  3. Create project/plugins.sbt
  4. Write Scala code
  5. Execute tests and coverage reports

Pull Spark Streaming Code Example from Github

If you don’t want to copy-and-paste code, you can pull it from Github.  Just pull the spark-course repo; the project we are working from is in the spark-streaming-tests directory.

Updates to the Previous build.sbt

build.sbt should be updated to include a new command alias as well as the scalatest 3rd party lib as seen below:

  scalaVersion := "2.11.8"
  +addCommandAlias("sanity", ";clean ;compile ;coverage ;test; coverageReport")
  resolvers += "jitpack" at ""
 @@ -19,5 +21,6 @@ libraryDependencies ++= Seq(
  // comment above line and uncomment the following to run in sbt
  // "org.apache.spark" %% "spark-streaming" % "1.6.1",
    "org.scalaj" %% "scalaj-http" % "2.3.0",
 -  "org.jfarcand" % "wcs" % "1.5" 
 +  "org.jfarcand" % "wcs" % "1.5",
 +  "org.scalatest" %% "scalatest" % "2.2.6" % "test"

Notice how we append `% "test"` to the end of the scalatest dependency to indicate the library is only needed for tests.

Create project/plugins.sbt

Add a new line for the sbt-coverage plugin as seen here:

addSbtPlugin("org.scoverage" % "sbt-scoverage" % "1.3.5")

Write Scala Tests

Actually, before we write the actual tests, we’re going to update our previous SlackStreamingApp’s `main` method to facilitate automated tests.  I know, I know, if we had written SlackStreamingApp with TDD, then we wouldn’t have to do this, right?  😉

Anyhow, it’s not a huge change.

 object SlackStreamingApp {
    def main(args: Array[String]) {
      val conf = new SparkConf().setMaster(args(0)).setAppName("SlackStreaming")
      val ssc = new StreamingContext(conf, Seconds(5))
      val stream = ssc.receiverStream(new SlackReceiver(args(1)))
 -    if (args.length > 2) {
 -      stream.saveAsTextFiles(args(2))
 -    }
 +    processStream(args, stream)
 +  def processStream(args: Array[String], stream: DStream[String]): Unit = {
 +    args match {
 +      case Array(_, _, path, _*) => stream.saveAsTextFiles(args(2))
 +      case _ => return
 +    }
 +  }

As you can hopefully see, we just needed to extract the code looking for a command-line arg into a new function called `processStream`.  Also, we need to add one more line to the imports at the top:

import org.apache.spark.streaming.dstream.{DStream, ReceiverInputDStream}

Next, we write the testing code.  To start, we need to create new directories to store the test code.  Create src/test/scala/com/supergloo directories.  Next, we add test code to this directory by creating the following Scala file: src/test/scala/com/supergloo/SlackStreamingTest.scala

package com.supergloo

import com.supergloo.SlackStreamingApp._
import org.apache.hadoop.mapred.InvalidInputException
import org.apache.spark.SparkConf
import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.{ClockWrapper, Seconds, StreamingContext}
import org.scalatest.concurrent.Eventually
import org.scalatest.{BeforeAndAfter, FlatSpec, Matchers}

import scala.collection.mutable
import scala.concurrent.duration._
import scala.language.postfixOps
import scala.reflect.io.Path
import scala.util.Try

class SlackStreamingTest extends FlatSpec with Matchers with Eventually with BeforeAndAfter {

  private val master = "local[1]"
  private val appName = "spark-streaming-test"
  private val filePath: String = "target/testfile"

  private var ssc: StreamingContext = _

  private val batchDuration = Seconds(1)

  var clock: ClockWrapper = _

  before {
    val conf = new SparkConf()
      .setMaster(master).setAppName(appName)
      .set("spark.streaming.clock", "org.apache.spark.streaming.util.ManualClock")

    ssc = new StreamingContext(conf, batchDuration)
    clock = new ClockWrapper(ssc)
  }

  after {
    if (ssc != null) {
      ssc.stop()
    }
    Try(Path(filePath + "-1000").deleteRecursively)
  }

  "Slack Streaming App " should " store streams into a file" in {
    val lines = mutable.Queue[RDD[String]]()
    val dstream = ssc.queueStream(lines)

    processStream(Array("", "", filePath), dstream)

    ssc.start()

    lines += ssc.sparkContext.makeRDD(Seq("b", "c"))
    // manually advance the clock one batch so the queued RDD is processed
    clock.advance(batchDuration.milliseconds)

    eventually(timeout(2 seconds)) {
      val wFile: RDD[String] = ssc.sparkContext.textFile(filePath + "-1000")
      wFile.count() should be (2)
    }
  }

  "Slack Streaming App " should " store empty streams if no data received" in {
    val lines = mutable.Queue[RDD[String]]()
    val dstream = ssc.queueStream(lines)

    processStream(Array("", "", filePath), dstream)

    ssc.start()
    clock.advance(batchDuration.milliseconds)

    eventually(timeout(1 seconds)) {
      val wFile: RDD[String] = ssc.sparkContext.textFile(filePath + "-1000")
      wFile.count() should be (0)
    }
  }

  "Slack Streaming App " should " not store streams if argument is not passed" in {
    val lines = mutable.Queue[RDD[String]]()
    val dstream = ssc.queueStream(lines)

    processStream(Array("", ""), dstream)

    val wFile: RDD[String] = ssc.sparkContext.textFile(filePath + "-1000")

    ssc.start()

    lines += ssc.sparkContext.makeRDD(Seq("b", "c"))
    clock.advance(batchDuration.milliseconds)

    eventually(timeout(3 seconds)) {
      a [InvalidInputException] should be thrownBy {
        wFile.count() should be (0)
      }
    }
  }
}
Next, we need to create additional directories and add ClockWrapper.scala to src/test/scala/org/apache/spark/streaming/.  More on this class later.

package org.apache.spark.streaming

import org.apache.spark.util.ManualClock

/**
  * This class is defined in this package as the ManualClock is
  * private in the "spark" package
  */
class ClockWrapper(ssc: StreamingContext) {

  def getTimeMillis(): Long = manualClock().getTimeMillis()

  def setTime(timeToSet: Long) = manualClock().setTime(timeToSet)

  def advance(timeToAdd: Long) = manualClock().advance(timeToAdd)

  def waitTillTime(targetTime: Long): Long = manualClock().waitTillTime(targetTime)

  private def manualClock(): ManualClock = {
    ssc.scheduler.clock.asInstanceOf[ManualClock]
  }

}
(By the way, ClockWrapper is taken from an approach I saw on Spark unit testing.  See “Additional Resources” section below for link.)
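To make the idea concrete, here is a tiny, illustrative-only manual clock in plain Scala.  This is not Spark’s ManualClock (whose wrapper is shown above); it just demonstrates the pattern: tests advance virtual time explicitly instead of sleeping, which keeps them fast and deterministic.

```scala
// Illustrative-only miniature of the manual-clock idea; conceptually,
// time moves only when a test advances it.
class ToyManualClock {
  private var now: Long = 0L
  def getTimeMillis(): Long = now
  def advance(ms: Long): Unit = { now += ms }
}

val clock = new ToyManualClock
clock.advance(1000)  // simulate one 1-second batch interval elapsing
println(clock.getTimeMillis())  // 1000
```

This is exactly why the tests above call `clock.advance(batchDuration.milliseconds)`: the batch fires immediately rather than after a real wall-clock second.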

Ok, we’re ready to execute now.

Execute Scala tests and coverage reports

In the spark-streaming-tests directory, we can now issue `sbt sanity` from command-line.  You should see all three tests pass:

[info] SlackStreamingTest:
[info] Slack Streaming App 
[info] - should store streams into a file
[info] Slack Streaming App 
[info] - should store empty streams if no data received
[info] Slack Streaming App 
[info] - should not store streams if argument is not passed
[info] Run completed in 4 seconds, 436 milliseconds.
[info] Total number of tests run: 3
[info] Suites: completed 1, aborted 0
[info] Tests: succeeded 3, failed 0, canceled 0, ignored 0, pending 0
[info] All tests passed.

To review coverage reports, simply open target/scala-2.11/scoverage-report/index.html in a browser.
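For reference, that report comes from the scoverage sbt plugin (the `scoverage-report` directory is its output).  If you are assembling this project yourself, the plugin is wired in through project/plugins.sbt; a minimal sketch, with an illustrative version number, looks like this:

```scala
// project/plugins.sbt -- sbt-scoverage generates the HTML report under
// target/scala-2.11/scoverage-report/ (version number is illustrative)
addSbtPlugin("org.scoverage" % "sbt-scoverage" % "1.3.5")
```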

Spark Streaming Testing Conclusion

Hopefully, this Spark Streaming unit test example helps start your Spark Streaming testing approach.  We covered a code example, how to run the tests, and how to view the test coverage results.  If you have any questions or comments, let me know.  Also, subscribe to the Supergloo YouTube channel for an upcoming screencast from this post.

Additional Resources

Featured image credit

Spark Streaming Example – How to Stream from Slack

Spark Streaming Example

Let’s write a Spark Streaming example which streams from Slack in Scala.  This tutorial will show how to write, configure and execute the code, first.  Then, the source code will be examined in detail.  If you don’t have a Slack team,  you can set one up for free.   We’ll cover that too.  Sound fun?  Okee dokee, let’s do it.

This Spark Streaming tutorial assumes some familiarity with Spark Streaming.  For a getting started tutorial see Spark Streaming with Scala Example or see the Spark Streaming tutorials.

Let’s start with a big picture overview of the steps we will take.

Spark Streaming Example Overview

  1. Setup development environment for Scala and SBT
  2. Write code
  3. Configure Slack for stream access
  4. Start Apache Spark in Standalone mode
  5. Run the Spark Streaming app
  6. Revisit code to describe the fundamental concepts.

So, our initial target is running code.  Then, we’ll examine the source code in detail.

1. Setup Spark Streaming Development Environment for Scala and SBT

Let’s follow SBT directory conventions.  Create a new directory to start.  I’m going to call mine spark-streaming-example.  The following are commands to create the directory, but you can use a window manager if you wish as well.  If this directory structure doesn’t make sense to you or you haven’t compiled Scala code with SBT before, this post probably isn’t the best for you.  Sorry, I had to write that.  I don’t mean it as a personal shot against you.  I’m sure you are a wonderful and interesting person.  This post isn’t super advanced, but I just want to be upfront and honest with you.  It’s better for both of us in the long run.

Anyhow, where were we?  oh yeah, directory structure.

mkdir spark-streaming-example
cd spark-streaming-example
mkdir src
mkdir src/main
mkdir src/main/scala
mkdir src/main/scala/com
mkdir src/main/scala/com/supergloo
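If your shell supports it, `mkdir -p` collapses all of those commands into one, creating the whole directory chain in a single step:

```shell
# -p creates every missing parent directory along the path
mkdir -p spark-streaming-example/src/main/scala/com/supergloo
```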

Next, create a build.sbt file in the root of your dev directory.  Ready for a surprise?  Surprise!  My build.sbt will be in the spark-streaming-example/ directory.

The build.sbt I’m using is:

name := "spark-streaming-example"

version := "1.0"

scalaVersion := "2.11.8"

resolvers += "jitpack" at "https://jitpack.io"

libraryDependencies ++= Seq(
  "org.apache.spark" % "spark-streaming_2.11" % "1.6.1",
  "org.scalaj" %% "scalaj-http" % "2.2.1",
  "org.jfarcand" % "wcs" % "1.5")

You see what’s happening, right?  I said, RIGHT!  Hope you didn’t jump out of your chair there.  I wasn’t yelling, but just want to make sure you’re still with me.

In a nutshell: I’m going to use Scala 2.11.8 and grab a few dependencies such as Spark Streaming 2.11, scalaj-http and wcs.  There are links to these and more descriptions later in this post.  In short, we need `wcs` to make a WebSocket connection to Slack, and `scalaj-http` is an HTTP client.  Remember, our first goal is working code and then we’ll come back to more detailed descriptions.  Stay with me.

2. Write Scala Code

I called this step “write Scala code”, but the more I think about it, this isn’t entirely accurate.  In fact, I’m going to write the code and you can copy-and-paste.  Lucky you.  But, feel free to type it if you want.  You’ll probably remember it more that way.

You need two files:

In the src/main/scala/com/supergloo directory, a file called SlackReceiver.scala with following contents:

package com.supergloo

import org.apache.spark.Logging
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.receiver.Receiver
import org.jfarcand.wcs.{TextListener, WebSocket}

import scala.util.parsing.json.JSON
import scalaj.http.Http

/**
  * Spark Streaming Example Slack Receiver from Slack
  */
class SlackReceiver(token: String) extends Receiver[String](StorageLevel.MEMORY_ONLY) with Runnable with Logging {

  private val slackUrl = ""  // set to Slack's RTM start endpoint

  private var thread: Thread = _

  override def onStart(): Unit = {
    thread = new Thread(this)
    thread.start()
  }

  override def onStop(): Unit = {
    thread.interrupt()
  }

  override def run(): Unit = {
    receive()
  }

  private def receive(): Unit = {
    val webSocket = WebSocket().open(webSocketUrl())
    webSocket.listener(new TextListener {
      override def onMessage(message: String) {
        store(message)  // hand each Slack event off to Spark
      }
    })
  }

  private def webSocketUrl(): String = {
    val response = Http(slackUrl).param("token", token).asString.body
    JSON.parseFull(response).get.asInstanceOf[Map[String, Any]].get("url").get.toString
  }

}

And you’ll need another file in the same directory called SlackStreamingApp.scala with following contents:

package com.supergloo

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

/**
  * Spark Streaming Example App
  */
object SlackStreamingApp {

  def main(args: Array[String]) {
    val conf = new SparkConf().setMaster(args(0)).setAppName("SlackStreaming")
    val ssc = new StreamingContext(conf, Seconds(5))
    val stream = ssc.receiverStream(new SlackReceiver(args(1)))
    stream.print()  // echo incoming Slack JSON to the console
    if (args.length > 2) {
      stream.saveAsTextFiles(args(2))  // optional: persist each batch to disk
    }
    ssc.start()
    ssc.awaitTermination()
  }

}

Ok, at this point, “we” are finished with code.  And by “we”, I mean you.

I think it would be a good idea to make sure SBT is happy.

So, try `sbt compile`.  For my environment, I’m going to run this from command-line in the spark-streaming-example folder.  In the Resources section of this post, there is a link to YouTube screencast of me running this.  Maybe that could be helpful for you too.  I don’t know.  You tell me.  Actually, don’t tell me if it worked.  Let me know in the page comments what didn’t work.  It works on my machine.  Ever hear that one before?

3. Configure Slack for API access

You need an OAuth token for API access to Slack and to run this Spark Streaming example.  Luckily for us, Slack provides test tokens that do not require going through all the OAuth redirects.  That token will be perfect for this example.

To get a token, go to the Slack API test-token page to list the Slack teams you have joined.  Here’s what mine looks like (without the blue arrow):

Spark Streaming Example From Slack

I greyed some out to protect the innocent.  The point is, you should see a green box for “Create Token”.  Look again at the screenshot above and where the blue arrow points.  You should have this option.  And if you don’t, there is another option for you.

It’s easy to set up your own, free, Slack team site.  And when you do, by default, the new team setup will have API access enabled.  So, create a new team if you don’t have Create Token button from any of your existing teams.  Start here

Once you have a new team set up or whenever you have a “Create Token” button available on the previously mentioned OAuth test token page, click it to generate a token.  Save that token, because you will need it soon.  But first, “we” need to start Spark so we can run this example.  We are in this together, you and me.  Here we go.

4. Start Apache Spark in Standalone mode

I presume you have an Apache Spark environment to use.  If you don’t, you might be a bit ahead of yourself with a Spark Streaming tutorial like this one.   If you are ahead of yourself, I like your style.  Nothing like jumping into the deep end first.  But, this pool might be empty and you could get hurt.  I don’t mean hurt literally.  It’s more mental than physical.

There are plenty of resources on this site to help get the setup running a Spark Cluster Standalone.  As I said, you need that.  But, let’s continue if you want.

For this Spark Streaming in Scala tutorial, I’m going to go with the most simple Spark setup possible.  See Spark Streaming in Scala section for additional tutorials.

That means we’re going to run Spark in Standalone mode.   You see, I get to make the decisions around here.  I’m a big shot blogger.  Ok, ok, I know, not really a big shot.  But, a guy can dream.  And I actually do not dream of becoming a big shot blogger.  I dream of taking my kids on adventures around the world.  I dream of watching movies.  I sometimes dream of watching movies while my kids are someplace on the other side of the world.

Anyhow, if you are a big shot with your own Spark Cluster running, you can run this example code on that too.  Your call.  Evidently, you are the boss around here.

Ok, boss, start a Spark Standalone Master from command-line:

~/Development/spark-1.5.1-bin-hadoop2.4 $ sbin/start-master.sh

You should call start-master.sh, or your Windows equivalent, from the location appropriate for your environment.  For me, that’s the spark-1.5.1-bin-hadoop2.4 directory.  You knew that by looking at the example though, didn’t you?

Next start a worker:

~/Development/spark-1.5.1-bin-hadoop2.4 $ sbin/start-slave.sh spark://todd-mcgraths-macbook-pro.local:7077

You do not want to copy `spark://todd-mcgraths-macbook-pro.local:7077` verbatim when starting up your Spark worker.  That’s mine.  Replace it with the master URL appropriate for your machine -- the one printed in your master’s startup logs.  todd-mcgraths-macbook-pro.local is my laptop, not yours.

Ok, you should be able to tell if everything is ok with Spark startup.  If not, you are definitely in trouble with this tutorial.  You probably need to slow down a bit there speedy.  But, you are the boss.

You may need to open another command window to run the next step.

5. Run the Spark Streaming app

Scala and Spark fan, here we go.  Listen, I know sbt can be a bear sometimes.  It takes some time for it to become a `simple build tool`.  But, I’m not going to go over that here.  Ok?    

1) Start SBT in the directory where build.sbt is located.

~/Development/spark-streaming-example $ sbt

2) In your sbt console:

run local[5] <your-oauth-token> output

What you should see:

After the SlackStreamingApp starts, you will see JSON retrieved from Slack. Holy moly, let me repeat: JSON from Slack.  We did it!   Dora might yell Lo Hicimos! at this point.  Or maybe Boots would say that.  I can’t remember and don’t care.  You don’t either.  

Depending on your log settings, things might scroll through your console pretty fast.

You can verify by posting messages to the Slack team tied to your OAuth token.  You’ll also be streaming messages for Slack events such as joining and leaving channels, bots, etc.

Wow, we actually did it.  You and me, kid.  I had confidence in you the whole time.  I believed in you when no one else did.  Well, honestly, not really.  This is the Internet after all.  But, every once and while, I’m pleasantly surprised.  I still think you’re pretty neat.

6. Revisit Spark Streaming Code -- Describe Key Concepts

Ok, let’s revisit the code and start with external dependencies.  As briefly noted in the build.sbt section, we connected to Slack over a WebSocket.  To make a WebSocket connection and parse the incoming JSON data, we used three things: an external WebSocket Scala library (wcs), an external HttpClient library (scalaj-http) and the native JSON parser in Scala.  Again, links to the external libraries in use are located in the Resources section below.  We see all three of these in action in two SlackReceiver functions:

  private def receive(): Unit = {
    val webSocket = WebSocket().open(webSocketUrl())
    webSocket.listener(new TextListener {
      override def onMessage(message: String) {
        store(message)
      }
    })
  }

  private def webSocketUrl(): String = {
    val response = Http(slackUrl).param("token", token).asString.body
    JSON.parseFull(response).get.asInstanceOf[Map[String, Any]].get("url").get.toString
  }

The webSocketUrl function is using the OAuth token we sent in the first argument to `run`.  More on that soon.  Note the parsing of the incoming response data as JSON in JSON.parseFull.  We sent the OAuth token from SlackStreamingApp when we initialized the SlackReceiver:

val stream = ssc.receiverStream(new SlackReceiver(args(1)))

Also, we see in the `webSocketUrl` function that we expect a JSON response whose key/value pairs parse into a Map[String, Any].
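As an aside, chaining `.get` and `asInstanceOf` works, but it throws if the response is missing the "url" field.  A slightly safer pattern-matching sketch in plain Scala -- the sample map below is a stand-in for what `JSON.parseFull` would return, and the URL value is made up:

```scala
// Stand-in for JSON.parseFull's result on the Slack response (values are hypothetical)
val parsed: Option[Any] = Some(Map("ok" -> true, "url" -> "wss://example.invalid/websocket"))

// Pattern match instead of blind .get chains: a missing field yields None, not an exception
val url: Option[String] = parsed match {
  case Some(m: Map[_, _]) => m.asInstanceOf[Map[String, Any]].get("url").map(_.toString)
  case _                  => None
}

println(url.getOrElse("no url field"))  // wss://example.invalid/websocket
```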

Ok, that covers the external libraries in use.  Let’s keep going.

Recall from earlier Spark Streaming tutorials on this site (links in Resources below), Spark Streaming can be thought of as a micro-batch system.  Instead of waiting and processing streaming data one record at a time, Spark Streaming discretizes the streaming data into micro-batches. Or, in other words, Spark Streaming’s Receivers accept data in parallel and buffer it in the memory of Spark worker nodes.
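The discretization step can be sketched in plain Scala.  This toy function (the names are mine, not Spark’s) groups timestamped records into fixed-width time buckets, which is conceptually what Spark Streaming does with incoming data:

```scala
// Toy illustration of micro-batching: bucket records by a fixed batch interval.
case class Record(timestampMs: Long, value: String)

def microBatch(records: Seq[Record], batchMs: Long): Map[Long, Seq[Record]] =
  records.groupBy(r => r.timestampMs / batchMs)  // bucket index = which interval

val recs = Seq(Record(100, "a"), Record(900, "b"), Record(1200, "c"))
// With a 1000 ms interval: "a" and "b" land in batch 0, "c" in batch 1
println(microBatch(recs, 1000).mapValues(_.size).toMap)
```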

Micro-batches poll stream sources at a specified interval.  What is the poll frequency for this example app?  It’s every 5 seconds, as declared in the SlackStreaming app code:

    val ssc = new StreamingContext(conf, Seconds(5))

And what about StreamingContext?  The StreamingContext is a type of context which is specific to Spark Streaming.  Surprised?  Of course, you are not.  You could just tell by the name StreamingContext, right?  I said RIGHT!?  Did you jump out of your chair that time?  I hope so.  You need a StreamingContext when you are building streaming apps.

Back to the SlackReceiver class now.  Extending `Receiver` is what we do when building a custom receiver for Spark Streaming.  And if you haven’t guessed by now, let me tell you, we built a custom receiver for Slack.  Well, would you look at us?  We built a custom receiver.  Somebody get us a trophy.  Or a ribbon.  Or a ribbon trophy.

The class declaration:

class SlackReceiver(token: String) extends Receiver[String](StorageLevel.MEMORY_ONLY) with Runnable {

There are a few things to note about this declaration.  First, `Runnable` trait usage is for convenience to run this sample.  I thought it would make things easier to run this from SBT.

We’re setting StorageLevel to memory only:

StorageLevel.MEMORY_ONLY

This is the default.  Nothing fancy here.  This stores RDDs as deserialized objects in the JVM.  If storage needs grow beyond what’s available, it will not spill to disk and will need to be recomputed each time something is needed and is not in memory.  Again, we don’t need anything more in this example.  Check other examples such as MEMORY_AND_DISK, MEMORY_ONLY_SER, DISK_ONLY and others if you want more info on Storage Levels.  This is a Spark Streaming post, dang it.

Finally, when extending Receiver we override three functions.  (Or, you might see some examples of calling `run` from within `onStart`, but we’re not going to do that here.  Why?  Because I’m the big shot boss and a “visionary”.  On second thought, I’m a big shot visionary or BSV as they say in the business.)

Where were we?!   Don’t let me get off track, partner.  We need to override two functions because Receiver is an abstract class:

  override def onStart(): Unit = {
    thread = new Thread(this)
    thread.start()
  }

  override def onStop(): Unit = {
    thread.interrupt()
  }

`onStart` is spawning a new thread to receive the stream source.  This triggers a call to our overridden Thread `run` function which calls the previously described `receive` function.

`onStop` is there to ensure any spawned threads are stopped when the receiver is stopped.

(Not shown, but exceptions while receiving can be handled either by restarting the receiver with `restart` or stopping it completely with `stop`.  See the Receiver docs for more information.)

So, that’s the code.  But, let’s also consider how this example was invoked.  One important detail is the use of “5”:

run local[5] <your-oauth-token> output

Why 5?  If we use “local” or “local[1]” as the master URL, only one thread will be used for running tasks.  When using an input DStream based on a Streaming receiver, a single thread will be used to run the receiver which leaves no thread for processing the received data. So, always use “local[n]” as the master URL, where n > number of receivers to run.

When running on a Spark Cluster outside of Standalone mode, the number of cores allocated to the Spark Streaming application must be more than the number of receivers.

Finally, we’ll close with a fairly insignificant detail.  The last argument is “output” which you can see from the SlackStreamingApp is used here:

    if (args.length > 2) {
      stream.saveAsTextFiles(args(2))
    }

This third argument is optional; it specifies whether the stream data should be saved to disk and, if so, to which directory.
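The argument handling itself is trivial and easy to reason about in isolation.  Here is the same logic extracted into a standalone function (the name outputDir is mine, not from the app):

```scala
// Same optional-third-argument logic, extracted: Some(dir) if provided, else None
def outputDir(args: Array[String]): Option[String] =
  if (args.length > 2) Some(args(2)) else None

println(outputDir(Array("local[5]", "my-token", "output")))  // Some(output)
println(outputDir(Array("local[5]", "my-token")))            // None
```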


So, all joking aside, I hope this Spark Streaming example helps you.  I like to help nice people who try.  I hope you are one of those types of people.

You might enjoy signing up for the mailing list, following Twitter and subscribing on YouTube.  You have options.  I think the links to these sites are on the bottom of each page.  To be honest, I’m not entirely sure I want you to follow or subscribe, but I don’t think I can actually prevent you from doing so.  So, you’re in charge boss.   See?  I did it again.  Just having fun.

Take care and let me know if you have any questions or suggestions for this post in the comments below.

Here is a screencast of me running through most of these steps above.

Look Ma, I’m on YouTube!  You can subscribe to the supergloo YouTube channel if you want.  My Ma did.  I think.  At least, she told me she did.  (I call her Ma, not Mom, get over it.  I’m from Minnesota.)

Resources for this Spark Streaming Example Tutorial

  1. WSC -- Asynchronous WebSocket Connector --
  2. HttpClient --
  3. Check this site for “Spark Streaming Example Part 1”.  This post was loosely coupled Part 2.  It’s probably listed in the “Related Posts” section below.
  4. Spark tutorials landing page

Featured image credit: