One of PySpark’s many strengths is its ability to handle JSON data. JSON, or JavaScript Object Notation, is a popular data format used for web applications and APIs. With PySpark, users can easily load, manipulate, and analyze JSON data in a distributed computing environment.
This PySpark JSON tutorial walks through numerous code examples of how to interact with JSON from PySpark, including both reading and writing JSON.
To work with JSON data in PySpark, we can utilize the built-in functions provided by the PySpark SQL module. These functions allow users to parse JSON strings and extract specific fields from nested structures. Additionally, PySpark provides the ability to read and write JSON files directly from Hadoop Distributed File System (HDFS), Amazon S3, or other supported file systems.
Table of Contents
- PySpark JSON Overview
- PySpark JSON Code Examples
- PySpark Read JSON Advanced Example
- JSON Functions in PySpark
- Handling Nested JSON in PySpark
- Best Practices for PySpark JSON
- Troubleshooting Common PySpark JSON Issues
- PySpark SQL Resources
PySpark JSON Overview
One of the first things to understand about PySpark JSON is that it treats JSON data as a collection of nested dictionaries and lists. This means when you load JSON data into PySpark, it will automatically attempt to parse the data into a DataFrame with a schema reflecting the structure of the JSON data. When it works, it can be very useful, as it allows you to work with JSON data in a structured way, without having to manually parse the data yourself.
When you load JSON data into PySpark, it will automatically attempt to infer a schema based on the structure of the JSON data. However, you can also manually specify a schema if you need more control over the structure of your data. A schema is an important concept to understand when working with PySpark JSON. A schema is a way of describing the structure of data, and in PySpark, schemas are used to define the structure of DataFrames.
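As a quick, hedged sketch (the file path and field names here are placeholders), manually specifying a schema when reading JSON might look like this:
from pyspark.sql.types import StructType, StructField, StringType, LongType

# Hypothetical schema for JSON records with name and age fields
manual_schema = StructType([
    StructField("name", StringType(), True),
    StructField("age", LongType(), True)
])

# Passing the schema skips inference and enforces the expected types
df = spark.read.schema(manual_schema).json("path/to/json/file")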
PySpark provides a wide range of functions for working with JSON data. For example, you can use the get_json_object function to extract a specific value from a JSON string, or you can use the json_tuple function to extract multiple values from a JSON string. PySpark also makes it easy to convert JSON data to other formats, such as CSV or Parquet.
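As a brief, hedged sketch (the df DataFrame, its json_string column, and the output path are hypothetical), these functions and a format conversion might be used like this:
from pyspark.sql.functions import get_json_object, json_tuple

# Assumes df has a string column named json_string containing JSON text
names = df.select(get_json_object(df["json_string"], "$.name").alias("name"))

# json_tuple extracts several fields at once (default column names are c0, c1, ...)
fields = df.select(json_tuple(df["json_string"], "name", "age"))

# Converting the extracted data to another format such as Parquet
fields.write.parquet("path/to/output/parquet")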
PySpark JSON Code Examples
The Apache Spark distribution conveniently comes with example JSON files. We are going to use one of these examples to get started and then progress to more advanced examples.
This tutorial assumes you have worked with Spark and Python in the past. See PySpark Quick Start if you are new.
PySpark Read JSON Example Part 1
1. Start pyspark from the Spark home directory; e.g.
$ pwd
/Users/toddmcg/dev/spark-3.4.0-bin-hadoop3
$ bin/pyspark
Python 3.9.6 (default, Oct 18 2022, 12:41:40)
[Clang 14.0.0 (clang-1400.0.29.202)] on darwin
Type "help", "copyright", "credits" or "license" for more information.
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
23/07/11 07:14:18 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Welcome to
____ __
/ __/__ ___ _____/ /__
_\ \/ _ \/ _ `/ __/ '_/
/__ / .__/\_,_/_/ /_/\_\ version 3.4.0
/_/
Using Python version 3.9.6 (default, Oct 18 2022 12:41:40)
Spark context Web UI available at http://192.168.1.16:4040
Spark context available as 'sc' (master = local[*], app id = local-1689077658795).
SparkSession available as 'spark'.
>>>
In the example above, I extracted the Apache Spark distribution to /Users/toddmcg/dev/spark-3.4.0-bin-hadoop3 and ran bin/pyspark to get things started.
2. Now, let’s read a JSON file from PySpark which comes with Apache Spark distributions by default.
>>> df = spark.read.json("examples/src/main/resources/people.json")
>>> df.printSchema()
root
|-- age: long (nullable = true)
|-- name: string (nullable = true)
The df variable now contains a DataFrame with the JSON data. As we can see in this example, PySpark automatically infers the schema of the JSON data as it created a DataFrame with appropriate column names (age, name) and data types (long, string).
3. Register the data as a temp table to ease our future SQL queries
>>> df.createOrReplaceTempView("people")
This will allow us to run SQL queries against the JSON data. By the way, the older way to register the temp table was the registerTempTable function, but that has been deprecated.
4. Now, we can run some SQL to query the JSON from PySpark
>>> spark.sql("select name from people").show()
+-------+
| name|
+-------+
|Michael|
| Andy|
| Justin|
+-------+
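The same result can also be produced with the DataFrame API instead of SQL:
>>> df.select("name").show()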
Ok, this is a simple example, but the real world is rarely this simple. So, in part 2, we’ll cover a more complex example.
PySpark Read JSON Example Part 2
Take a closer look at the people.json file used in Part 1. If you run it through http://jsonlint.com, it will not validate. Please, let’s not debate whether this is a byproduct of JSON or whether you can technically validate any JSON. Stay with me here.
If you read the Spark SQL documentation closely:
“Note that the file that is offered as a json file is not a typical JSON file. Each line must contain a separate, self-contained valid JSON object. As a consequence, a regular multi-line JSON file will most often fail.”
But, what happens if we have typical JSON? Let’s find out.
Download and save historical World Cup player data from here
In these examples, I downloaded the JSON file (world-cup-players.json) to /Users/toddmcg/dev/spark-3.4.0-bin-hadoop3
Here’s a snippet of the content
[
{
"Competition": "World Cup",
"Year": 1930,
"Team": "Argentina",
"Number": "",
"Position": "GK",
"FullName": "Ãngel Bossio",
"Club": "Club Atlético Talleres de Remedios de Escalada",
"ClubCountry": "Argentina",
"DateOfBirth": "1905-5-5",
"IsCaptain": false
},
{
"Competition": "World Cup",
"Year": 1930,
"Team": "Argentina",
"Number": "",
"Position": "GK",
"FullName": "Juan Botasso",
"Club": "Quilmes Atlético Club",
"ClubCountry": "Argentina",
"DateOfBirth": "1908-10-23",
"IsCaptain": false
},
….
]
PySpark Read JSON Advanced Example
Let’s attempt to read this JSON file in a similar way as before
>>> df = spark.read.json("world-cup-players.json")
>>> df.printSchema()
root
|-- _corrupt_record: string (nullable = true)
That doesn’t look good. It didn’t work as before.
How to fix this depends on which version of Spark you are using. Let’s show both examples.
In older versions of Spark, we used to have to use the SparkContext wholeTextFiles method, which produces an RDD of tuples whose first element is the filename and whose second element is the entire file contents. We then use map to create a new RDD from the second element of each tuple.
>>> jsonRDD = sc.wholeTextFiles("world-cup-players.json").map(lambda x: x[1])
Then, we needed to prepare this RDD so it could be parsed by sqlContext, stripping out the whitespace (which is why the team names in the output below have no spaces)
>>> import re
>>> js = jsonRDD.map(lambda x: re.sub(r"\s+", "", x, flags=re.UNICODE))
Then, we were able to consume the RDD using the jsonRDD method of sqlContext
>>> wc_players = sqlContext.jsonRDD(js)
And finally, register the table and run a query
>>> wc_players.registerTempTable("players")
>>> sqlContext.sql("select distinct Team from players").show()
+--------------------+
| Team|
+--------------------+
| Mexico|
| Portugal|
| Colombia|
| SouthKorea|
| Netherlands|
| Belgium|
| Chile|
| Brazil|
|BosniaandHerzegovina|
| IvoryCoast|
| Cameroon|
| England|
| Croatia|
| Argentina|
| Algeria|
| Ghana|
| Iran|
| Nigeria|
| Russia|
| France|
+--------------------+
But, all this is much more straightforward in newer versions of PySpark.
How to read multi-line JSON from PySpark
These days, PySpark provides the multiline option to read records that span multiple lines. By default, PySpark expects every record in a JSON file to be a self-contained JSON object on a single line, as mentioned before.
Using this option is much simpler than the older approach shown above.
>>> df = spark.read.option("multiline", "true").json("world-cup-players.json")
>>> df.printSchema()
root
|-- Club: string (nullable = true)
|-- ClubCountry: string (nullable = true)
|-- Competition: string (nullable = true)
|-- DateOfBirth: string (nullable = true)
|-- FullName: string (nullable = true)
|-- IsCaptain: boolean (nullable = true)
|-- Number: string (nullable = true)
|-- Position: string (nullable = true)
|-- Team: string (nullable = true)
|-- Year: long (nullable = true)
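From here, the same kind of temp view and SQL query used earlier should work against this DataFrame as well, for example:
>>> df.createOrReplaceTempView("players")
>>> spark.sql("select distinct Team from players").show()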
PySpark Writing JSON Files Examples
Similar to reading JSON files, PySpark provides ways to write DataFrame data to JSON files. The DataFrame.write.json() function can be used to write DataFrame data to a JSON file.
# Writing DataFrame to JSON file
df.write.json("path/to/output/json/file")
PySpark will automatically partition the data into multiple files based on the number of partitions in the DataFrame.
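If a single output file is preferred, the DataFrame can be reduced to one partition before writing; a minimal sketch (the output path is a placeholder):
# Coalesce to a single partition so the write produces one part file
df.coalesce(1).write.json("path/to/output/single-json")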
It is also possible to customize the output format by passing additional options to the DataFrame.write.json() function. For example, the compression option can be used to specify the compression codec to be used for the output JSON files.
# Writing DataFrame to compressed JSON file
df.write.json("path/to/output/json/file", compression="gzip")
This will write the DataFrame data to a compressed JSON file using the gzip compression codec.
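A save mode can also be set on the writer, for example to overwrite an existing output directory (the path is again a placeholder):
# Overwrite any existing data at the target path
df.write.mode("overwrite").json("path/to/output/json/file")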
For more info on options, see https://spark.apache.org/docs/3.1.2/api/python/reference/api/pyspark.sql.DataFrameWriter.json.html
JSON Functions in PySpark
Custom Schema with from_json Function
The from_json function in PySpark is used to convert a JSON string to a struct or map column. This function takes two arguments: the first argument is the column that contains the JSON string, and the second argument is the schema of the resulting struct or map column.
Here’s an example of the from_json function:
from pyspark.sql.functions import from_json
from pyspark.sql.types import StructType, StructField, StringType

# Schema describing the structure encoded in the JSON string
json_schema = StructType([
    StructField("name", StringType()),
    StructField("age", StringType()),
    StructField("gender", StringType())
])

# Assumes the loaded data contains a string column named json_string
df = spark.read.json("path/to/json/file")
df = df.withColumn("json_col", from_json(df["json_string"], json_schema))
to_json Function
The to_json function in PySpark is used to convert a struct or map column to a JSON string. This function takes one argument: the column that contains the struct or map data.
Here’s an example of how to use the to_json function:
from pyspark.sql.functions import to_json
df = df.withColumn("json_string", to_json(df["json_col"]))
json_tuple Function
The json_tuple function in PySpark is used to extract one or more values from a JSON string. The first argument is the column that contains the JSON string, and the remaining arguments are the names of the fields to extract.
Here’s an example of how to use the json_tuple function:
from pyspark.sql.functions import json_tuple
df = df.withColumn("name", json_tuple(df["json_string"], "name"))
Handling Nested JSON in PySpark
When working with complex data structures, it is common to encounter nested JSON objects. PySpark provides the ability to handle nested structures using its built-in functions.
To access nested elements in a JSON object, you can use the getItem() function on a column, which takes the name of the nested element as an argument and returns its value, or dot notation inside col(). For example, to access the value of a nested element named “address” inside a “user” struct, the following code can be used:
from pyspark.sql.functions import col
df.select(col("user.address")).show()
In addition to getItem(), PySpark also provides the explode() function. This function is used to explode arrays and maps into separate rows. For example, to explode an array named “friends” in a JSON object, the following code can be used:
from pyspark.sql.functions import explode
df.select(explode(col("user.friends"))).show()
It is also possible to use the struct() function to create a new nested JSON object. This function takes a list of columns as arguments and returns a new column that contains a nested struct. For example, to create a new nested JSON object that contains the “name” and “age” columns, the following code can be used:
from pyspark.sql.functions import struct
df.select(struct(col("name"), col("age")).alias("person")).show()
By using these built-in functions, PySpark makes it easy to work with nested JSON objects and extract the data needed for analysis.
Best Practices for PySpark JSON
When working with PySpark JSON, there are several best practices that one should follow to ensure efficient and effective processing of the data.
Firstly, it is recommended to use the spark.read.json() method to read in JSON files. As shown, this method automatically infers the schema of the JSON file, which can save time and effort. Additionally, with the multiline option set to true, it can handle both single-line and multi-line JSON files.
Secondly, it can be important to properly handle null values in the JSON data. PySpark represents missing values as nulls (surfaced as None in Python), which can cause issues when performing operations on the data. To avoid this, one can use the dropna() method to remove rows with null values before processing the data.
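A minimal sketch of this, assuming an age column that may contain nulls:
# Drop rows where age is null before downstream processing
clean_df = df.dropna(subset=["age"])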
Lastly, it can be important to optimize PySpark operations by minimizing the amount of data being shuffled. This can be achieved by using operations such as filter() and select() to reduce the amount of data being processed. Additionally, one should avoid using expensive operations such as groupBy() and distinct() unless absolutely necessary. This last recommendation applies to data sources beyond JSON.
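For example, projecting and filtering early keeps less data moving through the job (column names here follow the earlier people example):
# Select only the needed columns and filter early to reduce shuffled data
slim_df = df.select("name", "age").filter(df["age"] > 21)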
Troubleshooting Common PySpark JSON Issues
When working with PySpark and JSON, there are some common issues that users may encounter. Here are some troubleshooting tips to help overcome these issues:
Issue: Invalid JSON Format
One common issue is that the JSON data may not be in a valid format. This can happen if there are missing or extra commas, quotes, or brackets in the JSON data. When this happens, PySpark may not be able to read the JSON data, resulting in errors.
To fix this issue, users can use a JSON validator tool like the previously mentioned http://jsonlint.com to check the JSON data for errors.
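It can also help to read the data in PERMISSIVE mode and inspect the corrupt-record column that PySpark populates for lines it cannot parse; a hedged sketch (the path is a placeholder):
# PERMISSIVE mode (the default) routes unparseable lines to a corrupt-record column
suspect = (spark.read
           .option("mode", "PERMISSIVE")
           .option("columnNameOfCorruptRecord", "_corrupt_record")
           .json("path/to/questionable.json"))
suspect.printSchema()
Alternatively, the mode option can be set to DROPMALFORMED to skip bad records or FAILFAST to fail immediately on the first one.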
Issue: Data Type Mismatch
Another issue that users may encounter is that the data types in the JSON data may not match the expected data types in PySpark. This can happen if the JSON data contains string values that should be integers or floats, for example.
To fix this issue, users can use the cast function in PySpark to convert the data types to the expected types. They can also supply an explicit schema when reading the JSON data to specify the expected data types.
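A short sketch of casting a column read as a string to an integer type (the age column name follows the earlier example):
from pyspark.sql.functions import col

# Cast a string column to the expected integer type
df = df.withColumn("age", col("age").cast("int"))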
By following these troubleshooting tips, users can overcome common issues when working with PySpark and JSON data.
PySpark SQL Resources
PySpark API docs for DataFrameReader.json https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/api/pyspark.sql.DataFrameReader.json.html
And before you go… check out more PySpark SQL tutorials.