Beginning Spark Actions in Scala [9 Popular Examples]


Spark actions are the operations which trigger a Spark job to compute and return a result to the Spark driver program or write data to an external storage system. Unlike Spark transformations, which only define a computation path but do not actually execute, actions force Spark to compute and produce a result.

In this Spark actions with Scala tutorial, let’s explore some of the most popular Spark actions in Scala. Understanding Spark actions is essential for getting the most out of Apache Spark.

Spark actions produce a result back to the Spark Driver.  Computing this result triggers the evaluation of whichever RDDs, DataFrames, or Datasets are needed to produce it.  Recall from the previous Spark Transformations in Scala tutorial that transformation functions such as map and flatMap only describe how RDDs, DataFrames, or Datasets should be created; they are lazily evaluated.
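
To make the distinction concrete, here is a minimal sketch you can run in spark-shell (where sc is already provided): the map call only records the transformation, while count is the action that forces a job to run.

val rdd = sc.parallelize(1 to 5)
val doubled = rdd.map(_ * 2)  // transformation: nothing is computed yet
doubled.count()               // action: triggers a Spark job and returns 5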

A call to a Spark action triggers the RDDs, DataFrames, or Datasets to be formed in memory; you may hear this described as being “hydrated” or “initialized”, or other ways as well.

Please understand that “hydrated” is just a fancy way to express this concept, and I advise being wary of people who use fancy terms like “hydrated”.

Let’s not get fancy, let’s get practical.


Spark Action Examples Overview

As mentioned elsewhere on this site, when you are new to Scala with Spark, you are encouraged to perform these examples in your environment. 

This means, don’t just read through the source code examples.  Rather, type them out on your keyboard. 

Now, I don’t mean to sound bossy here, but typing these examples really will help you learn.  Trust me, I’m a doctor. 

Actually, I’m not a doctor, but adding “I’m a doctor” sounds better than just “Trust me”.

Once you are comfortable here, check out other free Scala with Spark tutorials and do let me know if you have any questions or suggestions. 

Spark Actions Examples

What are Spark Actions?

Spark Actions differ from Spark Transformations because actions trigger the execution of a Spark job.

Examples of Spark Actions include:

  • count() – returns the number of elements in an RDD
  • collect() – returns all the elements in an RDD as an array
  • saveAsTextFile() – writes the elements of an RDD to a text file
  • reduce() – aggregates the elements of an RDD using a specified function

We will be going through examples of these actions and many more below.

How to Code Spark Actions

Step 1: Create a SparkSession

To code Spark Actions, a SparkSession is required. For now, we’ll define SparkSession as the entry point to Spark functionality. An example of creating a SparkSession can be seen in the following code:

import org.apache.spark.sql.functions._
import org.apache.spark.sql.SparkSession

object Example {
  def main(args: Array[String]): Unit = {

    // create a SparkSession with a local master and app name
    val spark = SparkSession.builder()
      .appName("SparkActions")
      .master("local[*]")
      .getOrCreate()

    // See Step 2 below

    // stop the SparkSession
    spark.stop()
  }
}

This code creates a new SparkSession with the app name “SparkActions” against a local master. If a SparkSession already exists, getOrCreate will return the existing session instead of creating a new one.

Note: Alternatives to SparkSession, such as SparkContext and SQLContext, depend on the version of Spark. See SparkSession vs. SparkContext vs. SQLContext for further information.

Step 2: Creating a DataFrame

Once we have a SparkSession, we can create a DataFrame (or RDD or Dataset). A DataFrame is an abstraction over a distributed collection of data organized into named columns.

For example, we create a DataFrame in the following Scala code:

import org.apache.spark.sql.functions._
import org.apache.spark.sql.SparkSession

object Example {
  def main(args: Array[String]): Unit = {

    val spark = SparkSession.builder()
      .appName("SparkActions")
      .master("local[*]")
      .getOrCreate()

    // New: import the implicits from our SparkSession so we can call toDF
    import spark.implicits._

    // create a DataFrame from a sequence of numbers
    val df = Seq(1, 2, 3, 4, 5).toDF("number")

    // Step 3 next

    spark.stop()
  }
}

Step 3: Applying Spark Actions

With a SparkSession and DataFrame now in place, we are ready to apply a Spark Action. A Spark action triggers the computation of one or more DataFrames and returns a result to the driver program or writes data to an external storage system. Some examples of DataFrame actions are (each is sketched in code right after the list):

  • show(): displays the first n rows of a DataFrame
  • count(): returns the number of rows in a DataFrame
  • collect(): returns all the rows of a DataFrame as a list of Rows
  • write(): writes the contents of a DataFrame to an external storage system
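
As a minimal sketch, assuming the df DataFrame of numbers from Step 2 and a hypothetical output directory named numbers_out, these actions look like the following:

df.show()                                      // display the rows in tabular form
val rowCount = df.count()                      // number of rows: 5
val rows = df.collect()                        // Array[Row] returned to the driver
df.write.mode("overwrite").csv("numbers_out")  // write the DataFrame out as CSV files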

Building upon our preceding examples, we can aggregate our DataFrame with sum and then use an action to retrieve the result, as shown in the following code:

import org.apache.spark.sql.functions._
import org.apache.spark.sql.SparkSession

object Example {
  def main(args: Array[String]): Unit = {

    val spark = SparkSession.builder()
      .appName("SparkActions")
      .master("local[*]")
      .getOrCreate()

    import spark.implicits._

    val df = Seq(1, 2, 3, 4, 5).toDF("number")

    // aggregate with sum (a transformation), then trigger the job
    // with the first() action to retrieve the result
    val total = df.agg(sum("number")).first().getLong(0)

    // print the result
    println(s"The sum is $total")

    spark.stop()
  }
}

In this example, we created a DataFrame from a sequence of numbers, applied the agg transformation with the sum aggregate function, and then used the first() action to trigger the computation and retrieve the total. We then print it out.

This is just a simple example; we could also have used SparkSession to create the DataFrame by reading from a data source, such as a CSV file.
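
For instance, a minimal sketch of reading a CSV file into a DataFrame (assuming a hypothetical people.csv file with a header row) might look like this:

val csvDF = spark.read
  .option("header", "true")      // treat the first line as column names
  .option("inferSchema", "true") // attempt to infer column types
  .csv("people.csv")             // hypothetical path to a CSV file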

Spark Action Code Examples

In all the following Spark action code examples, I’m going to use the spark-shell because it conveniently provides a SparkSession. I encourage you to try it as well.

In my environment, I downloaded a Spark distribution from https://spark.apache.org, extracted it, and opened a terminal window in the extracted directory where I can launch the bin/spark-shell script.

reduce

A Spark action used to aggregate the elements of a dataset through a supplied function.

The reduce Spark action function is used to aggregate the elements of an RDD or DataFrame by combining all the elements using a binary operator. It returns the result as a single value.

scala> val names1 = sc.parallelize(List("abe", "abby", "apple"))
names1: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[1467] at parallelize at <console>:12

scala> names1.reduce((t1,t2) => t1 + t2)
res778: String = abbyabeapple

scala> names1.flatMap(k => List(k.size) ).reduce((t1,t2) => t1 + t2)
res779: Int = 12

// another way to show

scala> val names2 = sc.parallelize(List("apple", "beatty", "beatrice")).map(a => (a, a.size))
names2: org.apache.spark.rdd.RDD[(String, Int)] = MappedRDD[1473] at map at <console>:14

scala> names2.flatMap(t => Array(t._2)).reduce(_ + _)
res783: Int = 19
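
Related RDD actions fold and aggregate work along the same lines as reduce: fold takes a zero value for the combining operation, and aggregate lets the result type differ from the element type. Here is a minimal sketch, assuming the names1 RDD from above (no REPL output shown since the concatenation order may vary):

// fold: like reduce, but with a zero value for the operation
names1.fold("")((t1, t2) => t1 + t2)

// aggregate: sum the string lengths, starting from 0
// (the first function folds an element into the accumulator, the second merges accumulators)
names1.aggregate(0)((acc, name) => acc + name.size, (a, b) => a + b)  // 12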

DataFrame alternative using agg function

As we previously saw, when using a DataFrame it may be more convenient to use agg with the sum function instead of reduce, as shown in the following code:

scala> val df = spark.range(10).toDF("num")
df: org.apache.spark.sql.DataFrame = [num: bigint]

scala> val sumDF = df.agg(sum("num"))
sumDF: org.apache.spark.sql.DataFrame = [sum(num): bigint]

scala> sumDF.show
+--------+
|sum(num)|
+--------+
|      45|
+--------+

collect

collect is a Spark action which retrieves all RDD or DataFrame data back to the driver program as an array in Scala.

It is important to use collect only on small datasets, as it can cause the driver program to run out of memory if the dataset is too large.

collect should be used only when it is truly necessary to retrieve the data, as transferring everything to the driver can significantly hurt performance.

We’ll be fine using it in small batches from the shell though.

collect is often used in previously provided examples, such as Spark Transformation Examples, in order to show the returned values. The REPL, for example, will print the values of the array back to the console. This can be helpful when debugging programs.

scala> sc.parallelize(List(1,2,3)).flatMap(x=>List(x,x,x)).collect
res200: Array[Int] = Array(1, 1, 1, 2, 2, 2, 3, 3, 3)

scala> sc.parallelize(List(1,2,3)).map(x=>List(x,x,x)).collect
res201: Array[List[Int]] = Array(List(1, 1, 1), List(2, 2, 2), List(3, 3, 3))

// or DataFrame version using df DataFrame from previous example

scala> df.collect().foreach(println)
[0]
[1]
[2]
[3]
[4]
[5]
[6]
[7]
[8]
[9]

count

count can be used to determine the number of elements in an RDD (or DataFrame or Dataset).

scala> val names2 = sc.parallelize(List("apple", "beatty", "beatrice"))
names2: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[1476] at parallelize at <console>:12

scala> names2.count
res784: Long = 3

// or the previous DataFrame version
scala> df.count
res10: Long = 10

foreach

We already saw foreach above, applied to the results of collect on a DataFrame.

The foreach function is used to apply a function to each element of an RDD or DataFrame. The function can be used for various purposes such as writing data to console/log, external systems or updating shared state.

Here’s a version of foreach source code example using an RDD in Scala:

val spark = SparkSession.builder().appName("foreach-example").getOrCreate()
val rdd = spark.sparkContext.parallelize(Seq(1, 2, 3, 4, 5))

rdd.foreach(element => {
  println(element)
})

This code creates an RDD containing the integers 1 through 5 and passes println to foreach to print out each element. Note that when running on a cluster, foreach executes on the executors, so the printed output appears in the executor logs rather than on the driver console.
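
A close relative, foreachPartition, applies a function to each partition’s iterator rather than to each element, which is useful when setup work (such as opening a database connection) should happen once per partition rather than once per record. Here is a minimal sketch using the same rdd:

rdd.foreachPartition(partition => {
  // per-partition setup (for example, opening a connection) would go here
  partition.foreach(element => println(element))
  // per-partition cleanup would go here
})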

first

No fancy explanation needed, as it simply returns the first element of the RDD, DataFrame, or Dataset.

scala> val names2 = sc.parallelize(List("apple", "beatty", "beatrice"))
names2: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[1477] at parallelize at <console>:12

scala> names2.first
res785: String = apple

take

The take function is used to retrieve the first n elements of an RDD or DataFrame as shown in the following code examples:

scala> val names2 = sc.parallelize(List("apple", "beatty", "beatrice"))
names2: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[1478] at parallelize at <console>:12

scala> names2.take(2)
res786: Array[String] = Array(apple, beatty)

takeSample

Similar to take, the takeSample action function returns an array of n elements.  It also takes a boolean flag for sampling with or without replacement and an optional random generator seed, as shown in the following examples:

scala> val teams = sc.parallelize(List("twins", "brewers", "cubs", "white sox", "indians", "bad news bears"))
teams: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[1482] at parallelize at <console>:12

scala> teams.takeSample(true, 3)
res789: Array[String] = Array(white sox, twins, white sox)

scala> teams.takeSample(true, 3)
res790: Array[String] = Array(cubs, white sox, twins)

countByKey

This is only available on RDDs of (K, V) pairs and returns a map of each key K to its count.

scala> val hockeyTeams = sc.parallelize(List("wild", "blackhawks", "red wings", "wild", "oilers", "whalers", "jets", "wild"))
hockeyTeams: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[3] at parallelize at <console>:12

scala> hockeyTeams.map(k => (k,1)).countByKey
res0: scala.collection.Map[String,Long] = Map(jets -> 1, blackhawks -> 1, red wings -> 1, oilers -> 1, whalers -> 1, wild -> 3)

countByKey would have been helpful in the most popular baby names Spark example from an earlier post.

scala> val babyNamesToTotalCount = sc.textFile("baby_names.csv").map(line => line.split(",")).map(n => (n(1), n(4)))
babyNamesToTotalCount: org.apache.spark.rdd.RDD[(String, String)] = MappedRDD[21] at map at <console>:12

scala> babyNamesToTotalCount.countByKey
res2: scala.collection.Map[String,Long] = Map(JADEN -> 65, KACPER -> 2, RACHEL -> 63, JORDYN -> 33, JANA -> 1, CESIA -> 1, IBRAHIM -> 22, LUIS -> 65, DESMOND -> 5, AMANI -> 6, ELIMELECH -> 7, LILA -> 39, NEYMAR -> 1, JOSUE -> 31, LEELA -> 1, DANNY -> 25, GARY -> 3, SIMA -> 10, GOLDY -> 14, SADIE -> 41, MARY -> 40, LINDSAY -> 10, JAKOB -> 2, AHARON -> 5, LEVI -> 39, MADISYN -> 3, HADASSAH -> 5, MALIA -> 10, ANTONIA -> 2, RAIZY -> 16, ISAIAS -> 1, AMINA -> 9, DECLAN -> 33, GILLIAN -> 1, ARYANA -> 1, GRIFFIN -> 25, BRYANNA -> 6, SEBASTIEN -> 1, JENCARLOS -> 1, ELSA -> 1, HANA -> 3, MASON -> 194, SAVANNA -> 6, ROWAN -> 6, DENNIS -> 15, JEROME -> 1, BROOKLYNN -> 2, MIRANDA -> 11, KYLER -> 1, HADLEY -> 2, STEPHANIE -> 46, CAMILA -> 45, MAKENNA -> 3, CARMINE -> 5, KATRINA -> 1, AMALIA -> 1, EN...

saveAsTextFile

Writes the elements of the dataset as a text file (or set of part files) in a given directory on the local filesystem, HDFS, or any other Hadoop-supported file system.

scala> val onlyInterestedIn = sc.textFile("baby_names.csv").map(line => line.split(",")).map(n => (n(1), n(4)))
onlyInterestedIn: org.apache.spark.rdd.RDD[(String, String)] = MappedRDD[27] at map at <console>:12

scala> onlyInterestedIn.saveAsTextFile("results.csv")

Produces:

todd-mcgraths-macbook-pro:spark-1.1.0-bin-hadoop2.4 toddmcgrath$ ls -al results.csv/
total 824
drwxr-xr-x   8 toddmcgrath  staff     272 Dec  3 06:53 .
drwxr-xr-x@ 17 toddmcgrath  staff     578 Dec  3 06:54 ..
-rw-r--r--   1 toddmcgrath  staff       8 Dec  3 06:53 ._SUCCESS.crc
-rw-r--r--   1 toddmcgrath  staff    1600 Dec  3 06:53 .part-00000.crc
-rw-r--r--   1 toddmcgrath  staff    1588 Dec  3 06:53 .part-00001.crc
-rw-r--r--   1 toddmcgrath  staff       0 Dec  3 06:53 _SUCCESS
-rw-r--r--   1 toddmcgrath  staff  203775 Dec  3 06:53 part-00000
-rw-r--r--   1 toddmcgrath  staff  202120 Dec  3 06:53 part-00001

Spark Actions with Scala Conclusion

Throughout this tutorial, we explored different types of Spark actions in Scala. Through Spark action functions, we trigger the computation in Spark and retrieve the results back to the driver program.

We have seen how actions such as collect(), count(), and take() can be used to retrieve data from an RDD, DataFrame, or Dataset. We have also explored how reduce() can be used to perform aggregations on an RDD, and how its relatives fold() and aggregate() work along the same lines.

Furthermore, we have discussed how foreach() and foreachPartition() can be used to perform operations on each element or partition of an RDD.

We have also explored how saveAsTextFile() can be used to save to a file system.

Spark actions are an essential part of Spark programming, and you need a solid understanding of how to use them.

Did you type these examples above as I suggested?  I hope so.  And do let me know if you have any questions or suggestions for improvement. 

Are you starting your journey in Spark with Scala and need more? Check out other Spark with Scala tutorials.

Spark Actions Further References

See also: Spark Broadcast and Accumulators by Examples
