PySpark action functions return a computed value to the Spark driver program. This is different from PySpark transformation functions, which produce RDDs, DataFrames or Datasets as results. For example, an action such as count returns a result to the driver, while a transformation such as map does not. Actions may seem simple at first, but how they are computed involves some performance subtleties that you need to know.
PySpark actions trigger the evaluation of any previously constructed Spark transformations. Recall that transformations are lazily evaluated Spark data abstractions such as RDDs and DataFrames. Any call to a Spark action causes these data abstractions in the Spark directed acyclic graph (DAG) to be evaluated.
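To make the lazy-evaluation idea concrete, here is a minimal pure-Python sketch. It does not use Spark at all: LazyDataset and double are hypothetical names invented for this illustration, and the class only mimics the way a transformation is recorded and later replayed when an action is called.

```python
class LazyDataset:
    """Toy stand-in for an RDD: records transformations, runs them only on an action."""

    def __init__(self, data, steps=None):
        self._data = list(data)
        self._steps = steps or []  # pending transformations, in order

    def map(self, func):
        # Transformation: returns a new dataset and computes nothing yet.
        return LazyDataset(self._data, self._steps + [func])

    def collect(self):
        # Action: replays every recorded transformation and returns the result.
        result = self._data
        for func in self._steps:
            result = [func(x) for x in result]
        return result

    def count(self):
        # Action: also has to replay the transformations to know the size.
        return len(self.collect())


calls = []


def double(x):
    calls.append(x)  # side effect so we can see when the work happens
    return x * 2


doubled = LazyDataset([1, 2, 3]).map(double)
print(calls)              # [] : map alone ran nothing
print(doubled.collect())  # [2, 4, 6]
print(doubled.count())    # 3
print(calls)              # [1, 2, 3, 1, 2, 3] : each action replayed the map
```

The last line hints at one of the performance subtleties: in this toy model every action recomputes the whole chain. Real Spark behaves similarly unless the RDD or DataFrame has been persisted, for example with cache().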
If you are just starting with PySpark actions, go through each of these examples to start to build your confidence.
PySpark Examples of Action Functions
reduce aggregates the elements of the RDD using the function func, which takes two arguments and returns one.
>>> names1 = sc.parallelize(["abe", "abby", "apple"])
>>> print(names1.reduce(lambda t1, t2: t1 + t2))
abeabbyapple
>>> names2 = sc.parallelize(["apple", "beatty", "beatrice"]).map(lambda a: [a, len(a)])
>>> print(names2.collect())
[['apple', 5], ['beatty', 6], ['beatrice', 8]]
>>> names2.flatMap(lambda t: [t[1]]).reduce(lambda t1, t2: t1 + t2)
19
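Spark's documentation asks for the function passed to reduce to be commutative and associative, because each partition is reduced locally before the partial results are combined. The following pure-Python sketch (no Spark involved) models that two-stage process. The partitioned_reduce helper and the partition layout are assumptions made up for this illustration.

```python
from functools import reduce


def partitioned_reduce(partitions, func):
    # Stage 1: reduce each non-empty partition on its own.
    partials = [reduce(func, part) for part in partitions if part]
    # Stage 2: combine the per-partition results into a single value.
    return reduce(func, partials)


# Hypothetical layout of the numbers 1..5 across three partitions.
partitions = [[1, 2], [3, 4], [5]]

# Addition is commutative and associative, so the grouping does not matter.
print(partitioned_reduce(partitions, lambda a, b: a + b))  # 15

# Subtraction is neither, so the result depends on the partition layout:
# the partials are -1, -1 and 5, giving -5 rather than 1-2-3-4-5 = -13.
print(partitioned_reduce(partitions, lambda a, b: a - b))  # -5
```

If a reduce gives different answers on different cluster sizes, a function like the subtraction above is a likely culprit.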
collect returns the elements of the RDD back to the driver program.
collect is often used in earlier examples, such as Spark Transformation Examples in Python, to show the returned values. The PySpark shell, for example, prints the resulting list to the console. This can be helpful when debugging programs.
>>> sc.parallelize([1, 2, 3]).flatMap(lambda x: [x, x, x]).collect()
[1, 1, 1, 2, 2, 2, 3, 3, 3]
count returns the number of elements in the RDD.
>>> names1 = sc.parallelize(["abe", "abby", "apple"])
>>> names1.count()
3
first returns the first element in the RDD.
>>> names1 = sc.parallelize(["abe", "abby", "apple"])
>>> names1.first()
'abe'
take returns the first n elements of the RDD.
It works by first scanning one partition, then using the results from that partition to estimate the number of additional partitions needed to satisfy the limit.
The PySpark version is translated from the Scala implementation in RDD#take().
It can be much more convenient and economical to use take instead of collect to inspect a very large RDD.
>>> names1 = sc.parallelize(["abe", "abby", "apple"])
>>> names1.take(2)
['abe', 'abby']
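The reason take can be cheaper than collect is that it stops reading partitions once it has enough elements. Below is a simplified pure-Python sketch of that idea. It is not Spark's implementation: the real take estimates how many partitions to scan in each round, while this hypothetical version simply reads one partition at a time. The take function, the partition layout and the extra "avocado" element are assumptions for illustration.

```python
def take(partitions, n):
    """Return (first n elements, number of partitions scanned)."""
    taken = []
    scanned = 0
    for part in partitions:
        if len(taken) >= n:
            break  # enough elements, leave the remaining partitions untouched
        taken.extend(part[: n - len(taken)])
        scanned += 1
    return taken, scanned


# Hypothetical layout of four names across three partitions.
partitions = [["abe"], ["abby", "apple"], ["avocado"]]

print(take(partitions, 2))   # (['abe', 'abby'], 2) : third partition never read
print(take(partitions, 10))  # (['abe', 'abby', 'apple', 'avocado'], 3)
```

A collect, by contrast, always has to read every partition and ship every element to the driver.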
takeSample is similar to take in that it returns n elements, but it samples them at random. It takes a boolean flag for sampling with or without replacement and an optional random generator seed, which defaults to None.
>>> teams = sc.parallelize(("twins", "brewers", "cubs", "white sox", "indians", "bad news bears"))
>>> teams.takeSample(True, 3)  # run a few times to see different results
['brewers', 'brewers', 'twins']
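The difference between sampling with and without replacement, and the effect of the seed, can be seen in a small pure-Python model. This is only a sketch built on the standard random module, not Spark's sampling algorithm, and take_sample is a hypothetical helper that mirrors the argument order of takeSample.

```python
import random


def take_sample(items, with_replacement, num, seed=None):
    rng = random.Random(seed)  # seed=None gives a different sample on each run
    if with_replacement:
        # The same element may be drawn more than once.
        return [rng.choice(items) for _ in range(num)]
    # Without replacement every drawn element is distinct,
    # so at most len(items) elements can come back.
    return rng.sample(items, min(num, len(items)))


teams = ["twins", "brewers", "cubs", "white sox", "indians", "bad news bears"]

with_dupes = take_sample(teams, True, 3)
no_dupes = take_sample(teams, False, 3)
print(with_dupes, no_dupes)

# A fixed seed makes the sample repeatable.
print(take_sample(teams, False, 3, seed=42) == take_sample(teams, False, 3, seed=42))  # True
```

Passing a seed to takeSample is useful in the same way when a test or a notebook needs the same sample on every run.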
countByKey counts the number of elements for each key and returns the result to the driver as a dictionary.
>>> hockeyTeams = sc.parallelize(("wild", "blackhawks", "red wings", "wild", "oilers", "whalers", "jets", "wild"))
>>> sorted(hockeyTeams.map(lambda k: (k, 1)).countByKey().items())
[('blackhawks', 1), ('jets', 1), ('oilers', 1), ('red wings', 1), ('whalers', 1), ('wild', 3)]
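Note that countByKey counts how often each key occurs and ignores the values in the pairs. The same result can be modelled in plain Python with collections.Counter. This is a sketch without Spark, and the value 99 is an arbitrary placeholder chosen to show that the values play no role.

```python
from collections import Counter

hockey_teams = ["wild", "blackhawks", "red wings", "wild",
                "oilers", "whalers", "jets", "wild"]

# Build (key, value) pairs as in the example above, but with an arbitrary value.
pairs = [(team, 99) for team in hockey_teams]

# Count occurrences of each key; the value in each pair is never looked at.
counts = Counter(key for key, _ in pairs)

print(sorted(counts.items()))
# [('blackhawks', 1), ('jets', 1), ('oilers', 1), ('red wings', 1), ('whalers', 1), ('wild', 3)]
```

Because the whole dictionary is returned to the driver, this action is best suited to datasets with a modest number of distinct keys.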
saveAsTextFile saves the RDD as text files, using the string representations of the elements.
>>> hockeyTeams = sc.parallelize(("wild", "blackhawks", "red wings", "wild", "oilers", "whalers", "jets", "wild"))
>>> hockeyTeams.saveAsTextFile("hockey_teams.txt")
$ ls hockey_teams.txt/
_SUCCESS    part-00001  part-00003  part-00005  part-00007
part-00000  part-00002  part-00004  part-00006
So, you'll see each partition is written to its own file. I have 8 partitions in this example dataset.
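The one-file-per-partition layout can be imitated in a few lines of plain Python. This is a rough model rather than Spark's writer: save_as_text_files is a hypothetical helper, the three-partition layout is made up, and the output goes to a temporary directory.

```python
import os
import tempfile


def save_as_text_files(partitions, out_dir):
    # Like the real action, this refuses to overwrite an existing directory.
    os.makedirs(out_dir)
    for index, part in enumerate(partitions):
        path = os.path.join(out_dir, f"part-{index:05d}")
        with open(path, "w") as handle:
            # One line per element, using its string representation.
            handle.writelines(f"{element}\n" for element in part)
    # Empty marker file signalling that the whole job finished.
    open(os.path.join(out_dir, "_SUCCESS"), "w").close()


out_dir = os.path.join(tempfile.mkdtemp(), "hockey_teams.txt")
save_as_text_files([["wild", "jets"], ["oilers"], []], out_dir)
print(sorted(os.listdir(out_dir)))
# ['_SUCCESS', 'part-00000', 'part-00001', 'part-00002']
```

Even the empty partition produces a file here, which is why the number of part files follows the partition count rather than the data size. In real PySpark, reducing the partition count first, for example with coalesce(1), is a common way to end up with a single part file.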
PySpark Actions Further References
For more, see the Spark tutorial landing page and especially the PySpark tutorials.
Featured Image in Post: https://flic.kr/p/ajgHPF