Apache Spark Transformations in Scala Examples


Spark Transformations produce a new Resilient Distributed Dataset (RDD), DataFrame, or Dataset depending on your version of Spark.  Resilient Distributed Datasets are Spark’s main and original programming abstraction for working with data distributed across multiple nodes in your cluster.  RDDs are automatically parallelized across the cluster.

In the Scala Spark transformation code examples below, it could be very helpful to reference the previous post, What is Apache Spark?, especially where the baby_names.csv file is used.  Also, for further exploration of Spark with Scala, check out the Scala with Spark Tutorials page.

If you are new to Spark and Scala, I encourage you to type these examples below, not just read them.  Trying them is how you start building confidence and familiarity.  Also, for more in-depth coverage of Scala with Spark, this might be a good spot to mention my Scala for Spark course.

Scala Spark Transformations Function Examples

map

What does it do? Passes each element of the RDD through the supplied function, i.e. `func`, producing a new RDD of the results.

scala> val rows = babyNames.map(line => line.split(","))
rows: org.apache.spark.rdd.RDD[Array[String]] = MappedRDD[360] at map at <console>:14

What did this example do?  It iterates over every line in the babyNames RDD (originally created from the baby_names.csv file) and splits each line, producing a new RDD of Arrays of Strings.  Each array holds the comma-separated fields of one line from the source CSV.  Makes sense?
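
Once each line is split into an Array[String], map can also pull out individual columns.  A small sketch, assuming the baby_names.csv column layout used later in this post (Year, First Name, County, Sex, Count):

// continuing from the rows RDD above: grab the second column (the first name)
val firstNames = rows.map(row => row(1))
// peek at a few values without pulling the whole RDD back to the driver
firstNames.take(5)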


flatMap

“Similar to map, but each input item can be mapped to 0 or more output items (so `func` should return a Seq rather than a single item).”  Whereas the map function can be thought of as a one-to-one operation, the flatMap function can be considered a one-to-many.  Well, that helped me if I considered it that way anyhow.

Compare flatMap to map in the following:

scala> sc.parallelize(List(1,2,3)).flatMap(x=>List(x,x,x)).collect
res200: Array[Int] = Array(1, 1, 1, 2, 2, 2, 3, 3, 3)

scala> sc.parallelize(List(1,2,3)).map(x=>List(x,x,x)).collect
res201: Array[List[Int]] = Array(List(1, 1, 1), List(2, 2, 2), List(3, 3, 3))

flatMap can be especially helpful with nested datasets. For example, it may be helpful to think of the RDD source as hierarchical JSON (which may have been converted to case classes or nested collections).  This is unlike CSV, which has no hierarchical structure.
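
To make the one-to-many idea concrete with the baby names data, here’s a small sketch comparing the two on the same input; flatMap flattens every CSV line into individual field values, while map keeps one array per line:

// flatMap: every field of every line becomes its own element -- RDD[String]
val allFields = babyNames.flatMap(line => line.split(","))

// map: one Array[String] per input line -- RDD[Array[String]]
val rowsPerLine = babyNames.map(line => line.split(","))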

By the way, these examples may blur the line between Scala and Spark.  Both Scala and Spark have map and flatMap in their APIs.   In a sense, the only Spark-specific portion of the code example above is the use of `parallelize` from a SparkContext.  When calling `parallelize`, the elements of the collection are copied to form a distributed dataset that can be operated on in parallel.  Being able to operate in parallel is a Spark feature.  But, you knew that already.

Adding `collect` to both the `flatMap` and `map` results was shown for clarity.  We can focus on the Spark aspect (namely, the RDD return type) of the example if we don’t use `collect`, as seen in the following:

scala> sc.parallelize(List(1,2,3)).flatMap(x=>List(x,x,x))
res202: org.apache.spark.rdd.RDD[Int] = FlatMappedRDD[373] at flatMap at <console>:13

scala> sc.parallelize(List(1,2,3)).map(x=>List(x,x,x))
res203: org.apache.spark.rdd.RDD[List[Int]] = MappedRDD[375] at map at <console>:13

Formal API sans implicit: flatMap[U](f: (T) ⇒ TraversableOnce[U]): RDD[U]


filter

Filter creates a new RDD by passing each element through the supplied `func` and keeping only the elements for which it returns true.  For those with a relational database background or coming from a SQL perspective, it may be helpful to think of `filter` as the `where` clause in a SQL statement.  In other words, `where` in SQL is used to filter the desired results according to some criteria, such as ... where state = 'MN'

Spark filter examples

val file = sc.textFile("catalina.out")
val errors = file.filter(line => line.contains("ERROR"))

Formal API: filter(f: (T) ⇒ Boolean): RDD[T]
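
To tie this back to the where state = 'MN' analogy, here’s a sketch that filters the split baby names rows (the rows RDD from the map example above) down to a single county; the column index assumes the layout used later in this post:

// roughly: select * from rows where county = 'KINGS'
val kingsOnly = rows.filter(row => row(2) == "KINGS")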

mapPartitions

Consider mapPartitions a tool for performance optimization. It won’t do much good for you when running examples on your local machine, but don’t forget it when running on a Spark cluster.  It’s the same idea as map, but it operates on an entire RDD partition at a time: the supplied function receives an Iterator over the partition’s elements rather than a single element.  Remember the first D in RDD is “Distributed” – Resilient Distributed Datasets.  Or, put another way, the data is distributed over partitions.

scala> val parallel = sc.parallelize(1 to 9, 3)
parallel: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[450] at parallelize at <console>:12

scala> parallel.mapPartitions( x => List(x.next).iterator).collect
res383: Array[Int] = Array(1, 4, 7)

// compare to the same, but with default parallelize
scala> val parallel = sc.parallelize(1 to 9)
parallel: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[452] at parallelize at <console>:12

scala> parallel.mapPartitions( x => List(x.next).iterator).collect
res384: Array[Int] = Array(1, 2, 3, 4, 5, 6, 7, 8)

API: mapPartitions[U](f: (Iterator[T]) ⇒ Iterator[U], preservesPartitioning: Boolean = false)(implicit arg0:ClassTag[U]): RDD[U]
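
The usual reason to reach for mapPartitions is amortizing expensive setup, doing it once per partition instead of once per element.  A rough sketch of the pattern, where ExpensiveResource, lookup and close are made-up placeholders for something costly like a database or HTTP connection:

val enriched = parallel.mapPartitions { iter =>
  val resource = new ExpensiveResource()             // hypothetical costly setup, created once per partition
  val out = iter.map(x => resource.lookup(x)).toList // materialize before closing the resource
  resource.close()
  out.iterator
}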

mapPartitionsWithIndex

Similar to mapPartitions, but the supplied function also receives an Int indicating the index of the partition.

When learning these APIs on an individual laptop or desktop, it might be helpful to show differences in capabilities and outputs.  For example, if we change the above example to use a parallelize’d list with 3 slices, our output changes significantly:

scala> val parallel = sc.parallelize(1 to 9)
parallel: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[455] at parallelize at <console>:12

scala> parallel.mapPartitionsWithIndex( (index: Int, it: Iterator[Int]) => it.toList.map(x => index + ", "+x).iterator).collect
res389: Array[String] = Array(0, 1, 1, 2, 2, 3, 3, 4, 4, 5, 5, 6, 6, 7, 7, 8, 7, 9)

scala> val parallel = sc.parallelize(1 to 9, 3)
parallel: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[457] at parallelize at <console>:12

scala> parallel.mapPartitionsWithIndex( (index: Int, it: Iterator[Int]) => it.toList.map(x => index + ", "+x).iterator).collect
res390: Array[String] = Array(0, 1, 0, 2, 0, 3, 1, 4, 1, 5, 1, 6, 2, 7, 2, 8, 2, 9)

Formal API signature (implicits stripped) and definition from the Spark Scala API docs:

mapPartitionsWithIndex[U](f: (Int, Iterator[T]) ⇒ Iterator[U], preservesPartitioning: Boolean = false): RDD[U]

“Return a new RDD by applying a function to each partition of this RDD, while tracking the index of the original partition.

preservesPartitioning indicates whether the input function preserves the partitioner, which should be false unless this is a pair RDD and the input function doesn’t modify the keys.”
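
One practical use, sketched here, is dropping a CSV header row without scanning every element: the header lives in the first partition, so only partition 0 needs its first element skipped (assuming the RDD hasn’t been repartitioned since it was read):

// drop the first line of partition 0 only -- the header row of the CSV
val noHeader = babyNames.mapPartitionsWithIndex { (index, it) =>
  if (index == 0) it.drop(1) else it
}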

sample

Return a random sample subset RDD of the input RDD

scala> val parallel = sc.parallelize(1 to 9)
parallel: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[470] at parallelize at <console>:12

scala> parallel.sample(true,.2).count
res403: Long = 3

scala> parallel.sample(true,.2).count
res404: Long = 2

scala> parallel.sample(true,.1)
res405: org.apache.spark.rdd.RDD[Int] = PartitionwiseSampledRDD[473] at sample at <console>:15

Formal API: sample(withReplacement: Boolean, fraction: Double, seed: Long = Utils.random.nextLong): RDD[T]
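
Because sample is random, the counts above will vary from run to run.  Passing the optional seed makes a run repeatable, as in this small sketch:

// same fraction, fixed seed -- repeated runs return the same sample
val repeatable = parallel.sample(withReplacement = false, fraction = 0.2, seed = 42L)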

The Next Three

Stop.  Hammer Time.  Does anyone remember that?  Remember those pants!  Oh man, I wish I could forget it.  Google “Hammer Time” if you don’t know.  Anyhow, where was I?


The next three functions, union, intersection and distinct, really play well off of each other.  “Can’t touch this” 🙂  See what I did there?  Hah! I threw in some MC Hammer.  Hammer Time.

union

Simple.  Return the union of two RDDs

scala> val parallel = sc.parallelize(1 to 9)
parallel: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[477] at parallelize at <console>:12

scala> val par2 = sc.parallelize(5 to 15)
par2: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[478] at parallelize at <console>:12

scala> parallel.union(par2).collect
res408: Array[Int] = Array(1, 2, 3, 4, 5, 6, 7, 8, 9, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15)


intersection

Simple.  Similar to union, but returns the intersection of two RDDs

scala> val parallel = sc.parallelize(1 to 9)
parallel: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[477] at parallelize at <console>:12

scala> val par2 = sc.parallelize(5 to 15)
par2: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[478] at parallelize at <console>:12

scala> parallel.intersection(par2).collect
res409: Array[Int] = Array(8, 9, 5, 6, 7)

Formal API: intersection(other: RDD[T]): RDD[T]

distinct

Another simple one.  Return a new RDD with distinct elements within a source RDD

scala> val parallel = sc.parallelize(1 to 9)
parallel: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[477] at parallelize at <console>:12

scala> val par2 = sc.parallelize(5 to 15)
par2: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[478] at parallelize at <console>:12

scala> parallel.union(par2).distinct.collect
res412: Array[Int] = Array(1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15)

Formal API: distinct(): RDD[T]

The Keys

The group of Spark transformations such as groupByKey, reduceByKey, aggregateByKey, sortByKey, join all act on key, value RDDs.  So, this section will be known as “The Keys”.  Cool name, huh?  Well, not really, but it sounded much better than “The Keys and the Values” which for some unexplained reason, triggers memories of “The Young and the Restless”.  Let’s have some fun.

Hey, it might be important for some of you to note here: you’re going to work a lot with key/value structured data in your Spark adventures.

The following key functions are available through org.apache.spark.rdd.PairRDDFunctions, which provides operations available only on RDDs of key-value pairs.  “These operations are automatically available on any RDD of the right type (e.g. RDD[(Int, Int)]) through implicit conversions when you import org.apache.spark.SparkContext._.”

For the following, we’re going to use the baby_names.csv file introduced in the previous post What is Apache Spark?

All the following examples presume the baby_names.csv file has been loaded and split such as:

scala> val babyNames = sc.textFile("baby_names.csv")
babyNames: org.apache.spark.rdd.RDD[String] = baby_names.csv MappedRDD[495] at textFile at <console>:12

scala> val rows = babyNames.map(line => line.split(","))
rows: org.apache.spark.rdd.RDD[Array[String]] = MappedRDD[496] at map at <console>:14

groupByKey

“When called on a dataset of (K, V) pairs, returns a dataset of (K, Iterable<V>) pairs. “

The following groups all names to counties in which they appear over the years.

scala> val namesToCounties = rows.map(name => (name(1),name(2)))
namesToCounties: org.apache.spark.rdd.RDD[(String, String)] = MappedRDD[513] at map at <console>:16

scala> namesToCounties.groupByKey.collect
res429: Array[(String, Iterable[String])] = Array((BRADEN,CompactBuffer(SUFFOLK, SARATOGA, SUFFOLK, ERIE, SUFFOLK, SUFFOLK, ERIE)), (MATTEO,CompactBuffer(NEW YORK, SUFFOLK, NASSAU, KINGS, WESTCHESTER, WESTCHESTER, KINGS, SUFFOLK, NASSAU, QUEENS, QUEENS, NEW YORK, NASSAU, QUEENS, KINGS, SUFFOLK, WESTCHESTER, WESTCHESTER, SUFFOLK, KINGS, NASSAU, QUEENS, SUFFOLK, NASSAU, WESTCHESTER)), (HAZEL,CompactBuffer(ERIE, MONROE, KINGS, NEW YORK, KINGS, MONROE, NASSAU, SUFFOLK, QUEENS, KINGS, SUFFOLK, NEW YORK, KINGS, SUFFOLK)), (SKYE,CompactBuffer(NASSAU, KINGS, MONROE, BRONX, KINGS, KINGS, NASSAU)), (JOSUE,CompactBuffer(SUFFOLK, NASSAU, WESTCHESTER, BRONX, KINGS, QUEENS, SUFFOLK, QUEENS, NASSAU, WESTCHESTER, BRONX, BRONX, QUEENS, SUFFOLK, KINGS, WESTCHESTER, QUEENS, NASSAU, SUFFOLK, BRONX, KINGS, ...

The above example was created from the baby_names.csv file, which was introduced in the previous post What is Apache Spark?
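
If the goal is just a tally per name rather than the full list of counties, a sketch like the following sizes each group; note the reduceByKey section below gets to totals more cheaply, without shipping every value across the network:

// number of (name, county) records per name
val recordsPerName = namesToCounties.groupByKey.mapValues(counties => counties.size)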


reduceByKey

Operates on (K,V) pairs of course, but the func must be of type (V,V) => V

Let’s sum the yearly name counts over the years in the CSV.  Notice we need to filter out the header row.

scala> val filteredRows = babyNames.filter(line => !line.contains("Count")).map(line => line.split(","))
filteredRows: org.apache.spark.rdd.RDD[Array[String]] = MappedRDD[546] at map at <console>:14

scala> filteredRows.map(n => (n(1),n(4).toInt)).reduceByKey((v1,v2) => v1 + v2).collect
res452: Array[(String, Int)] = Array((BRADEN,39), (MATTEO,279), (HAZEL,133), (SKYE,63), (JOSUE,404), (RORY,12), (NAHLA,16), (ASIA,6), (MEGAN,581), (HINDY,254), (ELVIN,26), (AMARA,10), (CHARLOTTE,1737), (BELLA,672), (DANTE,246), (PAUL,712), (EPHRAIM,26), (ANGIE,295), (ANNABELLA,38), (DIAMOND,16), (ALFONSO,6), (MELISSA,560), (AYANNA,11), (ANIYAH,365), (DINAH,5), (MARLEY,32), (OLIVIA,6467), (MALLORY,15), (EZEQUIEL,13), (ELAINE,116), (ESMERALDA,71), (SKYLA,172), (EDEN,199), (MEGHAN,128), (AHRON,29), (KINLEY,5), (RUSSELL,5), (TROY,88), (MORDECHAI,521), (JALIYAH,10), (AUDREY,690), (VALERIE,584), (JAYSON,285), (SKYLER,26), (DASHIELL,24), (SHAINDEL,17), (AURORA,86), (ANGELY,5), (ANDERSON,369), (SHMUEL,315), (MARCO,370), (AUSTIN,1345), (MITCHELL,12), (SELINA,187), (FATIMA,421), (CESAR,292), (CAR...

Formal API: reduceByKey(func: (V, V) ⇒ V): RDD[(K, V)]

And for the last time, the above example was created from the baby_names.csv file, which was introduced in the previous post What is Apache Spark?

aggregateByKey

Ok, I admit, this one drives me a bit nuts.  Why wouldn’t we just use reduceByKey?  The difference is that aggregateByKey starts from a zero value and lets the aggregated result be a different type than the values, whereas reduceByKey requires the combining function to return the same type as the values.  For a simple sum like this one, the same results may be produced:

scala> val filteredRows = babyNames.filter(line => !line.contains("Count")).map(line => line.split(","))
filteredRows: org.apache.spark.rdd.RDD[Array[String]] = MappedRDD[546] at map at <console>:14

scala> filteredRows.map(n => (n(1),n(4).toInt)).reduceByKey((v1,v2) => v1 + v2).collect
res452: Array[(String, Int)] = Array((BRADEN,39), (MATTEO,279), (HAZEL,133), (SKYE,63), (JOSUE,404), (RORY,12), (NAHLA,16), (ASIA,6), (MEGAN,581), (HINDY,254), (ELVIN,26), (AMARA,10), (CHARLOTTE,1737), (BELLA,672), (DANTE,246), (PAUL,712), (EPHRAIM,26), (ANGIE,295), (ANNABELLA,38), (DIAMOND,16), (ALFONSO,6), (MELISSA,560), (AYANNA,11), (ANIYAH,365), (DINAH,5), (MARLEY,32), (OLIVIA,6467), (MALLORY,15), (EZEQUIEL,13), (ELAINE,116), (ESMERALDA,71), (SKYLA,172), (EDEN,199), (MEGHAN,128), (AHRON,29), (KINLEY,5), (RUSSELL,5), (TROY,88), (MORDECHAI,521), (JALIYAH,10), (AUDREY,690), (VALERIE,584), (JAYSON,285), (SKYLER,26), (DASHIELL,24), (SHAINDEL,17), (AURORA,86), (ANGELY,5), (ANDERSON,369), (SHMUEL,315), (MARCO,370), (AUSTIN,1345), (MITCHELL,12), (SELINA,187), (FATIMA,421), (CESAR,292), (CAR...


scala> filteredRows.map(n => (n(1), n(4).toInt)).aggregateByKey(0)((acc, v) => acc + v, (acc1, acc2) => acc1 + acc2).sortBy(_._2).collect

There’s a gist of aggregateByKey as well.
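
Where aggregateByKey earns its keep is when the aggregated result is a different type than the values, something reduceByKey cannot express.  A sketch computing the average count per name: the values are Ints, but the running state is a (sum, count) pair.

// zero value is a (sum, count) pair -- a different type than the Int values
val sumAndCount = filteredRows
  .map(n => (n(1), n(4).toInt))
  .aggregateByKey((0, 0))(
    (acc, value) => (acc._1 + value, acc._2 + 1),   // fold one value into a partition's accumulator
    (a, b) => (a._1 + b._1, a._2 + b._2))           // merge accumulators from different partitions

val averages = sumAndCount.mapValues { case (sum, count) => sum.toDouble / count }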

sortByKey

This simply sorts the (K,V) pair by K.  Try it out.  See examples above on where babyNames originates.

scala> val filteredRows = babyNames.filter(line => !line.contains("Count")).map(line => line.split(","))
filteredRows: org.apache.spark.rdd.RDD[Array[String]] = MappedRDD[546] at map at <console>:14

scala>  filteredRows.map ( n => (n(1), n(4))).sortByKey().foreach (println _)

scala>  filteredRows.map ( n => (n(1), n(4))).sortByKey(false).foreach (println _) // opposite order
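
Since sortByKey only sorts on the key, a common trick, sketched below, is to swap the pair so the value becomes the key, sort, and swap back (or just reach for sortBy as in the aggregateByKey example above):

// sort names by their total count, highest first
val byCount = filteredRows
  .map(n => (n(1), n(4).toInt))
  .reduceByKey(_ + _)
  .map { case (name, count) => (count, name) }
  .sortByKey(false)
  .map { case (count, name) => (name, count) }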

join

If you have relational database experience, this will be easy.  It’s joining of two datasets.  Other joins are available as well such as leftOuterJoin and rightOuterJoin.

scala> val names1 = sc.parallelize(List("abe", "abby", "apple")).map(a => (a, 1))
names1: org.apache.spark.rdd.RDD[(String, Int)] = MappedRDD[1441] at map at <console>:14

scala> val names2 = sc.parallelize(List("apple", "beatty", "beatrice")).map(a => (a, 1))
names2: org.apache.spark.rdd.RDD[(String, Int)] = MappedRDD[1443] at map at <console>:14

scala> names1.join(names2).collect
res735: Array[(String, (Int, Int))] = Array((apple,(1,1)))

scala> names1.leftOuterJoin(names2).collect
res736: Array[(String, (Int, Option[Int]))] = Array((abby,(1,None)), (apple,(1,Some(1))), (abe,(1,None)))

scala> names1.rightOuterJoin(names2).collect
res737: Array[(String, (Option[Int], Int))] = Array((apple,(Some(1),1)), (beatty,(None,1)), (beatrice,(None,1)))
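
For completeness, there is also fullOuterJoin, which keeps keys from either side; a quick sketch with the same two RDDs (available in the PairRDDFunctions API on reasonably recent Spark versions):

// keys present in either RDD; the missing side comes back as None
val everything = names1.fullOuterJoin(names2)   // RDD[(String, (Option[Int], Option[Int]))]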

Spark Transformations Examples in Scala Conclusion

As mentioned at the top, the way to really get a feel for your Spark API options with Spark Transformations is to perform these examples in your own environment.  “hands on the keyboard” as some people refer to it.  If you have any questions or suggestions, let me know.  And keep an eye on the Spark with Scala tutorials page for the latest tutorials using Scala with Spark.

Featured image credit https://flic.kr/p/8R8uP9
