Spark FAIR Scheduler Example

Scheduling in Spark can be a confusing topic.  When someone says “scheduling” in Spark, do they mean scheduling applications running on the same cluster?  Or do they mean the internal scheduling of Spark tasks within the Spark application?  So, before we cover an example of utilizing the Spark FAIR Scheduler, let’s make sure we’re on the same page with regard to Spark scheduling.

In this Spark Fair Scheduler tutorial, we’re going to cover an example of how we schedule certain processing within our application with higher priority and potentially more resources.

What is the Spark FAIR Scheduler?

By default, Spark’s internal scheduler runs jobs in FIFO fashion.  When we use the term “jobs” in describing the default scheduler, we are referring to internal Spark jobs within the Spark application.  The word “jobs” is often used interchangeably for a Spark application and a Spark job, but applications and jobs are two very different constructs.  “Oyy yoy yoy” as my grandma used to say when things became more complicated.  Sometimes it’s difficult to translate Spark terminology.  We are talking about jobs in this post.

Anyhow, as we know, jobs are divided into stages and the first job gets priority on all available resources. Then, the second job gets priority, etc.  As a visual review, the following diagram shows what we mean by jobs and stages.  

[Diagram: Spark jobs and stages (Spark internals)]

Notice how there are multiple jobs.  We can confirm this in the “Jobs” tab of the Spark UI as well.

If the jobs at the head of the queue are long-running, then later jobs may be delayed significantly.

This is where the Spark FAIR scheduler comes in…

The FAIR scheduler supports the grouping of jobs into pools.  It also allows setting different scheduling options (e.g. weight) for each pool. This can be useful to create high priority pools for some jobs vs others.  This approach is modeled after the Hadoop Fair Scheduler.

How do we utilize the Spark FAIR Scheduler?

Let’s run through an example of configuring and implementing the Spark FAIR Scheduler.  The following are the steps we will take:

    • Run a simple Spark Application and review the Spark UI History Server
    • Create a new Spark FAIR Scheduler pool in an external XML file
    • Set `spark.scheduler.pool` to the pool created in the external XML file
    • Update code to use threads to trigger use of FAIR pools and rebuild
    • Re-deploy the Spark Application with:
        • the `spark.scheduler.mode` configuration variable set to FAIR
        • the `spark.scheduler.allocation.file` configuration variable pointing to the XML file
    • Run and review the Spark UI History Server

Here’s a screencast of me running through all these steps.

Also, for more context, I’ve outlined all the steps below.

Run a simple Spark Application with default FIFO settings

In this tutorial on Spark FAIR scheduling, we’re going to use a simple Spark application.  The code reads in a bunch of CSV files totaling about 850MB, calls a `count`, and prints out values.  In the screencast above, I was able to verify the use of pools in the regular Spark UI, but if your simple Spark application completes before you can check, you may want to utilize the Spark History Server to monitor metrics.  (By the way, see the Spark Performance Monitor with History Server tutorial for more information on History Server.)
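
The exact code isn’t important for this tutorial, but as a rough sketch, a minimal application of this kind might look like the following (the CSV path is a placeholder):

import org.apache.spark.sql.SparkSession

object SimpleCount {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("fair-scheduler-example")
      .getOrCreate()

    // read a directory of CSV files (placeholder path) and trigger a job with count()
    val df = spark.read.option("header", "true").csv("/path/to/csv-files/")
    println(s"row count: ${df.count()}")

    spark.stop()
  }
}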

Create a new Spark FAIR Scheduler pool

There is more than one way to create FAIR pools.  In this example, we will create a new file with the following content:

<?xml version="1.0"?>

<allocations>
  <pool name="fair_pool">
    <schedulingMode>FAIR</schedulingMode>
    <weight>2</weight>
    <minShare>4</minShare>
  </pool>
  <pool name="a_different_pool">
    <schedulingMode>FIFO</schedulingMode>
    <weight>1</weight>
    <minShare>2</minShare>
  </pool>
</allocations>

Save this file to the file system so we can reference it later.

A note about the file options.  Hopefully obvious, but we configure each pool in a `pool` node and give it a name.  Then we have three options for each pool:

  • `schedulingMode` — either FAIR or FIFO
  • `weight` — controls this pool’s share of the cluster relative to other pools.  Pools have a weight of 1 by default.  Giving a specific pool a weight of 2, for example, means it will get 2x more resources than other active pools
  • `minShare` — a minimum share of CPU cores to allocate to the pool

Update code to utilize the new FAIR pools

The code in use can be found on my work-in-progress Spark 2 repo.
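
The essential change is to trigger Spark jobs from separate threads and set the `spark.scheduler.pool` local property before kicking off an action in each thread.  Here is a minimal sketch, assuming an existing SparkContext `sc` and using the pool names from the XML above (the file path and the action are placeholders, not the exact code from the repo):

val pools = Seq("fair_pool", "a_different_pool")

val threads = pools.map { pool =>
  new Thread {
    override def run(): Unit = {
      // jobs submitted from this thread are assigned to the named pool
      sc.setLocalProperty("spark.scheduler.pool", pool)
      val count = sc.textFile("/path/to/csv-files/").count()
      println(s"pool $pool count: $count")
    }
  }
}

threads.foreach(_.start())
threads.foreach(_.join())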

Set Scheduler Configuration During Spark-Submit

We’re going to add two configuration variables when we re-run our application (see the example spark-submit command after this list):

  • the `spark.scheduler.mode` configuration variable set to FAIR
  • the `spark.scheduler.allocation.file` configuration variable pointing to the previously created XML file
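
As a sketch, the spark-submit invocation might look something like the following; the class name, jar, and allocation file path are placeholders for whatever your application uses:

$SPARK_HOME/bin/spark-submit \
  --conf spark.scheduler.mode=FAIR \
  --conf spark.scheduler.allocation.file=/path/to/fair-pools.xml \
  --class com.example.YourApp \
  your-application.jar
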
Verify Pools are being utilized

Let’s go back to the Spark UI and review while the updated application is running with the new spark-submit configuration variables.  We can now see the pools are in use!  Just in case you had any doubt along the way, I did believe we could do it.  Never doubted it.

Conclusion

I hope this simple tutorial on using the Spark FAIR Scheduler was helpful.  If you have any questions or suggestions, please let me know in the comments section below.

Further reference

Featured image credit https://flic.kr/p/qejeR3

Apache Spark Thrift Server Load Testing Example

Spark Thrift Server Stress Test Tutorial

Wondering how to perform stress tests with Apache Spark Thrift Server?  This tutorial will describe one way to do it.

What is Apache Spark Thrift Server?  

Apache Spark Thrift Server is based on Apache HiveServer2 and was created to allow JDBC/ODBC clients to execute SQL queries using a Spark cluster.  From my experience, these “clients” are typically business intelligence tools such as Tableau, and they are most often only a portion of the overall Spark architecture.  In other words, the Spark cluster is primarily used for streaming and batch aggregation jobs, and any JDBC/ODBC client access via Thrift Server to the cluster is secondary at best.

For more information on Apache Spark Thrift Server and an example use case, see the previous Spark Thrift Server tutorial.

Apache Spark Thrift Server Load Testing Example Overview

How do we simulate anticipated load on our Apache Spark Thrift Server?  In this post, we are going to use an open source tool called Gatling.  Check out the References section at the bottom of this post for links to Gatling.

At a high level, this Spark Thrift with Gatling tutorial will run through the following steps:

    1. Confirm our environment (Spark, Cassandra, Thrift Server)
    2. Compile our Gatling based load testing code
    3. Run a sample Spark Thrift load test

Setup and configure our environment of Spark, Cassandra, and Thrift Server

If you are at the point of load testing Apache Spark Thrift Server, I’m going to assume you are already familiar with the setup of Spark and Cassandra, or some other backend such as Hive or Parquet.  Therefore, I’m just going to run through the steps to start everything up in my local environment.  Adjust the following to best match your environment.

Confirm Environment

1. Start Cassandra

For this tutorial, we’re going to use the killrweather sample keyspace and queries created in the previous Spark Thrift Server with Cassandra post.  You need to go through that tutorial first.  This post assumes you have already created and loaded the data, so all we need to do now is start Cassandra if it is not already running.

`$CASSANDRA_HOME/bin/cassandra`

2. Start your Spark Master, at least one Worker and the Thrift Server

If your Spark cluster is not already running, then start it up

`$SPARK_HOME/sbin/start-master.sh`

`$SPARK_HOME/sbin/start-slave.sh spark://<spark-master>:7077`

3. Start the Thrift Server and set configuration for Cassandra

`$SPARK_HOME/sbin/start-thriftserver.sh --packages com.datastax.spark:spark-cassandra-connector_2.11:2.0.2 --conf spark.cassandra.connection.host=127.0.0.1 --master spark://<spark-master>:7077`

Obtain Sample Apache Spark Thrift Server Load Tests

Clone the repo https://github.com/tmcgrath/gatling-sql

This repo contains a Gatling extension I wrote.  This extension is what will allow us to load test the Spark Thrift Server.

See the src/main/resources/application.conf file for default Spark Thrift connection settings and adjust as needed.

You can run src/test/scala/io/github/gatling/sql/example/ThriftServerSimulation.scala by invoking the Maven `test` task or by building a jar and using the `launch.sh` script; both are covered in the next section.

Run Load Tests

You can run the src/test/scala/io/github/gatling/sql/example/ThriftServerSimulation.scala by invoking the Maven `test` task or build a jar and use the included `launch.sh` script.

Conclusion

Hopefully, this tutorial on load testing Apache Spark Thrift Server helps get you started.

If you have any questions or ideas for corrections, let me know in the comments below.

References

Featured Image credit https://flic.kr/p/e5hWaC

Spark Thrift Server with Cassandra Example

With the Spark Thrift Server, you can do more than you might have thought possible.  For example, want to use `joins` with Cassandra?  Or, help people familiar with SQL leverage your Spark infrastructure without having to learn Scala or Python?  They can use SQL-based tools they already know, such as Tableau or even MS Excel.  How about both?  This tutorial provides answers to both by describing how to configure the Apache Spark Thrift Server with Cassandra.

First, some quick background on Apache Spark Thrift Server.  

Apache Spark Thrift Server is a port of Apache HiveServer2 which allows JDBC/ODBC clients to execute Spark SQL queries.  From my experience, these “clients” are typically business intelligence (BI) tools such as Tableau or even MS Excel, or direct SQL access using a query tool of choice such as Toad, DBVisualizer, or SQuirreL SQL Client.

Ok, great, but why do we care?  

The Spark Thrift Server allows clients to utilize the in-memory, distributed architecture of Spark SQL.  In addition, the Thrift Server may be deployed as a Spark application, which allows clients to share the same SparkContext/SparkSession and take advantage of caching across different JDBC/ODBC requests.  Finally, through the Thrift Server and Spark SQL, we may be able to provide relational database concepts such as SQL JOINs in environments in which they are not supported, such as Cassandra.

Apache Spark Thrift Server with Cassandra Setup

    1. Start Cassandra and Load Sample Data
    2. Optional Start Spark Cluster
    3. Start Spark Thrift Server with Cassandra configuration
    4. Configure Hive Metastore
    5. Verify our environment
    6. Run example SQL
    7. Check out the Spark UI
    8. Small celebration

I demo all these steps in a screencast in case it helps.

Spark Thrift Server Example Screencast

Spark Thrift Server Overview

If you have Spark Cluster or have downloaded Apache Spark to your laptop, you already have everything you need to run Spark Thrift Server.  Thrift Server is included with Spark.

Also, I assume you are familiar with the setup of Spark and Cassandra.  I don’t walk through these steps.  Instead, I just run through the steps to start everything up in my local environment.  You will need to make some adjustments to the following to best match your environment.  For example, you will need to modify any references to `$SPARK_HOME` in the steps below to match the appropriate directory for your setup.  Ok, enough chit chat, let’s go…

1. Start Cassandra and load sample database

This tutorial assumes you have Cassandra running locally, but it isn’t a requirement.  Your Cassandra cluster can be running someplace other than local.  Once Cassandra is running, the next step is to load a sample keyspace and some data.  In this tutorial, we’re going to use the CDM tool found at https://github.com/riptano/cdm-java.  Using CDM is really simple.  Just download the pre-built binary, update the permissions to make it executable (i.e. `chmod 755 cdm`) and run `cdm install killrweather`.  I show an example of running it in the screencast below.

2. Start your Spark Master, at least one Worker and the Thrift Server (Optional)

A running Spark cluster is optional in this Spark Thrift tutorial.  If you want to run a minimal cluster with one worker on your laptop, you can perform something similar to the following

`$SPARK_HOME/sbin/start-master.sh`

`$SPARK_HOME/sbin/start-slave.sh spark://<spark-master>:7077`

I bet you already knew this.  If not, here’s a running Spark Standalone tutorial.  Anyhow, movin on…

3. Start Thrift Server and Set Config for Cassandra

If you did not perform step 2 or do not have an available Spark cluster, then run

`$SPARK_HOME/sbin/start-thriftserver.sh --packages com.datastax.spark:spark-cassandra-connector_2.11:2.0.2 --conf spark.cassandra.connection.host=127.0.0.1`

This will run Thrift Server in local[*] mode which is fine for this quick start.

Alternatively, if you do have a Spark cluster, you can also pass in the `--master` arg

`$SPARK_HOME/sbin/start-thriftserver.sh --packages com.datastax.spark:spark-cassandra-connector_2.11:2.0.2 --conf spark.cassandra.connection.host=127.0.0.1 --master spark://<spark-master>:7077`

Notice how we are passing in the Spark Cassandra Connector package and the `cassandra.connection.host` config here?  Of course you do, because you do not let details like this get past you.  You are a stickler for details, as they say.

4. Configure Hive Metastore with beeline client

Next, we’re going to update the Hive metastore.  The Hive metastore is what Spark Thrift Server uses to know connection parameters of registered data sources.  In this example, the data source is Cassandra of course.

To update the Hive metastore, we’re going to use the Apache Beeline client.  Beeline is a command shell that works with HiveServer2 using JDBC.  It is based on the SQLLine CLI.

`$SPARK_HOME/bin/beeline`

`beeline> !connect jdbc:hive2://localhost:10000`

In my setup, I can disregard the username and password prompts… just press Enter at both prompts.

`jdbc:hive2://localhost:10000> CREATE TABLE raw_weather_data USING org.apache.spark.sql.cassandra OPTIONS (keyspace 'isd_weather_data', table 'raw_weather_data');`

and then

`jdbc:hive2://localhost:10000> CREATE TABLE weather_station USING org.apache.spark.sql.cassandra OPTIONS (keyspace 'isd_weather_data', table 'weather_station');`

We registered two tables because we are going to use SQL JOINs in later sections of this Spark Thrift tutorial.  It’s probably noteworthy to mention that these tables may be cached as well.  I’m not going to show how to do that in this example, but leave a comment if you have any questions.

5. Verify Spark, Cassandra and Thrift Server Setup

Before running some more complex SQL, let’s just verify our setup with a smoke test.  Still within the connected beeline client, issue a simple SQL statement:

`jdbc:hive2://localhost:10000> select * from raw_weather_data limit 10;`

You should see some results.  10 rows to be precise you cheeky bastard.

6. SQL with Spark Examples

Let’s show more complex SQL examples and take things another step.  I’m going to use SQuirreL SQL client, but the concepts apply to any SQL client.  If you want more detail on setting up SQuirrel with Thrift, see the Reference section below.

Joins?  Joins you say!  No problem

`SELECT ws.name, raw.temperature
FROM raw_weather_data raw
JOIN weather_station ws
ON raw.wsid=ws.id
WHERE raw.wsid = '725030:14732'
AND raw.year = 2008 AND raw.month = 12 AND raw.day = 31;`

Well, well, special, special.  Let’s turn it to 11…

`SELECT weather_station.id,
       weather_station.call_sign,
       weather_station.country_code,
       weather_station.name,
       ranked_temp.avg_temp
FROM (
  SELECT wsid, year, month, day, daily.avg_temp,
         dense_rank() OVER (PARTITION BY wsid ORDER BY avg_temp DESC) AS rank
  FROM (
    SELECT wsid, year, month, day, avg(temperature) AS avg_temp
    FROM raw_weather_data
    GROUP BY wsid, year, month, day
  ) daily
) ranked_temp
JOIN weather_station ON ranked_temp.wsid = weather_station.id
WHERE rank <= 5;`

7. JDBC / ODBC server tab in SPARK UI

Once again, the Spark UI is a valuable resource for us.  Now, that we’ve run a few queries, let’s take a look at what we can see…

Open `http://localhost:4040/` in your browser.  You should be redirected to /jobs

If you started Spark Thrift Server with a `--master` argument and pointed it at a cluster, then you can open http://localhost:8080 and get to this screen as well.

[Screenshot: Spark Thrift Server in the Spark UI]

Notice the JDBC/ODBC tab.  That’s the Thrift Server part.  You did that.  Check things out.  You can see the query plans, details of the Spark jobs such as stages and tasks, how the shuffling went, etc.

8. Small Celebration

Let’s sing

“Let’s celebrate, it’s all right… we gonna have a good time tonight”  — Kool and the Gang

Sing it with me.  Don’t be shy.

Spark Thrift Server Conclusion

Hopefully, this Apache Spark Thrift Server tutorial helps get you started.  I’ve also put together a quick screencast of me going through these steps above in case it helps.  See the Screencast section below.

If you have any questions or ideas for corrections, let me know in the comments below.

Also, you may be interested in the Spark Thrift Server performance testing tutorial.

Update November 2017

I intertwined two different data sources in the above post.  When I originally wrote this, I had a keyspace called `isd_weather_data` from the killrweather reference application.  But, in this tutorial, I used `cdm` to create and load a keyspace called `killrweather`.  You can see this in the CREATE TABLE statements above, which reference `isd_weather_data`; that keyspace shouldn’t have been there.  When using `cdm` as described in this tutorial, replace `isd_weather_data` with `killrweather`.  And not only that… the data appears different between the two sources.  So, the `join` examples produce results when using isd_weather_data and none when using killrweather.  I’m presuming differences between https://github.com/killrweather/killrweather/blob/master/data/weather_stations.csv and https://github.com/killrweather/killrweather-data/blob/master/data/weather_station.csv are the root cause of the joins returning no results.  For example, one CSV contains the weather station id `725030:14732`.  Sorry if this causes any confusion.

Spark Thrift Server References

Image credit https://flic.kr/p/4ykKnp

Spark Submit Command Line Arguments

Spark Command Line Arguments in Scala

The primary reason why we want to use Spark submit command line arguments is to avoid hard-coding values into our code. As we know, hard-coding should be avoided because it makes our application more rigid and less flexible.

For example, let’s assume we want to run our Spark job in both test and production environments. Let’s further assume that our Spark job reads from a Cassandra database and the databases between test and production are different. In this example, we should prefer using dynamic configuration values when submitting the job to test vs production environments.  The alternative is hard-coding the Cassandra host connection in code, recompiling and deploying.  This approach wastes time.

So, how do we process Spark submit command line arguments in our Scala code?  I would think this would be easy by now.  But, I’ve been surprised at how difficult this can be for people who are new to Scala and Spark.

(Reference: for more info, see the previous tutorials on deploying with spark-submit to a Spark cluster and on using Spark with 3rd party JARs.)

Of course, one way to achieve command line arg parsing is querying the String Array which is part of your `main` function. For example, perhaps your code looks similar to:

import org.apache.spark.SparkConf

def main(args: Array[String]) {

  val conf = new SparkConf().setAppName("SparkCassandraExampleApp")

  // positional args: brittle, and easy to read the wrong index
  if (args.length > 0) conf.setMaster(args(0))

  if (args.length > 1) conf.set("spark.cassandra.connection.host", args(1))

...

But this isn’t good.  It’s brittle because we are not using default values and `args` values depend on a specific ordering.  Let’s address these two weaknesses in our solution.

In this tutorial, I’ll present a simple example of a flexible and scalable way to process command-line args in your Scala based Spark jobs. We’re going to use `scopt` library [1] and update our previous Spark with Cassandra example.

To update our previous Spark Cassandra example to use command-line arguments, we’re going to update two areas of the project: the SBT build file and our code.

Step 1 Update SBT Build File for Scopt Command Line Option Parsing Library

Easy.  Essentially, we go from:

libraryDependencies ++= Seq(
   "org.apache.spark" %% "spark-sql" % "1.6.1",
   "com.datastax.spark" %% "spark-cassandra-connector" % "1.5.0"
)

to:

libraryDependencies ++= Seq(
   "org.apache.spark" %% "spark-sql" % "1.6.1",
   "com.datastax.spark" %% "spark-cassandra-connector" % "1.5.0",
   "com.github.scopt" %% "scopt" % "3.5.0"
)

Step 2 Update Spark Scala code to process command line args

There are multiple areas of the code we need to update.  First, we’re going to update our code to use a case class to represent the command-line options and utilize the `scopt` library to process them.

A case class to hold and reference our command line args:

case class CommandLineArgs (
  cassandra: String = "", // required
  keyspace: String = "gameofthrones", // default is gameofthrones
  limit: Int = 10
)

Next, let’s update the code to use this case class and set possible values from the command line:

val parser = new scopt.OptionParser[CommandLineArgs]("spark-cassandra-example") {
  head("spark-cassandra-example", "1.0")
  opt[String]('c', "cassandra").required().valueName("<cassandra-host>").
    action((x, c) => c.copy(cassandra = x)).
    text("Setting cassandra is required")
  opt[String]('k', "keyspace").action( (x, c) =>
    c.copy(keyspace = x) ).text("keyspace is a string with a default of `gameofthrones`")
  opt[Int]('l', "limit").action( (x, c) =>
    c.copy(limit = x) ).text("limit is an integer with default of 10")
}

So, there are a few interesting parts in this code.  Can you tell which command line variable is required?  Can you tell which requires an Int vs. String?  Sure, you can.  I have confidence in you.

Ok, finally we need to update the code to make a decision based on whether the command line arg parsing succeeded or not.  The way we do that is through a pattern match as shown here:

parser.parse(args, CommandLineArgs()) match {

  case Some(config) =>
  // do stuff
  case None => // failed

}

In this example, `config` is our `CommandLineArgs` case class which is available on success.  If the command line arg parsing succeeded, our code will enter the `Some(config)` match and “do stuff”.  There we can use values such as `config.keyspace`.

The alternative match is `None`.  In this match, we know the command line arg parsing failed, so the code should exit.
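
To make the “do stuff” part concrete, here is a sketch of how the parsed values might be wired into the SparkConf from earlier; the job body and exit code are illustrative only:

import org.apache.spark.{SparkConf, SparkContext}

parser.parse(args, CommandLineArgs()) match {

  case Some(config) =>
    // use the parsed, validated values instead of raw positional args
    val conf = new SparkConf()
      .setAppName("SparkCassandraExampleApp")
      .set("spark.cassandra.connection.host", config.cassandra)
    val sc = new SparkContext(conf)

    println(s"keyspace: ${config.keyspace}, limit: ${config.limit}")
    // ... run the actual job here ...

    sc.stop()

  case None =>
    // scopt has already printed usage and error details
    sys.exit(1)
}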

That’s it.  Hopefully, this simple example helps you move ahead with Spark command line argument parsing and usage in Scala.  For more details and exploring more options using `scopt` check out the site via reference link below or let me know if you have any questions or comments.

For the complete updated example, see this commit or cut-and-paste https://github.com/tmcgrath/spark-scala/blob/5f286e6543c87ff3d0cd64ade55f85ac32939007/got-battles/src/main/scala/com/supergloo/SparkCassandra.scala

Spark Submit Command Line References

[1] For more on `scopt` checkout https://github.com/scopt/scopt

Don’t miss the other Spark tutorials on the Spark Scala Tutorials page.

Featured image credit https://flic.kr/p/QFCGLQ

Spark RDD – A Two Minute Guide for Beginners

What is Spark RDD?

Spark RDD is short for Apache Spark Resilient Distributed Dataset.  A Spark Resilient Distributed Dataset is often shortened to simply RDD.  RDDs are a foundational component of the Apache Spark large scale data processing framework.

Spark RDDs are an immutable, fault-tolerant, and possibly distributed collection of data elements.  RDDs may be operated on in parallel across a cluster of computer nodes.  To operate in parallel, RDDs are divided into logical partitions, which may be computed on different nodes of the cluster through Spark transformation APIs.  RDDs may contain any type of Python, Java, or Scala objects, including user-defined classes.

RDDs support two types of operations: transformations, which create a new dataset from an existing one, and actions, which return a value by performing a computation on the RDD.
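
As a quick sketch, assuming an existing SparkContext `sc` and a placeholder file path:

// transformation: lazily describes a new RDD; nothing executes yet
val lines = sc.textFile("/path/to/data.txt")
val longLines = lines.filter(_.length > 80)

// action: triggers the computation and returns a value to the driver
val numLongLines = longLines.count()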

How are Spark RDDs created?

Spark RDDs are created through the use of Spark transformation functions.  Transformation functions create new RDDs from a variety of sources; e.g. the `textFile` function can read from a local filesystem, Amazon S3, or Hadoop’s HDFS.  Transformation functions may also be used to create new RDDs from previously created RDDs.  For example, an RDD of all the customers from only North America could be constructed from an RDD of all customers throughout the world.

In addition to loading text files from file systems, RDDs may be created from external storage systems such as JDBC databases (e.g. MySQL), HBase, Hive, Cassandra, or any data source compatible with Hadoop InputFormat.

RDDs are also created and manipulated when using Spark modules such as Spark Streaming and Spark MLlib.

Why Spark RDD?

Spark makes use of data abstraction through RDDs to achieve faster and more efficient performance than Hadoop’s MapReduce.

RDDs support in-memory processing.  Accessing data from memory is 10 to 100 times faster than accessing data from a network or disk.  Data access from disk often occurs in Hadoop’s MapReduce-based processing.

In addition to performance gains, working through an abstraction layer provides a convenient and consistent way for developers and engineers to work with a variety of data sets.

When to use Spark RDDs?

RDDs are used to perform computations through Spark actions, such as a `count` or `reduce`, when answering questions such as “how many times did xyz happen?” or “how many times did xyz happen by location?”

Often, RDDs are transformed into new RDDs in order to better prepare datasets for future processing downstream in the processing pipeline.  To reuse a previous example, let’s say you want to examine North America customer data and you have an RDD of all worldwide customers in memory.  It could be beneficial from a performance perspective to create a new RDD for North America only customers instead of using the much larger RDD of all worldwide customers.

Depending on the Spark operating environment and RDD size, RDDs should be cached (via the `cache` function) or persisted to disk when the RDD is expected to be used more than once.
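
Continuing the customer example, here is a brief sketch assuming an existing SparkContext `sc` and a hypothetical CSV of worldwide customers:

// derive a smaller RDD and keep it in memory for repeated use
val customers = sc.textFile("/path/to/worldwide_customers.csv")
val northAmerica = customers.filter(_.contains("North America")) // placeholder predicate
northAmerica.cache()

// both actions reuse the cached partitions instead of recomputing from the file
val total = northAmerica.count()
val sample = northAmerica.take(5)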

Conclusion and Resources

Scala Transformation API examples

Python Transformation API examples

Hadoop Input Format API docs

Spark Tutorial Landing page

Featured Image credit https://flic.kr/p/7TqgUV