Apache Spark: Examples of Transformations

Transformation functions produce a new Resilient Distributed Dataset (RDD).  Resilient distributed datasets are Spark’s main programming abstraction.  RDDs are automatically parallelized across the cluster.

In the Scala Spark transformation code examples below, it may help to reference the previous post in the Spark with Scala tutorials, especially where the examples refer to the baby_names.csv file.


map
flatMap
filter
mapPartitions
mapPartitionsWithIndex
sample
Hammer Time (Can’t Touch This)
union
intersection
distinct
The Keys (To Success? The Florida Keys? To the Castle?)
groupByKey
reduceByKey
aggregateByKey
sortByKey
join

map(func)

What does it do? Pass each element of the RDD through the supplied function; i.e. `func`

scala> val rows = babyNames.map(line => line.split(","))
rows: org.apache.spark.rdd.RDD[Array[String]] = MappedRDD[360] at map at <console>:14

What did this example do?  It iterates over every line in the babyNames RDD (originally created from the baby_names.csv file) and splits each line on commas, producing a new RDD of Arrays of Strings.  Each array holds the comma-separated fields of one line of the source CSV.
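
If you want to go one step past the split, here is a minimal sketch that maps each line to a typed (name, count) pair.  The two CSV lines are made up for illustration (they only mimic the column layout used later in this post), the app name is arbitrary, and `SparkContext.getOrCreate` assumes Spark 1.4 or later; in spark-shell it simply hands back the existing context.

```scala
import org.apache.spark.{SparkConf, SparkContext}

// Reuse the spark-shell context if there is one; otherwise start a local one.
val sc = SparkContext.getOrCreate(
  new SparkConf().setAppName("map-sketch").setMaster("local[2]"))

// Hypothetical rows; not taken from the real baby_names.csv.
val lines = sc.parallelize(Seq("2013,ANNA,KINGS,F,10", "2013,LEO,BRONX,M,6"))

// First map: line -> Array of fields.  Second map: fields -> (name, count as Int).
val nameAndCount = lines
  .map(line => line.split(","))
  .map(cols => (cols(1), cols(4).toInt))
  .collect()
// nameAndCount: Array((ANNA,10), (LEO,6))
```

Chaining two `map` calls keeps each step readable; Spark pipelines them within a single stage, so there is no extra pass over the data.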

flatMap(func)

“Similar to map, but each input item can be mapped to 0 or more output items (so func should return a Seq rather than a single item).”

Compare flatMap to map in the following

scala> sc.parallelize(List(1,2,3)).flatMap(x=>List(x,x,x)).collect
res200: Array[Int] = Array(1, 1, 1, 2, 2, 2, 3, 3, 3)

scala> sc.parallelize(List(1,2,3)).map(x=>List(x,x,x)).collect
res201: Array[List[Int]] = Array(List(1, 1, 1), List(2, 2, 2), List(3, 3, 3))

`flatMap` is helpful with nested datasets.  It may be beneficial to think of the RDD source as hierarchical JSON (which may have been converted to case classes or nested collections).  This is unlike CSV, which has no hierarchical structure.

By the way, these examples may blur the line between Scala and Spark for you.  These examples highlight Scala and not necessarily Spark.   In a sense, the only Spark specific portion of this code example is the use of `parallelize` from a SparkContext.  When calling `parallelize`, the elements of the collection are copied to form a distributed dataset that can be operated on in parallel.  Being able to operate in parallel is a Spark feature.

`collect` was added to both the `flatMap` and `map` results for clarity.  We can focus on the Spark aspect of the example (the RDD return type) if we don’t use `collect`, as seen in the following:

scala> sc.parallelize(List(1,2,3)).flatMap(x=>List(x,x,x))
res202: org.apache.spark.rdd.RDD[Int] = FlatMappedRDD[373] at flatMap at <console>:13

scala> sc.parallelize(List(1,2,3)).map(x=>List(x,x,x))
res203: org.apache.spark.rdd.RDD[List[Int]] = MappedRDD[375] at map at <console>:13

Formal API sans implicit: flatMap[U](f: (T) ⇒ TraversableOnce[U]): RDD[U]
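
To make the nested-data point above a bit more concrete, here is a small sketch with hypothetical (surname, children) records.  The data and names are invented for illustration, and `SparkContext.getOrCreate` assumes Spark 1.4 or later.

```scala
import org.apache.spark.{SparkConf, SparkContext}

// Reuse the spark-shell context if there is one; otherwise start a local one.
val sc = SparkContext.getOrCreate(
  new SparkConf().setAppName("flatMap-sketch").setMaster("local[2]"))

// Hypothetical nested records: each family has zero or more children.
val families = sc.parallelize(List(
  ("Smith", List("Ann", "Bob")),
  ("Jones", List.empty[String]),
  ("Lee", List("Cy"))))

// flatMap flattens the inner lists; the family with no children contributes nothing.
val childNames = families.flatMap { case (_, children) => children }.collect()
// childNames: Array(Ann, Bob, Cy)
```

Three input records produce three output elements here only by coincidence: one record yields two names, one yields none, and one yields one.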

filter(func)

`filter` creates a new RDD containing only the elements for which the supplied func returns true.  For those with a relational database background or coming from a SQL perspective, it may be helpful to think of `filter` as the `where` clause in a SQL statement.

Spark filter examples

val file = sc.textFile("catalina.out")
val errors = file.filter(line => line.contains("ERROR"))

Formal API: filter(f: (T) ⇒ Boolean): RDD[T]
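
If you don’t have a catalina.out handy, the same idea can be tried on a few in-memory lines.  This is only a sketch: the log lines are made up, and `SparkContext.getOrCreate` assumes Spark 1.4 or later.

```scala
import org.apache.spark.{SparkConf, SparkContext}

// Reuse the spark-shell context if there is one; otherwise start a local one.
val sc = SparkContext.getOrCreate(
  new SparkConf().setAppName("filter-sketch").setMaster("local[2]"))

// Hypothetical log lines standing in for a real log file.
val lines = sc.parallelize(Seq(
  "INFO server started",
  "ERROR disk full",
  "WARN slow response",
  "ERROR connection timeout"))

// filter is a transformation, so nothing runs yet; it only describes the new RDD.
val errors = lines.filter(line => line.contains("ERROR"))

// count is an action and triggers the actual work.
val errorCount = errors.count()
// errorCount: 2
```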

mapPartitions(func)

Consider `mapPartitions` a tool for performance optimization if you have the horsepower.  It won’t do much for you when running examples on your local machine compared to running across a cluster.  It is similar to `map`, but `func` is called once per partition rather than once per element: it receives an Iterator over the partition’s elements and must return an Iterator.  Remember the first D in RDD is “Distributed”, as in Resilient Distributed Datasets.  Put another way, the data is distributed over partitions.

// from laptop
scala> val parallel = sc.parallelize(1 to 9, 3)
parallel: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[450] at parallelize at <console>:12

scala> parallel.mapPartitions( x => List(x.next).iterator).collect
res383: Array[Int] = Array(1, 4, 7)

// compare to the same, but with the default number of partitions (8 in this session, judging by the output)
scala> val parallel = sc.parallelize(1 to 9)
parallel: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[452] at parallelize at <console>:12

scala> parallel.mapPartitions( x => List(x.next).iterator).collect
res384: Array[Int] = Array(1, 2, 3, 4, 5, 6, 7, 8)

API: mapPartitions[U](f: (Iterator[T]) ⇒ Iterator[U], preservesPartitioning: Boolean = false)(implicit arg0:ClassTag[U]): RDD[U]
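
One way to see the once-per-partition behavior is to compute something per partition, such as a sum.  A minimal sketch, assuming the same 3-slice RDD as above and Spark 1.4 or later for `SparkContext.getOrCreate`:

```scala
import org.apache.spark.{SparkConf, SparkContext}

// Reuse the spark-shell context if there is one; otherwise start a local one.
val sc = SparkContext.getOrCreate(
  new SparkConf().setAppName("mapPartitions-sketch").setMaster("local[2]"))

// Three slices: (1, 2, 3), (4, 5, 6) and (7, 8, 9).
val parallel = sc.parallelize(1 to 9, 3)

// The function runs once per partition and returns one value per partition.
val partitionSums = parallel.mapPartitions(it => Iterator(it.sum)).collect()
// partitionSums: Array(6, 15, 24)
```

Unlike `x.next` in the examples above, `it.sum` is safe on an empty partition (it yields 0), which matters once the data no longer divides evenly across partitions.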

mapPartitionsWithIndex(func)

Similar to `mapPartitions`, but the supplied function also receives an Int giving the index of the partition.

scala> val parallel = sc.parallelize(1 to 9)
parallel: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[455] at parallelize at <console>:12

scala> parallel.mapPartitionsWithIndex( (index: Int, it: Iterator[Int]) => it.toList.map(x => index + ", "+x).iterator).collect
res389: Array[String] = Array(0, 1, 1, 2, 2, 3, 3, 4, 4, 5, 5, 6, 6, 7, 7, 8, 7, 9)

When learning these APIs on an individual laptop or desktop, it can help to see how the output changes with the partitioning.  In these results each element is a String of the form “index, value”, so the output above reads as partition 0 holding 1, partition 1 holding 2, and so on.  If we change the example to parallelize with 3 slices, the output changes significantly:

scala> val parallel = sc.parallelize(1 to 9, 3)
parallel: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[457] at parallelize at <console>:12

scala> parallel.mapPartitionsWithIndex( (index: Int, it: Iterator[Int]) => it.toList.map(x => index + ", "+x).iterator).collect
res390: Array[String] = Array(0, 1, 0, 2, 0, 3, 1, 4, 1, 5, 1, 6, 2, 7, 2, 8, 2, 9)

Formal API signature (implicits stripped) and definition from the Spark Scala API docs:

mapPartitionsWithIndex[U](f: (Int, Iterator[T]) ⇒ Iterator[U], preservesPartitioning: Boolean = false): RDD[U]

“Return a new RDD by applying a function to each partition of this RDD, while tracking the index of the original partition.

preservesPartitioning indicates whether the input function preserves the partitioner, which should be false unless this is a pair RDD and the input function doesn’t modify the keys.”
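
A common use for the partition index is dropping a CSV header, which lives at the start of partition 0.  The sketch below uses made-up lines shaped like a header plus two rows; they are not taken from the real baby_names.csv, and `SparkContext.getOrCreate` assumes Spark 1.4 or later.

```scala
import org.apache.spark.{SparkConf, SparkContext}

// Reuse the spark-shell context if there is one; otherwise start a local one.
val sc = SparkContext.getOrCreate(
  new SparkConf().setAppName("mapPartitionsWithIndex-sketch").setMaster("local[2]"))

// Hypothetical file contents: a header line followed by data rows.
val lines = sc.parallelize(Seq(
  "Year,Name,County,Sex,Count",
  "2013,ANNA,KINGS,F,10",
  "2013,LEO,BRONX,M,6"), 2)

// Only partition 0 can start with the header, so skip its first element there.
val noHeader = lines.mapPartitionsWithIndex { (index, it) =>
  if (index == 0) it.drop(1) else it
}.collect()
// noHeader: Array(2013,ANNA,KINGS,F,10, 2013,LEO,BRONX,M,6)
```

Compared with filtering on the text of the header, this skips one element by position instead of inspecting every line.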

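sample(withReplacement, fraction, seed)

Return a random sample subset RDD of the input RDD.  `fraction` is the expected size of the sample relative to the input, not an exact size, which is why the two counts below differ.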

scala> val parallel = sc.parallelize(1 to 9)
parallel: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[470] at parallelize at <console>:12

scala> parallel.sample(true,.2).count
res403: Long = 3

scala> parallel.sample(true,.2).count
res404: Long = 2

scala> parallel.sample(true,.1)
res405: org.apache.spark.rdd.RDD[Int] = PartitionwiseSampledRDD[473] at sample at <console>:15

Formal API: sample(withReplacement: Boolean, fraction: Double, seed: Long = Utils.random.nextLong): RDD[T]
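
When you need the same sample twice (in a test, say), passing an explicit seed should do it.  A sketch, where the seed value 42 and the RDD size are arbitrary choices and `SparkContext.getOrCreate` assumes Spark 1.4 or later:

```scala
import org.apache.spark.{SparkConf, SparkContext}

// Reuse the spark-shell context if there is one; otherwise start a local one.
val sc = SparkContext.getOrCreate(
  new SparkConf().setAppName("sample-sketch").setMaster("local[2]"))

val numbers = sc.parallelize(1 to 100, 4)

// Same RDD, same partitioning, same seed: the two samples match.
val first = numbers.sample(false, 0.1, 42L).collect()
val second = numbers.sample(false, 0.1, 42L).collect()

// Without replacement, no element can be picked more than once.
val hasDuplicates = first.distinct.length != first.length
// hasDuplicates: false
```

The size of `first` is deliberately not shown: with a fraction of 0.1 it will be around 10, but the exact number depends on the seed.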

The Next Three (AKA: Hammer Time)

Stop.  Hammer Time.  The next three functions (union, intersection and distinct) play well off each other, so they are described together.  Can’t Touch This.

union(a different rdd)

Simple.  Return the union of two RDDs.  Duplicates are kept, as the repeated 5 through 9 in the output show.

scala> val parallel = sc.parallelize(1 to 9)
parallel: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[477] at parallelize at <console>:12

scala> val par2 = sc.parallelize(5 to 15)
par2: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[478] at parallelize at <console>:12

scala> parallel.union(par2).collect
res408: Array[Int] = Array(1, 2, 3, 4, 5, 6, 7, 8, 9, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15)

intersection(a different rdd)

Simple.  Similar to union, but returns the intersection of two RDDs.

scala> val parallel = sc.parallelize(1 to 9)
parallel: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[477] at parallelize at <console>:12

scala> val par2 = sc.parallelize(5 to 15)
par2: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[478] at parallelize at <console>:12

scala> parallel.intersection(par2).collect
res409: Array[Int] = Array(8, 9, 5, 6, 7)

Formal API: intersection(other: RDD[T]): RDD[T]

distinct([numTasks])

Another simple one.  Return a new RDD with distinct elements within a source RDD

scala> val parallel = sc.parallelize(1 to 9)
parallel: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[477] at parallelize at <console>:12

scala> val par2 = sc.parallelize(5 to 15)
par2: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[478] at parallelize at <console>:12

scala> parallel.union(par2).distinct.collect
res412: Array[Int] = Array(1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15)

Formal API: distinct(): RDD[T]
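
The three functions treat duplicates differently, which is easier to see when the inputs themselves contain repeats.  A small sketch with made-up numbers, assuming Spark 1.4 or later for `SparkContext.getOrCreate`:

```scala
import org.apache.spark.{SparkConf, SparkContext}

// Reuse the spark-shell context if there is one; otherwise start a local one.
val sc = SparkContext.getOrCreate(
  new SparkConf().setAppName("set-ops-sketch").setMaster("local[2]"))

val a = sc.parallelize(Seq(1, 2, 2, 3))
val b = sc.parallelize(Seq(2, 2, 3, 4))

// union simply concatenates, so all 8 elements survive.
val unionCount = a.union(b).count()
// unionCount: 8

// intersection removes duplicates even when both inputs contain them.
// The results are sorted here because their order after the shuffle is not guaranteed.
val common = a.intersection(b).collect().sorted
// common: Array(2, 3)

// distinct on the union gives set-style union.
val all = a.union(b).distinct().collect().sorted
// all: Array(1, 2, 3, 4)
```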

The Keys

The group of transformation functions (groupByKey, reduceByKey, aggregateByKey, sortByKey, join) all act on key-value RDDs, where each element is a (K, V) pair of key and value.  So, this section will be known as “The Keys”.  Cool name, huh?  Well, not really, but it sounded much better than “The Keys and the Values”, which, for some unexplained reason, triggers memories of “The Young and the Restless”.

The following key functions are available through org.apache.spark.rdd.PairRDDFunctions, which are operations available only on RDDs of key-value pairs.  “These operations are automatically available on any RDD of the right type (e.g. RDD[(Int, Int)] through implicit conversions when you import org.apache.spark.SparkContext._.”

For the following, we’re going to use the baby_names.csv file introduced in the previous post, What is Apache Spark?

All the following examples presume the baby_names.csv file has been loaded and split such as:

scala> val babyNames = sc.textFile("baby_names.csv")
babyNames: org.apache.spark.rdd.RDD[String] = baby_names.csv MappedRDD[495] at textFile at <console>:12

scala> val rows = babyNames.map(line => line.split(","))
rows: org.apache.spark.rdd.RDD[Array[String]] = MappedRDD[496] at map at <console>:14

groupByKey([numTasks])

“When called on a dataset of (K, V) pairs, returns a dataset of (K, Iterable<V>) pairs. ”

The following groups, for each name, the counties in which it appears over the years.

scala> val namesToCounties = rows.map(name => (name(1),name(2)))
namesToCounties: org.apache.spark.rdd.RDD[(String, String)] = MappedRDD[513] at map at <console>:16

scala> namesToCounties.groupByKey.collect
res429: Array[(String, Iterable[String])] = Array((BRADEN,CompactBuffer(SUFFOLK, SARATOGA, SUFFOLK, ERIE, SUFFOLK, SUFFOLK, ERIE)), (MATTEO,CompactBuffer(NEW YORK, SUFFOLK, NASSAU, KINGS, WESTCHESTER, WESTCHESTER, KINGS, SUFFOLK, NASSAU, QUEENS, QUEENS, NEW YORK, NASSAU, QUEENS, KINGS, SUFFOLK, WESTCHESTER, WESTCHESTER, SUFFOLK, KINGS, NASSAU, QUEENS, SUFFOLK, NASSAU, WESTCHESTER)), (HAZEL,CompactBuffer(ERIE, MONROE, KINGS, NEW YORK, KINGS, MONROE, NASSAU, SUFFOLK, QUEENS, KINGS, SUFFOLK, NEW YORK, KINGS, SUFFOLK)), (SKYE,CompactBuffer(NASSAU, KINGS, MONROE, BRONX, KINGS, KINGS, NASSAU)), (JOSUE,CompactBuffer(SUFFOLK, NASSAU, WESTCHESTER, BRONX, KINGS, QUEENS, SUFFOLK, QUEENS, NASSAU, WESTCHESTER, BRONX, BRONX, QUEENS, SUFFOLK, KINGS, WESTCHESTER, QUEENS, NASSAU, SUFFOLK, BRONX, KINGS, ...

The above example was created from the baby_names.csv file, which was introduced in the previous post, What is Apache Spark?
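
The grouped values are ordinary Scala Iterables, so `mapValues` can boil them down further.  Here is a sketch that counts the distinct counties per name, using a few invented (name, county) pairs rather than the real file and assuming Spark 1.4 or later for `SparkContext.getOrCreate`:

```scala
import org.apache.spark.{SparkConf, SparkContext}

// Reuse the spark-shell context if there is one; otherwise start a local one.
val sc = SparkContext.getOrCreate(
  new SparkConf().setAppName("groupByKey-sketch").setMaster("local[2]"))

// Hypothetical (name, county) pairs; ANNA appears in KINGS twice.
val namesToCounties = sc.parallelize(Seq(
  ("ANNA", "KINGS"), ("ANNA", "QUEENS"), ("ANNA", "KINGS"), ("LEO", "BRONX")))

// Group the counties per name, then count the distinct ones.
val countyCounts = namesToCounties
  .groupByKey()
  .mapValues(counties => counties.toSet.size)
  .collectAsMap()
// countyCounts: ANNA -> 2, LEO -> 1
```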

reduceByKey(func, [numTasks])

Operates on (K,V) pairs of course, but the func must be of type (V,V) => V

Let’s sum the yearly name counts over the years in the CSV.  Notice we need to filter out the header row.

scala> val filteredRows = babyNames.filter(line => !line.contains("Count")).map(line => line.split(","))
filteredRows: org.apache.spark.rdd.RDD[Array[String]] = MappedRDD[546] at map at <console>:14

scala> filteredRows.map(n => (n(1),n(4).toInt)).reduceByKey((v1,v2) => v1 + v2).collect
res452: Array[(String, Int)] = Array((BRADEN,39), (MATTEO,279), (HAZEL,133), (SKYE,63), (JOSUE,404), (RORY,12), (NAHLA,16), (ASIA,6), (MEGAN,581), (HINDY,254), (ELVIN,26), (AMARA,10), (CHARLOTTE,1737), (BELLA,672), (DANTE,246), (PAUL,712), (EPHRAIM,26), (ANGIE,295), (ANNABELLA,38), (DIAMOND,16), (ALFONSO,6), (MELISSA,560), (AYANNA,11), (ANIYAH,365), (DINAH,5), (MARLEY,32), (OLIVIA,6467), (MALLORY,15), (EZEQUIEL,13), (ELAINE,116), (ESMERALDA,71), (SKYLA,172), (EDEN,199), (MEGHAN,128), (AHRON,29), (KINLEY,5), (RUSSELL,5), (TROY,88), (MORDECHAI,521), (JALIYAH,10), (AUDREY,690), (VALERIE,584), (JAYSON,285), (SKYLER,26), (DASHIELL,24), (SHAINDEL,17), (AURORA,86), (ANGELY,5), (ANDERSON,369), (SHMUEL,315), (MARCO,370), (AUSTIN,1345), (MITCHELL,12), (SELINA,187), (FATIMA,421), (CESAR,292), (CAR...

Formal API: reduceByKey(func: (V, V) ⇒ V): RDD[(K, V)]

aggregateByKey(zeroValue)(seqOp, combOp, [numTasks])

Ok, I admit, this one drives me a bit nuts.  Why wouldn’t we just use reduceByKey?  I don’t feel smart enough to know when to use aggregateByKey over reduceByKey.  For example, the same results may be produced:

scala> val filteredRows = babyNames.filter(line => !line.contains("Count")).map(line => line.split(","))
filteredRows: org.apache.spark.rdd.RDD[Array[String]] = MappedRDD[546] at map at <console>:14

scala> filteredRows.map(n => (n(1),n(4).toInt)).reduceByKey((v1,v2) => v1 + v2).collect
res452: Array[(String, Int)] = Array((BRADEN,39), (MATTEO,279), (HAZEL,133), (SKYE,63), (JOSUE,404), (RORY,12), (NAHLA,16), (ASIA,6), (MEGAN,581), (HINDY,254), (ELVIN,26), (AMARA,10), (CHARLOTTE,1737), (BELLA,672), (DANTE,246), (PAUL,712), (EPHRAIM,26), (ANGIE,295), (ANNABELLA,38), (DIAMOND,16), (ALFONSO,6), (MELISSA,560), (AYANNA,11), (ANIYAH,365), (DINAH,5), (MARLEY,32), (OLIVIA,6467), (MALLORY,15), (EZEQUIEL,13), (ELAINE,116), (ESMERALDA,71), (SKYLA,172), (EDEN,199), (MEGHAN,128), (AHRON,29), (KINLEY,5), (RUSSELL,5), (TROY,88), (MORDECHAI,521), (JALIYAH,10), (AUDREY,690), (VALERIE,584), (JAYSON,285), (SKYLER,26), (DASHIELL,24), (SHAINDEL,17), (AURORA,86), (ANGELY,5), (ANDERSON,369), (SHMUEL,315), (MARCO,370), (AUSTIN,1345), (MITCHELL,12), (SELINA,187), (FATIMA,421), (CESAR,292), (CAR...


scala> filteredRows.map(n => (n(1), n(4))).aggregateByKey(0)((acc, v) => acc + v.toInt, (a, b) => a + b).sortBy(_._2).collect // seqOp folds a String value into an Int total; combOp merges two totals

There’s a gist of aggregateByKey as well.
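
One case where `aggregateByKey` seems to earn its keep is when the result type differs from the value type, which `reduceByKey` cannot express because its func must be (V,V) => V.  The sketch below computes an average per name by carrying a (sum, count) pair.  The data is made up, and `SparkContext.getOrCreate` assumes Spark 1.4 or later.

```scala
import org.apache.spark.{SparkConf, SparkContext}

// Reuse the spark-shell context if there is one; otherwise start a local one.
val sc = SparkContext.getOrCreate(
  new SparkConf().setAppName("aggregateByKey-sketch").setMaster("local[2]"))

// Hypothetical (name, yearly count) pairs.
val counts = sc.parallelize(Seq(("ANNA", 10), ("ANNA", 20), ("LEO", 6)), 2)

// Values are Int, but the accumulator is a (sum, count) pair.
val sumAndCount = counts.aggregateByKey((0, 0))(
  (acc, v) => (acc._1 + v, acc._2 + 1),   // seqOp: fold one value into an accumulator
  (a, b) => (a._1 + b._1, a._2 + b._2))   // combOp: merge two accumulators

val averages = sumAndCount
  .mapValues { case (sum, n) => sum.toDouble / n }
  .collectAsMap()
// averages: ANNA -> 15.0, LEO -> 6.0
```

The zero value `(0, 0)` fixes the accumulator type, seqOp runs within each partition, and combOp merges the per-partition results for a key.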

sortByKey([ascending], [numTasks])

This simply sorts an RDD of (K,V) pairs by K.  Try it out.  See the examples above for where babyNames originates.

scala> val filteredRows = babyNames.filter(line => !line.contains("Count")).map(line => line.split(","))
filteredRows: org.apache.spark.rdd.RDD[Array[String]] = MappedRDD[546] at map at <console>:14

scala>  filteredRows.map ( n => (n(1), n(4))).sortByKey().collect.foreach (println _) // collect first so the rows print on the driver in sorted order

scala>  filteredRows.map ( n => (n(1), n(4))).sortByKey(false).collect.foreach (println _) // opposite order
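
Since `sortByKey` only looks at the key, one way to sort by value is to swap each pair, sort, and swap back.  A sketch with invented totals, assuming Spark 1.4 or later for `SparkContext.getOrCreate`:

```scala
import org.apache.spark.{SparkConf, SparkContext}

// Reuse the spark-shell context if there is one; otherwise start a local one.
val sc = SparkContext.getOrCreate(
  new SparkConf().setAppName("sortByKey-sketch").setMaster("local[2]"))

// Hypothetical (name, total) pairs.
val totals = sc.parallelize(Seq(("ANNA", 30), ("LEO", 6), ("MIA", 12)))

// Swap to (total, name), sort descending by total, then swap back.
val byTotalDesc = totals
  .map(_.swap)
  .sortByKey(ascending = false)
  .map(_.swap)
  .collect()
// byTotalDesc: Array((ANNA,30), (MIA,12), (LEO,6))
```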

join(otherDataset, [numTasks])

If you have relational database experience, this will be easy.  It joins two datasets on their keys.  Other joins are available as well, such as leftOuterJoin and rightOuterJoin.

scala> val names1 = sc.parallelize(List("abe", "abby", "apple")).map(a => (a, 1))
names1: org.apache.spark.rdd.RDD[(String, Int)] = MappedRDD[1441] at map at <console>:14

scala> val names2 = sc.parallelize(List("apple", "beatty", "beatrice")).map(a => (a, 1))
names2: org.apache.spark.rdd.RDD[(String, Int)] = MappedRDD[1443] at map at <console>:14

scala> names1.join(names2).collect
res735: Array[(String, (Int, Int))] = Array((apple,(1,1)))

scala> names1.leftOuterJoin(names2).collect
res736: Array[(String, (Int, Option[Int]))] = Array((abby,(1,None)), (apple,(1,Some(1))), (abe,(1,None)))

scala> names1.rightOuterJoin(names2).collect
res737: Array[(String, (Option[Int], Int))] = Array((apple,(Some(1),1)), (beatty,(None,1)), (beatrice,(None,1)))
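
The outer joins wrap the possibly-missing side in an Option, so you usually need to unwrap it before doing anything useful.  A sketch that reuses the two RDDs above and treats a missing right-hand value as 0 (that default is my assumption, pick whatever suits your data); `SparkContext.getOrCreate` assumes Spark 1.4 or later.

```scala
import org.apache.spark.{SparkConf, SparkContext}

// Reuse the spark-shell context if there is one; otherwise start a local one.
val sc = SparkContext.getOrCreate(
  new SparkConf().setAppName("join-sketch").setMaster("local[2]"))

val names1 = sc.parallelize(List("abe", "abby", "apple")).map(a => (a, 1))
val names2 = sc.parallelize(List("apple", "beatty", "beatrice")).map(a => (a, 1))

// Keep every key from names1; a key missing from names2 contributes 0.
val combined = names1
  .leftOuterJoin(names2)
  .mapValues { case (left, right) => left + right.getOrElse(0) }
  .collectAsMap()
// combined: abe -> 1, abby -> 1, apple -> 2
```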

 

Featured image credit https://flic.kr/p/8R8uP9
