What is Apache Spark?


Becoming productive with Apache Spark requires an understanding of a few fundamental elements.  In this post, let’s explore the fundamentals or the building blocks of Apache Spark.  Let’s use descriptions and real-world examples in the exploration.

The intention is for you to understand basic Spark concepts.  This post assumes you are familiar with installing software and unzipping files.  Later posts will dive deeper into Apache Spark and example use cases.

The Spark API can be called via Scala, Python or Java.  This post uses Scala, but the examples can be easily translated to the other languages and run on other operating systems.

If you have any questions or comments, please leave a comment at the bottom of this post.

I. Spark Overview

Let’s dive into code and working examples first.  Then, we’ll describe Spark concepts that tie back to the source code examples.

Requirements

* Java 6 or newer installed

* Download NY State Baby Names in CSV format from: http://www.healthdata.gov/dataset/baby-names-beginning-2007.  (I renamed the csv file to baby_names.csv)

I. Spark with Scala Code examples

The CSV file we will use has the following structure:

Year,First Name,County,Sex,Count
2012,DOMINIC,CAYUGA,M,6
2012,ADDISON,ONONDAGA,F,14
2012,JULIA,ONONDAGA,F,15

Let’s start our Spark and Scala journey with a working example.

Steps

  1. Download from http://spark.apache.org/downloads.html.  Select the “Pre-built package for Hadoop 2.4”
  2. Unpack it.  (tar, zip, etc.)
  3. From the terminal, run the spark-shell via: bin/spark-shell.  After running spark-shell you should see a scala> prompt

Let’s run some code

scala> val babyNames = sc.textFile("baby_names.csv")
babyNames: org.apache.spark.rdd.RDD[String] = baby_names.csv MappedRDD[1] at textFile at <console>:12

scala> babyNames.count
res0: Long = 35218

scala> babyNames.first()
res177: String = Year,First Name,County,Sex,Count

So, we know there are 35218 lines in the CSV, including the header line.

scala> val rows = babyNames.map(line => line.split(","))
rows: org.apache.spark.rdd.RDD[Array[String]] = MappedRDD[17] at map at <console>:14

scala> rows.map(row => row(2)).distinct.count
res22: Long = 62

Above, we converted each row in the CSV file to an Array of Strings.  Then, we determined there are 62 unique NY State counties over the years of data collected in the CSV.
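To see what the split and the column lookup do, here is a minimal sketch on plain Scala collections (no Spark needed), using the four sample lines shown earlier.  The names sampleLines, sampleRows, withHeader and withoutHeader are made up for this illustration.  Note that rows still contains the header line, so its literal value “County” counts as one of the distinct values unless the header is dropped.

```scala
// Plain Scala collections stand in for the RDD here; no Spark is required.
val sampleLines = Seq(
  "Year,First Name,County,Sex,Count",
  "2012,DOMINIC,CAYUGA,M,6",
  "2012,ADDISON,ONONDAGA,F,14",
  "2012,JULIA,ONONDAGA,F,15"
)

// Same shape as `rows`: one Array[String] per line.
val sampleRows = sampleLines.map(line => line.split(","))

// Column 2 is the county; the header contributes the literal value "County".
val withHeader = sampleRows.map(row => row(2)).distinct

// Dropping the first line leaves only real county names.
val withoutHeader = sampleRows.tail.map(row => row(2)).distinct

println(withHeader)    // List(County, CAYUGA, ONONDAGA)
println(withoutHeader) // List(CAYUGA, ONONDAGA)
```

The same map and distinct calls exist on RDDs, which is why the Spark version reads almost identically.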

scala> val davidRows = rows.filter(row => row(1).contains("DAVID"))
davidRows: org.apache.spark.rdd.RDD[Array[String]] = FilteredRDD[24] at filter at <console>:16

scala> davidRows.count
res32: Long = 136

There are 136 rows containing the name “DAVID”.

scala> davidRows.filter(row => row(4).toInt > 10).count()
res41: Long = 89

That is the number of rows where the name DAVID has a “Count” greater than 10.

scala> davidRows.filter(row => row(4).toInt > 10).map( r => r(2) ).distinct.count
res57: Long = 17

So, 17 unique counties have recorded the name DAVID more than 10 times in a given year.

scala> val names = rows.map(name => (name(1),1))
names: org.apache.spark.rdd.RDD[(String, Int)] = MappedRDD[342] at map at <console>:16

scala> names.reduceByKey((a,b) => a + b).sortBy(_._2).foreach ( println _) // shows the number of lines on which each name appears, because names maps every row to (name(1), 1); Jacob appears on the most lines, but is not the most popular name by total count.
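The difference between counting lines and summing the Count column can be sketched on a tiny in-memory sample.  This uses plain Scala collections, with groupBy plus a sum standing in for reduceByKey; the numbers are invented for illustration and are not taken from the real file.

```scala
// Hypothetical (first name, Count column) pairs; not real data from the CSV.
val sample = Seq(("JACOB", 5), ("JACOB", 6), ("JACOB", 7), ("MICHAEL", 40))

// Like rows.map(name => (name(1), 1)).reduceByKey(_ + _): lines per name.
val linesPerName = sample
  .map { case (name, _) => (name, 1) }
  .groupBy(_._1)
  .map { case (name, pairs) => (name, pairs.map(_._2).sum) }

// Like summing the Count column per name: babies per name.
val babiesPerName = sample
  .groupBy(_._1)
  .map { case (name, pairs) => (name, pairs.map(_._2).sum) }

println(linesPerName)  // JACOB appears on 3 lines, MICHAEL on 1
println(babiesPerName) // JACOB totals 18, MICHAEL totals 40
```

In this made-up sample JACOB wins by line count while MICHAEL wins by total, which is the same effect described in the comment above.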

scala> val filteredRows = babyNames.filter(line => !line.contains("Count")).map(line => line.split(","))
filteredRows: org.apache.spark.rdd.RDD[Array[String]] = MappedRDD[546] at map at <console>:14

scala> filteredRows.map ( n => (n(1), n(4).toInt)).reduceByKey((a,b) => a + b).sortBy(_._2).foreach (println _)

The output from the last foreach hasn’t been shown, but Michael (9187 times), followed by Matthew (7891) and Jaden (7807), were the top 3 most popular names in NY from 2007 through 2012.  Also, the first row of the CSV needed to be discarded to avoid a NumberFormatException, because the header’s “Count” field cannot be converted with toInt.

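If Michael (9187) is not the last line of your output, the cause is foreach rather than the data.  The `println _` function passed to foreach runs separately on each partition of the RDD, so lines from different partitions can print interleaved and out of sorted order (and on a cluster they go to the executors’ output, not to your console).  An RDD created with sc.textFile typically has at least 2 partitions by default.  To print the entire RDD in sorted order, bring it to the driver with collect first: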

filteredRows.map ( n => (n(1), n(4).toInt)).reduceByKey((a,b) => a + b).sortBy(_._2).collect.foreach (println _)
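If only the most popular names are of interest, one option is to sort in descending order and take the first few results instead of collecting everything.  The sketch below assumes a spark-shell session where sc is already defined; the totals RDD and its numbers are hypothetical stand-ins for the reduceByKey output.

```scala
// Assumes a spark-shell session, where `sc` is already defined.
// Hypothetical totals standing in for the reduceByKey output above.
val totals = sc.parallelize(Seq(
  ("JULIA", 15), ("DOMINIC", 6), ("ADDISON", 14), ("DAVID", 20)
))

// Sort descending by total, then bring only the first three pairs to the driver.
val top3 = totals.sortBy(_._2, ascending = false).take(3)

// top3 is a local Array, so this prints on the driver in sorted order.
top3.foreach(println)
```

Because take returns a local Array, the println calls here run in the driver, so the order is the sorted order.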

I. Spark Core Overview

The remainder of this post will cover Spark Core concepts.  Spark Core is what makes all other aspects of the Spark ecosystem possible.  Later posts will cover other aspects of the Spark ecosystem including Spark SQL, Spark Streaming, MLlib and GraphX.  Be sure to check the list of other Spark Tutorials with Scala for the latest tutorials.

II. Spark Context and Resilient Distributed Datasets

The way to interact with Spark is via a SparkContext (or a SparkSession in newer versions of Spark).  The example used the Spark shell, which provides a SparkContext automatically.  Did you notice these lines when the REPL started?

06:32:25 INFO SparkILoop: Created spark context..
Spark context available as sc.

That’s how we’re able to use sc from within our example.
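Outside the shell, nothing creates sc for you.  As a rough sketch, a standalone program could build its own SparkContext as shown below.  The object name LineCount, the method countLines, the app name and the temporary sample file are all made up for this illustration, and the Spark library is assumed to be on the classpath.

```scala
import java.nio.file.Files
import org.apache.spark.{SparkConf, SparkContext}

object LineCount {
  // Counts the lines of a text file using a SparkContext created here.
  def countLines(path: String): Long = {
    // "local[*]" runs Spark inside this JVM on all cores; the app name is arbitrary.
    val conf = new SparkConf().setAppName("line-count-sketch").setMaster("local[*]")
    val sc = new SparkContext(conf)
    try sc.textFile(path).count()
    finally sc.stop() // always release the context when done
  }

  def main(args: Array[String]): Unit = {
    // Write a small temporary CSV so the sketch does not depend on baby_names.csv.
    val path = Files.createTempFile("baby_names_sample", ".csv")
    Files.write(path, "Year,First Name,County,Sex,Count\n2012,DOMINIC,CAYUGA,M,6\n".getBytes("UTF-8"))
    println(countLines(path.toUri.toString))
  }
}
```

Do not run this inside spark-shell: a context already exists there, and creating a second one in the same JVM is normally not allowed.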

After obtaining a SparkContext, developers may interact with Spark via Resilient Distributed Datasets or RDDs.  (Update: this changes from RDDs to DataFrames and Datasets in later versions of Spark.)

A Resilient Distributed Dataset (RDD) is an immutable, distributed collection of elements.  For more information on Spark RDDs, see the Spark RDD tutorial in 2 minutes or less.

These collections may be parallelized across a cluster.  RDDs are created either by loading an external data set (as sc.textFile does) or by distributing an existing in-memory collection through the SparkContext (sc.parallelize).  We’ll cover both of these scenarios.
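As a small sketch of the second route, sc.parallelize turns a local Scala collection into an RDD.  This assumes a spark-shell session where sc is already defined; the name localLines and the choice of 2 partitions are only for illustration.

```scala
// Assumes a spark-shell session, where `sc` is already defined.
// Distribute a local collection; the second argument (partition count) is optional.
val localLines = sc.parallelize(Seq(
  "2012,DOMINIC,CAYUGA,M,6",
  "2012,ADDISON,ONONDAGA,F,14",
  "2012,JULIA,ONONDAGA,F,15"
), 2)

// The result is an ordinary RDD[String], so the same operations apply.
val counties = localLines.map(_.split(",")(2)).distinct.collect().sorted

println(localLines.partitions.length) // 2, as requested above
println(counties.mkString(", "))      // CAYUGA, ONONDAGA
```

This is handy for experimenting with small data before pointing the same code at a real file.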

In the previous example, we created an RDD via:

scala> val babyNames = sc.textFile("baby_names.csv")

We also created RDDs in other ways, which we’ll cover a bit later.

When using Spark, you will be doing one of two primary things: creating new RDDs from existing ones, or computing a result from an RDD.  The next section describes these two Spark interactions.

III. Spark Actions and Transformations

When working with Spark RDDs, there are two kinds of operations available: actions and transformations.  An action triggers a computation and produces a result.  Examples of actions in the previous code are count and first.

Example of Spark Actions

babyNames.count() // number of lines in the CSV file
babyNames.first() // first line of CSV

For a more comprehensive list of Spark Actions, see the Spark Examples of Actions page.

Example of Spark Transformations

Transformations create new RDDs using existing RDDs.  They are evaluated lazily: Spark does no work until an action needs the result.  We created a variety of RDDs in our example:

scala> val rows = babyNames.map(line => line.split(","))

scala> val davidRows = rows.filter(row => row(1).contains("DAVID"))

For a more comprehensive list of Spark Transformations, see the Spark Examples of Transformations page.
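One way to see the difference between the two kinds of operations is to point textFile at a file that does not exist.  In the sketch below, which assumes a spark-shell session where sc is defined and assumes the made-up path really is missing, the transformations succeed because nothing is read yet, and the failure only surfaces when the action runs.

```scala
import scala.util.Try

// Assumes a spark-shell session, where `sc` is already defined.
// Hypothetical path that is assumed not to exist.
val missing = sc.textFile("no_such_file_for_this_sketch.csv")

// Transformations only record what to do, so nothing is read and nothing fails here.
val fields = missing.map(line => line.split(","))

// The action forces evaluation; only now does Spark look for the file.
val result = Try(fields.count())

println(result.isFailure)
```

If the path is indeed missing, result should be a Failure wrapping an “input path does not exist” style error raised at count, not at textFile or map.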

IV. Conclusion and Looking Ahead

In this post, we covered the fundamentals for being productive with Apache Spark.  As you witnessed, there are just a few Spark concepts to know before you can be productive.  What do you think of Scala?  To many, the use of Scala is more intimidating than Spark itself.  Sometimes choosing to use Spark is a way to bolster your Scala-fu as well.

In later posts, we’ll write and run code outside of the REPL.  We’ll dive deeper into Apache Spark and pick up some more Scala along the way as well.

As mentioned above, for the most recent list of tutorials around Spark and Scala, be sure to bookmark the Spark with Scala tutorials page.

11 thoughts on “What is Apache Spark?”

  1. Excellent . It helped me a lot to understand the basics of Spark. Thank you for taking your time and coming up with nice articles.

  2. First of all, Thanks a lot for putting together this Tutorial series!

    In the reduceByKey example, the sort order is ascending. To list the name with maximum counts, the dataset needs to be sorted in descending order, by specifying a boolean value false in the second argument, as shown below:
    filteredRows.map (n => (n(1), n(4).toInt)).reduceByKey((a,b) => a + b).sortBy(_._2, false).foreach(println _)

  3. Thank you, very nice tutorial. I have one question though When i try to take distinct count i am getting ArrayIndexOutOfBoundsException for this below statement.

    rows.map(row => row(2)).distinct.count

    Can you please help in understanding what could be the issue ? Thanks
