In this Spark tutorial, we will use Spark SQL with a CSV input data source using the Python API. We will continue to use the Uber CSV source file as used in the Getting Started with Spark and Python tutorial presented earlier.
Also, this Spark SQL CSV tutorial assumes you are familiar with using SQL against relational databases, either directly or from Python. In other words, you have experience with SQL and would like to know how to use it with Spark.
Spark SQL uses a type of Resilient Distributed Dataset called DataFrames. DataFrames are composed of Row objects accompanied by a schema which describes the data types of each column. A DataFrame may be considered similar to a table in a traditional relational database. As you might expect, DataFrames may be created from a variety of input sources, including CSV text files.
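As a quick illustration of the Row-plus-schema idea (a hypothetical sketch in the pyspark shell; these field names are invented and are not from the Uber data set used below):

>>> from pyspark.sql import Row
>>> rows = [Row(base='B02512', trips=100), Row(base='B02598', trips=250)]
>>> example_df = sqlContext.createDataFrame(rows)
>>> example_df.printSchema()

Here the schema (field names and types) is inferred from the Row objects themselves; for a CSV file, as we'll see below, the schema can be inferred from the file's header and contents.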
This intro to Spark SQL post uses a CSV file from previous Spark Python tutorials found here:
We’re going to use the IPython notebook and the spark-csv package available from Spark Packages to make our lives easier. The spark-csv package is described as a “library for parsing and querying CSV data with Apache Spark, for Spark SQL and DataFrames.” This library is compatible with Spark 1.3 and above.
Spark SQL CSV with Python Example Tutorial Part 1
1. Depending on your version of Scala, start the pyspark shell with a --packages command-line argument.
At the time of this writing, the Scala 2.10 version:
$SPARK_HOME/bin/pyspark --packages com.databricks:spark-csv_2.10:1.3.0
At the time of this writing, the Scala 2.11 version:
$SPARK_HOME/bin/pyspark --packages com.databricks:spark-csv_2.11:1.3.0
2. Using the sqlContext available from the shell, load the CSV by chaining the read, format, options and load functions
>>> df = sqlContext.read.format('com.databricks.spark.csv').options(header='true', inferSchema='true').load('Uber-Jan-Feb-FOIL.csv')
In the above code, we specify the com.databricks.spark.csv format from the package we passed to the shell in step 1. Setting “header” to true signifies that the first row contains the column names, “inferSchema” instructs Spark to attempt to infer the schema of the CSV, and finally the load function is passed the path and name of the CSV source file. In this example, we can tell the Uber-Jan-Feb-FOIL.csv file is in the same directory from which pyspark was launched.
3. Register a temp table
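Assuming the DataFrame `df` from step 2, and the table name `uber` that the SQL queries below rely on, registration in Spark 1.3-era pyspark looks like:

>>> df.registerTempTable("uber")

Registering a temp table gives the DataFrame a name that can be referenced in SQL statements for the lifetime of the sqlContext.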
4. We’re now ready to query using SQL, such as finding the distinct NYC Uber bases in the CSV
>>> distinct_bases = sqlContext.sql("select distinct dispatching_base_number from uber")
>>> for b in distinct_bases.collect(): print b
Row(dispatching_base_number=u'B02598')
Row(dispatching_base_number=u'B02764')
Row(dispatching_base_number=u'B02765')
Row(dispatching_base_number=u'B02617')
Row(dispatching_base_number=u'B02682')
Row(dispatching_base_number=u'B02512')
5. It might be handy to know the schema:
>>> df.printSchema()
root
 |-- dispatching_base_number: string (nullable = true)
 |-- date: string (nullable = true)
 |-- active_vehicles: integer (nullable = true)
 |-- trips: integer (nullable = true)
Spark SQL CSV with Python Example Tutorial Part 2
But let’s try some more advanced SQL, such as determining which Uber base is the busiest based on the number of trips:
>>> sqlContext.sql("""select distinct(`dispatching_base_number`), sum(`trips`) as cnt from uber group by `dispatching_base_number` order by cnt desc""").show()
+-----------------------+-------+
|dispatching_base_number|    cnt|
+-----------------------+-------+
|                 B02764|1914449|
|                 B02617| 725025|
|                 B02682| 662509|
|                 B02598| 540791|
|                 B02765| 193670|
|                 B02512|  93786|
+-----------------------+-------+
Or the 5 busiest days based on number of trips in the time range of the data:
>>> sqlContext.sql("""select distinct(`date`), sum(`trips`) as cnt from uber group by `date` order by cnt desc limit 5""").show()
+---------+------+
|     date|   cnt|
+---------+------+
|2/20/2015|100915|
|2/14/2015|100345|
|2/21/2015| 98380|
|2/13/2015| 98024|
|1/31/2015| 92257|
+---------+------+
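As an aside, the busiest-base aggregation above can also be expressed through the DataFrame API instead of SQL. A sketch, assuming the `df` from step 2 (the exact column naming of aggregates can vary by Spark version, hence the explicit alias):

>>> from pyspark.sql import functions as F
>>> df.groupBy("dispatching_base_number").agg(F.sum("trips").alias("cnt")).orderBy(F.desc("cnt")).show()

Whether to use SQL or the DataFrame API is largely a matter of taste; both produce the same execution plan in Spark SQL.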
Featured image credit https://flic.kr/p/4FbcGr