Spark FAIR Scheduler Example


Scheduling in Spark can be a confusing topic.  When someone says “scheduling” in Spark, do they mean scheduling applications running on the same cluster?  Or do they mean the internal scheduling of Spark tasks within the Spark application?  So, before we cover an example of utilizing the Spark FAIR Scheduler, let’s make sure we’re on the same page with regard to Spark scheduling.

In this Spark Fair Scheduler tutorial, we’re going to cover an example of how we schedule certain processing within our application with higher priority and potentially more resources.

What is the Spark FAIR Scheduler?

By default, Spark’s internal scheduler runs jobs in FIFO fashion.  When we use the term “jobs” in describing the default scheduler, we are referring to internal Spark jobs within the Spark application.  The word “jobs” is often used interchangeably for a Spark application and a Spark job, but applications and jobs are two very different constructs.  “Oyy yoy yoy” as my grandma used to say when things became more complicated.  Spark terminology can be difficult to translate sometimes.  In this post, we are talking about jobs.

Anyhow, as we know, jobs are divided into stages and the first job gets priority on all available resources. Then, the second job gets priority, etc.  As a visual review, the following diagram shows what we mean by jobs and stages.  

Spark Fair Scheduler
Spark Internals

Notice how there are multiple jobs.  We can confirm this in the “Jobs” tab of the Spark UI as well.

If the jobs at the head of the queue are long-running, then later jobs may be delayed significantly.

This is where the Spark FAIR scheduler comes in…

The FAIR scheduler supports the grouping of jobs into pools.  It also allows setting different scheduling options (e.g. weight) for each pool. This can be useful to create high priority pools for some jobs vs others.  This approach is modeled after the Hadoop Fair Scheduler.

How do we utilize the Spark FAIR Scheduler?

Let’s run through an example of configuring and implementing the Spark FAIR Scheduler.  The following are the steps we will take:

    • Run a simple Spark application and review the Spark UI History Server
    • Create a new Spark FAIR Scheduler pool in an external XML file
    • Set `spark.scheduler.pool` to the pool created in the external XML file
    • Update the code to use threads to trigger use of FAIR pools and rebuild
    • Re-deploy the Spark application with:
        • the `spark.scheduler.mode` configuration variable set to FAIR
        • the `spark.scheduler.allocation.file` configuration variable set to point to the XML file
    • Run and review in the Spark UI History Server

Here’s a screencast of me running through all these steps:

How to Spark FAIR scheduler

Also, for more context, I’ve outlined all the steps below.

Run a simple Spark Application with default FIFO settings

In this tutorial on Spark FAIR scheduling, we’re going to use a simple Spark application.  The code reads in a set of CSV files (about 850MB), calls a `count`, and prints out values.  In the screencast above, I was able to verify the use of pools in the regular Spark UI, but if you are using a simple Spark application to verify and it completes quickly, you may want to utilize the Spark History Server to monitor metrics.  (By the way, see the Spark Performance Monitor with History Server tutorial for more information on the History Server.)

Create a new Spark FAIR Scheduler pool

There is more than one way to create FAIR pools.  In this example, we will create a new file with the following content:

<?xml version="1.0"?>
<allocations>
  <pool name="fair_pool"/>
  <pool name="a_different_pool"/>
</allocations>

Save this file to the file system so we can reference it later.

A note about the file options.  We configure each pool in a `pool` node with a `name` attribute, nested under a root `allocations` node.  Then we have three options for each pool:

  • `schedulingMode` — which is either FAIR or FIFO
  • `weight` — controls this pool’s share of the cluster relative to other pools. Pools have a weight of 1 by default. Giving a specific pool a weight of 2, for example, means it will get roughly twice the resources of other active pools
  • `minShare` — sets a minimum share of CPU cores to allocate to the pool
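Putting the three options together, a single pool entry might look like the following (the values here are illustrative only):

```xml
<?xml version="1.0"?>
<allocations>
  <pool name="fair_pool">
    <schedulingMode>FAIR</schedulingMode>
    <weight>2</weight>
    <minShare>2</minShare>
  </pool>
</allocations>
```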

Update code to utilize the new FAIR pools

The code in use can be found in my work-in-progress Spark 2 repo.
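To show the threading pattern without requiring a cluster, here is a minimal pure-Scala sketch; the pool and job names are hypothetical, and the actual Spark calls (`setLocalProperty` on the `SparkContext`, which is thread-local) appear in comments where they would go:

```scala
import scala.collection.concurrent.TrieMap

object FairPoolThreads {
  // Records which pool each job would run in; stands in for the Spark calls below.
  val poolOf = TrieMap[String, String]()

  def submitInPool(pool: String, jobName: String): Thread = {
    val t = new Thread {
      override def run(): Unit = {
        // With a real SparkContext `sc`, this is the key call; it is
        // thread-local, so only jobs submitted from this thread use `pool`:
        //   sc.setLocalProperty("spark.scheduler.pool", pool)
        //   sc.textFile("data.csv").count()
        poolOf.put(jobName, pool)
      }
    }
    t.start()
    t
  }

  def main(args: Array[String]): Unit = {
    val threads = Seq(
      submitInPool("fair_pool", "csv_count_job"),
      submitInPool("a_different_pool", "other_job")
    )
    threads.foreach(_.join())
    poolOf.foreach { case (job, pool) => println(s"$job runs in $pool") }
  }
}
```

The key point is that `spark.scheduler.pool` is set per thread, so each thread that submits Spark jobs selects its own pool.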

Set Scheduler Configuration During Spark-Submit

We’re going to add two configuration variables when we re-run our application:

  • `spark.scheduler.mode` configuration variable to FAIR
  • `spark.scheduler.allocation.file` configuration variable to point to the previously created XML file
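For example, the re-deploy might look something like this (the class, jar, and file path are placeholders for your own):

```
spark-submit \
  --conf spark.scheduler.mode=FAIR \
  --conf spark.scheduler.allocation.file=/path/to/fairscheduler.xml \
  --class com.example.SimpleApp \
  simple-app.jar
```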

Verify Pools are being utilized

Let’s go back to the Spark UI and review while the updated application with new spark-submit configuration variables is running.  We can now see the pools are in use!  Just in case you had any doubt along the way, I did believe we could do it.  Never doubted it.


I hope this simple tutorial on using the Spark FAIR Scheduler was helpful.  If you have any questions or suggestions, please let me know in the comments section below.

Further reference

Featured image credit

Apache Spark Thrift Server Load Testing Example

Spark Thrift Server Stress Test Tutorial

Wondering how to perform stress tests with Apache Spark Thrift Server?  This tutorial will describe one way to do it.

What is Apache Spark Thrift Server?  

Apache Spark Thrift Server is based on the Apache HiveServer2 which was created to allow JDBC/ODBC clients to execute SQL queries using a Spark Cluster.  From my experience, these “clients” are typically business intelligence tools such as Tableau and they are most often only a portion of the overall Spark architecture.  In other words, the Spark cluster is primarily used for streaming and batch aggregation jobs and any JDBC/ODBC client access via Thrift Server to the cluster is secondary at best.

For more information on Apache Thrift Server and example use case, see the previous Spark Thrift Server tutorial.

Apache Spark Thrift Server Load Testing example Overview
How do we simulate anticipated load on our Apache Spark Thrift Server?  In this post, we are going to use an open source tool called Gatling.  Check out the References section at the bottom of this post for links to Gatling.

At a high level, this Spark Thrift with Gatling tutorial will run through the following steps:

    1. Confirm our environment (Spark, Cassandra, Thrift Server)
    2. Compile our Gatling-based load testing code
    3. Run a sample Spark Thrift load test

Setup and configure our environment of Spark, Cassandra, and Thrift Server

If you are at the point of load testing Apache Spark Thrift Server, I’m going to assume you are already familiar with the setup of Spark and Cassandra, or some other backend such as Hive or Parquet.  Therefore, I’m going to just run through the steps to start everything up in my local environment.  Adjust the following to best match your environment.

Confirm Environment

1. Start Cassandra

For this tutorial, we’re going to use the killrweather video sample keyspace and queries created in the previous Spark Thrift Server with Cassandra post.  You will need to go through that tutorial first.  This post assumes you have already created and loaded the data, so all we need to do now is start Cassandra if it is not already running.


2. Start your Spark Master, at least one Worker and the Thrift Server

If your Spark cluster is not already running, then start it up


`$SPARK_HOME/sbin/ spark://<spark-master>:7077`

3. Start the Thrift Server and set configuration for Cassandra

`$SPARK_HOME/sbin/ --packages com.datastax.spark:spark-cassandra-connector_2.11:2.0.2 --conf --master spark://<spark-master>:7077`

Obtain Sample Apache Spark Thrift Server Load Tests

Clone the repo

This repo contains a Gatling extension I wrote.  This extension is what will allow us to load test the Spark Thrift Server.

See the src/main/resources/application.conf file for default Spark Thrift connection settings and adjust as needed.

You can run the src/test/scala/io/github/gatling/sql/example/ThriftServerSimulation.scala by invoking the Maven `test` task or build a jar and use the `` script, which will be covered in the next section.

Run Load Tests

You can run the src/test/scala/io/github/gatling/sql/example/ThriftServerSimulation.scala by invoking the Maven `test` task or build a jar and use the included `` script.


Hopefully, this tutorial on load testing Apache Spark Thrift Server helps get you started.

If you have any questions or ideas for corrections, let me know in the comments below.


Featured Image credit

Spark Thrift Server with Cassandra Example

With the Spark Thrift Server, you can do more than you might have thought possible.  For example, want to use `joins` with Cassandra?  Or, help people familiar with SQL leverage your Spark infrastructure without having to learn Scala or Python?  They can use their existing SQL based tools they already know such as Tableau or even MS Excel.  How about both?  This tutorial describes how to provide answers using the Spark Thrift Server.  It describes how to configure the Apache Spark Thrift Server with Cassandra.

First, some quick background on Apache Spark Thrift Server.  

Apache Spark Thrift Server is a port of Apache HiveServer2 which allows JDBC/ODBC clients to execute Spark SQL queries.  From my experience, these “clients” are typically business intelligence (BI) tools such as Tableau or even MS Excel or direct SQL access using their query tool of choice such as Toad, DBVisualizer, SQuirrel SQL Client.

Ok, great, but why do we care?  

The Spark Thrift Server allows clients to utilize the in-memory, distributed architecture of Spark SQL.  In addition, the Thrift Server may be deployed as a Spark application, which allows clients to share the same SparkContext/SparkSession and take advantage of caching across different JDBC/ODBC requests.  Finally, through the Thrift Server and Spark SQL, we may be able to provide relational database concepts such as SQL JOINs in environments in which they are not supported, such as Cassandra.

Here are the steps we will take:

    1. Start Cassandra and Load Sample Data
    2. Optional Start Spark Cluster
    3. Start Spark Thrift Server with Cassandra configuration
    4. Verify our environment
    5. Configure Hive Metastore
    6. Run example SQL
    7. Check out the Spark UI
    8. Small celebration

I demo all these steps in a screencast in case it helps.

Spark Thrift Server Example Screencast

Spark Thrift Server with Cassandra Example. A How-to love story.

Spark Thrift Server Overview

If you have a Spark Cluster or have downloaded Apache Spark to your laptop, you already have everything you need to run Spark Thrift Server.  Thrift Server is included with Spark.

Also, I assume you are familiar with the setup of Spark and Cassandra.  I don’t walk through these steps.  Instead, I just run through the steps to start everything up in my local environment.  You will need to make some adjustments to the following to best match your environment.  For example, you will need to modify any references to `$SPARK_HOME` in the steps below to match the appropriate directory for your setup.  Ok, enough chit chat, let’s go…

1. Start Cassandra and load sample database

This tutorial assumes you have Cassandra running locally, but it isn’t a requirement.  Your Cassandra cluster can be running someplace other than local.  If Cassandra is running locally, the next step is to load a sample keyspace and some data.  In this tutorial, we’re going to use the CDM tool found at  Using CDM is really simple.  Just download the pre-built binary, update the permissions to make it executable (i.e. `chmod 755 cdm`) and run `cdm install killrweather`.  I show an example of running it in the screencast below.


2. Optional Start Spark Cluster

A running Spark cluster is optional in this Spark Thrift tutorial.  If you want to run a minimal cluster with one worker on your laptop, you can perform something similar to the following


`$SPARK_HOME/sbin/ spark://<spark-master>:7077`

I bet you already knew this.  If not, here’s a running Spark Standalone tutorial.  Anyhow, movin on…

3. Start Thrift Server and Set Config for Cassandra

If you did not perform step 2 or do not have an available Spark cluster, then run

`$SPARK_HOME/sbin/ --packages com.datastax.spark:spark-cassandra-connector_2.11:2.0.2 --conf`

This will run Thrift Server in local[*] mode which is fine for this quick start.

Alternatively, if you do have a Spark cluster, you can also pass in the `--master` arg

`$SPARK_HOME/sbin/ --packages com.datastax.spark:spark-cassandra-connector_2.11:2.0.2 --conf --master spark://<spark-master>:7077`

Notice how we are passing in the Spark Cassandra Connector and the `` config here?  Of course you did, because you do not let details like this get past you.  You are a stickler for details, as they say.

4. Configure Hive Metastore with beeline client

Next, we’re going to update the Hive metastore.  The Hive metastore is what Spark Thrift Server uses to know connection parameters of registered data sources.  In this example, the data source is Cassandra of course.

To update the Hive metastore, we’re going to use the Apache Beeline client.  Beeline is a command shell that works with HiveServer2 using JDBC.  It is based on the SQLLine CLI.


`$SPARK_HOME/bin/beeline`

`beeline> !connect jdbc:hive2://localhost:10000`

In my setup, I can disregard the username and password prompts… just press the enter key to answer both prompts.

`jdbc:hive2://localhost:10000> CREATE TABLE raw_weather_data USING org.apache.spark.sql.cassandra OPTIONS (keyspace 'isd_weather_data', table 'raw_weather_data');`

and then

`jdbc:hive2://localhost:10000> CREATE TABLE weather_station USING org.apache.spark.sql.cassandra OPTIONS (keyspace 'isd_weather_data', table 'weather_station');`

We registered two tables because we are going to use SQL JOINs in later sections of this Spark Thrift tutorial.  It’s probably noteworthy to mention these tables may be cached as well.  I’m not going to show how to do that in this example, but leave a comment if you have any questions.

5. Verify Spark, Cassandra and Thrift Server Setup

Before running some more complex SQL, let’s just verify our setup with a smoke test.  Still within the connected beeline client, issue a simple SQL statement

`jdbc:hive2://localhost:10000> select * from raw_weather_data limit 10;`

You should see some results.  10 rows to be precise you cheeky bastard.

6. SQL with Spark Examples

Let’s show more complex SQL examples and take things another step.  I’m going to use SQuirreL SQL client, but the concepts apply to any SQL client.  If you want more detail on setting up SQuirrel with Thrift, see the Reference section below.

Joins?  Joins you say!  No problem

`SELECT ws.name, raw.temperature
FROM raw_weather_data raw
JOIN weather_station ws ON raw.wsid = ws.id
WHERE raw.wsid = '725030:14732'
AND raw.year = 2008 AND raw.month = 12 AND raw.day = 31;`

Well, well, special, special.  Let’s turn it to 11…

`SELECT ws.name, ranked_temp.avg_temp FROM (
SELECT wsid, year, month, day, daily.avg_temp,
dense_rank() OVER (PARTITION BY wsid ORDER BY avg_temp DESC) AS rank
FROM (SELECT wsid, year, month, day, avg(temperature) AS avg_temp
FROM raw_weather_data
GROUP BY wsid, year, month, day) daily
) ranked_temp
JOIN weather_station ws ON ranked_temp.wsid = ws.id
WHERE rank <= 5;`

7. JDBC/ODBC server tab in the Spark UI

Once again, the Spark UI is a valuable resource for us.  Now, that we’ve run a few queries, let’s take a look at what we can see…

Open `http://localhost:4040/` in your browser.  You should be redirected to /jobs

If you started Spark Thrift Server with a `--master` argument and pointed it at a cluster, then you can open http://localhost:8080 and get to this screen as well.

Spark Thrift Server in the Spark UI

Notice the JDBC/ODBC tab.  That’s the Thrift Server part.  You did that.  Check things out.  You can see the query plans, details of the Spark jobs such as stages and tasks, how the shuffling went, etc.

8. Small Celebration

Let’s sing

“Let’s celebrate, it’s all right… we gonna have a good time tonight”  — Kool and the Gang

Sing it with me.  Don’t be shy.

Spark Thrift Server Conclusion

Hopefully, this Apache Spark Thrift Server tutorial helps get you started.  I’ve also put together a quick screencast of me going through these steps above in case it helps.  See the Screencast section below.

If you have any questions or ideas for corrections, let me know in the comments below.

Also, you may be interested in the Spark Thrift Server performance testing tutorial.

Update November 2017

I intertwined two different data sources in the above post.  When I originally wrote this, I had a keyspace called `isd_weather_data` from the killrweather reference application.  But, in this tutorial, I used `cdm` to create and load a keyspace called `killrweather`.  You can see this in how the tables are created above when referencing `isd_weather_data`.  This shouldn’t have been there.  When using `cdm` as described in this tutorial, replace `isd_weather_data` with `killrweather`.  And not only that… the data appears different between the two sources.  So, the `join` examples produce results when using isd_weather_data and none when using killrweather.  I’m presuming differences between and is the root cause of no results with joins.  For example, CSV contains weather_station id `725030:14732`.  Sorry if this causes any confusion.

Spark Thrift Server References

Image credit

Spark Submit Command Line Arguments

Spark Command Line Arguments in Scala

The primary reason why we want to use Spark submit command line arguments is to avoid hard-coding values into our code. As we know, hard-coding should be avoided because it makes our application more rigid and less flexible.

For example, let’s assume we want to run our Spark job in both test and production environments. Let’s further assume that our Spark job reads from a Cassandra database and the databases between test and production are different. In this example, we should prefer using dynamic configuration values when submitting the job to test vs production environments.  The alternative is hard-coding the Cassandra host connection in code, recompiling and deploying.  This approach wastes time.

So, how do we process Spark submit command line arguments in our Scala code?  I would think this would be easy by now.  But I’ve been surprised at how difficult this can be for people new to Scala and Spark.

(Reference: for more info, see the previous tutorials on deploying with spark-submit to a Spark cluster and on Spark with 3rd party jars.)

Of course, one way to achieve command line arg parsing is querying the String Array which is part of your `main` function. For example, perhaps your code looks similar to:

def main(args: Array[String]) {

  val conf = new SparkConf().setAppName("SparkCassandraExampleApp")

  if (args.length > 0) conf.setMaster(args(0))

  if (args.length > 1) conf.set("spark.cassandra.connection.host", args(1))

}

But this isn’t good.  It’s brittle because we are not using default values and `args` values depend on a specific ordering.  Let’s address these two weaknesses in our solution.

In this tutorial, I’ll present a simple example of a flexible and scalable way to process command-line args in your Scala based Spark jobs. We’re going to use `scopt` library [1] and update our previous Spark with Cassandra example.

To update our previous Spark Cassandra example to use command-line arguments, we’re going to update two areas of the project: the SBT build file and our code.

Step 1 Update SBT Build File for Scopt Command Line Option Parsing Library

Easy.  Essentially, we go from:

libraryDependencies ++= Seq(
   "org.apache.spark" %% "spark-sql" % "1.6.1",
   "com.datastax.spark" %% "spark-cassandra-connector" % "1.5.0"
)

to:


libraryDependencies ++= Seq(
   "org.apache.spark" %% "spark-sql" % "1.6.1",
   "com.datastax.spark" %% "spark-cassandra-connector" % "1.5.0",
   "com.github.scopt" %% "scopt" % "3.5.0"
)

Step 2 Update Spark Scala code to process command line args

There are multiple areas of the code we need to update.  First, we’re going to update our code to use a case class to represent command-line options and utilize the `scopt` library to process them.

A case class to hold and reference our command line args:

case class CommandLineArgs (
  cassandra: String = "", // required
  keyspace: String = "gameofthrones", // default is gameofthrones
  limit: Int = 10
)

Next, let’s update the code to use this case class and set possible values from the command line:

val parser = new scopt.OptionParser[CommandLineArgs]("spark-cassandra-example") {
  head("spark-cassandra-example", "1.0")
  opt[String]('c', "cassandra").required().valueName("<cassandra-host>").
    action((x, c) => c.copy(cassandra = x)).
    text("Setting cassandra is required")
  opt[String]('k', "keyspace").action( (x, c) =>
    c.copy(keyspace = x) ).text("keyspace is a string with a default of `gameofthrones`")
  opt[Int]('l', "limit").action( (x, c) =>
    c.copy(limit = x) ).text("limit is an integer with default of 10")
}

So, there are a few interesting parts in this code.  Can you tell which command line variable is required?  Can you tell which requires an Int vs. String?  Sure, you can.  I have confidence in you.

Ok, finally we need to update the code to make a decision based on whether the command line arg parsing succeeded or not.  The way we do that is through a pattern match as shown here

parser.parse(args, CommandLineArgs()) match {

  case Some(config) =>
  // do stuff
  case None => // failed
}


In this example, `config` is our `CommandLineArgs` case class which is available on success.  If the command line arg parsing succeeded, our code will enter the `Some(config)` match and “do stuff”.  There we can use the vars such as `config.keyspace`.

The alternative match is `None`.  In this match, we know the command line arg parsing failed, so the code should exit.
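Putting it together, a hypothetical invocation passing these options might look like this (the class and jar names are placeholders):

```
spark-submit --class com.example.SparkCassandraApp spark-cassandra-example.jar \
  --cassandra 127.0.0.1 --keyspace gameofthrones --limit 5
```

`--cassandra` is required; `--keyspace` and `--limit` fall back to their defaults if omitted.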

That’s it.  Hopefully, this simple example helps you move ahead with Spark command line argument parsing and usage in Scala.  For more details and exploring more options using `scopt` check out the site via reference link below or let me know if you have any questions or comments.

For the complete updated example, see this commit or cut-and-paste

Spark Submit Command Line References

[1] For more on `scopt` checkout

Don’t miss other Spark Tutorials on Spark Scala Tutorials

Featured image credit

Spark RDD – A Two Minute Guide for Beginners

spark rdd

What is Spark RDD?

Spark RDD is short for Apache Spark Resilient Distributed Dataset.  A Spark Resilient Distributed Dataset is often shortened to simply RDD.  RDDs are a foundational component of the Apache Spark large scale data processing framework.

Spark RDDs are an immutable, fault-tolerant, and possibly distributed collection of data elements.  RDDs may be operated on in parallel across a cluster of computer nodes.  To operate in parallel, RDDs are divided into logical partitions.  Partitions are computed on different nodes of the cluster through Spark Transformation APIs.  RDDs may contain any type of Python, Java, or Scala object, including user-defined classes.

RDDs support two types of operations: transformations, which create a new dataset from an existing one, and actions, which return a value by performing a computation on the RDD.
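As a quick sketch (assuming a running `SparkContext` named `sc`, e.g. in `spark-shell`; the file path is hypothetical):

```scala
val customers = sc.textFile("customers.csv")                     // transformation: lazy
val northAmerica = customers.filter(_.contains("North America")) // transformation: lazy
val total = northAmerica.count()                                 // action: triggers computation
```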

How are Spark RDDs created?

Spark RDDs are created through the use of Spark Transformation functions.  Transformation functions can create new RDDs from a variety of sources; e.g. the `textFile` function can load data from a local filesystem, Amazon S3, or Hadoop’s HDFS.  Transformation functions may also be used to create new RDDs from previously created RDDs.  For example, an RDD of all the customers from only North America could be constructed from an RDD of all customers throughout the world.

In addition to loading text files from file systems, RDDs may be created from external storage systems such as JDBC databases (e.g. MySQL), HBase, Hive, Cassandra, or any data source compatible with Hadoop Input Format.

RDDs are also created and manipulated when using Spark modules such as Spark Streaming and Spark MLlib.

Why Spark RDD?

Spark makes use of data abstraction through RDDs to achieve faster and more efficient performance than Hadoop’s MapReduce.

RDDs support in-memory processing.  Accessing data from memory is 10 to 100 times faster than accessing data from a network or disk.  Data access from disk often occurs in Hadoop’s MapReduce-based processing.

In addition to performance gains, working through an abstraction layer provides a convenient and consistent way for developers and engineers to work with a variety of data sets.

When to use Spark RDDs?

RDDs are utilized to perform computations through Spark Actions such as `count` or `reduce` when answering questions such as “how many times did xyz happen?” or “how many times did xyz happen by location?”

Often, RDDs are transformed into new RDDs in order to better prepare datasets for future processing downstream in the processing pipeline.  To reuse a previous example, let’s say you want to examine North America customer data and you have an RDD of all worldwide customers in memory.  It could be beneficial from a performance perspective to create a new RDD for North America only customers instead of using the much larger RDD of all worldwide customers.

Depending on the Spark operating environment and RDD size, RDDs should be cached (via the `cache` function) or persisted to disk when there is an expectation that the RDD will be utilized more than once.

Conclusion and Resources

Scala Transformation API examples

Python Transformation API examples

Hadoop Input Format API docs

Spark Tutorial Landing page

Featured Image credit

Apache Spark Advanced Cluster Deploy Troubleshooting

spark cluster deploy troubleshooting

In this Apache Spark cluster troubleshooting tutorial, we’ll review a few options when your Scala Spark code does not deploy as anticipated.  For example, does your Spark driver program rely on a 3rd party jar only compatible with Scala 2.11, but your Spark Cluster is based on Scala 2.10?  Maybe your code relies on a newer version of a 3rd party jar also used by Apache Spark?  Or maybe you want your code to use the Spark version of a particular jar instead of the jar specified by your code.

In any of these cases, your deploy to the Spark Cluster will not be smooth.  So, in this post, we’ll explore ways how to address all three issues.


We’re going to address three specific issues when deploying to a Spark cluster in this post:

  1. Using Apache Spark with Scala 2.11
  2. Overriding jars used by Spark with newer versions
  3. Excluding jars from your code in order to use the Spark version instead

All of these issues will be addressed based on the spark streaming code used in a previous Spark Streaming tutorial.  Links to source code download and screencasts are available at the end of this post.

Challenge 1 Apache Spark with Scala 2.11

I had no idea things could go so wrong with the following build.sbt file.

name := "spark-streaming-example"

version := "1.0"

assemblyOption in assembly := (assemblyOption in assembly).value.copy(includeScala = false)

scalaVersion := "2.11.8"

resolvers += "jitpack" at ""

libraryDependencies ++= Seq(
  "org.apache.spark" %% "spark-streaming" % "1.6.1" % "provided",
  "org.scalaj" %% "scalaj-http" % "2.3.0",
  "org.jfarcand" % "wcs" % "1.5"
)

Although it doesn’t stand out, the issue is going to be with the “wcs” WebSocket client library, which does not work with an Apache Spark cluster compiled against Scala 2.10.

Here is the error:

Exception in thread "Thread-28" java.lang.NoSuchMethodError: scala.Predef$.ArrowAssoc(Ljava/lang/Object;)Ljava/lang/Object;
    at scalaj.http.HttpConstants$.liftedTree1$1(Http.scala:637)

An error such as this indicates a Scala version incompatibility issue.
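For context on how the Scala version leaks into the dependencies, sbt’s `%%` operator appends the project’s Scala binary version to the artifact name; with the `build.sbt` above:

```scala
scalaVersion := "2.11.8"

// `%%` resolves this to the scalaj-http_2.11 artifact, which a cluster
// built for Scala 2.10 cannot load.
libraryDependencies += "org.scalaj" %% "scalaj-http" % "2.3.0"
```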

wcs is compiled against Scala 2.11 and I couldn’t find another WebSocket client to use in this project, so I explored building Spark for Scala 2.11 compatibility.  It turns out this isn’t a big deal.

Build Apache Spark with Scala 2.11 Steps

  1. Download the source (screencast below)
  2. Run the script to change the Scala version to 2.11
  3. Run the distribution build script

Screencast of these three steps:

Building a Spark Scala 2.11 Distribution

Commands run in Screencast:

./dev/ 2.11

./ --name spark-1.6.1-scala-2.11 --tgz -Dscala-2.11 -Pyarn -Phadoop-2.4

After creating the distribution, I started the Spark master and worker and optimistically tried to deploy again.  That’s how I ran into the next challenge.

Challenge 2 Incompatible Jars between Spark and Scala program – Use Your Jar

When I tried deploying the assembly jar to the new Spark cluster custom built for Scala 2.11, I ran into another issue as shown in this screencast:

Spark advanced deploy 2

As you see in the screencast, there were issues with SSL in Netty based HttpClient.

After asking Dr. Googs (see reference links below), I determined Spark uses Akka Actor for RPC and messaging, which in turn uses Netty.  And it turns out the Spark/Akka version of Netty is incompatible with the version needed by the scalaj-http library used in this project.

Recall from the “sbt assembly” command, the following was assembled:

~/Development/spark-course/spark-streaming $ sbt assembly
[info] Loading project definition from /Users/toddmcgrath/Development/spark-course/spark-streaming/project
[info] Set current project to spark-streaming-example (in build file:/Users/toddmcgrath/Development/spark-course/spark-streaming/)
[info] Including from cache: wcs-1.5.jar
[info] Including from cache: slf4j-api-1.7.12.jar
[info] Including from cache: scalaj-http_2.11-2.3.0.jar
[info] Including from cache: async-http-client-1.9.28.jar
[info] Including from cache: netty-3.10.3.Final.jar
[info] Checking every *.class/*.jar file's SHA-1.

I needed a way to configure Spark to use my netty-3.10.3.Final.jar instead of the older version used in Akka.

The “spark.driver.userClassPathFirst” configuration variable provided the answer.  This variable is described as

"Whether to give user-added jars precedence over Spark's own jars when loading classes in the driver. This feature can be used to mitigate conflicts between Spark's dependencies and user dependencies. It is currently an experimental feature. This is used in cluster mode only."
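Applied at deploy time, the flag might be passed like this (the class name is a placeholder; the jar is the assembly built above):

```
spark-submit \
  --conf spark.driver.userClassPathFirst=true \
  --class com.example.StreamingApp \
  spark-streaming-example-assembly-1.0.jar
```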

So, I tried deploying again with this conf variable set as shown in the following screencast:

netty issue deploy

Challenge 3 Incompatible Jars between Spark and Scala program – Use Spark Jar

What to do if you want to use Spark’s version of a jar instead of your own?  Essentially, this is the opposite of the previously described Challenge 2.

I reviewed the output from “sbt assembly” and saw slf4j was included in the assembly.  From the logs, we can see that Spark is already using slf4j, and now our driver program is attempting to spawn another instance.  Let’s use Spark’s already instantiated slf4j instead.

To remove or exclude certain jars from being included in the fat jar, hook into the sbt-assembly plugin’s “excludedJars” setting.

Update build.sbt with the following:

excludedJars in assembly <<= (fullClasspath in assembly) map { cp =>
  cp filter { i => i.data.getName == "slf4j-api-1.7.12.jar" }
}
And re-run “sbt assembly” and you’ll be ready to try another Spark deploy.

Everything worked as you can see in this screencast

Spark Deploy with two slf4j jars

Spark Troubleshooting Conclusion

This post presented three challenges and solutions when troubleshooting Apache Spark with Scala deploys to a Spark cluster.  We covered three scenarios:

  • Apache Spark with Scala 2.11
  • Giving Apache Spark’s jars precedence over yours by excluding them from the assembly
  • Giving your jar(s) precedence over the comparable Apache Spark version of the jar(s)

Further References

If interested in more Apache Spark with Scala training and source code examples, make sure to check out our Spark courses or the full listing of Spark tutorials in Scala and Python.

Featured Image credit

Apache Spark with Amazon S3 Examples

Apache Spark with Amazon S3 setup

This post will show ways and options for accessing files stored on Amazon S3 from Apache Spark.  Examples of text file interaction on Amazon S3 will be shown from both Scala and Python using the spark-shell from Scala or ipython notebook for Python.

To begin, you should know there are multiple ways to access S3 based files.  The options depend on a few factors such as:

  • The version of Spark, because the version of the accompanying Hadoop libraries matters
  • How the files were created on S3: were they written from Spark or Hadoop, or by some other third-party tool?

All these examples are based on Scala console or pyspark, but they may be translated to different driver programs relatively easily.  If you run into any issues, just leave a comment at the bottom of this page and I’ll try to help you out.

Apache Spark with Amazon S3 Scala Examples

Example Load file from S3 Written By Third Party Amazon S3 tool


  • Spark 1.4.1 pre-built using Hadoop 2.4
  • File on S3 was created from Third Party –  See Reference Section below for specifics on how the file was created
scala> sc.hadoopConfiguration.set("fs.s3n.awsAccessKeyId", "AKIAJJRUVasdfasdf")
scala> sc.hadoopConfiguration.set("fs.s3n.awsSecretAccessKey", "LmuKE77fVLXJfasdfasdfxK2vj1nfA0Bp")
scala> val myRDD = sc.textFile("s3n://supergloospark/baby_names.csv")
myRDD: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[1] at textFile at <console>:21
scala> myRDD.count
res2: Long = 35218

Note how this example uses s3n instead of s3, both when setting the security credentials and in the protocol specifier of the textFile call.  At the time of this writing, there are three different S3 options.  See the Reference section of this post for links to more information.

You could also store the AWS credentials outside your code.  For example, here’s how to set them using environment variables:

~/Development/spark-1.4.1-bin-hadoop2.4 $ export AWS_ACCESS_KEY_ID=AKIAJJRUasdfasdfasdf33HPA
~/Development/spark-1.4.1-bin-hadoop2.4 $ export AWS_SECRET_ACCESS_KEY=LmuKE7afdasdfxK2vj1nfA0Bp

And then, if we restart the Spark console, we don’t have to set the AWS security credentials in code.  All we have to do is call textFile with the appropriate protocol specifier:

scala> val myRDD = sc.textFile("s3n://supergloospark/baby_names.csv")
myRDD: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[1] at textFile at <console>:21

scala> myRDD.count
res0: Long = 35218

Example Load Text File from S3 Written from Hadoop Library


  • Spark 1.4.1 pre-built using Hadoop 2.4
  • File on S3 was created from Hadoop
  • Amazon S3 credentials stored as environment variables before starting spark-shell
scala> val subset = sc.textFile("s3n://supergloospark/baby_names.csv").map(line => line.split(",")).map(n => (n(1), n(4)))
subset: org.apache.spark.rdd.RDD[(String, String)] = MapPartitionsRDD[21] at map at <console>:23

scala> subset.saveAsTextFile("s3://supergloospark/baby_names_s3_not_s3n.csv")
scala> sc.textFile("s3://supergloospark/baby_names_s3_not_s3n.csv").count()
res13: Long = 35218

Notice how s3 is used instead of s3n.  Also, we’re not setting any AWS credentials in code because we set them as environment variables before starting spark-shell; see the previous example where AWS_SECRET_ACCESS_KEY and AWS_ACCESS_KEY_ID were set.  These environment variables work for both s3 and s3n.
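If you do set credentials in code rather than environment variables, note that the two protocols read different Hadoop configuration keys.  A sketch covering both, to be run inside spark-shell where `sc` already exists; the credential values are placeholders:

```scala
// s3n:// paths read the fs.s3n.* keys; s3:// paths read the fs.s3.* keys.
// Placeholder credentials -- substitute your own, or prefer the
// environment-variable approach above, which covers both protocols.
sc.hadoopConfiguration.set("fs.s3.awsAccessKeyId", "YOUR_ACCESS_KEY_ID")
sc.hadoopConfiguration.set("fs.s3.awsSecretAccessKey", "YOUR_SECRET_ACCESS_KEY")
sc.hadoopConfiguration.set("fs.s3n.awsAccessKeyId", "YOUR_ACCESS_KEY_ID")
sc.hadoopConfiguration.set("fs.s3n.awsSecretAccessKey", "YOUR_SECRET_ACCESS_KEY")
```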

S3 from Spark Text File Interoperability


  • Spark 1.4.1 pre-built using Hadoop 2.4
  • Run both Spark with Scala S3 examples above
  • Amazon S3 credentials stored as environment variables before starting spark-shell
scala> // the following will error, because using s3 instead of s3n
scala> sc.textFile("s3://supergloospark/baby_names.csv").count()
org.apache.hadoop.mapred.InvalidInputException: Input path does not exist: s3://supergloospark/baby_names.csv
scala> sc.textFile("s3n://supergloospark/baby_names.csv").count()
res16: Long = 35218
scala> sc.textFile("s3://supergloospark/baby_names_s3_not_s3n.csv").count()
res19: Long = 35218                                                             
scala> sc.textFile("s3n://supergloospark/baby_names_s3_not_s3n.csv").count()
org.apache.hadoop.mapred.InvalidInputException: Input path does not exist: s3n://supergloospark/baby_names_s3_not_s3n.csv

Notice the interoperability, or lack of it: a file written via s3 is not visible via s3n, and vice versa.  The protocol in the path must match the one used when the file was written.  At the time of this writing, there are three different S3 options; see the Reference section of this post for links to more information.

Apache Spark with Amazon S3 Python Examples

Python Example Load File from S3 Written By Third Party Amazon S3 tool


  • Spark 1.4.1 pre-built using Hadoop 2.4
  • File on S3 was created from Third Party –  See Reference Section below for specifics on how the file was created
>>> sc._jsc.hadoopConfiguration().set("fs.s3n.awsAccessKeyId", "AKIAnotrealPLUQGVOJWQ")
>>> sc._jsc.hadoopConfiguration().set("fs.s3n.awsSecretAccessKey", "+Uu9E4psBLnotrealyi+e7i1Z6kJANKt")
>>> sc.textFile("s3n://supergloospark/baby_names.csv").count()

There are three different S3 options.  Note how this example uses s3n instead of s3, both when setting the security credentials and in the protocol specifier of the textFile call.  See the Reference section of this post for links to more information.

You can store the AWS credentials outside your code.  For example, here’s how to set using environment variables:

~/Development/spark-1.4.1-bin-hadoop2.4 $ export AWS_ACCESS_KEY_ID=AKIAJJRUasdfasdfasdf33HPA
~/Development/spark-1.4.1-bin-hadoop2.4 $ export AWS_SECRET_ACCESS_KEY=LmuKE7afdasdfxK2vj1nfA0Bp

And then, if we restart the ipython notebook, we don’t have to set the AWS security credentials in code.  All we have to do is call textFile with appropriate protocol specifier:

>>> sc.textFile("s3n://supergloospark/baby_names.csv").count()

Python Example Load Text File from S3 Written from Hadoop Library


  • Spark 1.4.1 pre-built using Hadoop 2.4
  • File on S3 was created from Hadoop
>>> myRDD = sc.textFile("s3n://supergloospark/baby_names.csv")
>>> subset = myRDD.filter(lambda line: "MICHAEL" in line)
>>> sc._jsc.hadoopConfiguration().set("fs.s3.awsAccessKeyId", "AKIAI74O5KPLUQGVOJWQ")
>>> sc._jsc.hadoopConfiguration().set("fs.s3.awsSecretAccessKey", "+Uu9E4psBLJgJPNFeV2GdevWcyi+e7i1Z6kJANKt")
>>> subset.saveAsTextFile("s3://supergloospark/python_example_baby_names_s3_not_s3n.csv")
>>> sc.textFile("s3://supergloospark/python_example_baby_names_s3_not_s3n.csv").count()

Note how this example uses s3 instead of s3n when setting the security credentials and in the protocol specifier of the textFile call.  Unlike the comparable Scala example above, we set the AWS keys again because we are using s3 instead of s3n.  We can avoid setting either if we store these values in environment variables, as noted above.

Python with S3 from Spark Text File Interoperability


  • Spark 1.4.1 pre-built using Hadoop 2.4
  • Run both Spark with Python S3 examples above
>>> sc.textFile("s3n://supergloospark/baby_names.csv").count()

>>> sc.textFile("s3://supergloospark/python_example_baby_names_s3_not_s3n.csv").count()

>>> sc.textFile("s3://supergloospark/baby_names.csv").count()
An error occurred while calling z:org.apache.spark.api.python.PythonRDD.collectAndServe.
: org.apache.hadoop.mapred.InvalidInputException: Input path does not exist: s3://supergloospark/baby_names.csv

Again, notice the interoperability: a file written via one protocol (s3 or s3n) is not readable via the other.  At the time of this writing, there are three different S3 options; see the Reference section of this post for links to more information.


References

1)  To create the files on S3 outside of Spark/Hadoop, I used a client called Forklift.  But Forklift isn’t a requirement, as there are many S3 clients available.  Here’s a screencast example of configuring Amazon S3 and copying the file up to the S3 bucket.

Apache Spark with Amazon S3 setup

2) For more information on different S3 options, see Amazon S3 page on Hadoop wiki

3) ipython notebook file available on github

4) Additional tutorials around Amazon AWS and Spark include Spark on EC2 tutorial and Spark Kinesis example and be sure to watch Spark with Scala and Spark with Python tutorial landing pages.

Featured Image Credit

How To: Apache Spark Cluster on Amazon EC2 Tutorial

Spark Cluster on EC2

How do you set up and run an Apache Spark cluster on EC2?  This tutorial will walk you through each step to get an Apache Spark cluster up and running on EC2. The cluster consists of one master and one worker node. It includes each step I took, regardless of whether it failed or succeeded.  While your experience may not match exactly, I’m hoping these steps will be helpful as you attempt to run an Apache Spark cluster on Amazon EC2.  There are screencasts throughout the steps.


This post assumes you have already signed up for and have a verified AWS account.  If not, sign up here.  It also assumes you are familiar with running a Spark Standalone Cluster and deploying to a Spark cluster.


I’m going to go through step by step and also show some screenshots and screencasts along the way.  For example, there is a screencast that covers steps 1 through 5 below.

Spark Cluster on Amazon EC2 Step by Step

Note: There’s a screencast of steps one through four at the end of step five below.

1) Generate Key/Pair in EC2 section of AWS Console

Click “Key Pairs” in the left nav and then Create Key Pair button.

Download the resulting key/pair PEM file.

2) Create a new AWS user named courseuser and download the credentials file, which includes the User Name, Access Key Id, and Secret Access Key.  We need the Access Key Id and Secret Access Key.

3) Set your environment variables according to the key and id from the previous step.  For me, that meant running the following from the command line (your Access Key Id goes in the first variable):

export AWS_ACCESS_KEY_ID=<your-access-key-id>
export AWS_SECRET_ACCESS_KEY=F9mKN6obfusicatedpBrEVvel3PEaRiC


4) Open a terminal window and go to the root directory of your Spark distribution.  Then, copy the PEM file from the first step of this tutorial to the root of the Spark home directory.

5) From Spark home dir, run:

ec2/spark-ec2 --key-pair=courseexample --identity-file=courseexample.pem launch spark-cluster-example

I received errors about the PEM file permissions, so I changed them according to the error notification’s recommendation and re-ran the script.
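The permission fix the script suggests is typically to make the key readable only by its owner.  A sketch, with the PEM path as a placeholder for wherever your file lives:

```shell
# spark-ec2 refuses keys that are readable by others;
# restrict the PEM file to owner read-only.
chmod 400 courseexample.pem
```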

You may then receive permission errors from Amazon; if so, update the permissions of the courseuser user in AWS and try again.

You may receive an error about zone availability such as:

Your requested instance type (m1.large) is not supported in your requested Availability Zone (us-east-1b). Please retry your request by not specifying an Availability Zone or choosing us-east-1c, us-east-1e, us-east-1d, us-east-1a.

If so, just update the script zone argument and re-run:

ec2/spark-ec2 --key-pair=courseexample --identity-file=courseexample.pem --zone=us-east-1d launch spark-cluster-example

The cluster creation takes approximately 10 minutes, with all kinds of output including deprecation warnings and possibly errors starting GANGLIA.  GANGLIA errors are fine if you are just experimenting; otherwise, try a different Spark version or tweak the PHP settings on your cluster.

Here’s a screencast example of me creating an Apache Spark Cluster on EC2

Set up an Apache Spark Cluster on Amazon EC2 Part 1

6) After the cluster creation succeeds, you can verify by browsing to the master web UI at http://<your-ec2-hostname>:8080

7) And you can verify from the Spark console in Scala or Python

Scala example:

bin/spark-shell --master spark://

Python example

IPYTHON_OPTS="notebook" ./bin/pyspark --master spark://
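With placeholder values filled in, the two commands look something like the sketch below; substitute the public DNS name printed at the end of your cluster launch, and note that 7077 is the standard Spark master port.

```shell
# Hypothetical master hostname -- use the one your launch printed.
bin/spark-shell --master spark://ec2-54-0-0-0.compute-1.amazonaws.com:7077

IPYTHON_OPTS="notebook" ./bin/pyspark --master spark://ec2-54-0-0-0.compute-1.amazonaws.com:7077
```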

At first, both of these will likely fail, eventually leading to an “ERROR OneForOneStrategy: java.lang.NullPointerException”:

Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /___/ .__/\_,_/_/ /_/\_\   version 1.4.1

Using Scala version 2.10.4 (Java HotSpot(TM) 64-Bit Server VM, Java 1.8.0_66)
Type in expressions to have them evaluated.
Type :help for more information.
16/01/17 07:30:28 ERROR SparkDeploySchedulerBackend: Application has been killed. Reason: All masters are unresponsive! Giving up.
16/01/17 07:30:28 ERROR OneForOneStrategy: 

8) This is an Amazon permission issue related to port 7077 not being open.  You need to open up port 7077 via an Inbound Rule.  Here’s a screencast on how to create an Inbound Rule in EC2:

Setting up an Apache Spark Cluster on Amazon EC2 Part 2

After creating this inbound rule, everything will work from both the iPython notebook and the Spark shell.


Hope this helps you configure a Spark cluster on EC2.  Let me know in the page comments if I can help.  Once you are finished with your EC2 instances, make sure to destroy them using the following command:

ec2/spark-ec2 --key-pair=courseexample --identity-file=courseexample.pem destroy spark-cluster-example


For a list of additional resources and tutorials, see Spark tutorials page.

Spark EC2 Tutorial Featured Image Credit

What is Apache Spark?

What is Spark?

Becoming productive with Apache Spark requires an understanding of a few fundamental elements.  In this post, let’s explore these building blocks of Apache Spark through descriptions and real-world examples.

The intention is for you to understand basic Spark concepts.  It assumes you are familiar with installing software and unzipping files.  Later posts will dive deeper into Apache Spark and example use cases.

The Spark API can be called from Scala, Python or Java.  This post uses Scala, but the examples can be easily translated to other languages and operating systems.

If you have any questions or comments, please leave a comment at the bottom of this post.

I. Spark Overview

Let’s dive into code and working examples first.  Then, we’ll describe Spark concepts that tie back to the source code examples.


* Java 6 or newer installed

* Download NY State Baby Names in CSV format from:  (I renamed the csv file to baby_names.csv)

II. Spark with Scala Code Examples

The CSV file we will use has following structure:

Year,First Name,County,Sex,Count

Let’s start our Spark and Scala journey with a working example.


  1. Download from  Select the “Pre-built package for Hadoop 2.4”
  2. Unpack it.  (tar, zip, etc.)
  3. From the terminal, run the spark-shell via: bin/spark-shell.  After running spark-shell you should see a scala> prompt

Let’s run some code

scala> val babyNames = sc.textFile("baby_names.csv")
babyNames: org.apache.spark.rdd.RDD[String] = baby_names.csv MappedRDD[1] at textFile at <console>:12

scala> babyNames.count
res0: Long = 35218

scala> babyNames.first()
res177: String = Year,First Name,County,Sex,Count

So, we know there are 35218 rows in the CSV

scala> val rows = => line.split(","))
rows: org.apache.spark.rdd.RDD[Array[String]] = MappedRDD[17] at map at <console>:14

scala> => row(2)).distinct.count
res22: Long = 62

Above, we converted each row in the CSV file to an Array of Strings.  Then, we determined there are 62 unique NY State counties over the years of data collected in the CSV.

scala> val davidRows = rows.filter(row => row(1).contains("DAVID"))
davidRows: org.apache.spark.rdd.RDD[Array[String]] = FilteredRDD[24] at filter at <console>:16

scala> davidRows.count
res32: Long = 136

There are 136 rows containing the name “DAVID”.

scala> davidRows.filter(row => row(4).toInt > 10).count()
res41: Long = 89

This is the number of rows where the name DAVID has a “Count” greater than 10.

scala> davidRows.filter(row => row(4).toInt > 10).map( r => r(2) ).distinct.count
res57: Long = 17

There are 17 unique counties which have had the name DAVID more than 10 times in a given year.

scala> val names = => (name(1),1))
names: org.apache.spark.rdd.RDD[(String, Int)] = MappedRDD[342] at map at <console>:16

scala> names.reduceByKey((a,b) => a + b).sortBy(_._2).foreach(println _)

This shows the number of lines on which each name appears in the file, because of how names was created with => (name(1), 1)).  Jacob appears most often, but is actually not the most popular by total count.

scala> val filteredRows = babyNames.filter(line => !line.contains("Count")).map(line => line.split(","))
filteredRows: org.apache.spark.rdd.RDD[Array[String]] = MappedRDD[546] at map at <console>:14

scala> ( n => (n(1), n(4).toInt)).reduceByKey((a,b) => a + b).sortBy(_._2).foreach (println _)

The output from the last foreach loop hasn’t been shown, but Michael (9187 occurrences), followed by Matthew (7891) and Jaden (7807), were the top 3 most popular names in NY from 2007 through 2012.  Also, the first row of the CSV needed to be discarded in order to avoid a NumberFormatException.

If your results do not show Michael 9187 times, try re-running the last command a few times.  The `println _` function being called from foreach will only print out one partition of the RDD, and there is always a minimum of 2 partitions in a Spark RDD.  To ensure the entire RDD is printed, collect first: ( n => (n(1), n(4).toInt)).reduceByKey((a,b) => a + b).sortBy(_._2).collect.foreach (println _)
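To see what the split/map/reduceByKey pipeline is computing, independent of Spark, here is the same aggregation expressed with plain Scala collections.  No cluster is needed; the sample rows below are made up for illustration and are not from the real baby_names.csv.

```scala
// A plain-Scala analogue of the Spark pipeline above: split each CSV row,
// key by name (column 1), and sum the Count column (column 4).
object NameCounts {
  // Made-up sample rows in the Year,First Name,County,Sex,Count format.
  val rows = List(
    "2012,DAVID,Kings,M,272",
    "2012,MICHAEL,Kings,M,295",
    "2011,MICHAEL,Queens,M,310"
  )

  // Equivalent of: map(_.split(",")).map(n => (n(1), n(4).toInt)).reduceByKey(_ + _)
  val totals: Map[String, Int] =
      .map(n => (n(1), n(4).toInt))
      .groupBy(_._1)
      .map { case (name, pairs) => name -> }

  def main(args: Array[String]): Unit =
    totals.toList.sortBy(_._2).foreach(println)  // ascending by total count
}
```

Here MICHAEL totals 605 (295 + 310) and DAVID 272, mirroring how reduceByKey sums per key across the whole dataset.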

III. Spark Core Overview

The remainder of this post will cover Spark Core concepts.  Spark Core is what makes all other aspects of the Spark ecosystem possible.   Later posts will cover other aspects of the Spark ecosystem including Spark SQL, Spark Streaming, MLib and GraphX.  Be sure to check the list of other Spark Tutorials with Scala for the latest tutorials.

IV. Spark Context and Resilient Distributed Datasets

The way to interact with Spark is via a SparkContext (or SparkSession in newer versions of Spark).  The example used the Spark console, which provides a SparkContext automatically.  Did you notice these lines in the REPL?

06:32:25 INFO SparkILoop: Created spark context..
Spark context available as sc.

That’s how we’re able to use sc from within our example.

After obtaining a SparkContext, developers may interact with Spark via Resilient Distributed Datasets or RDDs.  (Update: this changes from RDDs to DataFrames and DataSets in later versions of Spark.)

Resilient Distributed Datasets (RDDs) are immutable, distributed collections of elements.  For more information on Spark RDDs, see the Spark RDD tutorial in 2 minutes or less.

These collections may be parallelized across a cluster.  RDDs are loaded from an external data set or created via a SparkContext.  We’ll cover both of these scenarios.

In the previous example, we created an RDD via:

scala> val babyNames = sc.textFile("baby_names.csv")

We also created RDDs in other ways, which we’ll cover a bit later.
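For instance, besides loading from an external dataset, an RDD can be created from an in-memory collection.  A sketch, run inside spark-shell where the `sc` SparkContext already exists:

```scala
// Two ways to create an RDD (assumes the `sc` provided by spark-shell):
val fromFile   = sc.textFile("baby_names.csv")        // from an external dataset
val fromMemory = sc.parallelize(List(1, 2, 3, 4, 5))  // from an in-memory collection
```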

When utilizing Spark, you will be doing one of two primary interactions: creating new RDDs or transforming existing RDDs to compute a result.  The next section describes these two Spark interactions.

V. Spark Actions and Transformations

When working with Spark RDDs, there are two categories of operations: actions and transformations.  An action is an execution which produces a result.  Examples of actions used previously are count and first.

Example of Spark Actions

babyNames.count() // number of lines in the CSV file
babyNames.first() // first line of CSV

For a more comprehensive list of Spark Actions, see the Spark Examples of Actions page.

Example of Spark Transformations

Transformations create new RDDs using existing RDDs.  We created a variety of RDDs in our example:

scala> val rows = => line.split(","))

scala> val davidRows = rows.filter(row => row(1).contains("DAVID"))

For a more comprehensive list of Spark Transformations, see the Spark Examples of Transformations page.
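A key point the examples don’t make obvious: transformations are lazy, describing a computation without running it, while actions force evaluation.  A rough plain-Scala analogue using an Iterator is sketched below; this is an illustration only, as Spark’s evaluation model is much richer than this.

```scala
// Transformations build a lazy pipeline; nothing runs until an action
// consumes it. Iterator.map is lazy, which makes it a rough stand-in.
object Laziness {
  var evaluated = 0  // counts how many elements have actually been processed

  // "Transformation": describes doubling each element, evaluates nothing yet.
  def doubled: Iterator[Int] = Iterator(1, 2, 3).map { n =>
    evaluated += 1
    n * 2
  }

  // "Action": summing consumes the iterator and forces every element through.
  def runAction(): Int = doubled.sum

  def main(args: Array[String]): Unit = {
    val pipeline = doubled
    println(s"before action: $evaluated elements evaluated")
    println(s"sum = ${pipeline.sum}")
    println(s"after action: $evaluated elements evaluated")
  }
}
```

Nothing is evaluated when `doubled` is defined; only calling `runAction()` (the analogue of count or collect) processes the elements.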

VI. Conclusion and Looking Ahead

In this post, we covered the fundamentals of being productive with Apache Spark.  As you witnessed, there are just a few Spark concepts to learn before you can be productive.  What do you think of Scala?  To many, the use of Scala is more intimidating than Spark itself.  Sometimes choosing Spark is a way to bolster your Scala-fu as well.

In later posts, we’ll write and run code outside of the REPL.  We’ll dive deeper into Apache Spark and pick up some more Scala along the way.

As mentioned above, for the most recent list of tutorials around Spark and Scala, be sure to bookmark the Spark with Scala tutorials page.