Spark SQL MySQL Python Example with JDBC

Spark SQL Python mySQL

Let’s cover how to use Spark SQL with Python and a mySQL database input data source.  Shall we?  Yes, yes we shall.

Consider this tutorial an introductory step when learning how to use Spark SQL with a relational database and Python.  If you are brand new, check out the Spark with Python Tutorial.

Overview

We’re going to load some NYC Uber data into a database for this Spark SQL with MySQL tutorial.  Then, we’re going to fire up pyspark with a command line argument to specify the JDBC driver needed to connect to the JDBC data source.  We’ll make sure we can authenticate and then start running some queries.

Spark SQL Python and mySQL Setup Requirements

1. MySQL database with at least one table containing data.

2. MySQL JDBC driver (download available https://dev.mysql.com/downloads/connector/j/)

3. Uber NYC data file available here: https://raw.githubusercontent.com/fivethirtyeight/uber-tlc-foil-response/master/Uber-Jan-Feb-FOIL.csv

For details of the setup used in this Spark SQL mySQL Python tutorial, see the Setup Reference section at the bottom of this post.

Spark SQL MySQL (JDBC) Python Quick Start Tutorial

1. Start the pyspark shell with the --jars argument

$SPARK_HOME/bin/pyspark --jars mysql-connector-java-5.1.38-bin.jar

This example assumes the MySQL connector JDBC jar file is located in the same directory as where you are calling pyspark.  If it is not, you can specify the path location such as:

$SPARK_HOME/bin/pyspark --jars /home/example/jars/mysql-connector-java-5.1.38-bin.jar

2. Once the shell is running, let’s establish a connection to the mysql db and read the “trips” table:

Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /___/ .__/\_,_/_/ /_/\_\   version 1.4.1
      /_/

Using Python version 2.7.11 (default, Dec  6 2015 18:57:58)
SparkContext available as sc, HiveContext available as sqlContext.
>>> dataframe_mysql = sqlContext.read.format("jdbc").option("url", "jdbc:mysql://localhost/uber").option("driver", "com.mysql.jdbc.Driver").option("dbtable", "trips").option("user", "root").option("password", "root").load()

Change the mysql url and user/password values in the above code as appropriate for your environment.
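
If it helps readability when adapting the connection settings, the same read can be written across multiple lines.  This is just a sketch with placeholder values (your-host, your_db, your_table, your_user, your_password are stand-ins, not real values):

# Sketch: same JDBC read as above, spread across lines with placeholder settings.
dataframe_mysql = sqlContext.read.format("jdbc") \
    .option("url", "jdbc:mysql://your-host:3306/your_db") \
    .option("driver", "com.mysql.jdbc.Driver") \
    .option("dbtable", "your_table") \
    .option("user", "your_user") \
    .option("password", "your_password") \
    .load()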

3. Let’s confirm the DataFrame by showing its contents:

>>> dataframe_mysql.show()
+-----------------------+--------+---------------+-----+
|dispatching_base_number|    date|active_vehicles|trips|
+-----------------------+--------+---------------+-----+
|                 B02512|1/1/2015|            190| 1132|
|                 B02765|1/1/2015|            225| 1765|
|                 B02764|1/1/2015|           3427|29421|
|                 B02682|1/1/2015|            945| 7679|
|                 B02617|1/1/2015|           1228| 9537|
|                 B02598|1/1/2015|            870| 6903|
|                 B02598|1/2/2015|            785| 4768|
|                 B02617|1/2/2015|           1137| 7065|
|                 B02512|1/2/2015|            175|  875|
|                 B02682|1/2/2015|            890| 5506|
|                 B02765|1/2/2015|            196| 1001|
|                 B02764|1/2/2015|           3147|19974|
|                 B02765|1/3/2015|            201| 1526|
|                 B02617|1/3/2015|           1188|10664|
|                 B02598|1/3/2015|            818| 7432|
|                 B02682|1/3/2015|            915| 8010|
|                 B02512|1/3/2015|            173| 1088|
|                 B02764|1/3/2015|           3215|29729|
|                 B02512|1/4/2015|            147|  791|
|                 B02682|1/4/2015|            812| 5621|
+-----------------------+--------+---------------+-----+

4. Register the data as a temp table for future SQL queries

>>> dataframe_mysql.registerTempTable("trips")

5. We are now in a position to run some SQL, such as:

>>> sqlContext.sql("select * from trips where dispatching_base_number like '%2512%'").show()
+-----------------------+---------+---------------+-----+
|dispatching_base_number|     date|active_vehicles|trips|
+-----------------------+---------+---------------+-----+
|                 B02512| 1/1/2015|            190| 1132|
|                 B02512| 1/2/2015|            175|  875|
|                 B02512| 1/3/2015|            173| 1088|
|                 B02512| 1/4/2015|            147|  791|
|                 B02512| 1/5/2015|            194|  984|
|                 B02512| 1/6/2015|            218| 1314|
|                 B02512| 1/7/2015|            217| 1446|
|                 B02512| 1/8/2015|            238| 1772|
|                 B02512| 1/9/2015|            224| 1560|
|                 B02512|1/10/2015|            206| 1646|
|                 B02512|1/11/2015|            162| 1104|
|                 B02512|1/12/2015|            217| 1399|
|                 B02512|1/13/2015|            234| 1652|
|                 B02512|1/14/2015|            233| 1582|
|                 B02512|1/15/2015|            237| 1636|
|                 B02512|1/16/2015|            234| 1481|
|                 B02512|1/17/2015|            201| 1281|
|                 B02512|1/18/2015|            177| 1521|
|                 B02512|1/19/2015|            168| 1025|
|                 B02512|1/20/2015|            221| 1310|
+-----------------------+---------+---------------+-----+

Conclusion Spark SQL MySQL (JDBC) with Python

This example was designed to get you up and running with Spark SQL and Python against mySQL or any JDBC-compliant database.  Would you like to see other examples?  Leave ideas or questions in the comments below.

Setup Reference

The Spark SQL with MySQL JDBC example assumes a mysql db named “uber” with a table called “trips”.  The “trips” table was populated with the Uber NYC data used in the Spark SQL Python CSV tutorial.

For an example of how I loaded the CSV into mySQL for Spark SQL tutorials, check this YouTube video and subscribe to our channel.
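
If you would rather populate the “trips” table from pyspark instead of a separate MySQL import, here is a minimal sketch of one way to do it, assuming the spark-csv package and the MySQL JDBC driver are both available to the shell and the same localhost/root credentials used above (adjust everything for your environment):

# Sketch only: load the Uber CSV with spark-csv, then append the rows to the
# "trips" table in the "uber" database over JDBC.
df = sqlContext.read.format("com.databricks.spark.csv") \
    .options(header="true", inferschema="true") \
    .load("Uber-Jan-Feb-FOIL.csv")
df.write.jdbc(url="jdbc:mysql://localhost/uber?user=root&password=root",
              table="trips",
              mode="append",
              properties={"driver": "com.mysql.jdbc.Driver"})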

Spark with Python Additional Resources

Spark with Python tutorials

Spark SQL tutorials

Featured Image credit: https://flic.kr/p/53udJ

Spark SQL JSON Examples in Python

Spark SQL JSON with Python

This short Spark tutorial shows analysis of World Cup player data using Spark SQL with a JSON file input data source, from a Python perspective.

Spark SQL JSON with Python Overview

We are going to load a JSON input source to Spark SQL’s SQLContext.  This Spark SQL JSON with Python tutorial has two parts.  The first part shows examples of JSON input sources with a specific structure.  The second part warns you of something you might not expect when using Spark SQL with a JSON data source.

Methodology

We are going to use two JSON inputs.  We’ll start with a simple, trivial Spark SQL with JSON example and then move to the analysis of historical World Cup player data.

This tutorial assumes you have worked with Spark and Python in the past.  See the Spark with Python Quick Start if you are new.

Spark SQL JSON with Python Example Tutorial Part 1

1. Start pyspark

$SPARK_HOME/bin/pyspark

2. Load a JSON file which comes with Apache Spark distributions by default.  We do this by using the read.json function from the provided sqlContext.

Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /___/ .__/\_,_/_/ /_/\_\   version 1.4.1
      /_/

Using Python version 2.7.11 (default, Dec  6 2015 18:57:58)
SparkContext available as sc, HiveContext available as sqlContext.
>>> people = sqlContext.read.json("examples/src/main/resources/people.json")
>>> people.printSchema()
root
 |-- age: long (nullable = true)
 |-- name: string (nullable = true)

3. Register the data as a temp table to ease our future SQL queries

>>> people.registerTempTable("people")

4. Now, we can run some SQL

>>> sqlContext.sql("select name from people").show()
+-------+
|   name|
+-------+
|Michael|
|   Andy|
| Justin|
+-------+

Ok, this is a simple example, but the real world is rarely this simple.  So, in part 2, we’ll cover a more complex example.

Spark SQL JSON Example Tutorial Part 2

Take a closer look at the people.json file used in Part 1.  If you run it through http://jsonlint.com, it will not validate.  Please, let’s not debate whether this is a byproduct of JSON and whether you can technically validate any JSON.  Stay with me here.

If you read the Spark SQL documentation closely:

“Note that the file that is offered as a json file is not a typical JSON file. Each line must contain a separate, self-contained valid JSON object. As a consequence, a regular multi-line JSON file will most often fail.”

Please read this previous quote again.
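
For reference, the people.json file that ships with Spark looks roughly like the following.  Notice each line is its own self-contained JSON object, which is exactly the format read.json expects, and exactly why a strict validator rejects the file as a whole:

{"name":"Michael"}
{"name":"Andy", "age":30}
{"name":"Justin", "age":19}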

But, what happens if we have typical JSON?  Let’s find out.

Download and save historical world cup player data from https://raw.githubusercontent.com/jokecamp/FootballData/master/World%20Cups/all-world-cup-players.json

Here’s a snippet of the content

[
  {
    "Competition": "World Cup",
    "Year": 1930,
    "Team": "Argentina",
    "Number": "",
    "Position": "GK",
    "FullName": "Ãngel Bossio",
    "Club": "Club Atlético Talleres de Remedios de Escalada",
    "ClubCountry": "Argentina",
    "DateOfBirth": "1905-5-5",
    "IsCaptain": false
  },
  {
    "Competition": "World Cup",
    "Year": 1930,
    "Team": "Argentina",
    "Number": "",
    "Position": "GK",
    "FullName": "Juan Botasso",
    "Club": "Quilmes Atlético Club",
    "ClubCountry": "Argentina",
    "DateOfBirth": "1908-10-23",
    "IsCaptain": false
  },
....
]

You should have the all-world-cup-players.json file in your Spark home directory.

Unlike Part 1, this JSON will not load directly with sqlContext.read.json.

Spark SQL JSON Python Part 2 Steps

1. Start pyspark

2. Load the JSON using the Spark Context wholeTextFiles method, which produces an RDD of tuples whose 1st element is the file name and whose 2nd element is the entire file content. We use map to create a new RDD from the 2nd element of each tuple.

>>> jsonRDD = sc.wholeTextFiles("all-world-cup-players.json").map(lambda x: x[1])

3. Then, we need to prepare this RDD so it can be parsed by sqlContext.  Let’s remove the whitespace

>>> import re
>>> js = jsonRDD.map(lambda x: re.sub(r"\s+", "", x, flags=re.UNICODE))

4. We’re now able to consume the RDD using the jsonRDD function of the sqlContext

>>> wc_players = sqlContext.jsonRDD(js)

5. Let’s register the table and run a query

>>> wc_players.registerTempTable("players")
>>> sqlContext.sql("select distinct Team from players").show()
+--------------------+
|                Team|
+--------------------+
|              Mexico|
|            Portugal|
|            Colombia|
|          SouthKorea|
|         Netherlands|
|             Belgium|
|               Chile|
|              Brazil|
|BosniaandHerzegovina|
|          IvoryCoast|
|            Cameroon|
|             England|
|             Croatia|
|           Argentina|
|             Algeria|
|               Ghana|
|                Iran|
|             Nigeria|
|              Russia|
|              France|
+--------------------+
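
Notice a side effect of step 3: stripping every whitespace character also removed the spaces inside string values, which is why “South Korea” appears above as “SouthKorea” and “Bosnia and Herzegovina” as “BosniaandHerzegovina”.  If that matters for your analysis, one alternative sketch is to parse and re-serialize the JSON with the json module instead of using a regex (this assumes each file fits comfortably in memory, which wholeTextFiles already requires):

>>> import json
>>> js = jsonRDD.map(lambda x: json.dumps(json.loads(x)))
>>> wc_players = sqlContext.jsonRDD(js)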

Spark SQL with Python Resources

Check out the Spark with Python tutorials page for more Spark SQL with Python examples.

Featured image credit https://flic.kr/p/q57bEv

Spark SQL CSV Examples with Python

Spark SQL CSV Python

In this Spark tutorial, we will use Spark SQL with a CSV input data source using the Python API.  We will continue to use the Uber CSV source file as used in the Getting Started with Spark and Python tutorial presented earlier.

Also, this Spark SQL CSV tutorial assumes you are familiar with using SQL against relational databases, directly or from Python.  So, in other words, you have experience with SQL and would like to know how to use it with Spark.

Overview

Spark SQL uses a type of Resilient Distributed Dataset called DataFrames.  DataFrames are composed of Row objects accompanied by a schema which describes the data types of each column. A DataFrame may be considered similar to a table in a traditional relational database. As you might expect, DataFrames may be created from a variety of input sources including CSV text files.

This intro to Spark SQL post will use a CSV file from previous Spark Python tutorials found here:

https://raw.githubusercontent.com/fivethirtyeight/uber-tlc-foil-response/master/Uber-Jan-Feb-FOIL.csv

Methodology

We’re going to use the IPython notebook and the spark-csv package available from Spark Packages to make our lives easier.  The spark-csv package is described as a “library for parsing and querying CSV data with Apache Spark, for Spark SQL and DataFrames”.  This library is compatible with Spark 1.3 and above.

Spark SQL CSV with Python Example Tutorial Part 1

1. Depending on your version of Scala, start the pyspark shell with the --packages command line argument.

At the time of this writing, the Scala 2.10 version:

$SPARK_HOME/bin/pyspark --packages com.databricks:spark-csv_2.10:1.3.0

At the time of this writing, the Scala 2.11 version:

$SPARK_HOME/bin/pyspark --packages com.databricks:spark-csv_2.11:1.3.0

2. Using the sqlContext available from the shell, load the CSV with the read, format, options and load functions

>>> df = sqlContext.read.format('com.databricks.spark.csv').options(header='true', inferschema='true').load('Uber-Jan-Feb-FOIL.csv')

In the above code, we are specifying the desire to use the com.databricks.spark.csv format from the package we passed to the shell in step 1.  “header” set to true signifies the first row has column names.  “inferSchema” instructs Spark to attempt to infer the schema of the CSV, and finally the load function passes in the path and name of the CSV source file.  In this example, we can tell the Uber-Jan-Feb-FOIL.csv file is in the same directory from which pyspark was launched.

3. Register a temp table

>>> df.registerTempTable("uber")

4. We’re now ready to query using SQL, such as finding the distinct NYC Uber bases in the CSV

>>> distinct_bases = sqlContext.sql("select distinct dispatching_base_number from uber")
>>> for b in distinct_bases.collect(): print b
Row(dispatching_base_number=u'B02598')
Row(dispatching_base_number=u'B02764')
Row(dispatching_base_number=u'B02765')
Row(dispatching_base_number=u'B02617')
Row(dispatching_base_number=u'B02682')
Row(dispatching_base_number=u'B02512')

5. It might be handy to know the schema

>>> df.printSchema()

root
 |-- dispatching_base_number: string (nullable = true)
 |-- date: string (nullable = true)
 |-- active_vehicles: integer (nullable = true)
 |-- trips: integer (nullable = true)

Spark SQL CSV with Python Example Tutorial Part 2

But, let’s try some more advanced SQL, such as determining which Uber base is the busiest based on the number of trips

>>> sqlContext.sql("""select distinct(`dispatching_base_number`), 
                                sum(`trips`) as cnt from uber group by `dispatching_base_number` 
                                order by cnt desc""").show()


+-----------------------+-------+
|dispatching_base_number|    cnt|
+-----------------------+-------+
|                 B02764|1914449|
|                 B02617| 725025|
|                 B02682| 662509|
|                 B02598| 540791|
|                 B02765| 193670|
|                 B02512|  93786|
+-----------------------+-------+

Or the 5 busiest days based on the number of trips in the time range of the data:

>>> sqlContext.sql("""select distinct(`date`), 
                                sum(`trips`) as cnt from uber group by `date` 
                                order by cnt desc limit 5""").show()


+---------+------+
|     date|   cnt|
+---------+------+
|2/20/2015|100915|
|2/14/2015|100345|
|2/21/2015| 98380|
|2/13/2015| 98024|
|1/31/2015| 92257|
+---------+------+
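
As one more illustration of what the registered temp table allows, here is a sketch of a query that ranks bases by trips per active vehicle (output omitted; run it against your own loaded data):

>>> sqlContext.sql("""select `dispatching_base_number`,
                                sum(`trips`)/sum(`active_vehicles`) as trips_per_vehicle
                                from uber group by `dispatching_base_number`
                                order by trips_per_vehicle desc""").show()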

Python Spark SQL Resource

ipython notebook file https://github.com/tmcgrath/spark-with-python-course/blob/master/Spark-SQL-CSV-with-Python.ipynb

Further Spark Python References

Spark Python Tutorials

Spark Python Quick Start Tutorial

Featured image credit https://flic.kr/p/4FbcGr

Spark SQL MySQL Example with JDBC

Spark SQL mySQL JDBC

In this tutorial, we will cover using Spark SQL with a mySQL database.

Overview

Let’s show examples of using Spark SQL with mySQL.  We’re going to use mySQL with Spark in this tutorial, but you can apply the concepts presented here to any relational database which has a JDBC driver.

By the way, if you are not familiar with Spark SQL, there are a few Spark SQL tutorials on this site.

Requirements

1. MySQL instance

2. MySQL JDBC driver (download available https://dev.mysql.com/downloads/connector/j/)

3. Previously used baby_names.csv file as source data.

4. Apache Spark

Quick Setup

The Spark SQL with MySQL JDBC example assumes a mysql db named “sparksql” with a table called “baby_names”.  The “baby_names” table has been populated with the baby_names.csv data used in previous Spark tutorials.

Here’s a screencast on YouTube of how I set up my environment:

mysql setup for Spark SQL with MySQL (JDBC) examples

The SQL to create the baby_names table:

DROP TABLE IF EXISTS `baby_names`;

CREATE TABLE `baby_names` (
  `id` int(11) unsigned NOT NULL AUTO_INCREMENT,
  `year` int(11) DEFAULT NULL,
  `first_name` varchar(100) DEFAULT NULL,
  `county` varchar(100) DEFAULT NULL,
  `sex` varchar(5) DEFAULT NULL,
  `count` int(11) DEFAULT NULL,
  PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=latin1;

If you have any questions about the environment setup, leave comments on this post.

Methodology

We need to pass in the mySQL JDBC driver jar when we start up the Spark shell.  (In a Spark application, any third-party libs such as a JDBC driver would be included in the application package.)

From the Spark shell we’re going to establish a connection to the mySQL db and then run some queries via Spark SQL.  This is a getting-started example of Spark with mySQL.  To build and deploy a Spark application with the mySQL JDBC driver, you may wish to check out the Spark cluster deploy with extra jars tutorial.

Spark SQL with MySQL (JDBC) Example Tutorial

1. Start the spark shell with the --jars argument

$SPARK_HOME/bin/spark-shell --jars mysql-connector-java-5.1.26.jar

This example assumes the mySQL connector JDBC jar file is located in the same directory as where you are calling spark-shell.  If it is not, you can specify the path location such as:

$SPARK_HOME/bin/spark-shell --jars /home/example/jars/mysql-connector-java-5.1.26.jar

2. Once the Spark shell is running, let’s establish a connection to the mySQL db and read the baby_names table:

Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /___/ .__/\_,_/_/ /_/\_\   version 1.5.2
      /_/

Using Scala version 2.10.4 (Java HotSpot(TM) 64-Bit Server VM, Java 1.7.0_79)
Type in expressions to have them evaluated.
Type :help for more information.
...
SQL context available as sqlContext.

scala> val dataframe_mysql = sqlContext.read.format("jdbc").option("url", "jdbc:mysql://localhost/sparksql").option("driver", "com.mysql.jdbc.Driver").option("dbtable", "baby_names").option("user", "root").option("password", "root").load()

Change the mySQL url and user/password values in the above code appropriate to your environment.

3. Let’s confirm the DataFrame by showing its contents:

scala> dataframe_mysql.show

4. Register the data as a temp table for future SQL queries

scala> dataframe_mysql.registerTempTable("names")

5. We are now in a position to run some SQL, such as:

scala> dataframe_mysql.sqlContext.sql("select * from names").collect.foreach(println)

Conclusion Spark SQL with MySQL (JDBC)

This example was designed to get you up and running quickly with Spark SQL and mySQL or any JDBC-compliant database.  What other examples would you like to see with Spark SQL and JDBC?  Please leave ideas or questions in the comments below.

For more Spark tutorials, check out Spark SQL with Scala.

And to keep up with changes in Spark SQL, especially DataFrame vs DataSet, check the Apache Spark SQL documentation from time-to-time.

Featured Image credit: https://flic.kr/p/f8KB7L

Spark SQL JSON Examples

Spark SQL JSON

This tutorial covers using Spark SQL with a JSON file input data source in Scala.  If you are interested in using Python instead, check out Spark SQL JSON in Python tutorial page.

Spark SQL JSON Overview

We will show examples of JSON as an input source to Spark SQL’s SQLContext.  This Spark SQL tutorial with JSON has two parts.  Part 1 focuses on the “happy path” when using JSON with Spark SQL.  Part 2 covers a “gotcha” or something you might not expect when using the Spark SQL JSON data source.

By the way, if you are not familiar with Spark SQL, a couple of references include the Spark SQL chapter summary post and the first Spark SQL CSV tutorial.

Methodology

We build upon the previous baby_names.csv file, as well as a simple file to get us started, which I’ve called customers.json.  Here is a gist of customers.json.

Spark SQL JSON Example Tutorial Part 1

1. Start the spark shell

$SPARK_HOME/bin/spark-shell

2. Load the JSON using the jsonFile function from the provided sqlContext.  The following assumes you have customers.json in the same directory as from where the spark-shell script was called.

RBH12103:spark-1.4.1-bin-hadoop2.4 tmcgrath$ bin/spark-shell
Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /___/ .__/\_,_/_/ /_/\_\   version 1.4.1
      /_/

Using Scala version 2.10.4 (Java HotSpot(TM) 64-Bit Server VM, Java 1.7.0_79)
Type in expressions to have them evaluated.
Type :help for more information.
Spark context available as sc.
SQL context available as sqlContext.

scala> val customers = sqlContext.jsonFile("customers.json")
warning: there were 1 deprecation warning(s); re-run with -deprecation for details
customers: org.apache.spark.sql.DataFrame = [address: struct<city:string,state:string,street:string,zip:string>, first_name: string, last_name: string]

3. Register the data as a temp table to ease our future SQL queries

scala> customers.registerTempTable("customers")

4. We are now in a position to run some Spark SQL

scala> val firstCityState = sqlContext.sql("SELECT first_name, address.city, address.state FROM customers")
firstCityState: org.apache.spark.sql.DataFrame = [first_name: string, city: string, state: string]

scala> firstCityState.collect.foreach(println)
[James,New Orleans,LA]
[Josephine,Brighton,MI]
[Art,Bridgeport,NJ]

Ok, we started with a simple example, but the real world is rarely this simple.  So, in part 2, we’ll cover a more complex example.

Spark SQL JSON Example Tutorial Part 2

If you run the customers.json from Part 1 through http://jsonlint.com, it will not validate.  You might be surprised to know that creating invalid JSON for Part 1 was intentional.  Why?  We needed a JSON source which works well with Spark SQL out of the box.

If you read the Spark SQL documentation closely:

“Note that the file that is offered as a json file is not a typical JSON file. Each line must contain a separate, self-contained valid JSON object. As a consequence, a regular multi-line JSON file will most often fail.”

But, what happens if we have valid JSON?

In this part of the Spark SQL JSON tutorial, we’ll cover how to use valid JSON as an input source for Spark SQL.

As input, we’re going to convert the baby_names.csv file to baby_names.json.  There are many CSV to JSON conversion tools available… just search for “CSV to JSON converter”.
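
If you want a quick way to do the conversion yourself, here is a small sketch using only the Python standard library (the file names are the ones used in this tutorial; the resulting column names come straight from the CSV header):

import csv, json

# Read baby_names.csv and write it back out as a single JSON array of objects.
with open("baby_names.csv") as f:
    rows = list(csv.DictReader(f))
with open("baby_names.json", "w") as out:
    json.dump(rows, out, indent=1)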

I converted and reduced the baby_names.csv to the following:

[{
	"Year": "2013",
	"First Name": "DAVID",
	"County": "KINGS",
	"Sex": "M",
	"Count": "272"
}, {
	"Year": "2013",
	"First Name": "JAYDEN",
	"County": "KINGS",
	"Sex": "M",
	"Count": "268"
}, {
	"Year": "2013",
	"First Name": "JAYDEN",
	"County": "QUEENS",
	"Sex": "M",
	"Count": "219"
}, {
	"Year": "2013",
	"First Name": "MOSHE",
	"County": "KINGS",
	"Sex": "M",
	"Count": "219"
}, {
	"Year": "2013",
	"First Name": "ETHAN",
	"County": "QUEENS",
	"Sex": "M",
	"Count": "216"
}]

I saved this as a file called baby_names.json.

Steps

1. Start the spark-shell from the same directory containing the baby_names.json file

2. Load the JSON using the Spark Context wholeTextFiles method which produces a PairRDD.  Use map to create the new RDD using the value portion of the pair.

scala> val jsonRDD = sc.wholeTextFiles("baby_names.json").map(x => x._2)
jsonRDD: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[10] at map at <console>:21

3. Read in this RDD as JSON and confirm the schema

scala> val namesJson = sqlContext.read.json(jsonRDD)
namesJson: org.apache.spark.sql.DataFrame = [Count: string, County: string, First Name: string, Sex: string, Year: string]

scala> namesJson.printSchema
root
 |-- Count: string (nullable = true)
 |-- County: string (nullable = true)
 |-- First Name: string (nullable = true)
 |-- Sex: string (nullable = true)
 |-- Year: string (nullable = true)

scala>

Spark SQL Further Reference

See the Spark SQL, Spark SQL with Scala, and Spark SQL with Python tutorials.

Featured image credit https://flic.kr/p/9pJKgA

Spark SQL CSV Examples in Scala

Spark SQL CSV Example

In this Spark SQL tutorial, we will use Spark SQL with a CSV input data source.  We will continue to use the baby names CSV source file as used in the previous What is Spark tutorial.  This tutorial presumes the reader is familiar with using SQL with relational databases and would like to know how to use Spark SQL in Spark.

Overview

Earlier versions of Spark SQL required a certain kind of Resilient Distributed Dataset called a SchemaRDD.  Starting with Spark 1.3, SchemaRDD was renamed to DataFrame.

DataFrames are composed of Row objects accompanied by a schema which describes the data types of each column. A DataFrame may be considered similar to a table in a traditional relational database. A DataFrame may be created from a variety of input sources including CSV text files.

This intro to Spark SQL post will use a CSV file from a previous Spark tutorial.

Download the CSV version of baby names file here:

https://health.data.ny.gov/api/views/jxy9-yhdk/rows.csv?accessType=DOWNLOAD

For this and other Spark tutorials, the file has been named baby_names.csv.

Methodology

We’re going to use the spark shell and the spark-csv package available from Spark Packages to make our lives easier.  It is described as a “library for parsing and querying CSV data with Apache Spark, for Spark SQL and DataFrames”.  This library is compatible with Spark 1.3 and above.

Spark SQL CSV Example Tutorial Part 1

1. Depending on your version of Scala, start the spark shell with the --packages command line argument.

At the time of this writing, the Scala 2.10 version:

SPARK_HOME/bin/spark-shell --packages com.databricks:spark-csv_2.10:1.3.0

where SPARK_HOME is the root directory of your Spark directory; i.e. ~/Development/spark-1.4.1-bin-hadoop2.4 or c:/dev/spark-1.4.1-bin-hadoop2.4

At the time of this writing, the Scala 2.11 version:

SPARK_HOME/bin/spark-shell --packages com.databricks:spark-csv_2.11:1.3.0

2. Using the sqlContext available from the shell, load the CSV with the read, format, option and load functions

Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /___/ .__/\_,_/_/ /_/\_\   version 1.4.1
      /_/

Using Scala version 2.10.4 (Java HotSpot(TM) 64-Bit Server VM, Java 1.7.0_79)
Type in expressions to have them evaluated.
Type :help for more information.
Spark context available as sc.
SQL context available as sqlContext.

scala> val baby_names = sqlContext.read.format("com.databricks.spark.csv").option("header", "true").option("inferSchema", "true").load("baby_names.csv")
baby_names: org.apache.spark.sql.DataFrame = [Year: int, First Name: string, County: string, Sex: string, Count: int]


In the above code, we are specifying the desire to use the com.databricks.spark.csv format from the package we passed to the shell in step 1.  “header” set to true signifies the first row has column names.  “inferSchema” instructs Spark to attempt to infer the schema of the CSV, and finally the load function passes in the path and name of the CSV source file.  In this example, we can tell the baby_names.csv file is in the same directory from which the spark-shell script was launched.

3. Register a temp table

scala> baby_names.registerTempTable("names")

4. We’re now ready to query using SQL, such as finding the distinct years in the CSV

scala> val distinctYears = sqlContext.sql("select distinct Year from names")
distinctYears: org.apache.spark.sql.DataFrame = [Year: int]

scala> distinctYears.collect.foreach(println)
[2007]                                                                          
[2008]
[2009]
[2010]
[2011]
[2012]
[2013]

5. It might be handy to know the schema

scala> baby_names.printSchema
root
|-- Year: integer (nullable = true)
|-- First Name: string (nullable = true)
|-- County: string (nullable = true)
|-- Sex: string (nullable = true)
|-- Count: integer (nullable = true)

Spark SQL CSV Example Tutorial Part 2

But, let’s try some more advanced SQL, such as determining the names which appear most often in the data

scala> val popular_names = sqlContext.sql("select distinct(`First Name`), count(County) as cnt from names group by `First Name` order by cnt desc LIMIT 10")
popular_names: org.apache.spark.sql.DataFrame = [First Name: string, cnt: bigint]

scala> popular_names.collect.foreach(println)
[JACOB,284]
[EMMA,284]
[LOGAN,270]
[OLIVIA,268]
[ISABELLA,259]
[SOPHIA,249]
[NOAH,247]
[MASON,245]
[ETHAN,239]
[AVA,234]

But as we learned from Spark transformation and action tutorials, this doesn’t necessarily imply the most popular names.  We need to utilize the Count column value:

scala> val popular_names = sqlContext.sql("select distinct(`First Name`), sum(Count) as cnt from names group by `First Name` order by cnt desc LIMIT 10")
popular_names: org.apache.spark.sql.DataFrame = [First Name: string, cnt: bigint]

scala> popular_names.collect.foreach(println)
[MICHAEL,10391]
[ISABELLA,9106]
[MATTHEW,9002]
[JAYDEN,8948]
[JACOB,8770]
[JOSEPH,8715]
[SOPHIA,8689]
[DANIEL,8412]
[ANTHONY,8399]
[RYAN,8187]

Spark SQL Further Reference

Check out the Spark SQL with Scala tutorials for more Spark SQL with Scala, including Spark SQL with JSON and Spark SQL with JDBC.  And for more tutorials in Scala, see the Spark Tutorials in Scala page.

Featured image credit https://flic.kr/p/3CrmX