Spark Kinesis Example – Moving Beyond Word Count

spark kinesis

If you are looking for Spark with Kinesis example, you are in the right place.  This Spark Streaming with Kinesis tutorial intends to help you become better at integrating the two.

In this tutorial, we’ll examine some custom Spark Kinesis code and also show a screencast of running it.  In addition, we’re going to cover running, configuring, sending sample data and AWS setup.  Finally, I’m going to list out some links for the content which helped me become more comfortable in Spark Kinesis code and configuration.

Ready?  Here we go.

If you have questions or suggestions, please let me know in the comment form below.

Spark Kinesis Tutorial Example Overview

In this example, we’re going to simulate sensor devices recording their temperature to a Kinesis stream.  This Kinesis stream will be read from our Spark Scala program every 2 seconds and notify us of two things:

  1. If a sensor’s temperature is above 100
  2. The top two sensors’ temps over the previous 20 seconds

So, nothing too complicated, but close enough to a possible real-world scenario of reading and analyzing stream(s) of data and acting on certain results.

In this tutorial, here’s how we’re going to cover things in the following order

  1. Check assumptions
  2. Present the code (Scala) and configuration example
  3. Go through AWS Kinesis setup and also Amazon Kinesis Data Generator
  4. Run in IntelliJ
  5. How to build and deploy outside of IntelliJ
  6. Rejoice, savor the moment, and thank the author of this tutorial with $150 PayPal donation

Sound good? I hope so, let’s begin.

Spark with Kinesis Tutorial Assumptions

I’m making the following assumptions about you when writing this tutorial.

  1. You have a basic understanding of Amazon Kinesis
  2. You have experience writing and deploying Apache Spark programs
  3. You have an AWS account and understand using AWS costs money.  (AKA: moolah, cash money).  It costs you or your company money to run in AWS.
  4. You have set your AWS access id and key appropriately for your environment.

If any of these assumptions are incorrect, you are probably going to struggle with this Spark Kinesis integration tutorial.  See Spark Tutorials in Scala or Spark Tutorial in Python and What is Apache Spark for background information.

Spark Kinesis Example Scala Code

Let’s start with the code…

object SparkKinesisExample extends Logging {

  def main(args: Array[String]) {

    val conf = new SparkConf().setAppName("Kinesis Read Sensor Data")
    conf.setIfMissing("spark.master", "local[*]")

    // Typesafe config - load external config from src/main/resources/application.conf
    val kinesisConf = ConfigFactory.load.getConfig("kinesis")

    val appName = kinesisConf.getString("appName")
    val streamName = kinesisConf.getString("streamName")
    val endpointUrl = kinesisConf.getString("endpointUrl")

    val credentials = new DefaultAWSCredentialsProviderChain().getCredentials()
    require(credentials != null,
      "No AWS credentials found. See http://docs.aws.amazon.com/AWSSdkDocsJava/latest/DeveloperGuide/credentials.html")
    val kinesisClient = new AmazonKinesisClient(credentials)
    kinesisClient.setEndpoint(endpointUrl)
    val numShards = kinesisClient.describeStream(streamName).getStreamDescription().getShards().size

    val numStreams = numShards
    val batchInterval = Milliseconds(2000)
    val kinesisCheckpointInterval = batchInterval

    // Get the region name from the endpoint URL to save Kinesis Client Library metadata in
    // DynamoDB of the same region as the Kinesis stream
    val regionName = RegionUtils.getRegionByEndpoint(endpointUrl).getName()

    val ssc = new StreamingContext(conf, batchInterval)

    // Create the Kinesis DStreams
    val kinesisStreams = (0 until numStreams).map { i =>
      KinesisUtils.createStream(ssc, appName, streamName, endpointUrl, regionName,
        InitialPositionInStream.LATEST, kinesisCheckpointInterval, StorageLevel.MEMORY_AND_DISK_2)
    }

    // Union all the streams (in case numStreams > 1)
    val unionStreams = ssc.union(kinesisStreams)

    val sensorData = unionStreams.map { byteArray =>
      val Array(sensorId, temp, status) = new String(byteArray).split(",")
      SensorData(sensorId, temp.toInt, status)
    }

    val hotSensors: DStream[SensorData] = sensorData.filter(_.currentTemp > 100)

    hotSensors.print(1) // remove me if you want... this is just to spit out timestamps

    println(s"Sensors with Temp > 100")
    hotSensors.map { sd =>
      println(s"Sensor id ${sd.id} has temp of ${sd.currentTemp}")
    }

    // Hotest sensors over the last 20 seconds
    hotSensors.window(Seconds(20)).foreachRDD { rdd =>
      val spark = SparkSession.builder.config(rdd.sparkContext.getConf).getOrCreate()
      import spark.implicits._

      val hotSensorDF = rdd.toDF()
      hotSensorDF.createOrReplaceTempView("hot_sensors")

      val hottestOverTime = spark.sql("select * from hot_sensors order by currentTemp desc limit 5")
      hottestOverTime.show(2)
    }

    // To make sure data is not deleted by the time we query it interactively
    ssc.remember(Minutes(1))

    ssc.start()
    ssc.awaitTermination()
  }
}
case class SensorData(id: String, currentTemp: Int, status: String)

(Note: The entire code is available from my Github repo.  See links in the Resources section below.)

The first 20 or so lines of the `main` function are just setting things up.  For example, we are reading the particulars of the Kinesis stream (streamName, endpointURL, etc.) from a config file.  You will need to change the config variables in the file `src/main/resources/application.conf` to values appropriate for your Kinesis setup.

Then, we start utilizing the code provided by Amazon.  You’ll see reference to `sparkstreamingkinesisasl` in the build.sbt file in the Github repo.

Next, we create a dynamic number of streams `kinesisStreams` based on the number of shards configured in our Kinesis stream.  Make sure to view the screencast below for further insight on this subject.  In the screencast, I go over setting up Kinesis and running this program.

We utilize the AWS SDK `KinesisUtils` object’s `createStream` method to register each stream.  We set the initial position to start in the stream as the very latest record.  In other words, don’t worry about anything that has already been added to the stream.

(Some of you might be wondering at this point… this doesn’t look like Spark Structured Streaming?  And then you might be thinking, we should be using Structured Streaming for anything new right?  I’d say you’re right.  But, at the time of this writing, Structured Streaming for Kinesis is not available in Spark outside of DataBricks.  Relevant links on this subject below in the Resouces section.)

After we create the streams, we perform a `union` see can analyze each stream as one.  After the union, we convert our stream from an `Array[Byte]` to `DStream[SensorData]`.  If you set up your Kinesis stream with 1 shard as I did, there will only be one stream.

From here, we perform our pseudo business logic.  We are looking for sensors which might be running hot (e.g. over 100).  If this was a real application, our code might trigger an event based on this temperature.

This example also gives us the opportunity to perform some spark streaming windowing.  Windowing allows us to analyze and consider data that previously arrived in the stream and not only data present compute time (the current iteration of micro-batch).  For example, if we wanted to determine the hottest two sensors over the past 20 seconds and not just sensor data in a particular batch.  To this, consider the code starting at `hotSensors.window` block.  Within the block, notice the import for implicits.  This allows us to convert our RDDs of `SensorData` to DataFrames later in the block.

That’s it.  I think the code speaks for itself, but as I mentioned above, let me know if you have any questions or suggestions for improvement.

AWS KINESIS SETUP

Setting up Kinesis requires an AWS account.  In this example, it’s nothing fancy.  I created in the us-west-2 region because that’s where the Kinesis Generator is, but I don’t think it really matters.  The Kinesis stream is just 1 shard (aka partition) with default settings on others.  I did need to modify security policies to work, which I show in the screencast.

Amazon Kinesis Generator Setup

I like being lazy sometimes.  So, when I found that Amazon provides a service to send fake data to Kinesis, I jumped all over it.  I mean, sure, I could write my own Java, Python or Scala program to do it, but using this Kinesis Generator was easier and faster.  I like how it has integrated Faker in order to provide dynamic data.

Running Spark Kinesis Scala code in IntelliJ

I’m going to run this in IntelliJ because it simulates how I work.  I like to develop and test in IntelliJ first before building and deploying a jar.  So, check out the screencast for some running-in-intellij-fun. In order to run in IntelliJ, I’ve customized my `build.sbt` file and updated the Run/Debug window in Intellij to `intellijRunner` classpath.  Also, as previously mentioned, you need to update the configuration settings in the `src/main/resources/application.conf` file.

(Might be interested in Debugging Spark in IntelliJ here)

Finally, as previously mentioned I assume you have your AWS security (id and key) setup with confirmed working.  I don’t show how I set mine up in the screencast, but I include a link for more information in Resources below.

Build and Deploy Spark Kinesis Example

The entire project is configured with SBT assembly plugin.  See`project/assembly.sbt`  To build a deployable jar, run the `assembly` sbt task.  Nothing fancy in the deploy then…just deploy with `spark-submit` and reference the `com.supergloo.SparkKinesisExample` class.

(For more info, might be interested Spark Deploy tutorial or Spark EC2 Deploy Tutorial)

Cash Money Time

I like money.  This is the part where you send me $150.   I’ll wait here until you send it.  Thanks in advance.  🙂

Spark Kinesis Screencast

While I’m waiting for you to send me money, check out the screencast

Spark Kinesis Example

I’m still waiting for you to send the cash $$$ by the way.

Resources

Featured image credit: https://flic.kr/p/6vfaHV

2 thoughts on “Spark Kinesis Example – Moving Beyond Word Count

  1. Is Kinesis with Spark (regular not structured) production stable for large applications and is it a good integration? I don’t see Kinesis with Spark often. I have an AWS environment and I need a streaming platform – I was thinking Kafka and Spark Structured Streaming but saw Kinesis used in AWS env at several places for streaming in AWS.

Leave a Reply

Your email address will not be published. Required fields are marked *