Spark Kinesis Example – Moving Beyond Word Count


If you are looking for a Spark with Kinesis example, you are in the right place.  This Spark Kinesis tutorial intends to help you become better at integrating the two.  In this post, I’m going to provide a custom Spark Kinesis code example and a screencast of running it.  We’re going to cover running, configuring, sending sample data and AWS setup.  Finally, I’m going to list some links to content which helped me become more comfortable with Spark Kinesis code and configuration.  If you have questions or suggestions, please let me know in the comment form below.

Spark Kinesis Tutorial Example Overview

In this example, we’re going to simulate sensor devices recording their temperature to a Kinesis stream.  Our Spark Scala program will read this Kinesis stream every 2 seconds and notify us of two things:

  1. If a sensor’s temperature is above 100
  2. The top two sensors’ temps over the previous 20 seconds

So, nothing too complicated, but close enough to a possible real-world scenario of reading and analyzing stream(s) of data and acting on certain results.

In this tutorial, here’s how we’re going to cover things, in the following order:

  1. Check assumptions
  2. Present the code (Scala) and configuration example
  3. Go through AWS Kinesis setup and also Amazon Kinesis Data Generator
  4. Run in IntelliJ
  5. How to build and deploy outside of IntelliJ
  6. Rejoice, savor the moment, and thank the author of this tutorial with a $150 PayPal donation

Sound good? I hope so, let’s begin.

Spark with Kinesis TUTORIAL Assumptions

I’m making the following assumptions about you when writing this tutorial.

  1. You have a basic understanding of Amazon Kinesis
  2. You have experience writing and deploying Apache Spark programs
  3. You have an AWS account and understand that using AWS costs money.  (AKA: moolah, cash money).  Not my money, your money.  It costs you or your company money to run in AWS.
  4. You have set your AWS access key ID and secret access key appropriately for your environment.

If any of these assumptions are incorrect, you are probably going to struggle with this Spark Kinesis integration tutorial.

SPARK with KINESIS EXAMPLE SCALA CODE

Let’s start with the code…

// Imports below are my assumption of what the full project pulls in: the Spark 2.x
// spark-streaming-kinesis-asl artifact, the AWS SDK v1 and Typesafe Config.  Note that the
// Logging trait lives in org.apache.spark.internal in Spark 2.x; if it is not visible in your
// Spark version, swap in your own logging trait.
import com.amazonaws.auth.DefaultAWSCredentialsProviderChain
import com.amazonaws.regions.RegionUtils
import com.amazonaws.services.kinesis.AmazonKinesisClient
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStream
import com.typesafe.config.ConfigFactory
import org.apache.spark.SparkConf
import org.apache.spark.internal.Logging
import org.apache.spark.sql.SparkSession
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.dstream.DStream
import org.apache.spark.streaming.kinesis.KinesisUtils
import org.apache.spark.streaming.{Milliseconds, Minutes, Seconds, StreamingContext}

object SparkKinesisExample extends Logging {

  def main(args: Array[String]) {

    val conf = new SparkConf().setAppName("Kinesis Read Sensor Data")
    conf.setIfMissing("spark.master", "local[*]")

    // Typesafe config - load external config from src/main/resources/application.conf
    val kinesisConf = ConfigFactory.load.getConfig("kinesis")

    val appName = kinesisConf.getString("appName")
    val streamName = kinesisConf.getString("streamName")
    val endpointUrl = kinesisConf.getString("endpointUrl")

    val credentials = new DefaultAWSCredentialsProviderChain().getCredentials()
    require(credentials != null,
      "No AWS credentials found. See http://docs.aws.amazon.com/AWSSdkDocsJava/latest/DeveloperGuide/credentials.html")
    val kinesisClient = new AmazonKinesisClient(credentials)
    kinesisClient.setEndpoint(endpointUrl)
    val numShards = kinesisClient.describeStream(streamName).getStreamDescription().getShards().size

    val numStreams = numShards
    val batchInterval = Milliseconds(2000)
    val kinesisCheckpointInterval = batchInterval

    // Get the region name from the endpoint URL to save Kinesis Client Library metadata in
    // DynamoDB of the same region as the Kinesis stream
    val regionName = RegionUtils.getRegionByEndpoint(endpointUrl).getName()

    val ssc = new StreamingContext(conf, batchInterval)

    // Create the Kinesis DStreams
    val kinesisStreams = (0 until numStreams).map { i =>
      KinesisUtils.createStream(ssc, appName, streamName, endpointUrl, regionName,
        InitialPositionInStream.LATEST, kinesisCheckpointInterval, StorageLevel.MEMORY_AND_DISK_2)
    }

    // Union all the streams (in case numStreams > 1)
    val unionStreams = ssc.union(kinesisStreams)

    val sensorData = unionStreams.map { byteArray =>
      val Array(sensorId, temp, status) = new String(byteArray).split(",")
      SensorData(sensorId, temp.toInt, status)
    }

    val hotSensors: DStream[SensorData] = sensorData.filter(_.currentTemp > 100)

    hotSensors.print(1) // remove me if you want... this is just to spit out timestamps

    println(s"Sensors with Temp > 100")
    hotSensors.map { sd =>
      println(s"Sensor id ${sd.id} has temp of ${sd.currentTemp}")
    }

    // Hottest sensors over the last 20 seconds
    hotSensors.window(Seconds(20)).foreachRDD { rdd =>
      val spark = SparkSession.builder.config(rdd.sparkContext.getConf).getOrCreate()
      import spark.implicits._

      val hotSensorDF = rdd.toDF()
      hotSensorDF.createOrReplaceTempView("hot_sensors")

      val hottestOverTime = spark.sql("select * from hot_sensors order by currentTemp desc limit 5")
      hottestOverTime.show(2)
    }

    // To make sure data is not deleted by the time we query it interactively
    ssc.remember(Minutes(1))

    ssc.start()
    ssc.awaitTermination()
  }
}
case class SensorData(id: String, currentTemp: Int, status: String)

(Note: The entire code is available from my Github repo.  See links in the Resources section below.)

The first 20 or so lines of the `main` function are just setting things up.  For example, we are reading the particulars of the Kinesis stream (streamName, endpointURL, etc.) from a config file.  You will need to change the config variables in the file `src/main/resources/application.conf` to values appropriate for your Kinesis setup.
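For reference, here’s a minimal sketch of what that `application.conf` might contain.  The stream name and endpoint below are assumptions; use the values from your own Kinesis setup.

kinesis {
  appName = "KinesisReadSensorData"
  streamName = "sensor-stream"
  endpointUrl = "https://kinesis.us-west-2.amazonaws.com"
}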

Then, we start utilizing the code provided by Amazon.  You’ll see a reference to `spark-streaming-kinesis-asl` in the build.sbt file in the Github repo.

Next, we create a dynamic number of streams `kinesisStreams` based on the number of shards configured in our Kinesis stream.  Make sure to view the screencast below for further insight on this subject.  In the screencast, I go over setting up Kinesis and running this program.

We utilize the `KinesisUtils` object’s `createStream` method (it comes from the spark-streaming-kinesis-asl library rather than the AWS SDK) to register each stream.  We set the initial position in the stream to the very latest record.  In other words, don’t worry about anything that has already been added to the stream.

(Some of you might be wondering at this point… this doesn’t look like Spark Structured Streaming?  And then you might be thinking, we should be using Structured Streaming for anything new, right?  I’d say you’re right.  But, at the time of this writing, Structured Streaming for Kinesis is not available in Spark outside of Databricks.  Relevant links on this subject are below in the Resources section.)

After we create the streams, we perform a `union` so we can analyze all the streams as one.  After the union, we convert our stream from `Array[Byte]` to `DStream[SensorData]`.  If you set up your Kinesis stream with 1 shard as I did, there will only be one stream.

From here, we perform our pseudo business logic.  We are looking for sensors which might be running hot (e.g. over 100).  If this was a real application, our code might trigger an event based on this temperature.

This example also gives us the opportunity to perform some Spark Streaming windowing.  Windowing allows us to analyze and consider data that previously arrived in the stream and not only the data present at compute time (the current iteration of the micro-batch).  For example, we might want to determine the hottest two sensors over the past 20 seconds and not just the sensor data in a particular batch.  To see this, consider the code starting at the `hotSensors.window` block.  Within the block, notice the import for implicits.  This allows us to convert our RDDs of `SensorData` to DataFrames later in the block.
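If you’d rather stay in the DStream API, here’s a minimal alternative sketch (not from the repo, and assuming the same `hotSensors` stream and 2-second batch interval from above) of the same idea using `reduceByKeyAndWindow`:

// Alternative sketch: hottest reading per sensor over a sliding 20-second window,
// then take the top two.  Window and slide durations must be multiples of the batch interval.
hotSensors
  .map(sd => (sd.id, sd.currentTemp))
  .reduceByKeyAndWindow((a: Int, b: Int) => math.max(a, b), Seconds(20), Seconds(2))
  .foreachRDD { rdd =>
    rdd.sortBy(_._2, ascending = false).take(2).foreach { case (id, temp) =>
      println(s"Sensor $id peaked at $temp over the window")
    }
  }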

That’s it.  I think the code speaks for itself, but as I mentioned above, let me know if you have any questions or suggestions for improvement.

AWS KINESIS SETUP

Setting up Kinesis requires an AWS account.  In this example, it’s nothing fancy.  I created the stream in the us-west-2 region because that’s where the Kinesis Data Generator is, but I don’t think it really matters.  The Kinesis stream is just 1 shard (aka partition) with default settings otherwise.  I did need to modify security policies to make it work, which I show in the screencast.

Amazon Kinesis Generator SETUP

I like being lazy sometimes.  So, when I found that Amazon provides a service to send fake data to Kinesis, I jumped all over it.  I mean, sure, I could write my own Java, Python or Scala program to do it, but using this Kinesis Generator was easier and faster.  I like how it has integrated Faker in order to provide dynamic data.
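If you’d rather not use the generator, here’s a hedged sketch of pushing a single fake reading yourself with the AWS SDK.  The stream name and endpoint below are assumptions; match them to your own setup.

import java.nio.ByteBuffer

import com.amazonaws.auth.DefaultAWSCredentialsProviderChain
import com.amazonaws.services.kinesis.AmazonKinesisClient
import com.amazonaws.services.kinesis.model.PutRecordRequest

object SendFakeSensorData extends App {
  val client = new AmazonKinesisClient(new DefaultAWSCredentialsProviderChain().getCredentials)
  client.setEndpoint("https://kinesis.us-west-2.amazonaws.com")  // assumption: same region as your stream

  val request = new PutRecordRequest()
    .withStreamName("sensor-stream")                                  // assumption: your stream name
    .withPartitionKey("sensor-1")
    .withData(ByteBuffer.wrap("sensor-1,105,WARN".getBytes("UTF-8"))) // the id,temp,status format the Spark code expects
  client.putRecord(request)
}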

RUN Spark Kinesis Scala code in IntelliJ

I’m going to run this in IntelliJ because it simulates how I work.  I like to develop and test in IntelliJ first before building and deploying a jar.  So, check out the screencast for some running-in-intellij-fun.  In order to run in IntelliJ, I’ve customized my `build.sbt` file and updated the Run/Debug configuration in IntelliJ to use the `intellijRunner` classpath.  Also, as previously mentioned, you need to update the configuration settings in the `src/main/resources/application.conf` file.

Finally, as previously mentioned, I assume you have your AWS security credentials (access key ID and secret key) set up and confirmed working.  I don’t show how I set mine up in the screencast, but I include a link for more information in the Resources section below.

BUILD and Deploy Spark Kinesis Example

The entire project is configured with the SBT assembly plugin.  See `project/assembly.sbt`.  To build a deployable jar, run the `assembly` sbt task.  Nothing fancy in the deploy then…just deploy with `spark-submit` and reference the `com.supergloo.SparkKinesisExample` class.
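For example, something along these lines; the jar name, Scala version and master URL are assumptions, so adjust for your build and cluster:

spark-submit --class com.supergloo.SparkKinesisExample \
  --master spark://your-spark-master:7077 \
  target/scala-2.11/spark-kinesis-example-assembly-1.0.jar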

Cash Money Time

I like money.  This is the part where you send me $150.   I’ll wait here until you send it.  Thanks in advance.

Screencast

While I’m waiting for you to send me money, check out the screencast

Spark Kinesis Example

I’m still waiting for you to send the cash $$$ by the way.

Resources

Featured image credit: https://flic.kr/p/6vfaHV

Spark Streaming with Kafka Example

Spark Streaming with Kafka

Spark Streaming with Kafka is becoming so common in data pipelines these days, it’s difficult to find one without the other.   This tutorial will present an example of streaming Kafka from Spark.  In this example, we’ll be feeding weather data into Kafka and then processing this data from Spark Streaming in Scala.  As the data is processed, we will save the results to Cassandra.

Before we dive into the example, let’s look at a little background on Spark Kafka integration, because there are multiple ways to integrate and it may be confusing.

Kafka and Spark Background

There are two ways to use Spark Streaming with Kafka: Receiver and Direct.  The Receiver option is similar to other unreliable sources such as text files and sockets.  Similar to those receivers, data received from Kafka is stored in Spark executors and processed by jobs launched by the Spark Streaming context.

This approach can lose data under failures, so it’s recommended to enable Write Ahead Logs (WAL) in Spark Streaming (introduced in Spark 1.2).  The WAL synchronously saves all the received Kafka data into logs on a distributed file system (e.g. HDFS, S3, DSEFS), so that all of the data can be recovered after a failure.  Another way of saying this is duplication.  We duplicate the data in order to be resilient.  If you do not like the sound of this, then please keep reading.

In our example, we want zero-data loss, but not the overhead of write ahead logs.  We are going to go with an approach referred to as “Direct”.

Direct Spark Streaming from Kafka was introduced in Spark 1.3.

This approach periodically queries Kafka for the latest offsets in each topic + partition and subsequently defines the offset ranges to process in each batch.

This approach has the following advantages:

  • Parallelism: No need to create multiple input Kafka streams and union them, as was often done with the Receiver approach.
  • Efficiency: No need for a Write Ahead Log (WAL), which caused processing overhead and duplication.  As long as you have a sufficient retention window in Kafka, the messages can be recovered from Kafka by Spark Streaming.
  • Exactly-once semantics: We use the Kafka API and not Zookeeper.  Offsets are tracked within Spark Streaming checkpoints (if enabled).

Even with these three advantages, you might be wondering if there are any disadvantages to the Kafka direct approach.  Well, yes, there is one.

Because the direct approach does not update offsets in Zookeeper, Kafka monitoring tools based on Zookeeper will not show progress.  As a possible workaround, you can access the offsets processed by this approach in each batch and update Zookeeper yourself.
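For reference, here is a minimal sketch of grabbing those offsets from the direct stream; the `directKafkaStream` name is illustrative, and actually writing the offsets back to ZooKeeper with your ZK client of choice is left out:

import org.apache.spark.streaming.kafka.{HasOffsetRanges, OffsetRange}

directKafkaStream.foreachRDD { rdd =>
  // RDDs produced by the direct stream carry the Kafka offset ranges they were built from
  val offsetRanges: Array[OffsetRange] = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
  offsetRanges.foreach { o =>
    println(s"${o.topic} partition ${o.partition} offsets ${o.fromOffset} -> ${o.untilOffset}")
  }
}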

In sum, I believe it’s important to note you may see examples of the Receiver based approach in code examples and documentation, but it is not the recommended approach for Kafka Spark integration.

Ok, with this background in mind, let’s dive into the example.

Spark Streaming with Kafka Example

With this history of Kafka Spark Streaming integration in mind, it should be no surprise we are going to go with the direct integration approach.

All the following code is available for download from Github, linked in the Resources section below.  We’re going to move fast through these steps.  There is also a screencast demo of this tutorial listed in the Resources section.

Step 1 Spark Streaming with Kafka Build Setup

Update your build file to include the required Spark Kafka library.  In the provided example, we’re going to use SBT and add the following line:

"org.apache.spark" %% "spark-streaming-kafka" % "1.6.2"

As you’ll notice, the build.sbt file also includes other libraries and configuration related to the `assembly` plugin.  We use the `assembly` plugin to help build fat jars for deployment.

Step 2 Spark Streaming with Kafka Scala Code

Next, we’re going to write the Scala code.  The entire Scala code is found in `com.supergloo.WeatherDataStream`.  We won’t go over line by line here.  Instead, let’s focus on the highlights.

val topics: Set[String] = kafkaTopicRaw.split(",").map(_.trim).toSet
val kafkaParams = Map[String, String]("metadata.broker.list" -> kafkaBroker)

val rawWeatherStream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
  ssc, kafkaParams, topics)

We pass our StreamingContext, Kafka config map and the topics to query to the `createDirectStream` function.  The type parameters are the type of the Kafka message key (String), the type of the Kafka message value (String), and the key and value message decoders (StringDecoder).

The `createDirectStream` function returns a DStream of (Kafka message key, message value) pairs.
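To give a feel for what happens next in `WeatherDataStream`, the message values are parsed into a case class and the resulting stream is saved to Cassandra with the spark-cassandra-connector.  The sketch below is illustrative only; the class, column and keyspace/table names are assumptions, not the exact ones from the repo.

import com.datastax.spark.connector.streaming._

case class RawWeather(wsid: String, year: Int, month: Int, day: Int, hour: Int, temperature: Double)

val parsedWeatherStream = rawWeatherStream.map { case (_, line) =>
  val cols = line.split(",")
  RawWeather(cols(0), cols(1).toInt, cols(2).toInt, cols(3).toInt, cols(4).toInt, cols(5).toDouble)
}

// assumes a matching Cassandra keyspace/table created from the cql directory
parsedWeatherStream.saveToCassandra("isd_weather_data", "raw_weather_data")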

Step 3 Spark Streaming with Kafka Build Fat Jar

In SBT, build the fat jar with `sbt assembly` or just `assembly` if in the SBT REPL.

Step 4 Spark Streaming with Kafka Download and Start Kafka

Next, let’s download and install bare-bones Kafka to use for this example.  We can follow the quick step guide found here https://kafka.apache.org/quickstart

You’ll need to update your path appropriately for the following commands depending on where you installed Kafka; i.e. where your Kafka `bin` dir is.  For example, my Kafka `bin` dir is `/Users/toddmcgrath/Development/kafka_2.11-0.10.1.1/bin`

a) Start Zookeeper `bin/zookeeper-server-start.sh config/zookeeper.properties`

b) Start Kafka `bin/kafka-server-start.sh config/server.properties`

c) Create Kafka topic `bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic raw_weather`

Again, make note of the path for Kafka `bin` as it is needed in later steps.
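To double-check the topic exists before moving on, you can list topics from your Kafka root directory:

bin/kafka-topics.sh --list --zookeeper localhost:2181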

Step 5 Cassandra Setup

Make sure you have a Cassandra instance running and the schema has been created.  The schema is included in the source of this tutorial in the `cql` directory.  To create it, just start up `cqlsh` and source the `create-timeseries.cql` file.  For example:

tm@tmcgrath-rmbp15 spark-1.6.3-bin-hadoop2.6 $ cqlsh
Connected to Test Cluster at 127.0.0.1:9042.
[cqlsh 5.0.1 | Cassandra 3.0.9.1346 | DSE 5.0.3 | CQL spec 3.4.0 | Native protocol v4]
Use HELP for help.
cqlsh> source 'create-timeseries.cql';

Of course, you’ll need to adjust for the location of your `create-timeseries.cql` file.

Step 6 Spark Streaming with Kafka Deploy

Make sure the Spark master is running and has available worker resources.  Then, deploy with `spark-submit`.  I assume you are familiar with this already, but here’s an example:

~/Development/spark-1.6.3-bin-hadoop2.6/bin/spark-submit --class "com.supergloo.WeatherDataStream" --master spark://todd-mcgraths-macbook-pro.local:7077 target/scala-2.10/kafka-streaming-assembly-1.0.jar

Step 7 Spark Streaming with Kafka Send Data, Watch Processing, Be Merry

Final step, let’s see this baby in action.  Let’s load some data into the appropriate Kafka topic.  There is a CSV file available in the project’s `data/load/` directory.

~/Development/kafka_2.11-0.10.1.1/bin/kafka-console-producer.sh --broker-list localhost:9092 --topic raw_weather < ny-2008.csv

So, this assumes I’m running the `kafka-console-producer.sh` script from the `data/load/` directory, because there is no explicit path given for the `ny-2008.csv` file.

If you go back to where you started the driver, you should see the data flowing through.  Also, if you check Cassandra, you will see the data saved.  If you don’t believe me, check out the screencast below where I demo most of these steps.

Conclusion

This Spark Kafka tutorial provided an example of streaming data from Kafka through Spark and saving to Cassandra.  I hope you found it helpful.  Let me know if you have any questions or suggestions in comments below.

 

References and Resources

Much thanks to the Killrweather application.  Inspiration and portions of the app’s source code were used for this tutorial.  https://github.com/killrweather/killrweather

All the source code, SBT build file, the whole shebang can be found here (use the `kafka-streaming` directory): https://github.com/tmcgrath/spark-scala

Latest Spark Kafka documentation starting point

I also recorded a screencast of this tutorial seen here

Spark Streaming with Kafka Example

 

Look Ma, I’m on YouTube.

Featured image credit https://flic.kr/p/7867Jz

Spark Streaming Testing with Scala Example

Spark Streaming Testing


How do you create and automate tests of Spark Streaming applications?  In this post, we’ll show an example of one way in Scala.  This post is heavy on code examples and has the added bonus of using a code coverage plugin.

Are the tests in this tutorial unit tests?  Or are they integration tests?  Functional tests?  I don’t know; you tell me in the comments below if you have an opinion.  If I had to choose, I’d say unit tests, because we are stubbing the streaming provider.

Pre-requisites

As I’m sure you can guess, you will need some Spark Streaming Scala code to test.  We’re going to use the code from our Spark Streaming from Slack example in this post.  So, check that out first if you need some streaming Scala code to use.  It’s not required to use that code, though.  You should be able to take the concepts presented and apply them to your own code if desired.  All the testing code and Spark Streaming example code is available to pull from Github anyhow.

We’re going to use `sbt` to build and run tests and create coverage reports.  So, if you are not using `sbt` please translate to your build tool accordingly.

Overview

In order to write automated tests for Spark Streaming, we’re going to use a third party library called ScalaTest.  Also, we’re going to add an sbt plugin called sbt-scoverage.  Then, with these tools in hand, we can write some Scala test code and create test coverage reports.

Steps

  1. Pull Spark Streaming code example from github
  2. Describe Updates to build.sbt
  3. Create project/plugins.sbt
  4. Write Scala code
  5. Execute tests and coverage reports

Pull Spark Streaming Code Example from Github

If you don’t want to copy-and-paste code, you can pull it from github.  Just pull the spark-course repo from https://github.com/tmcgrath/spark-course; the project we are working from is in the spark-streaming-tests directory.

Updates to Previous build.sbt

build.sbt should be updated to include a new command alias as well as the scalatest 3rd party lib as seen below:

  scalaVersion := "2.11.8"
  
  +addCommandAlias("sanity", ";clean ;compile ;coverage ;test; coverageReport")
  
  resolvers += "jitpack" at "https://jitpack.io"
 @@ -19,5 +21,6 @@ libraryDependencies ++= Seq(
  // comment above line and uncomment the following to run in sbt
  // "org.apache.spark" %% "spark-streaming" % "1.6.1",
    "org.scalaj" %% "scalaj-http" % "2.3.0",
 -  "org.jfarcand" % "wcs" % "1.5" 
 +  "org.jfarcand" % "wcs" % "1.5",
 +  "org.scalatest" %% "scalatest" % "2.2.6" % "test"
  )

Notice how we add “test” to the end of the scalatest dependency to indicate the library is only needed for tests.

Create project/plugins.sbt

Add a new line for the sbt-coverage plugin as seen here:

addSbtPlugin("org.scoverage" % "sbt-scoverage" % "1.3.5")

Write Scala Tests

Actually, before we write the actual tests, we’re going to update our previous SlackStreamingApp’s `main` method to facilitate automated tests.  I know, I know, if we would have written SlackStreamingApp with TDD, then we wouldn’t have to do this, right?  😉

Anyhow, it’s not a huge change.

 object SlackStreamingApp {
 - 
 +
    def main(args: Array[String]) {
      val conf = new SparkConf().setMaster(args(0)).setAppName("SlackStreaming")
      val ssc = new StreamingContext(conf, Seconds(5))
      val stream = ssc.receiverStream(new SlackReceiver(args(1)))
      stream.print()
 -    if (args.length > 2) {
 -      stream.saveAsTextFiles(args(2))
 -    }
 +
 +    processStream(args, stream)
 +
      ssc.start()
      ssc.awaitTermination()
    }
 - 
 +
 +  def processStream(args: Array[String], stream: DStream[String]): Unit = {
 +    args match {
 +      case Array(_, _, path, _*) => stream.saveAsTextFiles(args(2))
 +      case _ => return
 +    }
 +
 +
 +  }
 +

As you can hopefully see, we just needed to extract the code looking for a command-line arg into a new function called `processStream`.  Also, we need to add one more line to the imports at the top:

import org.apache.spark.streaming.dstream.{DStream, ReceiverInputDStream}

Next, we write the testing code.  To start, we need to create new directories to store the test code.  Create src/test/scala/com/supergloo directories.  Next, we add test code to this directory by creating the following Scala file: src/test/scala/com/supergloo/SlackStreamingTest.scala

package com.supergloo

import com.supergloo.SlackStreamingApp._
import org.apache.hadoop.mapred.InvalidInputException
import org.apache.spark.SparkConf
import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.{ClockWrapper, Seconds, StreamingContext}
import org.scalatest.concurrent.Eventually
import org.scalatest.{BeforeAndAfter, FlatSpec, Matchers}

import scala.collection.mutable
import scala.concurrent.duration._
import scala.language.postfixOps
import scala.reflect.io.Path
import scala.util.Try

class SlackStreamingTest extends FlatSpec with Matchers with Eventually with BeforeAndAfter {

  private val master = "local[1]"
  private val appName = "spark-streaming-test"
  private val filePath: String = "target/testfile"

  private var ssc: StreamingContext = _

  private val batchDuration = Seconds(1)

  var clock: ClockWrapper = _

  before {
    val conf = new SparkConf()
      .setMaster(master).setAppName(appName)
      .set("spark.streaming.clock", "org.apache.spark.streaming.util.ManualClock")

    ssc = new StreamingContext(conf, batchDuration)
    clock = new ClockWrapper(ssc)
  }

  after {
    if (ssc != null) {
      ssc.stop()
    }
    Try(Path(filePath + "-1000").deleteRecursively)
  }

  "Slack Streaming App " should " store streams into a file" in {
    val lines = mutable.Queue[RDD[String]]()
    val dstream = ssc.queueStream(lines)

    dstream.print()
    processStream(Array("", "", filePath), dstream)


    ssc.start()

    lines += ssc.sparkContext.makeRDD(Seq("b", "c"))
    clock.advance(1000)

    eventually(timeout(2 seconds)){
      val wFile: RDD[String] = ssc.sparkContext.textFile(filePath+ "-1000")
      wFile.count() should be (2)
      wFile.collect().foreach(println)
    }

  }

  "Slack Streaming App " should " store empty streams if no data received" in {
    val lines = mutable.Queue[RDD[String]]()
    val dstream = ssc.queueStream(lines)

    dstream.print()
    processStream(Array("", "", filePath), dstream)


    ssc.start()

    clock.advance(1000)

    eventually(timeout(1 seconds)){
      val wFile: RDD[String] = ssc.sparkContext.textFile(filePath+ "-1000")
      wFile.count() should be (0)
      wFile.collect().foreach(println)
    }

  }

  "Slack Streaming App " should " not store streams if argument is not passed" in {
    val lines = mutable.Queue[RDD[String]]()
    val dstream = ssc.queueStream(lines)

    dstream.print()
    processStream(Array("", ""), dstream)

    val wFile: RDD[String] = ssc.sparkContext.textFile(filePath+ "-1000")

    ssc.start()

    lines += ssc.sparkContext.makeRDD(Seq("b", "c"))
    clock.advance(2000)

    eventually(timeout(3 seconds)){
      a [InvalidInputException] should be thrownBy {
        wFile.count() should be (0)
      }
    }
  }
}

Next, we need to create additional directories and add ClockWrapper.scala to src/test/scala/org/apache/spark/streaming/.  More on this class later.

package org.apache.spark.streaming

import org.apache.spark.util.ManualClock

/**
  * This class is defined in this package as the ManualClock is
  * private in the "spark" package
  */
class ClockWrapper(ssc: StreamingContext) {

  def getTimeMillis(): Long = manualClock().getTimeMillis()

  def setTime(timeToSet: Long) = manualClock().setTime(timeToSet)

  def advance(timeToAdd: Long) = manualClock().advance(timeToAdd)

  def waitTillTime(targetTime: Long): Long = manualClock().waitTillTime(targetTime)

  private def manualClock(): ManualClock = {
    ssc.scheduler.clock.asInstanceOf[ManualClock]
  }
}

(By the way, ClockWrapper is taken from an approach I saw on Spark unit testing.  See the “Additional Resources” section below for the link.)

Ok, we’re ready to execute now.

Execute Scala tests and coverage reports

In the spark-streaming-tests directory, we can now issue `sbt sanity` from command-line.  You should see all three tests pass:

[info] SlackStreamingTest:
[info] Slack Streaming App 
[info] - should store streams into a file
[info] Slack Streaming App 
[info] - should store empty streams if no data received
[info] Slack Streaming App 
[info] - should not store streams if argument is not passed
[info] Run completed in 4 seconds, 436 milliseconds.
[info] Total number of tests run: 3
[info] Suites: completed 1, aborted 0
[info] Tests: succeeded 3, failed 0, canceled 0, ignored 0, pending 0
[info] All tests passed.

 

To review coverage reports, simply open target/scala-2.11/scoverage-report/index.html in a browser.

 

Conclusion

Hopefully, this Spark Streaming unit test example helps start your Spark Streaming testing approach.  We covered a code example, how to run the tests, and how to view the test coverage results.  If you have any questions or comments, let me know.  Also, subscribe to the Supergloo YouTube channel for an upcoming screencast from this post.

 

Additional Resources

 

Featured image credit https://flic.kr/p/dgSbYM

Spark Streaming Example – How to Stream from Slack

Spark Streaming Example

Let’s write a Spark Streaming example in Scala, which streams from Slack.  This post will show how to write, configure and execute the code, first.  Then, the source code will be examined in detail.  If you don’t have a Slack team,  you can set one up for free.   We’ll cover that too.

Let’s start with a big picture overview of the steps we will take.

Spark Streaming Example Overview

  1. Setup development environment for Scala and SBT
  2. Write code
  3. Configure Slack for stream access
  4. Start Apache Spark in Standalone mode
  5. Run the Spark Streaming app
  6. Revisit code to describe the fundamental concepts.

So, our initial target is running code.  Then, we’ll examine the source code in detail.

1. Setup Spark Streaming Development Environment for Scala and SBT

Let’s follow SBT directory conventions.  Create a new directory to start.  I’m going to call mine spark-streaming-example.  The following are commands to create the directory, but you can use a window manager if you wish as well.  If this directory structure doesn’t make sense to you or you haven’t compiled Scala code with SBT before, this post probably isn’t the best for you.  Sorry, I had to write that.  I don’t mean it as a personal shot against you.  I’m sure you are a wonderful and interesting person.  This post isn’t super advanced, but I just want to be upfront and honest with you.  It’s better for both of us in the long run.

Anyhow, where were we, you Scala-compiling maestro?  Oh yeah, directory structure.

mkdir spark-streaming-example
cd spark-streaming-example
mkdir src
mkdir src/main
mkdir src/main/scala
mkdir src/main/scala/com
mkdir src/main/scala/com/supergloo

Next, create a build.sbt file in the root of your dev directory.  Ready for a surprise?  Surprise!  My build.sbt will be in the spark-streaming-example/ directory.

The build.sbt I’m using is:

name := "spark-streaming-example"

version := "1.0"

scalaVersion := "2.11.8"

resolvers += "jitpack" at "https://jitpack.io"

libraryDependencies ++= Seq(
  "org.apache.spark" % "spark-streaming_2.11" % "1.6.1",
  "org.scalaj" %% "scalaj-http" % "2.2.1",
  "org.jfarcand" % "wcs" % "1.5"
)

You see what’s happening, right?  I said, RIGHT!  Hope you didn’t jump out of your chair there.  I wasn’t yelling, but just want to make sure you’re still with me.

In a nutshell: I’m going to use Scala 2.11.8 and grab a few dependencies such as Spark Streaming 2.11, scalaj-http and wcs.  There are links to these and more descriptions later in this post.  In short, we need `wcs` to make a WebSocket connection to Slack and `scalaj-http` for an HTTP client.  Remember, our first goal is working code and then we’ll come back to more detailed descriptions.  Stay with me.

2. Write Scala Code

I called this step “write Scala code”, but the more I think about it, this isn’t entirely accurate.  In fact, I’m going to write the code and you can copy-and-paste.  Lucky you.  See how much I care about you.

You need two files:

In the src/main/scala/com/supergloo directory, a file called SlackReceiver.scala with following contents:

package com.supergloo

import org.apache.spark.Logging
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.receiver.Receiver
import org.jfarcand.wcs.{TextListener, WebSocket}

import scala.util.parsing.json.JSON
import scalaj.http.Http

/**
* Spark Streaming Example Slack Receiver from Slack
*/
class SlackReceiver(token: String) extends Receiver[String](StorageLevel.MEMORY_ONLY) with Runnable with Logging {

  private val slackUrl = "https://slack.com/api/rtm.start"

  @transient
  private var thread: Thread = _

  override def onStart(): Unit = {
     thread = new Thread(this)
     thread.start()
  }

  override def onStop(): Unit = {
     thread.interrupt()
  }

  override def run(): Unit = {
     receive()
   }

  private def receive(): Unit = {
     val webSocket = WebSocket().open(webSocketUrl())
     webSocket.listener(new TextListener {
       override def onMessage(message: String) {
         store(message)
       }
     })
  }

  private def webSocketUrl(): String = {
    val response = Http(slackUrl).param("token", token).asString.body
    JSON.parseFull(response).get.asInstanceOf[Map[String, Any]].get("url").get.toString
  }

}

And you’ll need another file in the same directory called SlackStreamingApp.scala with following contents:

package com.supergloo

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

/**
  * Spark Streaming Example App
  */
object SlackStreamingApp {

  def main(args: Array[String]) {
    val conf = new SparkConf().setMaster(args(0)).setAppName("SlackStreaming")
    val ssc = new StreamingContext(conf, Seconds(5))
    val stream = ssc.receiverStream(new SlackReceiver(args(1)))
    stream.print()
    if (args.length > 2) {
      stream.saveAsTextFiles(args(2))
    }
    ssc.start()
    ssc.awaitTermination()
  }

}

Ok, at this point, “we” are finished with code.  And by “we”, I mean you.

I think it would be a good idea to make sure SBT is happy.

So, try `sbt compile`.  For my environment, I’m going to run this from the command-line in the spark-streaming-example folder.  In the Resources section of this post, there is a link to a YouTube screencast of me running this.  Maybe that could be helpful for you too.  I don’t know.  You tell me.  Actually, don’t tell me if it worked.  Let me know in the page comments what didn’t work.  It works on my machine.  Ever hear that one before?

3. Configure Slack for API access

You need an OAuth token for API access to Slack and to run this Spark Streaming example.  Luckily for us, Slack provides test tokens that do not require going through all the OAuth redirects.  That token will be perfect for this example.

To get a token, go to https://api.slack.com/docs/oauth-test-tokens to list the Slack teams you have joined.  Here’s what mine looks like (without the blue arrow):

 

Spark Streaming Example From Slack

I greyed some out to protect the innocent.  The point is, you should see a green box for “Create Token”.  Look again at the screenshot above and where the blue arrow points.  You should have this option.  And if you don’t, there is another option for you.

It’s easy to set up your own, free, Slack team site.  And when you do, by default, the new team will have API access enabled.  So, create a new team if you don’t have a Create Token button on any of your existing teams.  Start here: https://slack.com/create.

Once you have a new team set up or whenever you have a “Create Token” button available on the previously mentioned OAuth test token page, click it to generate a token.  Save that token, because we, and by “we”, I mean you, will need it soon.  But first, “we” need to start Spark so we can run this example.  We are in this together, you and me.  Here we go.

4. Start Apache Spark in Standalone mode

I presume you have an Apache Spark environment to use.  If you don’t, you might be a bit ahead of yourself with a Spark Streaming tutorial like this one.   If you are ahead of yourself, I like your style.  Nothing like jumping into the deep end first.  But, this pool might be empty and you could get hurt.  I don’t mean hurt literally.  It’s more mental than physical.

There are plenty of resources on this site to help get setup running a Spark Cluster Standalone.  As I said, you need that.  But, let’s continue if you want.

For this Spark Streaming tutorial, I’m going to go with the most simple Spark setup possible.  That means we’re going to run Spark in Standalone mode.   You see, I get to make the decisions around here.  I’m a big shot blogger.  Ok, ok, I know, not really a big shot.  But, a guy can dream.  And I actually do not dream of becoming a big shot blogger.  I dream of taking my kids on adventures around the world.  I dream of watching movies.  I sometimes dream of watching movies while my kids are someplace on the other side of the world.

Anyhow, if you are a big shot with your own Spark Cluster running, you can run this example code on that too.  Your call.  Evidently, you are the boss around here.

Ok, boss, start a Spark Standalone Master from command-line:

~/Development/spark-1.5.1-bin-hadoop2.4 $ sbin/start-master.sh

You should call start-master.sh or your Windows equivalent from the location appropriate for your environment.  For me, that’s the spark-1.5.1-bin-hadoop2.4 directory.  You knew that by looking at the example though didn’t you?

Next start a worker:

~/Development/spark-1.5.1-bin-hadoop2.4 $ sbin/start-slave.sh spark://todd-mcgraths-macbook-pro.local:7077

You do not want to use `spark://todd-mcgraths-macbook-pro.local:7077` when starting up your Spark worker.  That’s mine.  todd-mcgraths-macbook-pro.local is my laptop, not yours.  Set the master URL to something appropriate for your machine.

Ok, you should be able to tell if everything is ok with Spark startup.  If not, you are definitely in trouble with this tutorial.  You probably need to slow down a bit there speedy.  But, you are the boss.

You may need to open another command window to run the next step.

5. Run the Spark Streaming app

Scala and Spark fan, here we go.  Listen, I know sbt can be a bear sometimes.  It takes some time for it to become a `simple build tool`.  But, I’m not going to go over that here.  Ok?    

1) Start SBT in the directory where build.sbt is located.

~/Development/spark-streaming-example $ sbt

2) In your sbt console:

run local[5] <your-oauth-token> output

What you should see:

After the SlackStreamingApp starts, you will see JSON retrieved from Slack. Holy moly, let me repeat: JSON from Slack.  We did it!   Dora might yell Lo Hicimos! at this point.  Or maybe Boots would say that.  I can’t remember and don’t care.  You don’t either.  

Depending on your log settings, things might scroll through your console pretty fast.

You can verify by adding messages to the Slack team the OAuth token has access to.  You’ll also be streaming messages for Slack events such as joining and leaving channels, bots, etc.

Wow, we actually did it.  You and me, kid.  I had confidence in you the whole time.  I believed in you when no one else did.  Well, honestly, not really.  This is the Internet after all.  But, every once and while, I’m pleasantly surprised.  I still think you’re pretty neat.

6. Revisit Spark Streaming Code – Describe Key Concepts

Ok, let’s revisit the code and start with external dependencies.  As briefly noted in the build.sbt section, we connect to Slack over a WebSocket.  To make a WebSocket connection and parse the incoming JSON data, we use three things: an external WebSocket Scala library (wcs), an external HTTP client library (scalaj-http) and the native JSON parser in Scala.  Again, links to the external libraries in use are located in the Resources section below.  We see all three of these in action in two SlackReceiver functions.

  private def receive(): Unit = {
    val webSocket = WebSocket().open(webSocketUrl())
    webSocket.listener(new TextListener {
      override def onMessage(message: String) {
        store(message)
      }
    })
  }

  private def webSocketUrl(): String = {
    val response = Http(slackUrl).param("token", token).asString.body
    JSON.parseFull(response).get.asInstanceOf[Map[String, Any]]
                                      .get("url").get.toString
  }

The webSocketUrl function uses the OAuth token we passed as the second command-line argument to `run`.  More on that soon.  Note the parsing of the incoming response data as JSON with JSON.parseFull.  We sent the OAuth token from SlackStreamingApp when we initialized the SlackReceiver:

val stream = ssc.receiverStream(new SlackReceiver(args(1)))

Also, we see in the `webSocketUrl` function that we are expecting JSON with a schema of key/value pairs, which is why we cast to Map[String, Any].
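For context, the part of the rtm.start response we care about looks roughly like this (abridged and illustrative):

{ "ok": true, "url": "wss://..." }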

Ok, that covers the external libraries in use.  Let’s keep going.

Recall from earlier Spark Streaming tutorials on this site (links in Resources below) that Spark Streaming can be thought of as a micro-batch system.  Instead of waiting and processing streaming data one record at a time, Spark Streaming discretizes the streaming data into micro-batches.  Or, in other words, Spark Streaming’s Receivers accept data in parallel and buffer it in the memory of Spark’s worker nodes.

Micro-batches poll stream sources at specified timeframes.  What is the poll frequency for this example app?  It’s every 5 seconds as declared in the SlackStreaming app code:

    val ssc = new StreamingContext(conf, Seconds(5))

And what about StreamingContext?  The StreamingContext is a type of context which is specific to Spark Streaming.  Surprised?  Of course, you are not.  You could just tell by the name StreamingContext, right?  I said RIGHT!?  Did you jump out of your chair that time?  I hope so.  You need a StreamingContext when you are building streaming apps.

Back to the SlackReceiver class now.  Extending `Receiver` is what we do when building a custom receiver for Spark Streaming.  And if you haven’t guessed by now, let me tell you, we built a custom receiver for Slack.  Well, would you look at us.  We built a custom receiver.  Somebody get us a trophy.  Or a ribbon.  Or a ribbon trophy.

The class declaration:

class SlackReceiver(token: String) extends Receiver[String](StorageLevel.MEMORY_ONLY) with Runnable with Logging {

There are a few things to note about this declaration.  First, the `Runnable` trait usage is for convenience to run this sample.  I thought it would make things easier to run from SBT.

We’re setting StorageLevel to memory only

StorageLevel.MEMORY_ONLY

This is the default.  Nothing fancy here.  This stores RDDs as deserialized objects in the JVM.  If storage needs grow beyond what’s available, the data will not spill to disk and will need to be recomputed each time it is needed and is not in memory.  Again, we don’t need anything more in this example.  Check out other levels such as MEMORY_AND_DISK, MEMORY_ONLY_SER, DISK_ONLY and others if you want more info on storage levels.  This is a Spark Streaming post, dang it.

 

Finally, when extending Receiver we override three functions.  (Or, you might see some examples of calling `run` from within `onStart`, but we’re not going to do that here.  Why?  Because I’m the big shot boss and a “visionary”.  On second thought, I’m a big shot visionary or BSV as they say in the biznass.  <– That’s business spelled wrong for my international peeps.  <– That’s people spelled wrong for my non-slang speaking audience…. dang it, now I’m side tracked.)

Where were we?!   Don’t let me get off track, partner.  We need to override two Receiver functions, because Receiver is an abstract class:

...
  override def onStart(): Unit = {
    thread = new Thread(this)
    thread.start()
  }

  override def onStop(): Unit = {
    thread.interrupt()
  }
...

`onStart` is spawning a new thread to receive the stream source.  This triggers a call to our overridden Thread `run` function which calls the previously described `receive` function.

`onStop` is there to ensure any spawned threads are stopped when the receiver is stopped.

(Not shown, but exceptions while receiving can be handled either by restarting the receiver with `restart` or stopping it completely with `stop`.  See the Receiver docs for more information.)

So, that’s the code.  But, let’s also consider how this example was invoked.  One important detail is the use of “5”:

run local[5] <your-oauth-token> output

Why 5?  If we use “local” or “local[1]” as the master URL, only one thread will be used for running tasks.  When using an input DStream based on a receiver, a single thread will be used to run the receiver, which leaves no thread for processing the received data.  So, always use “local[n]” as the master URL, where n > number of receivers to run.

When running on a Spark Cluster outside of Standalone mode, the number of cores allocated to the Spark Streaming application must be more than the number of receivers.

Finally, we’ll close with a fairly insignificant detail.  The last argument is “output” which you can see from the SlackStreamingApp is used here:

    if (args.length > 2) {
      stream.saveAsTextFiles(args(2))
    }

This third argument is optional and specifies whether the stream data should be saved to disk, and the path prefix to use.

 

Conclusion

So, all joking aside, I hope this Spark Streaming example helps you.  I like to help nice people who try.  I hope you are one of those types of people.

You might enjoy signing up for the mailing list, following Twitter and subscribing on YouTube.  You have options.  I think the links to these sites are on the bottom of each page.  To be honest, I’m not entirely sure I want you to follow or subscribe, but I don’t think I can actually prevent you from doing so.  So, you’re in charge boss.   See?  I did it again.  Just having fun.

Take care and let me know if you have any questions or suggestions for this post in the comments below.

Here is a screencast of me running through most of these steps above.

Spark Streaming Example – How to Stream from Slack

Look Ma, I’m on YouTube!  You can subscribe to the supergloo YouTube channel if you want.  My Ma did.  I think.  At least, she told me she did.  (I call her Ma, not Mom, get over it.  I’m from Minnesota.)

 

Resources for this Spark Streaming Example Tutorial

  1. WCS – Asynchronous WebSocket Connector – https://github.com/jfarcand/WCS
  2. HttpClient – https://github.com/scalaj/scalaj-http
  3. Check this site for “Spark Streaming Example Part 1”.  This post is, loosely, Part 2.  It’s probably listed in the “Related Posts” section below.

 

Featured image credit: https://flic.kr/p/oMquYF

How-To Apache Spark Streaming with Scala Part 1

Spark Streaming with Scala

Let’s start Apache Spark Streaming by building up our confidence with small steps.  These small steps will create the forward momentum needed when learning new skills.  The quickest way to gain confidence and momentum in learning new software development skills is executing code that performs without error.

In this post, we’re going to setup and run Apache Spark Streaming with Scala code.  Then, we will be confident taking the next step to Part 2 of learning Apache Spark Streaming.

Before we begin though, I assume you already have a high-level understanding of Apache Spark Streaming at this point, but if not, here’s a quick two-minute read on Spark Streaming (opens in new window) from the Learning Apache Spark Summary book.

Overview

Spark comes with some great examples and convenient scripts for running Streaming code.  Let’s make sure you can run these examples.  In case it helps, I made a screencast of me running through these steps.  Link to the screencast below.

Running the NetworkWordCount example out-of-the-box

  1. Open a shell or command prompt on Windows and go to your Spark root directory.
  2. Start Spark Master:  sbin/start-master.sh  **
  3. Start a Worker: sbin/start-slave.sh spark://todd-mcgraths-macbook-pro.local:7077
  4. Start netcat on port 9999: nc -lk 9999  (*** Windows users: https://nmap.org/ncat/  Let me know in page comments if this works well on Windows)
  5. Run network word count using handy run-example script: bin/run-example streaming.NetworkWordCount localhost 9999

** Windows users, please adjust accordingly; i.e. sbin/start-master.cmd instead of sbin/start-master.sh

Here’s a screencast of me running these steps

Apache Spark Streaming with Scala Part 1

Making and Running Our Own NetworkWordCount

Ok, that’s good.  We’ve succeeded in running the Scala Spark Streaming NetworkWordCount example, but what about running our own Spark Streaming program in Scala?  Let’s take another step towards that goal.  In this step, we’re going to set up our own Scala/SBT project, compile, package and deploy a modified NetworkWordCount.  Again, I made a screencast of the following steps with a link to the screencast below.

  1. Choose or create a new directory for a new Spark Streaming Scala project.
  2. Make dirs to make things convenient for SBT: src/main/scala
  3. Create Scala object code file called NetworkWordCount.scala in src/main/scala directory
  4. Copy-and-paste NetworkWordCount.scala code from Spark examples directory to your version created in previous step
  5. Remove or comment out package and StreamingExamples references
  6. Change AppName to “MyNetworkWordCount”
  7. Create a build.sbt file (source code below)
  8. sbt compile to smoke test
  9. Deploy: ~/Development/spark-1.5.1-bin-hadoop2.4/bin/spark-submit --class "NetworkWordCount" --master spark://todd-mcgraths-macbook-pro.local:7077 target/scala-2.11/streaming-example_2.11-1.0.jar localhost 9999
  10. Start netcat on port 9999: nc -lk 9999  and start typing
  11. Check things out in the Spark UI

Apache Spark Streaming with Scala Part 2

build.sbt source

name := "streaming-example"

version := "1.0"

scalaVersion := "2.11.4"

libraryDependencies ++= Seq(
    "org.apache.spark" %% "spark-core" % "1.5.1",
    "org.apache.spark" %% "spark-streaming" % "1.5.1"
)

If you watched the video, notice this has been corrected to “streaming-example” and not “steaming-example” 🙂
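For reference, here’s roughly what the modified NetworkWordCount.scala ends up looking like after steps 3 through 6 above; it’s the stock Spark example with the package and StreamingExamples references removed and the app name changed:

import org.apache.spark.SparkConf
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.{Seconds, StreamingContext}

object NetworkWordCount {
  def main(args: Array[String]) {
    if (args.length < 2) {
      System.err.println("Usage: NetworkWordCount <hostname> <port>")
      System.exit(1)
    }

    val sparkConf = new SparkConf().setAppName("MyNetworkWordCount")
    val ssc = new StreamingContext(sparkConf, Seconds(1))

    // Create a DStream that connects to hostname:port, e.g. localhost:9999
    val lines = ssc.socketTextStream(args(0), args(1).toInt, StorageLevel.MEMORY_AND_DISK_SER)
    val words = lines.flatMap(_.split(" "))
    val wordCounts = words.map(x => (x, 1)).reduceByKey(_ + _)
    wordCounts.print()

    ssc.start()
    ssc.awaitTermination()
  }
}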

Spark Streaming With Scala Part 1 Conclusion

At this point, I hope you were successful in running both Spark Streaming examples in Scala.  If so, you should be more confident when we continue to explore Spark Streaming in Part 2.   If you have any questions, feel free to add comments below.

 

 

Featured image credit https://flic.kr/p/bVJF32