PySpark Examples of Actions


PySpark actions return a computed value to the Spark driver program. This is different from PySpark transformation functions, which produce new RDDs or DataFrames as their result. For example, an action such as count returns a value to the Spark driver, while a transformation such as map does not.

Actions may seem simple at first, but how they are computed involves some performance subtleties that you need to know.

Calling a PySpark action triggers the computation of any previously constructed PySpark transformations. Recall that transformations on Spark data abstractions such as RDDs and DataFrames are lazily evaluated. Any call to a Spark action causes these data abstractions in the Spark directed acyclic graph (DAG) to be evaluated.
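To make the lazy behavior concrete, here is a minimal sketch in plain Python rather than Spark. It assumes a generator is a fair stand-in for a transformation and that list() plays the role of an action; the names traced_upper and log are made up for illustration.

```python
# Plain-Python model of lazy evaluation; no Spark required.
log = []

def traced_upper(word):
    # Record each call so we can see when the work actually happens.
    log.append(word)
    return word.upper()

words = ["abe", "abby", "apple"]

# "Transformation": building the generator does not call traced_upper yet.
lazy_upper = (traced_upper(w) for w in words)
calls_before_action = len(log)

# "Action": consuming the generator forces the computation.
result = list(lazy_upper)
calls_after_action = len(log)

print(calls_before_action, calls_after_action, result)
```

In Spark the same idea applies at cluster scale: map, filter and friends only describe work, and an action such as count or collect is what causes it to run.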

If you are just starting with PySpark actions, go through each of these examples to start building your confidence.

PySpark Examples of Action Functions

Running the PySpark Examples

All of the following examples can be executed in the PySpark shell. For example, I’ve downloaded and extracted Apache Spark and now call the pyspark script located in the bin directory:

$ bin/pyspark
Python 3.9.6 (default, Oct 18 2022, 12:41:40)
[Clang 14.0.0 (clang-1400.0.29.202)] on darwin
Type "help", "copyright", "credits" or "license" for more information.
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
23/05/08 06:58:22 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /__ / .__/\_,_/_/ /_/\_\   version 3.4.0
      /_/

Using Python version 3.9.6 (default, Oct 18 2022 12:41:40)
Spark context Web UI available at http://192.168.1.16:4040
Spark context available as 'sc' (master = local[*], app id = local-1683547103667).
SparkSession available as 'spark'.
>>>

reduce

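Aggregate the elements of a dataset using a function that takes two arguments and returns one, as shown below. The function should be commutative and associative so that it can be computed correctly in parallel.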

>>> names1 = sc.parallelize(["abe", "abby", "apple"])
>>> print(names1.reduce(lambda t1, t2: t1+t2))
abeabbyapple

>>> names2 = sc.parallelize(["apple", "beatty", "beatrice"]).map(lambda a: [a, len(a)])
>>> print(names2.collect())
[['apple', 5], ['beatty', 6], ['beatrice', 8]]

>>> names2.flatMap(lambda t: [t[1]]).reduce(lambda t1, t2: t1+t2)
19

In the examples above, the lambda function passed to reduce combines two values at a time, the running result and the next element, until a single value remains.
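Because an RDD is split into partitions, the order in which values get combined is not fixed. The following is a rough plain-Python model of that, not Spark code: partitioned_reduce is a hypothetical helper that reduces each partition first and then reduces the partial results. It suggests why addition is safe while subtraction is not.

```python
from functools import reduce

def partitioned_reduce(partitions, func):
    """Reduce each non-empty partition, then reduce the partial results."""
    partials = [reduce(func, part) for part in partitions if part]
    return reduce(func, partials)

# The same four numbers, laid out as two partitions or as one.
two_parts = [[1, 2], [3, 4]]
one_part = [[1, 2, 3, 4]]

add = lambda a, b: a + b
sub = lambda a, b: a - b

sum_two = partitioned_reduce(two_parts, add)   # (1 + 2) + (3 + 4)
sum_one = partitioned_reduce(one_part, add)    # ((1 + 2) + 3) + 4
diff_two = partitioned_reduce(two_parts, sub)  # (1 - 2) - (3 - 4)
diff_one = partitioned_reduce(one_part, sub)   # ((1 - 2) - 3) - 4

print(sum_two, sum_one)    # addition agrees regardless of layout
print(diff_two, diff_one)  # subtraction depends on the layout
```

If a reduce result changes when the number of partitions changes, a non-associative or non-commutative function is a likely cause.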

collect

collect returns the elements of the RDD back to the driver program.

collect is often used in earlier tutorials, such as Spark Transformation Examples in Python, to show the values produced by a transformation. In the PySpark shell, the returned list is printed to the console, which can be helpful when debugging programs. Because collect brings the entire dataset into the driver's memory, use it only when the result is small.

Examples

>>> sc.parallelize([1,2,3]).flatMap(lambda x: [x,x,x]).collect()
[1, 1, 1, 2, 2, 2, 3, 3, 3]

count

Return the number of elements in the RDD.

>>> names1 = sc.parallelize(["abe", "abby", "apple"])
>>> names1.count()
3

first

Return the first element in the RDD.

>>> names1 = sc.parallelize(["abe", "abby", "apple"])
>>> names1.first()
'abe'

take

Take the first n elements of the RDD.

It works by first scanning one partition, then uses the results from that partition to estimate the number of additional partitions needed to satisfy the limit.

Translated from the Scala implementation in RDD#take().

It can be much more convenient and economical to use take instead of collect to inspect a very large RDD.

>>> names1 = sc.parallelize(["abe", "abby", "apple"])
>>> names1.take(2)
['abe', 'abby']
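The reason take is cheaper than collect can be sketched with a simplified plain-Python model. This is an assumption-laden illustration, not Spark's implementation: the hypothetical take_model scans one partition at a time, whereas the real take estimates how many partitions to scan in each round.

```python
def take_model(partitions, n):
    """Collect up to n elements, stopping as soon as enough are found."""
    taken = []
    scanned = 0
    for part in partitions:
        if len(taken) >= n:
            break  # enough elements; later partitions are never read
        scanned += 1
        taken.extend(part[: n - len(taken)])
    return taken, scanned

# Assume the three names ended up in three separate partitions.
partitions = [["abe"], ["abby"], ["apple"]]

taken, scanned = take_model(partitions, 2)
print(taken, scanned)
```

Here only two of the three partitions are touched, while a collect would have to read all of them.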

takeSample

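Similar to take in that it returns n elements, but the elements are sampled at random. The first argument is a boolean that selects sampling with or without replacement, and an optional third argument sets the random generator seed, which defaults to None.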

>>> teams = sc.parallelize(("twins", "brewers", "cubs", "white sox", "indians", "bad news bears"))
>>> teams.takeSample(True, 3)
['brewers', 'brewers', 'twins']
# run a few times to see different results
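The effect of the replacement flag and the seed can be modeled in plain Python. This is only a sketch: take_sample_model is a hypothetical helper built on the standard random module, and Spark uses its own sampler, so the actual values it returns for a given seed will differ.

```python
import random

def take_sample_model(items, with_replacement, n, seed=None):
    """Model of takeSample semantics using Python's random module."""
    rng = random.Random(seed)
    if with_replacement:
        # The same element may be picked more than once.
        return rng.choices(items, k=n)
    # Without replacement, never return more elements than exist.
    return rng.sample(items, min(n, len(items)))

teams = ["twins", "brewers", "cubs", "white sox", "indians", "bad news bears"]

first_run = take_sample_model(teams, True, 3, seed=42)
second_run = take_sample_model(teams, True, 3, seed=42)
no_repeats = take_sample_model(teams, False, 3, seed=42)

print(first_run == second_run)  # a fixed seed makes the sample repeatable
print(len(set(no_repeats)))     # without replacement, all picks are distinct
```

Passing a fixed seed is handy in tests, where a sample that changes on every run would be hard to assert against.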

countByKey

Count the number of elements for each key, and return the result to the driver as a dictionary.

>>> hockeyTeams = sc.parallelize(("wild", "blackhawks", "red wings", "wild", "oilers", "whalers", "jets", "wild"))
>>> sorted(hockeyTeams.map(lambda k: (k,1)).countByKey().items())
[('blackhawks', 1),
 ('jets', 1),
 ('oilers', 1),
 ('red wings', 1),
 ('whalers', 1),
 ('wild', 3)]
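Note that countByKey counts how many pairs share a key and ignores the values. A plain-Python equivalent of that counting step, assuming the same (team, 1) pairs as above and using collections.Counter in place of Spark, might look like this:

```python
from collections import Counter

teams = ["wild", "blackhawks", "red wings", "wild",
         "oilers", "whalers", "jets", "wild"]

# Same shape as the RDD after map(lambda k: (k, 1)).
pairs = [(team, 1) for team in teams]

# Count occurrences of each key; the second tuple element is never read.
counts = Counter(key for key, _ in pairs)

print(sorted(counts.items()))
```

Since the values are not used, replacing the 1 with any other value would give the same counts.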

saveAsTextFile

Save the RDD as a text file, using string representations of the elements.

Parameters:
  • path – path to file
  • compressionCodecClass – (None by default) fully qualified codec class name as a string, e.g. “org.apache.hadoop.io.compress.GzipCodec”

>>> hockeyTeams = sc.parallelize(("wild", "blackhawks", "red wings", "wild", "oilers", "whalers", "jets", "wild"))
>>> hockeyTeams.saveAsTextFile("hockey_teams.txt")

Produces:

$ ls hockey_teams.txt/
_SUCCESS	part-00001	part-00003	part-00005	part-00007
part-00000	part-00002	part-00004	part-00006

As you can see, the path is created as a directory and each partition is written to its own file. The example dataset here has 8 partitions.
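The resulting layout can be imitated in plain Python to make the naming pattern explicit. This is a simplified model, not what Spark does internally: save_as_text_model is a hypothetical helper, and the layout of one team per partition is an assumption.

```python
import os
import tempfile

def save_as_text_model(partitions, out_dir):
    """Write one part-NNNNN file per partition, plus a _SUCCESS marker."""
    os.makedirs(out_dir)
    for index, part in enumerate(partitions):
        with open(os.path.join(out_dir, f"part-{index:05d}"), "w") as f:
            for element in part:
                f.write(f"{element}\n")  # one element per line
    # Empty marker file signalling that the whole write finished.
    open(os.path.join(out_dir, "_SUCCESS"), "w").close()

teams = ["wild", "blackhawks", "red wings", "wild",
         "oilers", "whalers", "jets", "wild"]
partitions = [[team] for team in teams]  # assumed: one element per partition

with tempfile.TemporaryDirectory() as tmp:
    out_dir = os.path.join(tmp, "hockey_teams.txt")
    save_as_text_model(partitions, out_dir)
    written = sorted(os.listdir(out_dir))
    with open(os.path.join(out_dir, "part-00002")) as f:
        third_partition = f.read()

print(written)
```

The key point is that the path names a directory, so anything reading the output back should be pointed at that directory rather than at a single file.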

PySpark Actions Further References

See also PySpark DataFrames by Example

About Todd M

Todd has held multiple software roles over his 20-year career. For the last 5 years, he has focused on helping organizations move from batch to data streaming. In addition to the free tutorials, he provides consulting and coaching for Data Engineers, Data Scientists, and Data Architects. Feel free to reach out directly or to connect on LinkedIn.
