What is Apache Spark? Deconstructing the Building Blocks Part 1

Becoming productive with Apache Spark requires an understanding of just a few fundamental elements.  In this post, we'll cover the building blocks of Apache Spark quickly and back them with real-world examples.

The intention is for readers to understand basic Spark concepts.  It assumes you are familiar with installing software and unzipping files.  Later posts will dive deeper into Apache Spark and example use cases.

Spark computations can be called via Scala, Python or Java.  This post will use Scala on a Mac, but the examples can be easily translated to other languages and operating systems.

If you have any questions, please leave a comment at the bottom of this post.


I. Overview

Let’s dive into code and working examples first.  Then, we’ll describe Spark concepts and tie back to the examples.

Requirements

* Java 6 or newer installed

* Download NY State Baby Names in CSV format from: http://www.healthdata.gov/dataset/baby-names-beginning-2007.  (I renamed the csv file to baby_names.csv)


II. Code Examples

The CSV file we will use has the following structure:

Year,First Name,County,Sex,Count
2012,DOMINIC,CAYUGA,M,6
2012,ADDISON,ONONDAGA,F,14
2012,JULIA,ONONDAGA,F,15


Let’s start our Spark and Scala journey with a working example.

Steps

  1. Download Spark from http://spark.apache.org/downloads.html.  Select the “Pre-built package for Hadoop 2.4” option.
  2. Unpack it.  (tar, zip, etc.)
  3. From terminal, run the spark-shell via: bin/spark-shell

Let’s run some code

scala> val babyNames = sc.textFile("baby_names.csv")
babyNames: org.apache.spark.rdd.RDD[String] = baby_names.csv MappedRDD[1] at textFile at <console>:12

scala> babyNames.count
res0: Long = 35218

scala> babyNames.first()
res177: String = Year,First Name,County,Sex,Count

So, we know there are 35218 lines in the CSV file, including the header row.

scala> val rows = babyNames.map(line => line.split(","))
rows: org.apache.spark.rdd.RDD[Array[String]] = MappedRDD[17] at map at <console>:14

scala> rows.map(row => row(2)).distinct.count
res22: Long = 62

Here we split each line of the CSV into an Array of Strings.  Then, we determine there are 62 distinct values in the County column over the years of data collected in the CSV.
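
Note that rows still includes the header line at this point, so the literal string “County” is counted among those distinct values.  A small sketch (not from the original session) that drops the header before counting the actual counties:

scala> val header = babyNames.first()
scala> val dataRows = babyNames.filter(line => line != header).map(line => line.split(","))
scala> dataRows.map(row => row(2)).distinct.count // distinct counties, header excluded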


scala> val davidRows = rows.filter(row => row(1).contains("DAVID"))
davidRows: org.apache.spark.rdd.RDD[Array[String]] = FilteredRDD[24] at filter at <console>:16

scala> davidRows.count
res32: Long = 136

There are 136 rows containing the name “DAVID”.

scala> davidRows.filter(row => row(4).toInt > 10).count()
res41: Long = 89

This is the number of rows where the first name DAVID has a “Count” greater than 10.

scala> davidRows.filter(row => row(4).toInt > 10).map( r => r(2) ).distinct.count
res57: Long = 17

And 17 unique counties have had the name DAVID more than 10 times in a given year.

scala> val names = rows.map(name => (name(1),1))
names: org.apache.spark.rdd.RDD[(String, Int)] = MappedRDD[342] at map at <console>:16

scala> names.reduceByKey((a,b) => a + b).sortBy(_._2).foreach(println _)

This shows the number of lines each name appears on in the file, because names was created with rows.map(name => (name(1), 1)).  Jacob appears on the most lines, but is not actually the most popular name by total count.


scala> val filteredRows = babyNames.filter(line => !line.contains("Count")).map(line => line.split(","))
filteredRows: org.apache.spark.rdd.RDD[Array[String]] = MappedRDD[546] at map at <console>:14

scala> filteredRows.map ( n => (n(1), n(4).toInt)).reduceByKey((a,b) => a + b).sortBy(_._2).foreach (println _)

The output from the last foreach call isn’t shown here, but Michael (9187), followed by Matthew (7891) and Jaden (7807), were the top 3 most popular names in NY from 2007 through 2012.  Also, the header row of the CSV needed to be discarded to avoid a NumberFormatException when calling toInt on the Count column.

If your results do not show Michael at 9187, don’t worry.  foreach runs in parallel across the RDD’s partitions, so the output may appear in an arbitrary, interleaved order rather than in sorted order.  To bring the entire sorted result back to the driver and print it from there, call collect first:

filteredRows.map ( n => (n(1), n(4).toInt)).reduceByKey((a,b) => a + b).sortBy(_._2).collect.foreach (println _)
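
As a side note (not from the original session), sortBy sorts in ascending order by default, so the most popular names land at the end of that printout.  Passing false as the second argument sorts descending, and take grabs just the top three:

scala> filteredRows.map(n => (n(1), n(4).toInt)).reduceByKey((a,b) => a + b).sortBy(_._2, false).take(3)

This should return the same three names with the totals quoted above.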

III. Spark Core Overview

The remainder of this post will cover Spark Core concepts.  Spark Core is what makes all the other aspects of the Spark ecosystem possible.  Later posts will cover those other aspects, including Spark SQL, Spark Streaming, MLlib and GraphX.


IV. Spark Context and Resilient Distributed Datasets

The way to interact with Spark is via a SparkContext.  The example used the Spark shell, which provides a SparkContext automatically.  Did you notice these lines in the output when the REPL started?

06:32:25 INFO SparkILoop: Created spark context..
Spark context available as sc.

That’s how we’re able to use sc from within our example.
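
Outside the shell, for example in a standalone Scala application, you create the SparkContext yourself.  Here is a minimal sketch against the Spark 1.x API (the object name and file path are just placeholders):

import org.apache.spark.{SparkConf, SparkContext}

object BabyNamesApp {
  def main(args: Array[String]): Unit = {
    // "local[*]" runs Spark locally using all available cores.
    val conf = new SparkConf().setAppName("BabyNamesApp").setMaster("local[*]")
    val sc = new SparkContext(conf)

    val babyNames = sc.textFile("baby_names.csv")
    println(babyNames.count())

    sc.stop()
  }
}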

After obtaining a SparkContext, developers interact with Spark via Resilient Distributed Datasets.

Resilient Distributed Datasets (RDDs) are immutable, distributed collections of elements.  These collections may be parallelized across a cluster.  RDDs can be loaded from an external data set or created from an existing collection via a SparkContext.  We’ll cover both of these scenarios.

In the previous example, we created an RDD via:

scala> val babyNames = sc.textFile("baby_names.csv")

We also created RDDs in other ways, which we’ll cover a bit later.
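
For a quick preview of the other creation scenario, an RDD can also be built by distributing an in-memory Scala collection with sc.parallelize (a small sketch, not part of the baby names example):

scala> val numbers = sc.parallelize(1 to 10)
scala> numbers.filter(n => n % 2 == 0).collect() // Array(2, 4, 6, 8, 10)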

When utilizing Spark, you will be doing one of two primary interactions: creating new RDDs or transforming existing RDDs to compute a result.  The next section describes these two Spark interactions.


V. Actions and Transformations

When working with Spark RDDs, there are two types of operations available: actions and transformations.  An action is an operation which produces a result.  Examples of actions from the code above are count and first.

Example Spark Actions

babyNames.count() // number of lines in the CSV file
babyNames.first() // first line of CSV
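
A few other commonly used actions, sketched here against the same babyNames RDD, are take, collect and reduce:

babyNames.take(5) // first five lines of the CSV as an Array[String]
babyNames.collect() // every line returned to the driver; use with care on large data sets
babyNames.map(line => line.length).reduce((a,b) => a + b) // reduce is the action here: totals the characters across all lines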

Transformations create new RDDs using existing RDDs.  We created a variety of RDDs in our example:

scala> val rows = babyNames.map(line => line.split(","))

scala> val davidRows = rows.filter(row => row(1).contains("DAVID"))
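
One important property of transformations is that they are lazy: Spark records the lineage of transformations but computes nothing until an action runs.  For example (a sketch reusing the rows RDD from above), the filter and map below do no work until count forces evaluation:

scala> val femaleNames = rows.filter(row => row(3) == "F").map(row => row(1)) // nothing is computed yet
scala> femaleNames.distinct.count // count is an action, so it triggers the whole chain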


VI. Conclusion and Looking Ahead

In this post, we covered the fundamentals of being productive with Apache Spark.  As you saw, there are just a few Spark concepts to learn before you can start being productive.  What do you think of Scala?  To many, Scala is more intimidating than Spark itself.  Sometimes choosing to use Spark is a way to bolster your Scala-fu as well.

In later posts, we’ll write and run code outside of the REPL.  We’ll dive deeper into Apache Spark and pick up some more Scala along the way as well.

Comments welcome.

