How to PySpark GroupBy through Examples

PySpark GroupBy Examples Tutorial

In PySpark, the DataFrame groupBy function, groups data together based on specified columns, so aggregations can be run on the collected groups.  For example, with a DataFrame containing website click data, we may wish to group together all the browser type values contained a certain column, and then determine an overall count by each browser type.  This would allow us to determine the most popular browser type used in website requests.

Solutions like this start with the groupBy function.

In this tutorial, let’s start by running a couple of examples.  Then, we’ll take a break to briefly discuss performance considerations when grouping.  Afterwards, we’ll get back to more examples using and then we’ll wrap up with links to resources you may find helpful.

Examples will be provided through PySpark groupBy functions as well as showing how to PySpark group by in SQL as well.

But, wait, let’s not all.  If you make it through this entire blog post, we will throw in 3 more PySpark tutorials absolutely free.

Ok, let’s go.

Table of Contents

PySpark groupBy Examples Setup

To run our “group by” examples, we need some example data.  PySpark reading CSV has been covered already.  In this example, we are going to use a data.csv file.  If you’d like to use this file, see Resources section links below for a link to the file.

See also  Deep dive into PySpark SQL Functions

When running the following examples, it is presumed the data.csv file is in the same directory as where you start up pyspark.  This is shown in the following commands.

~/dev/pyspark/groupBy [main] $ ls
data.csv
~/dev/pyspark/groupBy [main] $ pyspark
Python 3.9.12 (main, Mar 26 2022, 15:51:15)
Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /__ / .__/\_,_/_/ /_/\_\   version 3.3.1
      /_/

Using Python version 3.9.12 (main, Mar 26 2022 15:51:15)
Spark context Web UI available at http://192.168.0.146:4040
Spark context available as 'sc' (master = local[*], app id = local-1669476118685).
SparkSession available as 'spark'.
>>> df = spark.read.options(header='true', inferschema='true').csv('data.csv')
>>> df.show()
+--------------------+-------------------+-------+----------+-------+-----------------+
|            event_id|         event_time|user_id|  platform|item_id|         referrer|
+--------------------+-------------------+-------+----------+-------+-----------------+
|0000157970c74089b...|2018-06-26 08:21:38| 256177|       web|   1648|             home|
|000095cc12ed40f2a...|2018-07-09 08:42:12| 287491|       web|    390|    google_search|
|00009c5122b04bc09...|2017-10-06 09:33:32| 198519|mobile web|   3241|    shopping_cart|
|0000ac00b5b741a89...|2014-07-25 17:40:56|  31861|       web|   2260|             home|
|0000b5cd8dde42559...|2018-06-27 03:37:30| 271734|       web|   3747|    shopping_cart|
|00015a1e375441328...|2018-04-06 21:14:08| 283465|       web|   2889|             home|
|0001bc181d374fd6a...|2018-07-12 02:06:05| 271291|       web|   1279|    google_search|
|0001d177b6ad410b8...|2015-10-14 23:04:24|  67361|   android|   3014|        item_page|
|0001fd08f7e745f29...|2017-11-08 20:51:54| 237955|       web|   1424|    google_search|
|0002025142cd4ebe8...|2018-10-11 04:08:13| 297994|       iOS|   3900|    google_search|
|00020d658efc4e749...|2015-02-26 19:05:10|  57053|   android|   1126|             home|
|0002a162383c44ec8...|2016-08-28 07:09:37| 145090|       iOS|   2430|             home|
|0002b1150042493c8...|2017-10-31 15:56:57| 239551|       web|   3675|    shopping_cart|
|0002c6e5bf2145759...|2017-07-07 03:59:10| 206123|   android|    511|        item_page|
|0003078ab0964a72b...|2016-03-30 12:35:16| 111792|       web|   2782|promo_email_click|
|00031f868e0c4bd8b...|2017-06-18 13:45:39| 210121|       web|   3203|    shopping_cart|
|000359ed46ef48f78...|2015-06-19 04:24:39|  63880|       iOS|    621|    shopping_cart|
|00037c1af5f848a4a...|2013-09-09 06:58:02|   4542|       web|    769|    shopping_cart|
|0003a589651343f5b...|2018-11-11 21:36:53| 296576|       web|   2515|    user_wishlist|
|0003b13ca5154593b...|2015-02-17 14:26:11|  35960|       web|   1647|        item_page|
+--------------------+-------------------+-------+----------+-------+-----------------+
only showing top 20 rows

PySpark groupBy Examples

PySpark groupBy Single Aggregation using Multiple Columns

>>> from pyspark.sql import functions as F
>>> df = df.withColumn("date", F.to_date(F.col("event_time")))
>>> df.groupBy(["date"]).sum("user_id", "item_id").show(2);
+----------+------------+------------+
|      date|sum(user_id)|sum(item_id)|
+----------+------------+------------+
|2013-09-09|        4542|         769|
|2018-03-17|      527098|        5488|
+----------+------------+------------+
only showing top 2 rows

>>>

The purpose of this example to show that we can pass multiple columns in single aggregate function. Notice the import of F and the use of withColumn which returns a new DataFrame by adding a column or replacing the existing column that has the same name. This allows us to groupBy date and sum multiple columns.

See also  PySpark withColumn by Example

Multiple aggregate function in a single group by

Let’s see the latest event time of user interaction on each day in this next example.

>>> df.groupBy(["date", "user_id"]) \
... .agg(F.max("event_time").alias("last_interaction_time"), F.count("item_id").alias("item_id"))\
... .show(5)
+----------+-------+---------------------+-------+
|      date|user_id|last_interaction_time|item_id|
+----------+-------+---------------------+-------+
|2017-06-18| 210121|  2017-06-18 13:45:39|      1|
|2016-09-29| 152196|  2016-09-29 16:24:53|      1|
|2017-12-07| 228551|  2017-12-07 23:19:12|      1|
|2016-07-09| 110412|  2016-07-09 12:22:56|      1|
|2016-05-11|  98632|  2016-05-11 16:44:09|      1|
+----------+-------+---------------------+-------+
only showing top 5 rows

>>>

Note: the use of F in this example is dependent on having successfully completed the previous example.

Group By Aggregation on Value of Column

>>> df.groupBy(["date"]) \
... .agg(F.count(F.when(F.col("referrer") == "shopping_cart", 1)).alias("shopping_cart_page"), \
... F.count(F.when(F.col("referrer") == "home", 1)).alias("home_page"), \
... F.count(F.when(F.col("referrer") == "promo_email_click", 1)).alias("promo_email_click_page"), \
... ).show(5)
+----------+------------------+---------+----------------------+
|      date|shopping_cart_page|home_page|promo_email_click_page|
+----------+------------------+---------+----------------------+
|2013-09-09|                 1|        0|                     0|
|2018-03-17|                 0|        0|                     0|
|2018-06-06|                 0|        1|                     0|
|2016-04-25|                 0|        1|                     0|
|2015-03-06|                 0|        0|                     1|
+----------+------------------+---------+----------------------+
only showing top 5 rows

In the above example, we are interested in determining how many users came from shopping_cart, home, and promo_email_click page on each day.

PySpark groupBy Performance Considerations

The performance of groupBy operations, has following execution plan

  1. Local aggregation on each executer
  2. Data Exchange (as groupBy causes data to shuffle)
  3. Final data merge after the shuffle

Spark is smart enough to only select necessary columns. We can reduce shuffle operation in groupBy if data is partitioned correctly by bucketing.

PySpark Group By with SQL Example

How about the times when want a SQL solution for “group by” rather than the groupBy function. The last example of aggregation on particular value in a column in SQL is possible with SQL as shown in the following.

>>> df.createOrReplaceTempView("clicks")
>>> spark.sql("SELECT TO_DATE(event_time), count(CASE WHEN REFERRER == 'shopping_cart' THEN 1 END) as shopping_cart_page, count(CASE WHEN REFERRER == 'home' THEN 1 END) as home_page, count(CASE WHEN REFERRER == 'promo_email_click' THEN 1 END) as promo_email_click_page FROM clicks GROUP BY TO_DATE(event_time)").show(5)
+-------------------+------------------+---------+----------------------+
|to_date(event_time)|shopping_cart_page|home_page|promo_email_click_page|
+-------------------+------------------+---------+----------------------+
|         2013-09-09|                 1|        0|                     0|
|         2018-03-17|                 0|        0|                     0|
|         2018-06-06|                 0|        1|                     0|
|         2016-04-25|                 0|        1|                     0|
|         2015-03-06|                 0|        0|                     1|
+-------------------+------------------+---------+----------------------+
only showing top 5 rows

Further Resources

See also  PySpark SQL MySQL Python Example with JDBC

Leave a Reply

Your email address will not be published. Required fields are marked *