Spark Streaming Testing with Scala Example

Spark Streaming Testing

How do you create and automate tests of Spark Streaming applications?  In this tutorial, we’ll show an example of one way in Scala.  This post is heavy on code examples and has the added bonus of using a code coverage plugin.

Are the tests in this tutorial unit tests, integration tests, or functional tests?  I don’t know; tell me in the comments below if you have an opinion.  If I had to choose, I’d say unit tests, because we are stubbing the streaming provider.

Pre-requisites

As I’m sure you can guess, you will need some Spark Streaming Scala code to test.  We’re going to use the code from our Spark Streaming example from Slack post.  Check that out first if you need some streaming Scala code to use.  That code isn’t required, though; you should be able to pick up the concepts presented here and apply them to your own code.  All the testing code and the Spark Streaming example code is available on GitHub anyhow.

We’re going to use `sbt` to build and run tests and create coverage reports.  So, if you are not using `sbt` please translate to your build tool accordingly.

Spark Streaming Testing Overview

In order to write automated tests for Spark Streaming, we’re going to use a third-party library called ScalaTest.  We’re also going to add an sbt plugin called “sbt-scoverage”.  With these tools in hand, we can write some Scala test code and create test coverage reports.

Steps

  1. Pull Spark Streaming code example from GitHub
  2. Update build.sbt
  3. Create project/plugins.sbt
  4. Write Scala tests
  5. Execute tests and coverage reports

Pull Spark Streaming Code Example from Github

If you don’t want to copy-and-paste code, you can pull it from GitHub.  Just pull the spark-course repo from https://github.com/tmcgrath/spark-course; the project we are working from is in the spark-streaming-tests directory.

Updates to the Previous build.sbt

build.sbt should be updated to include a new command alias as well as the ScalaTest third-party library, as seen in the diff below:

  scalaVersion := "2.11.8"
  
  +addCommandAlias("sanity", ";clean ;compile ;coverage ;test; coverageReport")
  
  resolvers += "jitpack" at "https://jitpack.io"
 @@ -19,5 +21,6 @@ libraryDependencies ++= Seq(
  // comment above line and uncomment the following to run in sbt
  // "org.apache.spark" %% "spark-streaming" % "1.6.1",
    "org.scalaj" %% "scalaj-http" % "2.3.0",
 -  "org.jfarcand" % "wcs" % "1.5" 
 +  "org.jfarcand" % "wcs" % "1.5",
 +  "org.scalatest" %% "scalatest" % "2.2.6" % "test"
  )

Notice how we add “test” to the end of the ScalaTest dependency to indicate the library is only needed for tests.
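If you prefer, the same scope can be written with sbt’s `Test` configuration value instead of the string.  Here is a minimal sketch of just that one dependency line, assuming the same ScalaTest version as above:

```scala
// build.sbt fragment: equivalent to the "test" string scope used above
libraryDependencies += "org.scalatest" %% "scalatest" % "2.2.6" % Test
```

Either form keeps ScalaTest off the compile and runtime classpaths of the application itself.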

Create project/plugins.sbt

Add a new line for the sbt-scoverage plugin as seen here:

addSbtPlugin("org.scoverage" % "sbt-scoverage" % "1.3.5")
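As an optional extra, sbt-scoverage can also fail the build when coverage drops below a threshold.  The sketch below is for build.sbt and assumes the `coverageMinimum` and `coverageFailOnMinimum` setting keys from the 1.x line of the plugin; the 80 percent figure is an arbitrary example and is not something the example project sets.

```scala
// build.sbt fragment (optional, not part of the example project)
coverageMinimum := 80          // assumed threshold, pick your own
coverageFailOnMinimum := true  // fail the build if coverage is below the threshold
```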

Write Scala Tests

Actually, before we write the tests, we’re going to update our previous SlackStreamingApp’s `main` method to facilitate automated tests.  I know, I know, if we had written SlackStreamingApp with TDD, then we wouldn’t have to do this, right?  😉

Anyhow, it’s not a huge change.

 object SlackStreamingApp {
 - 
 +
    def main(args: Array[String]) {
      val conf = new SparkConf().setMaster(args(0)).setAppName("SlackStreaming")
      val ssc = new StreamingContext(conf, Seconds(5))
      val stream = ssc.receiverStream(new SlackReceiver(args(1)))
      stream.print()
 -    if (args.length > 2) {
 -      stream.saveAsTextFiles(args(2))
 -    }
 +
 +    processStream(args, stream)
 +
      ssc.start()
      ssc.awaitTermination()
    }
 - 
 +
 +  def processStream(args: Array[String], stream: DStream[String]): Unit = {
 +    args match {
 +      // a third argument, when present, is the output path prefix
 +      case Array(_, _, path, _*) => stream.saveAsTextFiles(path)
 +      case _ => () // no output path supplied: nothing to save
 +    }
 +  }
 +
 +

As you can hopefully see, we just needed to extract the code looking for a command-line arg into a new function called `processStream`.  Also, we need to add one more line to the imports at the top:

import org.apache.spark.streaming.dstream.{DStream, ReceiverInputDStream}
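If the `Array(_, _, path, _*)` pattern in `processStream` is new to you, it can be tried without Spark at all.  The sketch below uses a hypothetical helper, `OutputArg.outputPath`, which is not part of SlackStreamingApp; it only shows how the pattern picks out a third argument when one exists.

```scala
object OutputArg {
  /** Returns the third command-line argument if there is one (hypothetical helper). */
  def outputPath(args: Array[String]): Option[String] = args match {
    // two ignored positions, a bound third element, then zero or more extras
    case Array(_, _, path, _*) => Some(path)
    // fewer than three arguments
    case _ => None
  }
}
```

Calling `OutputArg.outputPath(Array("local[1]", "token"))` gives `None`, while adding a third element gives `Some` of that element.  This is the same split the tests below rely on when they pass two or three arguments.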

Next, we write the testing code.  To start, we need to create new directories to store the test code.  Create src/test/scala/com/supergloo directories.  Next, we add test code to this directory by creating the following Scala file: src/test/scala/com/supergloo/SlackStreamingTest.scala

package com.supergloo

import com.supergloo.SlackStreamingApp._
import org.apache.hadoop.mapred.InvalidInputException
import org.apache.spark.SparkConf
import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.{ClockWrapper, Seconds, StreamingContext}
import org.scalatest.concurrent.Eventually
import org.scalatest.{BeforeAndAfter, FlatSpec, Matchers}

import scala.collection.mutable
import scala.concurrent.duration._
import scala.language.postfixOps
import scala.reflect.io.Path
import scala.util.Try

class SlackStreamingTest extends FlatSpec with Matchers with Eventually with BeforeAndAfter {

  private val master = "local[1]"
  private val appName = "spark-streaming-test"
  private val filePath: String = "target/testfile"

  private var ssc: StreamingContext = _

  private val batchDuration = Seconds(1)

  var clock: ClockWrapper = _

  before {
    val conf = new SparkConf()
      .setMaster(master).setAppName(appName)
      .set("spark.streaming.clock", "org.apache.spark.streaming.util.ManualClock")

    ssc = new StreamingContext(conf, batchDuration)
    clock = new ClockWrapper(ssc)
  }

  after {
    if (ssc != null) {
      ssc.stop()
    }
    Try(Path(filePath + "-1000").deleteRecursively)
  }

  "Slack Streaming App " should " store streams into a file" in {
    val lines = mutable.Queue[RDD[String]]()
    val dstream = ssc.queueStream(lines)

    dstream.print()
    processStream(Array("", "", filePath), dstream)


    ssc.start()

    lines += ssc.sparkContext.makeRDD(Seq("b", "c"))
    clock.advance(1000)

    eventually(timeout(2 seconds)){
      val wFile: RDD[String] = ssc.sparkContext.textFile(filePath+ "-1000")
      wFile.count() should be (2)
      wFile.collect().foreach(println)
    }

  }

  "Slack Streaming App " should " store empty streams if no data received" in {
    val lines = mutable.Queue[RDD[String]]()
    val dstream = ssc.queueStream(lines)

    dstream.print()
    processStream(Array("", "", filePath), dstream)


    ssc.start()

    clock.advance(1000)

    eventually(timeout(1 seconds)){
      val wFile: RDD[String] = ssc.sparkContext.textFile(filePath+ "-1000")
      wFile.count() should be (0)
      wFile.collect().foreach(println)
    }

  }

  "Slack Streaming App " should " not store streams if argument is not passed" in {
    val lines = mutable.Queue[RDD[String]]()
    val dstream = ssc.queueStream(lines)

    dstream.print()
    processStream(Array("", ""), dstream)

    val wFile: RDD[String] = ssc.sparkContext.textFile(filePath+ "-1000")

    ssc.start()

    lines += ssc.sparkContext.makeRDD(Seq("b", "c"))
    clock.advance(2000)

    eventually(timeout(3 seconds)){
      a [InvalidInputException] should be thrownBy {
        wFile.count() should be (0)
      }
    }
  }
}
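The tests above wrap their assertions in ScalaTest’s `eventually`, because the streaming job writes its output on another thread and the file may not exist yet on the first check.  To make the idea concrete, here is a simplified stand-in written in plain Scala.  It is not ScalaTest’s implementation, and the names `Polling` and `retryUntil` are made up for this sketch.

```scala
import scala.annotation.tailrec
import scala.util.{Failure, Success, Try}

object Polling {
  /** Re-runs `check` until it stops throwing or `timeoutMs` has elapsed. */
  def retryUntil[T](timeoutMs: Long, intervalMs: Long = 15)(check: => T): T = {
    val deadline = System.currentTimeMillis() + timeoutMs

    @tailrec
    def loop(): T = Try(check) match {
      case Success(value) => value
      // out of time: surface the last failure to the caller
      case Failure(error) if System.currentTimeMillis() >= deadline => throw error
      case Failure(_) =>
        Thread.sleep(intervalMs) // wait a little before trying again
        loop()
    }

    loop()
  }
}
```

The design point is the same as in the tests: a failed check is treated as “not ready yet” until the timeout expires, and only then as a real failure.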

Next, we need to create additional directories and add ClockWrapper.scala to src/test/scala/org/apache/spark/streaming/.  More on this class later.

package org.apache.spark.streaming

import org.apache.spark.util.ManualClock

/**
  * This class is defined in this package as the ManualClock is
  * private in the "spark" package
  */
class ClockWrapper(ssc: StreamingContext) {

  def getTimeMillis(): Long = manualClock().getTimeMillis()

  def setTime(timeToSet: Long) = manualClock().setTime(timeToSet)

  def advance(timeToAdd: Long) = manualClock().advance(timeToAdd)

  def waitTillTime(targetTime: Long): Long = manualClock().waitTillTime(targetTime)

  private def manualClock(): ManualClock = {
    ssc.scheduler.clock.asInstanceOf[ManualClock]
  }
}

(By the way, ClockWrapper is taken from an approach I saw on Spark unit testing.  See “Additional Resources” section below for link.)
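Why does a manual clock help, and where does the `-1000` in the test file names come from?  With a manual clock, batches only fire when the test advances time, and `saveAsTextFiles` names each batch’s output as the prefix followed by the batch time in milliseconds.  The sketch below is a simplified model in plain Scala, not Spark’s code; `ManualClockModel` and `BatchModel` are made-up names, and it assumes the clock starts at 0.

```scala
/** Simplified model of a clock that only moves when told to. */
final class ManualClockModel(private var nowMs: Long = 0L) {
  def getTimeMillis: Long = nowMs
  def advance(deltaMs: Long): Long = { nowMs += deltaMs; nowMs }
}

object BatchModel {
  /** Batch times (ms) that become due when the clock moves from `fromMs` to `toMs`. */
  def dueBatches(fromMs: Long, toMs: Long, batchMs: Long): Seq[Long] = {
    val first = (fromMs / batchMs + 1) * batchMs // next batch boundary after fromMs
    first to toMs by batchMs
  }

  /** Output name for one batch: prefix, a dash, then the batch time in ms. */
  def outputPath(prefix: String, batchTimeMs: Long): String = s"$prefix-$batchTimeMs"
}
```

In this model, advancing a fresh clock by 1000 ms with a 1 second batch interval makes exactly one batch due, at time 1000, so its output lands in `target/testfile-1000`.  Advancing by 2000 ms, as the third test does, makes the batches at 1000 and 2000 due.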

Ok, we’re ready to execute now.

Execute Scala tests and coverage reports

In the spark-streaming-tests directory, we can now issue `sbt sanity` from the command line.  You should see all three tests pass:

[info] SlackStreamingTest:
[info] Slack Streaming App 
[info] - should store streams into a file
[info] Slack Streaming App 
[info] - should store empty streams if no data received
[info] Slack Streaming App 
[info] - should not store streams if argument is not passed
[info] Run completed in 4 seconds, 436 milliseconds.
[info] Total number of tests run: 3
[info] Suites: completed 1, aborted 0
[info] Tests: succeeded 3, failed 0, canceled 0, ignored 0, pending 0
[info] All tests passed.

To review coverage reports, simply open target/scala-2.11/scoverage-report/index.html in a browser.

Spark Streaming Testing Conclusion

Hopefully, this Spark Streaming unit test example helps start your Spark Streaming testing approach.  We covered a code example, how to run the tests, and how to view the test coverage results.  If you have any questions or comments, let me know.  Also, subscribe to the Supergloo YouTube channel for an upcoming screencast from this post.

Additional Resources

Featured image credit https://flic.kr/p/dgSbYM
