
Spark Action Examples in Scala
When using Spark API “action” functions, a result is produced back to the Spark driver. Computing that result triggers evaluation of any of the RDDs, DataFrames, or Datasets needed to produce it. Recall that Spark transformations such as map and flatMap, which are used to create RDDs, DataFrames, or Datasets, are lazily evaluated. A call to a Spark action triggers those RDDs, DataFrames, or Datasets to actually be formed in memory, or “hydrated,” or “initialized,” or however else you may choose to say it. “Hydrated” is the fancy way to say it, but be wary of people who use the term “hydrated”.
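As a minimal sketch of that laziness, assuming the same spark-shell session as the examples below, the map here computes nothing when it is defined; the work only happens once the count action asks for a result:
scala> val lengths = sc.parallelize(List("a", "bb", "ccc")).map(s => s.length) // lazy: only the lineage is recorded
scala> lengths.count // the action triggers evaluation and returns 3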
As mentioned elsewhere on this site, if you are new to Scala with Spark, you are encouraged to perform these examples in your environment. Don’t just read through them. Type them out on your keyboard. Now, I don’t mean to sound bossy here, but typing these examples really will help you learn. Trust me, I’m a doctor. Actually, I’m not a doctor, but it sounds better than just “Trust me” without the doctor part.
Once you are comfortable here, check out other Scala with Spark tutorials and let me know if you have any questions or suggestions. Thanks, from Todd… who is not a doctor by the way.
Spark Scala Examples
reduce
Aggregate the elements of a dataset using the supplied function func, which takes two arguments and returns one.
scala> val names1 = sc.parallelize(List("abe", "abby", "apple"))
names1: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[1467] at parallelize at <console>:12
scala> names1.reduce((t1,t2) => t1 + t2)
res778: String = abbyabeapple
scala> names1.flatMap(k => List(k.size) ).reduce((t1,t2) => t1 + t2)
res779: Int = 12
// another way to show it, this time starting from a pair RDD
scala> val names2 = sc.parallelize(List("apple", "beatty", "beatrice")).map(a => (a, a.size))
names2: org.apache.spark.rdd.RDD[(String, Int)] = MappedRDD[1473] at map at <console>:14
scala> names2.flatMap(t => Array(t._2)).reduce(_ + _)
res783: Int = 19
reduce API signature with stripped implicits: reduce(f: (T, T) ⇒ T): T
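One caveat worth noting: reduce applies the function within each partition and then combines the partial results, so the function should be both commutative and associative if you want deterministic output. String concatenation is associative but not commutative, which is likely why res778 above came back in a different order than the input list. Addition is safe on both counts, as in this small sketch:
scala> sc.parallelize(1 to 100).reduce(_ + _) // commutative and associative, so always 5050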
collect
collect returns the elements of the dataset as an array back to the driver program.
collect is often used in previously covered examples, such as the Spark Transformation Examples, to show the values of the result. The REPL, for example, will print the values of the array to the console. This can be helpful when debugging programs.
collect examples
scala> sc.parallelize(List(1,2,3)).flatMap(x=>List(x,x,x)).collect
res200: Array[Int] = Array(1, 1, 1, 2, 2, 2, 3, 3, 3)
scala> sc.parallelize(List(1,2,3)).map(x=>List(x,x,x)).collect
res201: Array[List[Int]] = Array(List(1, 1, 1), List(2, 2, 2), List(3, 3, 3))
Formal API : collect(): Array[T]
Return an array containing all of the elements in this RDD.
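Keep in mind collect pulls the entire dataset into the driver’s memory, so it can overwhelm the driver on large RDDs. A safer pattern, sketched below, is to narrow the data with a transformation such as filter before collecting:
scala> sc.parallelize(1 to 1000000).filter(_ % 100000 == 0).collect // only 10 elements come back to the driver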
count
Return the number of elements in the RDD
scala> val names2 = sc.parallelize(List("apple", "beatty", "beatrice"))
names2: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[1476] at parallelize at <console>:12
scala> names2.count
res784: Long = 3
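Because count is an action, it also triggers any pending transformations, which makes it a convenient sanity check on a pipeline. A quick sketch using the names2 RDD above:
scala> names2.filter(name => name.startsWith("b")).count // "beatty" and "beatrice" match, so this returns 2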
first
Return the first element in the RDD
scala> val names2 = sc.parallelize(List("apple", "beatty", "beatrice"))
names2: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[1477] at parallelize at <console>:12
scala> names2.first
res785: String = apple
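first behaves like take(1) but returns the element itself rather than an array (and throws an exception on an empty RDD). A quick sketch of the comparison:
scala> names2.take(1) // Array(apple), versus the unwrapped "apple" from first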
take
From the Spark Programming Guide: “Return an array with the first n elements of the dataset.”
scala> val names2 = sc.parallelize(List("apple", "beatty", "beatrice"))
names2: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[1478] at parallelize at <console>:12
scala> names2.take(2)
res786: Array[String] = Array(apple, beatty)
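If you ask for more elements than the RDD contains, take simply returns everything it finds rather than failing, as this sketch shows:
scala> names2.take(10) // only 3 elements exist, so this returns Array(apple, beatty, beatrice)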
takeSample
Similar to take, it returns an array of n sampled elements. It also takes a boolean flag for sampling with or without replacement, plus an optional random seed.
scala> val teams = sc.parallelize(List("twins", "brewers", "cubs", "white sox", "indians", "bad news bears"))
teams: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[1482] at parallelize at <console>:12
scala> teams.takeSample(true, 3)
res789: Array[String] = Array(white sox, twins, white sox)
scala> teams.takeSample(true, 3)
res790: Array[String] = Array(cubs, white sox, twins)
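Notice the two calls above returned different samples. Passing a seed as the optional third argument makes the sample reproducible, which is handy in tests; the seed value below is arbitrary:
scala> teams.takeSample(false, 3, 42L) // without replacement; the same seed returns the same 3 teams every call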
countByKey
This is only available on RDDs of (K, V) pairs, and returns a hashmap of (K, count of K) back to the driver.
scala> val hockeyTeams = sc.parallelize(List("wild", "blackhawks", "red wings", "wild", "oilers", "whalers", "jets", "wild"))
hockeyTeams: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[3] at parallelize at <console>:12
scala> hockeyTeams.map(k => (k,1)).countByKey
res0: scala.collection.Map[String,Long] = Map(jets -> 1, blackhawks -> 1, red wings -> 1, oilers -> 1, whalers -> 1, wild -> 3)
countByKey would have been helpful in the most popular baby names Spark example from an earlier post.
scala> val babyNamesToTotalCount = sc.textFile("baby_names.csv").map(line => line.split(",")).map(n => (n(1), n(4)))
babyNamesToTotalCount: org.apache.spark.rdd.RDD[(String, String)] = MappedRDD[21] at map at <console>:12
scala> babyNamesToTotalCount.countByKey
res2: scala.collection.Map[String,Long] = Map(JADEN -> 65, KACPER -> 2, RACHEL -> 63, JORDYN -> 33, JANA -> 1, CESIA -> 1, IBRAHIM -> 22, LUIS -> 65, DESMOND -> 5, AMANI -> 6, ELIMELECH -> 7, LILA -> 39, NEYMAR -> 1, JOSUE -> 31, LEELA -> 1, DANNY -> 25, GARY -> 3, SIMA -> 10, GOLDY -> 14, SADIE -> 41, MARY -> 40, LINDSAY -> 10, JAKOB -> 2, AHARON -> 5, LEVI -> 39, MADISYN -> 3, HADASSAH -> 5, MALIA -> 10, ANTONIA -> 2, RAIZY -> 16, ISAIAS -> 1, AMINA -> 9, DECLAN -> 33, GILLIAN -> 1, ARYANA -> 1, GRIFFIN -> 25, BRYANNA -> 6, SEBASTIEN -> 1, JENCARLOS -> 1, ELSA -> 1, HANA -> 3, MASON -> 194, SAVANNA -> 6, ROWAN -> 6, DENNIS -> 15, JEROME -> 1, BROOKLYNN -> 2, MIRANDA -> 11, KYLER -> 1, HADLEY -> 2, STEPHANIE -> 46, CAMILA -> 45, MAKENNA -> 3, CARMINE -> 5, KATRINA -> 1, AMALIA -> 1, EN...
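One caution: like collect, countByKey returns its entire result map to the driver, so with a very large number of distinct keys it can be safer to stay distributed using the reduceByKey transformation and then collect or save the result. A minimal sketch with the hockeyTeams RDD from above:
scala> hockeyTeams.map(k => (k, 1)).reduceByKey(_ + _).collect // same counts, computed as a distributed RDD first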
saveAsTextFile
Write out the elements of the dataset as a text file (or set of text files) in a given directory on the local filesystem, HDFS, or any other Hadoop-supported file system.
scala> val onlyInterestedIn = sc.textFile("baby_names.csv").map(line => line.split(",")).map(n => (n(1), n(4)))
onlyInterestedIn: org.apache.spark.rdd.RDD[(String, String)] = MappedRDD[27] at map at <console>:12
scala> onlyInterestedIn.saveAsTextFile("results.csv")
Produces:
todd-mcgraths-macbook-pro:spark-1.1.0-bin-hadoop2.4 toddmcgrath$ ls -al results.csv/
total 824
drwxr-xr-x   8 toddmcgrath  staff     272 Dec  3 06:53 .
drwxr-xr-x@ 17 toddmcgrath  staff     578 Dec  3 06:54 ..
-rw-r--r--   1 toddmcgrath  staff       8 Dec  3 06:53 ._SUCCESS.crc
-rw-r--r--   1 toddmcgrath  staff    1600 Dec  3 06:53 .part-00000.crc
-rw-r--r--   1 toddmcgrath  staff    1588 Dec  3 06:53 .part-00001.crc
-rw-r--r--   1 toddmcgrath  staff       0 Dec  3 06:53 _SUCCESS
-rw-r--r--   1 toddmcgrath  staff  203775 Dec  3 06:53 part-00000
-rw-r--r--   1 toddmcgrath  staff  202120 Dec  3 06:53 part-00001
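Each partition is written out as its own part-NNNNN file, which is why two part files appear above. For a small result where a single output file is more convenient, one common sketch is to coalesce to one partition before saving (the output directory name here is hypothetical, and this is only sensible when the data fits in a single partition):
scala> onlyInterestedIn.coalesce(1).saveAsTextFile("results_single.csv") // writes a single part-00000 file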
Spark Actions with Scala Conclusion
Did you type these examples above as I suggested? I hope so. And do let me know if you have any questions or suggestions for improvement. Next, you should check out other Spark with Scala tutorials. Or, if you are looking for something a bit more guided and are new to both Scala and Spark, a good course for you is the Scala for Spark course.