How to Debug Scala Spark in IntelliJ

Have you struggled to configure debugging in IntelliJ for your Spark programs?  Yeah, me too.  Debugging plain Scala code was easy, but when I moved to Spark things didn’t work as expected.  So, in this tutorial, let’s cover debugging Scala-based Spark programs in IntelliJ.  We’ll go through a few examples and utilize the occasional help of SBT.  These instructions and screencasts will hopefully allow you to start debugging Spark apps in IntelliJ and help me remember in the future.

We’ll break the topic of debugging Scala based Spark programs into two sections:

  1. Local Spark Debugging
  2. Remote Spark Debugging

As you’ll see in this tutorial, there are a few different options to choose from.  Which one fits depends on your Scala debug needs and on whether you want to debug Spark in standalone mode or debug a Spark job running on your cluster.  It is assumed that you are already familiar with the concepts and value of debugging your Spark Scala code, but we’ll quickly review a few key concepts before diving into the screencast examples.

Scala Breakpoints and Debuggers

First, let’s cover some background.  What’s the difference between Breakpoints and Debuggers?

    • A breakpoint is a marker that you can set to specify where execution should pause when you are running your application
    • A debugger is provided in IntelliJ and is responsible for stopping at breakpoints and displaying the current program state
    • Breakpoints are stored in IntelliJ (not in your application’s code)

We’ll go through a few examples in this Scala Spark Debugging tutorial, but first, let’s get the requirements out of the way.

Debug Scala Spark Requirements

Portions of this tutorial require SBT and I’ve provided a sample project available for you to pull from Github below.  See Resources section below.

And here’s a shocker, another requirement is IntelliJ.  I know, I know, I caught you off guard there, right?

The ultimate goal will be to use your own code and environment.  I know.  Totally get it.  Pull the aforementioned repo if you want to take a slow approach to debug Scala in Spark.  But, feel free to translate the following to your own code right away as well.  Hey man, I’m not telling you what to do.  You do what you do.  Ain’t no stoppin you now. Sing it with me. 🙂

Oh, nearly forgot, one more thing.  If you are entirely new to using IntelliJ for building Scala based Spark apps, you might wish to check out my previous tutorial on Scala Spark IntelliJ.  I’ll mention it in the resource section below as well.

Scala Breakpoints in IntelliJ Debugger Example

Let’s go through some examples.  Depending on your version of IntelliJ, you may not have the second option as I mentioned in the screencast.  But the first one has been working for me for a couple of years.  Don’t just stand there, let’s get to it.

Spark Debug Breakpoints in Scala Config Highlights

In the screencast above, there are two options covered.  One or both may work in your environment.  In part 1, we utilize the SBT configuration of the `intellijRunner` project seen in the `build.sbt` file:

lazy val intellijRunner = project.in(file("intellijRunner")).dependsOn(RootProject(file("."))).settings(
  scalaVersion := "2.11.11",
  libraryDependencies ++= sparkDependencies.map(_ % "compile")
).disablePlugins(sbtassembly.AssemblyPlugin)

I showed how to use this `val` from within IntelliJ as a classpath module, which is one option for debugging Scala-based Spark code in IntelliJ.

Next, I showed a checkbox option from within IntelliJ itself for including the “Provided” scope.  As I mentioned on the screencast, this second option was new to me.  Must be in newer versions of the plugin or IntelliJ.  It wasn’t an option when I first started debugging Spark in IntelliJ.

Let me know in the comments below if you run into issues.  Again, there is a sample project to download from Github in the Resources section below.

Remote Spark Debug example

In this next screencast, I show how to set up remote debugging of Scala-based Spark code from IntelliJ.  In other words, how do you run remotely deployed Scala programs in the debugger?  Is this even an option?  Well, yes it is an option.  Now, “remotely” might be your Spark code running on a cluster in your cloud environment.  Or, it might be Spark code that has been deployed to a cluster running on the same machine as IntelliJ.  Either of these scenarios will apply.  You just need to modify the variables for your own situation.

In this screencast, I’ll show you the concepts and a working example.  Now, your environment might vary for security access, hostnames, etc., so try to stay focused on the key concepts shown in this screencast on remote debugging of Scala programs in IntelliJ.

Remote Spark Debug Configuration Notes

As you saw, the key in this example is setting the `SPARK_SUBMIT_OPTS` variable:

export SPARK_SUBMIT_OPTS=-agentlib:jdwp=transport=dt_socket,server=y,suspend=y,address=5005

Next, we configured IntelliJ for a remote debugger based on the `SPARK_SUBMIT_OPTS` values such as `address`.
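
If the `SPARK_SUBMIT_OPTS` string looks like line noise, here is a small plain-Scala sketch that splits it into its parts and notes what each one means for the IntelliJ remote configuration.  The object and method names (`JdwpOptionsSketch`, `parse`) are made up for illustration and are not part of Spark or the sample project.

```scala
object JdwpOptionsSketch {

  /** Splits the value of an `-agentlib:jdwp=...` JVM flag into key/value pairs. */
  def parse(flag: String): Map[String, String] = {
    val prefix = "-agentlib:jdwp="
    require(flag.startsWith(prefix), s"not a JDWP agent flag: $flag")
    flag.stripPrefix(prefix)
      .split(",")
      .map(_.split("=", 2))
      .collect { case Array(key, value) => key -> value }
      .toMap
  }

  def main(args: Array[String]): Unit = {
    val opts = parse("-agentlib:jdwp=transport=dt_socket,server=y,suspend=y,address=5005")

    // transport=dt_socket : the debugger and the JVM talk over a TCP socket
    // server=y            : the JVM started by spark-submit listens, and IntelliJ attaches to it
    // suspend=y           : the JVM waits at startup until a debugger attaches
    // address=5005        : the port to enter in IntelliJ's remote debug configuration
    opts.foreach { case (key, value) => println(s"$key = $value") }
  }
}
```

With `suspend=y` the submitted job appears to hang until IntelliJ attaches, which is what you want when the breakpoint is early in `main`.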

Hope this helps!  Let me know if you have any questions, suggestions for improvement or any free beer and tacos in the comments below.

Further Resources

Image credit https://pixabay.com/en/virus-microscope-infection-illness-1812092/

Spark Broadcast and Accumulator Examples in Scala

Spark Broadcast and Accumulator Overview

So far, we’ve learned about distributing processing tasks across a Spark cluster.  But, let’s go a bit deeper into a couple of approaches you may need when designing distributed tasks.  I’d like to start with a question.  What do we do when we need each Spark worker task to coordinate certain variables and values with each other?  I mean, let’s imagine we want each task to know the state of variables or values instead of simply independently returning action results back to the driver program.   If you are thinking of terms such as shared state or “stateful” vs. “stateless”, then you are on the right track.  Or, that’s how I think of Spark Broadcast variables and Accumulators.  In this post, we’ll discuss two constructs for sharing variables across a Spark cluster and then review example Scala code.

Spark Shared Variables

When a function is passed to a Spark operation, it is executed on a remote cluster node, and it works on separate copies of all the variables used within the function.  These variables are copied to each machine, and updates made to them on the remote machines are not propagated back to the driver program.  Supporting general, read-write shared variables across tasks would be inefficient.  Nevertheless, Spark does provide two limited types of shared variables for two common usage patterns:

    • Broadcast variables
    • Accumulators

Spark Broadcast Variables

Broadcast variables allow Spark developers to keep a read-only variable cached on each node, rather than shipping a copy of it with every task.  For instance, they can be used to give every node a copy of a large input dataset in an efficient manner, instead of paying the network transfer cost again for each task.  Spark distributes broadcast variables using efficient broadcast algorithms to reduce the cost of communication.

Actions in Spark can be executed through different stages. These stages are separated by distributed “shuffle” operations. Within each stage, Spark automatically broadcasts common data needed in a cached, serialized form which will be de-serialized by each node before the running of each task.  For this reason, if you create broadcast variables explicitly, it should only be done when tasks across multiple stages are in need of the same data.

Broadcast variables are created by wrapping a value with the `SparkContext.broadcast` function, as shown in the following Scala code:

scala> val broadcastVar = sc.broadcast(Array(1, 2, 3))
broadcastVar: org.apache.spark.broadcast.Broadcast[Array[Int]] = Broadcast(0)

scala> broadcastVar.value
res2: Array[Int] = Array(1, 2, 3)
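
The shell session above only reads the value back on the driver.  Here is a minimal sketch of the more typical use, where tasks read the broadcast value inside a transformation.  It assumes Spark is on the classpath and runs in local mode; the object name, method name, and lookup table are made up for illustration.

```scala
import org.apache.spark.{SparkConf, SparkContext}

object BroadcastLookupSketch {

  /** Labels each HTTP status code using a small lookup table shared as a broadcast variable. */
  def labelCodes(sc: SparkContext, codes: Seq[Int]): Array[String] = {
    // Broadcast once from the driver; each executor caches its own read-only copy
    val labels = sc.broadcast(Map(2 -> "success", 4 -> "client error", 5 -> "server error"))

    sc.parallelize(codes)
      .map(code => labels.value.getOrElse(code / 100, "other")) // tasks read via .value
      .collect()
  }

  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setMaster("local[2]").setAppName("BroadcastLookupSketch")
    val sc = new SparkContext(conf)
    try labelCodes(sc, Seq(200, 404, 503, 302)).foreach(println)
    finally sc.stop()
  }
}
```

Note that the closure refers to `labels` (the broadcast handle) and calls `.value` inside the task, rather than capturing the original map directly.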

Spark Accumulators

As you might assume from the name, accumulators are variables which are only added to, through an associative and commutative operation.  There are many uses for accumulators including implementing counters or sums.  Spark natively supports accumulators of numeric types, and programmers can add support for other types.  If an accumulator is given a name in code, it is displayed in the Spark UI, which is useful for understanding the progress of running stages.

Accumulators are created from an initial value v; i.e. `SparkContext.accumulator(v)`.  Tasks running in the cluster can then add to it using the `add` method or the `+=` operator in Scala.  They cannot, however, read its value.  Only the driver program can read the accumulator’s value, using its `value` method as shown below:

scala> val accum = sc.accumulator(0, "Accumulator Example")
accum: spark.Accumulator[Int] = 0

scala> sc.parallelize(Array(1, 2, 3)).foreach(x => accum += x)

scala> accum.value
res4: Int = 6
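
One caveat: `SparkContext.accumulator` is deprecated from Spark 2.0 onward in favor of `longAccumulator` and friends.  If you are on a newer Spark version, the same sum could be sketched roughly like this.  It assumes Spark 2.x or later on the classpath and local mode; the object and method names are made up for illustration.

```scala
import org.apache.spark.{SparkConf, SparkContext}

object LongAccumulatorSketch {

  /** Sums the given numbers on the executors through a named LongAccumulator. */
  def sumWithAccumulator(sc: SparkContext, xs: Seq[Int]): Long = {
    val accum = sc.longAccumulator("Accumulator Example") // the name shows up in the Spark UI
    sc.parallelize(xs).foreach(x => accum.add(x))         // tasks can only add
    accum.value                                           // only the driver reads the result
  }

  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setMaster("local[2]").setAppName("LongAccumulatorSketch")
    val sc = new SparkContext(conf)
    try println(sumWithAccumulator(sc, Seq(1, 2, 3)))
    finally sc.stop()
  }
}
```

The update happens inside `foreach`, which is an action, so the accumulator is updated as soon as that line finishes; updates made inside lazy transformations only happen once an action triggers them.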

Spark Broadcast and Spark Accumulators Examples

With this background on broadcast and accumulators, let’s take a look at more extensive examples in Scala.  The context of the following example code is developing a web server log file analyzer for certain types of HTTP status codes.  We can easily imagine the advantages of using Spark when processing a large volume of log file data.  See the Resources section below for source code download links.

Let’s start with an object containing the `main` method and definition of one broadcast variable and numerous accumulators:

import org.apache.spark.{SparkConf, SparkContext}

object Boot {

 import utils.Utils._

 def main(args: Array[String]): Unit = {

   val sparkConf = new SparkConf(true)
     .setMaster("local[2]")
     .setAppName("SparkAnalyzer")

   val sparkContext = new SparkContext(sparkConf)

   /**
     * Defining list of all HTTP status codes divided into status groups.
     * This list is read only, and it is used for parsing the access log file in order to count status code groups.
     *
     * This example of a broadcast variable shows how a broadcast value is shared, read-only, with every task.
     */
   val httpStatusList = sparkContext broadcast populateHttpStatusList

   /**
     * Definition of accumulators for counting specific HTTP status codes.
     * Accumulator variables are used because updates made to them in every executor are relayed back to the driver.
     * Ordinary variables would be local to each executor and their updates would not be relayed back,
     * so the driver's value would not change.
     */
   val httpInfo = sparkContext accumulator(0, "HTTP 1xx")
   val httpSuccess = sparkContext accumulator(0, "HTTP 2xx")
   val httpRedirect = sparkContext accumulator(0, "HTTP 3xx")
   val httpClientError = sparkContext accumulator(0, "HTTP 4xx")
   val httpServerError = sparkContext accumulator(0, "HTTP 5xx")

   /**
     * Iterate over access.log file and parse every line
     * for every line extract HTTP status code from it and update appropriate accumulator variable
     */
   sparkContext.textFile(getClass.getResource("/access.log").getPath, 2).foreach { line =>
     httpStatusList.value foreach {
       case httpInfoStatus: HttpInfoStatus if (AccessLogParser.parseHttpStatusCode(line).equals(Some(httpInfoStatus))) => httpInfo += 1
       case httpSuccessStatus: HttpSuccessStatus if (AccessLogParser.parseHttpStatusCode(line).equals(Some(httpSuccessStatus))) => httpSuccess += 1
       case httpRedirectStatus: HttpRedirectStatus if (AccessLogParser.parseHttpStatusCode(line).equals(Some(httpRedirectStatus))) => httpRedirect += 1
       case httpClientErrorStatus: HttpClientErrorStatus if (AccessLogParser.parseHttpStatusCode(line).equals(Some(httpClientErrorStatus))) => httpClientError += 1
       case httpServerErrorStatus: HttpServerErrorStatus if (AccessLogParser.parseHttpStatusCode(line).equals(Some(httpServerErrorStatus))) => httpServerError += 1
       case _ =>
     }
   }

   println("########## START ##########")
   println("Printing HttpStatusCodes result from parsing access log")
   println(s"HttpStatusInfo : ${httpInfo.value}")
   println(s"HttpStatusSuccess : ${httpSuccess.value}")
   println(s"HttpStatusRedirect : ${httpRedirect.value}")
   println(s"HttpStatusClientError : ${httpClientError.value}")
   println(s"HttpStatusServerError : ${httpServerError.value}")
   println("########## END ##########")

   sparkContext.stop()
 }

}

As you can hopefully see above, we plan to use the `httpStatusList` when determining which accumulator to update.

`populateHttpStatusList` is available from the import of `Utils` and looks like this:

object Utils {

  private val httpStatuses = List(
    "100", "101", "103",
    "200", "201", "202", "203", "204", "205", "206",
    "300", "301", "302", "303", "304", "305", "306", "307", "308",
    "400", "401", "402", "403", "404", "405", "406", "407", "408", "409", "410", "411", "412", "413", "414", "415", "416", "417",
    "500", "501", "502", "503", "504", "505", "511"
  )

  def populateHttpStatusList(): List[HttpStatus] = {
      httpStatuses map createHttpStatus
  }

  def createHttpStatus(status: String): HttpStatus = status match {
    case status if (status.startsWith("1")) => HttpInfoStatus(status)
    case status if (status.startsWith("2")) => HttpSuccessStatus(status)
    case status if (status.startsWith("3")) => HttpRedirectStatus(status)
    case status if (status.startsWith("4")) => HttpClientErrorStatus(status)
    case status if (status.startsWith("5")) => HttpServerErrorStatus(status)
  }

}
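
The listings refer to `HttpStatus` and its subclasses without showing them.  The real definitions live in the sample project and may well differ, so treat the following as a hypothetical sketch of what they could look like, along with a plain-Scala version of the "count per status group" idea that the accumulators implement on the cluster.  `StatusGroupSketch` and its methods are made-up names.

```scala
// Hypothetical status types; the sample project's own definitions may differ.
sealed trait HttpStatus { def code: String }
case class HttpInfoStatus(code: String) extends HttpStatus
case class HttpSuccessStatus(code: String) extends HttpStatus
case class HttpRedirectStatus(code: String) extends HttpStatus
case class HttpClientErrorStatus(code: String) extends HttpStatus
case class HttpServerErrorStatus(code: String) extends HttpStatus

object StatusGroupSketch {

  /** Maps a status to the same group labels used for the accumulator names. */
  def group(status: HttpStatus): String = status match {
    case _: HttpInfoStatus        => "HTTP 1xx"
    case _: HttpSuccessStatus     => "HTTP 2xx"
    case _: HttpRedirectStatus    => "HTTP 3xx"
    case _: HttpClientErrorStatus => "HTTP 4xx"
    case _: HttpServerErrorStatus => "HTTP 5xx"
  }

  /** Local, single-JVM equivalent of what the five accumulators add up across the cluster. */
  def countByGroup(statuses: Seq[HttpStatus]): Map[String, Int] =
    statuses.groupBy(group).map { case (label, members) => label -> members.size }

  def main(args: Array[String]): Unit = {
    val parsed = Seq(HttpSuccessStatus("200"), HttpSuccessStatus("204"), HttpClientErrorStatus("404"))
    countByGroup(parsed).foreach { case (label, n) => println(s"$label : $n") }
  }
}
```

Because the trait is sealed, the compiler can warn if a new status group is added and one of the pattern matches is not updated.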

`AccessLogParser` could be considered a wrapper for regular expression boogie woogie, as seen next:

import java.util.regex.Pattern

object AccessLogParser extends Serializable {
  import Utils._

  private val ddd = "\\d{1,3}"
  private val ip = s"($ddd\\.$ddd\\.$ddd\\.$ddd)?"
  private val client = "(\\S+)"
  private val user = "(\\S+)"
  private val dateTime = "(\\[.+?\\])"
  private val request = "\"(.*?)\""
  private val status = "(\\d{3})"
  private val bytes = "(\\S+)"
  private val referer = "\"(.*?)\""
  private val agent = "\"(.*?)\""
  private val accessLogRegex = s"$ip $client $user $dateTime $request $status $bytes $referer $agent"
  private val p = Pattern.compile(accessLogRegex)

  /**
    * Extract HTTP status code and create HttpStatus instance for given status code
    */
  def parseHttpStatusCode(logLine: String): Option[HttpStatus] = {
    val matcher = p.matcher(logLine)
    if(matcher.find) {
      Some(createHttpStatus(matcher.group(6)))
    }
    else {
      None
    }
  }

}

The real credit for this regex mastery goes to alvinj at https://github.com/alvinj and in particular https://github.com/alvinj/ScalaApacheAccessLogParser/blob/master/src/main/scala/AccessLogParser.scala
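
To see why the status code is capture group 6, here is a self-contained sketch that rebuilds the same pattern and runs it against one log line.  The sample line is made up for illustration (it is not from the project’s access.log), and `AccessLogRegexSketch` and `statusOf` are hypothetical names.

```scala
import java.util.regex.Pattern

object AccessLogRegexSketch {
  // Same building blocks as AccessLogParser, with the capture group numbers spelled out
  private val ddd = "\\d{1,3}"
  private val ip = s"($ddd\\.$ddd\\.$ddd\\.$ddd)?"   // group 1
  private val client = "(\\S+)"                      // group 2
  private val user = "(\\S+)"                        // group 3
  private val dateTime = "(\\[.+?\\])"               // group 4
  private val request = "\"(.*?)\""                  // group 5
  private val status = "(\\d{3})"                    // group 6
  private val rest = "(\\S+) \"(.*?)\" \"(.*?)\""    // groups 7 to 9: bytes, referer, agent
  private val p = Pattern.compile(s"$ip $client $user $dateTime $request $status $rest")

  /** Returns the three-digit status code of a log line, or None if the line does not match. */
  def statusOf(logLine: String): Option[String] = {
    val matcher = p.matcher(logLine)
    if (matcher.find()) Some(matcher.group(6)) else None
  }

  // Made-up line in the combined log format, for illustration only
  val sampleLine: String =
    "127.0.0.1 - frank [10/Oct/2000:13:55:36 -0700] \"GET /index.html HTTP/1.0\" 200 2326 " +
      "\"http://www.example.com/start.html\" \"Mozilla/4.08 [en] (Win98; I ;Nav)\""

  def main(args: Array[String]): Unit = {
    println(statusOf(sampleLine))
    println(statusOf("this is not an access log line"))
  }
}
```

Returning an `Option` here mirrors `parseHttpStatusCode`: lines that do not match simply produce `None` and fall through to the `case _ =>` branch in `Boot`.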

Resources

Featured image credit https://flic.kr/p/wYHqe

IntelliJ Scala and Apache Spark – Well, Now You Know

IntelliJ Scala and Spark Setup Overview

In this tutorial, we’re going to review one way to set up IntelliJ for Scala and Spark development.  The IntelliJ Scala combination is the best free setup for Scala and Spark development.  And I have nothing against ScalaIDE (Eclipse for Scala) or using editors such as Sublime.  I switched from Eclipse years ago and haven’t looked back.  I’ve also sincerely tried to follow the Pragmatic Programmer suggestion of using one editor (IDE), but I keep coming back to IntelliJ when doing JVM-based development.

But you probably don’t really care about all my history.  Let’s get back to you.  You’re here to set up IntelliJ with Scala and hopefully use it with Spark, right?

In this tutorial, we’re going to try to go fast with lots of screenshots.  If you have questions or comments on how to improve, let me know.

After you complete this Spark with IntelliJ tutorial, I know you’ll find the Spark Debug in IntelliJ tutorial helpful as well.

Assumptions

I’m going to make assumptions about you in this post.

  1. You are not a newbie to programming and computers.  You know how to download and install software.
  2. You might need to update these instructions for your environment.  YMMV.  I’m not going to cover every nuance between Linux, OS X and Windows.  And no, I’m not going to cover SunOS vs Solaris for you old timers like me.
  3. You will speak up if you have questions or suggestions on how to improve.  There should be a comments section at the bottom of this post.
  4. You’re a fairly nice person who appreciates a variety of joke formats now and again.

If you have any issues or concerns with these assumptions, please leave now.  It will be better for both of us.

Prerequisites (AKA: Requirements for your environment)

Configuration Steps (AKA: Ándale arriba arriba)

  1. Start IntelliJ for first time
  2. Install Scala plugin
  3. Create New Project for Scala Spark development
  4. Scala Smoketest.  Create and run Scala HelloMundo program
  5. Scala Spark Smoketest.  Create and run a Scala Spark program
  6. Eat, drink and be merry

Ok, let’s go.

1. Start IntelliJ for first time

Is this your first time running IntelliJ?  If so, start here.  Otherwise, move to #2.

When you start IntelliJ for the first time, it will guide you through a series of screens similar to the following.

At one point, you will be asked if you would like to install the Scala plugin from “Featured” plugins screen such as this:

intellij scala

Do that.  Click Install to install the Scala plugin.

2. Install Scala plugin

If this is not the first time you’ve launched IntelliJ and you do not have the Scala plugin installed, then stay here.  To install the Scala plugin, here’s a screencast of how to do it from a Mac.  (Note: I already have it installed, so you need to check the box)

3. Create New Project for Scala Spark development

Ok, we want to create a super simple project to make sure we are on the right course.  Here’s a screencast of me being on the correct course for Scala IntelliJ projects.

4. Create and Run Scala HelloMundo program

Well, nothing to see here.  Take a break if you want.  We are halfway home.  See the screencast in the previous step.  That’s it, because we ran the HelloMundo code in that screencast already.

5. Create and Run Scala Spark program

Let’s create another Scala object and add some Spark API calls to it.  Again, let’s make this as simple (AKA: KISS principle)  as possible to make sure we are on the correct course.  In this step, we create a new Scala object and import Spark jars as library dependencies in IntelliJ.  Everything doesn’t run perfectly, so watch how to address it in the video.  Oooh, we’re talking bigtime drama here people.  Hold on.

Here’s a screencast

Did I surprise you with the Scala 2.11 vs. Scala 2.10 snafu?  I don’t mean to mess with you.  Just trying to keep it interesting.  Check out the other Spark tutorials on this site or the Spark with Scala course, where I deal with this fairly common scenario in much more detail.  This is a post about IntelliJ Scala and Spark.

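Notice how I’m showing that I have a Standalone Spark cluster running.  You need to have one running in order for this Spark Scala example to run correctly when the master points at a cluster.  (The listing below sets the master to `local[*]`, which runs Spark inside the driver JVM and does not need a cluster.)  See Standalone Spark cluster if you need some help with this setup.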

Code for the Scala Spark program

import org.apache.spark.SparkContext
import org.apache.spark.SparkConf

/**
  * Created by toddmcgrath on 6/15/16.
  */
object SimpleScalaSpark {

  def main(args: Array[String]): Unit = {
    val logFile = "/Users/toddmcgrath/Development/spark-1.6.1-bin-hadoop2.4/README.md" // Should be some file on your system
    val conf = new SparkConf().setAppName("Simple Application").setMaster("local[*]")
    val sc = new SparkContext(conf)
    val logData = sc.textFile(logFile, 2).cache() // cached because it is scanned twice below
    val numAs = logData.filter(line => line.contains("a")).count()
    val numBs = logData.filter(line => line.contains("b")).count()
    println("Lines with a: %s, Lines with b: %s".format(numAs, numBs))
    sc.stop() // release Spark resources once the counts are printed
  }

}

6. Conclusion (AKA: eat, drink and be merry)

You’re now set.  Next step for you might be adding SBT into the mix.  But, for now, let’s just enjoy this moment.  You just completed Spark with Scala in IntelliJ.

If you have suggestions on how to improve this tutorial or any other feedback or ideas, let me know in the comments below.

Scala with IntelliJ Additional Resources

As mentioned above, don’t forget about the next tutorial How to Debug Scala Spark in IntelliJ.

IntelliJ Spark Scala YouTube Playlist

For those looking for even more Scala with IntelliJ, you might find the Just Enough Scala for Apache Spark course valuable.

Apache Spark, Cassandra and Game of Thrones

Apache Spark with Cassandra is a powerful combination in data processing pipelines.  In this tutorial, we will build a Scala application with Spark and  Cassandra with battle data from Game of Thrones.  Now, we’re not going to make any show predictions!   But, we will show the most aggressive kings as well as kings which were attacked the most.  So, you’ve got that goin for you, which is kind of nice.

Spark Cassandra Overview

Our primary focus is the technical highlights of Spark Cassandra integration with Scala.  To do so, we will load up Cassandra with Game of Thrones battle data and then query it from Spark using Scala.  We’ll use Spark from both a shell as well as deploying a Spark Driver program to a cluster.  We’ll have examples of Scala case class marshalling courtesy of the DataStax connector as well as using SparkSQL to produce DataFrames.   We’ll also mix in some sbt configuration.

There are screencasts and relevant links at the bottom of this post in the “Resources” section.

The intended audience of this Spark Cassandra tutorial is someone with beginning to intermediate experience with Scala and Apache Spark.  If you would like to reach this level quickly and efficiently, please check out our On-Demand Apache Spark with Scala Training Course.

Spark Cassandra Pre-Requisites

  1. Apache Cassandra (see resources below)
  2. Downloaded Game of Thrones data (see resources below)
  3. Apache Spark

Tutorial Outline

  1. Import data into Cassandra
  2. Write Scala code
  3. Test Spark Cassandra code in SBT shell
  4. Deploy Spark Cassandra to Spark Cluster with SBT and `spark-submit`

Spark Cassandra Example

Part 1: Prepare Cassandra

Let’s import the GOT battle data into Cassandra.  To keep things simple, I’m going to use a local running Cassandra instance.  I started Cassandra with the bin/cassandra script on my Mac.  (Use cassandra.bat on Windows, but you knew that already.)

Next, connect to Cassandra with cqlsh and create a keyspace to use.  This tutorial creates a “gameofthrones” keyspace:

CREATE KEYSPACE gameofthrones WITH replication = {'class': 'SimpleStrategy', 'replication_factor' : 1};

From here, we create a table for the battle data.

CREATE TABLE gameofthrones.battles (
name TEXT,
year INT,
battle_number INT,
attacker_king TEXT,
defender_king TEXT,
attacker_1 TEXT,
attacker_2 TEXT,
attacker_3 TEXT,
attacker_4 TEXT,
defender_1 TEXT,
defender_2 TEXT,
defender_3 TEXT,
defender_4 TEXT,
attacker_outcome TEXT,
battle_type TEXT,
major_death TEXT,
major_capture TEXT,
attacker_size TEXT,
defender_size TEXT,
attacker_commander TEXT,
defender_commander TEXT,
summer TEXT,
location TEXT,
region TEXT,
note TEXT,
PRIMARY KEY(battle_number)
);

Then import the battles data using the Cassandra COPY command shown below.  (See the Resources section below for where to download the data.)  BTW, I needed to run a Perl one-liner on the CSV file to update the end-of-line encodings from Mac to Unix, using perl -pi -e 's/\r/\n/g'.  Your mileage may vary.

COPY gameofthrones.battles (
name,
year,
battle_number,
attacker_king,
defender_king,
attacker_1,
attacker_2,
attacker_3,
attacker_4,
defender_1,
defender_2,
defender_3,
defender_4,
attacker_outcome,
battle_type,
major_death,
major_capture,
attacker_size,
defender_size,
attacker_commander,
defender_commander,
summer,
location,
region,
note)
FROM 'battles.csv'  // update this location as necessary
WITH HEADER = true;

That wraps Part 1.  Let’s move on to Part 2 where we’ll write some Scala code.

Part 2: Spark Cassandra Scala Code

(Note: All of the following sample code is available from Github.  Link in Resources section below.)

To begin, let’s lay out the skeleton structure of the project:

mkdir got-battles   # name it anything you'd like

cd got-battles      # if you named it got-battles

mkdir -p src/main/scala/com/supergloo

mkdir project

Next, we’re going to add some files for sbt and the sbt-assembly plugin.

First the build file for sbt

got-battles/build.sbt file:

name := "spark-cassandra-example"

version := "1.0"

assemblyOption in assembly := (assemblyOption in assembly).value.copy(includeScala = false)

// https://groups.google.com/a/lists.datastax.com/forum/#!topic/spark-connector-user/5muNwRaCJnU
assemblyMergeStrategy in assembly := {
  case PathList("META-INF", "io.netty.versions.properties") => MergeStrategy.last
  case x =>
    // fall back to the plugin's default strategy for everything else
    val oldStrategy = (assemblyMergeStrategy in assembly).value
    oldStrategy(x)
}

scalaVersion := "2.10.6"

resolvers += "jitpack" at "https://jitpack.io"

libraryDependencies ++= Seq(
// when building the assembly jar, use this "provided" line instead of the plain spark-sql line below
// "org.apache.spark" %% "spark-sql" % "1.6.1" % "provided",
// when running from the sbt shell, keep this line active
   "org.apache.spark" %% "spark-sql" % "1.6.1",
   "com.datastax.spark" %% "spark-cassandra-connector" % "1.5.0"
)

and the one-line got-battles/project/assembly.sbt file:

addSbtPlugin("com.eed3si9n" % "sbt-assembly" % "0.12.0")

And now let’s create the Spark driver code in got-battles/src/main/scala/com/supergloo called SparkCassandra.scala

package com.supergloo
 
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.functions._
 
import com.datastax.spark.connector._
import com.datastax.spark.connector.rdd._
import org.apache.spark.sql.cassandra._


/**
  * Simple Spark Cassandra 
  * One example with Scala case class marshalling
  * Another example using Spark SQL 
  */
object SparkCassandra {

  case class Battle(    
    battle_number: Integer,
    year: Integer,
    attacker_king: String,
    defender_king: String
  )

  def main(args: Array[String]): Unit = {

    val conf = new SparkConf().setAppName("SparkCassandraExampleApp")

    if (args.length > 0) conf.setMaster(args(0)) // in case we're running in sbt shell such as: run local[5]

    conf.set("spark.cassandra.connection.host", "127.0.0.1")  // so yes, I'm assuming Cassandra is running locally here.
                                                              // adjust as needed

    val sc = new SparkContext(conf)

    // Spark Cassandra Example one which marshalls to Scala case classes
    // collect() (rather than the deprecated toArray) brings the selected rows back to the driver
    val battles:Array[Battle] = sc.cassandraTable[Battle]("gameofthrones", "battles").
                                        select("battle_number","year","attacker_king","defender_king").collect()
    
    battles.foreach { b: Battle =>
        println("Battle Number %s was defended by %s.".format(b.battle_number, b.defender_king))
    }


    // Spark Cassandra Example Two - Create DataFrame from Spark SQL read
    val sqlContext = new SQLContext(sc)

    val df = sqlContext.read
              .format("org.apache.spark.sql.cassandra")
              .options(Map( "table" -> "battles", "keyspace" -> "gameofthrones" ))
              .load()

    df.show

    // Game of Thrones Battle analysis 

    // Who were the most aggressive kings?  (most attacker_kings)
    val countsByAttack = df.groupBy("attacker_king").count().sort(desc("count"))
    countsByAttack.show()

    // Which kings were attacked the most?  (most defender_kings)
    val countsByDefend = df.groupBy("defender_king").count().sort(desc("count"))
    countsByDefend.show()

    sc.stop()
    
  }
}
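
If the `groupBy(...).count().sort(desc("count"))` chain feels opaque, here is what it computes, sketched with plain Scala collections and no Spark at all.  The rows are placeholder data ("King A", "King B") rather than anything from the real dataset, the names `CountByKingSketch` and `countsDescending` are made up, and breaking ties by name is my own assumption so the output is deterministic.

```scala
object CountByKingSketch {

  // Trimmed-down stand-in for a battles row; field names are illustrative only
  final case class Battle(battleNumber: Int, attackerKing: String, defenderKing: String)

  /** Counts battles per key and orders by count descending, then by name for ties. */
  def countsDescending(battles: Seq[Battle])(key: Battle => String): Seq[(String, Int)] =
    battles
      .groupBy(key)                                         // like df.groupBy("attacker_king")
      .map { case (king, rows) => king -> rows.size }       // like .count()
      .toSeq
      .sortBy { case (king, count) => (-count, king) }      // like .sort(desc("count"))

  // Placeholder rows, not taken from the Game of Thrones data
  val sample: Seq[Battle] = Seq(
    Battle(1, "King A", "King B"),
    Battle(2, "King A", "King C"),
    Battle(3, "King B", "King A"),
    Battle(4, "King A", "King B")
  )

  def main(args: Array[String]): Unit = {
    println("Most aggressive: " + countsDescending(sample)(_.attackerKing))
    println("Most attacked:   " + countsDescending(sample)(_.defenderKing))
  }
}
```

The difference, of course, is that the DataFrame version runs distributed across the cluster, while this sketch runs in one JVM.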

Part 3: Run Spark Cassandra Scala Code from SBT Console

Start up the sbt console via sbt.  Once ready, you can issue the run command with an argument for your Spark Master location; i.e. run local[5]

(Again, there’s a screencast at the end of this post which shows an example of running this command.  See Resources section below.)

Depending on your log level, you should see various outputs from the SparkCassandra code.  These console outputs of data read from Cassandra are the indicator of success.  Oh yeah.  Say it with me now.  Oh yeahhhhh

Running code in the sbt console is a convenient way to make and test changes rapidly.  As I developed this code, there was a terminal open in one window and an editor open in another window.  Whenever I made a Scala source code change and saved, I could simply re-run in the sbt console.

So now, let’s say we’ve reached the point of wanting to deploy this Spark program.  How do we do that?  Let’s find out in the next section.

Part 4: Assemble Spark Cassandra Scala Code and Deploy to Spark Cluster

To build a jar and deploy to a Spark cluster, we need to make a small change to our build.sbt file.  As you may have noticed from the code above, there are comments in the file which indicate what needs to be changed.  We should uncomment this line:

// "org.apache.spark" %% "spark-sql" % "1.6.1" % "provided",

and comment out this line:

   "org.apache.spark" %% "spark-sql" % "1.6.1",

Then, we can run `sbt assembly` from the command line to produce a Spark deployable jar.  If you use the sample build.sbt file, this will produce target/scala-2.10/spark-cassandra-example-assembly-1.0.jar
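
If toggling comments by hand gets tedious, one alternative is to leave `spark-sql` marked as `provided` at all times and put provided dependencies back on the classpath for the `run` task only.  The fragment below is a sketch of that idea for `build.sbt`.  As far as I know, this setting comes from the sbt-assembly documentation; it is not part of the sample project, and it assumes the sbt 0.13-style syntax used in the build file above.

```scala
// Assumption: sbt 0.13.x syntax, matching the rest of this build.sbt.
// Keep Spark as "provided" so it stays out of the assembly jar...
libraryDependencies += "org.apache.spark" %% "spark-sql" % "1.6.1" % "provided"

// ...but make `sbt run` use the full compile classpath, which includes provided dependencies.
run in Compile := Defaults.runTask(
  fullClasspath in Compile,
  mainClass in (Compile, run),
  runner in (Compile, run)
).evaluated
```

With something like this in place, `run local[5]` in the sbt shell and `sbt assembly` can share one dependency list.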

To deploy, use spark-submit with the appropriate arguments; i.e.

spark-submit --class "com.supergloo.SparkCassandra" --master spark://todd-mcgraths-macbook-pro.local:7077 ./target/scala-2.10/spark-cassandra-example-assembly-1.0.jar

Conclusion

So, what do you think?  When you run the code, you can see the most aggressive kings and the kings which were attacked the most.  Without giving it away, I think one could argue whether Mance Rayder should be tied with Renly Baratheon on the most attacked list.  But, that’s not really the point of this tutorial.  As for the code and setup, do you have any questions, opinions or suggestions for next steps?   

Spark Cassandra Tutorial Screencast

In the following screencast, I run through the steps described in this tutorial.  Stay tuned because there is blooper footage at the end of the screencast.  Because I mean, why not bloopers.

Spark Cassandra Tutorial Resources

  1. All source code including the battles.csv file I scrubbed using the perl script described above at Apache Spark Cassandra Example code
  2. https://github.com/datastax/spark-cassandra-connector
  3. DataFrames with Cassandra Connector
  4. Game of Thrones Data: https://github.com/chrisalbon/war_of_the_five_kings_dataset

And don’t forget the Spark Scala tutorials and speaking of Cassandra, you may find the Spark Thrift Server with Cassandra tutorial interesting too.

Featured image credit https://flic.kr/p/dvpku1

Spark Scala with 3rd Party JARs Deploy to a Cluster

Overview

In this Apache Spark cluster deploy tutorial, we’ll cover how to deploy Spark driver programs to a Spark cluster when the driver program utilizes third-party jars.  In this case, we’re going to use code examples from previous Spark SQL and Spark Streaming tutorials.

At the end of this tutorial, there is a screencast of all the steps.  Also, see Reference section below for Apache Spark Cluster Deploy Part I and II, source code reference and links to the Spark SQL and Spark Streaming tutorials.

Steps

  1. Source code layout
  2. Install assembly plugin
  3. Review build.sbt file
  4. Package Spark Driver Program for Deploy
  5. Deploy to Spark Cluster

1. Source Code Layout

There is nothing extraordinary about our source code layout.  We’re going to build with `sbt`, so there are the usual suspect directories and files including: src/main/scala, project, project/build.properties and the build.sbt file.

2. Install SBT Assembly Plugin

In order to package our 3rd party dependencies into a convenient “fat jar”, we’re going to install and use the sbt-assembly plugin.  The plugin is described as “The goal is simple: Create a fat JAR of your project with all of its dependencies”.  Installing this plugin is simple: just create a file called `assembly.sbt` in your project/ directory.  In our case, the file contains one line:

addSbtPlugin("com.eed3si9n" % "sbt-assembly" % "0.12.0")

3. Review build.sbt file for Apache Spark Cluster Deploy

name := "spark-sql-examples"

version := "1.0"

assemblyOption in assembly := (assemblyOption in assembly).value.copy(includeScala = false)

scalaVersion := "2.11.8"

resolvers += "jitpack" at "https://jitpack.io"

libraryDependencies ++= Seq(
  "org.apache.spark" %% "spark-sql" % "1.6.1" % "provided",
  "com.databricks" %% "spark-csv" % "1.3.0",
  "mysql" % "mysql-connector-java" % "5.1.12"
)

Again, links to source code are included below in the reference section.  At the root of the spark-sql/ directory, there is the above `build.sbt` file.  In this file, there are a couple of lines worth discussing.  First, the line beginning with `assemblyOption` is an instruction to our sbt-assembly plugin to not include Scala jars in the “fat jar” we’re going to build for Spark deploy.

Next, notice how we indicate the “spark-sql” library is already provided.

"org.apache.spark" %% "spark-sql" % "1.6.1" % "provided"

This indicates to sbt-assembly to not include it in the jar we’re going to assemble for deploy.

Everything else seems fairly standard to me, but let me know if you have any questions in the comments.  Let’s keep going.

4. Package Spark Driver Program for Deploy

From your shell or editor, simply run `sbt assembly` to produce the fat jar.  In this case, the file created will be target/scala-2.11/spark-sql-examples-assembly-1.0.jar.  This is the jar we will deploy with `spark-submit`.

5. Deploying Fat Jar to the Spark Cluster

Nothing out of the ordinary here.  Just run `spark-submit` using the jar produced in Step 4.  For example

spark-1.6.1-bin-hadoop2.4/bin/spark-submit --class "com.supergloo.SparkSQLJDBCApp" --master spark://todd-mcgraths-macbook-pro.local:7077 ./target/scala-2.11/spark-sql-examples-assembly-1.0.jar

Screencast

Here’s a screencast from the Apache Spark with Scala training course which performs the steps above.

Resources

If interested in further Apache Spark with Scala training, make sure to check out our Apache Spark with Scala training course.

Featured image credit: https://flic.kr/p/qsyGca