How to Debug Scala Spark in IntelliJ

Spark Scala Debug

Have you struggled to configure debugging in IntelliJ for your Spark programs?  Yeah, me too.  Debugging Scala code was easy, but when I moved to Spark things didn’t work as expected.  So, in this tutorial, let’s cover debugging Scala-based Spark programs in IntelliJ.  We’ll go through a few examples and utilize the occasional help of SBT.  These instructions and screencasts will hopefully allow you to start debugging Spark apps in IntelliJ and help me remember in the future.

We’ll break the topic of debugging Scala based Spark programs into two sections:

  1. Local Spark Debugging
  2. Remote Spark Debugging

 

As you’ll see in this tutorial, there are a few different options to choose from, depending on your Scala debug needs and whether you wish to debug Spark in standalone mode or debug a Spark job running on your cluster.  It is assumed that you are already familiar with the concepts and value of debugging your Spark Scala code, but we’ll quickly review a few key concepts before diving into the screencast examples.

Scala Breakpoints and Debuggers

First, let’s cover some background.  What’s the difference between Breakpoints and Debuggers?

    • A breakpoint is a marker that you can set to specify where execution should pause when you are running your application
    • A debugger is provided in IntelliJ and is responsible for stopping at breakpoints and displaying the current program state
    • Breakpoints are stored in IntelliJ (not in your application’s code)

 

We’ll go through a few examples in this Scala Spark Debugging tutorial, but first, let’s get the requirements out of the way.

Debug Scala Spark Requirements

Portions of this tutorial require SBT, and I’ve provided a sample project for you to pull from Github.  See the Resources section below.

And here’s a shocker, another requirement is IntelliJ.  I know, I know, I caught you off guard there, right?

The ultimate goal will be to use your own code and environment.  I know.  Totally get it.  Pull the aforementioned repo if you want to take a slow approach to debugging Scala in Spark.  But feel free to translate the following to your own code right away as well.  Hey, I’m not telling you what to do.  You do what you do.  Ain’t no stoppin’ you now.  Sing it with me.

Oh, nearly forgot, one more thing.  If you are entirely new to using IntelliJ for building Scala based Spark apps, you might wish to check out my previous tutorial on Scala Spark IntelliJ.  I’ll mention it in the resource section below as well.

Scala Breakpoints in IntelliJ Debugger Example

Let’s go through some examples.  Depending on your version of IntelliJ, you may not have the second option as I mentioned in the screencast.  But the first one has been working for me for a couple of years.  Don’t just stand there, let’s get to it.

 

Debugging Scala Spark in IntelliJ Part 1

 

Spark Debug Breakpoints in Scala Config Highlights

In the screencast above, there are two options covered.  One or both may work in your environment.  In Part 1, we utilize the SBT configuration of the `intellijRunner` val seen in the `build.sbt` file:

lazy val intellijRunner = project.in(file("intellijRunner")).dependsOn(RootProject(file("."))).settings(
  scalaVersion := "2.11.11",
  libraryDependencies ++= sparkDependencies.map(_ % "compile")
).disablePlugins(sbtassembly.AssemblyPlugin)

I showed how to use this `val` from within IntelliJ as a classpath module for one option of debugging Scala-based Spark code in IntelliJ.
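
For reference, the `sparkDependencies` val referenced in the snippet above is defined elsewhere in the sample project’s `build.sbt` and isn’t shown here.  A minimal sketch of what it might look like (the module list and version are assumptions, so match them to the sample project):

// Assumed Spark version; use whatever the sample project actually targets
val sparkVersion = "2.1.0"

val sparkDependencies = Seq(
  "org.apache.spark" %% "spark-core" % sparkVersion,
  "org.apache.spark" %% "spark-sql"  % sparkVersion
)

// In the root project these are typically marked "provided" so they stay out of the
// assembly jar, while the intellijRunner module above compiles them in:
// libraryDependencies ++= sparkDependencies.map(_ % "provided")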

Next, I showed a checkbox option from within IntelliJ itself for including the “Provided” scope.  As I mentioned in the screencast, this second option was new to me.  It must have been added in newer versions of the Scala plugin or IntelliJ; it wasn’t an option when I first started debugging Spark in IntelliJ.

Let me know in the comments below if you run into issues.  Again, there is a sample project to download from Github in the Resources section below.

 

Remote Spark Debug Example

In this next screencast, I show how to set up remote debugging of Scala-based Spark code from IntelliJ.  In other words, how do you run remotely deployed Scala programs in the debugger?  Is this even an option?  Well, yes, it is.  Now, “remotely” might mean your Spark code running on a cluster in your cloud environment.  Or, it might be Spark code that has been deployed to a cluster running on the same machine as IntelliJ.  Either of these scenarios applies.  You just need to modify the variables for your own situation.

In this screencast, I’ll show you the concepts and a working example.  Now, your environment might vary for security access, hostnames, etc., so try to stay focused on the key concepts shown in this remote debugging of Scala programs in IntelliJ example.

Debugging Scala Spark in IntelliJ Part 2

Remote Spark Debug Configuration Notes

As you saw, the key in this example is setting the `SPARK_SUBMIT_OPTS` variable:

export SPARK_SUBMIT_OPTS=-agentlib:jdwp=transport=dt_socket,server=y,suspend=y,address=5005

Next, we configured IntelliJ for a remote debugger based on the `SPARK_SUBMIT_OPTS` values, such as `address`.

Hope this helps!  Let me know if you have any questions, suggestions for improvement or any free beer and tacos in the comments below.

 

Further Resources

 

Image credit https://pixabay.com/en/virus-microscope-infection-illness-1812092/

Spark Broadcast and Accumulator Examples in Scala

Spark Shared Variables Broadcast and Accumulators

Spark Broadcast and Accumulator Overview

So far, we’ve learned about distributing processing tasks across a Spark cluster.  But let’s go a bit deeper into a couple of constructs you may need when designing distributed tasks.  I’d like to start with a question.  What do we do when we need each Spark worker task to coordinate certain variables and values with each other?  I mean, let’s imagine we want each task to know the state of variables or values instead of simply independently returning action results back to the driver program.  If you are thinking in terms such as shared state or “stateful” vs. “stateless”, then you are on the right track.  Or at least, that’s how I think of Spark broadcast variables and accumulators.  In this post, we’ll discuss two constructs for sharing variables across a Spark cluster and then review example Scala code.

Spark Shared Variables

When a function is passed to a Spark operation, it is executed on a particular remote cluster node.  The operation works on separate copies of any variables used within the function.  These variables are copied to each machine, and updates to the copies on the remote machines are not propagated back to the driver program.  For this reason, supporting general, read-write shared variables across tasks would be inefficient.  Nevertheless, Spark does provide two limited types of shared variables for two common usage patterns (see the short sketch after the list below):

  • Broadcast variables
  • Accumulators
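
To make the closure-copy behavior concrete, here is a minimal sketch of the classic pitfall (local mode can mask it, but on a cluster the driver-side value stays unchanged):

// A plain var captured in a closure is copied to each executor.
// Updates happen on those copies and are never sent back to the driver.
var counter = 0
sc.parallelize(1 to 100).foreach(x => counter += x)
println(counter)  // on a cluster this still prints 0, not 5050 -- hence accumulators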

Broadcast Variables

Broadcast variables allow Spark developers to keep a read-only variable cached on each node, rather than merely shipping a copy of it with the needed tasks.  For instance, they can be used to give every node a copy of a large input dataset without having to waste time on repeated network transfer.  Spark can distribute broadcast variables using efficient broadcast algorithms, which in turn largely reduces the cost of communication.

Actions in Spark are executed through a series of stages, separated by distributed “shuffle” operations.  Within each stage, Spark automatically broadcasts the common data needed by tasks in a cached, serialized form, which is deserialized on each node before each task runs.  For this reason, explicitly creating broadcast variables is only useful when tasks across multiple stages need the same data.

Broadcast variables are created by wrapping a value with the `SparkContext.broadcast` function, as shown in the following Scala code:

scala> val broadcastVar = sc.broadcast(Array(1, 2, 3))
broadcastVar: org.apache.spark.broadcast.Broadcast[Array[Int]] = Broadcast(0)

scala> broadcastVar.value
res2: Array[Int] = Array(1, 2, 3)
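
The REPL example above only reads the broadcast value on the driver.  In a job, you would reference `.value` inside the task closure instead of capturing the collection directly.  A quick sketch (the data is made up for illustration):

// Ship a read-only lookup set to the executors once, then use it inside tasks
val primesBroadcast = sc.broadcast(Set(2, 3, 5, 7))
val primes = sc.parallelize(1 to 10).filter(n => primesBroadcast.value.contains(n))
// primes.collect() returns Array(2, 3, 5, 7)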

 

Accumulators

As you might assume from the name, accumulators are variables which may be added to through associative operations.  There are many uses for accumulators, including implementing counters or sums.  Spark supports accumulators of numeric types out of the box, and programmers can add support for other types.  If an accumulator is given a name, it is displayed in the Spark UI, which is useful for understanding the progress of running stages.

Accumulators are created from an initial value v, i.e. `SparkContext.accumulator(v)`.  Tasks running on the cluster can then add to it using the `add` method or the `+=` operator in Scala.  They cannot, however, read its value.  Only the driver program can read the accumulator’s value, using the `value` method as shown below:

scala> val accum = sc.accumulator(0, "Accumulator Example")
accum: spark.Accumulator[Int] = 0

scala> sc.parallelize(Array(1, 2, 3)).foreach(x => accum += x)

scala> accum.value
res4: Int = 6
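
Since programmers can add support for accumulating other types, here is a minimal sketch using the same Spark 1.x accumulator API shown in this post (the set-of-names use case is hypothetical):

import org.apache.spark.AccumulatorParam

// Accumulate a Set of distinct strings seen across tasks
object StringSetParam extends AccumulatorParam[Set[String]] {
  def zero(initialValue: Set[String]): Set[String] = Set.empty[String]
  def addInPlace(s1: Set[String], s2: Set[String]): Set[String] = s1 ++ s2
}

val distinctNames = sc.accumulator(Set.empty[String])(StringSetParam)
sc.parallelize(Seq("abe", "abby", "abe")).foreach(name => distinctNames += Set(name))
distinctNames.value  // Set(abe, abby)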

 

Spark Broadcast and Spark Accumulators Example Driver Program in Scala

With this background on broadcast variables and accumulators, let’s take a look at a more extensive example in Scala.  The context of the following example code is a web server log file analyzer for certain types of HTTP status codes.  We can easily imagine the advantages of using Spark when processing a large volume of log file data.  See the Resources section below for source code download links.

Let’s start with an object containing the `main` method and definition of one broadcast variable and numerous accumulators:

object Boot {

 import org.apache.spark.{SparkConf, SparkContext}
 import utils.Utils._

 def main(args: Array[String]): Unit = {

   val sparkConf = new SparkConf(true)
     .setMaster("local[2]")
     .setAppName("SparkAnalyzer")

   val sparkContext = new SparkContext(sparkConf)

   /**
     * Defines the list of all HTTP status codes, divided into status groups.
     * The list is read-only and is used while parsing the access log file to count status code groups.
     *
     * This is the broadcast variable example: every task reads the same broadcast value.
     */
   val httpStatusList = sparkContext broadcast populateHttpStatusList

   /**
     * Definition of accumulators for counting specific HTTP status codes.
     * Accumulators are used because updates made in every executor are relayed back to the driver.
     * Otherwise the counts would just be local variables on each executor, never relayed back,
     * and the driver-side values would not change.
     */
   val httpInfo = sparkContext accumulator(0, "HTTP 1xx")
   val httpSuccess = sparkContext accumulator(0, "HTTP 2xx")
   val httpRedirect = sparkContext accumulator(0, "HTTP 3xx")
   val httpClientError = sparkContext accumulator(0, "HTTP 4xx")
   val httpServerError = sparkContext accumulator(0, "HTTP 5xx")

   /**
     * Iterate over access.log file and parse every line
     * for every line extract HTTP status code from it and update appropriate accumulator variable
     */
   sparkContext.textFile(getClass.getResource("/access.log").getPath, 2).foreach { line =>
     httpStatusList.value foreach {
       case httpInfoStatus: HttpInfoStatus if (AccessLogParser.parseHttpStatusCode(line).equals(Some(httpInfoStatus))) => httpInfo += 1
       case httpSuccessStatus: HttpSuccessStatus if (AccessLogParser.parseHttpStatusCode(line).equals(Some(httpSuccessStatus))) => httpSuccess += 1
       case httpRedirectStatus: HttpRedirectStatus if (AccessLogParser.parseHttpStatusCode(line).equals(Some(httpRedirectStatus))) => httpRedirect += 1
       case httpClientErrorStatus: HttpClientErrorStatus if (AccessLogParser.parseHttpStatusCode(line).equals(Some(httpClientErrorStatus))) => httpClientError += 1
       case httpServerErrorStatus: HttpServerErrorStatus if (AccessLogParser.parseHttpStatusCode(line).equals(Some(httpServerErrorStatus))) => httpServerError += 1
       case _ =>
     }
   }

   println("########## START ##########")
   println("Printing HttpStatusCodes result from parsing access log")
   println(s"HttpStatusInfo : ${httpInfo.value}")
   println(s"HttpStatusSuccess : ${httpSuccess.value}")
   println(s"HttpStatusRedirect : ${httpRedirect.value}")
   println(s"HttpStatusClientError : ${httpClientError.value}")
   println(s"HttpStatusServerError : ${httpServerError.value}")
   println("########## END ##########")

   sparkContext.stop()
 }

}

 

As you can hopefully see above, we plan to use the `httpStatusList` when determining which accumulator to update.

`populateHttpStatusList` is available from the `Utils` import and looks like this:

 

object Utils {

  private val httpStatuses = List(
    "100", "101", "103",
    "200", "201", "202", "203", "204", "205", "206",
    "300", "301", "302", "303", "304", "305", "306", "307", "308",
    "400", "401", "402", "403", "404", "405", "406", "407", "408", "409", "410", "411", "412", "413", "414", "415", "416", "417",
    "500", "501", "502", "503", "504", "505", "511"
  )

  def populateHttpStatusList(): List[HttpStatus] = {
      httpStatuses map createHttpStatus
  }

  def createHttpStatus(status: String): HttpStatus = status match {
    case status if (status.startsWith("1")) => HttpInfoStatus(status)
    case status if (status.startsWith("2")) => HttpSuccessStatus(status)
    case status if (status.startsWith("3")) => HttpRedirectStatus(status)
    case status if (status.startsWith("4")) => HttpClientErrorStatus(status)
    case status if (status.startsWith("5")) => HttpServerErrorStatus(status)
  }

}
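
The `HttpStatus` types used above and in the driver’s pattern match aren’t shown in this post.  In the sample project they are presumably a simple sealed hierarchy along these lines (a sketch; only the names are taken from the usage above):

sealed trait HttpStatus {
  def status: String
}

case class HttpInfoStatus(status: String) extends HttpStatus
case class HttpSuccessStatus(status: String) extends HttpStatus
case class HttpRedirectStatus(status: String) extends HttpStatus
case class HttpClientErrorStatus(status: String) extends HttpStatus
case class HttpServerErrorStatus(status: String) extends HttpStatus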

AccessLogParser could be considered a wrapper around regular expression boogie woogie, as seen next:

object AccessLogParser extends Serializable {
  import java.util.regex.Pattern
  import Utils._

  private val ddd = "\\d{1,3}"
  private val ip = s"($ddd\\.$ddd\\.$ddd\\.$ddd)?"
  private val client = "(\\S+)"
  private val user = "(\\S+)"
  private val dateTime = "(\\[.+?\\])"
  private val request = "\"(.*?)\""
  private val status = "(\\d{3})"
  private val bytes = "(\\S+)"
  private val referer = "\"(.*?)\""
  private val agent = "\"(.*?)\""
  private val accessLogRegex = s"$ip $client $user $dateTime $request $status $bytes $referer $agent"
  private val p = Pattern.compile(accessLogRegex)

  /**
    * Extract HTTP status code and create HttpStatus instance for given status code
    */
  def parseHttpStatusCode(logLine: String): Option[HttpStatus] = {
    val matcher = p.matcher(logLine)
    if(matcher.find) {
      Some(createHttpStatus(matcher.group(6)))
    }
    else {
      None
    }
  }

}

The real credit for this regex mastery goes to alvinj at https://github.com/alvinj and in particular https://github.com/alvinj/ScalaApacheAccessLogParser/blob/master/src/main/scala/AccessLogParser.scala

Resources

 

Featured image credit https://flic.kr/p/wYHqe

IntelliJ Scala and Apache Spark – Well, Now You Know

Intellij Scala Spark

IntelliJ Scala and Spark Setup Overview

In this post, we’re going to review one way to setup IntelliJ for Scala and Spark development.  The IntelliJ Scala combination is the best, free setup for Scala and Spark development.  And I have nothing against ScalaIDE (Eclipse for Scala) or using editors such as Sublime.  I switched from Eclipse years ago and haven’t looked back.  I’ve also sincerely tried to follow the Pragmatic Programmer suggestion of using one editor (IDE), but I keep coming back to IntelliJ when doing JVM-based development.

But you probably don’t really care about my history.  Let’s get back to you.  You’re here to set up IntelliJ with Scala and hopefully use it with Spark, right?

In this tutorial, we’re going to try to go fast with lots of screenshots.  If you have questions or comments on how to improve, let me know.

Assumptions

I’m going to make assumptions about you in this post.

  1. You are not a newbie to programming and computers.  You know how to download and install software.
  2. You might need to update these instructions for your environment.  YMMV.  I’m not going to cover every nuance between Linux, OS X and Windows.  And no, I’m not going to cover SunOS vs Solaris for you old timers like me.
  3. You will speak up if you have questions or suggestions on how to improve.  There should be a comments section at the bottom of this post.
  4. You’re a fairly nice person who appreciates a variety of joke formats now and again.

If you have any issues or concerns with these assumptions, please leave now.  It will be better for both of us.

Prerequisites (AKA: Requirements for your environment)

Configuration Steps (AKA: Ándale arriba arriba)

  1. Start IntelliJ for first time
  2. Install Scala plugin
  3. Create New Project for Scala Spark development
  4. Scala Smoketest.  Create and run Scala HelloMundo program
  5. Scala Spark Smoketest.  Create and run a Scala Spark program
  6. Eat, drink and be merry

Ok, let’s go.

1. Start IntelliJ for first time

Is this your first time running IntelliJ?  If so, start here.  Otherwise, move to #2.

When you start IntelliJ for the first time, it will guide you through a series of screens similar to the following.

At one point, you will be asked if you would like to install the Scala plugin from “Featured” plugins screen such as this:

intellij scala

 

Do that.  Click Install to install the Scala plugin.

 

2. Install Scala plugin

If this is not the first time you’ve launched IntelliJ and you do not have the Scala plugin installed, then stay here.  To install the Scala plugin, here’s a screencast showing how to do it from a Mac.  (Note: I already have it installed, so you’ll need to check the box yourself.)

Intellij Scala Install Part 1 of 3

 

3. Create New Project for Scala Spark development

Ok, we want to create a super simple project to make sure we are on the right course.  Here’s a screencast of me being on the correct course for Scala Intellij projects

Intellij Scala Spark Simple Project Part 2 of 3

 

4. Create and Run Scala HelloMundo program

Well, nothing to see here.  Take a break if you want.  We are halfway home.  See the screencast in the previous step.  That’s it, because we ran the HelloMundo code in that screencast already.

 

5. Create and Run Scala Spark program

Let’s create another Scala object and add some Spark API calls to it.  Again, let’s make this as simple (AKA: the KISS principle) as possible to make sure we are on the correct course.  In this step, we create a new Scala object and import the Spark jars as library dependencies in IntelliJ.  Not everything runs perfectly, so watch how to address the issues in the video.  Oooh, we’re talking bigtime drama here people.  Hold on.

Here’s a screencast

Scala Intellij Spark Part 3 of 3

Did I surprise you with the Scala 2.11 vs. Scala 2.10 snafu?  I don’t mean to mess with you.  Just trying to keep it interesting.  Check out the other Spark tutorials on this site or the Spark with Scala course, where I deal with this fairly common scenario in much more detail.  This is a post about IntelliJ, Scala and Spark.

Notice how I’m showing that I have a standalone Spark cluster running.  You need to have one running in order for this Spark Scala example to run correctly.  See the Standalone Spark cluster post if you need some help with this setup.

Code for the Scala Spark program

import org.apache.spark.SparkContext
import org.apache.spark.SparkConf

/**
  * Created by toddmcgrath on 6/15/16.
  */
object SimpleScalaSpark {

  def main(args: Array[String]) {
    val logFile = "/Users/toddmcgrath/Development/spark-1.6.1-bin-hadoop2.4/README.md" // Should be some file on your system
    val conf = new SparkConf().setAppName("Simple Application").setMaster("local[*]")
    val sc = new SparkContext(conf)
    val logData = sc.textFile(logFile, 2).cache()
    val numAs = logData.filter(line => line.contains("a")).count()
    val numBs = logData.filter(line => line.contains("b")).count()
    println("Lines with a: %s, Lines with b: %s".format(numAs, numBs))
  }

}

 

6. Conclusion (AKA: eat, drink and be merry)

You’re now set.  Next step for you might be adding SBT into the mix.  But, for now, let’s just enjoy this moment.  You just completed Spark with Scala in IntelliJ.
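
If you do eventually add SBT to this project, a minimal `build.sbt` for the example above might look something like the following (a sketch; the Scala and Spark versions are assumptions chosen to match the Spark 1.6.1 jars used in the screencast):

name := "simple-scala-spark"

version := "1.0"

scalaVersion := "2.10.6"

libraryDependencies += "org.apache.spark" %% "spark-core" % "1.6.1"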

If you have suggestions on how to improve this tutorial or any other feedback or ideas, let me know in the comments below.

 

Scala with IntelliJ Additional Resources

For those looking for even more Scala with IntelliJ, you might find the Just Enough Scala for Apache Spark course valuable.

Apache Spark, Cassandra and Game of Thrones

Spark Cassandra tutorial

Apache Spark with Cassandra is a powerful combination in data processing pipelines.  In this post, we will build a Scala application with the Spark Cassandra combo and query battle data from Game of Thrones.  Now, we’re not going to make any show predictions!   But, we will show the most aggressive kings as well as kings which were attacked the most.  So, you’ve got that goin for you.

Overview

Our primary focus is the technical highlights of Spark Cassandra integration with Scala.  To do so, we will load Cassandra with Game of Thrones battle data and then query it from Spark using Scala.  We’ll use Spark both from a shell and by deploying a Spark driver program to a cluster.  We’ll have examples of Scala case class marshalling courtesy of the DataStax connector, as well as using Spark SQL to produce DataFrames.  We’ll also mix in some sbt configuration.

There are screencasts and relevant links at the bottom of this post in the “Resources” section.

The intended audience of this Spark Cassandra tutorial is someone with beginning to intermediate experience with Scala and Apache Spark.  If you would like to reach this level quickly and efficiently, please check out our On-Demand Apache Spark with Scala Training Course.

Pre-Requisites

  1. Apache Cassandra (see resources below)
  2. Downloaded Game of Thrones data (see resources below)
  3. Apache Spark

Outline

  1. Import data into Cassandra
  2. Write Scala code
  3. Test Spark Cassandra code in SBT shell
  4. Deploy Spark Cassandra to Spark Cluster with SBT and `spark-submit`

Spark Cassandra Example

Part 1: Prepare Cassandra

Let’s import the GOT battle data into Cassandra.  To keep things simple, I’m going to use a locally running Cassandra instance.  I started Cassandra with the bin/cassandra script on my Mac (use cassandra.bat on Windows, but you knew that already).

Next, connect to Cassandra with cqlsh and create a keyspace to use.  This tutorial creates a “gameofthrones” keyspace:

CREATE KEYSPACE gameofthrones WITH replication = {'class': 'SimpleStrategy', 'replication_factor' : 1};

From here, we create a table for the battle data.

CREATE TABLE gameofthrones.battles (
name TEXT,
year INT,
battle_number INT,
attacker_king TEXT,
defender_king TEXT,
attacker_1 TEXT,
attacker_2 TEXT,
attacker_3 TEXT,
attacker_4 TEXT,
defender_1 TEXT,
defender_2 TEXT,
defender_3 TEXT,
defender_4 TEXT,
attacker_outcome TEXT,
battle_type TEXT,
major_death TEXT,
major_capture TEXT,
attacker_size TEXT,
defender_size TEXT,
attacker_commander TEXT,
defender_commander TEXT,
summer TEXT,
location TEXT,
region TEXT,
note TEXT,
PRIMARY KEY(battle_number)
);

Then import the battles data using the Cassandra COPY command shown below (see the Resources section below for where to download the data).  BTW, I needed to run a Perl script to convert the end-of-line encodings from Mac to Unix on the CSV file: perl -pi -e 's/\r/\n/g'.  Your mileage may vary.

COPY battles (
name,
year,
battle_number,
attacker_king,
defender_king,
attacker_1,
attacker_2,
attacker_3,
attacker_4,
defender_1,
defender_2,
defender_3,
defender_4,
attacker_outcome,
battle_type,
major_death,
major_capture,
attacker_size,
defender_size,
attacker_commander,
defender_commander,
summer,
location,
region,
note)
FROM 'battles.csv'  // update this location as necessary
WITH HEADER = true;

That wraps Part 1.  Let’s move on to Part 2 where we’ll write some Scala code.

Part 2: Spark Cassandra Scala Code

(Note: All of the following sample code is available from Github.  Link in the Resources section below.)

To begin, let’s lay out the skeleton structure of the project:

mkdir got-battles # name it anything you'd like

cd got-battles  # if you named it got-battles

mkdir src

mkdir src/main

mkdir src/main/scala

mkdir src/main/scala/com

mkdir src/main/scala/com/supergloo

mkdir project

Next, we’re going to add some files for sbt and the sbt-assembly plugin.

First, the build file for sbt.

got-battles/build.sbt file:

name := "spark-cassandra-example"

version := "1.0"

assemblyOption in assembly := (assemblyOption in assembly).value.copy(includeScala = false)

// https://groups.google.com/a/lists.datastax.com/forum/#!topic/spark-connector-user/5muNwRaCJnU
assemblyMergeStrategy in assembly <<= (assemblyMergeStrategy in assembly) {
  (old) => {
    case PathList("META-INF", "io.netty.versions.properties") => MergeStrategy.last
    case x => old(x)
  }
}

scalaVersion := "2.10.6"

resolvers += "jitpack" at "https://jitpack.io"

libraryDependencies ++= Seq(
// use provided line when building assembly jar
// "org.apache.spark" %% "spark-sql" % "1.6.1" % "provided",
// comment above line and uncomment the following to run in sbt
   "org.apache.spark" %% "spark-sql" % "1.6.1",
   "com.datastax.spark" %% "spark-cassandra-connector" % "1.5.0"
)

and the one-line got-battles/project/assembly.sbt file:

addSbtPlugin("com.eed3si9n" % "sbt-assembly" % "0.12.0")

And now let’s create the Spark driver code in got-battles/src/main/scala/com/supergloo called SparkCassandra.scala

package com.supergloo
 
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.functions._
 
import com.datastax.spark.connector._
import com.datastax.spark.connector.rdd._
import org.apache.spark.sql.cassandra._


/**
  * Simple Spark Cassandra 
  * One example with Scala case class marshalling
  * Another example using Spark SQL 
  */
object SparkCassandra {

  case class Battle(    
    battle_number: Integer,
    year: Integer,
    attacker_king: String,
    defender_king: String
  )

  def main(args: Array[String]) {

    val conf = new SparkConf().setAppName("SparkCassandraExampleApp")

    if (args.length > 0) conf.setMaster(args(0)) // in case we're running in sbt shell such as: run local[5]

    conf.set("spark.cassandra.connection.host", "127.0.0.1")  // so yes, I'm assuming Cassandra is running locally here.
                                                              // adjust as needed

    val sc = new SparkContext(conf)

    // Spark Cassandra Example one which marshalls to Scala case classes
    val battles:Array[Battle] = sc.cassandraTable[Battle]("gameofthrones", "battles").
                                        select("battle_number","year","attacker_king","defender_king").toArray
    
    battles.foreach { b: Battle =>
        println("Battle Number %s was defended by %s.".format(b.battle_number, b.defender_king))
    }


    // Spark Cassandra Example Two - Create DataFrame from Spark SQL read
    val sqlContext = new SQLContext(sc)

    val df = sqlContext.read
              .format("org.apache.spark.sql.cassandra")
              .options(Map( "table" -> "battles", "keyspace" -> "gameofthrones" ))
              .load()

    df.show

    // Game of Thrones Battle analysis 

    // Who were the most aggressive kings?  (most attacker_kings)
    val countsByAttack = df.groupBy("attacker_king").count().sort(desc("count"))
    countsByAttack.show()

    // Which kings were attacked the most?  (most defender_kings)
    val countsByDefend = df.groupBy("defender_king").count().sort(desc("count"))
    countsByDefend.show()

    sc.stop()
    
  }
}

 

Part 3: Run Spark Cassandra Scala Code from SBT Console

Start up the sbt console via sbt.  Once ready, you can issue the run command with an argument for your Spark Master location; i.e. run local[5]

(Again, there’s a screencast at the end of this post which shows an example of running this command.  See Resources section below.)

Depending on your log level, you should see various outputs from the SparkCassandra code.  These console outputs from Cassandra are the indicator of success.  Oh yeah.  Say it with me now.  Oh yeahhhhh.

Running code in the sbt console is a convenient way to make and test changes rapidly.  As I developed this code, there was a terminal open in one window and an editor open in another window.  Whenever I made a Scala source code change and saved, I could simply re-run in the sbt console.

So now, let’s say we’ve reached the point of wanting to deploy this Spark program to a cluster.  Let’s find out how in the next section.

Part 4: Assemble Spark Cassandra Scala Code and Deploy to Spark Cluster

To build a jar and deploy to a Spark cluster, we need to make a small change to our build.sbt file.  As you may have noticed from the code above, there are comments in the file which indicate what needs to be changed.  We should uncomment this line:

// "org.apache.spark" %% "spark-sql" % "1.6.1" % "provided",

and comment out this line:

   "org.apache.spark" %% "spark-sql" % "1.6.1",

Then, we can run sbt assembly from the command line to produce a Spark-deployable jar.  If you use the sample build.sbt file, this will produce target/scala-2.10/spark-cassandra-example-assembly-1.0.jar

To deploy, use spark-submit with the appropriate arguments; i.e.

spark-submit --class "com.supergloo.SparkCassandra" --master spark://todd-mcgraths-macbook-pro.local:7077 ./target/scala-2.10/spark-cassandra-example-assembly-1.0.jar

Conclusion

So, what do you think?  When you run the code, you can see the most aggressive kings and the kings which were attacked the most.  Without giving it away, I think one could argue whether Mance Rayder should be tied with Renly Baratheon on the most attacked list.  But, that’s not really the point of this tutorial.  As for the code and setup, do you have any questions, opinions or suggestions for next steps?   

Spark Cassandra Tutorial Screencast

In the following screencast, I run through the steps described in this tutorial.  Stay tuned because there is blooper footage at the end of the screencast.  Because I mean, why not bloopers.

Spark Cassandra Tutorial

 

Spark Cassandra Tutorial Resources

  1. All source code, including the battles.csv file I scrubbed using the Perl script described above, at Apache Spark Cassandra Example code
  2. https://github.com/datastax/spark-cassandra-connector
  3. DataFrames with Cassandra Connector
  4. Game of Thrones Data: https://github.com/chrisalbon/war_of_the_five_kings_dataset

 

 

Featured image credit https://flic.kr/p/dvpku1

 

Spark Scala with 3rd Party JARs Deploy to a Cluster

Spark Apache Cluster Deploy with 3rd Party Jars

Overview

In this Apache Spark cluster deploy tutorial, we’ll cover how to deploy Spark driver programs to a Spark cluster when the driver program utilizes third-party jars.  In this case, we’re going to use code examples from previous Spark SQL and Spark Streaming tutorials.

At the end of this tutorial, there is a screencast of all the steps.  Also, see Reference section below for Apache Spark Cluster Deploy Part I and II, source code reference and links to the Spark SQL and Spark Streaming tutorials.

Steps

  1. Source code layout
  2. Install assembly plugin
  3. Review build.sbt file
  4. Package Spark Driver Program for Deploy
  5. Deploy to Spark Cluster

1. Source Code Layout

As you can see, there is nothing extraordinary about our source code layout.  We’re going to build with `sbt`, so there are the usual suspect directories and files including: src/main/scala, project, project/build.properties and the build.sbt file.

2. Install SBT Assembly Plugin

In order to package our 3rd-party dependencies into a convenient “fat jar”, we’re going to install and use the sbt-assembly plugin.  The plugin is described as “The goal is simple: Create a fat JAR of your project with all of its dependencies”.  Installing this plugin is simple: just create a file called `assembly.sbt` and add it to your project/ directory.  In our case, the file contains one line:

addSbtPlugin("com.eed3si9n" % "sbt-assembly" % "0.12.0")

3. Review build.sbt file for Apache Spark Cluster Deploy

name := "spark-sql-examples"

version := "1.0"

assemblyOption in assembly := (assemblyOption in assembly).value.copy(includeScala = false)

scalaVersion := "2.11.8"

resolvers += "jitpack" at "https://jitpack.io"

libraryDependencies ++= Seq(
  "org.apache.spark" %% "spark-sql" % "1.6.1" % "provided",
  "com.databricks" %% "spark-csv" % "1.3.0",
  "mysql" % "mysql-connector-java" % "5.1.12"
)

Again, links to source code are included below in the Reference section.  At the root of the spark-sql/ directory, there is the above `build.sbt` file.  In this file, there are a couple of lines worth discussing.  First, the line beginning with `assemblyOption` is an instruction to the sbt-assembly plugin not to include Scala jars in the “fat jar” we’re going to build for the Spark deploy.

Next, notice how we indicate the “spark-sql” library is already provided.

"org.apache.spark" %% "spark-sql" % "1.6.1" % "provided"

This tells sbt-assembly not to include it in the jar we’re going to assemble for deploy, since the Spark cluster already provides it at runtime.

Everything else seems fairly standard to me, but let me know if you have any questions in the comments.  Let’s keep going.

4. Package Spark Driver Program for Deploy

From your shell or editor, simply run `sbt assembly` to produce the fat jar.  In this case, the file created will be target/scala-2.11/spark-sql-examples-assembly-1.0.jar.  This is the jar we will deploy with `spark-submit`.

5. Deploying Fat Jar to the Spark Cluster

Nothing out of the ordinary here.  Just run `spark-submit` using the jar produced in Step 4.  For example

spark-1.6.1-bin-hadoop2.4/bin/spark-submit --class "com.supergloo.SparkSQLJDBCApp" --master spark://todd-mcgraths-macbook-pro.local:7077 ./target/scala-2.11/spark-sql-examples-assembly-1.0.jar

Screencast

Here’s a screencast from the Apache Spark with Scala training course which performs the steps above.

Spark Deploy with 3rd Party Jars

Resources

If interested in further Apache Spark with Scala training, make sure to check out our Apache Spark with Scala training course.

Featured image credit: https://flic.kr/p/qsyGca

Apache Spark Cluster Part 2: Deploy Scala Program to Spark Cluster

How do you deploy a Scala program to a Spark cluster?  In this tutorial, we’ll cover how to build, deploy and run a Scala driver program on a Spark cluster.  The focus will be on a simple example in order to gain confidence and set the foundation for more advanced examples in the future.  To keep things interesting, we’re going to add some SBT and the Sublime Text 3 editor for fun.

This post assumes Scala and SBT experience, but if not, it’s a chance to gain further understanding of the Scala language and simple build tool (SBT).

Requirements

Steps to Deploy Scala Program to Spark Cluster

1. Create a directory for the project: mkdir sparksample

2. Create some directories for SBT:

cd sparksample

mkdir project

mkdir -p src/main/scala

Ok, so you should now be in the sparksample directory and have project/ and src/ dirs.

(3. We’re going to sprinkle this Spark tutorial with use of the Sublime Text 3 editor and SBT plugins.  This step isn’t necessary for deploying a Scala program to a Spark cluster; it is optional.)

In any text editor, create a plugins.sbt file in the project/ directory.

Add the Sublime plugin according to https://github.com/orrsella/sbt-sublime.

4. Create an SBT build file in the root directory.  For this tutorial, the root directory is sparksample/.  Name the file “sparksample.sbt” with the following content:

name := "Spark Sample"

version := "1.0"

scalaVersion := "2.10.3"

libraryDependencies += "org.apache.spark" %% "spark-core" % "1.1.1"

 

5. Create a file named SparkPi.scala in the src/main/scala directory.  Because this is an introductory tutorial, let’s keep things simple and cut-and-paste this code from the Spark samples.  The code is:

import scala.math.random

import org.apache.spark._

/** Computes an approximation to pi */
object SparkPi {
  def main(args: Array[String]) {
    val conf = new SparkConf().setAppName("Spark Pi")
    val spark = new SparkContext(conf)
    val slices = if (args.length > 0) args(0).toInt else 2
    val n = 100000 * slices 
    val count = spark.parallelize(1 to n, slices).map { i =>
      val x = random * 2 - 1
      val y = random * 2 - 1
      if (x*x + y*y < 1) 1 else 0
    }.reduce(_ + _)
    println("Pi is roughly " + 4.0 * count / n)
    spark.stop()
  }
}

 

6. Start SBT from command prompt: sbt

Running sbt may trigger many file downloads of 3rd party library jars.  It depends on if you attempted something similar with SBT in the past and whether your local cache already has the files.

(If you want to continue with the Sublime example, run the ‘gen-sublime’ command from the SBT console and open the generated Sublime project.  You can then edit the sample Scala code in Sublime.)

7. In the SBT console, run ‘package’ to create a jar.  The jar will be created in the target/ directory.  Note the name of the generated jar; if you followed the previous sparksample.sbt step exactly, the filename will be spark-sample_2.10-1.0.jar

8. Exit SBT, or in a different terminal window, call the “spark-submit” script with the appropriate --master arg value.  For example:

../spark-1.6.1-bin-hadoop2.4/bin/spark-submit --class "SparkPi" --master spark://todd-mcgraths-macbook-pro.local:7077 target/scala-2.10/spark-sample_2.10-1.0.jar

So, in this example, it’s safe to presume I have the following directory structure:

parentdir

-spark-1.6.1-bin-hadoop2.4

-sparksample

We can assume this because I’m running ../spark-1.6.1-bin-hadoop2.4/bin/spark-submit from the sparksample directory.

9.  You should see output “Pi is roughly…” and if you go to the Spark UI, you should see “Spark Pi” in the completed applications list:

Spark Cluster completed application
Completed Application after running in Spark Cluster

Conclusion

That’s it.  You’ve built, deployed and run a Scala driver program on a Spark cluster.  Simple, I know, but with this experience, you are in a good position to move on to more complex examples and use cases.  Let me know if you have any questions in the comments below.

Screencast

Here’s a screencast of the steps above:

Spark deploy jar to cluster example

 

Further Reference

  • http://spark.apache.org/docs/latest/submitting-applications.html

Apache Spark Cluster Part 1: Run Standalone

Spark console

Running an Apache Spark cluster on your local machine is a natural, early step towards Apache Spark proficiency.  Let’s start understanding Spark cluster options by running a cluster on a local machine.  Running a local cluster is called “standalone” mode.  This post will describe pitfalls to avoid, review how to run a Spark cluster locally, deploy to a locally running Spark cluster, describe fundamental cluster concepts like Masters and Workers, and finally set the stage for more advanced cluster options.

Let’s begin

1. Start Master from a command prompt *

./sbin/start-master.sh

You should see something like the following:

starting org.apache.spark.deploy.master.Master, logging to /Users/toddmcgrath/Development/spark-1.1.0-bin-hadoop2.4/sbin/../logs/spark-toddmcgrath-org.apache.spark.deploy.master.Master-1-todd-mcgraths-macbook-pro.local.out

Open this file to check things out.  You should be able to determine that http://localhost:8080 is now available for viewing:

Spark UI
Spark UI

 

The Spark Master is responsible for brokering resource requests and finding a suitable set of workers to run Spark applications.

 

2. Start a Worker

todd-mcgraths-macbook-pro:spark-1.1.0-bin-hadoop2.4 toddmcgrath$ bin/spark-class org.apache.spark.deploy.worker.Worker spark://todd-mcgraths-macbook-pro.local:7077

 

Gotcha Warning: I tried a shortcut to starting a Spark Worker by expecting some defaults.  I made my first screencast here: http://youtu.be/pUB620wcqm0

Three fails:

1. bin/spark-class org.apache.spark.deploy.worker.Worker

2. bin/spark-class org.apache.spark.deploy.worker.Worker spark://localhost:7077

3. bin/spark-class org.apache.spark.deploy.worker.Worker spark://127.0.0.1:7077

Finally, I tried using the URL from console:

toddmcgrath$ bin/spark-class org.apache.spark.deploy.worker.Worker spark://todd-mcgraths-macbook-pro.local:7077

———

Verify the Worker by viewing http://localhost:8080.  You should see the worker:

spark-worker-running

 

Spark Workers are responsible for processing requests sent from the Spark Master.

 

3.  Connect REPL to Spark Cluster (KISS Principle)

todd-mcgraths-macbook-pro:spark-1.1.0-bin-hadoop2.4 toddmcgrath$ ./bin/spark-shell --master spark://todd-mcgraths-macbook-pro.local:7077

If all goes well, you should see something similar to the following:

Spark assembly has been built with Hive, including Datanucleus jars on classpath
Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /___/ .__/\_,_/_/ /_/\_\   version 1.1.0
      /_/

Using Scala version 2.10.4 (Java HotSpot(TM) 64-Bit Server VM, Java 1.6.0_65)
Type in expressions to have them evaluated.
Type :help for more information.
14/12/06 12:44:32 WARN Utils: Service 'SparkUI' could not bind on port 4040. Attempting port 4041.
2014-12-06 12:44:33.306 java[22811:1607] Unable to load realm info from SCDynamicStore
14/12/06 12:44:33 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Spark context available as sc.

 

And there you are.  Ready to proceed.  For more detailed analysis of standalone configuration options and scripts, see https://spark.apache.org/docs/latest/spark-standalone.html#cluster-launch-scripts

This example of running a Spark cluster locally is to ensure we’re ready to take on more difficult concepts, such as using cluster managers like YARN and Mesos.  Also, we’ll cover configuring a Spark cluster on Amazon.

Also, before we move on to more advanced Spark cluster setups, we’ll cover deploying and running a driver program on a Spark cluster.

 

 

* This post will use a Mac, so translate to your OS accordingly.

Apache Spark: Examples of Actions

Spark Action Examples

Unlike transformations, which produce RDDs, action functions produce a value back to the Spark driver program.  Actions may trigger a previously constructed, lazy RDD to be evaluated.
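
As a quick illustration of that laziness (a trivial sketch; any small dataset works):

// Transformations are lazy: this line only builds the RDD lineage, nothing runs yet
val doubled = sc.parallelize(1 to 5).map(_ * 2)

// The action triggers evaluation and returns a plain value to the driver
val howMany = doubled.count()  // 5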


reduce
collect
count
first
take
takeSample
countByKey
saveAsTextFile

reduce(func)

Aggregate the elements of a dataset through func

scala> val names1 = sc.parallelize(List("abe", "abby", "apple"))
names1: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[1467] at parallelize at <console>:12

scala> names1.reduce((t1,t2) => t1 + t2)
res778: String = abbyabeapple

scala> names1.flatMap(k => List(k.size) ).reduce((t1,t2) => t1 + t2)
res779: Int = 12

// another way to show

scala> val names2 = sc.parallelize(List("apple", "beatty", "beatrice")).map(a => (a, a.size))
names2: org.apache.spark.rdd.RDD[(String, Int)] = MappedRDD[1473] at map at <console>:14

scala> names2.flatMap(t => Array(t._2)).reduce(_ + _)
res783: Int = 19

map API signature with stripped implicits: map[U](f: (T) ⇒ U): RDD[U]


collect(func)

collect returns the elements of the dataset as an array back to the driver program.

collect is often used in previously provided examples, such as the Spark Transformation Examples, in order to show the values of the return.  The REPL, for example, will print the values of the array back to the console.  This can be helpful in debugging programs.

Examples

scala> sc.parallelize(List(1,2,3)).flatMap(x=>List(x,x,x)).collect
res200: Array[Int] = Array(1, 1, 1, 2, 2, 2, 3, 3, 3)

scala> sc.parallelize(List(1,2,3)).map(x=>List(x,x,x)).collect
res201: Array[List[Int]] = Array(List(1, 1, 1), List(2, 2, 2), List(3, 3, 3))

Formal API : collect(): Array[T]

Return an array containing all of the elements in this RDD.


count()

Number of elements in the RDD

scala> val names2 = sc.parallelize(List("apple", "beatty", "beatrice"))
names2: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[1476] at parallelize at <console>:12

scala> names2.count
res784: Long = 3


first()

Return the first element in the RDD

scala> val names2 = sc.parallelize(List("apple", "beatty", "beatrice"))
names2: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[1477] at parallelize at <console>:12

scala> names2.first
res785: String = apple


take(n)

From the Spark Programming Guide: “Return an array with the first n elements of the dataset.”

scala> val names2 = sc.parallelize(List("apple", "beatty", "beatrice"))
names2: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[1478] at parallelize at <console>:12

scala> names2.take(2)
res786: Array[String] = Array(apple, beatty)


takeSample(withReplacement: Boolean, n:Int, [seed:Int]): Array[T]

Similar to take, but it returns an array of size n.  Includes a boolean option of sampling with or without replacement and an optional random generator seed.

scala> val teams = sc.parallelize(List("twins", "brewers", "cubs", "white sox", "indians", "bad news bears"))
teams: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[1482] at parallelize at <console>:12

scala> teams.takeSample(true, 3)
res789: Array[String] = Array(white sox, twins, white sox)

scala> teams.takeSample(true, 3)
res790: Array[String] = Array(cubs, white sox, twins)


 

countByKey()

This is only available on RDDs of (K,V) and returns a hashmap of (K, count of K)

scala> val hockeyTeams = sc.parallelize(List("wild", "blackhawks", "red wings", "wild", "oilers", "whalers", "jets", "wild"))
hockeyTeams: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[3] at parallelize at <console>:12

scala> hockeyTeams.map(k => (k,1)).countByKey
res0: scala.collection.Map[String,Long] = Map(jets -> 1, blackhawks -> 1, red wings -> 1, oilers -> 1, whalers -> 1, wild -> 3)

countByKey would have been helpful in the most popular baby names Spark example from an earlier post.

scala> val babyNamesToTotalCount = sc.textFile("baby_names.csv").map(line => line.split(",")).map(n => (n(1), n(4)))
babyNamesToTotalCount: org.apache.spark.rdd.RDD[(String, String)] = MappedRDD[21] at map at <console>:12

scala> babyNamesToTotalCount.countByKey
res2: scala.collection.Map[String,Long] = Map(JADEN -> 65, KACPER -> 2, RACHEL -> 63, JORDYN -> 33, JANA -> 1, CESIA -> 1, IBRAHIM -> 22, LUIS -> 65, DESMOND -> 5, AMANI -> 6, ELIMELECH -> 7, LILA -> 39, NEYMAR -> 1, JOSUE -> 31, LEELA -> 1, DANNY -> 25, GARY -> 3, SIMA -> 10, GOLDY -> 14, SADIE -> 41, MARY -> 40, LINDSAY -> 10, JAKOB -> 2, AHARON -> 5, LEVI -> 39, MADISYN -> 3, HADASSAH -> 5, MALIA -> 10, ANTONIA -> 2, RAIZY -> 16, ISAIAS -> 1, AMINA -> 9, DECLAN -> 33, GILLIAN -> 1, ARYANA -> 1, GRIFFIN -> 25, BRYANNA -> 6, SEBASTIEN -> 1, JENCARLOS -> 1, ELSA -> 1, HANA -> 3, MASON -> 194, SAVANNA -> 6, ROWAN -> 6, DENNIS -> 15, JEROME -> 1, BROOKLYNN -> 2, MIRANDA -> 11, KYLER -> 1, HADLEY -> 2, STEPHANIE -> 46, CAMILA -> 45, MAKENNA -> 3, CARMINE -> 5, KATRINA -> 1, AMALIA -> 1, EN...


saveAsTextFile(filepath)

Write out the elements of the data set as a text file in a filepath directory on the filesystem, HDFS or any other Hadoop-supported file system.

scala> val onlyInterestedIn = sc.textFile("baby_names.csv").map(line => line.split(",")).map(n => (n(1), n(4)))
onlyInterestedIn: org.apache.spark.rdd.RDD[(String, String)] = MappedRDD[27] at map at <console>:12

scala> onlyInterestedIn.saveAsTextFile("results.csv")

Produces:

todd-mcgraths-macbook-pro:spark-1.1.0-bin-hadoop2.4 toddmcgrath$ ls -al results.csv/
total 824
drwxr-xr-x   8 toddmcgrath  staff     272 Dec  3 06:53 .
drwxr-xr-x@ 17 toddmcgrath  staff     578 Dec  3 06:54 ..
-rw-r--r--   1 toddmcgrath  staff       8 Dec  3 06:53 ._SUCCESS.crc
-rw-r--r--   1 toddmcgrath  staff    1600 Dec  3 06:53 .part-00000.crc
-rw-r--r--   1 toddmcgrath  staff    1588 Dec  3 06:53 .part-00001.crc
-rw-r--r--   1 toddmcgrath  staff       0 Dec  3 06:53 _SUCCESS
-rw-r--r--   1 toddmcgrath  staff  203775 Dec  3 06:53 part-00000
-rw-r--r--   1 toddmcgrath  staff  202120 Dec  3 06:53 part-00001


 

 

Apache Spark: Examples of Transformations

Spark Transformation Examples

Transformation functions produce a new Resilient Distributed Dataset (RDD).  Resilient distributed datasets are Spark’s main programming abstraction.  RDDs are automatically parallelized across the cluster.
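
As a small sketch of that automatic parallelization (the partition count here is arbitrary):

// parallelize distributes a local collection across the cluster;
// the optional second argument controls how many partitions (slices) are created
val nums = sc.parallelize(1 to 100, 4)
nums.partitions.size  // 4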

In the Scala Spark transformation code examples below, it may be very helpful for you to reference the previous post in the Spark with Scala tutorials, especially when there are references to the baby_names.csv file.


map
flatMap
filter
mapPartitions
mapPartitionsWithIndex
sample
Hammer Time (Can’t Touch This)
union
intersection
distinct
The Keys (To Success? The Florida Keys? To the Castle?)
groupByKey
reduceByKey
aggregateByKey
sortByKey
join

map(func)

What does it do? Pass each element of the RDD through the supplied function; i.e. `func`

scala> val rows = babyNames.map(line => line.split(","))
rows: org.apache.spark.rdd.RDD[Array[String]] = MappedRDD[360] at map at <console>:14

What did this example do?  It iterates over every line in the babyNames RDD (originally created from the baby_names.csv file) and splits each line into a new RDD of Arrays of Strings.  Each array contains the comma-separated fields from a line of the source CSV.


flatMap(func)

“Similar to map, but each input item can be mapped to 0 or more output items (so func should return a Seq rather than a single item).”

Compare flatMap to map in the following

scala> sc.parallelize(List(1,2,3)).flatMap(x=>List(x,x,x)).collect
res200: Array[Int] = Array(1, 1, 1, 2, 2, 2, 3, 3, 3)

scala> sc.parallelize(List(1,2,3)).map(x=>List(x,x,x)).collect
res201: Array[List[Int]] = Array(List(1, 1, 1), List(2, 2, 2), List(3, 3, 3))

`flatMap` is helpful with nested datasets.  It may be beneficial to think of the RDD source as hierarchical JSON (which may have been converted to case classes or nested collections).  This is unlike CSV, which has no hierarchical structure.
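
For example, here is a small sketch of `flatMap` over a nested structure (the Order case class is made up for illustration):

// Hypothetical nested records: each order holds multiple item names
case class Order(id: Int, items: List[String])

val orders = sc.parallelize(Seq(Order(1, List("apple", "pear")), Order(2, List("plum"))))
val allItems = orders.flatMap(_.items)  // RDD[String] with elements apple, pear, plum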

By the way, these examples may blur the line between Scala and Spark for you.  These examples highlight Scala and not necessarily Spark.   In a sense, the only Spark specific portion of this code example is the use of `parallelize` from a SparkContext.  When calling `parallelize`, the elements of the collection are copied to form a distributed dataset that can be operated on in parallel.  Being able to operate in parallel is a Spark feature.

Adding `collect` to both the `flatMap` and `map` results was shown for clarity.  We can focus on Spark aspect (re: the RDD return type) of the example if we don’t use `collect` as seen in the following:

scala> sc.parallelize(List(1,2,3)).flatMap(x=>List(x,x,x))
res202: org.apache.spark.rdd.RDD[Int] = FlatMappedRDD[373] at flatMap at <console>:13

scala> sc.parallelize(List(1,2,3)).map(x=>List(x,x,x))
res203: org.apache.spark.rdd.RDD[List[Int]] = MappedRDD[375] at map at <console>:13

Formal API sans implicit: flatMap[U](f: (T) ⇒ TraversableOnce[U]): RDD[U]


filter(func)

Filter creates a new RDD by passing in the supplied func used to filter the results.  For those people with a relational database background, or coming from a SQL perspective, it may be helpful to think of `filter` as the `where` clause in a SQL statement.

Spark filter examples

val file = sc.textFile("catalina.out")
val errors = file.filter(line => line.contains("ERROR"))

Formal API: filter(f: (T) ⇒ Boolean): RDD[T]


mapPartitions(func)

Consider `mapPartitions` a tool for performance optimization if you have the horsepower.  It won’t do much for you when running examples on your local machine compared to running across a cluster.  It’s the same as `map`, but it works on Spark RDD partitions.  Remember, the first D in RDD stands for “Distributed”: Resilient Distributed Datasets.  Or, put another way, you could say an RDD is distributed over partitions.

// from laptop
scala> val parallel = sc.parallelize(1 to 9, 3)
parallel: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[450] at parallelize at <console>:12

scala> parallel.mapPartitions( x => List(x.next).iterator).collect
res383: Array[Int] = Array(1, 4, 7)

// compare to the same, but with default parallelize
scala> val parallel = sc.parallelize(1 to 9)
parallel: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[452] at parallelize at <console>:12

scala> parallel.mapPartitions( x => List(x.next).iterator).collect
res384: Array[Int] = Array(1, 2, 3, 4, 5, 6, 7, 8)

API: mapPartitions[U](f: (Iterator[T]) ⇒ Iterator[U], preservesPartitioning: Boolean = false)(implicit arg0:ClassTag[U]): RDD[U]


mapPartitionsWithIndex(func)

Similar to mapPartitions, but also provides a function with an Int value to indicate the index position of the partition.

scala> val parallel = sc.parallelize(1 to 9)
parallel: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[455] at parallelize at <console>:12

scala> parallel.mapPartitionsWithIndex( (index: Int, it: Iterator[Int]) => it.toList.map(x => index + ", "+x).iterator).collect
res389: Array[String] = Array(0, 1, 1, 2, 2, 3, 3, 4, 4, 5, 5, 6, 6, 7, 7, 8, 7, 9)

When learning these APIs on an individual laptop or desktop, it might be helpful to show differences in capabilities and outputs.  For example, if we change the above example to use a parallelize’d list with 3 slices, our output changes significantly:

scala> val parallel = sc.parallelize(1 to 9, 3)
parallel: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[457] at parallelize at <console>:12

scala> parallel.mapPartitionsWithIndex( (index: Int, it: Iterator[Int]) => it.toList.map(x => index + ", "+x).iterator).collect
res390: Array[String] = Array(0, 1, 0, 2, 0, 3, 1, 4, 1, 5, 1, 6, 2, 7, 2, 8, 2, 9)

Formal API signature (implicts stripped) and definition from Spark Scala API docs:

mapPartitionsWithIndex[U](f: (Int, Iterator[T]) ⇒ Iterator[U], preservesPartitioning: Boolean = false): RDD[U]

“Return a new RDD by applying a function to each partition of this RDD, while tracking the index of the original partition.

preservesPartitioning indicates whether the input function preserves the partitioner, which should be false unless this is a pair RDD and the input function doesn’t modify the keys.”


sample(withReplacement,fraction, seed)

Return a random sample subset RDD of the input RDD

scala> val parallel = sc.parallelize(1 to 9)
parallel: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[470] at parallelize at <console>:12

scala> parallel.sample(true,.2).count
res403: Long = 3

scala> parallel.sample(true,.2).count
res404: Long = 2

scala> parallel.sample(true,.1)
res405: org.apache.spark.rdd.RDD[Int] = PartitionwiseSampledRDD[473] at sample at <console>:15

Formal API: (withReplacement: Boolean, fraction: Double, seed: Long = Utils.random.nextLong): RDD[T]


The Next Three (AKA: Hammer Time)

Stop.  Hammer Time.  The next three functions (union, intersection and distinct) really play well off of each other when described together.  Can’t touch this.

union(a different rdd)

Simple.  Return the union of two RDDs

scala> val parallel = sc.parallelize(1 to 9)
parallel: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[477] at parallelize at <console>:12

scala> val par2 = sc.parallelize(5 to 15)
par2: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[478] at parallelize at <console>:12

scala> parallel.union(par2).collect
res408: Array[Int] = Array(1, 2, 3, 4, 5, 6, 7, 8, 9, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15)


intersection(a different rdd)

Simple.  Similar to union but return the intersection of two RDDs

scala> val parallel = sc.parallelize(1 to 9)
parallel: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[477] at parallelize at <console>:12

scala> val par2 = sc.parallelize(5 to 15)
par2: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[478] at parallelize at <console>:12

scala> parallel.intersection(par2).collect
res409: Array[Int] = Array(8, 9, 5, 6, 7)

Formal API: intersection(other: RDD[T]): RDD[T]

Back to Top

distinct([numTasks])

Another simple one.  Return a new RDD containing only the distinct elements of the source RDD.

scala> val parallel = sc.parallelize(1 to 9)
parallel: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[477] at parallelize at <console>:12

scala> val par2 = sc.parallelize(5 to 15)
par2: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[478] at parallelize at <console>:12

scala> parallel.union(par2).distinct.collect
res412: Array[Int] = Array(1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15)

Formal API: distinct(): RDD[T]

Back to Top

The Keys

This group of transformation functions (groupByKey, reduceByKey, aggregateByKey, sortByKey, join) all act on (key, value) RDDs.  So, this section will be known as “The Keys”.  Cool name, huh?  Well, not really, but it sounded much better than “The Keys and the Values”, which, for some unexplained reason, triggers memories of “The Young and the Restless”.

The following key functions are available through org.apache.spark.rdd.PairRDDFunctions, which contains operations available only on RDDs of key-value pairs.  “These operations are automatically available on any RDD of the right type (e.g. RDD[(Int, Int)]) through implicit conversions when you import org.apache.spark.SparkContext._.”

For the following, we’re going to use the baby_names.csv file introduced in the previous post, What is Apache Spark?

All of the following examples presume the baby_names.csv file has been loaded and split as follows:

scala> val babyNames = sc.textFile("baby_names.csv")
babyNames: org.apache.spark.rdd.RDD[String] = baby_names.csv MappedRDD[495] at textFile at <console>:12

scala> val rows = babyNames.map(line => line.split(","))
rows: org.apache.spark.rdd.RDD[Array[String]] = MappedRDD[496] at map at <console>:14

Back to Top

groupByKey([numTasks])

“When called on a dataset of (K, V) pairs, returns a dataset of (K, Iterable<V>) pairs.”

The following groups all names to counties in which they appear over the years.

scala> val namesToCounties = rows.map(name => (name(1),name(2)))
namesToCounties: org.apache.spark.rdd.RDD[(String, String)] = MappedRDD[513] at map at <console>:16

scala> namesToCounties.groupByKey.collect
res429: Array[(String, Iterable[String])] = Array((BRADEN,CompactBuffer(SUFFOLK, SARATOGA, SUFFOLK, ERIE, SUFFOLK, SUFFOLK, ERIE)), (MATTEO,CompactBuffer(NEW YORK, SUFFOLK, NASSAU, KINGS, WESTCHESTER, WESTCHESTER, KINGS, SUFFOLK, NASSAU, QUEENS, QUEENS, NEW YORK, NASSAU, QUEENS, KINGS, SUFFOLK, WESTCHESTER, WESTCHESTER, SUFFOLK, KINGS, NASSAU, QUEENS, SUFFOLK, NASSAU, WESTCHESTER)), (HAZEL,CompactBuffer(ERIE, MONROE, KINGS, NEW YORK, KINGS, MONROE, NASSAU, SUFFOLK, QUEENS, KINGS, SUFFOLK, NEW YORK, KINGS, SUFFOLK)), (SKYE,CompactBuffer(NASSAU, KINGS, MONROE, BRONX, KINGS, KINGS, NASSAU)), (JOSUE,CompactBuffer(SUFFOLK, NASSAU, WESTCHESTER, BRONX, KINGS, QUEENS, SUFFOLK, QUEENS, NASSAU, WESTCHESTER, BRONX, BRONX, QUEENS, SUFFOLK, KINGS, WESTCHESTER, QUEENS, NASSAU, SUFFOLK, BRONX, KINGS, ...
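
Since groupByKey hands back (K, Iterable<V>) pairs, a natural next step is to do something with the grouped values.  A small sketch building on namesToCounties above (countiesPerName is just my name for the result):

// How many county rows does each name have across the years?
val countiesPerName = namesToCounties.groupByKey.mapValues(counties => counties.size)

countiesPerName.take(5).foreach(println)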

The above example was created from the baby_names.csv file, which was introduced in the previous post, What is Apache Spark?

Back to Top

reduceByKey(func, [numTasks])

Operates on (K, V) pairs of course, but the func must be of type (V, V) => V.

Let’s sum the yearly name counts over the years in the CSV.  Notice we need to filter out the header row.

scala> val filteredRows = babyNames.filter(line => !line.contains("Count")).map(line => line.split(","))
filteredRows: org.apache.spark.rdd.RDD[Array[String]] = MappedRDD[546] at map at <console>:14

scala> filteredRows.map(n => (n(1),n(4).toInt)).reduceByKey((v1,v2) => v1 + v2).collect
res452: Array[(String, Int)] = Array((BRADEN,39), (MATTEO,279), (HAZEL,133), (SKYE,63), (JOSUE,404), (RORY,12), (NAHLA,16), (ASIA,6), (MEGAN,581), (HINDY,254), (ELVIN,26), (AMARA,10), (CHARLOTTE,1737), (BELLA,672), (DANTE,246), (PAUL,712), (EPHRAIM,26), (ANGIE,295), (ANNABELLA,38), (DIAMOND,16), (ALFONSO,6), (MELISSA,560), (AYANNA,11), (ANIYAH,365), (DINAH,5), (MARLEY,32), (OLIVIA,6467), (MALLORY,15), (EZEQUIEL,13), (ELAINE,116), (ESMERALDA,71), (SKYLA,172), (EDEN,199), (MEGHAN,128), (AHRON,29), (KINLEY,5), (RUSSELL,5), (TROY,88), (MORDECHAI,521), (JALIYAH,10), (AUDREY,690), (VALERIE,584), (JAYSON,285), (SKYLER,26), (DASHIELL,24), (SHAINDEL,17), (AURORA,86), (ANGELY,5), (ANDERSON,369), (SHMUEL,315), (MARCO,370), (AUSTIN,1345), (MITCHELL,12), (SELINA,187), (FATIMA,421), (CESAR,292), (CAR...

Formal API: reduceByKey(func: (V, V) ⇒ V): RDD[(K, V)]

The above example was created from the baby_names.csv file, which was introduced in the previous post, What is Apache Spark?

Back to Top

aggregateByKey(zeroValue)(seqOp, combOp, [numTasks])

Ok, I admit, this one drives me a bit nuts.  Why wouldn’t we just use reduceByKey?  I don’t feel smart enough to know when to use aggregateByKey over reduceByKey.  For example, the same results may be produced:

scala> val filteredRows = babyNames.filter(line => !line.contains("Count")).map(line => line.split(","))
filteredRows: org.apache.spark.rdd.RDD[Array[String]] = MappedRDD[546] at map at <console>:14

scala> filteredRows.map(n => (n(1),n(4).toInt)).reduceByKey((v1,v2) => v1 + v2).collect
res452: Array[(String, Int)] = Array((BRADEN,39), (MATTEO,279), (HAZEL,133), (SKYE,63), (JOSUE,404), (RORY,12), (NAHLA,16), (ASIA,6), (MEGAN,581), (HINDY,254), (ELVIN,26), (AMARA,10), (CHARLOTTE,1737), (BELLA,672), (DANTE,246), (PAUL,712), (EPHRAIM,26), (ANGIE,295), (ANNABELLA,38), (DIAMOND,16), (ALFONSO,6), (MELISSA,560), (AYANNA,11), (ANIYAH,365), (DINAH,5), (MARLEY,32), (OLIVIA,6467), (MALLORY,15), (EZEQUIEL,13), (ELAINE,116), (ESMERALDA,71), (SKYLA,172), (EDEN,199), (MEGHAN,128), (AHRON,29), (KINLEY,5), (RUSSELL,5), (TROY,88), (MORDECHAI,521), (JALIYAH,10), (AUDREY,690), (VALERIE,584), (JAYSON,285), (SKYLER,26), (DASHIELL,24), (SHAINDEL,17), (AURORA,86), (ANGELY,5), (ANDERSON,369), (SHMUEL,315), (MARCO,370), (AUSTIN,1345), (MITCHELL,12), (SELINA,187), (FATIMA,421), (CESAR,292), (CAR...


scala> filteredRows.map ( n => (n(1), n(4))).aggregateByKey(0)((k,v) => v.toInt+k, (v,k) => k+v).sortBy(_._2).collect

And again, the above example was created from the baby_names.csv file, which was introduced in the previous post, What is Apache Spark?

There’s a gist of aggregateByKey as well.
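
For what it’s worth, the case where aggregateByKey seems to earn its keep is when the result type needs to differ from the value type; the zero value and seqOp let you build up something other than a plain V, which reduceByKey cannot do.  Here’s a sketch of that idea, computing an average yearly count per name with a (sum, count) accumulator (my own illustration, not from the shell session above):

// zeroValue is a (sum, count) pair, so the result type differs from the Int values
val sumAndCount = filteredRows
  .map(n => (n(1), n(4).toInt))
  .aggregateByKey((0, 0))(
    (acc, value) => (acc._1 + value, acc._2 + 1),  // seqOp: fold one value into the per-partition accumulator
    (a, b) => (a._1 + b._1, a._2 + b._2)           // combOp: merge accumulators across partitions
  )

// Average yearly count per name
sumAndCount.mapValues { case (sum, count) => sum.toDouble / count }.take(5).foreach(println)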

Back to Top

sortByKey([ascending], [numTasks])

This simply sorts the (K, V) pairs by K.  Try it out.  See the examples above for where babyNames originates.

scala> val filteredRows = babyNames.filter(line => !line.contains("Count")).map(line => line.split(","))
filteredRows: org.apache.spark.rdd.RDD[Array[String]] = MappedRDD[546] at map at <console>:14

scala>  filteredRows.map ( n => (n(1), n(4))).sortByKey().foreach (println _)

scala>  filteredRows.map ( n => (n(1), n(4))).sortByKey(false).foreach (println _) // opposite order
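
One small note on the examples above: foreach(println) prints wherever the tasks run, so on a real cluster the output lands in the executor logs rather than your shell.  To see sorted results on the driver, collect or take works.  A quick sketch:

// Bring a sorted slice back to the driver; take avoids pulling the entire dataset
filteredRows.map(n => (n(1), n(4))).sortByKey().take(10).foreach(println)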

Back to Top

join(otherDataset, [numTasks])

If you have relational database experience, this will be easy.  It’s the joining of two datasets.  Other joins are available as well, such as leftOuterJoin and rightOuterJoin.

scala> val names1 = sc.parallelize(List("abe", "abby", "apple")).map(a => (a, 1))
names1: org.apache.spark.rdd.RDD[(String, Int)] = MappedRDD[1441] at map at <console>:14

scala> val names2 = sc.parallelize(List("apple", "beatty", "beatrice")).map(a => (a, 1))
names2: org.apache.spark.rdd.RDD[(String, Int)] = MappedRDD[1443] at map at <console>:14

scala> names1.join(names2).collect
res735: Array[(String, (Int, Int))] = Array((apple,(1,1)))

scala> names1.leftOuterJoin(names2).collect
res736: Array[(String, (Int, Option[Int]))] = Array((abby,(1,None)), (apple,(1,Some(1))), (abe,(1,None)))

scala> names1.rightOuterJoin(names2).collect
res737: Array[(String, (Option[Int], Int))] = Array((apple,(Some(1),1)), (beatty,(None,1)), (beatrice,(None,1)))

 

Back to Top

 

Featured image credit https://flic.kr/p/8R8uP9