Spark transformations produce a new Resilient Distributed Dataset (RDD), DataFrame, or Dataset, depending on the API and version of Spark you are using, and knowing Spark transformations is a requirement for being productive with Apache Spark. This is true whether you are using Scala or Python.
The best way to become productive and confident in anything is to actually start doing it. This Spark transformations tutorial is designed around that idea: you learn by working through examples. You are encouraged to complete these examples and consider what each transformation in the Scala code is doing.
This Spark transformations tutorial covers the Spark with Scala approach. If you are interested in the Python and PySpark approach, see the PySpark Transformations tutorial instead.
In this post, we will keep things as simple as possible and avoid any complicated setup. As you progress, you will likely need more. For example, see my posts on using IntelliJ with Spark.
In addition to Spark transformations, you need to know Spark actions, which are the counterpart to transformations; the two go hand in hand.
Let’s cover transformations in Scala now.
Spark Transformations Table of Contents
- What are Spark Transformations?
- Types of Spark Transformations
- Spark Transformation Examples Setup
- Spark Transformations Examples in Scala
- The Keys
- Spark Examples of Transformations in Scala Conclusion
- Further Resources
What are Spark Transformations?
Spark Transformations create a new Resilient Distributed Dataset (RDD) from an existing one. As you hopefully already know, RDDs are the fundamental data structure in Spark. Yes, we have other abstraction layers in Spark such as DataFrames and DataSets, but it all started with RDDs. Spark RDDs are immutable, meaning that they cannot be modified once they are created. We cannot begin to appreciate DataFrames and DataSets without knowing RDDs. For the purposes of the Spark transformation examples in this post, we will focus on RDDs.
As previously mentioned, transformations create new RDDs that can be used in further operations. You will likely chain various transformations together to create RDDs which are filtered, aggregated, combined, etc., moving step by step toward the desired outcome or result of your code logic. For example, you may start with one RDD which is transformed into another RDD, which is transformed into yet another RDD, which is combined with a different RDD, which is then filtered into a final RDD, and at last a Spark action such as count is called for the final result. Again, just an example, but the point is that transformations are often chained together.
Assuming you are familiar with Apache Spark fundamentals, you should know transformations are evaluated lazily. This means the new RDDs produced by a transformation are not computed until an action is called at runtime. This allows for more efficient computation, as Spark can optimize the execution plan based on the transformations that are called.
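To make the laziness concrete, here is a minimal sketch, assuming the spark-shell environment we set up later in this post (where sc is the SparkContext):
scala> val nums = sc.parallelize(1 to 5)
scala> val doubled = nums.map(_ * 2)   // nothing runs yet; Spark only records the transformation
scala> doubled.collect()               // the action triggers the actual computation
// returns Array(2, 4, 6, 8, 10)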
There are two types of transformations in Spark, which you may hear referred to in a few different ways. In this post we will differentiate between narrow transformations and wide transformations. Narrow transformations do not require shuffling of data between partitions, while wide transformations require shuffling of data across partitions.
The need to shuffle or share data across nodes in distributed systems can be painful for overall performance. For now, let’s just say we want to try to avoid “shuffling” whenever possible. You will want, and sometimes in fact require, data to be local to the node on which the processing is occurring. Don’t worry if this doesn’t make sense yet. Shuffling, partitioning, and data locality will come up again and again.
By the way, this is not a construct unique to Apache Spark. Generally speaking, these are concepts applicable to most distributed systems.
Anyhow, we will cover many examples, but at a high level:
- Narrow transformation examples include `map`, `filter`, and `union`.
- Wide transformation examples include `reduceByKey`, `groupByKey`, and `join`.
Types of Spark Transformations
There are two types of Spark transformations: narrow and wide transformations.
Spark Narrow Transformations
Narrow transformations are operations that do not require data to be shuffled across partitions. They operate on each partition independently, and the results are combined to form the final RDD. Long story short, this means that, from a compute standpoint, they are not as expensive as wide transformations because they do not require data shuffling.
Spark Wide Transformations
Wide transformations require data to be “shuffled” across partitions. They can involve data exchanges between partitions and over the network of the Spark cluster, which can be expensive in terms of both time and resources.
If you are new to Spark with Scala, don’t worry about this too much yet. Follow the old adage of getting things to work first and then quickly following up with optimization. You will hear plenty of chatter and doomsday talk about data shuffling in Spark; don’t worry. For now, try to filter it out until you are ready to optimize. Now, let me be clear, don’t forget or ignore the optimize stage. That’s a common mistake too. Plan your time budget to optimize after getting it working.
There’s no need to cover all these concerns in this post. Let’s focus on getting you comfortable with Spark transformations through hands-on examples.
Here we go.
Spark Transformation Examples Setup
To go through the examples, I’m going to fire up the spark-shell. As mentioned, this is a beginner tutorial, so let’s keep it focused and as simple as possible.
On my environment, starting the spark-shell is as simple as running the spark-shell script from the bin directory of my Spark installation, which drops me into an interactive Scala prompt with a SparkSession available as spark and a SparkContext available as sc.
Spark Transformations Examples in Scala
map
What does it do? Map passes each element of the source RDD through a supplied function and returns a new RDD of the results.
scala> val data = spark.sparkContext.parallelize(Seq(1, 2, 3, 4, 5))
data: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[2] at parallelize at <console>:22
scala> val doubled = data.map(x => x * 2)
doubled: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[3] at map at <console>:23
scala> doubled.collect().foreach(println)
2
4
6
8
10
What did this example do? `map` iterates over every value in the `data` RDD and multiplies each value by 2 into a new RDD. This new RDD is then collected and printed to display the results.
For this example only, let’s also show how this example may look in a standalone Scala program outside of the spark-shell.
import org.apache.spark.sql.SparkSession
object MapExample {
def main(args: Array[String]): Unit = {
// create SparkSession
val spark = SparkSession.builder()
.appName("MapExample")
.master("local[*]")
.getOrCreate()
// create an RDD with some data
val data = spark.sparkContext.parallelize(Seq(1, 2, 3, 4, 5))
// apply map function to RDD to double each element
val doubled = data.map(x => x * 2)
// print the result
doubled.collect().foreach(println)
// stop the SparkSession
spark.stop()
}
}
Let’s continue with spark-shell based examples from here on, because it should be easy for you to translate shell based examples into standalone programs.
flatMap
`flatMap` is similar to `map`, but each input item can be mapped to 0 or more output items (so the supplied function should return a `Seq` rather than a single item). Whereas the `map` function can be thought of as a one-to-one operation, the `flatMap` function can be considered one-to-many.
I often confuse the difference between `map` and `flatMap`, but thinking in terms of one-to-one vs one-to-many helped me.
It may help to compare `flatMap` to `map` in the following:
scala> sc.parallelize(List(1,2,3)).flatMap(x=>List(x,x,x)).collect
res200: Array[Int] = Array(1, 1, 1, 2, 2, 2, 3, 3, 3)
scala> sc.parallelize(List(1,2,3)).map(x=>List(x,x,x)).collect
res201: Array[List[Int]] = Array(List(1, 1, 1), List(2, 2, 2), List(3, 3, 3))
`flatMap` can be especially helpful with nested datasets. For example, it may be helpful to think of the RDD source as hierarchical JSON (which may have been converted to case classes or nested collections). This is unlike CSV, which should have no hierarchical structure.
You may be interested in Spark with JSON examples.
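To make the nested-data idea concrete, here is a small sketch using made-up order data, where `flatMap` flattens the per-order item lists into a single RDD of items:
scala> val orders = sc.parallelize(Seq(("order-1", List("apple", "banana")), ("order-2", List("orange"))))
scala> orders.flatMap { case (_, items) => items }.collect
// returns Array(apple, banana, orange)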
By the way, for those of you more familiar with Scala already, these examples may blur the line between Scala and Spark. Both Scala collections and Spark RDDs have `map` and `flatMap` in their APIs. In a sense, the only Spark-specific portion of the code examples above is the use of `parallelize` from a SparkContext. When calling `parallelize`, the elements of the collection are copied to form a distributed dataset that can be operated on in parallel. Being able to operate in parallel is a Spark feature. But, you knew that already.
Adding `collect` to both the `flatMap` and `map` results was shown for clarity. We can focus on the Spark aspect (i.e. the RDD return type) of the example if we don’t use `collect`, as seen in the following:
scala> sc.parallelize(List(1,2,3)).flatMap(x=>List(x,x,x))
res202: org.apache.spark.rdd.RDD[Int] = FlatMappedRDD[373] at flatMap at <console>:13
scala> sc.parallelize(List(1,2,3)).map(x=>List(x,x,x))
res203: org.apache.spark.rdd.RDD[List[Int]] = MappedRDD[375] at map at <console>:13
Formal API sans implicit: flatMap[U](f: (T) ⇒ TraversableOnce[U]): RDD[U]
filter
Filter creates a new RDD containing only the elements for which the supplied predicate function returns true. For those folks with a relational database background, or coming from a SQL perspective, it may be helpful to think of `filter` as similar in concept to the WHERE clause in a SQL statement; it restricts the results according to criteria, much as “WHERE state = ‘MN'” does in SQL.
// load a log file as an RDD of lines (assumes a catalina.out file in the working directory)
val file = sc.textFile("catalina.out")
// keep only the lines that contain the string ERROR
val errors = file.filter(line => line.contains("ERROR"))
Formal API: filter(f: (T) ⇒ Boolean): RDD[T]
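If you don’t have a log file handy, here is an equivalent sketch you can run directly in the spark-shell using parallelized numbers instead of log lines:
scala> val nums = sc.parallelize(1 to 10)
scala> nums.filter(x => x % 2 == 0).collect
// returns Array(2, 4, 6, 8, 10)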
mapPartitions
We may consider `mapPartitions` a tool for performance optimization. I know I warned you about not thinking about performance optimizations early in the journey, and it especially won’t do much good when running examples on your local machine, but let’s just say, don’t forget it when running on a Spark cluster. It’s similar to `map`, but it works on whole Spark RDD partitions at a time. Remember, the first D in RDD stands for “Distributed” – Resilient Distributed Datasets – distributed across nodes in the cluster or, put another way, distributed over partitions. In the example below, the supplied function takes only the first element of each partition’s iterator, so the result contains one value per partition.
scala> val parallel = sc.parallelize(1 to 9, 3)
parallel: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[450] at parallelize at <console>:12
scala> parallel.mapPartitions( x => List(x.next).iterator).collect
res383: Array[Int] = Array(1, 4, 7)
// compare to the same, but with default parallelize
scala> val parallel = sc.parallelize(1 to 9)
parallel: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[452] at parallelize at <console>:12
scala> parallel.mapPartitions( x => List(x.next).iterator).collect
res384: Array[Int] = Array(1, 2, 3, 4, 5, 6, 7, 8)
Formal API: mapPartitions[U](f: (Iterator[T]) ⇒ Iterator[U], preservesPartitioning: Boolean = false)(implicit arg0: ClassTag[U]): RDD[U]
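To further illustrate the per-partition behavior, here is a small sketch that emits one sum per partition rather than one value per element:
scala> val parallel = sc.parallelize(1 to 9, 3)
scala> parallel.mapPartitions(it => Iterator(it.sum)).collect
// returns Array(6, 15, 24), one sum for each of the 3 partitions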
mapPartitionsWithIndex
Similar to `mapPartitions`, but the supplied function also receives an `Int` value indicating the index of the partition.
When learning these APIs on an individual laptop or desktop, it might be helpful to show differences in capabilities and outputs. For example, compare the output below using the default number of partitions against the output when we parallelize into 3 slices:
scala> val parallel = sc.parallelize(1 to 9)
parallel: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[455] at parallelize at <console>:12
scala> parallel.mapPartitionsWithIndex( (index: Int, it: Iterator[Int]) => it.toList.map(x => index + ", "+x).iterator).collect
res389: Array[String] = Array(0, 1, 1, 2, 2, 3, 3, 4, 4, 5, 5, 6, 6, 7, 7, 8, 7, 9)
scala> val parallel = sc.parallelize(1 to 9, 3)
parallel: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[457] at parallelize at <console>:12
scala> parallel.mapPartitionsWithIndex( (index: Int, it: Iterator[Int]) => it.toList.map(x => index + ", "+x).iterator).collect
res390: Array[String] = Array(0, 1, 0, 2, 0, 3, 1, 4, 1, 5, 1, 6, 2, 7, 2, 8, 2, 9)
`mapPartitionsWithIndex` returns a new RDD by applying a function to each partition of the RDD while tracking the index of the original partition. The `preservesPartitioning` flag indicates whether or not the input function preserves the partitioner. It should be `false` unless this is a pair RDD and the input function does not modify the keys.
sample
Returns a random sample of the source RDD as a new, smaller RDD, as can be seen in the following:
scala> val parallel = sc.parallelize(1 to 9)
parallel: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[470] at parallelize at <console>:12
scala> parallel.sample(true,.2).count
res403: Long = 3
scala> parallel.sample(true,.2).count
res404: Long = 2
scala> parallel.sample(true,.1)
res405: org.apache.spark.rdd.RDD[Int] = PartitionwiseSampledRDD[473] at sample at <console>:15
Formal API: sample(withReplacement: Boolean, fraction: Double, seed: Long = Utils.random.nextLong): RDD[T]
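Notice the counts above vary between runs because no seed was supplied. If you need a reproducible sample, pass a seed explicitly; a quick sketch (the seed value of 42 is arbitrary):
scala> val parallel = sc.parallelize(1 to 9)
scala> parallel.sample(withReplacement = false, fraction = 0.5, seed = 42L).collect
// returns the same subset on every run for a given seed and Spark version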
union
This one is simple and does exactly what you may already expect: it returns the union of two RDDs, as seen below. Note that, unlike UNION in SQL, it does not remove duplicates; pair it with distinct (covered shortly) for that.
scala> val parallel = sc.parallelize(1 to 9)
parallel: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[477] at parallelize at <console>:12
scala> val par2 = sc.parallelize(5 to 15)
par2: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[478] at parallelize at <console>:12
scala> parallel.union(par2).collect
res408: Array[Int] = Array(1, 2, 3, 4, 5, 6, 7, 8, 9, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15)
intersection
Again, this one is simple. Spark `intersection` in Scala is similar to `union`, but it returns the intersection of two RDDs, as can be seen:
scala> val parallel = sc.parallelize(1 to 9)
parallel: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[477] at parallelize at <console>:12
scala> val par2 = sc.parallelize(5 to 15)
par2: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[478] at parallelize at <console>:12
scala> parallel.intersection(par2).collect
res409: Array[Int] = Array(8, 9, 5, 6, 7)
Formal API: intersection(other: RDD[T]): RDD[T]
distinct
Another simple one. We’re on a roll now. It returns a new RDD containing only the distinct elements of the source RDD, as shown in this Scala code example:
scala> val parallel = sc.parallelize(1 to 9)
parallel: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[477] at parallelize at <console>:12
scala> val par2 = sc.parallelize(5 to 15)
par2: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[478] at parallelize at <console>:12
scala> parallel.union(par2).distinct.collect
res412: Array[Int] = Array(1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15)
Formal API: distinct(): RDD[T]
The Keys
This is a little section I like to call “The Keys”. Why? Why not, I say.
Spark transformations such as groupByKey, reduceByKey, aggregateByKey, sortByKey, and join all act on RDDs with both keys and values. So, that’s why I named this section “The Keys”. Cool name, right? Well, not really, but it sounded much better than “The Keys and the Values”. Naming things can be difficult.
Let’s take a quick minute here. You will often work with RDDs of both keys and values in your Spark adventures. Get used to it. Get comfortable with it.
For deeper exploration of “byKey” functions, check the Scala API docs for the `PairRDDFunctions` class at https://spark.apache.org/docs/latest/api/scala/org/apache/spark/rdd/PairRDDFunctions.html (also linked in Further Resources below).
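Before diving into the examples, here is a quick sketch of how a key/value RDD typically comes into existence: either parallelize tuples directly, as the examples below do, or map an existing RDD into (key, value) pairs:
scala> val pairs = sc.parallelize(Seq("apple", "banana", "avocado")).map(fruit => (fruit.head, fruit))
scala> pairs.collect
// returns Array((a,apple), (b,banana), (a,avocado))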
groupByKey
“When called on a dataset of (K, V) pairs, returns a dataset of (K, Iterable<V>) pairs. “
The following groups the values for each key in a small RDD of fruit counts.
scala> val data = spark.sparkContext.parallelize(Seq(("apple", 1), ("banana", 2), ("apple", 3), ("banana", 4), ("orange", 5)))
data: org.apache.spark.rdd.RDD[(String, Int)] = ParallelCollectionRDD[7] at parallelize at <console>:22
scala> val grouped = data.groupByKey()
grouped: org.apache.spark.rdd.RDD[(String, Iterable[Int])] = ShuffledRDD[8] at groupByKey at <console>:23
scala> grouped.collect().foreach(println)
(orange,CompactBuffer(5))
(apple,CompactBuffer(1, 3))
(banana,CompactBuffer(2, 4))
As seen above, the `groupByKey()` function is applied to the `data` RDD in order to group the data by key and create a new RDD, `grouped`. The result is printed to standard out after `collect()` is called. Each element of the result is a key of the RDD paired with an iterable collection of all the values associated with that key.
reduceByKey
This is a popular one. Whether you like it or not, you will likely use this one quite a bit.
It operates on (K,V) pairs of course, but the supplied func must be of type (V,V) => V. Let’s sum the counts of the fruits in the data RDD from the previous examples.
scala> val reduced = data.reduceByKey((x, y) => x + y)
reduced: org.apache.spark.rdd.RDD[(String, Int)] = ShuffledRDD[9] at reduceByKey at <console>:23
scala> reduced.collect().foreach(println)
(orange,5)
(apple,4)
(banana,6)
Formal API: reduceByKey(func: (V, V) ⇒ V): RDD[(K, V)]
Does this one need much explanation, or does the code speak for itself? The supplied lambda `(x, y) => x + y` specifies how to combine the values associated with each key in order to determine a total count. That total count is the reduction of each key’s values into a single value.
aggregateByKey
Ok, I admit, this one drives me a bit nuts. Why wouldn’t we just use reduceByKey? I don’t feel smart enough to know when to use aggregateByKey over reduceByKey. For example, the same results may be produced:
scala> val agg = data.aggregateByKey(0)((k,v) => v.toInt+k, (v,k) => k+v)
agg: org.apache.spark.rdd.RDD[(String, Int)] = ShuffledRDD[43] at aggregateByKey at <console>:23
scala> agg.foreach(println)
(banana,6)
(orange,5)
(apple,4)
Help me out here, folks. It seems way more complicated. Why `aggregateByKey` vs `reduceByKey`?
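For what it’s worth, one commonly cited answer is that `aggregateByKey` lets the result type differ from the value type, which `reduceByKey` cannot do. Here is a sketch, using the same data RDD of fruit counts, that aggregates each key into a (sum, count) pair and then derives an average:
// aggregate each key's values into a (sum, count) pair, then divide to get the average
val sumCounts = data.aggregateByKey((0, 0))(
  (acc, v) => (acc._1 + v, acc._2 + 1),     // seqOp: fold a value into the running (sum, count)
  (a, b) => (a._1 + b._1, a._2 + b._2))     // combOp: merge (sum, count) pairs across partitions
val averages = sumCounts.mapValues { case (sum, count) => sum.toDouble / count }
averages.collect().foreach(println)
// prints (apple,2.0), (banana,3.0), and (orange,5.0), in some order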
sortByKey
This simply sorts the (K,V) pairs by K. But, it’s not as simple as it may appear! Look at the examples below closely and see if you can find the differences.
scala> agg.sortByKey().foreach(println)
(apple,4)
(banana,6)
(orange,5)
scala> agg.sortByKey().collect().foreach(println)
(apple,4)
(banana,6)
(orange,5)
scala> agg.sortByKey(false).collect().foreach(println)
(orange,5)
(banana,6)
(apple,4)
Notice the differences with and without `collect`?
It’s covered elsewhere on this site, but the `collect()` function is a Spark action that returns all the elements of an RDD (Resilient Distributed Dataset) or a DataFrame to the driver program as an array.
When we call `collect()` on an RDD, Spark triggers the computation of all the transformations that lead up to that RDD and then brings all the data from the worker nodes to the driver node. This can be useful for inspecting the contents of the RDD or DataFrame, or for performing local computations on the data.
As previously warned, calling collect() can be one of those expensive operations in Spark when running on a cluster. In our case, we are only in a local spark-shell, so it is not a concern. But on a cluster, with large RDDs, it can cause the driver node to run out of memory. It is recommended to use `collect()` only on small datasets, after all the necessary Spark transformations have been applied.
In addition to collect(), there are actions in Spark, such as take(), first(), count(), and foreach(), which can be used to retrieve a smaller subset of the data or perform some operation on each element without bringing all the data to the driver node.
These actions can be more efficient than collect() in many cases.
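For example, a quick sketch using the reduced RDD from the reduceByKey example above:
scala> reduced.take(2)
// returns an Array of just 2 of the (key, value) pairs without pulling back the whole RDD
scala> reduced.count
// returns 3, the number of key/value pairs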
join
If you have relational database experience, this will be easy. It’s the joining of two datasets by key. Other joins are available as well, such as leftOuterJoin and rightOuterJoin.
scala> val data1 = spark.sparkContext.parallelize(Seq(("apple", 1), ("banana", 2), ("orange", 5)))
data1: org.apache.spark.rdd.RDD[(String, Int)] = ParallelCollectionRDD[38] at parallelize at <console>:22
scala> val data2 = spark.sparkContext.parallelize(Seq(("apple", "fruit"), ("banana", "fruit"), ("pear", "fruit"), ("tomato", "vegetable")))
data2: org.apache.spark.rdd.RDD[(String, String)] = ParallelCollectionRDD[39] at parallelize at <console>:22
scala> // let's do the Spark join
scala> val joined = data1.join(data2)
joined: org.apache.spark.rdd.RDD[(String, (Int, String))] = MapPartitionsRDD[42] at join at <console>:24
scala> joined.collect().foreach(println)
(apple,(1,fruit))
(banana,(2,fruit))
In this example, we are performing an inner join, which returns only the tuples having matching keys in both of the RDDs. As you might imagine, there are other types of joins that can be performed in Spark; for RDDs these are separate functions such as `leftOuterJoin`, `rightOuterJoin`, and `fullOuterJoin` (whereas the DataFrame API accepts a join type argument on its `join` function).
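For instance, here is a sketch of `leftOuterJoin` on the same two RDDs; keys from data1 with no match in data2 come back with None:
scala> data1.leftOuterJoin(data2).collect().foreach(println)
// prints, in some order:
// (apple,(1,Some(fruit)))
// (banana,(2,Some(fruit)))
// (orange,(5,None))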
Spark Examples of Transformations in Scala Conclusion
As mentioned at the top, the way to really get a feel for your Spark API options with Spark Transformations is to perform these examples in your own environment, or “hands on the keyboard” as some people like to refer to it.
Further Resources
- https://spark.apache.org/docs/latest/api/scala/org/apache/spark/rdd/PairRDDFunctions.html
- Spark with Scala tutorials page
- Source code from the above examples https://github.com/supergloo/spark-scala-examples/blob/main/spark-shell/spark-transformations.md
Featured image credit https://flic.kr/p/8R8uP9
I loved your lessons. I tried out the transformations and actions but most importantly had fun while doing so because of the little notes of humor that you had left in between. I’m glad google led me to your site.
Hello, I am new to Spark, but your tutorials are easy to understand. Can you please throw more light on what the K and the V stand for? Thanks.
K and V stand for Key and Value, respectively.
val babyNames = spark.read.format("csv").option("header","true").load("file:/C:/Bigdata/baby_names.csv")
val rows = babyNames.map(line => line.split(","))
:26: error: value Map is not a member of org.apache.spark.rdd.RDD[org.apache.spark.sql.Row]
val rows = babyNames.Map(line => line.split(","))
I am getting this error after reading this, please help me.