Spark actions are the operations which trigger a Spark job to compute and return a result to the Spark driver program or write data to an external storage system. Unlike Spark transformations, which only define a computation path but do not actually execute, actions force Spark to compute and produce a result.
In this Spark actions with Scala tutorial, let’s explore some of the most popular Spark actions. Understanding Spark actions is essential for getting the most out of Apache Spark.
Spark actions produce a result back to the Spark driver. Computing this result triggers the evaluation of any RDDs, DataFrames, or Datasets needed to produce it. Recall from the previous Spark Transformations in Scala tutorial that transformation functions such as map and flatMap only describe how RDDs, DataFrames, or Datasets should be built; they are lazily evaluated.
A call to a Spark action triggers the RDDs, DataFrames, or Datasets to be formed in memory. You may also hear this described as being “hydrated” or “initialized,” or other ways as well.
Please understand that “hydrated” is just a fancy way to express this concept, and I advise being wary of people who use fancy terms like “hydrated”.
Let’s not get fancy, let’s get practical.
Spark Actions Table of Contents
- Spark Action Examples Overview
- What are Spark Actions?
- How to Code Spark Actions
- Spark Action Code Examples
- Spark Actions with Scala Conclusion
- Spark Actions Further References
Spark Action Examples Overview
As mentioned elsewhere on this site, when you are new to Scala with Spark, you are encouraged to perform these examples in your environment.
This means, don’t just read through the source code examples. Rather, type them out on your keyboard.
Now, I don’t mean to sound bossy here, but typing these examples really will help you learn. Trust me, I’m a doctor.
Actually, I’m not a doctor, but adding “I’m a doctor” sounds better than just “Trust me”.
Once you are comfortable here, check out other free Scala with Spark tutorials and do let me know if you have any questions or suggestions.
What are Spark Actions?
Spark actions differ from Spark transformations because actions trigger the execution of a Spark job.
Examples of Spark Actions include:
- count() – returns the number of elements in an RDD
- collect() – returns all the elements in an RDD as an array
- saveAsTextFile() – writes the elements of an RDD to a text file
- reduce() – aggregates the elements of an RDD using a specified function
We will be going through examples of these actions and many more below.
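For a quick first taste, here is a minimal sketch of those four actions in the spark-shell, assuming the sc SparkContext the shell provides (the output directory name is hypothetical):
scala> val rdd = sc.parallelize(1 to 5)
scala> rdd.count()                         // Long = 5
scala> rdd.collect()                       // Array(1, 2, 3, 4, 5)
scala> rdd.reduce(_ + _)                   // Int = 15
scala> rdd.saveAsTextFile("preview_out")   // writes part files to the "preview_out" directory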
How to Code Spark Actions
Step 1: Create a SparkSession
To run Spark actions, a SparkSession is required. For now, we’ll define a SparkSession as the entry point to Spark functionality. An example of creating a SparkSession can be seen in the following code:
import org.apache.spark.sql.functions._
import org.apache.spark.sql.SparkSession

object Example {
  def main(args: Array[String]): Unit = {
    // create a SparkSession with a local master and app name
    val spark = SparkSession.builder()
      .appName("SparkActions")
      .master("local[*]")
      .getOrCreate()

    // See Step 2 below

    // stop the SparkSession
    spark.stop()
  }
}
This code creates a new SparkSession with the application name “SparkActions”. If a SparkSession already exists, getOrCreate() returns the existing one instead of creating a new one.
Step 2: Creating a DataFrame
Once we have a SparkSession, we can create a DataFrame (or RDD or Dataset). A DataFrame is an abstraction over a distributed collection of data, organized into named columns.
For example, we create a DataFrame in the following Scala code:
import org.apache.spark.sql.functions._
import org.apache.spark.sql.SparkSession

object Example {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("SparkActions")
      .master("local[*]")
      .getOrCreate()

    // toDF comes from the implicits on our SparkSession instance
    import spark.implicits._

    // New: create a DataFrame from a sequence of numbers
    val df = Seq(1, 2, 3, 4, 5).toDF("number")

    // Step 3 next

    spark.stop()
  }
}
Step 3: Applying Spark Actions
With a SparkSession and DataFrame now in place, we are ready to apply a Spark action. A Spark action triggers the computation of a DataFrame and returns a result to the driver program or writes data to an external storage system. Some examples of DataFrame actions are:
- show() – displays the first n rows of a DataFrame
- count() – returns the number of rows in a DataFrame
- collect() – returns all the rows of a DataFrame as an array of Rows
- write() – writes the contents of a DataFrame to an external storage system
Building upon our preceding examples, we can compute an aggregate such as the sum of a column and trigger it with an action using the following code:
import org.apache.spark.sql.functions._
import org.apache.spark.sql.SparkSession

object Example {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("SparkActions")
      .master("local[*]")
      .getOrCreate()

    import spark.implicits._

    val df = Seq(1, 2, 3, 4, 5).toDF("number")

    // "agg" with "sum" defines the aggregation (a transformation);
    // calling first() is the action that triggers the computation
    val total = df.agg(sum("number")).first().getLong(0)

    // print the result
    println(s"The sum is $total")

    spark.stop()
  }
}
In this example, we created a DataFrame from a sequence of numbers, used the agg transformation with the sum aggregate function to define the total, and then called the first() action to trigger the computation and bring the result back to the driver, where we print it out.
This is just a simple example, as we could also have used the SparkSession to create the DataFrame by reading from a data source, such as a CSV file, in Scala.
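For example, here is a minimal sketch of reading a CSV file into a DataFrame instead; the file path and options are hypothetical and would need to match your data:
// hypothetical path; adjust to your environment
val csvDF = spark.read
  .option("header", "true")        // first line contains column names
  .option("inferSchema", "true")   // let Spark guess the column types
  .csv("data/numbers.csv")

// count() is an action, so this line triggers the actual read
println(csvDF.count())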
Spark Action Code Examples
In all the following Spark action code examples, I’m going to use the spark-shell because it conveniently provides a SparkSession. I encourage you to try it as well.
In my environment, I downloaded a Spark distribution from https://spark.apache.org, extracted it, and opened a terminal window from which I can run the spark-shell script.
reduce
The reduce Spark action aggregates the elements of an RDD or Dataset by combining all the elements with a binary operator (a function of two arguments) and returns the result as a single value.
scala> val names1 = sc.parallelize(List("abe", "abby", "apple"))
names1: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[1467] at parallelize at <console>:12
scala> names1.reduce((t1,t2) => t1 + t2)
res778: String = abbyabeapple
scala> names1.flatMap(k => List(k.size) ).reduce((t1,t2) => t1 + t2)
res779: Int = 12
// another way to show
scala> val names2 = sc.parallelize(List("apple", "beatty", "beatrice")).map(a => (a, a.size))
names2: org.apache.spark.rdd.RDD[(String, Int)] = MappedRDD[1473] at map at <console>:14
scala> names2.flatMap(t => Array(t._2)).reduce(_ + _)
res783: Int = 19
DataFrame alternative using agg function
As we previously saw, when using a DataFrame it may be more convenient to use agg with the sum function instead of reduce, as shown in the following code:
scala> val df = spark.range(10).toDF("num")
df: org.apache.spark.sql.DataFrame = [num: bigint]
scala> val sumDF = df.agg(sum("num"))
sumDF: org.apache.spark.sql.DataFrame = [sum(num): bigint]
scala> sumDF.show
+--------+
|sum(num)|
+--------+
| 45|
+--------+
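If you want the result as a plain Scala value on the driver rather than a one-row DataFrame, an action such as first() will trigger the computation and return a Row to read from. A minimal sketch using the sumDF from above:
scala> val total = sumDF.first().getLong(0)
total: Long = 45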
collect
collect is a Spark action which retrieves all RDD or DataFrame data back to the driver program as an array in Scala.
It is important to use collect only on small datasets, because it can cause the driver program to run out of memory if the dataset is too large.
collect should be used only when it is truly necessary to bring the data back to the driver, since transferring all the data can significantly hurt performance.
We’ll be fine using it on small datasets from the shell, though.
collect is often used in previously provided examples, such as the Spark Transformation Examples tutorial, to show the values that are returned. The REPL, for example, will print the values of the array back to the console, which can be helpful when debugging programs.
scala> sc.parallelize(List(1,2,3)).flatMap(x=>List(x,x,x)).collect
res200: Array[Int] = Array(1, 1, 1, 2, 2, 2, 3, 3, 3)
scala> sc.parallelize(List(1,2,3)).map(x=>List(x,x,x)).collect
res201: Array[List[Int]] = Array(List(1, 1, 1), List(2, 2, 2), List(3, 3, 3))
// or DataFrame version using df DataFrame from previous example
scala> df.collect().foreach(println)
[0]
[1]
[2]
[3]
[4]
[5]
[6]
[7]
[8]
[9]
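If the dataset might be too large to collect safely, one alternative is toLocalIterator, which brings rows back to the driver one partition at a time instead of all at once. A minimal sketch using the same df:
scala> df.rdd.toLocalIterator.foreach(println)   // same rows as collect, fetched one partition at a time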
count
Count can be used to determine the number of elements in an RDD (or rows in a DataFrame or Dataset).
scala> val names2 = sc.parallelize(List("apple", "beatty", "beatrice"))
names2: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[1476] at parallelize at <console>:12
scala> names2.count
res784: Long = 3
// or previous DataFrame version
scala> df.count
res10: Long = 10
foreach
We already saw foreach used above, applied to the result of collecting a DataFrame.
The foreach action applies a function to each element of an RDD or DataFrame. It can be used for various purposes, such as writing data to the console or a log, sending data to external systems, or updating shared state.
Here’s a foreach source code example using an RDD in Scala:
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().appName("foreach-example").master("local[*]").getOrCreate()
val rdd = spark.sparkContext.parallelize(Seq(1, 2, 3, 4, 5))

// foreach runs on the executors; in local mode the output appears in this console
rdd.foreach(element => {
  println(element)
})
This code creates an RDD containing the integers 1 through 5 and passes println to foreach to print each element. Keep in mind that foreach runs on the executors, so on a cluster the output appears in the executor logs rather than on the driver; in local mode, as here, it prints to your console.
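A related action, foreachPartition, applies a function once per partition rather than once per element. This is handy when you need to set up something expensive, such as a connection to an external system, only once per partition. A minimal sketch reusing the rdd from above; the setup and teardown are hypothetical placeholders:
rdd.foreachPartition(partition => {
  // hypothetical: open a connection to an external system here, once per partition
  partition.foreach(element => println(element))
  // hypothetical: close the connection here
})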
first
No fancy explanation needed, as it simply returns the first element of an RDD, DataFrame, or Dataset.
scala> val names2 = sc.parallelize(List("apple", "beatty", "beatrice"))
names2: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[1477] at parallelize at <console>:12
scala> names2.first
res785: String = apple
take
The take
function is used to retrieve the first n elements of an RDD or DataFrame as shown in the following code examples:
scala> val names2 = sc.parallelize(List("apple", "beatty", "beatrice"))
names2: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[1478] at parallelize at <console>:12
scala> names2.take(2)
res786: Array[String] = Array(apple, beatty)
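DataFrames and Datasets offer the same take action, which returns an Array[Row], along with head and show(n) for quick inspection. A minimal sketch using the df DataFrame from the earlier examples:
scala> df.take(2)    // Array[Row] containing the first two rows
scala> df.show(2)    // prints the first two rows as a formatted table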
takeSample
Similar to take, the takeSample action returns an array of n elements. It also accepts a boolean flag for sampling with or without replacement, and an optional random generator seed, as shown in the following examples:
scala> val teams = sc.parallelize(List("twins", "brewers", "cubs", "white sox", "indians", "bad news bears"))
teams: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[1482] at parallelize at <console>:12
scala> teams.takeSample(true, 3)
res789: Array[String] = Array(white sox, twins, white sox)
scala> teams.takeSample(true, 3)
res790: Array[String] = Array(cubs, white sox, twins)
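Passing the optional seed makes the sample reproducible across runs; a minimal sketch:
scala> teams.takeSample(false, 3, 42L)   // sample 3 without replacement, fixed seed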
countByKey
This is only available on RDDs of (K, V) pairs and returns a map of each key K to its count.
scala> val hockeyTeams = sc.parallelize(List("wild", "blackhawks", "red wings", "wild", "oilers", "whalers", "jets", "wild"))
hockeyTeams: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[3] at parallelize at <console>:12
scala> hockeyTeams.map(k => (k,1)).countByKey
res0: scala.collection.Map[String,Long] = Map(jets -> 1, blackhawks -> 1, red wings -> 1, oilers -> 1, whalers -> 1, wild -> 3)
countByKey would have been helpful in the most popular baby names Spark example from an earlier post.
scala> val babyNamesToTotalCount = sc.textFile("baby_names.csv").map(line => line.split(",")).map(n => (n(1), n(4)))
babyNamesToTotalCount: org.apache.spark.rdd.RDD[(String, String)] = MappedRDD[21] at map at <console>:12
scala> babyNamesToTotalCount.countByKey
res2: scala.collection.Map[String,Long] = Map(JADEN -> 65, KACPER -> 2, RACHEL -> 63, JORDYN -> 33, JANA -> 1, CESIA -> 1, IBRAHIM -> 22, LUIS -> 65, DESMOND -> 5, AMANI -> 6, ELIMELECH -> 7, LILA -> 39, NEYMAR -> 1, JOSUE -> 31, LEELA -> 1, DANNY -> 25, GARY -> 3, SIMA -> 10, GOLDY -> 14, SADIE -> 41, MARY -> 40, LINDSAY -> 10, JAKOB -> 2, AHARON -> 5, LEVI -> 39, MADISYN -> 3, HADASSAH -> 5, MALIA -> 10, ANTONIA -> 2, RAIZY -> 16, ISAIAS -> 1, AMINA -> 9, DECLAN -> 33, GILLIAN -> 1, ARYANA -> 1, GRIFFIN -> 25, BRYANNA -> 6, SEBASTIEN -> 1, JENCARLOS -> 1, ELSA -> 1, HANA -> 3, MASON -> 194, SAVANNA -> 6, ROWAN -> 6, DENNIS -> 15, JEROME -> 1, BROOKLYNN -> 2, MIRANDA -> 11, KYLER -> 1, HADLEY -> 2, STEPHANIE -> 46, CAMILA -> 45, MAKENNA -> 3, CARMINE -> 5, KATRINA -> 1, AMALIA -> 1, EN...
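A related action, countByValue, works on any RDD and counts each distinct element directly, so the map(k => (k, 1)) step above isn’t needed. A minimal sketch with the hockeyTeams RDD:
scala> hockeyTeams.countByValue   // Map(wild -> 3, blackhawks -> 1, red wings -> 1, oilers -> 1, whalers -> 1, jets -> 1)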
saveAsTextFile
Writes out the elements of the dataset as a text file (a directory of part files) at a given path on the local filesystem, HDFS, or any other Hadoop-supported file system.
scala> val onlyInterestedIn = sc.textFile("baby_names.csv").map(line => line.split(",")).map(n => (n(1), n(4)))
onlyInterestedIn: org.apache.spark.rdd.RDD[(String, String)] = MappedRDD[27] at map at <console>:12
scala> onlyInterestedIn.saveAsTextFile("results.csv")
Produces:
todd-mcgraths-macbook-pro:spark-1.1.0-bin-hadoop2.4 toddmcgrath$ ls -al results.csv/
total 824
drwxr-xr-x   8 toddmcgrath  staff     272 Dec  3 06:53 .
drwxr-xr-x@ 17 toddmcgrath  staff     578 Dec  3 06:54 ..
-rw-r--r--   1 toddmcgrath  staff       8 Dec  3 06:53 ._SUCCESS.crc
-rw-r--r--   1 toddmcgrath  staff    1600 Dec  3 06:53 .part-00000.crc
-rw-r--r--   1 toddmcgrath  staff    1588 Dec  3 06:53 .part-00001.crc
-rw-r--r--   1 toddmcgrath  staff       0 Dec  3 06:53 _SUCCESS
-rw-r--r--   1 toddmcgrath  staff  203775 Dec  3 06:53 part-00000
-rw-r--r--   1 toddmcgrath  staff  202120 Dec  3 06:53 part-00001
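The DataFrame counterpart is the write action mentioned earlier. Here is a minimal sketch that saves the df DataFrame from the previous examples as CSV; the output directory name is hypothetical:
scala> df.write.mode("overwrite").option("header", "true").csv("df_results")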
Spark Actions with Scala Conclusion
Throughout this tutorial, we explored different types of Spark actions in Scala. Through Spark action functions, we trigger the computation in Spark and retrieve the results back to the driver program.
We have seen how actions such as collect(), count(), and take() can be used to retrieve data from an RDD, DataFrame, or Dataset, and how reduce() (along with related actions such as fold() and aggregate()) can be used to perform aggregations on an RDD. We have also discussed how foreach() and foreachPartition() can be used to perform operations on each element or partition of an RDD, and how saveAsTextFile() can be used to save results to a file system.
Spark actions are an essential part of Spark programming, and you need a solid understanding of how to use them.
Did you type these examples above as I suggested? I hope so. And do let me know if you have any questions or suggestions for improvement.
Are you starting your journey in Spark with Scala and need more? Check out other Spark with Scala tutorials.
Spark Actions Further References
- Common Spark actions listed in table format in the Apache Spark documentation (RDD Programming Guide – Actions): https://spark.apache.org/docs/latest/rdd-programming-guide.html#actions