PySpark Transformations Tutorial [14 Examples]


If you’ve read the previous PySpark tutorials on this site, you know that PySpark transformations are functions that produce a DataFrame, Dataset or Resilient Distributed Dataset (RDD).  RDDs are Spark’s core programming abstraction, and they are automatically parallelized across the cluster.

PySpark transformation functions are lazily evaluated: calling one records the operation, but nothing is computed until an action asks for a result.

As Spark matured, this abstraction changed from RDDs to DataFrames to Datasets, but the underlying concept of a Spark transformation remains the same: transformations produce a new, lazily evaluated abstraction for a data set, whether the underlying implementation is an RDD, DataFrame or Dataset.
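
Lazy evaluation can be illustrated without a cluster. The sketch below uses Python's built-in `map`, which is also lazy, as a stand-in for a transformation, and `list` as a stand-in for an action. It is an analogy under that assumption, not Spark code; `tag` and `calls` are made-up names.

```python
calls = []  # records every value the function was actually applied to


def tag(x):
    """Double x and remember that we were called."""
    calls.append(x)
    return x * 2


lazy = map(tag, [1, 2, 3])    # like a transformation: nothing runs yet
calls_before = list(calls)    # still empty at this point

result = list(lazy)           # like an action: forces the computation
calls_after = list(calls)     # now every element has been processed

print(calls_before, result, calls_after)
```

The function only runs once the result is requested, which is the same ordering Spark follows between a transformation and the action that triggers it.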

Note: as you would probably expect when using Python, RDDs can hold objects of multiple types because Python is dynamically typed.

Some of the PySpark transformation examples shown below load a CSV file.  A snippet of this CSV file:

Year,First Name,County,Sex,Count
2012,DOMINIC,CAYUGA,M,6
2012,ADDISON,ONONDAGA,F,14
2012,JULIA,ONONDAGA,F,15

There is a link to download this file in the Resources section below.

PySpark Transformations Examples


map

The map transformation returns a new RDD by applying a function to each element of the source RDD.

>>> baby_names = sc.textFile("baby_names.csv")
>>> rows = baby_names.map(lambda line: line.split(","))

So, in this PySpark transformation example, we’re creating a new RDD called “rows” by splitting every row in the baby_names RDD.  We accomplish this by mapping over every element in baby_names and passing in a lambda function to split by commas.

From here, we could use Python to access the fields of each row, as shown next:

>>> for row in rows.collect(): print(row[1])

First Name
DAVID
JAYDEN
...
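
A named function defined with `def` can be passed to `map` in place of a lambda. The sketch below applies a hypothetical `parse_line` helper to lines taken from the CSV snippet above, using plain Python so it runs without Spark. With an RDD, the equivalent call would be `baby_names.map(parse_line)`.

```python
def parse_line(line):
    """Split one CSV line into a list of fields (hypothetical helper)."""
    return line.split(",")


# A few lines in the shape of baby_names.csv, held in a plain list here.
lines = [
    "Year,First Name,County,Sex,Count",
    "2012,ADDISON,ONONDAGA,F,14",
    "2012,JULIA,ONONDAGA,F,15",
]

rows = list(map(parse_line, lines))       # same per-element idea as RDD.map
first_names = [row[1] for row in rows]    # second field of every row

print(first_names)
```

Note that the header row is still present, which is why "First Name" shows up alongside the real names.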

flatMap

flatMap is similar to map, because it applies a function to all elements in an RDD.  But flatMap flattens the results.

Compare flatMap to map in the following:

>>> sc.parallelize([2, 3, 4]).flatMap(lambda x: [x,x,x]).collect()
[2, 2, 2, 3, 3, 3, 4, 4, 4]

>>> sc.parallelize([1,2,3]).map(lambda x: [x,x,x]).collect()
[[1, 1, 1], [2, 2, 2], [3, 3, 3]]

This is helpful with nested datasets such as those found in JSON.

Adding collect to the flatMap and map results was done for clarity.  We can focus on the Spark aspect (re: the RDD return type) of the example if we don’t use collect:

>>> sc.parallelize([2, 3, 4]).flatMap(lambda x: [x,x,x])
PythonRDD[36] at RDD at PythonRDD.scala:43
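
The relationship between the two can be sketched in plain Python: flatMap behaves like map followed by flattening one level of nesting. This is a model of the semantics, not Spark code, and `triple` is a made-up helper name.

```python
from itertools import chain


def triple(x):
    """Return a list holding three copies of x."""
    return [x, x, x]


data = [2, 3, 4]

mapped = [triple(x) for x in data]                 # one list per input element
flat_mapped = list(chain.from_iterable(mapped))    # the same lists, flattened

print(mapped)
print(flat_mapped)
```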

filter

Create a new RDD by returning only the elements that satisfy the search filter.  For the SQL minded, think where clause.

>>> rows.filter(lambda line: "MICHAEL" in line).collect()
Out[36]:
[[u'2013', u'MICHAEL', u'QUEENS', u'M', u'155'],
 [u'2013', u'MICHAEL', u'KINGS', u'M', u'146'],
 [u'2013', u'MICHAEL', u'SUFFOLK', u'M', u'142']...

For a more in depth tutorial on filter see PySpark Filter Tutorial.
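
One detail worth knowing about the predicate used above: `"MICHAEL" in line` tests list membership when applied to a split row, but substring containment when applied to a raw text line. The plain-Python sketch below shows the difference on a made-up row (the name MICHAELA is an assumption for illustration, not taken from the file).

```python
raw_line = "2013,MICHAELA,QUEENS,F,12"   # made-up row for illustration
fields = raw_line.split(",")

# On a string, `in` matches any substring, so MICHAELA also matches.
substring_match = "MICHAEL" in raw_line

# On a list, `in` requires one element to equal the value exactly.
element_match = "MICHAEL" in fields

print(substring_match, element_match)
```

Filtering the split `rows` RDD, as the example above does, therefore keeps only exact field matches.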


mapPartitions

Consider mapPartitions a tool for performance optimization if you have the resources available.  It won’t do much when running examples on your laptop.  It’s the same as “map”, but the function is called once per partition and receives an iterator over that partition’s elements, rather than being called once per element.  Remember the first D in RDD – Resilient Distributed Datasets.

In the examples below, parallelize copies the elements of a collection to form a distributed dataset that can be operated on in parallel.

One important parameter for parallel collections is the number of partitions to cut the dataset into. Spark will run one task for each partition of the cluster.

>>> one_through_9 = range(1,10)
>>> parallel = sc.parallelize(one_through_9, 3)
>>> def f(iterator): yield sum(iterator)
>>> parallel.mapPartitions(f).collect()
[6, 15, 24]

>>> parallel = sc.parallelize(one_through_9)
>>> parallel.mapPartitions(f).collect()
[1, 2, 3, 4, 5, 6, 7, 17]

See what’s happening?  Results [6,15,24] are created because mapPartitions loops through 3 partitions which is the second argument to the sc.parallelize call.

Partition 1: 1+2+3 = 6

Partition 2: 4+5+6 = 15

Partition 3: 7+8+9 = 24

The second example produces [1,2,3,4,5,6,7,17] which I’m guessing means the default number of partitions on my laptop is 8.

Partition 1 = 1

Partition 2 = 2

Partition 3 = 3

Partition 4 = 4

Partition 5 = 5

Partition 6 = 6

Partition 7 = 7

Partition 8: 8+9 = 17
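
The partition sums above can be reproduced in plain Python. The sketch assumes the elements are cut into contiguous slices at positions `i * n // num_slices`, which matches the two outputs shown in this section; `slice_into_partitions` and `map_partitions` are hypothetical helpers, not Spark APIs.

```python
def slice_into_partitions(data, num_slices):
    """Cut data into num_slices contiguous chunks (assumed slicing rule)."""
    n = len(data)
    bounds = [i * n // num_slices for i in range(num_slices + 1)]
    return [data[bounds[i]:bounds[i + 1]] for i in range(num_slices)]


def sum_partition(iterator):
    """Called once per partition; yields a single sum for that partition."""
    yield sum(iterator)


def map_partitions(partitions, func):
    """Apply func to an iterator over each partition and collect the output."""
    out = []
    for part in partitions:
        out.extend(func(iter(part)))
    return out


data = list(range(1, 10))
three = map_partitions(slice_into_partitions(data, 3), sum_partition)
eight = map_partitions(slice_into_partitions(data, 8), sum_partition)

print(three)
print(eight)
```

With 3 slices each partition holds three numbers; with 8 slices the first seven hold one number each and the last holds 8 and 9.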

Typically you want 2-4 partitions for each CPU core in your cluster. Normally, Spark tries to set the number of partitions automatically based on your cluster, or on your hardware when running in a standalone environment.

To find the default number of partitions and confirm the guess of 8 above:

>>> print(sc.defaultParallelism)
8

mapPartitionsWithIndex

Similar to mapPartitions, but the function also receives an int value giving the index of the partition.

>>> parallel = sc.parallelize(range(1,10),4)
>>> def show(index, iterator): yield 'index: '+str(index)+" values: "+ str(list(iterator))
>>> parallel.mapPartitionsWithIndex(show).collect()

['index: 0 values: [1, 2]',
 'index: 1 values: [3, 4]',
 'index: 2 values: [5, 6]',
 'index: 3 values: [7, 8, 9]']

When learning these APIs on an individual laptop or desktop, it might be helpful to show differences in capabilities and outputs.  For example, if we change the above example to use a parallelized list with 3 slices, our output changes significantly:

>>> parallel = sc.parallelize(range(1,10),3)
>>> def show(index, iterator): yield 'index: '+str(index)+" values: "+ str(list(iterator))
>>> parallel.mapPartitionsWithIndex(show).collect()

['index: 0 values: [1, 2, 3]',
 'index: 1 values: [4, 5, 6]',
 'index: 2 values: [7, 8, 9]']

sample

Return a random sample subset RDD of the input RDD.  Because the sample is random, the count can differ from one call to the next:

>>> parallel = sc.parallelize(range(1,10))
>>> parallel.sample(True,.2).count()
2

>>> parallel.sample(True,.2).count()
1

>>> parallel.sample(True,.2).count()
2

sample(withReplacement, fraction, seed=None)

Parameters:
  • withReplacement – can elements be sampled multiple times (replaced when sampled out)
  • fraction – expected size of the sample as a fraction of this RDD’s size
      – without replacement: probability that each element is chosen; fraction must be in [0, 1]
      – with replacement: expected number of times each element is chosen; fraction must be >= 0
  • seed – seed for the random number generator
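
The without-replacement case can be sketched in plain Python as an independent coin flip per element. This is a model of the idea under that assumption, not Spark's implementation; `bernoulli_sample` is a hypothetical helper.

```python
import random


def bernoulli_sample(data, fraction, seed=None):
    """Keep each element independently with probability `fraction`."""
    rng = random.Random(seed)
    return [x for x in data if rng.random() < fraction]


data = list(range(1, 10))

# The same seed gives the same sample; a different or missing seed may not.
first = bernoulli_sample(data, 0.2, seed=42)
second = bernoulli_sample(data, 0.2, seed=42)

print(first == second)
print(len(bernoulli_sample(data, 0.2)))   # varies from run to run
```

Since each element is chosen independently, the fraction controls the expected sample size rather than the exact size, which is why the counts above vary between runs.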

union

Simple.  Return the union of two RDDs.

>>> one = sc.parallelize(range(1,10))
>>> two = sc.parallelize(range(10,21))
>>> one.union(two).collect()
[1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20]


intersection

Again, simple.  Similar to union, but returns the intersection of two RDDs.

>>> one = sc.parallelize(range(1,10))
>>> two = sc.parallelize(range(5,15))
>>> one.intersection(two).collect()
[5, 6, 7, 8, 9]

distinct

Another simple one.  Return a new RDD with the distinct elements of a source RDD.

>>> parallel = sc.parallelize(range(1,9))
>>> par2 = sc.parallelize(range(5,15))

>>> parallel.union(par2).distinct().collect()
[1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14]

Formal API: distinct(): RDD[T]
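
The three set-like transformations differ in how they treat duplicates: union keeps them, while intersection and distinct drop them. The plain-Python sketch below models that behavior with lists and sets on made-up values. It is not Spark code, and the sorting is only there to make the printed order predictable.

```python
one = [1, 2, 2, 3]
two = [2, 3, 4]

# union: simple concatenation, so duplicates survive
union = one + two

# intersection: elements present in both inputs, without duplicates
intersection = sorted(set(one) & set(two))

# distinct: each element once
distinct = sorted(set(union))

print(union)
print(intersection)
print(distinct)
```

This is why the distinct example above chains distinct() after union(): the two ranges overlap, and union alone would return the overlapping values twice.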


The Keys

The following group of transformation functions (groupByKey, reduceByKey, aggregateByKey, sortByKey, join) all act on RDDs of (key, value) pairs.

For the following, we’re going to use the baby_names.csv file again which was introduced in a previous post What is Apache Spark?

All the following examples presume the baby_names.csv file has been loaded and split such as:

>>> baby_names = sc.textFile("baby_names.csv")
>>> rows = baby_names.map(lambda line: line.split(","))


groupByKey

“When called on a dataset of (K, V) pairs, returns a dataset of (K, Iterable<V>) pairs.”

The following groups all names to counties in which they appear over the years.

>>> rows = baby_names.map(lambda line: line.split(","))
>>> namesToCounties = rows.map(lambda n: (str(n[1]),str(n[2]) )).groupByKey()
>>> namesToCounties.map(lambda x : {x[0]: list(x[1])}).collect()

[{'GRIFFIN': ['ERIE',
   'ONONDAGA',
   'NEW YORK',
   'ERIE',
   'SUFFOLK',
   'MONROE',
   'NEW YORK',
...

See the reading CSV from PySpark tutorial for loading CSV in PySpark.

For a more in depth tutorial on groupBy see PySpark groupBy Tutorial.

reduceByKey

Operates on (key, value) pairs again, but the func must be of type (V,V) => V.

Let’s sum the yearly name counts over the years in the CSV.  Notice we need to filter out the header row.  Also notice we are going to use the “Count” column value (n[4]).

>>> filtered_rows = baby_names.filter(lambda line: "Count" not in line).map(lambda line: line.split(","))
>>> filtered_rows.map(lambda n:  (str(n[1]), int(n[4]) ) ).reduceByKey(lambda v1,v2: v1 + v2).collect()

[('GRIFFIN', 268),
 ('KALEB', 172),
 ('JOHNNY', 219),
 ('SAGE', 5),
 ('MIKE', 40),
 ('NAYELI', 44),
....

Formal API: reduceByKey(func: (V, V) ⇒ V): RDD[(K, V)]
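
What reduceByKey computes can be modeled in plain Python: collect the values for each key, then fold them pairwise with the function. The sketch below uses made-up counts and a hypothetical `reduce_by_key` helper; it describes the result, not how Spark distributes the work.

```python
from functools import reduce


def reduce_by_key(pairs, func):
    """Fold the values of each key with a (V, V) -> V function."""
    grouped = {}
    for key, value in pairs:
        grouped.setdefault(key, []).append(value)   # the grouping step
    return {key: reduce(func, values) for key, values in grouped.items()}


# Made-up (name, count) pairs for illustration only.
pairs = [("GRIFFIN", 10), ("SAGE", 5), ("GRIFFIN", 20)]

totals = reduce_by_key(pairs, lambda v1, v2: v1 + v2)
print(totals)
```

Because the function takes two values and returns one of the same type, the result for each key has the same type as the input values.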


aggregateByKey

Ok, I admit, this one drives me a bit nuts.  Why wouldn’t we just use reduceByKey?  I don’t feel smart enough to know when to use aggregateByKey over reduceByKey.  For example, the same results may be produced as reduceByKey:

>>> filtered_rows = baby_names.filter(lambda line: "Count" not in line).map(lambda line: line.split(","))
>>> filtered_rows.map(lambda n:  (str(n[1]), int(n[4]) ) ).aggregateByKey(0, lambda acc, v: acc + int(v), lambda acc1, acc2: acc1 + acc2).collect()

[('GRIFFIN', 268),
 ('KALEB', 172),
 ('JOHNNY', 219),
 ('SAGE', 5),
...


There’s a gist of aggregateByKey as well.
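
One case where aggregateByKey does something reduceByKey cannot do directly is when the accumulated result has a different type from the values, for example a (sum, count) pair built from integer counts. The plain-Python sketch below models this with a hypothetical `aggregate_by_key` helper over made-up data split into two partitions: the first function folds values into an accumulator within a partition, and the second merges accumulators across partitions.

```python
def aggregate_by_key(partitions, zero, seq_func, comb_func):
    """Plain-Python model of aggregateByKey over a list of partitions."""
    per_partition = []
    for part in partitions:
        acc = {}
        for key, value in part:
            # seq_func: fold one value into this partition's accumulator
            acc[key] = seq_func(acc.get(key, zero), value)
        per_partition.append(acc)

    merged = {}
    for acc in per_partition:
        for key, value in acc.items():
            # comb_func: merge accumulators from different partitions
            merged[key] = comb_func(merged[key], value) if key in merged else value
    return merged


# Made-up (name, count) pairs, pretending they live in two partitions.
partitions = [
    [("GRIFFIN", 10), ("SAGE", 5)],
    [("GRIFFIN", 20), ("GRIFFIN", 6)],
]

sum_count = aggregate_by_key(
    partitions,
    (0, 0),                                     # zero value: (sum, count)
    lambda acc, v: (acc[0] + v, acc[1] + 1),    # add one value
    lambda a, b: (a[0] + b[0], a[1] + b[1]),    # merge two accumulators
)
averages = {name: s / c for name, (s, c) in sum_count.items()}

print(sum_count)
print(averages)
```

When the accumulator and the values share a type, as in the sum example above, the two functions collapse into one and reduceByKey is the simpler choice.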

sortByKey

This simply sorts the (K,V) pairs by K.  Try it out. See the examples above for where filtered_rows originates.

>>> filtered_rows.map(lambda n:  (str(n[1]), int(n[4]) ) ).sortByKey().collect()
[('AADEN', 18),
 ('AADEN', 11),
 ('AADEN', 10),
 ('AALIYAH', 50),
 ('AALIYAH', 44),
...

# descending order
>>> filtered_rows.map(lambda n:  (str(n[1]), int(n[4]) ) ).sortByKey(False).collect()

[('ZOIE', 5),
 ('ZOEY', 37),
 ('ZOEY', 32),
 ('ZOEY', 30),
...
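
Sorting by key orders the pairs by K only; the values play no part in the comparison. A plain-Python sketch of the same idea on made-up pairs, using `sorted` with a key function as a stand-in for sortByKey:

```python
# Made-up (name, count) pairs for illustration only.
pairs = [("ZOEY", 37), ("AADEN", 18), ("AALIYAH", 50), ("AADEN", 11)]

# Compare on the key (first item) only, like sortByKey()
ascending = sorted(pairs, key=lambda kv: kv[0])

# Same comparison, reversed, like sortByKey(False)
descending = sorted(pairs, key=lambda kv: kv[0], reverse=True)

print(ascending)
print(descending)
```

Pairs that share a key end up next to each other, but nothing here orders them by value.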

join

If you have relational database experience, this will be easy.  It joins two datasets of (key, value) pairs on their keys.  Other joins are available as well, such as leftOuterJoin and rightOuterJoin.

>>> names1 = sc.parallelize(("abe", "abby", "apple")).map(lambda a: (a, 1))
>>> names2 = sc.parallelize(("apple", "beatty", "beatrice")).map(lambda a: (a, 1))
>>> names1.join(names2).collect()

[('apple', (1, 1))]

leftOuterJoin, rightOuterJoin

>>> names1.leftOuterJoin(names2).collect()
[('abe', (1, None)), ('apple', (1, 1)), ('abby', (1, None))]

>>> names1.rightOuterJoin(names2).collect()
[('apple', (1, 1)), ('beatrice', (None, 1)), ('beatty', (None, 1))]
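
The shape of these results can be modeled in plain Python. The sketch below uses hypothetical `inner_join` and `left_outer_join` helpers on the same names as above; it illustrates the output structure only and says nothing about how Spark executes a join.

```python
def inner_join(left, right):
    """Pair up values whose keys appear on both sides."""
    return [(k, (lv, rv)) for k, lv in left for rk, rv in right if k == rk]


def left_outer_join(left, right):
    """Keep every left key; use None where the right side has no match."""
    right_keys = {k for k, _ in right}
    out = inner_join(left, right)
    out += [(k, (lv, None)) for k, lv in left if k not in right_keys]
    return out


names1 = [(n, 1) for n in ("abe", "abby", "apple")]
names2 = [(n, 1) for n in ("apple", "beatty", "beatrice")]

inner = inner_join(names1, names2)
left = sorted(left_outer_join(names1, names2))   # sorted for a stable order

print(inner)
print(left)
```

A right outer join is the mirror image: every key from the second dataset is kept, with None standing in for a missing left value.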

If you are interested in learning more, there are PySpark Joins with SQL and PySpark Joins with DataFrame tutorials.

Frequently Asked Questions

1. What is a transformation in PySpark?

A PySpark transformation is an operation that creates a new RDD (Resilient Distributed Dataset) or DataFrame from an existing one.

Transformations are lazily evaluated, meaning they are not executed immediately when called, but rather, create a plan for how to execute the operation when an action is called. This allows PySpark to optimize the execution plan and reduce unnecessary computation.

Transformations are an essential part of PySpark programming.

2. What is the difference between transformations and actions in PySpark?

Transformations create a new DataFrame without immediately computing the result, while PySpark actions trigger the computation of the DataFrame and return a value or perform a side effect on the data.

See PySpark action examples for a deeper dive on PySpark actions.

3. What are the different transformation types in PySpark?

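As we saw above, there are many different transformation types in PySpark: element-wise functions such as map, flatMap and filter; partition-level functions such as mapPartitions and mapPartitionsWithIndex; sampling with sample; set-like functions such as union, intersection and distinct; and key-based functions such as groupByKey, reduceByKey, aggregateByKey, sortByKey and join.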

References

Be sure to check out the other PySpark tutorials.

About Todd M

Todd has held multiple software roles over his 20-year career. For the last 5 years, he has focused on helping organizations move from batch to data streaming. In addition to the free tutorials, he provides consulting and coaching for Data Engineers, Data Scientists, and Data Architects. Feel free to reach out directly or to connect on LinkedIn.

4 thoughts on “PySpark Transformations Tutorial [14 Examples]”

  1. Hii ,

    I am facing an issue while splitting the columns of an RDD in PySpark, while the same approach works fine with
    Scala.
    I have a Python list:

    final_file_for_Df
    ['/home/fact/test/file_1.dat,test,file_1.dat,2', '/home/facts/test/file_2.dat,test,file_2.dat,2']

    -- convert to RDD
    rdd_f_n_cnt = sc.parallelize(final_file_for_Df)

    -- splitting
    rdd_f_n_cnt_2 = rdd_f_n_cnt.map(lambda l: l.split(","))
    -- separating columns to assign a schema later in the DataFrame
    rdd_f_n_cnt_3 = rdd_f_n_cnt_2.map(lambda cols: Row(cols(0), cols(1), cols(2), cols(3)))

    but at rdd_f_n_cnt_3.collect() it throws an error like

    TypeError: 'list' object is not callable

    Can you please help in this case?

    • final_file_for_Df = ["/home/fact/test/file_1.dat,test,file_1.dat,2", "/home/facts/test/file_2.dat,test,file_2.dat,2"]
      rdd_f_n_cnt = sc.parallelize(final_file_for_Df)
      rdd_f_n_cnt_2 = rdd_f_n_cnt.map(lambda l: l.split(","))
      rdd_f_n_cnt_2.collect()

      Output: [['/home/fact/test/file_1.dat', 'test', 'file_1.dat', '2'],
      ['/home/facts/test/file_2.dat', 'test', 'file_2.dat', '2']]

      What are you trying to get with the Row function?

  2. I have a question. I am new to Apache Spark and work with PySpark 2.3.x. How can I use the map function with a custom function (defined with Python def)? I am able to use map with lambdas, but I have been unable to find enough material on the internet. Kindly suggest. Thanks in advance.
