
Let’s cover how to use Spark SQL with Python and a MySQL database as the input data source. Shall we? Yes, yes we shall.
Consider this tutorial an introductory step when learning how to use Spark SQL with a relational database and Python. If you are brand new, check out the Spark with Python Tutorial.
PySpark MySQL Overview
We’re going to load some NYC Uber data into a database for this Spark SQL with MySQL tutorial. Then, we’re going to fire up pyspark with a command-line argument that specifies the JDBC driver needed to connect to the data source. We’ll make sure we can authenticate and then start running some queries.
PySpark SQL and MySQL Setup Requirements
1. MySQL database with at least one table containing data.
2. MySQL JDBC driver (download available at https://dev.mysql.com/downloads/connector/j/)
3. Uber NYC data file available here: https://raw.githubusercontent.com/fivethirtyeight/uber-tlc-foil-response/master/Uber-Jan-Feb-FOIL.csv
For reference on the setup used in this Spark SQL MySQL Python tutorial, see the Setup Reference section at the bottom of this post.
PySpark SQL MySQL (JDBC) Quick Start Tutorial
1. Start the pyspark shell with the --jars argument
$SPARK_HOME/bin/pyspark --jars mysql-connector-java-5.1.38-bin.jar
This example assumes the MySQL connector JDBC jar file is located in the same directory from which you are calling pyspark. If it is not, you can specify the full path, such as:
$SPARK_HOME/bin/pyspark --jars /home/example/jars/mysql-connector-java-5.1.38-bin.jar
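As an aside, if you would rather not download and manage the jar file yourself, Spark can also resolve the connector from Maven Central via the --packages flag. A sketch; the connector version here is an assumption and should be matched to your MySQL server:

```shell
# Let Spark fetch the MySQL connector by its Maven coordinates
# (group:artifact:version) instead of pointing --jars at a local file.
PYSPARK_PACKAGES="mysql:mysql-connector-java:5.1.38"
# $SPARK_HOME/bin/pyspark --packages $PYSPARK_PACKAGES
```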
2. Once the shell is running, let’s establish a connection to the MySQL database and read the “trips” table:
Welcome to
____ __
/ __/__ ___ _____/ /__
_\ \/ _ \/ _ `/ __/ '_/
/__ / .__/\_,_/_/ /_/\_\ version 1.4.1
/_/
Using Python version 2.7.11 (default, Dec 6 2015 18:57:58)
SparkContext available as sc, HiveContext available as sqlContext.
>>> dataframe_mysql = sqlContext.read.format("jdbc").option("url", "jdbc:mysql://localhost/uber").option("driver", "com.mysql.jdbc.Driver").option("dbtable", "trips").option("user", "root").option("password", "root").load()
Change the MySQL URL and the user/password values in the above code as appropriate to your environment.
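The long chain of .option() calls is easy to mistype, so one way to keep the connection details in a single place is a small helper function. This is only a sketch, assuming the same Spark 1.x sqlContext API; the host, database, and credential defaults are the ones used above and should be changed for your environment:

```python
def read_mysql_table(sqlContext, table, user, password,
                     host="localhost", database="uber"):
    """Load one MySQL table into a DataFrame over JDBC (Spark 1.x API)."""
    url = "jdbc:mysql://{0}/{1}".format(host, database)
    return (sqlContext.read.format("jdbc")
            .option("url", url)
            .option("driver", "com.mysql.jdbc.Driver")
            .option("dbtable", table)
            .option("user", user)
            .option("password", password)
            .load())

# Usage inside the pyspark shell:
# dataframe_mysql = read_mysql_table(sqlContext, "trips", "root", "root")
```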
3. Let’s confirm the DataFrame by showing the contents of the table:
>>> dataframe_mysql.show()
+-----------------------+--------+---------------+-----+
|dispatching_base_number| date|active_vehicles|trips|
+-----------------------+--------+---------------+-----+
| B02512|1/1/2015| 190| 1132|
| B02765|1/1/2015| 225| 1765|
| B02764|1/1/2015| 3427|29421|
| B02682|1/1/2015| 945| 7679|
| B02617|1/1/2015| 1228| 9537|
| B02598|1/1/2015| 870| 6903|
| B02598|1/2/2015| 785| 4768|
| B02617|1/2/2015| 1137| 7065|
| B02512|1/2/2015| 175| 875|
| B02682|1/2/2015| 890| 5506|
| B02765|1/2/2015| 196| 1001|
| B02764|1/2/2015| 3147|19974|
| B02765|1/3/2015| 201| 1526|
| B02617|1/3/2015| 1188|10664|
| B02598|1/3/2015| 818| 7432|
| B02682|1/3/2015| 915| 8010|
| B02512|1/3/2015| 173| 1088|
| B02764|1/3/2015| 3215|29729|
| B02512|1/4/2015| 147| 791|
| B02682|1/4/2015| 812| 5621|
+-----------------------+--------+---------------+-----+
4. Register the DataFrame as a temp table for future SQL queries. (On Spark 2.x and later, the equivalent call is createOrReplaceTempView.)
>>> dataframe_mysql.registerTempTable("trips")
5. We are now in a position to run some SQL, such as:
>>> sqlContext.sql("select * from trips where dispatching_base_number like '%2512%'").show()
+-----------------------+---------+---------------+-----+
|dispatching_base_number| date|active_vehicles|trips|
+-----------------------+---------+---------------+-----+
| B02512| 1/1/2015| 190| 1132|
| B02512| 1/2/2015| 175| 875|
| B02512| 1/3/2015| 173| 1088|
| B02512| 1/4/2015| 147| 791|
| B02512| 1/5/2015| 194| 984|
| B02512| 1/6/2015| 218| 1314|
| B02512| 1/7/2015| 217| 1446|
| B02512| 1/8/2015| 238| 1772|
| B02512| 1/9/2015| 224| 1560|
| B02512|1/10/2015| 206| 1646|
| B02512|1/11/2015| 162| 1104|
| B02512|1/12/2015| 217| 1399|
| B02512|1/13/2015| 234| 1652|
| B02512|1/14/2015| 233| 1582|
| B02512|1/15/2015| 237| 1636|
| B02512|1/16/2015| 234| 1481|
| B02512|1/17/2015| 201| 1281|
| B02512|1/18/2015| 177| 1521|
| B02512|1/19/2015| 168| 1025|
| B02512|1/20/2015| 221| 1310|
+-----------------------+---------+---------------+-----+
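Once the temp table is registered, anything the SQL engine supports is fair game. As one more illustrative query (a sketch under the same shell session; no output shown since it is not from a real run): total trips per dispatching base, busiest first.

```python
# Aggregate total trips per dispatching base across the whole period.
TRIPS_PER_BASE_SQL = (
    "SELECT dispatching_base_number, SUM(trips) AS total_trips "
    "FROM trips "
    "GROUP BY dispatching_base_number "
    "ORDER BY total_trips DESC"
)

def total_trips_per_base(sqlContext):
    # Assumes the "trips" temp table has been registered as above.
    return sqlContext.sql(TRIPS_PER_BASE_SQL)

# Usage inside the pyspark shell:
# total_trips_per_base(sqlContext).show()
```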
PySpark SQL MySQL (JDBC) Conclusion
This example was designed to get you up and running with Spark SQL and MySQL, or any JDBC-compliant database, and Python. Would you like to see other examples? Leave ideas or questions in the comments below.
Setup Reference
This PySpark SQL with MySQL JDBC example assumes a MySQL database named “uber” with a table called “trips”. The “trips” table was populated with the Uber NYC data used in the Spark SQL Python CSV tutorial.
For an example of how I loaded the CSV into MySQL for Spark SQL tutorials, check this YouTube video and subscribe to our channel.
PySpark Additional Resources
PySpark SQL tutorials
Featured Image credit: https://flic.kr/p/53udJ
Thank you for sharing!
Would you tell me how to set the number of partitions? Thanks a lot!
Hi Sinsa,
One way is to pass the required partition options when connecting. So, update this call:
dataframe_mysql = sqlContext.read.format("jdbc") \
    .option("url", "jdbc:mysql://localhost/uber") \
    .option("driver", "com.mysql.jdbc.Driver") \
    .option("dbtable", "trips") \
    .option("user", "root") \
    .option("password", "root") \
    .load()
to set all of the partition options: partitionColumn, lowerBound, upperBound, and numPartitions. So, for example:
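A sketch of such a partitioned read, assuming the same local database as above. active_vehicles is used as the partition column purely for illustration (it must be a numeric column; an auto-increment primary key is the more typical choice), and note that lowerBound/upperBound control the partition stride, not a row filter:

```python
# Partitioned JDBC read (Spark 1.x sqlContext API). All four partition
# options must be set together.
PARTITIONED_READ_OPTIONS = {
    "url": "jdbc:mysql://localhost/uber",
    "driver": "com.mysql.jdbc.Driver",
    "dbtable": "trips",
    "user": "root",
    "password": "root",
    "partitionColumn": "active_vehicles",  # must be a numeric column
    "lowerBound": "0",                     # used to compute the stride
    "upperBound": "4000",                  # used to compute the stride
    "numPartitions": "4",                  # number of parallel reads
}

def read_trips_partitioned(sqlContext):
    # Apply every option to the reader, then load as before.
    reader = sqlContext.read.format("jdbc")
    for key, value in PARTITIONED_READ_OPTIONS.items():
        reader = reader.option(key, value)
    return reader.load()

# Usage inside the pyspark shell:
# dataframe_mysql = read_trips_partitioned(sqlContext)
```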
Thanks for the tutorial!
I am trying to load a local MySQL database into pyspark to do some analysis work, but when I try to run the shell command “$SPARK_HOME/bin/pyspark –jars mysql-connector-java-5.1.39-bin.jar”, the error message “pyspark does not support any application options.” keeps showing up.
Also, how can I load the database in Python and run it with the spark-submit command?
Thanks a lot!
accidentally unsubscribed…
I tried all these steps and it does not work.
More information, please. Are you cutting and pasting from this article? If so, there could be an encoding issue with hyphen vs. dash characters, for example in “–jars”. If you are cutting and pasting, please try typing the “--” yourself instead of pasting it.
The tutorial is cool. But how do you create a MySQL database connection pool?
With this method, I think we need to establish one connection per table.
Is there a way to establish a connection first and then get the tables later using that connection?
Thanks!
Hi, can you please help me make an SSL connection to RDS using sqlContext.read.jdbc? I tried to use ssl_ca=”path to pem file” but am getting access denied.
Same issue, i.e.
./pyspark -jars mysql-connector-java-5.1.42-bin.jar
Exception in thread “main” java.lang.IllegalArgumentException: pyspark does not support any application options.
Thanks for sharing your information!
But when we load data from MySQL, we find that Spark executors leak memory. We are using Spark Streaming to read data every minute and join it with the data read from MySQL. When the Spark app has run for 24 hours, some executors leak memory and are killed. If we comment out the join code (and do not read data from MySQL), no executor is killed within 24 hours.
Would you please give me some comments? Thanks.
Hi, thanks for the well-written post. I am trying to do the same from a Python IDE and can’t find a way to include the driver, and I keep getting a “no suitable driver” error. When I do it from the console it works. I have the proper driver…