PySpark SQL JSON Examples in Python

This short tutorial walks through analyzing historical World Cup player data with PySpark SQL, using JSON files as the input data source, from a Python perspective.

PySpark SQL with JSON Overview

We are going to load a JSON input source into Spark SQL’s SQLContext.  This Spark SQL JSON with Python tutorial has two parts.  The first part shows examples of JSON input sources with a specific structure.  The second part warns you of something you might not expect when using Spark SQL with a JSON data source.

Methodology

We are going to use two JSON inputs.  We’ll start with a simple, trivial Spark SQL with JSON example and then move to the analysis of historical World Cup player data.

This tutorial assumes you have worked with Spark and Python before.  See the PySpark Quick Start if you are new.

PySpark SQL JSON with Example Tutorial Part 1

1. Start pyspark

$SPARK_HOME/bin/pyspark

2. Load a JSON file which comes with Apache Spark distributions by default.  We do this by using the read.json method of the provided sqlContext.

Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /___/ .__/\_,_/_/ /_/\_\   version 1.4.1
      /_/

Using Python version 2.7.11 (default, Dec  6 2015 18:57:58)
SparkContext available as sc, HiveContext available as sqlContext.
>>> people = sqlContext.read.json("examples/src/main/resources/people.json")
>>> people.printSchema()
root
 |-- age: long (nullable = true)
 |-- name: string (nullable = true)
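It helps to see why this file loads so cleanly: people.json ships with Spark as line-delimited JSON, one self-contained object per line (a detail that matters in Part 2).  A quick stdlib sketch of that shape, with the file's three lines reproduced inline (as shipped with the Spark 1.x distribution):

```python
import json

# The three lines of examples/src/main/resources/people.json; note each
# line parses as a complete JSON object on its own.
lines = [
    '{"name":"Michael"}',
    '{"name":"Andy", "age":30}',
    '{"name":"Justin", "age":19}',
]
people = [json.loads(line) for line in lines]
print([p["name"] for p in people])  # ['Michael', 'Andy', 'Justin']
```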

3. Register the data as a temp table to ease our future SQL queries

>>> people.registerTempTable("people")

4. Now, we can run some SQL

>>> sqlContext.sql("select name from people").show()
+-------+
|   name|
+-------+
|Michael|
|   Andy|
| Justin|
+-------+

Ok, this is a simple example, but the real world is rarely this simple.  So, in part 2, we’ll cover a more complex example.


PySpark SQL JSON Example Tutorial Part 2

Take a closer look at the people.json file used in Part 1.  If you run it through http://jsonlint.com, it will not validate.  (And please, let’s not debate whether you can ever technically “validate” JSON; stay with me here.)

If you read the Spark SQL documentation closely:

“Note that the file that is offered as a json file is not a typical JSON file. Each line must contain a separate, self-contained valid JSON object. As a consequence, a regular multi-line JSON file will most often fail.”

But, what happens if we have typical JSON? Let’s find out.

Download and save historical World Cup player data from https://raw.githubusercontent.com/jokecamp/FootballData/master/World%20Cups/all-world-cup-players.json

Here’s a snippet of the content

[
  {
    "Competition": "World Cup",
    "Year": 1930,
    "Team": "Argentina",
    "Number": "",
    "Position": "GK",
    "FullName": "Ángel Bossio",
    "Club": "Club Atlético Talleres de Remedios de Escalada",
    "ClubCountry": "Argentina",
    "DateOfBirth": "1905-5-5",
    "IsCaptain": false
  },
  {
    "Competition": "World Cup",
    "Year": 1930,
    "Team": "Argentina",
    "Number": "",
    "Position": "GK",
    "FullName": "Juan Botasso",
    "Club": "Quilmes Atlético Club",
    "ClubCountry": "Argentina",
    "DateOfBirth": "1908-10-23",
    "IsCaptain": false
  },
....
]


You should have the all-world-cup-players.json file in your Spark home directory.


Unlike the file in Part 1, this JSON will not load directly with a sqlContext.
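You can see why with nothing but the standard library.  A minimal sketch (a small stand-in array, not the real download): a pretty-printed JSON array is valid as a whole document, but no single line of it stands alone, which is exactly what Spark's line-oriented reader requires.

```python
import json

# A minimal stand-in for all-world-cup-players.json: a pretty-printed
# array of objects, the shape Spark's line-oriented JSON reader rejects.
pretty = json.dumps(
    [{"Team": "Argentina", "Year": 1930}, {"Team": "Uruguay", "Year": 1930}],
    indent=2,
)

first_line = pretty.splitlines()[0]  # just "["
try:
    json.loads(first_line)
    line_is_valid = True
except ValueError:
    line_is_valid = False

print(line_is_valid)                  # False: no single line parses alone...
print(json.loads(pretty)[0]["Team"])  # ...though the whole document is valid JSON
```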

PySpark SQL JSON Part 2 Steps

1. Start pyspark

2. Load the JSON using the SparkContext wholeTextFiles method, which produces an RDD of (filename, content) tuples: the first element is the file name and the second is the entire file contents as one string.  We use map to keep only the second element of each tuple.

>>> jsonRDD = sc.wholeTextFiles("all-world-cup-players.json").map(lambda x: x[1])

3. Then, we need to prepare this RDD so sqlContext can parse it.  Let’s collapse it onto a single line by removing all whitespace.  (This blunt approach also strips the spaces inside string values, which is why team names run together in the query results below.)

>>> import re
>>> js = jsonRDD.map(lambda x: re.sub(r"\s+", "", x, flags=re.UNICODE))
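To make that caveat concrete, here is the same regex applied to a single hypothetical record, no Spark required: `\s+` matches the space inside "South Korea" just as readily as the formatting whitespace around the keys.

```python
import re

# \s+ matches spaces *inside* string values too, so "South Korea"
# silently becomes "SouthKorea" once the record is flattened.
record = '{ "Team": "South Korea", "Year": 2014 }'
flattened = re.sub(r"\s+", "", record, flags=re.UNICODE)
print(flattened)  # {"Team":"SouthKorea","Year":2014}
```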

4. We’re now able to consume the RDD using sqlContext’s jsonRDD method.  (jsonRDD was deprecated in Spark 2.x, where you would pass the RDD to spark.read.json instead, but it works in the 1.x shell used here.)

>>> wc_players = sqlContext.jsonRDD(js)

5. Let’s register the table and run a query

>>> wc_players.registerTempTable("players")
>>> sqlContext.sql("select distinct Team from players").show()
+--------------------+
|                Team|
+--------------------+
|              Mexico|
|            Portugal|
|            Colombia|
|          SouthKorea|
|         Netherlands|
|             Belgium|
|               Chile|
|              Brazil|
|BosniaandHerzegovina|
|          IvoryCoast|
|            Cameroon|
|             England|
|             Croatia|
|           Argentina|
|             Algeria|
|               Ghana|
|                Iran|
|             Nigeria|
|              Russia|
|              France|
+--------------------+
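Those run-together team names suggest a gentler alternative to the regex (a sketch, not from the original tutorial): parse the array once with the json module, then re-serialize one object per line.  json.dumps emits no newlines by default, so each line is a self-contained object, and spaces inside values survive.

```python
import json

# Re-serialize a parsed array as line-delimited JSON: one compact
# object per line, with internal spaces intact.
records = [{"Team": "South Korea"}, {"Team": "Bosnia and Herzegovina"}]
lines = [json.dumps(r) for r in records]

assert all("\n" not in line for line in lines)
print(lines[0])  # {"Team": "South Korea"}
```

An RDD of such lines (or a file of them) parses without mangling the data.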

PySpark SQL Resources

Check out PySpark SQL tutorials.

Featured image credit https://flic.kr/p/q57bEv


3 thoughts on “PySpark SQL JSON Examples in Python”

  1. Thank you for your tutorial.

    I wonder if you explain why you mentioned “all-world-cup-players.json” while in your example you used a different json “2014-world-cup.json”

    I tested on the first json and cannot get the expected result, instead I got some error:
    +--------------------+
    |     _corrupt_record|
    +--------------------+
    |[{"Competition":"...|
    +--------------------+

    and of course the sql query won’t produce result for me:

    >>> sqlContext.sql("select Team from players").show(1)
    Traceback (most recent call last):
      File "<stdin>", line 1, in <module>
      File "/opt/spark/python/pyspark/sql/context.py", line 580, in sql
        return DataFrame(self._ssql_ctx.sql(sqlQuery), self)
      File "/opt/spark/python/lib/py4j-0.9-src.zip/py4j/java_gateway.py", line 813, in __call__
      File "/opt/spark/python/pyspark/sql/utils.py", line 51, in deco
        raise AnalysisException(s.split(': ', 1)[1], stackTrace)
    pyspark.sql.utils.AnalysisException: u"cannot resolve 'Team' given input columns: [_corrupt_record];"

    I guess maybe there is something wrong or not compatible in the original json file and that’s why you extract the 2014 data and generated your new dataset for your tutorial, if so, can you post your 2014-world-cup.json here?

    Thanks again for your sharing.

  2. # ===========================================================
     # You need to have one json object per row in your input file
     # ===========================================================
     import json

     # original file was written with pretty-print inside a list
     with open("all-world-cup-players.json") as jsonfile:
         json_soccer = json.load(jsonfile)

     # write a new file with one object per line
     with open("all-world-cup-players-flat.json", "a") as outfile:
         for d in json_soccer:
             json.dump(d, outfile)
             outfile.write("\n")

     df = sqlContext.read.json("all-world-cup-players-flat.json")
     df.show()

     df.registerTempTable("players")
     sqlContext.sql("SELECT DISTINCT Team FROM players").show()

