This PySpark JSON tutorial will show numerous code examples of how to interact with JSON from PySpark including both reading and writing JSON.
To work with JSON data in PySpark, we can utilize the built-in functions provided by the PySpark SQL module. These functions allow users to parse JSON strings and extract specific fields from nested structures. Additionally, PySpark provides the ability to read and write JSON files directly from Hadoop Distributed File System (HDFS), Amazon S3, or other supported file systems.
Table of Contents
- PySpark JSON Overview
- PySpark JSON Code Examples
- PySpark Read JSON Advanced Example
- JSON Functions in PySpark
- Handling Nested JSON in PySpark
- Best Practices for PySpark JSON
- Troubleshooting Common PySpark JSON Issues
- PySpark SQL Resources
PySpark JSON Overview
One of the first things to understand about PySpark JSON is that it treats JSON data as a collection of nested dictionaries and lists. This means when you load JSON data into PySpark, it will automatically attempt to parse the data into a DataFrame with a schema reflecting the structure of the JSON data. When it works, it can be very useful, as it allows you to work with JSON data in a structured way, without having to manually parse the data yourself.
Schema inference is the default, but you can also specify a schema manually if you need more control over the structure of your data. A schema is an important concept to understand when working with PySpark JSON: it describes the structure of the data (column names and data types), and PySpark uses schemas to define the structure of DataFrames.
PySpark provides a wide range of functions for working with JSON data. For example, you can use the get_json_object function to extract a specific value from a JSON string, or you can use the json_tuple function to extract multiple values from a JSON string. Once JSON data is loaded into a DataFrame, it can also be written out in other formats, such as CSV or Parquet.
PySpark JSON Code Examples
The Apache Spark distribution conveniently comes with JSON file examples. We are going to use one of these examples to get started and then progress to more advanced examples.
This tutorial assumes you have worked with Spark and Python in the past. See PySpark Quick Start if you are new.
PySpark Read JSON Example Part 1
1. Start pyspark from the Spark home directory; e.g.
$ pwd
/Users/toddmcg/dev/spark-3.4.0-bin-hadoop3
$ bin/pyspark
Python 3.9.6 (default, Oct 18 2022, 12:41:40)
[Clang 14.0.0 (clang-1400.0.29.202)] on darwin
Type "help", "copyright", "credits" or "license" for more information.
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
23/07/11 07:14:18 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /__ / .__/\_,_/_/ /_/\_\   version 3.4.0
      /_/

Using Python version 3.9.6 (default, Oct 18 2022 12:41:40)
Spark context Web UI available at http://192.168.1.16:4040
Spark context available as 'sc' (master = local[*], app id = local-1689077658795).
SparkSession available as 'spark'.
>>>
In the example above, I’ve extracted the Apache Spark distribution to /Users/toddmcg/dev/spark-3.4.0-bin-hadoop3 and ran bin/pyspark to get things started.
2. Now, let’s read a JSON file from PySpark which comes with Apache Spark distributions by default.
>>> df = spark.read.json("examples/src/main/resources/people.json")
>>> df.printSchema()
root
 |-- age: long (nullable = true)
 |-- name: string (nullable = true)
The df variable now contains a DataFrame with the JSON data. As we can see in this example, PySpark automatically inferred the schema of the JSON data and created a DataFrame with appropriate column names (age, name) and data types (long, string).
3. Register the data as a temp table to ease our future SQL queries
>>> df.createOrReplaceTempView("people")
This will allow us to run SQL to query the JSON data. By the way, the older way to register the temp table was to use the registerTempTable function, but that has been deprecated in favor of createOrReplaceTempView.
4. Now, we can run some SQL to query the JSON from PySpark
>>> spark.sql("select name from people").show()
+-------+
|   name|
+-------+
|Michael|
|   Andy|
| Justin|
+-------+
Ok, this is a simple example, but the real world is rarely this simple. So, in part 2, we’ll cover a more complex example.
PySpark Read JSON Example Part 2
Take a closer look at the people.json file used in Part 1. If you run it through http://jsonlint.com, it will not validate. Please, let’s not debate whether this is a byproduct of JSON itself or whether you can technically validate any JSON. Stay with me here.
If you read the Spark SQL documentation closely:
“Note that the file that is offered as a json file is not a typical JSON file. Each line must contain a separate, self-contained valid JSON object. As a consequence, a regular multi-line JSON file will most often fail.”
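To make the difference concrete without involving Spark at all, here is a small sketch using only Python’s standard json module. The helper name parse_line_by_line is made up for this illustration. The first sample mirrors the line-delimited layout of people.json, and the second is a hypothetical pretty-printed document.

```python
import json

# Line-delimited JSON: every line is a complete JSON object.
json_lines_text = (
    '{"name": "Michael"}\n'
    '{"name": "Andy", "age": 30}\n'
    '{"name": "Justin", "age": 19}\n'
)

# "Typical" JSON: one document pretty-printed across several lines.
pretty_json_text = """[
  {
    "Competition": "World Cup",
    "FullName": "Juan Botasso"
  }
]"""


def parse_line_by_line(text):
    """Parse each non-empty line as its own JSON document.

    Returns (records, bad_lines). Hypothetical helper for illustration only.
    """
    records, bad_lines = [], []
    for line in text.splitlines():
        if not line.strip():
            continue
        try:
            records.append(json.loads(line))
        except json.JSONDecodeError:
            bad_lines.append(line)
    return records, bad_lines


lines_ok, lines_bad = parse_line_by_line(json_lines_text)
pretty_ok, pretty_bad = parse_line_by_line(pretty_json_text)

# The pretty-printed text is perfectly valid when parsed as a whole document.
whole_document = json.loads(pretty_json_text)

print(len(lines_ok), "records parsed line by line from the line-delimited text")
print(len(pretty_bad), "unparseable lines in the pretty-printed text")
```

Every line of the first sample parses on its own, while no single line of the second sample does, even though the document as a whole is valid JSON.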
But, what happens if we have typical JSON? Let’s find out.
Download and save historical World Cup player data from here
For these examples, I downloaded the JSON file (world-cup-players.json) to /Users/toddmcg/dev/spark-3.4.0-bin-hadoop3
Here’s a snippet of the content
"Competition": "World Cup",
"FullName": "Ángel Bossio",
"Club": "Club Atlético Talleres de Remedios de Escalada",
"Competition": "World Cup",
"FullName": "Juan Botasso",
"Club": "Quilmes Atlético Club",
PySpark Read JSON Advanced Example
Let’s attempt to read this JSON file in a similar way as before
>>> df = spark.read.json("world-cup-players.json")
>>> df.printSchema()
root
 |-- _corrupt_record: string (nullable = true)
That doesn’t look good. It didn’t work as before.
How to fix this depends on which version of Spark you are using. Let’s show both examples.
In older versions of Spark, we used to have to use the SparkContext wholeTextFiles method, which produces a pair RDD whose 1st element is a filename and whose 2nd element is the full content of that file. We use map to create a new RDD from the 2nd element of each tuple.
>>> jsonRDD = sc.wholeTextFiles("world-cup-players.json").map(lambda x: x[1])
Then, we needed to remove the whitespace so this RDD could be parsed by sqlContext. Note that this regular expression also strips the spaces inside string values.
>>> import re
>>> js = jsonRDD.map(lambda x: re.sub(r"\s+", "", x, flags=re.UNICODE))
Then, we were able to consume the RDD using the jsonRDD method of sqlContext
>>> wc_players = sqlContext.jsonRDD(js)
And finally, register the table and run a query
>>> wc_players.registerTempTable("players")
>>> sqlContext.sql("select distinct Team from players").show()
But, all this is much more straightforward in newer versions of PySpark.
How to read multi-line JSON from PySpark
These days, PySpark provides the multiline option to read records that span multiple lines. As I mentioned before, by default PySpark expects every record in a JSON file to be a complete JSON object on a single line.
Using this option is much easier than the older way shown previously.
>>> df = spark.read.option("multiline", "true").json("world-cup-players.json")
>>> df.printSchema()
root
 |-- Club: string (nullable = true)
 |-- ClubCountry: string (nullable = true)
 |-- Competition: string (nullable = true)
 |-- DateOfBirth: string (nullable = true)
 |-- FullName: string (nullable = true)
 |-- IsCaptain: boolean (nullable = true)
 |-- Number: string (nullable = true)
 |-- Position: string (nullable = true)
 |-- Team: string (nullable = true)
 |-- Year: long (nullable = true)
PySpark Writing JSON Files Examples
Similar to reading JSON files, PySpark provides ways to write DataFrame data to JSON files. The DataFrame.write.json() function can be used to write DataFrame data in JSON format.
# Writing DataFrame to JSON file
df.write.json("path/to/output/json/file")
The path is treated as an output directory. PySpark writes one part file per DataFrame partition into that directory, with one JSON object per line.
It is also possible to customize the output format by passing additional options to the DataFrame.write.json() function. For example, the compression option can be used to specify the compression codec to be used for the output JSON files.
# Writing DataFrame to compressed JSON file
df.write.json("path/to/output/json/file", compression="gzip")
This will write the DataFrame data to a compressed JSON file using the gzip compression codec.
For more info on options, see https://spark.apache.org/docs/3.1.2/api/python/reference/api/pyspark.sql.DataFrameWriter.json.html
JSON Functions in PySpark
Custom Schema with from_json
The from_json function in PySpark is used to convert a JSON string to a struct or map column. This function takes two arguments: the first argument is the column that contains the JSON string, and the second argument is the schema of the resulting struct or map column.
Here’s an example of from_json. It assumes the DataFrame has a string column named json_string that holds JSON text.
from pyspark.sql.functions import from_json
from pyspark.sql.types import StructType, StructField, StringType

# Schema describing the fields expected inside the JSON string
json_schema = StructType([
    StructField("name", StringType()),
    StructField("age", StringType()),
    StructField("gender", StringType())
])

df = spark.read.json("path/to/json/file")
# Parse the JSON text in json_string into a struct column
df = df.withColumn("json_col", from_json(df["json_string"], json_schema))
The to_json function in PySpark is used to convert a struct or map column to a JSON string. This function takes one required argument: the struct or map column to convert.
Here’s an example of how to use to_json:
from pyspark.sql.functions import to_json
df = df.withColumn("json_string", to_json(df["json_col"]))
The json_tuple function in PySpark is used to extract one or more top-level fields from a JSON string. The first argument is the column that contains the JSON string, and the remaining arguments are the names of the fields to extract. Each extracted value is returned as a string.
Here’s an example of how to use json_tuple to extract a single field:
from pyspark.sql.functions import json_tuple
df = df.withColumn("name", json_tuple(df["json_string"], "name"))
Handling Nested JSON in PySpark
When working with complex data structures, it is common to encounter nested JSON objects. PySpark provides the ability to handle nested structures using its built-in functions.
To access nested elements in a JSON object, a dotted path passed to col() selects a field inside a struct column, while the getItem() function retrieves an array element by position or a map value by key. For example, to access the value of a nested element named “address” inside a “user” object, the following code can be used:
from pyspark.sql.functions import col
df.select(col("user.address")).show()
In addition to getItem(), PySpark also provides the explode() function. This function is used to explode arrays and maps into separate rows. For example, to explode an array named “friends” in a JSON object, the following code can be used:
from pyspark.sql.functions import col, explode
df.select(explode(col("user.friends"))).show()
It is also possible to use the struct() function to create a new nested column. This function takes columns or column names as arguments and returns a new struct column, which becomes a nested JSON object when the data is written out as JSON. For example, to create a new nested object that contains the “name” and “age” columns, the following code can be used:
from pyspark.sql.functions import col, struct
df.select(struct(col("name"), col("age")).alias("person")).show()
By using these built-in functions, PySpark makes it easy to work with nested JSON objects and extract the data needed for analysis.
Best Practices for PySpark JSON
When working with PySpark JSON, there are several best practices that one should follow to ensure efficient and effective processing of the data.
Firstly, it is recommended to use the spark.read.json() method to read in JSON files. As shown, this method automatically infers the schema of the JSON file, which saves the effort of defining one by hand. By default it expects one JSON object per line; set the multiline option to true for files in which a single record spans multiple lines.
Secondly, it can be important to properly handle null values in the JSON data. Null JSON values become nulls in the DataFrame (None objects once collected into Python), which can cause issues when performing operations on the data. Depending on the analysis, one can use the dropna() method to remove rows containing null values, or the fillna() method to replace them with defaults, before processing the data.
Lastly, it can be important to optimize PySpark operations by minimizing the amount of data being shuffled. This can be achieved by using operations such as select() to reduce the amount of data being processed. Additionally, one should avoid using expensive operations such as distinct() unless absolutely necessary. This last recommendation is true for data sources beyond JSON.
Troubleshooting Common PySpark JSON Issues
When working with PySpark and JSON, there are some common issues that users may encounter. Here are some troubleshooting tips to help overcome these issues:
Issue: Invalid JSON Format
One common issue is that the JSON data may not be in a valid format. This can happen if there are missing or extra commas, quotes, or brackets in the JSON data. When this happens, PySpark may not be able to read the JSON data, resulting in errors.
To fix this issue, users can use a JSON validator tool like the previously mentioned http://jsonlint.com to check the JSON data for errors.
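For a quick local check, Python’s standard json module can also point at the location of a syntax error. This is a minimal sketch; the helper name find_json_error and the sample strings are made up for illustration.

```python
import json


def find_json_error(text):
    """Return (line, column, message) for the first syntax error, or None.

    Hypothetical helper built on the standard library's JSONDecodeError.
    """
    try:
        json.loads(text)
    except json.JSONDecodeError as err:
        return (err.lineno, err.colno, err.msg)
    return None


good_text = '{"name": "Andy", "age": 30}'
bad_text = '{"name": "Andy", "age": 30,}'  # trailing comma is not valid JSON

good_result = find_json_error(good_text)
bad_result = find_json_error(bad_text)

print(good_result)
print(bad_result)
```

The exact wording of the error message varies between Python versions, so rely on the line and column rather than the text.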
Issue: Data Type Mismatch
Another issue that users may encounter is that the data types in the JSON data may not match the expected data types in PySpark. This can happen if the JSON data contains string values that should be integers or floats, for example.
To fix this issue, users can use the cast function in PySpark to convert the data types to the expected types. They can also use the schema parameter of spark.read.json() when reading the JSON data to specify the expected data types.
By following these troubleshooting tips, users can overcome common issues when working with PySpark and JSON data.
PySpark SQL Resources
PySpark API docs for DataFrameReader.json https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/api/pyspark.sql.DataFrameReader.json.html
And before you go… check out more PySpark SQL tutorials.