Apache Spark Examples of Actions in Scala


When a Spark API “action” function is used, a result is produced back to the Spark Driver.  Computing this result triggers evaluation of any of the RDDs, DataFrames, or Datasets needed to produce it.  Recall that Spark transformations such as map and flatMap, which are used to create RDDs, DataFrames, or Datasets, are lazily evaluated.  A call to a Spark action triggers those RDDs, DataFrames, or Datasets to be formed in memory, or hydrated, or initialized, or however else you may choose to say it.  “Hydrated” is the fancy way to say it, but be wary of people who use the term “hydrated”.
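To see this laziness in action, here is a minimal sketch to try in the REPL (the values and variable names are just for illustration):

// nothing is computed yet; transformations only build up a lineage
val words = sc.parallelize(List("spark", "scala", "actions"))
val lengths = words.map(w => w.length)   // still lazy

// the action is what triggers the actual computation and returns a result to the driver
lengths.reduce((a, b) => a + b)          // Int = 17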

As mentioned elsewhere on this site, if you are new to Scala with Spark, you are encouraged to perform these examples in your environment.  Don’t just read through them.  Type them out on your keyboard.  Now, I don’t mean to sound bossy here, but typing these examples really will help you learn.  Trust me, I’m a doctor.  Actually, I’m not a doctor, but it sounds better than just “Trust me” without the doctor part.

Once you are comfortable here, check out other Scala with Spark tutorials and let me know if you have any questions or suggestions.  Thanks, from Todd… who is not a doctor by the way.  🙂


reduce
collect
count
first
take
takeSample
countByKey
saveAsTextFile

reduce(func)

Aggregates the elements of the dataset through func, which should be commutative and associative so it can be computed correctly in parallel.

scala> val names1 = sc.parallelize(List("abe", "abby", "apple"))
names1: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[1467] at parallelize at <console>:12

scala> names1.reduce((t1,t2) => t1 + t2)
res778: String = abbyabeapple

scala> names1.flatMap(k => List(k.size) ).reduce((t1,t2) => t1 + t2)
res779: Int = 12

// another way to show

scala> val names2 = sc.parallelize(List("apple", "beatty", "beatrice")).map(a => (a, a.size))
names2: org.apache.spark.rdd.RDD[(String, Int)] = MappedRDD[1473] at map at <console>:14

scala> names2.flatMap(t => Array(t._2)).reduce(_ + _)
res783: Int = 19

Formal API: reduce(f: (T, T) ⇒ T): T
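One thing worth noting: reduce may combine partial results from different partitions in any order, which is why func should be commutative and associative.  That is also why the first example above can return “abbyabeapple” rather than the input order.  A quick sketch of the distinction:

// summing is safe: + on Ints is commutative and associative
sc.parallelize(1 to 10).reduce(_ + _)   // Int = 55, regardless of partitioning

// string concatenation is associative but not commutative, so
// names1.reduce((t1, t2) => t1 + t2) may vary with partitioning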


collect()

collect returns the elements of the dataset as an array back to the driver program.

collect is often used in previously provided examples, such as Spark Transformation Examples, to show the values returned.  The REPL, for example, prints the values of the array to the console.  This can be helpful when debugging programs.

Examples

scala> sc.parallelize(List(1,2,3)).flatMap(x=>List(x,x,x)).collect
res200: Array[Int] = Array(1, 1, 1, 2, 2, 2, 3, 3, 3)

scala> sc.parallelize(List(1,2,3)).map(x=>List(x,x,x)).collect
res201: Array[List[Int]] = Array(List(1, 1, 1), List(2, 2, 2), List(3, 3, 3))

Formal API: collect(): Array[T]

Return an array containing all of the elements in this RDD.
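Keep in mind collect brings the entire dataset into driver memory, so it is best reserved for small results.  A common pattern, sketched here with illustrative values, is to filter before collecting:

// shrink the data on the cluster before pulling it back to the driver
sc.parallelize(1 to 1000).filter(n => n % 100 == 0).collect
// Array[Int] = Array(100, 200, 300, 400, 500, 600, 700, 800, 900, 1000)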


count()

Return the number of elements in the RDD

scala> val names2 = sc.parallelize(List("apple", "beatty", "beatrice"))
names2: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[1476] at parallelize at <console>:12

scala> names2.count
res784: Long = 3
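count pairs naturally with transformations; here is a small sketch using the names2 RDD from above (the predicate is just for illustration):

names2.filter(name => name.startsWith("b")).count   // Long = 2, for "beatty" and "beatrice"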


first()

Return the first element in the RDD

scala> val names2 = sc.parallelize(List("apple", "beatty", "beatrice"))
names2: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[1477] at parallelize at <console>:12

scala> names2.first
res785: String = apple
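first is essentially take(1) without the array wrapper; note it throws an exception on an empty RDD rather than returning an empty result.  For comparison:

names2.first     // String = apple
names2.take(1)   // Array[String] = Array(apple)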


take(n)

From the Spark Programming Guide: “Return an array with the first n elements of the dataset.”

scala> val names2 = sc.parallelize(List("apple", "beatty", "beatrice"))
names2: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[1478] at parallelize at <console>:12

scala> names2.take(2)
res786: Array[String] = Array(apple, beatty)


takeSample(withReplacement: Boolean, num: Int, [seed: Long]): Array[T]

Similar to take, but it returns an array of size num sampled from the RDD.  It adds a boolean option for sampling with or without replacement and an optional random generator seed.

scala> val teams = sc.parallelize(List("twins", "brewers", "cubs", "white sox", "indians", "bad news bears"))
teams: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[1482] at parallelize at <console>:12

scala> teams.takeSample(true, 3)
res789: Array[String] = Array(white sox, twins, white sox)

scala> teams.takeSample(true, 3)
res790: Array[String] = Array(cubs, white sox, twins)
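Passing the optional seed makes the sample reproducible across runs, which can be handy when debugging.  A sketch with an arbitrary seed value:

// without replacement this time; the same 3 teams come back on every run
teams.takeSample(false, 3, 42L)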


countByKey()

This is only available on RDDs of (K, V) pairs, and it returns a map of (K, count of K) entries back to the driver.

scala> val hockeyTeams = sc.parallelize(List("wild", "blackhawks", "red wings", "wild", "oilers", "whalers", "jets", "wild"))
hockeyTeams: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[3] at parallelize at <console>:12

scala> hockeyTeams.map(k => (k,1)).countByKey
res0: scala.collection.Map[String,Long] = Map(jets -> 1, blackhawks -> 1, red wings -> 1, oilers -> 1, whalers -> 1, wild -> 3)

countByKey would have been helpful in the most popular baby names Spark example from an earlier post.

scala> val babyNamesToTotalCount = sc.textFile("baby_names.csv").map(line => line.split(",")).map(n => (n(1), n(4)))
babyNamesToTotalCount: org.apache.spark.rdd.RDD[(String, String)] = MappedRDD[21] at map at <console>:12

scala> babyNamesToTotalCount.countByKey
res2: scala.collection.Map[String,Long] = Map(JADEN -> 65, KACPER -> 2, RACHEL -> 63, JORDYN -> 33, JANA -> 1, CESIA -> 1, IBRAHIM -> 22, LUIS -> 65, DESMOND -> 5, AMANI -> 6, ELIMELECH -> 7, LILA -> 39, NEYMAR -> 1, JOSUE -> 31, LEELA -> 1, DANNY -> 25, GARY -> 3, SIMA -> 10, GOLDY -> 14, SADIE -> 41, MARY -> 40, LINDSAY -> 10, JAKOB -> 2, AHARON -> 5, LEVI -> 39, MADISYN -> 3, HADASSAH -> 5, MALIA -> 10, ANTONIA -> 2, RAIZY -> 16, ISAIAS -> 1, AMINA -> 9, DECLAN -> 33, GILLIAN -> 1, ARYANA -> 1, GRIFFIN -> 25, BRYANNA -> 6, SEBASTIEN -> 1, JENCARLOS -> 1, ELSA -> 1, HANA -> 3, MASON -> 194, SAVANNA -> 6, ROWAN -> 6, DENNIS -> 15, JEROME -> 1, BROOKLYNN -> 2, MIRANDA -> 11, KYLER -> 1, HADLEY -> 2, STEPHANIE -> 46, CAMILA -> 45, MAKENNA -> 3, CARMINE -> 5, KATRINA -> 1, AMALIA -> 1, EN...
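One caveat: countByKey returns its result as a Map on the driver, so it is only appropriate when the number of distinct keys is small.  For a large key space, here is a sketch of the distributed alternative using reduceByKey:

// counts stay in an RDD on the cluster instead of driver memory
hockeyTeams.map(k => (k, 1)).reduceByKey(_ + _)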


saveAsTextFile(filepath)

Writes the elements of the dataset as a text file in the filepath directory on the local filesystem, HDFS, or any other Hadoop-supported file system.

scala> val onlyInterestedIn = sc.textFile("baby_names.csv").map(line => line.split(",")).map(n => (n(1), n(4)))
onlyInterestedIn: org.apache.spark.rdd.RDD[(String, String)] = MappedRDD[27] at map at <console>:12

scala> onlyInterestedIn.saveAsTextFile("results.csv")

Produces:

todd-mcgraths-macbook-pro:spark-1.1.0-bin-hadoop2.4 toddmcgrath$ ls -al results.csv/
total 824
drwxr-xr-x   8 toddmcgrath  staff     272 Dec  3 06:53 .
drwxr-xr-x@ 17 toddmcgrath  staff     578 Dec  3 06:54 ..
-rw-r--r--   1 toddmcgrath  staff       8 Dec  3 06:53 ._SUCCESS.crc
-rw-r--r--   1 toddmcgrath  staff    1600 Dec  3 06:53 .part-00000.crc
-rw-r--r--   1 toddmcgrath  staff    1588 Dec  3 06:53 .part-00001.crc
-rw-r--r--   1 toddmcgrath  staff       0 Dec  3 06:53 _SUCCESS
-rw-r--r--   1 toddmcgrath  staff  203775 Dec  3 06:53 part-00000
-rw-r--r--   1 toddmcgrath  staff  202120 Dec  3 06:53 part-00001
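Note there is one part-NNNNN file per partition.  If a single output file is needed and the data is small, one approach is to reduce to a single partition before saving (the output path here is hypothetical):

// coalesce to 1 partition so only one part file is written
onlyInterestedIn.coalesce(1).saveAsTextFile("results_single.csv")

The saved directory can be read back later with sc.textFile("results.csv").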


Spark Actions with Scala Conclusion

Did you type these examples above as I suggested?  I hope so.  And do let me know if you have any questions or suggestions for improvement.  Next, you should check out other Spark with Scala tutorials. Or, if you are looking for something a bit more guided and are new to both Scala and Spark, a good course for you is the Scala for Spark course.
