PySpark JSON: A Comprehensive Guide to Working with JSON Data in PySpark


One of PySpark’s many strengths is its ability to handle JSON data. JSON, or JavaScript Object Notation, is a popular data format used for web applications and APIs. With PySpark, users can easily load, manipulate, and analyze JSON data in a distributed computing environment.

This PySpark JSON tutorial will show numerous code examples of how to interact with JSON from PySpark including both reading and writing JSON.

To work with JSON data in PySpark, we can utilize the built-in functions provided by the PySpark SQL module. These functions allow users to parse JSON strings and extract specific fields from nested structures. Additionally, PySpark provides the ability to read and write JSON files directly from Hadoop Distributed File System (HDFS), Amazon S3, or other supported file systems.


PySpark JSON Overview

One of the first things to understand about PySpark JSON is that it treats JSON data as a collection of nested dictionaries and lists. This means when you load JSON data into PySpark, it will automatically attempt to parse the data into a DataFrame with a schema reflecting the structure of the JSON data. When it works, it can be very useful, as it allows you to work with JSON data in a structured way, without having to manually parse the data yourself.

When you load JSON data into PySpark, it will automatically attempt to infer a schema based on the structure of the JSON data. However, you can also manually specify a schema if you need more control over the structure of your data. A schema is an important concept to understand when working with PySpark JSON. A schema is a way of describing the structure of data, and in PySpark, schemas are used to define the structure of DataFrames.

PySpark provides a wide range of functions for working with JSON data. For example, you can use the get_json_object function to extract a specific value from a JSON string, or you can use the json_tuple function to extract multiple values from a JSON string. Once JSON data is loaded into a DataFrame, PySpark can also write it out in other formats, such as CSV or Parquet.

PySpark JSON Code Examples

The Apache Spark distribution conveniently comes with JSON file examples. We are going to use one of these examples to get started and then progress to more advanced examples.

This tutorial assumes you have worked with Spark and Python in the past. See PySpark Quick Start if you are new.

PySpark Read JSON

PySpark Read JSON Example Part 1

1. Start pyspark from Spark home directory; e.g.

$ pwd
/Users/toddmcg/dev/spark-3.4.0-bin-hadoop3
$ bin/pyspark
Python 3.9.6 (default, Oct 18 2022, 12:41:40)
[Clang 14.0.0 (clang-1400.0.29.202)] on darwin
Type "help", "copyright", "credits" or "license" for more information.
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
23/07/11 07:14:18 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /__ / .__/\_,_/_/ /_/\_\   version 3.4.0
      /_/

Using Python version 3.9.6 (default, Oct 18 2022 12:41:40)
Spark context Web UI available at http://192.168.1.16:4040
Spark context available as 'sc' (master = local[*], app id = local-1689077658795).
SparkSession available as 'spark'.
>>>

In the example above, I’ve extracted the Apache Spark distribution to /Users/toddmcg/dev/spark-3.4.0-bin-hadoop3 and run bin/pyspark to get things started.

2. Now, let’s read a JSON file from PySpark which comes with Apache Spark distributions by default.  

>>> df = spark.read.json("examples/src/main/resources/people.json")
>>> df.printSchema()
root
 |-- age: long (nullable = true)
 |-- name: string (nullable = true)

The df variable now contains a DataFrame with the JSON data. As we can see in this example, PySpark automatically infers the schema of the JSON data as it created a DataFrame with appropriate column names (age, name) and data types (long, string).

3. Register the data as a temp table to ease our future SQL queries

>>> df.createOrReplaceTempView("people")

This will allow us to run SQL to query the JSON data. By the way, the older way to register a temp table was the registerTempTable function, but it has been deprecated since Spark 2.0.

4. Now, we can run some SQL to query the JSON from PySpark

>>> spark.sql("select name from people").show()
+-------+
|   name|
+-------+
|Michael|
|   Andy|
| Justin|
+-------+

Ok, this is a simple example, but the real world is rarely this simple.  So, in part 2, we’ll cover a more complex example.

PySpark Read JSON Example Part 2

Take a closer look at the people.json file used in Part 1. If you run it through http://jsonlint.com, it will not validate. Please, let’s not debate whether this is a byproduct of JSON or whether you can technically validate any JSON. Stay with me here.

If you read the Spark SQL documentation closely:

“Note that the file that is offered as a json file is not a typical JSON file. Each line must contain a separate, self-contained valid JSON object. As a consequence, a regular multi-line JSON file will most often fail.”

But, what happens if we have typical JSON? Let’s find out.

Download and save historical World Cup player data from here

For me, in these examples I downloaded the JSON file (world-cup-players.json) to /Users/toddmcg/dev/spark-3.4.0-bin-hadoop3

Here’s a snippet of the content

[
{
"Competition": "World Cup",
"Year": 1930,
"Team": "Argentina",
"Number": "",
"Position": "GK",
"FullName": "Ángel Bossio",
"Club": "Club Atlético Talleres de Remedios de Escalada",
"ClubCountry": "Argentina",
"DateOfBirth": "1905-5-5",
"IsCaptain": false
},
{
"Competition": "World Cup",
"Year": 1930,
"Team": "Argentina",
"Number": "",
"Position": "GK",
"FullName": "Juan Botasso",
"Club": "Quilmes Atlético Club",
"ClubCountry": "Argentina",
"DateOfBirth": "1908-10-23",
"IsCaptain": false
},
….
]

PySpark Read JSON Advanced Example

Let’s attempt to read this JSON file in a similar way as before

>>> df = spark.read.json("world-cup-players.json")
>>> df.printSchema()
root
 |-- _corrupt_record: string (nullable = true)

That doesn’t look good. It didn’t work as before.
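One way to see why this fails: by default, each line of the file is treated as its own JSON document. The sketch below mimics that behavior with Python's standard json module (an approximation for illustration, not Spark's actual parser). The helper name parse_line_by_line and the sample strings are assumptions made up for this example.

```python
import json

# A "typical" pretty-printed JSON array, with one record spread over several lines
pretty_printed = """[
{
"Team": "Argentina",
"Year": 1930
}
]"""

# The same kind of data in line-delimited form: one complete object per line
json_lines = '{"Team": "Argentina", "Year": 1930}\n{"Team": "Brazil", "Year": 1930}'


def parse_line_by_line(text):
    """Parse every non-empty line as a standalone JSON document.

    A line that is not valid JSON on its own is recorded as None.
    """
    results = []
    for line in text.splitlines():
        if not line.strip():
            continue
        try:
            results.append(json.loads(line))
        except json.JSONDecodeError:
            results.append(None)
    return results


pretty_results = parse_line_by_line(pretty_printed)
lines_results = parse_line_by_line(json_lines)

print(pretty_results)
print(lines_results)
```

No single line of the pretty-printed array is a complete JSON document, so every line fails, which is roughly what the _corrupt_record column is reporting.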

How to fix this depends on which version of Spark you are using. Let’s show both examples.

In older versions of Spark, we used to have to use the SparkContext wholeTextFiles method to produce a pair RDD whose first element is the filename and whose second element is the entire contents of that file. We use map to create a new RDD from the second element of each tuple. Note that this older example reads a different file, 2014-world-cup.json.

>>> jsonRDD = sc.wholeTextFiles("2014-world-cup.json").map(lambda x: x[1])

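Then, we needed to prepare this RDD so it could be parsed by sqlContext, by removing the whitespace so that each file collapses to a single line. Be aware that this regular expression also strips spaces inside string values, which is why team names such as SouthKorea appear without spaces in the query output further down.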

>>> import re
>>> js = jsonRDD.map(lambda x: re.sub(r"\s+", "", x, flags=re.UNICODE))

Then, we were able to consume the RDD using jsonRDD of sqlContext

>>> wc_players = sqlContext.jsonRDD(js)

And finally, register the table and run a query

>>> wc_players.registerTempTable("players")
>>> sqlContext.sql("select distinct Team from players").show()
+--------------------+
|                Team|
+--------------------+
|              Mexico|
|            Portugal|
|            Colombia|
|          SouthKorea|
|         Netherlands|
|             Belgium|
|               Chile|
|              Brazil|
|BosniaandHerzegovina|
|          IvoryCoast|
|            Cameroon|
|             England|
|             Croatia|
|           Argentina|
|             Algeria|
|               Ghana|
|                Iran|
|             Nigeria|
|              Russia|
|              France|
+--------------------+

But, all this is much more straightforward in newer versions of PySpark.

How to read multi-line JSON from PySpark

These days, PySpark provides the multiline option to read records that span multiple lines. It is needed because, by default, PySpark treats every line in a JSON file as a complete, self-contained record, as mentioned before.

With this option, reading the file is much simpler than the older approach shown previously.

>>> df = spark.read.option("multiline", "true").json("world-cup-players.json")
>>> df.printSchema()
root
 |-- Club: string (nullable = true)
 |-- ClubCountry: string (nullable = true)
 |-- Competition: string (nullable = true)
 |-- DateOfBirth: string (nullable = true)
 |-- FullName: string (nullable = true)
 |-- IsCaptain: boolean (nullable = true)
 |-- Number: string (nullable = true)
 |-- Position: string (nullable = true)
 |-- Team: string (nullable = true)
 |-- Year: long (nullable = true)

PySpark Write JSON

PySpark Writing JSON Files Examples

Similar to reading JSON files, PySpark provides ways to write DataFrame data to JSON files. The DataFrame.write.json() function writes the DataFrame as line-delimited JSON, with one object per line.

# Writing DataFrame to JSON file
df.write.json("path/to/output/json/file")

The path is treated as an output directory rather than a single file: PySpark writes one part file per DataFrame partition into it.

It is also possible to customize the output format by passing additional options to the DataFrame.write.json() function. For example, the compression option can be used to specify the compression codec to be used for the output JSON files.

# Writing DataFrame to compressed JSON file
df.write.json("path/to/output/json/file", compression="gzip")

This will write the DataFrame data as JSON part files compressed with the gzip codec.

For more info on options, see https://spark.apache.org/docs/3.1.2/api/python/reference/api/pyspark.sql.DataFrameWriter.json.html

JSON Functions in PySpark

Custom Schema with from_json Function

The from_json function in PySpark is used to convert a JSON string to a struct or map column. This function takes two arguments: the first argument is the column that contains the JSON string, and the second argument is the schema of the resulting struct or map column.

Here’s an example of the from_json function:

from pyspark.sql.functions import from_json
from pyspark.sql.types import StructType, StructField, StringType

json_schema = StructType([
    StructField("name", StringType()),
    StructField("age", StringType()),
    StructField("gender", StringType())
])

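# Assumes the input has a string column named json_string that holds raw JSON text
df = spark.read.json("path/to/json/file")
# Parse that text into a struct column using the schema defined above
df = df.withColumn("json_col", from_json(df["json_string"], json_schema))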

to_json Function

The to_json function in PySpark is used to convert a struct or map column to a JSON string. This function takes one argument: the column that contains the struct or map column.

Here’s an example of how to use the to_json function:

from pyspark.sql.functions import to_json

df = df.withColumn("json_string", to_json(df["json_col"]))

json_tuple Function

The json_tuple function in PySpark is used to extract one or more top-level fields from a JSON string. The first argument is the column that contains the JSON string, and the remaining arguments are the names of the fields to extract.

Here’s an example of how to use the json_tuple function:

from pyspark.sql.functions import json_tuple

df = df.withColumn("name", json_tuple(df["json_string"], "name"))

Handling Nested JSON in PySpark

When working with complex data structures, it is common to encounter nested JSON objects. PySpark provides the ability to handle nested structures using its built-in functions.

To access nested elements in a JSON object, PySpark supports dot notation in column names, as well as the getField() and getItem() methods on a column. For example, to access the value of a nested element named “address” inside a “user” object using dot notation, the following code can be used:

from pyspark.sql.functions import col

df.select(col("user.address")).show()

In addition to getItem(), PySpark also provides the explode() function. This function is used to explode arrays and maps into separate rows. For example, to explode an array named “friends” in a JSON object, the following code can be used:

from pyspark.sql.functions import explode

df.select(explode(col("user.friends"))).show()

It is also possible to use the struct() function to create a new nested JSON object. This function takes a list of column names as arguments and returns a new column that contains a nested JSON object. For example, to create a new nested JSON object that contains the “name” and “age” columns, the following code can be used:

from pyspark.sql.functions import struct

df.select(struct(col("name"), col("age")).alias("person")).show()

By using these built-in functions, PySpark makes it easy to work with nested JSON objects and extract the data needed for analysis.

Best Practices for PySpark JSON

When working with PySpark JSON, there are several best practices that one should follow to ensure efficient and effective processing of the data.

Firstly, it is recommended to use the spark.read.json() method to read in JSON files. As shown, this method can infer the schema automatically, which saves the effort of declaring one by hand. Inference does require an extra pass over the data, so supplying a schema can be worthwhile for large inputs. Additionally, set the multiline option to true when a file holds JSON records that span multiple lines, and leave it at its default for files with one JSON object per line.

Secondly, it can be important to properly handle null values in the JSON data. Missing or null JSON fields become null column values in the DataFrame (None when collected to Python), which can produce surprising results in comparisons and aggregations. Depending on the analysis, one can use the dropna() method to remove rows that contain nulls, or fillna() to substitute a default value, before processing the data.

Lastly, it can be important to optimize PySpark operations by minimizing the amount of data being shuffled. This can be achieved by using operations such as filter() and select() to reduce the amount of data being processed. Additionally, one should avoid using expensive operations such as groupby() and distinct() unless absolutely necessary. This last recommendation is true for data sources beyond JSON.

Troubleshooting Common PySpark JSON Issues

When working with PySpark and JSON, there are some common issues that users may encounter. Here are some troubleshooting tips to help overcome these issues:

Issue: Invalid JSON Format

One common issue is that the JSON data may not be in a valid format. This can happen if there are missing or extra commas, quotes, or brackets in the JSON data. When this happens, PySpark may not be able to read the JSON data, resulting in errors.

To fix this issue, users can use a JSON validator tool like the previously mentioned http://jsonlint.com to check the JSON data for errors.
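When an online validator is not convenient, a quick local check with Python's standard json module can locate the problem as well. The helper name find_json_error and the sample strings below are assumptions made for this sketch.

```python
import json


def find_json_error(text):
    """Return None if text is valid JSON, otherwise (line, column, message)."""
    try:
        json.loads(text)
    except json.JSONDecodeError as err:
        return (err.lineno, err.colno, err.msg)
    return None


good_record = '{"Team": "Argentina", "Year": 1930}'
bad_record = '{"Team": "Argentina", "Year": 1930,}'  # stray trailing comma

good_result = find_json_error(good_record)
bad_result = find_json_error(bad_record)

print(good_result)
print(bad_result)
```

For line-delimited files, running the helper on each line separately points at the exact record that needs fixing.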

Issue: Data Type Mismatch

Another issue that users may encounter is that the data types in the JSON data may not match the expected data types in PySpark. This can happen if the JSON data contains string values that should be integers or floats, for example.

To fix this issue, users can use the cast function in PySpark to convert the data types to the expected types. They can also use the schema option when reading the JSON data to specify the expected data types.

By following these troubleshooting tips, users can overcome common issues when working with PySpark and JSON data.

PySpark SQL Resources

PySpark API docs for DataFrameReader.json https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/api/pyspark.sql.DataFrameReader.json.html

About Todd M

Todd has held multiple software roles over his 20 year career. For the last 5 years, he has focused on helping organizations move from batch to data streaming. In addition to the free tutorials, he provides consulting, coaching for Data Engineers, Data Scientists, and Data Architects. Feel free to reach out directly or to connect on LinkedIn

3 thoughts on “PySpark JSON: A Comprehensive Guide to Working with JSON Data in PySpark”

  1. Thank you for your tutorial.

    I wonder if you explain why you mentioned “all-world-cup-players.json” while in your example you used a different json “2014-world-cup.json”

    I tested on the first json and cannot get the expected result, instead I got some error:
    +--------------------+
    |     _corrupt_record|
    +--------------------+
    |[{"Competition":"...|
    +--------------------+

    and of course the sql query won’t produce result for me:

    >>> sqlContext.sql("select Team from players").show(1)
    Traceback (most recent call last):
    File "<stdin>", line 1, in <module>
    File "/opt/spark/python/pyspark/sql/context.py", line 580, in sql
    return DataFrame(self._ssql_ctx.sql(sqlQuery), self)
    File "/opt/spark/python/lib/py4j-0.9-src.zip/py4j/java_gateway.py", line 813, in __call__
    File "/opt/spark/python/pyspark/sql/utils.py", line 51, in deco
    raise AnalysisException(s.split(': ', 1)[1], stackTrace)
    pyspark.sql.utils.AnalysisException: u"cannot resolve 'Team' given input columns: [_corrupt_record];"

    I guess maybe there is something wrong or not compatible in the original json file and that’s why you extract the 2014 data and generated your new dataset for your tutorial, if so, can you post your 2014-world-cup.json here?

    Thanks again for your sharing.

  2. # ===========================================================
    # You need to have one json object per row in your input file
    # ===========================================================
    import json

    # original file was written with pretty-print inside a list
    with open("all-world-cup-players.json") as jsonfile:
        json_soccer = json.load(jsonfile)

    # write a new file with one object per line
    with open("all-world-cup-players-flat.json", "w") as outfile:
        for d in json_soccer:
            json.dump(d, outfile)
            outfile.write("\n")

    df = sqlContext.read.json("all-world-cup-players-flat.json")
    df.show()

    df.registerTempTable("players")
    sqlContext.sql("SELECT DISTINCT Team FROM players").show()
