Spark SQL MySQL Python Example with JDBC


Let’s cover how to use Spark SQL with Python and a MySQL database as the input data source.  Consider this tutorial an introductory step toward using Spark SQL with a relational database from Python.

Overview

We’re going to load some NYC Uber data into a database.  Then, we’re going to fire up pyspark with a command line argument that points to the MySQL JDBC driver needed to connect to the database.  We’ll make sure we can authenticate and then start running some queries.

Setup Requirements

1. MySQL database with at least one table containing data.

2. MySQL JDBC driver (download available at https://dev.mysql.com/downloads/connector/j/)

3. Uber NYC data file available here: https://raw.githubusercontent.com/fivethirtyeight/uber-tlc-foil-response/master/Uber-Jan-Feb-FOIL.csv

For reference, the setup used in this Spark SQL MySQL Python tutorial is described in the Setup Reference section at the bottom of this post.

Spark SQL MySQL (JDBC) Python Quick Start Tutorial

1. Start the pyspark shell with the --jars argument

$SPARK_HOME/bin/pyspark --jars mysql-connector-java-5.1.38-bin.jar

This example assumes the MySQL connector JDBC jar file is located in the same directory from which you are calling pyspark.  If it is not, you can specify the full path, such as:

$SPARK_HOME/bin/pyspark --jars /home/example/jars/mysql-connector-java-5.1.38-bin.jar
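Alternatively, if the machine has internet access, Spark can fetch the connector from Maven Central with the --packages flag instead of --jars.  This is a sketch, assuming these Maven coordinates match the connector version you want:

$SPARK_HOME/bin/pyspark --packages mysql:mysql-connector-java:5.1.38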

2. Once the shell is running, let’s establish a connection to the MySQL database and read the “trips” table:

Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /__ / .__/\_,_/_/ /_/\_\   version 1.4.1
      /_/

Using Python version 2.7.11 (default, Dec  6 2015 18:57:58)
SparkContext available as sc, HiveContext available as sqlContext.
>>> dataframe_mysql = sqlContext.read.format("jdbc") \
...     .option("url", "jdbc:mysql://localhost/uber") \
...     .option("driver", "com.mysql.jdbc.Driver") \
...     .option("dbtable", "trips") \
...     .option("user", "root") \
...     .option("password", "root") \
...     .load()

Change the MySQL URL and the user/password values in the code above to match your environment.
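If you would rather run this from a standalone script than from the shell, the same read works when submitted with spark-submit.  Here is a minimal sketch; the file name jdbc_read.py and the app name are placeholders, and the URL and credentials are the same assumptions as above:

# jdbc_read.py -- standalone version of the shell session in this tutorial
from pyspark import SparkContext
from pyspark.sql import SQLContext

sc = SparkContext(appName="SparkSQLMySQLExample")
sqlContext = SQLContext(sc)

# Same JDBC options as the pyspark shell example above
dataframe_mysql = sqlContext.read.format("jdbc") \
    .option("url", "jdbc:mysql://localhost/uber") \
    .option("driver", "com.mysql.jdbc.Driver") \
    .option("dbtable", "trips") \
    .option("user", "root") \
    .option("password", "root") \
    .load()

dataframe_mysql.show()

Submit it with the same --jars argument used for the shell:

$SPARK_HOME/bin/spark-submit --jars mysql-connector-java-5.1.38-bin.jar jdbc_read.py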

3. Let’s confirm the DataFrame is wired up correctly by showing the first rows of the table

>>> dataframe_mysql.show()
+-----------------------+--------+---------------+-----+
|dispatching_base_number|    date|active_vehicles|trips|
+-----------------------+--------+---------------+-----+
|                 B02512|1/1/2015|            190| 1132|
|                 B02765|1/1/2015|            225| 1765|
|                 B02764|1/1/2015|           3427|29421|
|                 B02682|1/1/2015|            945| 7679|
|                 B02617|1/1/2015|           1228| 9537|
|                 B02598|1/1/2015|            870| 6903|
|                 B02598|1/2/2015|            785| 4768|
|                 B02617|1/2/2015|           1137| 7065|
|                 B02512|1/2/2015|            175|  875|
|                 B02682|1/2/2015|            890| 5506|
|                 B02765|1/2/2015|            196| 1001|
|                 B02764|1/2/2015|           3147|19974|
|                 B02765|1/3/2015|            201| 1526|
|                 B02617|1/3/2015|           1188|10664|
|                 B02598|1/3/2015|            818| 7432|
|                 B02682|1/3/2015|            915| 8010|
|                 B02512|1/3/2015|            173| 1088|
|                 B02764|1/3/2015|           3215|29729|
|                 B02512|1/4/2015|            147|  791|
|                 B02682|1/4/2015|            812| 5621|
+-----------------------+--------+---------------+-----+
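If you want to confirm the inferred column types as well, printSchema is available.  The exact types depend on how the MySQL columns were declared, but for this table the output should look roughly like:

>>> dataframe_mysql.printSchema()
root
 |-- dispatching_base_number: string (nullable = true)
 |-- date: string (nullable = true)
 |-- active_vehicles: integer (nullable = true)
 |-- trips: integer (nullable = true)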

4. Register the data as a temp table for future SQL queries

>>> dataframe_mysql.registerTempTable("trips")

5. We are now in a position to run some SQL, such as

>>> sqlContext.sql("select * from trips where dispatching_base_number like '%2512%'").show()
+-----------------------+---------+---------------+-----+
|dispatching_base_number|     date|active_vehicles|trips|
+-----------------------+---------+---------------+-----+
|                 B02512| 1/1/2015|            190| 1132|
|                 B02512| 1/2/2015|            175|  875|
|                 B02512| 1/3/2015|            173| 1088|
|                 B02512| 1/4/2015|            147|  791|
|                 B02512| 1/5/2015|            194|  984|
|                 B02512| 1/6/2015|            218| 1314|
|                 B02512| 1/7/2015|            217| 1446|
|                 B02512| 1/8/2015|            238| 1772|
|                 B02512| 1/9/2015|            224| 1560|
|                 B02512|1/10/2015|            206| 1646|
|                 B02512|1/11/2015|            162| 1104|
|                 B02512|1/12/2015|            217| 1399|
|                 B02512|1/13/2015|            234| 1652|
|                 B02512|1/14/2015|            233| 1582|
|                 B02512|1/15/2015|            237| 1636|
|                 B02512|1/16/2015|            234| 1481|
|                 B02512|1/17/2015|            201| 1281|
|                 B02512|1/18/2015|            177| 1521|
|                 B02512|1/19/2015|            168| 1025|
|                 B02512|1/20/2015|            221| 1310|
+-----------------------+---------+---------------+-----+
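As an aside, the same filter can be expressed through the DataFrame API without registering a temp table; a quick sketch:

>>> dataframe_mysql.filter(dataframe_mysql.dispatching_base_number.like('%2512%')).show()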

Conclusion: Spark SQL MySQL (JDBC) with Python

This example was designed to get you up and running with Spark SQL, MySQL (or any JDBC-compliant database), and Python.  Would you like to see other examples?  Leave ideas or questions in the comments below.

 

Setup Reference

The Spark SQL with MySQL JDBC example assumes a MySQL database named “uber” with a table called “trips”.  The “trips” table was populated with the Uber NYC data used in the Spark SQL Python CSV tutorial.

For an example of how I loaded the CSV into MySQL for these Spark SQL tutorials, check this YouTube video and subscribe to our channel.
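If you prefer a scripted route instead of the video, here is a rough sketch of loading the CSV into MySQL with Python.  It assumes the mysql-connector-python package and the database/table layout used above; the file name, column types, and credentials are placeholders to adjust for your environment:

# load_uber_csv.py -- hypothetical helper to populate the "trips" table
import csv
import mysql.connector  # assumes: pip install mysql-connector-python

conn = mysql.connector.connect(user="root", password="root", database="uber")
cursor = conn.cursor()

# Column layout matches the header of Uber-Jan-Feb-FOIL.csv
cursor.execute("""
    CREATE TABLE IF NOT EXISTS trips (
        dispatching_base_number VARCHAR(16),
        date VARCHAR(16),
        active_vehicles INT,
        trips INT
    )
""")

with open("Uber-Jan-Feb-FOIL.csv") as f:
    reader = csv.reader(f)
    next(reader)  # skip the header row
    cursor.executemany("INSERT INTO trips VALUES (%s, %s, %s, %s)", list(reader))

conn.commit()
conn.close()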

 

 

Featured Image credit: https://flic.kr/p/53udJ

7 thoughts on “Spark SQL MySQL Python Example with JDBC”

    1. Hi Sinsa,
      One way is to pass the required partition options when connecting. So, update this line

      dataframe_mysql = sqlContext.read.format("jdbc")
      .option("url", "jdbc:mysql://localhost/uber")
      .option("driver", "com.mysql.jdbc.Driver")
      .option("dbtable", "trips").option("user", "root")
      .option("password", "root").load()

      to set all the partition options: partitionColumn, lowerBound, upperBound, and numPartitions. So, for example:

      dataframe_mysql = sqlContext.read.format("jdbc")
      .option("url", "jdbc:mysql://localhost/sparksql")
      .option("partitionColumn", "id")
      .option("lowerBound", 1)
      .option("upperBound", maxId)
      .option("numPartitions", 75)
      .option("driver", "com.mysql.jdbc.Driver")
      .option("dbtable", "")
      .option("user", "root")
      .option("password", "root").load()

  1. Thanks for the tutorial!
    I am trying to load a local MySQL database into pyspark to do some analysis work, but when I try to run the shell command “$SPARK_HOME/bin/pyspark –jars mysql-connector-java-5.1.39-bin.jar”, the error message “pyspark does not support any application options.” keeps showing up.
    Also, how can I load the database in Python and use the –submit command to run it?
    Thanks a lot!

    1. More information, please. Are you cutting and pasting from this article? If so, there could be an encoding issue with the hyphen vs. dash characters, for example “–jars”. If you are cutting and pasting, please try typing “–” out by hand instead of pasting it.
