In PySpark, the DataFrame filter function selects the rows that match a condition on one or more columns. For example, with a DataFrame containing website click data, we may wish to keep only the rows where the platform column matches a certain value. This would allow us to examine the requests coming from a particular browser or platform type.
Solutions like this may be implemented with the PySpark filter function or through SQL in PySpark. Both will be covered in this PySpark filter tutorial, and we will go through examples using the filter function as well as SQL.
Between the examples, we’ll pause to briefly discuss performance considerations when filtering. Afterwards, we’ll get back to more examples, and then we’ll wrap up with links to resources you may find helpful.
Let me know if you have any questions or suggestions for improvement in the comments below.
Table of Contents
- PySpark filter By Example Setup
- PySpark Filter Examples
- PySpark filter with SQL Example
- PySpark filtering array-based columns in SQL
PySpark filter By Example Setup
To run our filter examples, we need some example data, so we will load it into a DataFrame from a CSV file. See the PySpark reading CSV tutorial for a more in-depth look at loading CSV in PySpark; we are not going to cover it in detail in this PySpark filter tutorial. In this example, we are going to use a data.csv file. If you’d like to use this file, see the Resources section below for a download link.
When running the following examples, it is presumed the data.csv file is in the same directory where you start up pyspark, as shown in the following commands.
~/dev/pyspark/filter $ ls
data.csv
~/dev/pyspark/filter $ pyspark
Python 3.9.12 (main, Mar 26 2022, 15:51:15)
Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /__ / .__/\_,_/_/ /_/\_\   version 3.3.1
      /_/
Using Python version 3.9.12 (main, Mar 26 2022 15:51:15)
Spark context Web UI available at http://192.168.0.146:4040
Spark context available as 'sc' (master = local[*], app id = local-1669476118685).
SparkSession available as 'spark'.
>>> df = spark.read.options(header='true', inferschema='true').csv('data.csv')
>>> df.show()
+--------------------+-------------------+-------+----------+-------+-----------------+
| event_id| event_time|user_id| platform|item_id| referrer|
+--------------------+-------------------+-------+----------+-------+-----------------+
|0000157970c74089b...|2018-06-26 08:21:38| 256177| web| 1648| home|
|000095cc12ed40f2a...|2018-07-09 08:42:12| 287491| web| 390| google_search|
|00009c5122b04bc09...|2017-10-06 09:33:32| 198519|mobile web| 3241| shopping_cart|
|0000ac00b5b741a89...|2014-07-25 17:40:56| 31861| web| 2260| home|
|0000b5cd8dde42559...|2018-06-27 03:37:30| 271734| web| 3747| shopping_cart|
|00015a1e375441328...|2018-04-06 21:14:08| 283465| web| 2889| home|
|0001bc181d374fd6a...|2018-07-12 02:06:05| 271291| web| 1279| google_search|
|0001d177b6ad410b8...|2015-10-14 23:04:24| 67361| android| 3014| item_page|
|0001fd08f7e745f29...|2017-11-08 20:51:54| 237955| web| 1424| google_search|
|0002025142cd4ebe8...|2018-10-11 04:08:13| 297994| iOS| 3900| google_search|
|00020d658efc4e749...|2015-02-26 19:05:10| 57053| android| 1126| home|
|0002a162383c44ec8...|2016-08-28 07:09:37| 145090| iOS| 2430| home|
|0002b1150042493c8...|2017-10-31 15:56:57| 239551| web| 3675| shopping_cart|
|0002c6e5bf2145759...|2017-07-07 03:59:10| 206123| android| 511| item_page|
|0003078ab0964a72b...|2016-03-30 12:35:16| 111792| web| 2782|promo_email_click|
|00031f868e0c4bd8b...|2017-06-18 13:45:39| 210121| web| 3203| shopping_cart|
|000359ed46ef48f78...|2015-06-19 04:24:39| 63880| iOS| 621| shopping_cart|
|00037c1af5f848a4a...|2013-09-09 06:58:02| 4542| web| 769| shopping_cart|
|0003a589651343f5b...|2018-11-11 21:36:53| 296576| web| 2515| user_wishlist|
|0003b13ca5154593b...|2015-02-17 14:26:11| 35960| web| 1647| item_page|
+--------------------+-------------------+-------+----------+-------+-----------------+
only showing top 20 rows
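Since we let Spark infer the schema above, it can be worth confirming the column types before filtering, because event_time being read as a timestamp matters for the date-based filtering later on. A quick check against the same df (output omitted here; it simply lists each column and its inferred type):
>>> df.printSchema()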
PySpark Filter Examples
Ok, we are now ready to run through examples of filtering in PySpark. Let’s start with something simple.
Simple filter Example
>>> from pyspark.sql import functions as F
>>> df.filter(F.col("platform") == "android").select("*").show(5)
+--------------------+-------------------+-------+--------+-------+-------------+
| event_id| event_time|user_id|platform|item_id| referrer|
+--------------------+-------------------+-------+--------+-------+-------------+
|0001d177b6ad410b8...|2015-10-14 23:04:24| 67361| android| 3014| item_page|
|00020d658efc4e749...|2015-02-26 19:05:10| 57053| android| 1126| home|
|0002c6e5bf2145759...|2017-07-07 03:59:10| 206123| android| 511| item_page|
|0003ed6e469943f79...|2014-10-08 02:29:41| 40901| android| 1241| item_page|
|0004c92ab23f4f658...|2017-01-27 08:30:58| 145763| android| 3997|user_wishlist|
+--------------------+-------------------+-------+--------+-------+-------------+
only showing top 5 rows
>>>
Notice the import of F. This import provides a convenient shorthand for PySpark SQL functions in this filter example and future ones.
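The same pattern extends beyond a single equality check. As a quick sketch using the same df and the platform and referrer columns from data.csv, multiple predicates may be combined with & and |, or expressed with helpers such as isin. Note the parentheses around each condition; they are required because of Python operator precedence.
>>> # combine two conditions with a logical AND
>>> df.filter((F.col("platform") == "android") & (F.col("referrer") == "item_page")).show(5)
>>> # match any value in a list
>>> df.filter(F.col("platform").isin("android", "iOS")).show(5)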
At first glance this example looks simple, but filter has a profound impact on performance on large data sets. Whenever you are reading from an external source, always attempt to push the predicate down to that source. You can see whether the predicate was pushed down to the source system or not in the query plan, as shown below.
>>> df.filter(F.col("platform") == "android").select("*").explain()
== Physical Plan ==
*(1) Filter (isnotnull(platform#20) AND (platform#20 = android))
+- FileScan csv [event_id#17,event_time#18,user_id#19,platform#20,item_id#21,referrer#22] Batched: false, DataFilters: [isnotnull(platform#20), (platform#20 = android)], Format: CSV, Location: InMemoryFileIndex(1 paths)[file:/Users/toddmcg/dev/pyspark/filter/data.csv], PartitionFilters: [], PushedFilters: [IsNotNull(platform), EqualTo(platform,android)], ReadSchema: struct<event_id:string,event_time:timestamp,user_id:int,platform:string,item_id:int,referrer:string>
See “PushedFilters” in the plan above.
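To see the contrast, try a predicate that wraps the column in a function. Spark generally pushes only simple comparisons down to a file source, so in a sketch like the following you would expect the equality filter to be absent from PushedFilters (the exact plan text will vary by Spark version):
>>> # a function on the column typically prevents pushdown of the equality filter
>>> df.filter(F.upper(F.col("platform")) == "ANDROID").explain()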
PySpark Filter on array values in column
Let’s assume our data set contains a column whose values are arrays. In this case, we have to filter values out of the array for our different use cases. We’ll build such arrays with collect_list and register the result as a temporary view, so it’s easier to show the results.
>>> df.groupBy(["event_time"]) \
...     .agg(F.collect_list("referrer").alias("referrer_list"),
...          F.collect_list("user_id").alias("user_id_list"),
...          F.collect_list("platform").alias("platform_list"),
...          F.collect_list("event_time").alias("event_time_list"),
...     ) \
...     .createOrReplaceTempView("data_array")
>>> new_sdf = spark.table("data_array")
>>> new_sdf.show()
+-------------------+-------------------+------------+-------------+--------------------+
| event_time| referrer_list|user_id_list|platform_list| event_time_list|
+-------------------+-------------------+------------+-------------+--------------------+
|2013-06-25 16:56:27| [shopping_cart]| [1437]| [mobile web]|[2013-06-25 16:56...|
|2013-07-23 12:26:01| [home]| [3548]| [web]|[2013-07-23 12:26...|
|2013-07-28 02:00:14| [user_wishlist]| [1889]| [android]|[2013-07-28 02:00...|
|2013-07-30 20:29:21| [shopping_cart]| [3417]| [mobile web]|[2013-07-30 20:29...|
|2013-08-01 08:21:58| [home]| [3683]| [android]|[2013-08-01 08:21...|
|2013-08-09 22:21:27| [item_page]| [4028]| [web]|[2013-08-09 22:21...|
|2013-08-10 22:50:19|[promo_email_click]| [372]| [web]|[2013-08-10 22:50...|
|2013-08-14 14:05:55| [shopping_cart]| [547]| [web]|[2013-08-14 14:05...|
|2013-08-19 22:28:54| [shopping_cart]| [3022]| [iOS]|[2013-08-19 22:28...|
|2013-09-07 22:27:01| [user_wishlist]| [2150]| [web]|[2013-09-07 22:27...|
|2013-09-08 22:51:07| [home]| [4239]| [mobile web]|[2013-09-08 22:51...|
|2013-09-09 06:58:02| [shopping_cart]| [4542]| [web]|[2013-09-09 06:58...|
|2013-09-12 03:14:44| [user_wishlist]| [5228]| [web]|[2013-09-12 03:14...|
|2013-09-18 15:02:32| [home]| [1666]| [iOS]|[2013-09-18 15:02...|
|2013-10-05 02:51:16| [shopping_cart]| [1150]| [iOS]|[2013-10-05 02:51...|
|2013-10-23 07:17:21| [home]| [3808]| [iOS]|[2013-10-23 07:17...|
|2013-11-03 21:19:05| [shopping_cart]| [4101]| [mobile web]|[2013-11-03 21:19...|
|2013-11-05 11:59:22| [home]| [7319]| [android]|[2013-11-05 11:59...|
|2013-11-09 04:26:16| [shopping_cart]| [6963]| [mobile web]|[2013-11-09 04:26...|
|2013-11-14 14:46:47| [item_page]| [6593]| [android]|[2013-11-14 14:46...|
+-------------------+-------------------+------------+-------------+--------------------+
>>>
As previously noted, the use of F in this example depends on having run the import from the first example.
How to filter with a custom function in PySpark
>>> def custom_filter(x):
... return F.month(F.to_date(x)) > 5
...
>>> new_sdf.select(F.col("event_time"),
... F.filter(F.col("event_time_list"), custom_filter))\
... .show(5)
+-------------------+-----------------------------------------------------------------------------------------------------------+
| event_time|filter(event_time_list, lambdafunction((month(to_date(namedlambdavariable())) > 5), namedlambdavariable()))|
+-------------------+-----------------------------------------------------------------------------------------------------------+
|2013-06-25 16:56:27| [2013-06-25 16:56...|
|2013-07-23 12:26:01| [2013-07-23 12:26...|
|2013-07-28 02:00:14| [2013-07-28 02:00...|
|2013-07-30 20:29:21| [2013-07-30 20:29...|
|2013-08-01 08:21:58| [2013-08-01 08:21...|
+-------------------+-----------------------------------------------------------------------------------------------------------+
only showing top 5 rows
>>>
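If you prefer not to define a named function, the same filter can be written inline with a Python lambda; this sketch is equivalent to the example above:
>>> # inline lambda instead of the named custom_filter function
>>> new_sdf.select(F.col("event_time"),
...                F.filter(F.col("event_time_list"),
...                         lambda x: F.month(F.to_date(x)) > 5)) \
...     .show(5)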
PySpark filter with SQL Example
Filtering in SQL is implemented through the WHERE clause. This is pretty basic and easy to understand.
>>> df.createOrReplaceTempView("clicks")
>>> spark.sql("select * from clicks where platform = 'web'").show(5)
+--------------------+-------------------+-------+--------+-------+-------------+
| event_id| event_time|user_id|platform|item_id| referrer|
+--------------------+-------------------+-------+--------+-------+-------------+
|0000157970c74089b...|2018-06-26 08:21:38| 256177| web| 1648| home|
|000095cc12ed40f2a...|2018-07-09 08:42:12| 287491| web| 390|google_search|
|0000ac00b5b741a89...|2014-07-25 17:40:56| 31861| web| 2260| home|
|0000b5cd8dde42559...|2018-06-27 03:37:30| 271734| web| 3747|shopping_cart|
|00015a1e375441328...|2018-04-06 21:14:08| 283465| web| 2889| home|
+--------------------+-------------------+-------+--------+-------+-------------+
only showing top 5 rows
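The WHERE clause supports the usual SQL predicates as well. As a sketch against the same clicks view, conditions can be combined and expressed with IN or LIKE:
>>> spark.sql("select * from clicks where platform in ('android', 'iOS') and referrer like 'google%'").show(5)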
PySpark filtering array-based columns in SQL
>>> spark.sql("WITH CTE_ANDROID \
... AS \
... ( SELECT event_time, FILTER (platform_list, i -> i LIKE '%android%') AS android FROM data_array) \
... SELECT * FROM CTE_ANDROID WHERE size(android) > 0 \
... ").show()
+-------------------+---------+
| event_time| android|
+-------------------+---------+
|2013-07-28 02:00:14|[android]|
|2013-08-01 08:21:58|[android]|
|2013-11-05 11:59:22|[android]|
|2013-11-14 14:46:47|[android]|
|2013-12-10 08:30:40|[android]|
|2013-12-14 19:26:04|[android]|
|2014-01-10 23:24:34|[android]|
|2014-02-11 18:36:29|[android]|
|2014-02-14 16:52:22|[android]|
|2014-02-15 09:09:00|[android]|
|2014-03-05 02:01:59|[android]|
|2014-04-05 10:05:39|[android]|
|2014-04-12 23:02:54|[android]|
|2014-05-03 22:38:30|[android]|
|2014-05-10 23:02:28|[android]|
|2014-05-16 07:02:06|[android]|
|2014-05-23 00:52:41|[android]|
|2014-06-20 08:37:58|[android]|
|2014-07-20 05:52:28|[android]|
|2014-08-01 12:51:49|[android]|
+-------------------+---------+
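For comparison, roughly the same result can be produced with the DataFrame API, using the filter and size functions from pyspark.sql.functions against the same new_sdf from earlier; this is a sketch rather than an exact equivalent of the SQL above:
>>> # keep only rows whose platform_list contains an android entry
>>> new_sdf.select("event_time",
...                F.filter("platform_list", lambda p: p.like("%android%")).alias("android")) \
...     .filter(F.size("android") > 0) \
...     .show(5)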
Further Resources
- The data.csv file used in this example may be downloaded from https://github.com/supergloo/pyspark/tree/main/filter
- More examples of PySpark Transformations such as filter
- API documentation of the filter function