Apache Spark Action Examples in Python

As you may have learned in other Apache Spark tutorials on this site, action functions produce a computed value back to the Spark driver program.  This is unlike transformations, which produce RDDs, DataFrames, or DataSets.  For example, an action function such as count will produce a result back to the Spark driver.  These may seem easy at first, but how actions are computed includes some performance subtleties which you need to know.

Actions trigger any previously constructed Spark transformations to be evaluated.  Recall that transformations are lazily evaluated Spark data abstractions such as RDDs and DataFrames.  Any call to a Spark action causes these data abstractions in the Spark directed acyclic graph (DAG) to be evaluated.
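As a minimal sketch of this laziness in the pyspark shell (assuming sc is an existing SparkContext), the map transformation below only builds up the DAG; nothing runs until the count action is called:

>>> rdd = sc.parallelize([1, 2, 3, 4])
>>> doubled = rdd.map(lambda x: x * 2)  # transformation only -- nothing runs yet
>>> doubled.count()                     # action -- the DAG is evaluated here
4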

An ipython notebook file of all these examples is available in the References section of this page.


reduce
collect
count
first
take
takeSample
countByKey
saveAsTextFile

reduce(func)

Aggregate the elements of a dataset through func, which should be commutative and associative so it can be computed correctly in parallel.

>>> names1 = sc.parallelize(["abe", "abby", "apple"])
>>> print(names1.reduce(lambda t1, t2: t1+t2))
abeabbyapple

>>> names2 = sc.parallelize(["apple", "beatty", "beatrice"]).map(lambda a: [a, len(a)])
>>> print(names2.collect())
[['apple', 5], ['beatty', 6], ['beatrice', 8]]

>>> names2.flatMap(lambda t: [t[1]]).reduce(lambda t1, t2: t1+t2)
19


collect()

collect returns the elements of the RDD back to the driver program.

collect is often used in previously provided examples, such as Spark Transformation Examples in Python, to show the values of the return.  PySpark, for example, will print the values of the array back to the console.  This can be helpful in debugging programs.

Examples

>>> sc.parallelize([1,2,3]).flatMap(lambda x: [x,x,x]).collect()
[1, 1, 1, 2, 2, 2, 3, 3, 3]
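Because collect brings every element back to the driver, a common debugging pattern is to narrow the data with a transformation first; a small sketch:

>>> sc.parallelize(range(10)).filter(lambda x: x % 2 == 0).collect()
[0, 2, 4, 6, 8]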


count()

Number of elements in the RDD

>>> names1 = sc.parallelize(["abe", "abby", "apple"])
>>> names1.count()
3


first()

Return the first element in the RDD

>>> names1 = sc.parallelize(["abe", "abby", "apple"])
>>> names1.first()
'abe'


take(n)

Take the first n elements of the RDD.

It works by first scanning one partition, then using the results from that partition to estimate the number of additional partitions needed to satisfy the limit.

Translated from the Scala implementation in RDD#take().

It can be much more convenient and economical to use take instead of collect to inspect a very large RDD.

>>> names1 = sc.parallelize(["abe", "abby", "apple"])
>>> names1.take(2)
['abe', 'abby']


takeSample(withReplacement, n, seed=None)

Similar to take, but returns a random sample of size n.  Includes a boolean option for sampling with or without replacement and a random generator seed, which defaults to None.

>>> teams = sc.parallelize(("twins", "brewers", "cubs", "white sox", "indians", "bad news bears"))
>>> teams.takeSample(True, 3)
['brewers', 'brewers', 'twins']
# run a few times to see different results
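
If you need a reproducible sample, pass a seed and sample without replacement; a minimal sketch using the same teams RDD (the actual elements returned depend on the seed, so no output is shown):

>>> teams.takeSample(False, 3, seed=42)
# with a fixed seed, repeated calls return the same three teams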


countByKey()

Count the number of elements for each key, and return the result to the master as a dictionary.

>>> hockeyTeams = sc.parallelize(("wild", "blackhawks", "red wings", "wild", "oilers", "whalers", "jets", "wild"))
>>> hockeyTeams.map(lambda k: (k,1)).countByKey().items()
[('red wings', 1),
 ('oilers', 1),
 ('blackhawks', 1),
 ('jets', 1),
 ('wild', 3),
 ('whalers', 1)]


saveAsTextFile(path, compressionCodecClass=None)

Save RDD as text file, using string representations of elements.

Parameters:
  • path – path to file
  • compressionCodecClass – (None by default) fully qualified codec class name as a string, e.g. “org.apache.hadoop.io.compress.GzipCodec”
>>> hockeyTeams = sc.parallelize(("wild", "blackhawks", "red wings", "wild", "oilers", "whalers", "jets", "wild"))
>>> hockeyTeams.saveAsTextFile("hockey_teams.txt")

Produces:

$ ls hockey_teams.txt/
_SUCCESS	part-00001	part-00003	part-00005	part-00007
part-00000	part-00002	part-00004	part-00006

So, you’ll see each partition is written to its own file.  There are 8 partitions in the example dataset here.
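
If you want fewer output files or compressed output, you can coalesce the RDD before saving and pass a codec class; a minimal sketch (assumes Hadoop’s Gzip codec is on the classpath, and the output path is just an illustration):

>>> hockeyTeams.coalesce(1).saveAsTextFile("hockey_teams_gz", "org.apache.hadoop.io.compress.GzipCodec")
# a single compressed part file (part-00000.gz) is written alongside _SUCCESS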


References

ipython notebook

https://github.com/tmcgrath/spark-with-python-course/blob/master/Apache%20Spark%20Action%20Examples%20with%20Python.ipynb

For more, see the Spark tutorial landing page and especially the Spark with Python tutorials.

Featured Image in Post: https://flic.kr/p/ajgHPF
