Apache Spark Transformations in Scala Examples

Spark Transformations produce a new Resilient Distributed Dataset (RDD), DataFrame, or Dataset, depending on your version of Spark.  Resilient Distributed Datasets are Spark’s main and original programming abstraction for working with data distributed across multiple nodes in your cluster.  RDDs are automatically parallelized across the cluster.

In the Scala Spark transformation code examples below, it could be very helpful to reference the previous post, What is Apache Spark?, especially where the examples use the baby_names.csv file.  Also, for further exploration of Spark with Scala, check out the Scala with Spark Tutorials page.

If you are new to Spark and Scala, I encourage you to type these examples below; don’t just read them.  Also, for more in-depth coverage of Scala with Spark, this might be a good spot to mention my Scala for Spark course.

Scala Spark Transformations Function Examples


map
flatMap
filter
mapPartitions
mapPartitionsWithIndex
sample
Hammer Time (Can’t Touch This)
union
intersection
distinct
The Keys (To Success? The Florida Keys? To the Castle?)
groupByKey
reduceByKey
aggregateByKey
sortByKey
join

map(func)

What does it do? Passes each element of the RDD through the supplied function, i.e. `func`.

scala> val rows = babyNames.map(line => line.split(","))
rows: org.apache.spark.rdd.RDD[Array[String]] = MappedRDD[360] at map at <console>:14

What did this example do?  It iterates over every line in the babyNames RDD (originally created from the baby_names.csv file) and produces a new RDD of Arrays of Strings.  Each array holds the comma-separated fields of one line from the source CSV.  Makes sense?
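
If you want just one column back out of those arrays, you can chain another map over the result.  A quick sketch, assuming the same rows RDD and that the first name sits at index 1 of each split row (as in the baby_names.csv layout used throughout this post):

// pull just the first-name column (index 1) out of each split row
val firstNames = rows.map(row => row(1))
firstNames.take(5).foreach(println)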

Back to Top

flatMap(func)

“Similar to map, but each input item can be mapped to 0 or more output items (so `func` should return a Seq rather than a single item).”  Whereas the map function can be thought of as a one-to-one operation, the flatMap function can be considered a one-to-many.  Well, thinking of it that way helped me, anyhow.

Compare flatMap to map in the following:

scala> sc.parallelize(List(1,2,3)).flatMap(x=>List(x,x,x)).collect
res200: Array[Int] = Array(1, 1, 1, 2, 2, 2, 3, 3, 3)

scala> sc.parallelize(List(1,2,3)).map(x=>List(x,x,x)).collect
res201: Array[List[Int]] = Array(List(1, 1, 1), List(2, 2, 2), List(3, 3, 3))

flatMap can be especially helpful with nested datasets. For example, it may be helpful to think of the RDD source as hierarchical JSON (which may have been converted to case classes or nested collections).  This is unlike CSV, which has no hierarchical structure.
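
To get a quick feel for that one-to-many behavior, here’s a small sketch using plain sentences split into words; nothing here depends on baby_names.csv:

// each sentence maps to many words; flatMap flattens them into one RDD[String]
val sentences = sc.parallelize(List("spark is fun", "scala is fun too"))
val words = sentences.flatMap(sentence => sentence.split(" "))
words.collect  // something like: Array(spark, is, fun, scala, is, fun, too)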

By the way, these examples may blur the line between Scala and Spark.  Both Scala collections and Spark RDDs have map and flatMap in their APIs.  In a sense, the only Spark-specific portion of the code example above is the use of `parallelize` from a SparkContext.  When calling `parallelize`, the elements of the collection are copied to form a distributed dataset that can be operated on in parallel.  Being able to operate in parallel is a Spark feature.  But, you knew that already.

`collect` was added to both the `flatMap` and `map` examples above for clarity.  We can focus on the Spark aspect (the RDD return type) of the example if we don’t use `collect`, as seen in the following:

scala> sc.parallelize(List(1,2,3)).flatMap(x=>List(x,x,x))
res202: org.apache.spark.rdd.RDD[Int] = FlatMappedRDD[373] at flatMap at <console>:13

scala> sc.parallelize(List(1,2,3)).map(x=>List(x,x,x))
res203: org.apache.spark.rdd.RDD[List[Int]] = MappedRDD[375] at map at <console>:13

Formal API sans implicit: flatMap[U](f: (T) ⇒ TraversableOnce[U]): RDD[U]

Back to Top

filter(func)

filter creates a new RDD by passing in the supplied `func` used to filter the results.  For those people with a relational database background, or coming from a SQL perspective, it may be helpful to think of `filter` as the `where` clause in a SQL statement.  In other words, `where` in SQL is used to filter the desired results according to some criteria, such as ... where state = 'MN'.

Spark filter examples

val file = sc.textFile("catalina.out")
val errors = file.filter(line => line.contains("ERROR"))

Formal API: filter(f: (T) ⇒ Boolean): RDD[T]
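
To make the where-clause analogy concrete, here’s a hedged sketch against the rows RDD from the map example above, assuming the county sits at index 2 of each split row:

// roughly: select ... where county = 'KINGS'
val kingsCounty = rows.filter(row => row(2) == "KINGS")
kingsCounty.count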

Back to Top

mapPartitions(func)

Consider mapPartitions a tool for performance optimization. Now, it won’t do much good for you when running examples on your local machine, but don’t forget it when running on a Spark cluster.  It’s similar to map, but instead of being called once per element it is called once per partition, receiving an iterator over that partition’s elements.  Remember, the first D in RDD is “Distributed” – Resilient Distributed Datasets.  Or, put another way, you could say the data is distributed over partitions.

// from laptop
scala> val parallel = sc.parallelize(1 to 9, 3)
parallel: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[450] at parallelize at <console>:12

scala> parallel.mapPartitions( x => List(x.next).iterator).collect
res383: Array[Int] = Array(1, 4, 7)

// compare to the same, but with default parallelize
scala> val parallel = sc.parallelize(1 to 9)
parallel: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[452] at parallelize at <console>:12

scala> parallel.mapPartitions( x => List(x.next).iterator).collect
res384: Array[Int] = Array(1, 2, 3, 4, 5, 6, 7, 8)

API: mapPartitions[U](f: (Iterator[T]) ⇒ Iterator[U], preservesPartitioning: Boolean = false)(implicit arg0:ClassTag[U]): RDD[U]
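
To see why mapPartitions can be a performance win, imagine per-record work that needs an expensive setup such as a database connection or a heavyweight parser.  With mapPartitions you pay that setup cost once per partition instead of once per element.  A rough sketch, with the expensive resource left as a comment:

val parallel = sc.parallelize(1 to 9, 3)
val perPartitionSums = parallel.mapPartitions { it =>
  // imagine an expensive resource (connection, parser) created here, once per partition
  Iterator(it.sum)
}
perPartitionSums.collect  // with partitions 1-3, 4-6 and 7-9, this should be Array(6, 15, 24)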

Back to Top

mapPartitionsWithIndex(func)

Similar to mapPartitions, but the supplied function also receives an Int indicating the index of the partition.

scala> val parallel = sc.parallelize(1 to 9)
parallel: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[455] at parallelize at <console>:12

scala> parallel.mapPartitionsWithIndex( (index: Int, it: Iterator[Int]) => it.toList.map(x => index + ", "+x).iterator).collect
res389: Array[String] = Array(0, 1, 1, 2, 2, 3, 3, 4, 4, 5, 5, 6, 6, 7, 7, 8, 7, 9)

When learning these APIs on an individual laptop or desktop, it might be helpful to show differences in capabilities and outputs.  For example, if we change the above example to use a parallelize’d list with 3 slices, our output changes significantly:

scala> val parallel = sc.parallelize(1 to 9, 3)
parallel: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[457] at parallelize at <console>:12

scala> parallel.mapPartitionsWithIndex( (index: Int, it: Iterator[Int]) => it.toList.map(x => index + ", "+x).iterator).collect
res390: Array[String] = Array(0, 1, 0, 2, 0, 3, 1, 4, 1, 5, 1, 6, 2, 7, 2, 8, 2, 9)

Formal API signature (implicts stripped) and definition from Spark Scala API docs:

mapPartitionsWithIndex[U](f: (Int, Iterator[T]) ⇒ Iterator[U], preservesPartitioning: Boolean = false): RDD[U]

“Return a new RDD by applying a function to each partition of this RDD, while tracking the index of the original partition.

preservesPartitioning indicates whether the input function preserves the partitioner, which should be false unless this is a pair RDD and the input function doesn’t modify the keys.”

Back to Top

sample(withReplacement, fraction, seed)

Return a random sample of the input RDD.

scala> val parallel = sc.parallelize(1 to 9)
parallel: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[470] at parallelize at <console>:12

scala> parallel.sample(true,.2).count
res403: Long = 3

scala> parallel.sample(true,.2).count
res404: Long = 2

scala> parallel.sample(true,.1)
res405: org.apache.spark.rdd.RDD[Int] = PartitionwiseSampledRDD[473] at sample at <console>:15

Formal API: (withReplacement: Boolean, fraction: Double, seed: Long = Utils.random.nextLong): RDD[T]
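
The seed argument is easy to overlook.  Fixing it makes a sample repeatable across runs, which can help when debugging.  A small sketch, reusing the parallel RDD from above:

// same seed, so the two samples should come back identical
parallel.sample(false, 0.4, 42L).collect
parallel.sample(false, 0.4, 42L).collect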

Back to Top

The Next Three (AKA: Hammer Time)

Stop.  Hammer Time.  Does anyone remember that?  Remember those pants!  Oh man, I wish I could forget it.  Google “Hammer Time” if you don’t know.  Anyhow, where was I?

The next three functions, union, intersection, and distinct, really play well off of each other.  “Can’t touch this” 🙂  See what I did there?  Hah! I threw in some MC Hammer.  Hammer Time.

union(a different rdd)

Simple.  Return the union of two RDDs

scala> val parallel = sc.parallelize(1 to 9)
parallel: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[477] at parallelize at <console>:12

scala> val par2 = sc.parallelize(5 to 15)
par2: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[478] at parallelize at <console>:12

scala> parallel.union(par2).collect
res408: Array[Int] = Array(1, 2, 3, 4, 5, 6, 7, 8, 9, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15)

Back to Top

intersection(a different rdd)

Simple.  Similar to union, but returns the intersection of two RDDs.

scala> val parallel = sc.parallelize(1 to 9)
parallel: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[477] at parallelize at <console>:12

scala> val par2 = sc.parallelize(5 to 15)
par2: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[478] at parallelize at <console>:12

scala> parallel.intersection(par2).collect
res409: Array[Int] = Array(8, 9, 5, 6, 7)

Formal API: intersection(other: RDD[T]): RDD[T]

Back to Top

distinct([numTasks])

Another simple one.  Return a new RDD with distinct elements within a source RDD

scala> val parallel = sc.parallelize(1 to 9)
parallel: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[477] at parallelize at <console>:12

scala> val par2 = sc.parallelize(5 to 15)
par2: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[478] at parallelize at <console>:12

scala> parallel.union(par2).distinct.collect
res412: Array[Int] = Array(1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15)

Formal API: distinct(): RDD[T]

Back to Top

The Keys

The group of Spark transformations groupByKey, reduceByKey, aggregateByKey, sortByKey, and join all act on (key, value) RDDs.  So, this section will be known as “The Keys”.  Cool name, huh?  Well, not really, but it sounded much better than “The Keys and the Values”, which for some unexplained reason triggers memories of “The Young and the Restless”.  Let’s have some fun.

Hey, it might be important for some of you to note here: you’re going to work a lot with key/value structured data in your Spark adventures.

The following key functions are available through org.apache.spark.rdd.PairRDDFunctions, which are operations available only on RDDs of key-value pairs.  “These operations are automatically available on any RDD of the right type (e.g. RDD[(Int, Int)]) through implicit conversions when you import org.apache.spark.SparkContext._.”

For the following, we’re going to use the baby_names.csv file introduced in the previous post What is Apache Spark?

All the following examples presume the baby_names.csv file has been loaded and split such as:

scala> val babyNames = sc.textFile("baby_names.csv")
babyNames: org.apache.spark.rdd.RDD[String] = baby_names.csv MappedRDD[495] at textFile at <console>:12

scala> val rows = babyNames.map(line => line.split(","))
rows: org.apache.spark.rdd.RDD[Array[String]] = MappedRDD[496] at map at <console>:14

Back to Top

groupByKey([numTasks])

“When called on a dataset of (K, V) pairs, returns a dataset of (K, Iterable<V>) pairs. ”

The following groups, for each name, the counties in which it appears over the years.

scala> val namesToCounties = rows.map(name => (name(1),name(2)))
namesToCounties: org.apache.spark.rdd.RDD[(String, String)] = MappedRDD[513] at map at <console>:16

scala> namesToCounties.groupByKey.collect
res429: Array[(String, Iterable[String])] = Array((BRADEN,CompactBuffer(SUFFOLK, SARATOGA, SUFFOLK, ERIE, SUFFOLK, SUFFOLK, ERIE)), (MATTEO,CompactBuffer(NEW YORK, SUFFOLK, NASSAU, KINGS, WESTCHESTER, WESTCHESTER, KINGS, SUFFOLK, NASSAU, QUEENS, QUEENS, NEW YORK, NASSAU, QUEENS, KINGS, SUFFOLK, WESTCHESTER, WESTCHESTER, SUFFOLK, KINGS, NASSAU, QUEENS, SUFFOLK, NASSAU, WESTCHESTER)), (HAZEL,CompactBuffer(ERIE, MONROE, KINGS, NEW YORK, KINGS, MONROE, NASSAU, SUFFOLK, QUEENS, KINGS, SUFFOLK, NEW YORK, KINGS, SUFFOLK)), (SKYE,CompactBuffer(NASSAU, KINGS, MONROE, BRONX, KINGS, KINGS, NASSAU)), (JOSUE,CompactBuffer(SUFFOLK, NASSAU, WESTCHESTER, BRONX, KINGS, QUEENS, SUFFOLK, QUEENS, NASSAU, WESTCHESTER, BRONX, BRONX, QUEENS, SUFFOLK, KINGS, WESTCHESTER, QUEENS, NASSAU, SUFFOLK, BRONX, KINGS, ...

The above example was created from the baby_names.csv file, which was introduced in the previous post What is Apache Spark?
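
If those CompactBuffers are more than you want to eyeball, a common follow-up is to shrink each group down to something small, say the number of distinct counties per name.  A sketch building on the namesToCounties RDD above:

// number of distinct counties each name appears in
namesToCounties.groupByKey
  .mapValues(counties => counties.toSet.size)
  .take(5)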

Back to Top

reduceByKey(func, [numTasks])

Operates on (K,V) pairs of course, but the func must be of type (V,V) => V

Let’s sum the yearly name counts over the years in the CSV.  Notice we need to filter out the header row.

scala> val filteredRows = babyNames.filter(line => !line.contains("Count")).map(line => line.split(","))
filteredRows: org.apache.spark.rdd.RDD[Array[String]] = MappedRDD[546] at map at <console>:14

scala> filteredRows.map(n => (n(1),n(4).toInt)).reduceByKey((v1,v2) => v1 + v2).collect
res452: Array[(String, Int)] = Array((BRADEN,39), (MATTEO,279), (HAZEL,133), (SKYE,63), (JOSUE,404), (RORY,12), (NAHLA,16), (ASIA,6), (MEGAN,581), (HINDY,254), (ELVIN,26), (AMARA,10), (CHARLOTTE,1737), (BELLA,672), (DANTE,246), (PAUL,712), (EPHRAIM,26), (ANGIE,295), (ANNABELLA,38), (DIAMOND,16), (ALFONSO,6), (MELISSA,560), (AYANNA,11), (ANIYAH,365), (DINAH,5), (MARLEY,32), (OLIVIA,6467), (MALLORY,15), (EZEQUIEL,13), (ELAINE,116), (ESMERALDA,71), (SKYLA,172), (EDEN,199), (MEGHAN,128), (AHRON,29), (KINLEY,5), (RUSSELL,5), (TROY,88), (MORDECHAI,521), (JALIYAH,10), (AUDREY,690), (VALERIE,584), (JAYSON,285), (SKYLER,26), (DASHIELL,24), (SHAINDEL,17), (AURORA,86), (ANGELY,5), (ANDERSON,369), (SHMUEL,315), (MARCO,370), (AUSTIN,1345), (MITCHELL,12), (SELINA,187), (FATIMA,421), (CESAR,292), (CAR...

Formal API: reduceByKey(func: (V, V) ⇒ V): RDD[(K, V)]

And for the last time, the above example was created from the baby_names.csv file, which was introduced in the previous post What is Apache Spark?
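
If the (V, V) => V signature still feels abstract, the classic word count shows it on a tiny made-up input; a minimal sketch:

// the supplied function only ever sees two values for the same key at a time
val counts = sc.parallelize(List("a", "b", "a", "c", "b", "a"))
  .map(word => (word, 1))
  .reduceByKey((v1, v2) => v1 + v2)
counts.collect  // something like: Array((a,3), (b,2), (c,1))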

Back to Top

aggregateByKey(zeroValue)(seqOp, combOp, [numTasks])

Ok, I admit, this one drives me a bit nuts.  Why wouldn’t we just use reduceByKey?  I don’t always feel smart enough to know when to use aggregateByKey over reduceByKey; the usual answer is that aggregateByKey takes an explicit zero value and lets the aggregated result be a different type than the values (see the sketch below), while for a plain sum the same results may be produced:

scala> val filteredRows = babyNames.filter(line => !line.contains("Count")).map(line => line.split(","))
filteredRows: org.apache.spark.rdd.RDD[Array[String]] = MappedRDD[546] at map at <console>:14

scala> filteredRows.map(n => (n(1),n(4).toInt)).reduceByKey((v1,v2) => v1 + v2).collect
res452: Array[(String, Int)] = Array((BRADEN,39), (MATTEO,279), (HAZEL,133), (SKYE,63), (JOSUE,404), (RORY,12), (NAHLA,16), (ASIA,6), (MEGAN,581), (HINDY,254), (ELVIN,26), (AMARA,10), (CHARLOTTE,1737), (BELLA,672), (DANTE,246), (PAUL,712), (EPHRAIM,26), (ANGIE,295), (ANNABELLA,38), (DIAMOND,16), (ALFONSO,6), (MELISSA,560), (AYANNA,11), (ANIYAH,365), (DINAH,5), (MARLEY,32), (OLIVIA,6467), (MALLORY,15), (EZEQUIEL,13), (ELAINE,116), (ESMERALDA,71), (SKYLA,172), (EDEN,199), (MEGHAN,128), (AHRON,29), (KINLEY,5), (RUSSELL,5), (TROY,88), (MORDECHAI,521), (JALIYAH,10), (AUDREY,690), (VALERIE,584), (JAYSON,285), (SKYLER,26), (DASHIELL,24), (SHAINDEL,17), (AURORA,86), (ANGELY,5), (ANDERSON,369), (SHMUEL,315), (MARCO,370), (AUSTIN,1345), (MITCHELL,12), (SELINA,187), (FATIMA,421), (CESAR,292), (CAR...


scala> filteredRows.map ( n => (n(1), n(4))).aggregateByKey(0)((k,v) => v.toInt+k, (v,k) => k+v).sortBy(_._2).collect

There’s a gist of aggregateByKey as well.
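
One case where aggregateByKey earns its keep is when the aggregated result type differs from the value type, which reduceByKey can’t do on its own.  A hedged sketch that collects the distinct yearly counts per name into a Set, assuming the filteredRows RDD from above:

filteredRows.map(n => (n(1), n(4).toInt))
  .aggregateByKey(Set.empty[Int])(
    (set, count) => set + count,  // seqOp: fold one record into the partition-local Set
    (s1, s2) => s1 ++ s2)         // combOp: merge the Sets built on different partitions
  .take(5)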

Back to Top

sortByKey([ascending], [numTasks])

This simply sorts the (K,V) pairs by K.  Try it out.  See the examples above for where babyNames originates.

scala> val filteredRows = babyNames.filter(line => !line.contains("Count")).map(line => line.split(","))
filteredRows: org.apache.spark.rdd.RDD[Array[String]] = MappedRDD[546] at map at <console>:14

scala>  filteredRows.map ( n => (n(1), n(4))).sortByKey().foreach (println _)

scala>  filteredRows.map ( n => (n(1), n(4))).sortByKey(false).foreach (println _) // opposite order
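
One caveat worth flagging: on a real cluster, foreach(println _) runs on the executors, so the output lands in executor logs rather than in your REPL.  For quick inspection, pulling a small sorted slice back to the driver is often more convenient; a sketch:

// bring a small sorted slice back to the driver instead of printing on the executors
filteredRows.map ( n => (n(1), n(4))).sortByKey().take(10).foreach(println)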

Back to Top

join(otherDataset, [numTasks])

If you have relational database experience, this will be easy.  It joins two datasets by key.  Other joins are available as well, such as leftOuterJoin and rightOuterJoin.

scala> val names1 = sc.parallelize(List("abe", "abby", "apple")).map(a => (a, 1))
names1: org.apache.spark.rdd.RDD[(String, Int)] = MappedRDD[1441] at map at <console>:14

scala> val names2 = sc.parallelize(List("apple", "beatty", "beatrice")).map(a => (a, 1))
names2: org.apache.spark.rdd.RDD[(String, Int)] = MappedRDD[1443] at map at <console>:14

scala> names1.join(names2).collect
res735: Array[(String, (Int, Int))] = Array((apple,(1,1)))

scala> names1.leftOuterJoin(names2).collect
res736: Array[(String, (Int, Option[Int]))] = Array((abby,(1,None)), (apple,(1,Some(1))), (abe,(1,None)))

scala> names1.rightOuterJoin(names2).collect
res737: Array[(String, (Option[Int], Int))] = Array((apple,(Some(1),1)), (beatty,(None,1)), (beatrice,(None,1)))
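
A fullOuterJoin is also available (in reasonably recent Spark versions) if you want unmatched keys from both sides; a quick sketch with the same two RDDs, output not shown:

// every key from names1 and names2 survives; both sides become Options
names1.fullOuterJoin(names2).collect
// values look like (Option[Int], Option[Int]), e.g. (apple,(Some(1),Some(1))), (abe,(Some(1),None))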

Back to Top

Spark Transformations Examples in Scala Conclusion

As mentioned at the top, the way to really get a feel for your Spark API options with Spark Transformations is to perform these examples in your own environment; “hands on the keyboard,” as some people refer to it.  If you have any questions or suggestions, let me know.  And keep an eye on the Spark with Scala tutorials page for the latest tutorials using Scala with Spark.

Featured image credit https://flic.kr/p/8R8uP9

