PySpark Reading CSV with SQL Examples


In this PySpark tutorial on reading CSV files, we will use Spark SQL with a CSV input data source through the Python API. We will continue to use the Uber CSV source file from the Getting Started with Spark and Python tutorial presented earlier.

This Spark SQL CSV tutorial also assumes you are familiar with using SQL against relational databases, either directly or from Python. In other words, you have experience with SQL and would like to know how to use it with Spark.

PySpark Overview

Spark SQL uses a distributed data structure called a DataFrame, which builds on Resilient Distributed Datasets (RDDs). DataFrames are composed of Row objects accompanied by a schema which describes the data type of each column. A DataFrame may be considered similar to a table in a traditional relational database. As you might expect, DataFrames may be created from a variety of input sources, including CSV text files.

This introduction to PySpark SQL uses the same CSV file as previous Spark Python tutorials, found here:

https://raw.githubusercontent.com/fivethirtyeight/uber-tlc-foil-response/master/Uber-Jan-Feb-FOIL.csv
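The examples in this tutorial load the file by a relative path, so a local copy is needed. One way to fetch it is sketched below using only the Python standard library. The helper names `local_name_for` and `download_csv` are hypothetical and are not part of Spark.

```python
import os
import urllib.request
from urllib.parse import urlparse

UBER_CSV_URL = (
    "https://raw.githubusercontent.com/fivethirtyeight/"
    "uber-tlc-foil-response/master/Uber-Jan-Feb-FOIL.csv"
)


def local_name_for(url):
    """Return the last path segment of the URL, used as the local file name."""
    return os.path.basename(urlparse(url).path)


def download_csv(url=UBER_CSV_URL, directory="."):
    """Download the CSV into `directory` and return the local path."""
    target = os.path.join(directory, local_name_for(url))
    urllib.request.urlretrieve(url, target)
    return target
```

Calling `download_csv()` from the directory where pyspark will be launched should leave `Uber-Jan-Feb-FOIL.csv` next to the shell.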

Methodology Depends on Spark Version

Our methodology will depend on the version of Spark being used.

If you are using an older version of Spark (< Spark 2.0), the spark-csv package available from Spark Packages was released to make your life easier, but it is not a requirement for reading CSV in PySpark. The spark-csv package is described as a “library for parsing and querying CSV data with Apache Spark, for Spark SQL and DataFrames”. This library is compatible with Spark 1.3 and above.

However, if you are using Spark 2.0 or above, the spark-csv functionality is built into Spark itself, as described in the spark-csv GitHub repo.
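To make the version split concrete, here is a minimal sketch of choosing the reader format name from a Spark version string. The function name `csv_format_for` is hypothetical. The two format names are the spark-csv package name used for Spark 1.x and the built-in `csv` short name available from Spark 2.0.

```python
def csv_format_for(spark_version):
    """Pick a DataFrameReader format name for a version string such as '1.6.3'."""
    major = int(spark_version.split(".")[0])
    # Spark 2.0+ ships a built-in CSV source; older versions need spark-csv.
    return "csv" if major >= 2 else "com.databricks.spark.csv"
```

In a shell, this could be used as `sqlContext.read.format(csv_format_for(sc.version))`, assuming the spark-csv package has been supplied when running on Spark 1.x.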



PySpark SQL CSV Examples Setup

To make things easier, we are going to divide the PySpark SQL examples with CSV files into two versions: one for Spark versions below 2.0 and another for Spark 2.0 or above.

PySpark SQL CSV Example Setup < Spark 2.0

1. Depending on your version of Scala, start the pyspark shell with a packages command line argument.

At the time of this writing, for Scala 2.10:

$SPARK_HOME/bin/pyspark --packages com.databricks:spark-csv_2.10:1.3.0

At the time of this writing, for Scala 2.11:

$SPARK_HOME/bin/pyspark --packages com.databricks:spark-csv_2.11:1.3.0
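The two commands differ only in the Scala version embedded in the package coordinate. As a small illustration, the sketch below builds the coordinate and the launch command. The helper names are hypothetical, and the default package version 1.3.0 is simply the one used in this tutorial.

```python
def spark_csv_package(scala_version, package_version="1.3.0"):
    """Build the spark-csv coordinate, e.g. com.databricks:spark-csv_2.11:1.3.0."""
    return "com.databricks:spark-csv_{}:{}".format(scala_version, package_version)


def pyspark_command(scala_version, spark_home="$SPARK_HOME"):
    """Return the pyspark launch command as a list of arguments."""
    return [
        spark_home + "/bin/pyspark",
        "--packages",  # two hyphens, followed by the package coordinate
        spark_csv_package(scala_version),
    ]
```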

2. Using the sqlContext available in the shell, load the CSV by chaining the read, format, options and load functions:

>>> df = sqlContext.read.format('com.databricks.spark.csv') \
...     .options(header='true', inferschema='true') \
...     .load('Uber-Jan-Feb-FOIL.csv')

In the above code, we specify the com.databricks.spark.csv format from the package we passed to the shell in step 1. Setting “header” to true signifies that the first row has column names. “inferSchema” instructs Spark to attempt to infer the schema of the CSV. Finally, the load function takes the path and name of the CSV source file. Because the path is relative, the Uber-Jan-Feb-FOIL.csv file must be in the directory where pyspark was launched.

Note: you may be interested in comparing this example above to the Spark >= 2.0 version below. There is a csv function now which makes things more convenient.


3. Register a temp table

>>> df.registerTempTable("uber")

Similar to how the spark-csv package requirement has changed over time, the registerTempTable function has also changed. It is deprecated in Spark 2.0 and above in favor of createOrReplaceTempView, as shown in the next setup example.

PySpark SQL CSV Example Setup >= Spark 2.0

1. Start PySpark

~/dev/tmp $pyspark
Python 3.9.12 (main, Mar 26 2022, 15:51:15)
[Clang 13.1.6 (clang-1316.0.21.2)] on darwin
Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /__ / .__/\_,_/_/ /_/\_\   version 3.3.1
      /_/

Using Python version 3.9.12 (main, Mar 26 2022 15:51:15)
Spark context Web UI available at http://192.168.1.15:4040
Spark context available as 'sc' (master = local[*], app id = local-1669304534671).
SparkSession available as 'spark'.
>>>

This is just to show the Python and Spark versions used in this setup.

2. Next, using the SparkSession available in the shell as spark, load the CSV by chaining the read, options and csv functions:

>>> df = spark.read.options(header='true', inferschema='true').csv('Uber-Jan-Feb-FOIL.csv')

In the options call above, setting header to true signifies that the first row has column names. Setting inferSchema to true instructs Spark to attempt to infer the schema of the CSV. Last, we use the csv function to pass in the path and name of the CSV source file. Because the path is relative, the Uber-Jan-Feb-FOIL.csv file must be in the directory where pyspark was launched.
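To build intuition for what the header and inferSchema options do, here is a plain-Python illustration that runs without Spark. It is not Spark's actual inference algorithm, only a sketch of the idea: treat the first row as column names, then pick the narrowest type that every value in a column can be parsed as. The sample rows are illustrative values in the same layout as the Uber file.

```python
import csv
import io

# Illustrative rows in the same column layout as the Uber CSV.
SAMPLE = """dispatching_base_number,date,active_vehicles,trips
B02512,1/1/2015,190,1132
B02765,1/1/2015,225,1765
"""


def infer_type(values):
    """Return 'integer', 'double', or 'string' for a column of raw strings."""
    for name, cast in (("integer", int), ("double", float)):
        try:
            for value in values:
                cast(value)
            return name
        except ValueError:
            continue  # at least one value did not parse; try the next type
    return "string"


def infer_schema(text):
    """Map each column name to an inferred type name."""
    reader = csv.reader(io.StringIO(text))
    header = next(reader)      # like header=true: first row holds column names
    columns = zip(*reader)     # transpose the remaining rows into columns
    return {name: infer_type(col) for name, col in zip(header, columns)}


print(infer_schema(SAMPLE))
```

In this sketch a value such as 1/1/2015 parses as neither an integer nor a double, so the date column falls back to string.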

3. Register a temp table

>>> df.createOrReplaceTempView("uber")

The createOrReplaceTempView function either creates or replaces a local, temporary view with the provided DataFrame. It is used instead of registerTempTable in newer versions of Spark. In either case, temporary views are similar in concept to SQL tables, where each table contains rows and columns.
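If the same script needs to run on both older and newer Spark versions, one option is a small wrapper that uses whichever registration method the DataFrame provides. This is a sketch, and the function name `register_view` is hypothetical.

```python
def register_view(df, name):
    """Register `df` under `name` using whichever method the DataFrame offers."""
    if hasattr(df, "createOrReplaceTempView"):
        df.createOrReplaceTempView(name)   # Spark 2.0 and above
    else:
        df.registerTempTable(name)         # Spark 1.x
    return name
```

For example, `register_view(df, "uber")` makes the DataFrame queryable as uber in either setup.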


PySpark CSV with SQL Examples

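4. We’re now ready to query using SQL, such as finding the distinct NYC Uber bases in the CSV. The output below was captured under Python 2, which is why strings carry a u prefix; Python 3 prints the same rows without it.

>>> distinct_bases = sqlContext.sql("select distinct dispatching_base_number from uber")
>>> for b in distinct_bases.collect(): print(b)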
Row(dispatching_base_number=u'B02598')
Row(dispatching_base_number=u'B02764')
Row(dispatching_base_number=u'B02765')
Row(dispatching_base_number=u'B02617')
Row(dispatching_base_number=u'B02682')
Row(dispatching_base_number=u'B02512')
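In Spark 2.0 and above, the same query can be issued through the SparkSession rather than sqlContext. A minimal sketch follows. The function name `distinct_bases_list` is hypothetical, and it assumes a temporary view named uber has already been registered.

```python
def distinct_bases_list(spark, view="uber"):
    """Return the distinct dispatching base numbers as a sorted list of strings."""
    rows = spark.sql(
        "select distinct dispatching_base_number from {}".format(view)
    ).collect()
    # Row objects support lookup by column name.
    return sorted(row["dispatching_base_number"] for row in rows)
```

In the pyspark shell this would be called as `distinct_bases_list(spark)`.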

5. It might be handy to know the schema

>>> df.printSchema()

root
 |-- dispatching_base_number: string (nullable = true)
 |-- date: string (nullable = true)
 |-- active_vehicles: integer (nullable = true)
 |-- trips: integer (nullable = true)
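Once the schema is known, it can be supplied explicitly instead of being inferred, which avoids an extra pass over the file. The sketch below assumes Spark 2.3 or above, where the csv reader accepts a DDL-formatted schema string. The names `UBER_SCHEMA` and `load_uber_csv` are hypothetical.

```python
# Column names and types taken from the printSchema output above.
UBER_SCHEMA = (
    "dispatching_base_number STRING, "
    "`date` STRING, "   # backticks keep the column name unambiguous
    "active_vehicles INT, "
    "trips INT"
)


def load_uber_csv(spark, path="Uber-Jan-Feb-FOIL.csv"):
    """Read the CSV with a fixed schema; header=True skips the column-name row."""
    return spark.read.csv(path, header=True, schema=UBER_SCHEMA)
```

In the pyspark shell, `df = load_uber_csv(spark)` would stand in for the inferSchema-based load.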

PySpark SQL with CSV More Advanced

Let’s try some more advanced SQL, such as determining which Uber base is the busiest based on the number of trips. Because group by already returns one row per base, no distinct is needed:

>>> sqlContext.sql("""select dispatching_base_number,
...                          sum(trips) as cnt
...                   from uber
...                   group by dispatching_base_number
...                   order by cnt desc""").show()

+-----------------------+-------+
|dispatching_base_number|    cnt|
+-----------------------+-------+
|                 B02764|1914449|
|                 B02617| 725025|
|                 B02682| 662509|
|                 B02598| 540791|
|                 B02765| 193670|
|                 B02512|  93786|
+-----------------------+-------+
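The same aggregation can also be expressed with the DataFrame API rather than a SQL string. This is a minimal sketch; the function name `trips_per_base` is hypothetical, and `df` is assumed to be the DataFrame loaded in the setup steps.

```python
def trips_per_base(df):
    """Total trips per base, busiest first, using DataFrame methods."""
    return (
        df.groupBy("dispatching_base_number")
        .sum("trips")                             # yields a column named sum(trips)
        .withColumnRenamed("sum(trips)", "cnt")   # match the alias used in the SQL
        .orderBy("cnt", ascending=False)
    )
```

Calling `trips_per_base(df).show()` in the shell should display the same kind of table as the SQL version.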

Or the 5 busiest days, based on the number of trips, in the time range of the data:

>>> sqlContext.sql("""select date, sum(trips) as cnt
...                   from uber group by date
...                   order by cnt desc limit 5""").show()

+---------+------+
|     date|   cnt|
+---------+------+
|2/20/2015|100915|
|2/14/2015|100345|
|2/21/2015| 98380|
|2/13/2015| 98024|
|1/31/2015| 92257|
+---------+------+

PySpark SQL Resource

An older version of this tutorial is available as an IPython notebook: https://github.com/tmcgrath/spark-with-python-course/blob/master/Spark-SQL-CSV-with-Python.ipynb

Further Spark Python References

PySpark Tutorials

PySpark Quick Start Tutorial

