Spark FAIR Scheduler Example

Scheduling in Spark can be a confusing topic.  When someone says “scheduling” in Spark, do they mean scheduling applications running on the same cluster?  Or do they mean the internal scheduling of Spark tasks within a Spark application?  So, before we cover an example of utilizing the Spark FAIR Scheduler, let’s make sure we’re on the same page with regard to Spark scheduling.

In this tutorial, we’re going to cover an example of how we schedule certain processing within our application with higher priority and potentially more resources.

What is the Spark FAIR Scheduler?

By default, Spark’s internal scheduler runs jobs in FIFO fashion.  When we use the term “jobs” in describing the default scheduler, we are referring to internal Spark jobs within the Spark application.  The word “jobs” is often used interchangeably for a Spark application and a Spark job, but applications and jobs are two very different constructs.  “Oyy yoy yoy” as my grandma used to say when things became more complicated.  Sometimes it’s difficult to translate Spark terminology.  We are talking about jobs in this post.

Anyhow, as we know, jobs are divided into stages and the first job gets priority on all available resources. Then, the second job gets priority, etc.  As a visual review, the following diagram shows what we mean by jobs and stages.  

Spark Fair Scheduler
Spark Internals

Notice how there are multiple jobs.  We can confirm this from the “Jobs” tab in the Spark UI as well.

If the jobs at the head of the queue are long-running, then later jobs may be delayed significantly.

This is where the Spark FAIR scheduler comes in…

The FAIR scheduler supports the grouping of jobs into pools.  It also allows setting different scheduling options (e.g. weight) for each pool. This can be useful to create high priority pools for some jobs vs others.  This approach is modeled after the Hadoop Fair Scheduler.

 

How do we UTILIZE the Spark FAIR Scheduler?

Let’s run through an example of configuring and implementing the Spark FAIR Scheduler.  The following are the steps we will take:

  • Run a simple Spark Application and review the Spark UI History Server
  • Create a new Spark FAIR Scheduler pool in an external XML file
  • Set the `spark.scheduler.pool` to the pool created in external XML file
  • Update code to use threads to trigger use of FAIR pools and rebuild
  • Re-deploy the Spark Application with
    • `spark.scheduler.mode` configuration variable set to FAIR
    • `spark.scheduler.allocation.file` configuration variable pointing to the XML file
  • Run and review Spark UI History Server

 

Here’s a screencast of me running through all these steps:

How to Spark FAIR scheduler

Also, for more context, I’ve outlined all the steps below.

Run a simple Spark Application with default FIFO settings

In this tutorial on Spark FAIR scheduling, we’re going to use a simple Spark application.  The code reads in a bunch of CSV files (about 850MB total), calls `count`, and prints out values.  In the screencast above, I was able to verify the use of pools in the regular Spark UI, but if your verification application completes quickly, you may want to utilize the Spark History Server to review the metrics afterwards.  (By the way, see the Spark Performance Monitor with History Server tutorial for more information on the History Server.)
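
If you haven’t set up the History Server yet, the short version is that the application needs event logging enabled and the History Server needs to be started.  A minimal sketch (the event log directory below is just an example; both the application and the History Server must point at the same location):

# in $SPARK_HOME/conf/spark-defaults.conf (or passed via --conf on spark-submit)
spark.eventLog.enabled  true
spark.eventLog.dir      file:///tmp/spark-events

# then start the History Server, which reads file:/tmp/spark-events by default
$SPARK_HOME/sbin/start-history-server.sh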

Create a new Spark FAIR Scheduler pool

There is more than one way to create FAIR pools.  In this example, we will create a new file with the following content

<?xml version="1.0"?>

<allocations>
  <pool name="fair_pool">
    <schedulingMode>FAIR</schedulingMode>
    <weight>2</weight>
    <minShare>4</minShare>
  </pool>
  <pool name="a_different_pool">
    <schedulingMode>FIFO</schedulingMode>
    <weight>1</weight>
    <minShare>2</minShare>
  </pool>
</allocations>

Save this file to the file system so we can reference it later.

A note about the file options.  Hopefully it’s obvious, but we configure pools in the `pool` nodes and give each one a name.  Then we have three options for each pool:

  • `schedulingMode` — which is either FAIR or FIFO
  • `weight` — Controls this pool’s share of the cluster relative to other pools. Pools have a weight of 1 by default. Giving a specific pool a weight of 2, for example, means it will get 2x the resources of other active pools
  • `minShare` — A minimum share of CPU cores the pool is guaranteed to be allocated

 

Update code to utilize the new FAIR pools

The code in use can be found on my work-in-progress Spark 2 repo
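
If you’d rather not dig through the repo, here’s a minimal sketch of the idea (assuming an existing SparkContext named `sc`; the CSV paths are placeholders).  Because `spark.scheduler.pool` is a thread-local property, jobs submitted from separate threads can land in separate pools:

import org.apache.spark.SparkContext

// hypothetical helper: run a count in a named FAIR pool from its own thread
def countInPool(sc: SparkContext, pool: String, path: String): Thread = {
  val t = new Thread {
    override def run(): Unit = {
      // jobs triggered from this thread are assigned to the named pool
      sc.setLocalProperty("spark.scheduler.pool", pool)
      println(s"$pool count: " + sc.textFile(path).count())
    }
  }
  t.start()
  t
}

val threads = Seq(
  countInPool(sc, "fair_pool", "data/set-one/*.csv"),
  countInPool(sc, "a_different_pool", "data/set-two/*.csv")
)
threads.foreach(_.join())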

Set Scheduler Configuration During Spark-Submit

We’re going to add two configuration variables when we re-run our application (an example spark-submit is shown after the list):

  • `spark.scheduler.mode` configuration variable to FAIR
  • `spark.scheduler.allocation.file` configuration variable to point to the previously created XML file
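
For example, the spark-submit might look something like the following (the class name, application jar, and allocation file path are placeholders for your own):

$SPARK_HOME/bin/spark-submit --class com.example.SimpleFairApp --master spark://<spark-master>:7077 --conf spark.scheduler.mode=FAIR --conf spark.scheduler.allocation.file=/path/to/fair-pools.xml your-app-assembly.jar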

 

Verify Pools are being utilized

Let’s go back to the Spark UI and review while the updated application with new spark-submit configuration variables is running.  We can now see the pools are in use!  Just in case you had any doubt along the way, I did believe we could do it.  Never doubted it.

 

Conclusion

I hope this simple tutorial on using the Spark FAIR Scheduler was helpful.  If you have any questions or suggestions, please let me know in comments section below.


 

 

Featured image credit https://flic.kr/p/qejeR3

 

Apache Spark Thrift Server Load Testing Example

Spark Thrift Server Stress Test Tutorial

Wondering how to perform stress tests against Apache Spark Thrift Server?  This post will describe one way to do it.

What is Apache Spark Thrift Server?  

Apache Spark Thrift Server is based on the Apache HiveServer2 which was created to allow JDBC/ODBC clients to execute SQL queries using a Spark Cluster.  From my experience, these “clients” are typically business intelligence tools such as Tableau and they are most often only a portion of the overall Spark architecture.  In other words, the Spark cluster is primarily used for streaming and batch aggregation jobs and any JDBC/ODBC client access via Thrift Server to the cluster is secondary at best.

For more information on Apache Spark Thrift Server and an example use case, see the previous Apache Spark Thrift Server with Cassandra post.

Apache Spark Thrift Server Load Testing Example Overview

How do we simulate anticipated load on our Apache Spark Thrift Server?  In this post, we are going to use an open source tool called Gatling.  Check out the References section at the bottom of this post for links to Gatling.

At a high level, this Spark Thrift with Gatling tutorial will run through the following steps:
  1. Confirm our environment (Spark, Cassandra, Thrift Server)
  2. Compile our Gatling based load testing code
  3. Run a sample Spark Thrift load test

 

Setup and configure our environment of Spark, Cassandra, and Thrift Server

If you are at the point of load testing Apache Spark Thrift Server, I’m going to assume you are already familiar with the setup of Spark and Cassandra, or some other backend such as Hive or Parquet.  Therefore, I’m going to just run through the steps to start everything up in my local environment.  Adjust the following to best match your environment.

Confirm Environment
1. Start Cassandra

For this tutorial, we’re going to use the killrweather sample keyspace and queries created in the previous Apache Spark Thrift Server with Cassandra post.  You need to go through that tutorial first.  This post assumes you have already created and loaded the data, so all we need to do now is start Cassandra if it is not already running.

`$CASSANDRA_HOME/bin/cassandra`

2. Start your Spark Master and at least one Worker

If your Spark cluster is not already running, then start it up

`$SPARK_HOME/sbin/start-master.sh`

`$SPARK_HOME/sbin/start-slave.sh spark://<spark-master>:7077`

3. Start the Thrift Server and set configuration for Cassandra

`$SPARK_HOME/sbin/start-thriftserver.sh --packages com.datastax.spark:spark-cassandra-connector_2.11:2.0.2 --conf spark.cassandra.connection.host=127.0.0.1 --master spark://<spark-master>:7077`

 

Obtain Sample Apache Spark Thrift Server Load Tests

Clone the repo https://github.com/tmcgrath/gatling-sql

This repo contains a Gatling extension I wrote.  This extension is what will allow us to load test the Spark Thrift Server.

See the src/main/resources/application.conf file for default Spark Thrift connection settings and adjust as needed.

The simulation to run is src/test/scala/io/github/gatling/sql/example/ThriftServerSimulation.scala; how to run it is covered in the next section.

Run Load Tests

You can run the src/test/scala/io/github/gatling/sql/example/ThriftServerSimulation.scala simulation by invoking the Maven `test` task, or by building a jar and using the included `launch.sh` script.
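
For example, a minimal run via Maven might look like this (assuming Maven is installed and the defaults in application.conf point at your Thrift Server):

git clone https://github.com/tmcgrath/gatling-sql.git
cd gatling-sql
mvn test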

Conclusion

Hopefully, this tutorial on load testing Apache Spark Thrift Server helps get you started.

If you have any questions or ideas for corrections, let me know in the comments below.

 

References

 

Featured Image credit https://flic.kr/p/e5hWaC

 

Spark Thrift Server with Cassandra Example

Want to use `joins` with Cassandra?  Or help people already familiar with SQL leverage your Spark infrastructure?  They can use SQL-based tools they already know, such as Tableau or even MS Excel.  Maybe both?  This post provides answers.  It describes how to configure the Apache Spark Thrift Server with Cassandra.

First, some quick background on Apache Spark Thrift Server.  

Apache Spark Thrift Server is a port of Apache HiveServer2 which allows JDBC/ODBC clients to execute SQL queries.  From my experience, these “clients” are typically business intelligence (BI) tools such as Tableau or even MS Excel or direct SQL access using their query tool of choice such as Toad, DBVisualizer, SQuirrel SQL Client.

Ok, great, but why do we care?  

The Spark Thrift Server allows clients to utilize the in-memory, distributed architecture of Spark SQL.  In addition, the Thrift Server may be deployed as a Spark application, which allows clients to share the same SparkContext and take advantage of caching across different JDBC/ODBC requests.  Finally, through the Thrift Server and Spark SQL, we may be able to provide relational database concepts such as SQL JOINs in environments in which they are not natively supported, such as Cassandra.

APACHE SPARK THRIFT SERVER with Cassandra Setup
  1. Start Cassandra and Load Sample Data
  2. Optional Start Spark Cluster
  3. Start Spark Thrift Server with Cassandra configuration
  4. Configure Hive Metastore
  5. Verify our environment
  6. Run example SQL
  7. Check out the Spark UI
  8. Small celebration

 

I demo all these steps in a screencast in case it helps.  See the Screencast section below to view.

Overview

If you have a Spark cluster or have downloaded Apache Spark to your laptop, you already have everything you need to run Spark Thrift Server.  Thrift Server is included with Spark.

Also, I assume you are familiar with the setup of Spark and Cassandra.  I don’t walk through these steps.  Instead, I just run through the steps to start everything up in my local environment.  You will need to make some adjustments to the following to best match your environment.  For example, you will need to modify any references to `$SPARK_HOME` in the steps below to match the appropriate directory for your setup.  Ok, enough chit chat, let’s go…

 

1. START CASSANDRA and load sample database

This tutorial assumes you have Cassandra running locally, but it isn’t a requirement.  Your Cassandra cluster can be running someplace other than local.  If Cassandra is running locally, the next step is to load a sample keyspace and some data.  In this tutorial, we’re going to use the CDM tool found at https://github.com/riptano/cdm-java.  Using CDM is really simple.  Just download the pre-built binary, update the permissions to make it executable (i.e. `chmod 755 cdm`) and run `cdm install killrweather`.  I show an example of running it in the screencast below.

2. START YOUR SPARK MASTER AND AT LEAST ONE WORKER (OPTIONAL)

A running Spark cluster is optional in this Spark Thrift tutorial.  If you want to run a minimal cluster with one worker on your laptop, you can perform something similar to the following

`$SPARK_HOME/sbin/start-master.sh`

`$SPARK_HOME/sbin/start-slave.sh spark://<spark-master>:7077`

I bet you already knew this.  Anyhow, movin on…

3. START THE THRIFT SERVER AND SET CONFIGURATION FOR CASSANDRA

If you did not perform step 2 or do not have an available Spark cluster, then run

`$SPARK_HOME/sbin/start-thriftserver.sh --packages com.datastax.spark:spark-cassandra-connector_2.11:2.0.2 --conf spark.cassandra.connection.host=127.0.0.1`

This will run Thrift Server in local[*] mode which is fine for this quick start.

Alternatively, if you do have a Spark cluster, you can also pass in the --master arg:

`$SPARK_HOME/sbin/start-thriftserver.sh --packages com.datastax.spark:spark-cassandra-connector_2.11:2.0.2 --conf spark.cassandra.connection.host=127.0.0.1 --master spark://<spark-master>:7077`

Notice how we are passing in the Spark Cassandra Connector and the `cassandra.connection.host` config here?  Of course you do, because you do not let details like this get past you.  You are a stickler for details, as they say.

4. Configure Hive Metastore with beeline client

Next, we’re going to update the Hive metastore.  The Hive metastore is what Spark Thrift Server uses to track the connection parameters of registered data sources.  In this example, the data source is Cassandra, of course.

To update the metastore, we’re going to use the Apache Beeline client.  Beeline is a command shell that works with HiveServer2 using JDBC.  It is based on the SQLLine CLI.

`$SPARK_HOME/bin/beeline`

`beeline> !connect jdbc:hive2://localhost:10000`

In my setup, I can disregard the username and password prompts… just hit the enter key at both prompts.

`jdbc:hive2://localhost:10000> CREATE TABLE raw_weather_data USING org.apache.spark.sql.cassandra OPTIONS (keyspace 'isd_weather_data', table 'raw_weather_data');`

and then

`jdbc:hive2://localhost:10000> CREATE TABLE weather_station USING org.apache.spark.sql.cassandra OPTIONS (keyspace 'isd_weather_data', table 'weather_station');`

We registered two tables because we are going to use SQL JOINs in later sections of this Spark Thrift tutorial.  It’s probably noteworthy to mention these tables may be cached as well.  I’m not going to go deep into caching in this example, but leave a comment if you have any questions.
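
If you do want to experiment with caching, a one-liner from the beeline prompt along these lines should do it (this uses Spark SQL’s CACHE TABLE statement):

`jdbc:hive2://localhost:10000> CACHE TABLE raw_weather_data;`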

5. VERIFY OUR SPARK, CASSANDRA AND THRIFT SERVER SETUP

Before running some more complex SQL, let’s just verify our setup with a smoke test.  Still within the connected beeline client, issue a simple SQL statement:

`jdbc:hive2://localhost:10000> select * from raw_weather_data limit 10;`

You should see some results.  10 rows to be precise you cheeky bastard.

6. SQL with Spark Examples

Let’s show more complex SQL examples and take things another step.  I’m going to use SQuirreL SQL client, but the concepts apply to any SQL client.  If you want more detail on setting up SQuirrel with Thrift, see the Reference section below.

Joins?  Joins you say!  No problem

`SELECT ws.name, raw.temperature
FROM raw_weather_data raw
JOIN weather_station ws
ON raw.wsid=ws.id
WHERE raw.wsid = '725030:14732'
AND raw.year = 2008 AND raw.month = 12 AND raw.day = 31;`

Well, well, special, special.  Let’s turn it to 11…

`select weather_station.id,
weather_station.call_sign,
weather_station.country_code,
weather_station.name,
ranked_temp.avg_temp from
(
SELECT wsid, year, month, day, daily.avg_temp,
dense_rank() OVER (PARTITION BY wsid order by avg_temp desc) as rank
FROM
(select wsid, year, month, day, avg(temperature) as avg_temp
FROM raw_weather_data
group by wsid, year, month,day ) daily
) ranked_temp
JOIN weather_station on ranked_temp.wsid = weather_station.id
where rank <= 5;`

 

7. JDBC / ODBC server tab in SPARK UI

Once again, the Spark UI is a valuable resource for us.  Now that we’ve run a few queries, let’s take a look at what we can see…

Open `http://localhost:4040/` in your browser.  You should be redirected to /jobs

If you started Spark Thrift Server with a --master argument and pointed it at a cluster, then you can open http://localhost:8080 and get to this screen as well.

Spark Thrift Server in the Spark UI

Notice the JDBC/ODBC tab.  That’s the Thrift Server part.  You did that.  Check things out.  You can see the query plans, details of the Spark jobs such as stages and tasks, how shuffling went, etc.

8. Small Celebration

Let’s sing

“Let’s celebrate, it’s all right… we gonna have a good time tonight”  — Kool and the Gang

Sing it with me.

CONCLUSION

Hopefully, this Apache Spark Thrift Server tutorial helps get you started.  I’ve also put together a quick screencast of me going through these steps above in case it helps.  See the Screencast section below.

If you have any questions or ideas for corrections, let me know in the comments below.

Update November 2017

I intertwined two different data sources in the above post.  When I originally wrote this, I had a keyspace called `isd_weather_data` from the killrweather reference application.  But, in this tutorial, I used `cdm` to create and load a keyspace called `killrweather`.  You can see this in how the tables are created above, where they reference `isd_weather_data`.  This shouldn’t have been there.  When using `cdm` as described in this tutorial, replace `isd_weather_data` with `killrweather`.  And not only that… the data appears to differ between the two sources.  So, the `join` examples produce results when using isd_weather_data and none when using killrweather.  I’m presuming differences between https://github.com/killrweather/killrweather/blob/master/data/weather_stations.csv and https://github.com/killrweather/killrweather-data/blob/master/data/weather_station.csv are the root cause of no results with joins.  For example, one CSV contains weather_station id `725030:14732`.  Sorry if this causes any confusion.

SCREENCAST

Spark Thrift Server with Cassandra Example. A How-to love story.

 

REFERENCES
  • Spark Thrift Server https://spark.apache.org/docs/latest/sql-programming-guide.html#running-the-thrift-jdbcodbc-server
  • CDM https://github.com/riptano/cdm-java
  • Apache Beeline https://cwiki.apache.org/confluence/display/Hive/HiveServer2+Clients
  • Killrweather Sample query inspiration https://github.com/killrweather/killrweather/wiki/7.-Spark-and-Cassandra-Exercises-for-KillrWeather-data
  • SQuirrel SQL client setup with Thrift https://jaceklaskowski.gitbooks.io/mastering-apache-spark/spark-sql-thrift-server.html#SQuirreL-SQL-Client

 

Image credit https://flic.kr/p/4ykKnp

Spark Command Line Arguments in Scala Example

Spark Command Line Arguments in Scala

The primary reason why we want to use Spark command line arguments is to avoid hard-coding values into our code. As we know, hard-coding should be avoided because it makes our application more rigid and less flexible.

For example, let’s assume we want to run our Spark job in both test and production environments.  Let’s further assume that our Spark job reads from a Cassandra database and that the databases for test and production are different.  In this example, we should prefer using dynamic configuration values when submitting the job to test vs production environments.  The alternative is hard-coding the Cassandra host connection in code, recompiling and redeploying.  This approach wastes time.

So, how do we process Spark command line arguments in our Scala code?  I would think this would be easy by now.  But, I’ve been surprised at how difficult this can be for people new to Scala and Spark.

Of course, one way to achieve command line arg parsing is querying the String Array which is part of your `main` function. For example, perhaps your code looks similar to:

def main(args: Array[String]) {

  val conf = new SparkConf().setAppName("SparkCassandraExampleApp")

  if (args.length > 0) conf.setMaster(args(0))

  if (args.length > 1) conf.set("spark.cassandra.connection.host", args(1))

...

But this isn’t good.  It’s brittle because we are not using default values and `args` values depend on a specific ordering.  Let’s address these two weaknesses in our solution.

In this tutorial, I’ll present a simple example of a flexible and scalable way to process command-line args in your Scala based Spark jobs. We’re going to use `scopt` library [1] and update our previous Spark with Cassandra example.

To update our previous Spark Cassandra example to use command-line arguments, we’re going to update two areas of the project: the SBT build file and our code.

Step 1 Update SBT Build File for Scopt Command Line Option Parsing Library

Easy.  Essentially, we change the libraryDependencies from:

libraryDependencies ++= Seq(
   "org.apache.spark" %% "spark-sql" % "1.6.1",
   "com.datastax.spark" %% "spark-cassandra-connector" % "1.5.0"
)

to:

libraryDependencies ++= Seq(
   "org.apache.spark" %% "spark-sql" % "1.6.1",
   "com.datastax.spark" %% "spark-cassandra-connector" % "1.5.0",
   "com.github.scopt" %% "scopt" % "3.5.0"
)

 

Step 2 Update Spark Scala code to process command line args

There are multiple areas of the code we need to update.  First, we’re going to update our code to use a case class to represent command-line options and utilize the `scopt` library to process them.

A case class to hold and reference our command line args:

case class CommandLineArgs (
  cassandra: String = "", // required
  keyspace: String = "gameofthrones", // default is gameofthrones
  limit: Int = 10
)

Next, let’s update the code to use this case class and set possible values from the command line:

val parser = new scopt.OptionParser[CommandLineArgs]("spark-cassandra-example") {
  head("spark-cassandra-example", "1.0")
  opt[String]('c', "cassandra").required().valueName("<cassandra-host>").
    action((x, c) => c.copy(cassandra = x)).
    text("Setting cassandra is required")
  opt[String]('k', "keyspace").action( (x, c) =>
    c.copy(keyspace = x) ).text("keyspace is a string with a default of `gameofthrones`")
  opt[Int]('l', "limit").action( (x, c) =>
    c.copy(limit = x) ).text("limit is an integer with default of 10")
}

So, there are a few interesting parts in this code.  Can you tell which command line variable is required?  Can you tell which requires an Int vs. String?  Sure, you can.  I have confidence in you.

Ok, finally we need to update the code to make a decision based on whether the command line arg parsing succeeded or not.  The way we do that is through a pattern match as shown here

parser.parse(args, CommandLineArgs()) match {

  case Some(config) =>
  // do stuff
  case None => // failed

}

In this example, `config` is our `CommandLineArgs` case class which is available on success.  If the command line arg parsing succeeded, our code will enter the `Some(config)` match and “do stuff”.  There we can use the vars such as `config.keyspace`.

The alternative match is `None`.  In this match, we know the command line arg parsing failed, so the code should exit.
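
Putting it together, a minimal `main` might look something like the sketch below (assuming the `parser` and `CommandLineArgs` defined above are in scope; the app name and how the parsed values are used are just for illustration):

import org.apache.spark.{SparkConf, SparkContext}

def main(args: Array[String]): Unit = {
  parser.parse(args, CommandLineArgs()) match {
    case Some(config) =>
      // build the SparkConf from parsed, validated values instead of raw args
      val conf = new SparkConf()
        .setAppName("spark-cassandra-example")
        .set("spark.cassandra.connection.host", config.cassandra)
      val sc = new SparkContext(conf)
      // ... use config.keyspace and config.limit in the rest of the job
    case None =>
      // scopt has already printed the usage/error text, so just exit
      sys.exit(1)
  }
}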

 

That’s it.  Hopefully, this simple example helps you move ahead with Spark command line argument parsing and usage in Scala.  For more details and exploring more options using `scopt` check out the site via reference link below or let me know if you have any questions or comments.

For the complete updated example, see this commit or cut-and-paste https://github.com/tmcgrath/spark-scala/blob/5f286e6543c87ff3d0cd64ade55f85ac32939007/got-battles/src/main/scala/com/supergloo/SparkCassandra.scala

 

References

[1] For more on `scopt` checkout https://github.com/scopt/scopt

 

Featured image credit https://flic.kr/p/QFCGLQ

Learning Spark PDF

learning spark pdf

So, I’ve noticed “Learning Spark PDF” is a search term which happens on this site.  Can someone help me understand what people are looking for when using this phrase?

Are readers looking for the Learning Spark: Lightning-Fast Big Data Analysis book from O’Reilly?

Perhaps looking for the new Apache Spark with Scala Tutorial book? It’s available in PDF and is much more hands-on than the Learning Spark PDF book.

Or, maybe… people are looking for the Summary of Learning Spark book  🙂

Well, I would bet people are searching for the O’Reilly version, but maybe, just maybe, people are looking for the Summary of Learning Spark book.

Did you know the Summary of Learning Spark book is currently available for free for Kindle Unlimited subscribers?  That’s better than a Learning Spark PDF.

More information on the summary book:

The summary guide will help readers become more confident and productive in Apache Spark quickly.  Apache Spark core fundamentals and ecosystem components are presented succinctly.

The guide is ideal for hands-on engineers, managers, and architects who need an in-depth understanding of Spark quickly.  Readers do not need to know Java, Scala or Python in detail.


The guide will answer questions such as: “does Spark warrant any further investment?” and “how does Spark integrate with Hadoop, Hive, HBase, HDFS, YARN, etc.?”

The key benefit is saving your time by providing the basic principles presented in the original book.

For more information check out the summary.

 

Featured image credit https://flic.kr/p/7En8U

Spark RDD – A Two Minute Guide for Beginners

spark rdd

What is Spark RDD?

Spark RDD is short for Apache Spark Resilient Distributed Dataset.  A Spark Resilient Distributed Dataset is often shortened to simply RDD.  RDDs are a foundational component of the Apache Spark large scale data processing framework.

Spark RDDs are immutable, fault-tolerant, and possibly distributed collections of data elements.  RDDs may be operated on in parallel across a cluster of computer nodes.  To operate in parallel, RDDs are divided into logical partitions.  Partitions are computed on different nodes of the cluster through Spark Transformation APIs.  RDDs may contain any type of Python, Java, or Scala object, including user-defined classes.

RDDs support two types of operations: transformations, which create a new dataset from an existing one, and actions, which return a value by performing a computation on the RDD.

How are Spark RDDs created?

Spark RDDs are created through the use of Spark Transformation functions.  Transformation functions create new RDDs from a variety of sources; e.g. the textFile function reading from a local filesystem, Amazon S3 or Hadoop’s HDFS.  Transformation functions may also be used to create new RDDs from previously created RDDs.  For example, an RDD of all the customers from only North America could be constructed from an RDD of all customers throughout the world.
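
As a minimal sketch in spark-shell (assuming an existing SparkContext named `sc` and a hypothetical customers.csv whose fourth column holds a region code):

// create an RDD from a text file
val customers = sc.textFile("customers.csv")
// derive a new RDD containing only North America customers
val northAmerica = customers.filter(_.split(",")(3) == "NA")
// an action such as count triggers the actual computation
println(northAmerica.count())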

In addition to loading text files from file systems, RDDs may be created from external storage systems such as JDBC databases (e.g. MySQL), HBase, Hive, Cassandra, or any data source compatible with a Hadoop InputFormat.

RDDs are also created and manipulated when using Spark modules such as Spark Streaming and Spark MLlib.

Why Spark RDD?

Spark makes use of data abstraction through RDDs to achieve faster and more efficient performance than Hadoop’s MapReduce.

RDDs support in-memory processing.  Accessing data from memory is 10 to 100 times faster than accessing data from a network or disk.  Data access from disk often occurs in Hadoop’s MapReduce-based processing.

In addition to performance gains, working through an abstraction layer provides a convenient and consistent way for developers and engineers to work with a variety of data sets.

When to use Spark RDDs?

RDDs are utilized to perform computations through Spark Actions such as `count` or `reduce` when answering questions such as “how many times did xyz happen?” or “how many times did xyz happen by location?”

Often, RDDs are transformed into new RDDs in order to better prepare datasets for future processing downstream in the processing pipeline.  To reuse a previous example, let’s say you want to examine North America customer data and you have an RDD of all worldwide customers in memory.  It could be beneficial from a performance perspective to create a new RDD for North America only customers instead of using the much larger RDD of all worldwide customers.

Depending on the Spark operating environment and RDD size, RDDs should be cached (via the cache function) or persisted to disk when there is an expectation that the RDD will be utilized more than once.
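
As a quick sketch (again assuming a SparkContext named `sc` and the hypothetical customers.csv from the earlier example):

import org.apache.spark.storage.StorageLevel

val northAmerica = sc.textFile("customers.csv").filter(_.split(",")(3) == "NA")
northAmerica.cache()                              // keep the RDD in memory once computed
// or, if it might not fit in memory:
// northAmerica.persist(StorageLevel.MEMORY_AND_DISK)
println(northAmerica.count())  // first action computes and caches the RDD
println(northAmerica.count())  // second action reuses the cached result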

 

Conclusion and Resources

Learning Spark book

Scala Transformation API examples

Python Transformation API examples

Hadoop Input Format API docs

 

Featured Image credit https://flic.kr/p/7TqgUV

Apache Spark Advanced Cluster Deploy Troubleshooting

spark cluster deploy troubleshooting

In this Apache Spark example tutorial, we’ll review a few options when your Scala Spark code does not deploy as anticipated.  For example, does your Spark driver program rely on a 3rd party jar only compatible with Scala 2.11, but your Spark Cluster is based on Scala 2.10?  Maybe your code relies on a newer version of a 3rd party jar also used by Apache Spark?  Or maybe you want your code to use the Spark version of a particular jar instead of the jar specified by your code.

In any of these cases, your deploy to the Spark cluster will not be smooth.  So, in this post, we’ll explore how to address all three of these issues.

Overview

In this post, we’re going to address three specific issues that can come up when deploying to a Spark cluster:

  1. Using Apache Spark with Scala 2.11
  2. Overriding jars used by Spark with newer versions
  3. Excluding jars from your code in order to use the Spark version instead

All of these issues will be addressed based on the spark streaming code used in a previous Spark Streaming tutorial.  Links to source code download and screencasts are available at the end of this post.

Challenge 1 Apache Spark with Scala 2.11

I had no idea things could go so wrong with the following build.sbt file.

name := "spark-streaming-example"

version := "1.0"

assemblyOption in assembly := (assemblyOption in assembly).value.copy(includeScala = false)

scalaVersion := "2.11.8"

resolvers += "jitpack" at "https://jitpack.io"

libraryDependencies ++= Seq(
  "org.apache.spark" %% "spark-streaming" % "1.6.1" % "provided",
  "org.scalaj" %% "scalaj-http" % "2.3.0",
  "org.jfarcand" % "wcs" % "1.5"
)

Although it doesn’t stand out, the issue is going to be with the “wcs” WebSocket client library, which does not work with an Apache Spark cluster compiled against Scala 2.10.

Here is the error:

Exception in thread "Thread-28" java.lang.NoSuchMethodError: scala.Predef$.ArrowAssoc(Ljava/lang/Object;)Ljava/lang/Object;
    at scalaj.http.HttpConstants$.liftedTree1$1(Http.scala:637)

An error such as this indicates a Scala version incompatibility issue.

wcs is compiled to Scala 2.11 and I couldn’t find another WebSocket client to use in this project, so I explored compiling Spark to Scala 2.11 compatibility.  It turns out this isn’t a big deal.

Build Apache Spark with Scala 2.11 Steps

  1. Download Source (screencast below)
  2. Run the script to change the Scala version to 2.11
  3. Run make-distribution.sh script

Screencast of these three steps:

Building a Spark Scala 2.11 Distribution

Commands run in Screencast:

./dev/change-scala-version.sh 2.11

./make-distribution.sh --name spark-1.6.1-scala-2.11 --tgz -Dscala-2.11 -Pyarn -Phadoop-2.4

 

After creating the distribution, I started the Spark master and worker and optimistically tried to deploy again.  That’s how I ran into the next challenge.

Challenge 2 Incompatible Jars between Spark and Scala program – Use Your Jar

When I tried deploying the assembly jar to the new Spark cluster custom built for Scala 2.11, I ran into another issue as shown in this screencast:

Spark advanced deploy 2

As you see in the screencast, there were issues with SSL in Netty based HttpClient.

After asking Dr. Googs (see reference links below), I determined Spark uses Akka Actor for RPC and messaging, which in turn uses Netty.  And it turns out the Spark/Akka version of Netty is incompatible with the version needed by the scalaj-http library used in this project.

Recall from the “sbt assembly” command, the following was assembled:

~/Development/spark-course/spark-streaming $ sbt assembly
[info] Loading project definition from /Users/toddmcgrath/Development/spark-course/spark-streaming/project
[info] Set current project to spark-streaming-example (in build file:/Users/toddmcgrath/Development/spark-course/spark-streaming/)
[info] Including from cache: wcs-1.5.jar
[info] Including from cache: slf4j-api-1.7.12.jar
[info] Including from cache: scalaj-http_2.11-2.3.0.jar
[info] Including from cache: async-http-client-1.9.28.jar
[info] Including from cache: netty-3.10.3.Final.jar
[info] Checking every *.class/*.jar file's SHA-1.

 

I needed a way to configure Spark to use my netty-3.10.3.Final.jar instead of the older version used in Akka.

The “spark.driver.userClassPathFirst” configuration variable provided the answer.  This variable is described as

"Whether to give user-added jars precedence over Spark's own jars when loading classes in the the driver. This feature can be used to mitigate conflicts between Spark's dependencies and user dependencies. It is currently an experimental feature. This is used in cluster mode only."

So, I tried deploying again with this conf variable set as shown in the following screencast:

netty issue deploy

Challenge 3 Incompatible Jars between Spark and Scala program – Use Spark Jar

What if you want to use Spark’s version of a jar instead of the one bundled with your code?  Essentially, this is the opposite of the previously described Challenge 2.

I reviewed the output from “sbt assembly” and saw slf4j was included in the assembly.  Well, from the logs, we can see that Spark is already using slf4j, and now our driver program is attempting to spawn another instance.  Let’s use Spark’s already instantiated slf4j instead.

To remove or exclude certain jars from being included in the fat jar, hook into the sbt-assembly plugin’s “excludedJars” setting.

Update build.sbt with the following:

excludedJars in assembly <<= (fullClasspath in assembly) map { cp =>
  cp filter {
                i => i.data.getName == "slf4j-api-1.7.12.jar"
            }
}

Then re-run “sbt assembly” and you’ll be ready to try another Spark deploy.

Everything worked as you can see in this screencast

Spark Deploy with two slf4j jars

Conclusion

This post presented three challenges and solutions when troubleshooting Apache Spark with Scala deploys to a Spark cluster.  We covered three scenarios:

  • Apache Spark with Scala 2.11
  • Giving Apache Spark’s jars precedence over yours by excluding them from your assembly
  • Giving your jar(s) precedence over the comparable Apache Spark version of the jar(s).

 

Further References

http://stackoverflow.com/questions/23330449/how-does-spark-use-netty/23333955#23333955

https://issues.apache.org/jira/browse/SPARK-4738

http://spark.apache.org/docs/latest/configuration.html

http://spark.apache.org/docs/latest/building-spark.html#building-for-scala-211

 

If you are interested in more Apache Spark with Scala training and source code examples, make sure to check out our Apache Spark with Scala training course.

 

Featured Image credit https://flic.kr/p/jWwHNq

Free Apache Spark Training

Free Apache Spark Training

Hi everyone,

I just released a free, 3 day Apache Spark training course and curious for your feedback. You can sign up on the Free Apache Spark Training page.

In all, the course is over an hour long across three videos.

The format of the course is:

Day 1) Show example project of using Apache Spark to analyze data
Day 2) Fundamentals of Apache Spark using short descriptions and diagrams
Day 3) Spend 30 minutes covering Spark Transformations and Actions

This course is ideal for people new to Apache Spark who are not sure if they want to commit to or pay for a larger course. This course offers a no-risk chance to experience my teaching style.

This initial release is version one and I’m happy to make updates based on student feedback. So, sign up for the free Apache Spark course and let me know your thoughts.

Thanks in advance,
Todd

 

Featured image credit: https://flic.kr/p/itMqCz

Apache Spark with Amazon S3 Examples of Text Files Tutorial

Apache Spark with Amazon S3 setup

This post will show ways and options for accessing files stored on Amazon S3 from Apache Spark.  Examples of text file interaction on Amazon S3 will be shown from both Scala and Python, using spark-shell for Scala and an ipython notebook for Python.

To begin, you should know there are multiple ways to access S3 based files.  The options depend on a few factors such as:

  • Which version of Spark you are using, because the version of Hadoop matters
  • How were the files created on S3? Were they written to S3 from Spark or Hadoop, or by some other 3rd party tool?

All these examples are based on scala console or pyspark, but they may be translated to different driver programs relatively easily.  If you run into any issues, just leave a comment at the bottom of this page and I’ll try to help you out.

Apache Spark with Amazon S3 Scala Examples

Example Load file from S3 Written By Third Party Amazon S3 tool

Requirements:

  • Spark 1.4.1 pre-built using Hadoop 2.4
  • File on S3 was created from Third Party –  See Reference Section below for specifics on how the file was created
scala> sc.hadoopConfiguration.set("fs.s3n.awsAccessKeyId", "AKIAJJRUVasdfasdf")
scala> sc.hadoopConfiguration.set("fs.s3n.awsSecretAccessKey", "LmuKE77fVLXJfasdfasdfxK2vj1nfA0Bp")
scala> val myRDD = sc.textFile("s3n://supergloospark/baby_names.csv")
myRDD: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[1] at textFile at <console>:21
scala> myRDD.count
res2: Long = 35218

Note how this example is using s3n instead of s3 in setting security credentials and protocol specification in textFile call.  At the time of this writing, there are three different S3 options.  See Reference section in this post for links for more information.

You could also store the AWS credentials outside your code.  For example, here’s how to set them using environment variables:

~/Development/spark-1.4.1-bin-hadoop2.4 $ export AWS_ACCESS_KEY_ID=AKIAJJRUasdfasdfasdf33HPA
~/Development/spark-1.4.1-bin-hadoop2.4 $ export AWS_SECRET_ACCESS_KEY=LmuKE7afdasdfxK2vj1nfA0Bp

And then, if we restart the spark console, we don’t have to set the AWS security credentials in code.  All we have to do is call textFile with appropriate protocol specifier:

scala> val myRDD = sc.textFile("s3n://supergloospark/baby_names.csv")
myRDD: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[1] at textFile at <console>:21

scala> myRDD.count
res0: Long = 35218

Example Load Text File from S3 Written from Hadoop Library

Requirements:

  • Spark 1.4.1 pre-built using Hadoop 2.4
  • File on S3 was created from Hadoop
  • Amazon S3 credentials stored as environment variables before starting spark-shell
scala> val subset = myRDD.map(line => line.split(",")).map(n => (n(1), n(4)))
subset: org.apache.spark.rdd.RDD[(String, String)] = MapPartitionsRDD[21] at map at <console>:23

scala> subset.saveAsTextFile("s3://supergloospark/baby_names_s3_not_s3n.csv")
                                                                                
scala> sc.textFile("s3://supergloospark/baby_names_s3_not_s3n.csv").count()
res13: Long = 35218

Notice how s3 instead of s3n is used.  Also, we’re not setting any AWS credentials because we set them as environment variables before starting spark-shell.  See the previous example where AWS_SECRET_ACCESS_KEY and AWS_ACCESS_KEY_ID were set.  These vars will work for either s3 or s3n.

 

S3 from Spark Text File Interoperability

Requirements:

  • Spark 1.4.1 pre-built using Hadoop 2.4
  • Run both Spark with Scala S3 examples above
  • Amazon S3 credentials stored as environment variables before starting spark-shell
scala> // the following will error, because using s3 instead of s3n
scala> sc.textFile("s3://supergloospark/baby_names.csv").count()
org.apache.hadoop.mapred.InvalidInputException: Input path does not exist: s3://supergloospark/baby_names.csv
...
scala> sc.textFile("s3n://supergloospark/baby_names.csv").count()
res16: Long = 35218
scala> sc.textFile("s3://supergloospark/baby_names_s3_not_s3n.csv").count()
res19: Long = 35218                                                             
scala> sc.textFile("s3n://supergloospark/baby_names_s3_not_s3n.csv").count()
org.apache.hadoop.mapred.InvalidInputException: Input path does not exist: s3n://supergloospark/baby_names_s3_not_s3n.csv
...

Note how a file is only readable via the protocol it was written with: the file created by the third-party tool can be read via s3n but not s3, while the file saved from Spark with s3 can be read via s3 but not s3n.  At the time of this writing, there are three different S3 options.  See the Reference section in this post for links with more information.

 

Apache Spark with Amazon S3 Python Examples

Python Example Load File from S3 Written By Third Party Amazon S3 tool

Requirements:

  • Spark 1.4.1 pre-built using Hadoop 2.4
  • File on S3 was created from Third Party –  See Reference Section below for specifics on how the file was created
>>> sc._jsc.hadoopConfiguration().set("fs.s3n.awsAccessKeyId", "AKIAnotrealPLUQGVOJWQ")
>>> sc._jsc.hadoopConfiguration().set("fs.s3n.awsSecretAccessKey", "+Uu9E4psBLnotrealyi+e7i1Z6kJANKt")
>>> myRDD = sc.textFile("s3n://supergloospark/baby_names.csv").count()
35218

There are three different S3 options.  Note how this example is using s3n instead of s3 in setting security credentials and protocol specification in textFile call.  See Reference section in this post for links for more information.

You can store the AWS credentials outside your code.  For example, here’s how to set using environment variables:

~/Development/spark-1.4.1-bin-hadoop2.4 $ export AWS_ACCESS_KEY_ID=AKIAJJRUasdfasdfasdf33HPA
~/Development/spark-1.4.1-bin-hadoop2.4 $ export AWS_SECRET_ACCESS_KEY=LmuKE7afdasdfxK2vj1nfA0Bp

And then, if we restart the ipython notebook, we don’t have to set the AWS security credentials in code.  All we have to do is call textFile with appropriate protocol specifier:

>>> sc.textFile("s3n://supergloospark/baby_names.csv").count()
35218

Python Example Load Text File from S3 Written from Hadoop Library

Requirements:

  • Spark 1.4.1 pre-built using Hadoop 2.4
  • File on S3 was created from Hadoop
>>> myRDD = sc.textFile("s3n://supergloospark/baby_names.csv")
>>> subset = myRDD.filter(lambda line: "MICHAEL" in line)
>>> sc._jsc.hadoopConfiguration().set("fs.s3.awsAccessKeyId", "AKIAI74O5KPLUQGVOJWQ")
>>> sc._jsc.hadoopConfiguration().set("fs.s3.awsSecretAccessKey", "+Uu9E4psBLJgJPNFeV2GdevWcyi+e7i1Z6kJANKt")
>>> subset.saveAsTextFile("s3://supergloospark/python_example_baby_names_s3_not_s3n.csv")
>>> sc.textFile("s3://supergloospark/python_example_baby_names_s3_not_s3n.csv").count()
206

Note how this example is using s3 instead of s3n in setting security credentials and the protocol specification in the textFile call.  Unlike the comparable Scala example above, we are setting the AWS keys again because we are using s3 instead of s3n.  We can avoid having to set either if we store these values in external environment vars as noted above.

Python with S3 from Spark Text File Interoperability

Requirements:

  • Spark 1.4.1 pre-built using Hadoop 2.4
  • Run both Spark with Python S3 examples above
>>>  sc.textFile("s3n://supergloospark/baby_names.csv").count()
35218

>>> sc.textFile("s3://supergloospark/python_example_baby_names_s3_not_s3n.csv").count()
206

>>> sc.textFile("s3://supergloospark/baby_names.csv").count()
...
An error occurred while calling z:org.apache.spark.api.python.PythonRDD.collectAndServe.
: org.apache.hadoop.mapred.InvalidInputException: Input path does not exist: s3://supergloospark/baby_names.csv
...

Again, note how the protocol must match the one the file was written with: the third-party-created file is readable via s3n, while the file saved with s3 is only readable via s3.  At the time of this writing, there are three different S3 options.  See the Reference section in this post for links with more information.

 

References

1)  To create the files on S3 outside of Spark/Hadoop, I used a client called Forklift.  But, Forklift isn’t a requirement as there are many S3 clients available.  Here’s a screencast example of configuring Amazon S3 and copying the file up to the S3 bucket.

Apache Spark with Amazon S3 setup

2) For more information on different S3 options, see Amazon S3 page on Hadoop wiki http://wiki.apache.org/hadoop/AmazonS3

3) ipython notebook file available on github

 

 

Featured Image Credit https://flic.kr/p/nvoqRm

Connecting ipython notebook to an Apache Spark Cluster Quick Start

This post will cover how to connect ipython notebook to two kinds of Spark Clusters: Spark Cluster running in Standalone mode and a Spark Cluster running on Amazon EC2.

Requirements

You need to have a Spark Standalone cluster or a Spark cluster running on Amazon EC2 to complete this tutorial.  See the Background section of this post for further information and helpful references.

Connecting ipython notebook to an Apache Spark Standalone Cluster

Connecting to the Spark cluster from ipython notebook is easy.  Simply set the IPYTHON_OPTS environment variable and pass the --master argument when calling pyspark, for example:

IPYTHON_OPTS="notebook" ./bin/pyspark --master spark://todd-mcgraths-macbook-pro.local:7077

Run `sc.version` or some other function off of `sc`.  There’s really no foolproof way I know of to programmatically determine that we are truly running ipython notebook against the Spark cluster.  But, we can verify from the Spark Web UI:
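
That said, printing the configured master from within the notebook is a quick sanity check (the Web UI remains the definitive confirmation):

print sc.version   # the Spark version the notebook is talking to
print sc.master    # should show spark://todd-mcgraths-macbook-pro.local:7077, not local[*]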

Connecting ipython notebook to an Apache Spark Cluster running on EC2

Using pyspark against a remote cluster is just as easy.  Just pass in the appropriate URL to the --master argument.

IPYTHON_OPTS="notebook" ./bin/pyspark --master spark://ec2-54-198-139-10.compute-1.amazonaws.com:7077

Conclusion

As you saw in this tutorial, connecting to a standalone cluster or a Spark cluster running on EC2 is essentially the same.  It’s easy.  The difficult part of connecting to a Spark cluster happens beforehand.  Check the next section on background information for help setting up your Apache Spark cluster and/or connecting ipython notebook to a Spark cluster.

Background Information or Possibly Helpful References

1) How to use ipython notebook with Spark: Apache Spark and ipython notebook – The Easy Way

2) In the Apache Spark Cluster in Standalone mode tutorial, you learned how to run a Spark Standalone cluster and how to connect the Scala console to utilize it.

3) Running an Apache Spark Cluster on EC2

 

Featured Image: https://flic.kr/p/5dBco

How To: Apache Spark Cluster on Amazon EC2 Tutorial

Spark Cluster on EC2

How do you set up and run an Apache Spark cluster on EC2?  This post will walk you through each step to get an Apache Spark cluster up and running on EC2. The cluster consists of one master and one worker node. It includes each step I took, regardless of whether it failed or succeeded.  While your experience may not match exactly, I’m hoping these steps could be helpful as you attempt to run an Apache Spark cluster on Amazon EC2.  There are screencasts throughout the steps.

Overview

The basis for this tutorial is the ec2 scripts provided with Spark.  It wouldn’t hurt to spend a few minutes reading http://spark.apache.org/docs/latest/ec2-scripts.html to get an idea of what this Apache Spark Cluster on EC2 tutorial will cover.

Assumptions

This post assumes you have already signed-up and have a verified AWS account.  If not, sign up here https://aws.amazon.com/

Approach

I’m going to go through step by step and also show some screenshots and screencasts along the way.  For example, there is a screencast that covers steps 1 through 5 below.

Spark Cluster on Amazon EC2 Step by Step

Note: There’s a screencast of steps one through four at end of step five below.

1) Generate Key/Pair in EC2 section of AWS Console

Click “Key Pairs” in the left nav and then the Create Key Pair button.

 

Download the resulting key/pair PEM file.

2) Create a new AWS user named courseuser and download the file which includes the User Name, Access Key Id, Secret Access Key.  We need the Key Id and Secret Access Key.

3) Set your environment variables according to the key and id from the previous step.  For me, that meant running the following from the command line:

export AWS_SECRET_ACCESS_KEY=F9mKN6obfusicatedpBrEVvel3PEaRiC

export AWS_ACCESS_KEY_ID=AKIAobfusicatedPOQ7XDXYTA

4) Open a terminal window and go to the root dir of your Spark distribution.  Then, copy the PEM file from the first step in this tutorial to the root of the Spark home dir.

5) From Spark home dir, run:

ec2/spark-ec2 --key-pair=courseexample --identity-file=courseexample.pem launch spark-cluster-example

I received errors about the PEM file permissions, so I changed the file permissions according to the error message’s recommendation and re-ran the spark-ec2 script.

Then, you may receive permission errors from Amazon, so update the permissions of courseuser in AWS and try again.

You may receive an error about zone availability such as:

Your requested instance type (m1.large) is not supported in your requested Availability Zone (us-east-1b). Please retry your request by not specifying an Availability Zone or choosing us-east-1c, us-east-1e, us-east-1d, us-east-1a.

If so, just update the script zone argument and re-run:

ec2/spark-ec2 --key-pair=courseexample --identity-file=courseexample.pem --zone=us-east-1d launch spark-cluster-example

The cluster creation takes approximately 10 minutes, with all kinds of output including deprecation warnings and possibly errors starting GANGLIA.  GANGLIA errors are fine if you are just experimenting; if you want to fix them, try a different Spark version or tweak the PHP settings on your cluster.

Here’s a screencast example of me creating an Apache Spark Cluster on EC2

Set up an Apache Spark Cluster on Amazon EC2 Part 1

6) After the cluster creation succeeds, you can verify by going to the master web UI at http://<your-ec2-hostname>.amazonaws.com:8080/

7) And you can verify from the Spark console in Scala or Python

Scala example:

bin/spark-shell --master spark://ec2-54-145-64-173.compute-1.amazonaws.com:7077

Python example

IPYTHON_OPTS="notebook" ./bin/pyspark --master spark://ec2-54-198-139-10.compute-1.amazonaws.com:7077

At first, both of these will likely have issues which eventually lead to an “ERROR OneForOneStrategy: java.lang.NullPointerException”:

Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /___/ .__/\_,_/_/ /_/\_\   version 1.4.1
      /_/

Using Scala version 2.10.4 (Java HotSpot(TM) 64-Bit Server VM, Java 1.8.0_66)
Type in expressions to have them evaluated.
Type :help for more information.
16/01/17 07:30:28 ERROR SparkDeploySchedulerBackend: Application has been killed. Reason: All masters are unresponsive! Giving up.
16/01/17 07:30:28 ERROR OneForOneStrategy: 
java.lang.NullPointerException

 

8) This is an Amazon permission issue related to port 7077 not being open.  You need to open up port 7077 via an Inbound Rule.  Here’s a screencast on how to create an Inbound Rule in EC2:

Setting up an Apache Spark Cluster on Amazon EC2 Part 2

After creating this inbound rule, everything will work from both ipython notebook and the spark shell.

Conclusion

Hope this helps you configure a Spark Cluster on EC2.  Let me know in the page comments if I can help.  Once you are finished with your EC2 instances, make sure to destroy them using the following command:

ec2/spark-ec2 --key-pair=courseexample --identity-file=courseexample.pem destroy spark-cluster-example

Featured Image Credit: https://flic.kr/p/g19ivQ

Apache Spark and ipython notebook – The Easy Way

ipython-notebook-spark

Using ipython notebook with Apache Spark couldn’t be easier.  This post will cover how to use ipython notebook (jupyter) with Spark and why it is the best choice when using Python with Spark.

Requirements

This post assumes you have downloaded and extracted Apache Spark and you are running on a Mac or *nix.  If you are on Windows see http://ramhiser.com/2015/02/01/configuring-ipython-notebook-support-for-pyspark/

ipython notebook with Apache Spark

I recommend using the Python 2.7 Anaconda Python distribution, which can be downloaded here: https://www.continuum.io/downloads.  It contains more than 300 of the most popular Python packages for science, math, engineering, and data analysis.  Also, future Python Spark tutorials and Python Spark examples will use this distribution.

After you have Anaconda installed, you should make sure that ipython notebook (Jupyter) is up to date. Run the following commands in the Terminal (Mac/Linux) or Command Prompt (Windows):

conda update conda
conda update ipython

Ref: http://ipython.org/install.html in the “I am getting started with Python” section

Launching ipython notebook with Apache Spark

1) In a terminal, go to the root of your Spark install and enter the following command

IPYTHON_OPTS="notebook" ./bin/pyspark

A browser tab should launch, and you will see various output in your terminal window depending on your logging level.

What’s going on here with the IPYTHON_OPTS variable passed to pyspark?  Well, you can look at the source of bin/pyspark in a text editor.  This section:

# Determine the Python executable to use for the driver:
if [[ -n "$IPYTHON_OPTS" || "$IPYTHON" == "1" ]]; then
  # If IPython options are specified, assume user wants to run IPython
  # (for backwards-compatibility)
  PYSPARK_DRIVER_PYTHON_OPTS="$PYSPARK_DRIVER_PYTHON_OPTS $IPYTHON_OPTS"
  PYSPARK_DRIVER_PYTHON="ipython"
elif [[ -z "$PYSPARK_DRIVER_PYTHON" ]]; then
  PYSPARK_DRIVER_PYTHON="${PYSPARK_PYTHON:-"$DEFAULT_PYTHON"}"
fi

Hopefully, this snippet makes sense.  If IPYTHON_OPTS is present, use ipython.

Verify Spark with ipython notebook

At this point, you should be able to create a new notebook and execute some python using the provided SparkContext.  For example:

print sc

or

print sc.version

 

Here’s a screencast of running ipython notebook with pyspark on my laptop

ipython notebook spark example

 

In this screencast, pay special attention to your terminal window log statements.  At the default log level of INFO, you should see no errors in the pyspark output.  Also, when you start a new notebook, the terminal should show the SparkContext sc being available for use, such as

INFO SparkContext: Running Spark version

Why use ipython notebook with Spark?

1) Same reasons you use ipython notebook without Spark such as convenience, easy to share and execute notebooks, etc.

2) Code completion.  As the screencast shows, a Python Spark developer can hit the tab key to see available functions, also known as code completion.

 

Hope this helps, let me know if you have any questions.

Learn Apache Spark Online with Limited Time 85% Discount

Image credit https://flic.kr/p/hycr1n

Learn Apache Spark with Online Training

Our Learn Apache Spark with Scala online course is currently available on Udemy.  To celebrate, we’re offering an 85% discount coupon from supergloo.com.

This course will prepare both new and seasoned developers to be productive, confident and valuable with Apache Spark.

Short on time and need to know the core essentials quickly in order to prepare for a job interview, converse with your boss or co-workers or maybe you need to determine if Spark might be a viable solution for your use case?

This is the right course for you.  It teaches Apache Spark core concepts through Scala source code examples and programs. It provides the examples for download so you can run and tweak them to your needs.

The course doesn’t waste your time watching the instructor type the code or providing long overviews of the features and benefits of Spark.  The course respects your time by being concise and focused.

Learn Apache Spark through over 50 hands-on Scala examples and more, including:

  • Learn the fundamentals of Spark’s Resilient Distributed Datasets
  • Scala examples of Spark Actions and Transformations
  • Run Spark in a Cluster
  • Deploy a Scala application to Spark running in a Cluster
  • Convenient links to download all source code
  • Reinforce your understanding through multiple quizzes and lecture recap

By the end of this course, you’ll be conversant, knowledgeable and confident with Apache Spark.  You’ll also have your own environment and source code to continue your Spark adventures.

Our Spark Training is designed for developers, data scientists and engineering managers.  It is based on our real-world experience using Apache Spark in healthcare and finance.  Although Spark may be used from Java, Python and Scala, the focus of this Spark training will be on Scala.  So, if you are interested in learning more Scala and build tools such as SBT, this course provides the opportunity.

The course content is online and may be completed at the student’s own pace.

Spark Training Requirements

A computer and an open mind.  Students will have a much greater chance of success and overall benefit if they have previous programming and database (relational, document, columnar) experience, but it’s not an absolute requirement.

Training is conducted in English.

Early Apache Spark Training Reviews

A few of our course participants have provided feedback such as:

“The instructor speaks slowly and provides thorough coverage on the topics.  I appreciate the attention to detail”

“I like how I could learn Apache spark in bite sized chunks on my own time”

“So grateful the examples were not word count.  I had tried studying Spark on my own before the course, but the examples were always word count.”

 

Join the Course

Again, the retail price is currently $59, but for a limited time, you can access the Apache Spark – Become Productive and Confident Rapidly course at an 85% discount with this link or by using the coupon code SGNINE during registration.

By joining now, you receive all future updates to the course for free.  The next updates will include in-depth presentations and examples of Spark SQL, Streaming and ML (machine learning)!

 

Learn Spark
Learn Apache Spark

 

What is Apache Spark? Deconstructing the Building Blocks Part 1

Becoming productive with Apache Spark requires an understanding of just a few fundamental elements.  In this post, the building blocks of Apache Spark will be covered quickly and backed with real-world examples.

The intention is for readers to understand basic Spark concepts.  It assumes you are familiar with installing software and unzipping files.  Later posts will dive deeper into Apache Spark and example use cases.

Spark computations can be called via Scala, Python or Java.  This post will use Scala on a Mac, but the examples can be easily translated to other languages and operating systems.

If you have any questions or comments, please leave a comment at the bottom of this post.

 

I. Overview

Let’s dive into code and working examples first.  Then, we’ll describe Spark concepts and tie back to the examples.

Requirements

* Java 6 or newer installed

* Download NY State Baby Names in CSV format from: http://www.healthdata.gov/dataset/baby-names-beginning-2007.  (I renamed the csv file to baby_names.csv)

 

Code Examples

The CSV file we will use has the following structure:

Year,First Name,County,Sex,Count
2012,DOMINIC,CAYUGA,M,6
2012,ADDISON,ONONDAGA,F,14
2012,JULIA,ONONDAGA,F,15

 

Let’s start our Spark and Scala journey with a working example.

Steps

  1. Download from http://spark.apache.org/downloads.html.  Select the “Pre-built package for Hadoop 2.4”
  2. Unpack it.  (tar, zip, etc.)
  3. From terminal, run the spark-shell via: bin/spark-shell

Let’s run some code

scala> val babyNames = sc.textFile("baby_names.csv")
babyNames: org.apache.spark.rdd.RDD[String] = baby_names.csv MappedRDD[1] at textFile at <console>:12

scala> babyNames.count
res0: Long = 35218

scala> babyNames.first()
res177: String = Year,First Name,County,Sex,Count

So, we know there are 35218 rows in the CSV file.

scala> val rows = babyNames.map(line => line.split(","))
rows: org.apache.spark.rdd.RDD[Array[String]] = MappedRDD[17] at map at <console>:14

scala> rows.map(row => row(2)).distinct.count
res22: Long = 62

We converted each line of the CSV to an Array of Strings and then determined there are 62 unique NY State counties over the years of data collected in the CSV.

 

scala> val davidRows = rows.filter(row => row(1).contains("DAVID"))
davidRows: org.apache.spark.rdd.RDD[Array[String]] = FilteredRDD[24] at filter at <console>:16

scala> davidRows.count
res32: Long = 136

There are 136 rows containing the name “DAVID”.

scala> davidRows.filter(row => row(4).toInt > 10).count()
res41: Long = 89

This is the number of rows where the name DAVID has a “Count” greater than 10.

scala> davidRows.filter(row => row(4).toInt > 10).map( r => r(2) ).distinct.count
res57: Long = 17

There are 17 unique counties which have had the name DAVID over 10 times in a given year.

scala> val names = rows.map(name => (name(1),1))
names: org.apache.spark.rdd.RDD[(String, Int)] = MappedRDD[342] at map at <console>:16

scala> names.reduceByKey((a,b) => a + b).sortBy(_._2).foreach ( println _)

This shows the number of lines each name appears on in the file, because of how names was created with rows.map(name => (name(1), 1)).  Jacob appears most often, but is actually not the most popular name by total count.

scala> val filteredRows = babyNames.filter(line => !line.contains("Count")).map(line => line.split(","))
filteredRows: org.apache.spark.rdd.RDD[Array[String]] = MappedRDD[546] at map at <console>:14

scala> filteredRows.map ( n => (n(1), n(4).toInt)).reduceByKey((a,b) => a + b).sortBy(_._2).foreach (println _)

The output from the last foreach loop isn’t shown here, but Michael (9187 times), followed by Matthew (7891) and Jaden (7807), were the top 3 most popular names in NY from 2007 through 2012.  Also, the first row of the CSV needed to be discarded in order to avoid a NumberFormatException when converting the “Count” column to an Int.

If your results do not show Michael 9187 times, don’t worry.  The println _ function called from foreach only prints from wherever each partition of the RDD is processed, so you may not see the complete, sorted output in your console.  To ensure the entire RDD is printed in order, send the results through collect first:

filteredRows.map ( n => (n(1), n(4).toInt)).reduceByKey((a,b) => a + b).sortBy(_._2).collect.foreach (println _)

Spark Core Concepts

The remainder of this post will cover Spark Core concepts.  Spark Core is what makes all other aspects of the Spark ecosystem possible.  Later posts will cover other aspects of the Spark ecosystem, including Spark SQL, Spark Streaming, MLlib and GraphX.

 

II. Spark Context and Resilient Distributed Datasets

The way to interact with Spark is via a SparkContext.  The example above used the Spark shell, which provides a SparkContext automatically.  Did you notice these lines when the REPL started?

06:32:25 INFO SparkILoop: Created spark context..
Spark context available as sc.

That’s how we’re able to use sc from within our example.
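Outside of the shell, you have to construct a SparkContext yourself before any of the examples will run.  Here is a minimal sketch of what that could look like in a standalone Scala application; the application name and local master setting are illustrative assumptions, not taken from the example above:

import org.apache.spark.{SparkConf, SparkContext}

// Hypothetical standalone setup; the shell does this for you and exposes it as sc
val conf = new SparkConf()
  .setAppName("BabyNamesExample")  // illustrative application name
  .setMaster("local[*]")           // run locally using all available cores
val sc = new SparkContext(conf)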

After obtaining a SparkContext, developers interact with Spark via Resilient Distributed Datasets.

Resilient Distributed Datasets (RDDs) are immutable, distributed collections of elements.  These collections may be parallelized across a cluster.  RDDs are loaded from an external data set or created via a SparkContext.  We’ll cover both of these scenarios.

In the previous example, we created an RDD from an external data set via:

scala> val babyNames = sc.textFile("baby_names.csv")
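The other scenario, creating an RDD directly from an in-memory collection through the SparkContext, uses parallelize.  A minimal sketch, with made-up values rather than the baby names data:

// Hypothetical example: distribute a local Scala collection as an RDD
val counties = sc.parallelize(Seq("CAYUGA", "ONONDAGA", "KINGS"))
counties.count  // returns 3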

We also created RDDs in other ways, which we’ll cover a bit later.

When utilizing Spark, you will be doing one of two primary interactions: creating new RDDs or transforming existing RDDs to compute a result.  The next section describes these two Spark interactions.

 

III. Actions and Transformations

When working with Spark RDDs, there are two kinds of operations available: actions and transformations.  An action is an execution which produces a result.  Examples of actions in the code above are count and first.

Example Spark Actions

babyNames.count() // number of lines in the CSV file
babyNames.first() // first line of CSV

Transformations create new RDDs using existing RDDs.  We created a variety of RDDs in our example:

scala> val rows = babyNames.map(line => line.split(","))

scala> val davidRows = rows.filter(row => row(1).contains("DAVID"))
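Transformations can also be chained together; a value is only produced once an action is called on the resulting RDD.  Here is a small sketch that reuses the rows RDD from the examples above:

// Chain two transformations, then trigger computation with an action
val davidCounties = rows
  .filter(row => row(1).contains("DAVID"))  // transformation: keep rows for the name DAVID
  .map(row => row(2))                       // transformation: project the county column
davidCounties.distinct.count                // action: the number of distinct counties with the name DAVID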

 

IV. Conclusion and Looking Ahead

In this post, we covered the fundamentals for being productive with Apache Spark.  As you witnessed, there are just a few Spark concepts to know before you can be productive.  What do you think of Scala?  To many, the use of Scala is more intimidating than Spark itself.  Sometimes choosing to use Spark is a way to bolster your Scala-fu as well.

In later posts, we’ll write and run code outside of the REPL.  We’ll dive deeper into Apache Spark and pick up some more Scala along the way as well.

Comments welcome.