PySpark MySQL [Hands-on Example with JDBC]


In order to use PySpark with MySQL, we must first establish a connection between the two systems. This can be done using a JDBC (Java Database Connectivity) driver, which allows PySpark to interact with MySQL and transfer data between the two systems.

Once this connection is established, PySpark can extract data from MySQL, perform transformations and analysis, and then, for example, load the results back into the database.

In this PySpark MySQL tutorial, we’ll cover how to use PySpark SQL with Python, using a MySQL database as the input data source.

This tutorial assumes familiarity with PySpark, so be sure to check out introductory PySpark Tutorials if any of the following is confusing.


PySpark MySQL Overview

Before diving into the hands-on example, let’s explore the connection at a high level and then move into a specific example you can try.


Connecting PySpark to MySQL Overview

To connect PySpark to MySQL, we must download the necessary JDBC driver for MySQL. We will do this in the hands-on example below, but for now, let’s assume it has been downloaded and is available. From PySpark, we establish a connection to the MySQL database by specifying the driver class name, the database URL, and the login credentials.

The following code example shows how to connect PySpark to MySQL using the PySpark SQL module:

from pyspark.sql import SparkSession

spark = SparkSession.builder \
    .appName("PySpark MySQL Example") \
    .config("spark.driver.extraClassPath", "/path/to/mysql-connector-java.jar") \
    .getOrCreate()

jdbc_url = "jdbc:mysql://localhost:3306/mydatabase"
connection_properties = {
    "user": "myuser",
    "password": "mypassword",
    "driver": "com.mysql.cj.jdbc.Driver"
}

df = spark.read.jdbc(url=jdbc_url, table="mytable", properties=connection_properties)

In this example, a SparkSession is created with the appName and config parameters specifying the name of the application and the path to the MySQL JDBC driver. The jdbc_url variable contains the URL of the MySQL database, and the connection_properties dictionary contains the login credentials and the driver class name.

Finally, the read.jdbc method is used to read data from the mytable table in the MySQL database and create a PySpark DataFrame.

Executing SQL Queries

Once the connection is established, SQL queries can be executed using PySpark. Here is example code showing how to execute a SQL query using the DataFrame previously created:

df.createOrReplaceTempView("myview")

result = spark.sql("SELECT * FROM myview WHERE age > 30")

In this example, a temporary view is created from the df DataFrame using the createOrReplaceTempView method. Then the SQL is executed using the spark.sql method, which returns a new DataFrame containing the results of the query.

Transforming Data

Data can be manipulated and transformed using PySpark. Here is example code showing how to group the data by a particular column and calculate the average age:

from pyspark.sql.functions import avg

result.groupBy("gender").agg(avg("age").alias("avg_age")).show()

In this example, the groupBy method is used to group the data by the “gender” column. The agg method is used to calculate the average age for each group, and the alias method is used to rename the resulting column to “avg_age”. The show method is used to display the results.
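
For comparison, the same aggregation can also be expressed in SQL against the temporary view created earlier:

spark.sql("SELECT gender, AVG(age) AS avg_age FROM myview GROUP BY gender").show()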

For more examples of pyspark.sql.functions, see the PySpark SQL tutorials.

PySpark MySQL Hands-On Example

Ok, let’s try a hands-on example using the pyspark shell.

We’re going to load some NYC Uber data into a database for this Spark SQL with MySQL tutorial.  Then, we’re going to fire up pyspark with a command line argument to specify the JDBC driver needed to connect to the JDBC data source.  

We’ll make sure we can authenticate and then start running some queries.

PySpark MySQL Hands-On Setup Requirements

1. MySQL database with at least one table containing data.

2. MySQL JDBC driver (download available https://dev.mysql.com/downloads/connector/j/)

3. Uber NYC data file available here: https://raw.githubusercontent.com/fivethirtyeight/uber-tlc-foil-response/master/Uber-Jan-Feb-FOIL.csv

Setup Reference

This PySpark SQL with MySQL JDBC example assumes a MySQL database named “uber” with a table called “trips”. The “trips” table was populated with the Uber NYC data used in the Spark SQL Python CSV tutorial.

For an example of how I loaded the CSV into MySQL for Spark SQL tutorials, check this YouTube video (note: there is no sound).

PySpark MySQL (JDBC) Read Tutorial

1. Start the pyspark shell with the --jars argument

$SPARK_HOME/bin/pyspark --jars mysql-connector-java-5.1.38-bin.jar

Hopefully obvious, but this example assumes the MySQL connector JDBC jar file is located in the same directory from which you are calling pyspark.  If it is not, you can specify the path, such as:

$SPARK_HOME/bin/pyspark --jars /home/example/jars/mysql-connector-java-5.1.38-bin.jar
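
Alternatively, if you prefer to let Spark fetch the connector from Maven Central rather than managing the jar yourself (assuming the machine has internet access), the --packages flag with the connector’s Maven coordinates also works:

$SPARK_HOME/bin/pyspark --packages mysql:mysql-connector-java:5.1.38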

2. Once the shell is running, let’s establish a connection to the MySQL database and read the “trips” table:

Spark 1 version

Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /__ / .__/\_,_/_/ /_/\_\   version 1.4.1
      /_/

Using Python version 2.7.11 (default, Dec  6 2015 18:57:58)
SparkContext available as sc, HiveContext available as sqlContext.
>>> dataframe_mysql = sqlContext.read.format("jdbc").option("url", "jdbc:mysql://localhost/uber").option("driver", "com.mysql.jdbc.Driver").option("dbtable", "trips").option("user", "root").option("password", "root").load()

Change the MySQL URL and user/password values in the above code as appropriate for your environment.

This changes a bit in Spark 2 and above.

Spark => 2 version

$ bin/pyspark
Python 3.9.6 (default, Oct 18 2022, 12:41:40)

Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /__ / .__/\_,_/_/ /_/\_\   version 3.4.0
      /_/

Using Python version 3.9.6 (default, Oct 18 2022 12:41:40)
Spark context Web UI available at http://192.168.1.16:4040
Spark context available as 'sc' (master = local[*], app id = local-1689369610262).
SparkSession available as 'spark'.
>>> jdbc_url = "jdbc:mysql://localhost:3306/uber"
>>> connection_properties = {
...     "user": "root",
...     "password": "root",
...     "driver": "com.mysql.cj.jdbc.Driver"
... }
>>>
>>> dataframe_mysql = spark.read.jdbc(url=jdbc_url, table="trips", properties=connection_properties)

For more info on Spark 1 vs Spark 2, see SparkSession and SparkContext.

3. Let’s confirm the DataFrame by showing a sample of the data in the table:

>>> dataframe_mysql.show()
+-----------------------+--------+---------------+-----+
|dispatching_base_number|    date|active_vehicles|trips|
+-----------------------+--------+---------------+-----+
|                 B02512|1/1/2015|            190| 1132|
|                 B02765|1/1/2015|            225| 1765|
|                 B02764|1/1/2015|           3427|29421|
|                 B02682|1/1/2015|            945| 7679|
|                 B02617|1/1/2015|           1228| 9537|
|                 B02598|1/1/2015|            870| 6903|
|                 B02598|1/2/2015|            785| 4768|
|                 B02617|1/2/2015|           1137| 7065|
|                 B02512|1/2/2015|            175|  875|
|                 B02682|1/2/2015|            890| 5506|
|                 B02765|1/2/2015|            196| 1001|
|                 B02764|1/2/2015|           3147|19974|
|                 B02765|1/3/2015|            201| 1526|
|                 B02617|1/3/2015|           1188|10664|
|                 B02598|1/3/2015|            818| 7432|
|                 B02682|1/3/2015|            915| 8010|
|                 B02512|1/3/2015|            173| 1088|
|                 B02764|1/3/2015|           3215|29729|
|                 B02512|1/4/2015|            147|  791|
|                 B02682|1/4/2015|            812| 5621|
+-----------------------+--------+---------------+-----+

4. Register the data as a temp table for future SQL queries

Spark 1

>>> dataframe_mysql.registerTempTable("trips")

Spark => 2 version

>>> dataframe_mysql.createOrReplaceTempView("trips")

5. We are now in a position to run some SQL queries, such as:

Spark 1

>>> sqlContext.sql("select * from trips where dispatching_base_number like '%2512%'").show()
+-----------------------+---------+---------------+-----+
|dispatching_base_number|     date|active_vehicles|trips|
+-----------------------+---------+---------------+-----+
|                 B02512| 1/1/2015|            190| 1132|
|                 B02512| 1/2/2015|            175|  875|
|                 B02512| 1/3/2015|            173| 1088|
|                 B02512| 1/4/2015|            147|  791|
|                 B02512| 1/5/2015|            194|  984|
|                 B02512| 1/6/2015|            218| 1314|
|                 B02512| 1/7/2015|            217| 1446|
|                 B02512| 1/8/2015|            238| 1772|
|                 B02512| 1/9/2015|            224| 1560|
|                 B02512|1/10/2015|            206| 1646|
|                 B02512|1/11/2015|            162| 1104|
|                 B02512|1/12/2015|            217| 1399|
|                 B02512|1/13/2015|            234| 1652|
|                 B02512|1/14/2015|            233| 1582|
|                 B02512|1/15/2015|            237| 1636|
|                 B02512|1/16/2015|            234| 1481|
|                 B02512|1/17/2015|            201| 1281|
|                 B02512|1/18/2015|            177| 1521|
|                 B02512|1/19/2015|            168| 1025|
|                 B02512|1/20/2015|            221| 1310|
+-----------------------+---------+---------------+-----+

Spark => 2

>>> spark.sql("select * from trips where dispatching_base_number like '%2512%'").show()
+-----------------------+---------+---------------+-----+
|dispatching_base_number|     date|active_vehicles|trips|
+-----------------------+---------+---------------+-----+
|                 B02512| 1/1/2015|            190| 1132|
|                 B02512| 1/2/2015|            175|  875|
|                 B02512| 1/3/2015|            173| 1088|
|                 B02512| 1/4/2015|            147|  791|
|                 B02512| 1/5/2015|            194|  984|
|                 B02512| 1/6/2015|            218| 1314|
|                 B02512| 1/7/2015|            217| 1446|
|                 B02512| 1/8/2015|            238| 1772|
|                 B02512| 1/9/2015|            224| 1560|
|                 B02512|1/10/2015|            206| 1646|
|                 B02512|1/11/2015|            162| 1104|
|                 B02512|1/12/2015|            217| 1399|
|                 B02512|1/13/2015|            234| 1652|
|                 B02512|1/14/2015|            233| 1582|
|                 B02512|1/15/2015|            237| 1636|
|                 B02512|1/16/2015|            234| 1481|
|                 B02512|1/17/2015|            201| 1281|
|                 B02512|1/18/2015|            177| 1521|
|                 B02512|1/19/2015|            168| 1025|
|                 B02512|1/20/2015|            221| 1310|
+-----------------------+---------+---------------+-----+

Writing Data to MySQL Using PySpark

Overview

Once data is loaded into a PySpark DataFrame, it can be transformed using various PySpark functions to clean, filter, or aggregate the data as required. It is important to ensure that the data is correctly formatted and validated before writing it to the MySQL database.
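
For example, a minimal cleanup sketch (assuming the Uber trips DataFrame read earlier in this tutorial) might drop rows missing the key column and filter out invalid values before any write:

from pyspark.sql.functions import col

# Hypothetical cleanup before writing back to MySQL:
# drop rows without a base number and discard negative trip counts
clean_df = dataframe_mysql \
    .dropna(subset=["dispatching_base_number"]) \
    .filter(col("trips") >= 0)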

PySpark Write MySQL Example

Executing Write Operations

To write data to MySQL using PySpark, the PySpark SQL library can be used in much the same way as the read examples shown above. The DataFrame “write” method can be used to write data to various output formats, including MySQL over JDBC.

To write data to MySQL, the following steps can be followed:

1. Define the MySQL connection details including the hostname, port, database name, username, and password.

2. Create a PySpark DataFrame that contains the data to be written to the MySQL database.

3. Use the PySpark SQL “write” method to write the data to the MySQL database. The method requires the following parameters:

  • The format of the output data (in this case, “jdbc”).

  • The JDBC URL for the MySQL database.

  • The table name in the MySQL database where the data will be written.

  • The MySQL connection properties including the username and password.

An example of the PySpark SQL “write” method is shown below:

# Connection details are illustrative; update them for your environment
jdbcUrl = "jdbc:mysql://localhost:3306/mydatabase"
tableName = "mytable"
username = "myuser"
password = "mypassword"

df.write.format("jdbc") \
    .option("url", jdbcUrl) \
    .option("dbtable", tableName) \
    .option("user", username) \
    .option("password", password) \
    .mode("overwrite") \
    .save()

In this example, the PySpark DataFrame “df” is written to the MySQL database using the JDBC URL “jdbc:mysql://localhost:3306/mydatabase”, the table name “mytable”, and the MySQL connection properties including the username and password.

Once the data has been written to the MySQL database, it can be queried and analyzed using various SQL tools and techniques.

Optimizing PySpark MySQL Performance Quick Tips

Tuning PySpark

When working with PySpark and MySQL, there are several tuning options that can be adjusted to optimize performance. One important factor to consider is the amount of memory allocated to PySpark. By default, the driver and executors each get a modest amount of memory (spark.driver.memory and spark.executor.memory both default to 1g), which can cause performance issues when working with large datasets. Increasing the amount of memory allocated to PySpark can help improve performance.
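
For example, one way to do this is with the memory flags when starting the pyspark shell (the 4g values below are only illustrative; size them for your data and cluster):

$SPARK_HOME/bin/pyspark --jars mysql-connector-java-5.1.38-bin.jar --driver-memory 4g --executor-memory 4g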

MySQL Optimization Tips

In addition to tuning PySpark, there are also several optimization tips that can be applied to MySQL to improve performance. One important tip is to optimize database queries. This can be done by creating indexes on frequently queried columns, minimizing the use of subqueries, and avoiding the use of “SELECT *”.
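
On the PySpark side, one way to avoid “SELECT *” is to push a query down to MySQL so only the needed columns and rows are transferred. A sketch, reusing the jdbc_url and credentials from the read example and assuming Spark 2.4+ for the “query” option:

# Let MySQL do the projection and filtering instead of reading the whole table
pushed_df = spark.read.format("jdbc") \
    .option("url", jdbc_url) \
    .option("query", "SELECT dispatching_base_number, date, trips FROM trips WHERE trips > 1000") \
    .option("user", "root") \
    .option("password", "root") \
    .option("driver", "com.mysql.cj.jdbc.Driver") \
    .load()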

Another tip is to optimize MySQL’s configuration settings. This can be done by adjusting settings such as the buffer pool size, the query cache size, and the maximum allowed packet size. These settings can have a significant impact on MySQL’s performance, so it’s important to ensure that they are set appropriately for the workload.

Overall, optimizing PySpark MySQL performance requires a combination of tuning PySpark and optimizing MySQL. By adjusting these settings and following best practices, it’s possible to achieve significant performance improvements when working with large datasets.

Troubleshooting Common Issues

When working with PySpark and MySQL, there are a few common issues that users may encounter.

Here are some troubleshooting tips to help resolve these issues:

#1 Connection Issues

The most common issue users encounter is establishing a successful connection between PySpark and MySQL. This can occur due to a variety of reasons, such as incorrect login credentials, firewall restrictions, or network connectivity issues.

To troubleshoot this issue, users should verify that their login credentials are correct and that their firewall is not blocking the connection. Try to connect outside of PySpark and work backwards.
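
For example, a quick way to confirm credentials and network access outside of Spark is a minimal check with the mysql-connector-python package (an assumption here; it is a separate pip install), using the same host and credentials as the hands-on example:

import mysql.connector

# Verify the database is reachable with these credentials before involving Spark
conn = mysql.connector.connect(
    host="localhost", port=3306, database="uber",
    user="root", password="root")
print(conn.is_connected())
conn.close()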

#2 Data Type Mismatch

Another common issue that users may encounter is data type mismatch between PySpark and MySQL. This can occur when the data type of a column in PySpark does not match the data type of the corresponding column in MySQL.

To troubleshoot this issue, users should check the data types of their columns in both PySpark and MySQL and ensure that they are compatible. Additionally, users may need to perform data type conversions to ensure that the data types match.
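
For example, a column can be cast on the PySpark side so it matches the type of the target MySQL column (the column name below is from the trips example and purely illustrative):

from pyspark.sql.functions import col

# Cast to match an INT column in MySQL, then confirm the schema lines up
dataframe_mysql = dataframe_mysql.withColumn("active_vehicles", col("active_vehicles").cast("int"))
dataframe_mysql.printSchema()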

#3 Performance Issues

Users may also experience performance issues when working with PySpark and MySQL. This can occur due to a variety of reasons, such as inefficient queries, large datasets, or insufficient system resources.

To troubleshoot this issue, users should optimize their queries to ensure that they are efficient and minimize the amount of data that needs to be processed. Additionally, users may need to consider increasing the resources available to their system, such as increasing the amount of memory or using a more powerful CPU.
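
For example, a JDBC read can be split into parallel partitions so a large table is not pulled through a single connection. A sketch reusing jdbc_url and connection_properties from the read example (the partition column and bounds are illustrative and must suit your data):

# Parallel JDBC read; the partition column must be numeric, date, or timestamp
df_partitioned = spark.read.jdbc(
    url=jdbc_url,
    table="trips",
    column="active_vehicles",
    lowerBound=1,
    upperBound=4000,
    numPartitions=8,
    properties=connection_properties)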

By following these troubleshooting tips, users can resolve common issues when working with PySpark and MySQL and ensure that their data processing tasks are performed efficiently and accurately.

Conclusion PySpark SQL MySQL (JDBC)

This example was designed to get you up and running with Spark SQL and MySQL, or any JDBC-compliant database, using Python.

A benefit of using PySpark with MySQL is the flexibility it provides. Users can easily integrate with other data sources, such as Hadoop Distributed File System (HDFS) or Amazon S3, to perform more complex analysis.

Overall, the integration of PySpark with MySQL offers a powerful and flexible solution for data processing and analysis. With its distributed computing capabilities and support for machine learning algorithms, PySpark provides users with the tools they need to gain insights from large datasets quickly and efficiently.

PySpark Additional Resources

Spark with Python tutorials

PySpark SQL tutorials

About Todd M

Todd has held multiple software roles over his 20 year career. For the last 5 years, he has focused on helping organizations move from batch to data streaming. In addition to the free tutorials, he provides consulting, coaching for Data Engineers, Data Scientists, and Data Architects. Feel free to reach out directly or to connect on LinkedIn

12 thoughts on “PySpark MySQL [Hands-on Example with JDBC]”

    • Hi Sinsa,
      One way is to pass the required partition options when connecting. So, update this line

      dataframe_mysql = sqlContext.read.format("jdbc")
      .option("url", "jdbc:mysql://localhost/uber")
      .option("driver", "com.mysql.jdbc.Driver")
      .option("dbtable", "trips").option("user", "root")
      .option("password", "root").load()

      to set all the partition options: partitionColumn, lowerBound, upperBound, and numPartitions. So, for example:

      dataframe_mysql = sqlContext.read.format("jdbc").option("url", "jdbc:mysql://localhost/sparksql")
      .option("partitionColumn", "id")
      .option("lowerBound", 1)
      .option("upperBound", maxId)
      .option("numPartitions", 75)
      .option("driver", "com.mysql.jdbc.Driver")
      .option("dbtable", "")
      .option("user", "root")
      .option("password", "root").load()

  1. Thanks for the tutorial!
    I am trying to load a local mysql database into pyspark to do some analysis works, but when I try to run the shell command “$SPARK_HOME/bin/pyspark –jars mysql-connector-java-5.1.39-bin.jar”, “pyspark does not support any application options.” keep showing up as the error msg.
    And also how can I load the database in python and use –submit command to run it?
    Thanks a lot!

    • More information please. Are you cutting and pasting from this article? If so, there could be possible encoding issues for the hyphen vs dash characters. Example: “–jars” If you are cutting and pasting please try updating “–” by typing instead of pasting.

  2. In this method , i think we need to establish one connection per table.

    Is there a way to get establish a connection first and get the tables later using the connection

    Thanks!

  3. Hi, Can you please help me how to make a SSL connection connect to RDS using sqlContext.read.jdbc. I tried to use ssl_ca=”path to pem file” but getting access denied.

  4. same issue i.e
    ./pyspark -jars mysql-connector-java-5.1.42-bin.jar
    Exception in thread “main” java.lang.IllegalArgumentException: pyspark does not support any application options.

  5. Thank for your sharing your information !
    but we load data from mysql , we find out that spark executor memory leak, we are using spark streaming to read data every minute and these data join which are read by mysql. when spark app
    run 24 hours, some executor memory leak and was killed. if we mark join code (did not read data from mysql) executor was not killed in 24 hour.
    would you please give me some comments . Thanks

  6. hi, thanks for the well written post. i am trying to do the same from python ide and can’t find a way to include the driver, and keep getting “no suitable driver” error. when i do it from the console it works. i have the proper driver…

