Mastering PySpark Filter: A Step-by-Step Guide through Examples


In PySpark, the DataFrame filter function selects the rows of a DataFrame that satisfy a specified condition.  For example, with a DataFrame containing website click data, we may wish to keep only the rows with a certain value in the platform column.  This would allow us to examine the website requests that came from a single platform, such as android.

Solutions like this may be implemented with the PySpark filter function or through SQL in PySpark.  This PySpark Filter tutorial covers both, with examples of each.

Between the examples, we’ll pause to briefly discuss performance considerations when filtering.  Afterwards, we’ll get back to more examples, and then we’ll wrap up with links to resources you may find helpful.

Let me know if you have any questions or suggestions for improvement in the comments below.

PySpark filter By Example Setup

To run our filter examples, we need some example data, so we will load a CSV file into a DataFrame.  See the PySpark reading CSV tutorial for a more in-depth look at loading CSV in PySpark; we are not going to cover it in detail in this PySpark filter tutorial.  In this example, we are going to use a data.csv file.  If you’d like to use this file, see the Resources section below for a link to it.

When running the following examples, it is presumed the data.csv file is in the directory where you start pyspark, as shown in the following commands.

~/dev/pyspark/filter $ ls
data.csv
~/dev/pyspark/filter $ pyspark
Python 3.9.12 (main, Mar 26 2022, 15:51:15)
Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /__ / .__/\_,_/_/ /_/\_\   version 3.3.1
      /_/

Using Python version 3.9.12 (main, Mar 26 2022 15:51:15)
Spark context Web UI available at http://192.168.0.146:4040
Spark context available as 'sc' (master = local[*], app id = local-1669476118685).
SparkSession available as 'spark'.
>>> df = spark.read.options(header='true', inferschema='true').csv('data.csv')
>>> df.show()
+--------------------+-------------------+-------+----------+-------+-----------------+
|            event_id|         event_time|user_id|  platform|item_id|         referrer|
+--------------------+-------------------+-------+----------+-------+-----------------+
|0000157970c74089b...|2018-06-26 08:21:38| 256177|       web|   1648|             home|
|000095cc12ed40f2a...|2018-07-09 08:42:12| 287491|       web|    390|    google_search|
|00009c5122b04bc09...|2017-10-06 09:33:32| 198519|mobile web|   3241|    shopping_cart|
|0000ac00b5b741a89...|2014-07-25 17:40:56|  31861|       web|   2260|             home|
|0000b5cd8dde42559...|2018-06-27 03:37:30| 271734|       web|   3747|    shopping_cart|
|00015a1e375441328...|2018-04-06 21:14:08| 283465|       web|   2889|             home|
|0001bc181d374fd6a...|2018-07-12 02:06:05| 271291|       web|   1279|    google_search|
|0001d177b6ad410b8...|2015-10-14 23:04:24|  67361|   android|   3014|        item_page|
|0001fd08f7e745f29...|2017-11-08 20:51:54| 237955|       web|   1424|    google_search|
|0002025142cd4ebe8...|2018-10-11 04:08:13| 297994|       iOS|   3900|    google_search|
|00020d658efc4e749...|2015-02-26 19:05:10|  57053|   android|   1126|             home|
|0002a162383c44ec8...|2016-08-28 07:09:37| 145090|       iOS|   2430|             home|
|0002b1150042493c8...|2017-10-31 15:56:57| 239551|       web|   3675|    shopping_cart|
|0002c6e5bf2145759...|2017-07-07 03:59:10| 206123|   android|    511|        item_page|
|0003078ab0964a72b...|2016-03-30 12:35:16| 111792|       web|   2782|promo_email_click|
|00031f868e0c4bd8b...|2017-06-18 13:45:39| 210121|       web|   3203|    shopping_cart|
|000359ed46ef48f78...|2015-06-19 04:24:39|  63880|       iOS|    621|    shopping_cart|
|00037c1af5f848a4a...|2013-09-09 06:58:02|   4542|       web|    769|    shopping_cart|
|0003a589651343f5b...|2018-11-11 21:36:53| 296576|       web|   2515|    user_wishlist|
|0003b13ca5154593b...|2015-02-17 14:26:11|  35960|       web|   1647|        item_page|
+--------------------+-------------------+-------+----------+-------+-----------------+
only showing top 20 rows

PySpark Filter Examples

Ok, we are now ready to run through examples of filtering in PySpark. Let’s start with something simple.

Simple filter Example

>>> from pyspark.sql import functions as F
>>> df.filter(F.col("platform") == "android").select("*").show(5)
+--------------------+-------------------+-------+--------+-------+-------------+
|            event_id|         event_time|user_id|platform|item_id|     referrer|
+--------------------+-------------------+-------+--------+-------+-------------+
|0001d177b6ad410b8...|2015-10-14 23:04:24|  67361| android|   3014|    item_page|
|00020d658efc4e749...|2015-02-26 19:05:10|  57053| android|   1126|         home|
|0002c6e5bf2145759...|2017-07-07 03:59:10| 206123| android|    511|    item_page|
|0003ed6e469943f79...|2014-10-08 02:29:41|  40901| android|   1241|    item_page|
|0004c92ab23f4f658...|2017-01-27 08:30:58| 145763| android|   3997|user_wishlist|
+--------------------+-------------------+-------+--------+-------+-------------+
only showing top 5 rows

>>>

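Notice the import of pyspark.sql.functions as F. The alias is a convenience we rely on in this filter example and the ones that follow. The select("*") call is optional here, since filter already returns every column.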

At first glance this example looks simple, but filtering has a profound impact on performance with large data sets. Whenever you are reading from an external source, try to have the predicate pushed down to that source so less data is read into Spark. You can check whether the predicate was pushed down to the source system by looking at the query plan, as shown below.

>>> df.filter(F.col("platform") == "android").select("*").explain()
== Physical Plan ==
*(1) Filter (isnotnull(platform#20) AND (platform#20 = android))
+- FileScan csv [event_id#17,event_time#18,user_id#19,platform#20,item_id#21,referrer#22] Batched: false, DataFilters: [isnotnull(platform#20), (platform#20 = android)], Format: CSV, Location: InMemoryFileIndex(1 paths)[file:/Users/toddmcg/dev/pyspark/filter/data.csv], PartitionFilters: [], PushedFilters: [IsNotNull(platform), EqualTo(platform,android)], ReadSchema: struct<event_id:string,event_time:timestamp,user_id:int,platform:string,item_id:int,referrer:string>

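See “PushedFilters” in the plan above: the IsNotNull and EqualTo entries show that the platform predicate was handed to the CSV data source.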

PySpark Filter on array values in column

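Now let’s assume a column in our data set holds an array of values. In this case, we filter on the elements inside the array rather than on the column value as a whole. To set this up, we group by event_time, collect the other columns into lists, and register the result as a temporary view, so it’s easier to show the results.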

>>> df.groupBy(["event_time"]) \
...         .agg(F.collect_list("referrer").alias("referrer_list"),
...              F.collect_list("user_id").alias("user_id_list"),
...              F.collect_list("platform").alias("platform_list"),
...              F.collect_list("event_time").alias("event_time_list"),
...             ) \
...         .createOrReplaceTempView("data_array")
>>> new_sdf = spark.table("data_array")
>>> new_sdf.show()
+-------------------+-------------------+------------+-------------+--------------------+
|         event_time|      referrer_list|user_id_list|platform_list|     event_time_list|
+-------------------+-------------------+------------+-------------+--------------------+
|2013-06-25 16:56:27|    [shopping_cart]|      [1437]| [mobile web]|[2013-06-25 16:56...|
|2013-07-23 12:26:01|             [home]|      [3548]|        [web]|[2013-07-23 12:26...|
|2013-07-28 02:00:14|    [user_wishlist]|      [1889]|    [android]|[2013-07-28 02:00...|
|2013-07-30 20:29:21|    [shopping_cart]|      [3417]| [mobile web]|[2013-07-30 20:29...|
|2013-08-01 08:21:58|             [home]|      [3683]|    [android]|[2013-08-01 08:21...|
|2013-08-09 22:21:27|        [item_page]|      [4028]|        [web]|[2013-08-09 22:21...|
|2013-08-10 22:50:19|[promo_email_click]|       [372]|        [web]|[2013-08-10 22:50...|
|2013-08-14 14:05:55|    [shopping_cart]|       [547]|        [web]|[2013-08-14 14:05...|
|2013-08-19 22:28:54|    [shopping_cart]|      [3022]|        [iOS]|[2013-08-19 22:28...|
|2013-09-07 22:27:01|    [user_wishlist]|      [2150]|        [web]|[2013-09-07 22:27...|
|2013-09-08 22:51:07|             [home]|      [4239]| [mobile web]|[2013-09-08 22:51...|
|2013-09-09 06:58:02|    [shopping_cart]|      [4542]|        [web]|[2013-09-09 06:58...|
|2013-09-12 03:14:44|    [user_wishlist]|      [5228]|        [web]|[2013-09-12 03:14...|
|2013-09-18 15:02:32|             [home]|      [1666]|        [iOS]|[2013-09-18 15:02...|
|2013-10-05 02:51:16|    [shopping_cart]|      [1150]|        [iOS]|[2013-10-05 02:51...|
|2013-10-23 07:17:21|             [home]|      [3808]|        [iOS]|[2013-10-23 07:17...|
|2013-11-03 21:19:05|    [shopping_cart]|      [4101]| [mobile web]|[2013-11-03 21:19...|
|2013-11-05 11:59:22|             [home]|      [7319]|    [android]|[2013-11-05 11:59...|
|2013-11-09 04:26:16|    [shopping_cart]|      [6963]| [mobile web]|[2013-11-09 04:26...|
|2013-11-14 14:46:47|        [item_page]|      [6593]|    [android]|[2013-11-14 14:46...|
+-------------------+-------------------+------------+-------------+--------------------+

>>>

As previously noted, the use of F in this example depends on the import from the first example.

How to PySpark filter with a custom function

>>> def custom_filter(x):
...      return F.month(F.to_date(x)) > 5
...
>>> new_sdf.select(F.col("event_time"),
...                F.filter(F.col("event_time_list"), custom_filter))\
...         .show(5)
+-------------------+-----------------------------------------------------------------------------------------------------------+
|         event_time|filter(event_time_list, lambdafunction((month(to_date(namedlambdavariable())) > 5), namedlambdavariable()))|
+-------------------+-----------------------------------------------------------------------------------------------------------+
|2013-06-25 16:56:27|                                                                                       [2013-06-25 16:56...|
|2013-07-23 12:26:01|                                                                                       [2013-07-23 12:26...|
|2013-07-28 02:00:14|                                                                                       [2013-07-28 02:00...|
|2013-07-30 20:29:21|                                                                                       [2013-07-30 20:29...|
|2013-08-01 08:21:58|                                                                                       [2013-08-01 08:21...|
+-------------------+-----------------------------------------------------------------------------------------------------------+
only showing top 5 rows

>>>

PySpark filter with SQL Example

Filtering in SQL is implemented through the WHERE clause, which is basic and easy to understand.

>>> df.createOrReplaceTempView("clicks")
>>> spark.sql("select * from clicks where platform = 'web'").show(5)
+--------------------+-------------------+-------+--------+-------+-------------+
|            event_id|         event_time|user_id|platform|item_id|     referrer|
+--------------------+-------------------+-------+--------+-------+-------------+
|0000157970c74089b...|2018-06-26 08:21:38| 256177|     web|   1648|         home|
|000095cc12ed40f2a...|2018-07-09 08:42:12| 287491|     web|    390|google_search|
|0000ac00b5b741a89...|2014-07-25 17:40:56|  31861|     web|   2260|         home|
|0000b5cd8dde42559...|2018-06-27 03:37:30| 271734|     web|   3747|shopping_cart|
|00015a1e375441328...|2018-04-06 21:14:08| 283465|     web|   2889|         home|
+--------------------+-------------------+-------+--------+-------+-------------+
only showing top 5 rows

PySpark filtering array-based columns in SQL

>>> spark.sql("WITH CTE_ANDROID \
... AS \
... ( SELECT event_time, FILTER (platform_list, i -> i LIKE '%android%') AS android FROM data_array) \
... SELECT * FROM CTE_ANDROID WHERE size(android) > 0 \
... ").show()
+-------------------+---------+
|         event_time|  android|
+-------------------+---------+
|2013-07-28 02:00:14|[android]|
|2013-08-01 08:21:58|[android]|
|2013-11-05 11:59:22|[android]|
|2013-11-14 14:46:47|[android]|
|2013-12-10 08:30:40|[android]|
|2013-12-14 19:26:04|[android]|
|2014-01-10 23:24:34|[android]|
|2014-02-11 18:36:29|[android]|
|2014-02-14 16:52:22|[android]|
|2014-02-15 09:09:00|[android]|
|2014-03-05 02:01:59|[android]|
|2014-04-05 10:05:39|[android]|
|2014-04-12 23:02:54|[android]|
|2014-05-03 22:38:30|[android]|
|2014-05-10 23:02:28|[android]|
|2014-05-16 07:02:06|[android]|
|2014-05-23 00:52:41|[android]|
|2014-06-20 08:37:58|[android]|
|2014-07-20 05:52:28|[android]|
|2014-08-01 12:51:49|[android]|
+-------------------+---------+

Further Resources

See also  PySpark Joins with SQL
About Todd M

Todd has held multiple software roles over his 20-year career. For the last 5 years, he has focused on helping organizations move from batch to data streaming. In addition to the free tutorials, he provides consulting and coaching for Data Engineers, Data Scientists, and Data Architects. Feel free to reach out directly or to connect on LinkedIn.
