In PySpark, the DataFrame groupBy function groups data based on one or more specified columns so that aggregations can be run on each group. For example, with a DataFrame containing website click data, we may wish to group all the browser type values contained in a certain column and then determine an overall count for each browser type. This would tell us the most popular browser used in website requests.
Solutions like this start with the groupBy function.
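As a quick preview, here is a minimal sketch of that browser-count idea. The clicks_df DataFrame and its browser column are hypothetical; the sample data used later in this tutorial has a platform column rather than a browser column.
>>> from pyspark.sql import functions as F
>>> # count clicks per browser and sort the most popular browser first
>>> clicks_df.groupBy("browser") \
...     .count() \
...     .orderBy(F.desc("count")) \
...     .show()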
In this tutorial, let’s start by running a couple of examples. Then, we’ll take a break to briefly discuss performance considerations when grouping. Afterwards, we’ll get back to more examples of groupBy, and then we’ll wrap up with links to resources you may find helpful.
Examples are provided with the PySpark groupBy function as well as with the equivalent “group by” in SQL.
But wait, that’s not all. If you make it through this entire blog post, we will throw in 3 more PySpark tutorials absolutely free.
Ok, let’s go.
Table of Contents
- PySpark groupBy Examples Setup
- PySpark groupBy Examples
- PySpark groupBy Performance Considerations
- PySpark Group By with SQL Example
PySpark groupBy Examples Setup
To run our “group by” examples, we need some example data. PySpark reading CSV has been covered already. In this example, we are going to use a data.csv file. If you’d like to follow along, see the Resources section below for a link to the file.
When running the following examples, it is presumed the data.csv file is in the same directory where you start up pyspark, as shown in the following commands.
~/dev/pyspark/groupBy [main] $ ls
data.csv
~/dev/pyspark/groupBy [main] $ pyspark
Python 3.9.12 (main, Mar 26 2022, 15:51:15)
Welcome to
____ __
/ __/__ ___ _____/ /__
_\ \/ _ \/ _ `/ __/ '_/
/__ / .__/\_,_/_/ /_/\_\ version 3.3.1
/_/
Using Python version 3.9.12 (main, Mar 26 2022 15:51:15)
Spark context Web UI available at http://192.168.0.146:4040
Spark context available as 'sc' (master = local[*], app id = local-1669476118685).
SparkSession available as 'spark'.
>>> df = spark.read.options(header='true', inferschema='true').csv('data.csv')
>>> df.show()
+--------------------+-------------------+-------+----------+-------+-----------------+
| event_id| event_time|user_id| platform|item_id| referrer|
+--------------------+-------------------+-------+----------+-------+-----------------+
|0000157970c74089b...|2018-06-26 08:21:38| 256177| web| 1648| home|
|000095cc12ed40f2a...|2018-07-09 08:42:12| 287491| web| 390| google_search|
|00009c5122b04bc09...|2017-10-06 09:33:32| 198519|mobile web| 3241| shopping_cart|
|0000ac00b5b741a89...|2014-07-25 17:40:56| 31861| web| 2260| home|
|0000b5cd8dde42559...|2018-06-27 03:37:30| 271734| web| 3747| shopping_cart|
|00015a1e375441328...|2018-04-06 21:14:08| 283465| web| 2889| home|
|0001bc181d374fd6a...|2018-07-12 02:06:05| 271291| web| 1279| google_search|
|0001d177b6ad410b8...|2015-10-14 23:04:24| 67361| android| 3014| item_page|
|0001fd08f7e745f29...|2017-11-08 20:51:54| 237955| web| 1424| google_search|
|0002025142cd4ebe8...|2018-10-11 04:08:13| 297994| iOS| 3900| google_search|
|00020d658efc4e749...|2015-02-26 19:05:10| 57053| android| 1126| home|
|0002a162383c44ec8...|2016-08-28 07:09:37| 145090| iOS| 2430| home|
|0002b1150042493c8...|2017-10-31 15:56:57| 239551| web| 3675| shopping_cart|
|0002c6e5bf2145759...|2017-07-07 03:59:10| 206123| android| 511| item_page|
|0003078ab0964a72b...|2016-03-30 12:35:16| 111792| web| 2782|promo_email_click|
|00031f868e0c4bd8b...|2017-06-18 13:45:39| 210121| web| 3203| shopping_cart|
|000359ed46ef48f78...|2015-06-19 04:24:39| 63880| iOS| 621| shopping_cart|
|00037c1af5f848a4a...|2013-09-09 06:58:02| 4542| web| 769| shopping_cart|
|0003a589651343f5b...|2018-11-11 21:36:53| 296576| web| 2515| user_wishlist|
|0003b13ca5154593b...|2015-02-17 14:26:11| 35960| web| 1647| item_page|
+--------------------+-------------------+-------+----------+-------+-----------------+
only showing top 20 rows
PySpark groupBy Examples
PySpark groupBy Single Aggregation using Multiple Columns
>>> from pyspark.sql import functions as F
>>> df = df.withColumn("date", F.to_date(F.col("event_time")))
>>> df.groupBy(["date"]).sum("user_id", "item_id").show(2);
+----------+------------+------------+
| date|sum(user_id)|sum(item_id)|
+----------+------------+------------+
|2013-09-09| 4542| 769|
|2018-03-17| 527098| 5488|
+----------+------------+------------+
only showing top 2 rows
>>>
The purpose of this example is to show that we can pass multiple columns to a single aggregate function. Notice the import of F and the use of withColumn, which returns a new DataFrame by adding a column or replacing an existing column of the same name. This allows us to group by date and sum multiple columns.
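For comparison, the same grouping can be written with agg, which also lets us rename the output columns. This is just a sketch reusing the df and F names from above; the sum_user_id and sum_item_id aliases are arbitrary.
>>> # equivalent aggregation using agg with explicit aliases
>>> df.groupBy("date") \
...     .agg(F.sum("user_id").alias("sum_user_id"),
...          F.sum("item_id").alias("sum_item_id")) \
...     .show(2)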
Multiple Aggregate Functions in a Single Group By
In this next example, let’s find the latest event time of each user’s interactions on each day, along with a count of the items they interacted with.
>>> df.groupBy(["date", "user_id"]) \
... .agg(F.max("event_time").alias("last_interaction_time"), F.count("item_id").alias("item_id"))\
... .show(5)
+----------+-------+---------------------+-------+
| date|user_id|last_interaction_time|item_id|
+----------+-------+---------------------+-------+
|2017-06-18| 210121| 2017-06-18 13:45:39| 1|
|2016-09-29| 152196| 2016-09-29 16:24:53| 1|
|2017-12-07| 228551| 2017-12-07 23:19:12| 1|
|2016-07-09| 110412| 2016-07-09 12:22:56| 1|
|2016-05-11| 98632| 2016-05-11 16:44:09| 1|
+----------+-------+---------------------+-------+
only showing top 5 rows
>>>
Note: the use of F in this example depends on having run the previous example, where it was imported.
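If you want to surface the busiest user/day combinations, the aggregated DataFrame can simply be sorted afterwards. Here is a sketch building on the same df and F; the daily_activity and interactions names are just chosen for this example.
>>> daily_activity = df.groupBy("date", "user_id") \
...     .agg(F.max("event_time").alias("last_interaction_time"),
...          F.count("item_id").alias("interactions"))
>>> # sort so the user/day pairs with the most interactions come first
>>> daily_activity.orderBy(F.desc("interactions")).show(5)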
Group By Aggregation on the Value of a Column
>>> df.groupBy(["date"]) \
... .agg(F.count(F.when(F.col("referrer") == "shopping_cart", 1)).alias("shopping_cart_page"), \
... F.count(F.when(F.col("referrer") == "home", 1)).alias("home_page"), \
... F.count(F.when(F.col("referrer") == "promo_email_click", 1)).alias("promo_email_click_page"), \
... ).show(5)
+----------+------------------+---------+----------------------+
| date|shopping_cart_page|home_page|promo_email_click_page|
+----------+------------------+---------+----------------------+
|2013-09-09| 1| 0| 0|
|2018-03-17| 0| 0| 0|
|2018-06-06| 0| 1| 0|
|2016-04-25| 0| 1| 0|
|2015-03-06| 0| 0| 1|
+----------+------------------+---------+----------------------+
only showing top 5 rows
In the above example, we are interested in determining how many clicks were referred from the shopping_cart, home, and promo_email_click pages on each day.
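A related way to get this kind of per-value breakdown is pivot, which creates one output column per referrer value for us. The following is a sketch against the same df; passing the list of expected values to pivot avoids an extra pass over the data to discover them. Note that, unlike the count of a when expression above, pivot with count yields null rather than 0 for dates where a referrer never appears.
>>> # one output column per listed referrer value, counted per date
>>> df.groupBy("date") \
...     .pivot("referrer", ["shopping_cart", "home", "promo_email_click"]) \
...     .count() \
...     .show(5)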
PySpark groupBy Performance Considerations
A groupBy operation has the following execution plan (you can inspect it with explain(), as sketched after this list):
- Local (partial) aggregation on each executor
- Data exchange (groupBy causes data to shuffle across the cluster)
- Final aggregation merging the partial results after the shuffle
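You can see these stages in the physical plan with explain(). A quick sketch using the df from the examples above:
>>> # the physical plan typically shows a partial HashAggregate, an Exchange
>>> # (the shuffle), and then a final HashAggregate
>>> df.groupBy("date").count().explain()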
Spark is smart enough to select only the necessary columns. We can also reduce, or even avoid, the shuffle step of groupBy if the data is already partitioned appropriately, for example by bucketing it on the grouping columns, as sketched below.
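Here is a sketch of that bucketing idea; the clicks_bucketed table name and the bucket count of 8 are arbitrary choices, and saveAsTable writes a managed table, so treat it as illustrative rather than a drop-in recipe. When the table is bucketed on the grouping column, Spark can often skip the exchange step shown in the plan above.
>>> # write the data as a table bucketed on the grouping column (names are arbitrary)
>>> df.write.bucketBy(8, "date").sortBy("date").mode("overwrite").saveAsTable("clicks_bucketed")
>>> # group the bucketed table and compare its plan to the earlier one
>>> spark.table("clicks_bucketed").groupBy("date").count().explain()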
PySpark Group By with SQL Example
How about the times when we want a SQL solution for “group by” rather than the groupBy function? The last example, aggregating on particular values in a column, is also possible in SQL, as shown in the following.
>>> df.createOrReplaceTempView("clicks")
>>> spark.sql("SELECT TO_DATE(event_time), count(CASE WHEN REFERRER == 'shopping_cart' THEN 1 END) as shopping_cart_page, count(CASE WHEN REFERRER == 'home' THEN 1 END) as home_page, count(CASE WHEN REFERRER == 'promo_email_click' THEN 1 END) as promo_email_click_page FROM clicks GROUP BY TO_DATE(event_time)").show(5)
+-------------------+------------------+---------+----------------------+
|to_date(event_time)|shopping_cart_page|home_page|promo_email_click_page|
+-------------------+------------------+---------+----------------------+
| 2013-09-09| 1| 0| 0|
| 2018-03-17| 0| 0| 0|
| 2018-06-06| 0| 1| 0|
| 2016-04-25| 0| 1| 0|
| 2015-03-06| 0| 0| 1|
+-------------------+------------------+---------+----------------------+
only showing top 5 rows
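As a readability note, the same query can also be passed to spark.sql as a multi-line string, with the computed date aliased and the results ordered. A sketch reusing the clicks temp view registered above:
>>> spark.sql("""
...     SELECT TO_DATE(event_time) AS date,
...            count(CASE WHEN referrer = 'shopping_cart' THEN 1 END) AS shopping_cart_page,
...            count(CASE WHEN referrer = 'home' THEN 1 END) AS home_page,
...            count(CASE WHEN referrer = 'promo_email_click' THEN 1 END) AS promo_email_click_page
...     FROM clicks
...     GROUP BY TO_DATE(event_time)
...     ORDER BY date
... """).show(5)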
Further Resources
- The data.csv file used in this example may be downloaded from https://github.com/supergloo/pyspark/tree/main/groupBy
- More examples of PySpark Transformations such as groupBy
- See GroupedData for all the available aggregate functions at https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/api/pyspark.sql.GroupedData.html#pyspark.sql.GroupedData
- API documentation