How to Deploy Python Programs to a Spark Cluster

After you have a Spark cluster running, how do you deploy Python programs to it?

In this post, we’ll deploy a couple of examples of Spark Python programs. We’ll start with a simple example and then progress to more complicated examples which include utilizing spark-packages and Spark SQL.

Deploy Python programs to Spark Cluster Part 1

Ok, now that we’ve deployed a few examples as shown in the above screencast, let’s review a Python program which utilizes code we’ve already seen in the Spark with Python tutorials on this site.  It’s a Python program which analyzes New York City Uber data using Spark SQL.  The video shows the program in the Sublime Text editor, but you can use any editor you wish.

When deploying our driver program, we need to do a few things differently than we do when working in the pyspark shell.  For example, we need to obtain a SparkContext and SQLContext ourselves, and we need to specify the Python imports.

bin/spark-submit --master spark://todd-mcgraths-macbook-pro.local:7077 --packages com.databricks:spark-csv_2.10:1.3.0 uberstats.py Uber-Jan-Feb-FOIL.csv
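For reference, a minimal sketch of what such a driver might look like (hypothetical; the actual uberstats.py from the screencast may differ), using the spark-csv package passed via --packages above:

from pyspark import SparkConf, SparkContext
from pyspark.sql import SQLContext
import sys

if __name__ == "__main__":
    # Unlike the pyspark shell, a standalone driver must create its own contexts
    conf = SparkConf().setAppName("uber-analysis")
    sc = SparkContext(conf=conf)
    sqlContext = SQLContext(sc)

    # Load the CSV passed as the first argument using the spark-csv package
    uber = sqlContext.read.format("com.databricks.spark.csv") \
        .options(header="true", inferSchema="true") \
        .load(sys.argv[1])

    # Example query: total trips per dispatching base
    uber.registerTempTable("uber")
    sqlContext.sql("SELECT dispatching_base_number, SUM(trips) AS cnt "
                   "FROM uber GROUP BY dispatching_base_number "
                   "ORDER BY cnt DESC").show()

With a driver along these lines, the spark-submit command above passes Uber-Jan-Feb-FOIL.csv in as sys.argv[1].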

Deploy Python program to Spark Cluster Part 2

Let’s return to the Spark UI now that we have an available worker in the cluster and have deployed some Python programs.

The Spark UI is the tool for Spark Cluster diagnostics, so we’ll review the key attributes of the tool.

Deploy Python program to Spark Cluster Part 3 – Spark UI

If you find these videos of deploying Python programs to an Apache Spark cluster interesting, you will find the entire Apache Spark with Python Course valuable.  Make sure to check it out.

Additional Spark Python Resources

Spark with Python Tutorials

Spark Tutorial

Featured Image credit https://flic.kr/p/bpd8Ht

Apache Spark with Python Quick Start

In this post, let’s cover Apache Spark with Python fundamentals to get you started and feeling comfortable about using Spark with Python.

The intention is for readers to understand basic Spark concepts through examples.  Later posts will dive deeper into Apache Spark fundamentals and example use cases.

Spark computations can be called via Scala, Python or Java.  There are numerous Spark with Scala examples on this site, but this post will focus on Python.

See the Reference section at the bottom of this post for the ipython notebook file.

I. Overview

Let’s start by using an example data science analysis scenario.  Do you know the company Uber?  We’re going to analyze New York City Uber data first.  Then, we’ll use this example to describe fundamental Spark concepts.

Requirements

* Spark instance in Standalone or Cluster mode (more below)

* Download aggregated NYC Uber trip data in CSV format from: https://raw.githubusercontent.com/fivethirtyeight/uber-tlc-foil-response/master/Uber-Jan-Feb-FOIL.csv

II. Let’s Run Some Code

The CSV file we will use has the following structure:

dispatching_base_number,date,active_vehicles,trips
B02512,1/1/2015,190,1132
B02765,1/1/2015,225,1765
B02764,1/1/2015,3427,29421
B02682,1/1/2015,945,7679

dispatching_base_number is the NYC Taxi and Limousine company code of the base that dispatched the Uber.   active_vehicles shows the number of active Uber vehicles for a particular date and company (base).  Trips is the number of trips for a particular base and date.

With this data, we can answer questions such as: What was the busiest dispatch base by trips for a particular day or the entire month?  What day had the most active vehicles?  Which days had the most trips, sorted from most to fewest?  And so on.

For more information see https://github.com/fivethirtyeight/uber-tlc-foil-response

Steps

  1. Download from http://spark.apache.org/downloads.html.  Select the “Pre-built package for Hadoop 2.4” if you haven’t already and unpack it.  (See Reference section below if you need help installing Spark.)
  2. From a terminal in the Spark home directory, run the Python Spark shell: bin/pyspark

Let’s run some code

>>> ut = sc.textFile("Uber-Jan-Feb-FOIL.csv")
>>> ut.count()
355                                                                             
>>> ut.first()
u'dispatching_base_number,date,active_vehicles,trips'

So, we know there are 355 rows in the CSV.

>>> rows = ut.map(lambda line: line.split(","))
>>> rows.map(lambda row: row[0]).distinct().count()
7

In the above, the Python code converted each line of the source CSV into a list of fields by splitting on commas, producing a new Resilient Distributed Dataset (RDD).  More on RDDs later.  Then, we used the Spark transformation distinct and the Spark action count to determine there are 7 unique values in the first column of the CSV.  Again, more on Spark transformations and actions later in this post.

>>> rows.map(lambda row: row[0]).distinct().collect()
[u'B02617', u'B02682', u'B02598', u'B02765', u'B02512', u'dispatching_base_number', u'B02764']
>>> rows.filter(lambda row: "B02617" in row).count()
59

There are 59 rows containing the trip data for TLC base company code “B02617”.

>>> base02617 = rows.filter(lambda row: "B02617" in row)
>>> base02617.filter(lambda row: int(row[3]) > 15000).count()
6

Number of rows where base B02617 had more than 15000 trips in a day: 6.  Or, I should say, we are assuming these are daily totals.  Let’s confirm by counting distinct dates:

>>> base02617.filter(lambda row: int(row[3]) > 15000).map(lambda day: day[1]).distinct().count()
6

Yes, it’s confirmed.  Let’s keep going…

>>> filteredRows = sc.textFile("Uber-Jan-Feb-FOIL.csv").filter(lambda line: "base" not in line).map(lambda line:line.split(","))
>>> filteredRows.map(lambda kp: (kp[0], int(kp[3])) ).reduceByKey(lambda k,v: k + v).collect()
[(u'B02617', 725025), (u'B02682', 662509), (u'B02598', 540791), (u'B02765', 193670), (u'B02512', 93786), (u'B02764', 1914449)]

So, we see the total number of trips per base.  But it’s difficult to determine which base was busiest over the time frame covered by the CSV.  Let’s make it easier to see:

>>> filteredRows.map(lambda kp: (kp[0], int(kp[3])) ).reduceByKey(lambda k,v: k + v).takeOrdered(10, key=lambda x: -x[1])
[(u'B02764', 1914449), (u'B02617', 725025), (u'B02682', 662509), (u'B02598', 540791), (u'B02765', 193670), (u'B02512', 93786)]

So, base B02764 was busiest by trips… by over 1 million.

III. Next Steps

The remainder of this post will cover Spark Core concepts.  Spark Core is what makes all other aspects of the Spark ecosystem possible, including Spark SQL, Spark Streaming and MLlib.

IV. Spark Context and Resilient Distributed Datasets

The way to interact with Spark is via a SparkContext.  The example used the pyspark console, which provides a SparkContext automatically.  When you start pyspark, did you notice the last line?

SparkContext available as sc, HiveContext available as sqlContext.
>>>

That’s how we’re able to use sc from within our example.

After obtaining a SparkContext, developers interact with Spark’s primary data abstraction called Resilient Distributed Datasets.

A Resilient Distributed Dataset (RDD) is an immutable, distributed collection of elements.  These collections may be parallelized across a cluster.  As we witnessed, RDDs are loaded from an external data set or created via a SparkContext.  We’ll cover both of these scenarios.

We created an RDD by loading in a CSV file:

>>> ut = sc.textFile("Uber-Jan-Feb-FOIL.csv")

We also created RDDs through Spark Transformations, which we’ll cover a bit later.
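An RDD can also be created directly from an in-memory Python collection via the SparkContext’s parallelize; a small sketch:

>>> squares = sc.parallelize(range(1, 6)).map(lambda x: x * x)
>>> squares.collect()
[1, 4, 9, 16, 25]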

When utilizing Spark, you will be doing one of two primary interactions: creating new RDDs through transformations or using existing RDDs to compute a result such as distinct counts.  The next section describes these two Spark interactions.

V. Actions and Transformations

When working with Spark RDDs, there are two kinds of operations: actions and transformations.  An action is an execution which produces a result.  Examples of actions used previously are count and first.

Example Spark Actions in Python

ut.count()  # number of lines in the CSV file
ut.first()  # first line of the CSV

Example Spark Transformations in Python

Transformations create new RDDs using existing RDDs.  We created a variety of RDDs in our example:

>>> rows = ut.map(lambda line: line.split(","))

>>> filteredRows = sc.textFile("Uber-Jan-Feb-FOIL.csv").filter(lambda line: "base" not in line).map(lambda line:line.split(","))

VI. Conclusion

In this post, we covered the fundamentals of being productive with Apache Spark in Python.  From here you are encouraged to dive further into Spark with Python including:

Spark Actions in Python Examples

Spark Transformations in Python Examples

All Spark Python Tutorials

VII. Further Reference

ipython notebook of this Spark with Python Quickstart example

Setting up Ipython Notebook with Spark

What is Spark?

Featured image: https://flic.kr/p/cyWLHu

Connecting ipython notebook to an Apache Spark Cluster Quick Start

This post will cover how to connect an ipython notebook to two kinds of Spark Clusters: Spark Cluster running in Standalone mode and a Spark Cluster running on Amazon EC2.

Requirements

You need to have a Spark Standalone cluster or a Spark cluster running on EC2 available to complete this tutorial.  See the Background section of this post for further information and helpful references.

Connecting ipython notebook to an Apache Spark Standalone Cluster

Connecting to the Spark Cluster from ipython notebook is easy.  Simply set the --master argument when calling pyspark, for example:

IPYTHON_OPTS="notebook" ./bin/pyspark --master spark://todd-mcgraths-macbook-pro.local:7077

Run sc.version or some other function off of sc.  There’s really no way I know of to programmatically determine whether we are truly running the ipython notebook against the Spark cluster, but we can verify from the Spark Web UI.
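For example, a first notebook cell along these lines (a quick sketch) at least confirms which master the driver is pointed at:

print sc.version
print sc.master   # should print spark://todd-mcgraths-macbook-pro.local:7077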

Connecting an ipython notebook to an Apache Spark Cluster running on EC2

Using pyspark against a remote cluster is just as easy.  Just pass in the appropriate URL to the --master argument.

IPYTHON_OPTS="notebook" ./bin/pyspark --master spark://ec2-54-198-139-10.compute-1.amazonaws.com:7077

Conclusion

As you saw in this tutorial, connecting to a standalone cluster or a Spark cluster running on EC2 is essentially the same.  It’s easy.  The difficult part of connecting to a Spark cluster happens beforehand.  Check the next section on Background Information for help setting up your Apache Spark cluster and/or connecting ipython notebook to a Spark cluster.

Background Information or Possibly Helpful References

1) How to use ipython notebook with Spark: Apache Spark and ipython notebook – The Easy Way

2) In the Apache Spark Cluster in Standalone mode tutorial, you learned how to run a Spark Standalone cluster and how to connect the Scala console to utilize this cluster.

3) Running an Apache Spark Cluster on EC2

Featured Image: https://flic.kr/p/5dBco

Apache Spark Action Examples in Python

As you may have learned in other Apache Spark tutorials on this site, action functions produce a computed value back to the Spark driver program.  This is unlike transformations, which produce RDDs, DataFrames or DataSets.  For example, an action function such as count will produce a result back to the Spark driver.  These may seem easy at first, but how actions are computed includes some performance subtleties which you need to know.

Actions trigger any previously constructed Spark transformations to be evaluated.  Recall that transformations are lazily evaluated Spark data abstractions such as RDDs and DataFrames.  Any call to a Spark action will cause these data abstractions in the Spark directed acyclic graph (DAG) to be evaluated.
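As a quick illustration of this laziness (a sketch assuming the Uber CSV from the quick start above is in the working directory), the map below returns immediately without touching the file; only the count action forces the lineage to run:

>>> rows = sc.textFile("Uber-Jan-Feb-FOIL.csv").map(lambda line: line.split(","))  # nothing executes yet
>>> rows.count()  # the action triggers the file read and the map
355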

An ipython notebook file of all these examples is available in the Reference section of this page.


reduce
collect
count
first
take
takeSample
countByKey
saveAsTextFile

reduce(func)

Aggregate the elements of a dataset through func

>>> names1 = sc.parallelize(["abe", "abby", "apple"])
>>> print names1.reduce(lambda t1, t2: t1+t2)
abeabbyapple

>>> names2 = sc.parallelize(["apple", "beatty", "beatrice"]).map(lambda a: [a, len(a)])
>>> print names2.collect()
[['apple', 5], ['beatty', 6], ['beatrice', 8]]

>>> names2.flatMap(lambda t: [t[1]]).reduce(lambda t1, t2: t1+t2)
19

Back to Top

collect()

collect returns the elements of the RDD back to the driver program.

collect is often used in previously provided examples, such as Spark Transformation Examples in Python, in order to show the returned values.  pyspark, for example, will print the values of the array back to the console.  This can be helpful when debugging programs.

Examples

>>> sc.parallelize([1,2,3]).flatMap(lambda x: [x,x,x]).collect()
[1, 1, 1, 2, 2, 2, 3, 3, 3]

Back to Top

count()

Number of elements in the RDD

>>> names1 = sc.parallelize(["abe", "abby", "apple"])
>>> names1.count()
3

Back to Top

first()

Return the first element in the RDD

>>> names1 = sc.parallelize(["abe", "abby", "apple"])
>>> names1.first()
'abe'

Back to Top

take(n)

Take the first n elements of the RDD.

It works by first scanning one partition, and using the results from that partition to estimate the number of additional partitions needed to satisfy the limit.

Translated from the Scala implementation in RDD#take().

It can be much more convenient and economical to use take instead of collect to inspect a very large RDD.

>>> names1 = sc.parallelize(["abe", "abby", "apple"])
>>> names1.take(2)
['abe', 'abby']

Back to Top

takeSample(withReplacement, n, seed=None)

Similar to take, it returns a sample of size n.  It includes a boolean option for sampling with or without replacement and a random generator seed, which defaults to None.

>>> teams = sc.parallelize(("twins", "brewers", "cubs", "white sox", "indians", "bad news bears"))
>>> teams.takeSample(True, 3)
['brewers', 'brewers', 'twins']
# run a few times to see different results

Back to Top

countByKey()

Count the number of elements for each key, and return the result to the master as a dictionary.

>>> hockeyTeams = sc.parallelize(("wild", "blackhawks", "red wings", "wild", "oilers", "whalers", "jets", "wild"))
>>> hockeyTeams.map(lambda k: (k,1)).countByKey().items()
[('red wings', 1),
 ('oilers', 1),
 ('blackhawks', 1),
 ('jets', 1),
 ('wild', 3),
 ('whalers', 1)]

Back to Top

saveAsTextFile(path, compressionCodecClass=None)

Save RDD as text file, using string representations of elements.

Parameters:
  • path – path to file
  • compressionCodecClass – (None by default) string, e.g. “org.apache.hadoop.io.compress.GzipCodec”
>>> hockeyTeams = sc.parallelize(("wild", "blackhawks", "red wings", "wild", "oilers", "whalers", "jets", "wild"))
>>> hockeyTeams.saveAsTextFile("hockey_teams.txt")

Produces:

$ ls hockey_teams.txt/
_SUCCESS	part-00001	part-00003	part-00005	part-00007
part-00000	part-00002	part-00004	part-00006

So, you’ll see each partition is written to its own file.  There are 8 partitions in the example dataset here.
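To try the compressionCodecClass parameter (the output path below is just an example):

>>> hockeyTeams.saveAsTextFile("hockey_teams_gz", compressionCodecClass="org.apache.hadoop.io.compress.GzipCodec")

Each part file is then written gzip-compressed (part-00000.gz and so on).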

Back to Top

References

ipython notebook

https://github.com/tmcgrath/spark-with-python-course/blob/master/Apache%20Spark%20Action%20Examples%20with%20Python.ipynb

For more, see the Spark tutorial landing page and especially the Spark with Python tutorials.

Featured Image in Post: https://flic.kr/p/ajgHPF

Apache Spark Transformations in Python Examples

If you’ve read the previous Spark with Python tutorials on this site, you know that Spark Transformation functions produce a DataFrame, DataSet or Resilient Distributed Dataset (RDD).  Resilient distributed datasets are Spark’s main programming abstraction, and RDDs are automatically parallelized across the cluster.  As Spark matured, this abstraction changed from RDDs to DataFrames to DataSets, but the underlying concept of a Spark transformation remains the same: transformations produce a new, lazily initialized abstraction for a data set, whether the underlying implementation is an RDD, DataFrame or DataSet.  See the Spark Tutorial landing page for more.

Note: as you would probably expect when using Python, RDDs can hold objects of multiple types because Python is dynamically typed.
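For example (illustrative values only), a single RDD can hold a mix of strings, numbers and tuples:

>>> mixed = sc.parallelize(["a string", 42, 3.14, ("a", "tuple")])
>>> mixed.collect()
['a string', 42, 3.14, ('a', 'tuple')]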

In some Spark Transformation in Python examples below, a CSV file is loaded.  A snippet of this CSV file:

Year,First Name,County,Sex,Count
2012,DOMINIC,CAYUGA,M,6
2012,ADDISON,ONONDAGA,F,14
2012,JULIA,ONONDAGA,F,15

For background information, see the Steps section of the What is Apache Spark tutorial.

For the ipython notebook and sample CSV file, see the Reference section at the end of this post.

Spark Python Transformations

map
flatMap
filter
mapPartitions
mapPartitionsWithIndex
sample
union
intersection
distinct
groupByKey
reduceByKey
aggregateByKey
sortByKey
join

map(func)

The map transformation returns a new RDD by applying a function to each element of the source RDD.

>>> baby_names = sc.textFile("baby_names.csv")
>>> rows = baby_names.map(lambda line: line.split(","))

So, in this transformation example, we’re creating a new RDD called “rows” by splitting every row in the baby_names RDD.  We accomplish this by mapping over every element in baby_names and passing in a lambda function to split by commas.

From here, we could use Python to access the array

>>> for row in rows.take(rows.count()): print(row[1])

First Name
DAVID
JAYDEN
...

Back to Top

flatMap(func)

flatMap is similar to map because it applies a function to all elements in an RDD.  But flatMap flattens the results.

Compare flatMap to map in the following

>>> sc.parallelize([2, 3, 4]).flatMap(lambda x: [x,x,x]).collect()
[2, 2, 2, 3, 3, 3, 4, 4, 4]

>>> sc.parallelize([1,2,3]).map(lambda x: [x,x,x]).collect()
[[1, 1, 1], [2, 2, 2], [3, 3, 3]]

This is helpful with nested datasets such as found in JSON.
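For instance, with hypothetical records that each carry a nested list, flatMap pulls the nested values up into one flat RDD:

>>> people = sc.parallelize([("DAVID", ["Dave", "Davey"]), ("JULIA", ["Jules"])])
>>> people.flatMap(lambda p: p[1]).collect()
['Dave', 'Davey', 'Jules']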

Adding collect to the flatMap and map examples was shown for clarity.  We can focus on the Spark aspect (i.e., the RDD return type) of the example if we don’t use collect:

>>> sc.parallelize([2, 3, 4]).flatMap(lambda x: [x,x,x])
PythonRDD[36] at RDD at PythonRDD.scala:43

Back to Top

filter(func)

Create a new RDD by returning only the elements that satisfy the search filter.  For the SQL-minded, think of a WHERE clause.

>>> rows.filter(lambda line: "MICHAEL" in line).collect()
Out[36]:
[[u'2013', u'MICHAEL', u'QUEENS', u'M', u'155'],
 [u'2013', u'MICHAEL', u'KINGS', u'M', u'146'],
 [u'2013', u'MICHAEL', u'SUFFOLK', u'M', u'142']...

Back to Top

mapPartitions(func, preservesPartitioning=False)

Consider mapPartitions a tool for performance optimization if you have the resources available.  It won’t do much when running examples on your laptop.  It’s the same as map, but it works on Spark RDD partitions, which are distributed.  Remember the first D in RDD – Resilient Distributed Datasets.

In the examples below, when using parallelize, elements of the collection are copied to form a distributed dataset that can be operated on in parallel.

One important parameter for parallel collections is the number of partitions to cut the dataset into. Spark will run one task for each partition of the cluster.

>>> one_through_9 = range(1,10)
>>> parallel = sc.parallelize(one_through_9, 3)
>>> def f(iterator): yield sum(iterator)
>>> parallel.mapPartitions(f).collect()
[6, 15, 24]

>>> parallel = sc.parallelize(one_through_9)
>>> parallel.mapPartitions(f).collect()
[1, 2, 3, 4, 5, 6, 7, 17]

See what’s happening?  Results [6,15,24] are created because mapPartitions loops through the 3 partitions specified as the second argument to the sc.parallelize call.

Partition 1: 1+2+3 = 6

Partition 2: 4+5+6 = 15

Partition 3: 7+8+9 = 24

The second example produces [1,2,3,4,5,6,7,17] which I’m guessing means the default number of partitions on my laptop is 8.

Partition 1 = 1

Partition 2 = 2

Partition 3 = 3

Partition 4 = 4

Partition 5 = 5

Partition 6 = 6

Partition 7 = 7

Partition 8: 8+9 = 17

Typically you want 2-4 partitions for each CPU core in your cluster. Normally, Spark tries to set the number of partitions automatically based on your cluster, or on your hardware in a standalone environment.

To find the default number of partitions and confirm the guess of 8 above:

>>> print sc.defaultParallelism
8

Back to Top

mapPartitionsWithIndex(func)

Similar to mapPartitions, but it also provides the function with an integer value indicating the index of the partition.

>>> parallel = sc.parallelize(range(1,10),4)
>>> def show(index, iterator): yield 'index: '+str(index)+" values: "+ str(list(iterator))
>>> parallel.mapPartitionsWithIndex(show).collect()

['index: 0 values: 1',
 'index: 1 values: 3',
 'index: 2 values: 5',
 'index: 3 values: 7']

When learning these APIs on an individual laptop or desktop, it might be helpful to show differences in capabilities and outputs.  For example, if we change the above example to use a parallelized list with 3 slices, our output changes significantly:

>>> parallel = sc.parallelize(range(1,10),3)
>>> def show(index, iterator): yield 'index: '+str(index)+" values: "+ str(list(iterator))
>>> parallel.mapPartitionsWithIndex(show).collect()

['index: 0 values: [1, 2, 3]',
 'index: 1 values: [4, 5, 6]',
 'index: 2 values: [7, 8, 9]']

sample(withReplacement, fraction, seed)

Return a random sample subset RDD of the input RDD

>>> parallel = sc.parallelize(range(1,10))
>>> parallel.sample(True,.2).count()
2

>>> parallel.sample(True,.2).count()
1

>>> parallel.sample(True,.2).count()
2

sample(withReplacement, fraction, seed=None)

Parameters:
  • withReplacement – can elements be sampled multiple times (replaced when sampled out)
  • fraction – expected size of the sample as a fraction of this RDD’s size. Without replacement: probability that each element is chosen; fraction must be in [0, 1]. With replacement: expected number of times each element is chosen; fraction must be >= 0
  • seed – seed for the random number generator
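A note on the seed parameter (a quick sketch): passing the same seed makes the sample repeatable on the same RDD.

>>> parallel = sc.parallelize(range(1,10))
>>> parallel.sample(False, 0.5, 42).collect() == parallel.sample(False, 0.5, 42).collect()
True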

Back to Top

union(a different rdd)

Simple.  Return the union of two RDDs

>>> one = sc.parallelize(range(1,10))
>>> two = sc.parallelize(range(10,21))
>>> one.union(two).collect()
[1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20]

Back to Top

intersection(a different rdd)

Again, simple.  Similar to union, but it returns the intersection of two RDDs

>>> one = sc.parallelize(range(1,10))
>>> two = sc.parallelize(range(5,15))
>>> one.intersection(two).collect()
[5, 6, 7, 8, 9]

Back to Top

distinct([numTasks])

Another simple one.  Return a new RDD with distinct elements within a source RDD

>>> parallel = sc.parallelize(range(1,9))
>>> par2 = sc.parallelize(range(5,15))

>>> parallel.union(par2).distinct().collect()
[1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14]

Formal API: distinct(): RDD[T]

Back to Top

The Keys

The group of transformation functions (groupByKey, reduceByKey, aggregateByKey, sortByKey, join) all act on key,value pair RDDs.

For the following, we’re going to use the baby_names.csv file again which was introduced in a previous post What is Apache Spark?

All the following examples presume the baby_names.csv file has been loaded and split such as:

>>> baby_names = sc.textFile("baby_names.csv")
>>> rows = baby_names.map(lambda line: line.split(","))

Back to Top

groupByKey([numTasks])

“When called on a dataset of (K, V) pairs, returns a dataset of (K, Iterable<V>) pairs. ”

The following groups all names to counties in which they appear over the years.

>>> rows = baby_names.map(lambda line: line.split(","))
>>> namesToCounties = rows.map(lambda n: (str(n[1]),str(n[2]) )).groupByKey()
>>> namesToCounties.map(lambda x : {x[0]: list(x[1])}).collect()

[{'GRIFFIN': ['ERIE',
   'ONONDAGA',
   'NEW YORK',
   'ERIE',
   'SUFFOLK',
   'MONROE',
   'NEW YORK',
...

The above example was created from the baby_names.csv file, which was introduced in the previous post What is Apache Spark?

Back to Top

reduceByKey(func, [numTasks])

Operates on key, value pairs again, but the func must be of type (V,V) => V

Let’s sum the yearly name counts over the years in the CSV.  Notice we need to filter out the header row.  Also notice we are going to use the “Count” column value (n[4])

>>> filtered_rows = baby_names.filter(lambda line: "Count" not in line).map(lambda line: line.split(","))
>>> filtered_rows.map(lambda n:  (str(n[1]), int(n[4]) ) ).reduceByKey(lambda v1,v2: v1 + v2).collect()

[('GRIFFIN', 268),
 ('KALEB', 172),
 ('JOHNNY', 219),
 ('SAGE', 5),
 ('MIKE', 40),
 ('NAYELI', 44),
....

Formal API: reduceByKey(func: (V, V) ⇒ V): RDD[(K, V)]

The above example was created from the baby_names.csv file, which was introduced in the previous post What is Apache Spark?

Back to Top

aggregateByKey(zeroValue)(seqOp, combOp, [numTasks])

Ok, I admit, this one drives me a bit nuts.  Why wouldn’t we just use reduceByKey?  I don’t feel smart enough to know when to use aggregateByKey over reduceByKey.  For example, it can produce the same results as reduceByKey:

>>> filtered_rows = baby_names.filter(lambda line: "Count" not in line).map(lambda line: line.split(","))
>>> filtered_rows.map(lambda n:  (str(n[1]), int(n[4]) ) ).aggregateByKey(0, lambda k,v: int(v)+k, lambda v,k: k+v).collect()

[('GRIFFIN', 268),
 ('KALEB', 172),
 ('JOHNNY', 219),
 ('SAGE', 5),
...

And again, the above example was created from the baby_names.csv file, which was introduced in the previous post What is Apache Spark?

There’s a gist of aggregateByKey as well.
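One case where aggregateByKey is genuinely needed, though (a sketch, not from the original post): when the result type differs from the value type.  For example, accumulating (sum, count) pairs per name, from which an average could be derived, using the same filtered_rows as above:

>>> sums_counts = filtered_rows.map(lambda n: (str(n[1]), int(n[4]))).aggregateByKey(
...     (0, 0),                                    # zero value: (sum, count)
...     lambda acc, v: (acc[0] + v, acc[1] + 1),   # fold one value into a partition-local (sum, count)
...     lambda a, b: (a[0] + b[0], a[1] + b[1]))   # merge (sum, count) pairs across partitions
>>> sums_counts.mapValues(lambda t: float(t[0]) / t[1]).take(3)  # average Count per name (output omitted)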

Back to Top

sortByKey(ascending=True, numPartitions=None, keyfunc=lambda x: x)

This simply sorts the (K, V) pairs by K.  Try it out.  See the examples above for where filtered_rows originates.

>>> filtered_rows.map (lambda n:  (str(n[1]), int(n[4]) ) ).sortByKey().collect()
[('AADEN', 18),
 ('AADEN', 11),
 ('AADEN', 10),
 ('AALIYAH', 50),
 ('AALIYAH', 44),
...

#opposite
>>> filtered_rows.map (lambda n:  (str(n[1]), int(n[4]) ) ).sortByKey(False).collect()

[('ZOIE', 5),
 ('ZOEY', 37),
 ('ZOEY', 32),
 ('ZOEY', 30),
...

Back to Top

join(otherDataset, [numTasks])

If you have relational database experience, this will be easy.  It’s the joining of two datasets.  Other joins are available as well, such as leftOuterJoin and rightOuterJoin.

>>> names1 = sc.parallelize(("abe", "abby", "apple")).map(lambda a: (a, 1))
>>> names2 = sc.parallelize(("apple", "beatty", "beatrice")).map(lambda a: (a, 1))
>>> names1.join(names2).collect()

[('apple', (1, 1))]

leftOuterJoin, rightOuterJoin

>>> names1.leftOuterJoin(names2).collect()
[('abe', (1, None)), ('apple', (1, 1)), ('abby', (1, None))]

>>> names1.rightOuterJoin(names2).collect()
[('apple', (1, 1)), ('beatrice', (None, 1)), ('beatty', (None, 1))]

Back to Top

References

The Spark ipython notebook is available

https://github.com/tmcgrath/spark-with-python-course/blob/master/Spark-Transformers-With-Spark.ipynb

This ipython notebook uses a scaled down CSV file

https://github.com/tmcgrath/spark-with-python-course/blob/master/data/baby_names_reduced.csv

Spark with Python tutorials

Featured Image Credit: https://flic.kr/p/96rcDC

Apache Spark and ipython notebook – The Easy Way

Using ipython notebook with Apache Spark couldn’t be easier.  This post will cover how to use ipython notebook (jupyter) with Spark and why it is the best choice when using Python with Spark.

Requirements

This post assumes you have downloaded and extracted Apache Spark and you are running on a Mac or *nix.  If you are on Windows see http://ramhiser.com/2015/02/01/configuring-ipython-notebook-support-for-pyspark/

ipython notebook with Apache Spark

I recommend using the Python 2.7 Anaconda Python distribution, which can be downloaded at https://www.continuum.io/downloads.  It contains more than 300 of the most popular Python packages for science, math, engineering, and data analysis.  Also, future Python Spark tutorials and examples will use this distribution.

After you have Anaconda installed, you should make sure that ipython notebook (Jupyter) is up to date. Run the following command in the Terminal (Mac/Linux) or Command Prompt (Windows):

conda update conda
conda update ipython

Ref: http://ipython.org/install.html in the “I am getting started with Python” section

Launching ipython notebook with Apache Spark

1) In a terminal, go to the root of your Spark install and enter the following command

IPYTHON_OPTS="notebook" ./bin/pyspark

A browser tab should launch, and various output will appear in your terminal window depending on your logging level.

What’s going on here with the IPYTHON_OPTS variable passed to pyspark?  Well, you can look at the source of bin/pyspark in a text editor.  This section of the script:

# Determine the Python executable to use for the driver:
if [[ -n "$IPYTHON_OPTS" || "$IPYTHON" == "1" ]]; then
  # If IPython options are specified, assume user wants to run IPython
  # (for backwards-compatibility)
  PYSPARK_DRIVER_PYTHON_OPTS="$PYSPARK_DRIVER_PYTHON_OPTS $IPYTHON_OPTS"
  PYSPARK_DRIVER_PYTHON="ipython"
elif [[ -z "$PYSPARK_DRIVER_PYTHON" ]]; then
  PYSPARK_DRIVER_PYTHON="${PYSPARK_PYTHON:-"$DEFAULT_PYTHON"}"
fi

Hopefully, this snippet makes sense.  If IPYTHON_OPTS is present, use ipython.

Verify Spark with ipython notebook

At this point, you should be able to create a new notebook and execute some Python using the provided SparkContext.  For example:

print sc

or

print sc.version

Here’s a screencast of running ipython notebook with pyspark on my laptop

In this screencast, pay special attention to your terminal window log statements.  At the default log level of INFO, you should see no errors in the pyspark output.  Also, when you start a new notebook, the terminal should show the SparkContext sc being available for use, such as:

INFO SparkContext: Running Spark version

Why use ipython notebook with Spark?

1) The same reasons you use ipython notebook without Spark: convenience, notebooks are easy to share and execute, etc.

2) Code completion.  As the screencast shows, a Python Spark developer can hit the tab key to see available functions, also known as code completion.

Hope this helps, let me know if you have any questions.  To continue with Python in Spark, check out the Spark Transformations in Python and Spark Actions in Python tutorials.

The next tutorial with ipython is connecting ipython to a Spark Cluster.