PySpark Action Examples

Apache Spark Action Examples in Python

PySpark action functions return a computed value to the Spark driver program.  This differs from PySpark transformation functions, which produce RDDs, DataFrames, or Datasets as results.  For example, an action such as count returns a result to the Spark driver, while a transformation such as map does not.  Actions may seem simple at first, but how they are computed involves some performance subtleties that you need to know.

PySpark actions trigger any previously constructed Spark transformations to be evaluated.  Recall that transformations are lazily evaluated Spark data abstractions such as RDDs and DataFrames.  Any call to a Spark action causes these data abstractions in the Spark directed acyclic graph (DAG) to be evaluated.
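
To make the lazy-evaluation idea concrete, here is a minimal plain-Python analogy that needs no Spark installation.  It is only a sketch: Python's built-in map stands in for a transformation and list stands in for an action, and the names tag_length and calls are made up for this illustration.

```python
# Plain-Python analogy for lazy transformations; this is not Spark code.
calls = []  # records every time the mapping function actually runs


def tag_length(name):
    """Pair a name with its length and remember that we were called."""
    calls.append(name)
    return (name, len(name))


names = ["abe", "abby", "apple"]

# "Transformation": map() only builds a lazy iterator, nothing runs yet.
pipeline = map(tag_length, names)
calls_before_action = len(calls)

# "Action": consuming the iterator forces the work to happen.
result = list(pipeline)
calls_after_action = len(calls)

print(calls_before_action, calls_after_action)
print(result)
```

The mapping function is not called at all until the result is requested, which is the same reason a chain of Spark transformations costs nothing until an action runs.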

If you are just starting with PySpark actions, go through each of these examples to start to build your confidence.

PySpark Examples of Action Functions

reduce

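Aggregate the elements of a dataset using a function func, which takes two arguments and returns one.  The function should be commutative and associative so that it can be computed correctly in parallel.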

>>> names1 = sc.parallelize(["abe", "abby", "apple"])
>>> print(names1.reduce(lambda t1, t2: t1+t2))
abeabbyapple

>>> names2 = sc.parallelize(["apple", "beatty", "beatrice"]).map(lambda a: [a, len(a)])
>>> print(names2.collect())
[['apple', 5], ['beatty', 6], ['beatrice', 8]]

>>> names2.flatMap(lambda t: [t[1]]).reduce(lambda t1, t2: t1+t2)
19
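
Because reduce combines values within each partition and then combines the partial results, the function you pass should give the same answer however the data is split.  The following plain-Python sketch models that behavior with ordinary lists standing in for partitions; partitioned_reduce is a hypothetical helper written for this illustration, not part of PySpark.

```python
from functools import reduce


def partitioned_reduce(partitions, func):
    """Reduce each non-empty partition, then reduce the partial results."""
    partials = [reduce(func, part) for part in partitions if part]
    return reduce(func, partials)


def add(a, b):
    return a + b


def subtract(a, b):
    return a - b


# The same three lengths, split two different ways.
one_partition = [[5, 6, 8]]
two_partitions = [[5], [6, 8]]

# Addition is associative and commutative: the split does not matter.
sum_one = partitioned_reduce(one_partition, add)
sum_two = partitioned_reduce(two_partitions, add)

# Subtraction is neither: the answer changes with the partitioning.
diff_one = partitioned_reduce(one_partition, subtract)   # (5 - 6) - 8
diff_two = partitioned_reduce(two_partitions, subtract)  # 5 - (6 - 8)

print(sum_one, sum_two)
print(diff_one, diff_two)
```

Addition gives 19 either way, while subtraction gives -9 for one split and 7 for the other, so a function like subtraction is a poor fit for reduce.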

collect

collect returns the elements of the RDD back to the driver program.

collect is often used in earlier examples, such as Spark Transformation Examples in Python, to show the returned values.  In the PySpark shell, for example, the returned list is printed to the console.  This can be helpful when debugging programs.  Because collect brings every element back to the driver, use it only when the result fits in driver memory.

Examples

>>> sc.parallelize([1,2,3]).flatMap(lambda x: [x,x,x]).collect()
[1, 1, 1, 2, 2, 2, 3, 3, 3]

count

Number of elements in the RDD

>>> names1 = sc.parallelize(["abe", "abby", "apple"])
>>> names1.count()
3

first

Return the first element in the RDD

>>> names1 = sc.parallelize(["abe", "abby", "apple"])
>>> names1.first()
'abe'

take

Take the first n elements of the RDD.

Works by first scanning one partition and using the results from that partition to estimate the number of additional partitions needed to satisfy the limit.

Translated from the Scala implementation in RDD#take().

It can be much more convenient and economical to use take instead of collect to inspect a very large RDD.

>>> names1 = sc.parallelize(["abe", "abby", "apple"])
>>> names1.take(2)
['abe', 'abby']
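
The partition-scanning idea behind take can be sketched in plain Python.  This is a simplified model, not Spark's implementation: lists stand in for partitions, take_sketch is a hypothetical helper, and the rule of growing the batch by a factor of 4 is an assumption chosen for illustration rather than Spark's actual estimate.

```python
def take_sketch(partitions, n):
    """Collect up to n items, scanning partitions in growing batches.

    Returns the items taken and how many partitions were scanned.
    """
    taken = []
    scanned = 0  # number of partitions read so far
    batch = 1    # start by scanning a single partition
    while len(taken) < n and scanned < len(partitions):
        for part in partitions[scanned:scanned + batch]:
            needed = n - len(taken)
            taken.extend(part[:needed])
        scanned = min(scanned + batch, len(partitions))
        batch *= 4  # assumed growth factor, for illustration only
    return taken, scanned


# Eight single-element partitions.
names = ["abe", "abby", "apple", "avery", "alma", "arlo", "ada", "axel"]
partitions = [[name] for name in names]

first_one = take_sketch(partitions, 1)
first_two = take_sketch(partitions, 2)
everything = take_sketch(partitions, 100)

print(first_one)
print(first_two)
print(everything)
```

In this model, asking for one element reads a single partition and asking for two reads five of the eight, whereas collect would always read all of them.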

takeSample

Similar to take in that it returns n elements, but the elements are sampled randomly.  Takes a boolean for sampling with or without replacement and an optional random generator seed, which defaults to None.

>>> teams = sc.parallelize(("twins", "brewers", "cubs", "white sox", "indians", "bad news bears"))
>>> teams.takeSample(True, 3)
['brewers', 'brewers', 'twins']
# run a few times to see different results
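
To see what the replacement flag and the seed change, here is a plain-Python sketch using the standard random module.  It only mirrors the argument order of takeSample; take_sample_sketch is a hypothetical helper and Spark's own sampling algorithm is different, so the specific elements chosen will not match Spark's.

```python
import random

teams = ["twins", "brewers", "cubs", "white sox", "indians", "bad news bears"]


def take_sample_sketch(items, with_replacement, num, seed=None):
    """Return num randomly chosen items, with or without replacement."""
    rng = random.Random(seed)  # seed=None gives a different result each run
    if with_replacement:
        # Each draw is independent, so the same item can appear twice.
        return [rng.choice(items) for _ in range(num)]
    # Without replacement we can never return more items than exist.
    return rng.sample(items, min(num, len(items)))


with_repl = take_sample_sketch(teams, True, 3, seed=7)
with_repl_again = take_sample_sketch(teams, True, 3, seed=7)
without_repl = take_sample_sketch(teams, False, 3, seed=7)
too_many = take_sample_sketch(teams, False, 10, seed=7)

print(with_repl)
print(without_repl)
```

With a fixed seed the sample is repeatable, and without replacement every returned element is distinct.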

countByKey

Count the number of elements for each key, and return the result to the driver as a dictionary.

>>> hockeyTeams = sc.parallelize(("wild", "blackhawks", "red wings", "wild", "oilers", "whalers", "jets", "wild"))
>>> hockeyTeams.map(lambda k: (k,1)).countByKey().items()
[('red wings', 1),
 ('oilers', 1),
 ('blackhawks', 1),
 ('jets', 1),
 ('wild', 3),
 ('whalers', 1)]
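
As a rough plain-Python equivalent, countByKey looks only at the key of each (key, value) pair and counts how often it occurs; the values are ignored.  The sketch below uses collections.Counter on an ordinary list, so it illustrates the result rather than how Spark distributes the work.

```python
from collections import Counter

hockey_teams = ["wild", "blackhawks", "red wings", "wild",
                "oilers", "whalers", "jets", "wild"]

# Build (key, value) pairs, as the map step does in the example above.
pairs = [(team, 1) for team in hockey_teams]

# Count occurrences of each key; the value in each pair plays no part.
counts = Counter(key for key, _value in pairs)

# Changing the values does not change the counts.
other_pairs = [(team, 99) for team in hockey_teams]
other_counts = Counter(key for key, _value in other_pairs)

print(sorted(counts.items()))
```

Sorting the items gives a stable order for display, since a dictionary of counts has no meaningful order of its own.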

saveAsTextFile

Save RDD as text file, using string representations of elements.

Parameters:
  • path – path to file
  • compressionCodecClass – (None by default) string, e.g. “org.apache.hadoop.io.compress.GzipCodec”

>>> hockeyTeams = sc.parallelize(("wild", "blackhawks", "red wings", "wild", "oilers", "whalers", "jets", "wild"))
>>> hockeyTeams.saveAsTextFile("hockey_teams.txt")

Produces:

$ ls hockey_teams.txt/
_SUCCESS	part-00001	part-00003	part-00005	part-00007
part-00000	part-00002	part-00004	part-00006

So, you’ll see each partition is written to its own file.  I have 8 partitions in the example dataset here.
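
The one-file-per-partition layout can be mimicked in plain Python.  This is only a sketch of the resulting directory structure: save_as_text_sketch is a hypothetical helper, the partitions are ordinary lists, and the output goes to a temporary directory rather than through Spark or Hadoop.

```python
import os
import tempfile


def save_as_text_sketch(partitions, path):
    """Write each partition to its own part-NNNNN file under path."""
    os.makedirs(path)
    for index, part in enumerate(partitions):
        part_file = os.path.join(path, "part-%05d" % index)
        with open(part_file, "w") as handle:
            for element in part:
                # One line per element, using its string representation.
                handle.write(str(element) + "\n")
    # Empty marker file, mirroring the _SUCCESS file in the listing above.
    open(os.path.join(path, "_SUCCESS"), "w").close()


hockey_teams = ["wild", "blackhawks", "red wings", "wild",
                "oilers", "whalers", "jets", "wild"]

# Assume 8 partitions with one element each, as in the example above.
partitions = [[team] for team in hockey_teams]

out_dir = os.path.join(tempfile.mkdtemp(), "hockey_teams.txt")
save_as_text_sketch(partitions, out_dir)
written = sorted(os.listdir(out_dir))

print(written)
```

Note that the path names a directory, not a single file, which is why the listing shows several part files inside hockey_teams.txt.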

PySpark Actions Further References

ipython notebook

https://github.com/tmcgrath/spark-with-python-course/blob/master/Apache%20Spark%20Action%20Examples%20with%20Python.ipynb

For more, see the Spark tutorial landing page and especially the PySpark tutorials.

Featured Image in Post: https://flic.kr/p/ajgHPF
