Spark SQL MySQL Python Example with JDBC

Spark SQL Python mySQL

Let’s cover how to use Spark SQL with Python and a mySQL database input data source.  Consider this tutorial an introductory step when learning how to use Spark SQL with a relational database and Python.

Overview

We’re going to load some NYC Uber data into a database.  Then, we’re going to fire up pyspark with a command line argument to specify the JDBC driver needed to connect to the JDBC data source.  We’ll make sure we can authenticate and then start running some queries.

Setup Requirements

1. MySQL database with at least one table containing data.

2. MySQL JDBC driver (download available https://dev.mysql.com/downloads/connector/j/)

3. Uber NYC data file available here: https://raw.githubusercontent.com/fivethirtyeight/uber-tlc-foil-response/master/Uber-Jan-Feb-FOIL.csv

For reference on the setup used in this Spark SQL MySQL Python tutorial, see the Setup Reference section at the bottom of this post.

Spark SQL MySQL (JDBC) Python Quick Start Tutorial

1. Start the pyspark shell with the --jars argument

$SPARK_HOME/bin/pyspark  --jars mysql-connector-java-5.1.38-bin.jar

This example assumes the MySQL connector JDBC jar file is located in the same directory as where you are calling pyspark.  If it is not, you can specify the path location such as:

$SPARK_HOME/bin/pyspark  --jars /home/example/jars/mysql-connector-java-5.1.38-bin.jar

2. Once the shell is running, let’s establish a connection to the MySQL database and read the “trips” table:

Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /___/ .__/\_,_/_/ /_/\_\   version 1.4.1
      /_/

Using Python version 2.7.11 (default, Dec  6 2015 18:57:58)
SparkContext available as sc, HiveContext available as sqlContext.
>>> dataframe_mysql = sqlContext.read.format("jdbc").option("url", "jdbc:mysql://localhost/uber").option("driver", "com.mysql.jdbc.Driver").option("dbtable", "trips").option("user", "root").option("password", "root").load()

Change the MySQL URL and the user/password values in the above code as appropriate for your environment.
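
If you prefer to keep the connection settings in one place, the same read can be expressed with the read.jdbc method.  The following is a minimal sketch, equivalent to the option-chaining form above; the URL, table name, and credentials are placeholders to substitute with your own.

# A sketch equivalent to the read.format("jdbc") chain above.
# The URL, table name, and credentials below are assumptions -- replace them.
mysql_url = "jdbc:mysql://localhost/uber"
connection_properties = {
    "user": "root",
    "password": "root",
    "driver": "com.mysql.jdbc.Driver"
}

dataframe_mysql = sqlContext.read.jdbc(url=mysql_url, table="trips",
                                       properties=connection_properties)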

3. Let’s confirm the DataFrame by showing the contents of the table

>>> dataframe_mysql.show()
+-----------------------+--------+---------------+-----+
|dispatching_base_number|    date|active_vehicles|trips|
+-----------------------+--------+---------------+-----+
|                 B02512|1/1/2015|            190| 1132|
|                 B02765|1/1/2015|            225| 1765|
|                 B02764|1/1/2015|           3427|29421|
|                 B02682|1/1/2015|            945| 7679|
|                 B02617|1/1/2015|           1228| 9537|
|                 B02598|1/1/2015|            870| 6903|
|                 B02598|1/2/2015|            785| 4768|
|                 B02617|1/2/2015|           1137| 7065|
|                 B02512|1/2/2015|            175|  875|
|                 B02682|1/2/2015|            890| 5506|
|                 B02765|1/2/2015|            196| 1001|
|                 B02764|1/2/2015|           3147|19974|
|                 B02765|1/3/2015|            201| 1526|
|                 B02617|1/3/2015|           1188|10664|
|                 B02598|1/3/2015|            818| 7432|
|                 B02682|1/3/2015|            915| 8010|
|                 B02512|1/3/2015|            173| 1088|
|                 B02764|1/3/2015|           3215|29729|
|                 B02512|1/4/2015|            147|  791|
|                 B02682|1/4/2015|            812| 5621|
+-----------------------+--------+---------------+-----+

4. Register the data as a temp table for future SQL queries

>>> dataframe_mysql.registerTempTable("trips")

5. We are now in a position to run some SQL, such as

>>> sqlContext.sql("select * from trips where dispatching_base_number like '%2512%'").show()
+-----------------------+---------+---------------+-----+
|dispatching_base_number|     date|active_vehicles|trips|
+-----------------------+---------+---------------+-----+
|                 B02512| 1/1/2015|            190| 1132|
|                 B02512| 1/2/2015|            175|  875|
|                 B02512| 1/3/2015|            173| 1088|
|                 B02512| 1/4/2015|            147|  791|
|                 B02512| 1/5/2015|            194|  984|
|                 B02512| 1/6/2015|            218| 1314|
|                 B02512| 1/7/2015|            217| 1446|
|                 B02512| 1/8/2015|            238| 1772|
|                 B02512| 1/9/2015|            224| 1560|
|                 B02512|1/10/2015|            206| 1646|
|                 B02512|1/11/2015|            162| 1104|
|                 B02512|1/12/2015|            217| 1399|
|                 B02512|1/13/2015|            234| 1652|
|                 B02512|1/14/2015|            233| 1582|
|                 B02512|1/15/2015|            237| 1636|
|                 B02512|1/16/2015|            234| 1481|
|                 B02512|1/17/2015|            201| 1281|
|                 B02512|1/18/2015|            177| 1521|
|                 B02512|1/19/2015|            168| 1025|
|                 B02512|1/20/2015|            221| 1310|
+-----------------------+---------+---------------+-----+

Conclusion Spark SQL MySQL (JDBC) with Python

This example was designed to get you up and running with Spark SQL, MySQL or any JDBC-compliant database, and Python.  Would you like to see other examples?  Leave ideas or questions in the comments below.

 

Setup Reference

The Spark SQL with MySQL JDBC example assumes a MySQL database named “uber” with a table called “trips”.  The “trips” table was populated with the Uber NYC data used in the Spark SQL Python CSV tutorial.

For an example of how I loaded the CSV into mySQL for Spark SQL tutorials, check this YouTube video and subscribe to our channel.
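
As a rough alternative to a manual import (and not the method shown in the video), Spark itself can push the CSV into MySQL.  The sketch below assumes pyspark was started with both the spark-csv package and the MySQL connector jar, and that the “uber” database already exists; the URL and credentials are placeholders.

# A sketch, not the method used in the video: read the CSV with spark-csv
# and append it to the "trips" table over JDBC.
csv_df = sqlContext.read.format("com.databricks.spark.csv") \
    .options(header="true", inferschema="true") \
    .load("Uber-Jan-Feb-FOIL.csv")

csv_df.write.jdbc(url="jdbc:mysql://localhost/uber", table="trips",
                  mode="append",
                  properties={"user": "root", "password": "root",
                              "driver": "com.mysql.jdbc.Driver"})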


Featured Image credit: https://flic.kr/p/53udJ

Spark SQL JSON Examples in Python using World Cup Player Data

Spark SQL JSON with Python

This short tutorial shows analysis of World Cup player data using Spark SQL with a JSON file input data source, from a Python perspective.

Overview

We are going to load a JSON input source to Spark SQL’s SQLContext.  This Spark SQL JSON with Python tutorial has two parts.  The first part shows examples of JSON input sources with a specific structure.  The second part warns you of something you might not expect when using Spark SQL with JSON data source.

Methodology

We are going to use two JSON inputs.  We’ll start with a simple, trivial example and then move to analysis of historical World Cup player data.

Spark SQL JSON with Python Example Tutorial Part 1

1. Start pyspark

$SPARK_HOME/bin/pyspark

2. Load a JSON file which comes with Apache Spark distributions by default.  We do this by using the read.json function from the provided sqlContext.

Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /___/ .__/\_,_/_/ /_/\_\   version 1.4.1
      /_/

Using Python version 2.7.11 (default, Dec  6 2015 18:57:58)
SparkContext available as sc, HiveContext available as sqlContext.
>>> people = sqlContext.read.json("examples/src/main/resources/people.json")
>>> people.printSchema()
root
 |-- age: long (nullable = true)
 |-- name: string (nullable = true)

3. Register the data as a temp table to ease our future SQL queries

>>> people.registerTempTable("people")

4. Now, we can run some SQL

>>> sqlContext.sql("select name from people").show()
+-------+
|   name|
+-------+
|Michael|
|   Andy|
| Justin|
+-------+

Ok, this is a simple example, but the real world is rarely this simple.  So, in part 2, we’ll cover a more complex example.

 

Spark SQL JSON Example Tutorial Part 2

Take a closer look at the people.json file used in Part 1.  If you run it through http://jsonlint.com, it will not validate.  Please, let’s not debate whether this is a byproduct of JSON or whether you can’t technically validate any JSON.  Stay with me here.

If you read the Spark SQL documentation closely:

“Note that the file that is offered as a json file is not a typical JSON file. Each line must contain a separate, self-contained valid JSON object. As a consequence, a regular multi-line JSON file will most often fail.”

Please read this previous quote again.

But, what happens if we have typical JSON?  Let’s find out.

Download and save historical world cup player data from https://raw.githubusercontent.com/jokecamp/FootballData/master/World%20Cups/all-world-cup-players.json

Here’s a snippet of the content

[
  {
    "Competition": "World Cup",
    "Year": 1930,
    "Team": "Argentina",
    "Number": "",
    "Position": "GK",
    "FullName": "Ángel Bossio",
    "Club": "Club Atlético Talleres de Remedios de Escalada",
    "ClubCountry": "Argentina",
    "DateOfBirth": "1905-5-5",
    "IsCaptain": false
  },
  {
    "Competition": "World Cup",
    "Year": 1930,
    "Team": "Argentina",
    "Number": "",
    "Position": "GK",
    "FullName": "Juan Botasso",
    "Club": "Quilmes Atlético Club",
    "ClubCountry": "Argentina",
    "DateOfBirth": "1908-10-23",
    "IsCaptain": false
  },
....
]

You should have the all-world-cup-players.json file in your Spark home directory.

Unlike Part 1, this JSON will not work with a sqlContext.
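
To see what “not work” looks like, here is a sketch of reading the multi-line file directly.  In Spark 1.x this typically yields a DataFrame with only a _corrupt_record column, because each physical line is not a self-contained JSON object (treat the exact behavior as version-dependent).

# A sketch of the failure mode; exact output may vary by Spark version.
broken = sqlContext.read.json("all-world-cup-players.json")
broken.printSchema()
# Expect something like:  root
#                          |-- _corrupt_record: string (nullable = true)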

Steps

1. Start pyspark

2. Load the JSON using the Spark Context wholeTextFiles method, which produces a pair RDD whose first element is the file name and whose second element is the entire contents of that file as a single string. We use map to create a new RDD from the second element of each pair.

>>> jsonRDD = sc.wholeTextFiles("all-world-cup-players.json").map(lambda x: x[1])

3. Then, we need to prepare this RDD so it can be parsed by sqlContext.  Let’s remove the whitespace (note this also strips spaces inside string values, which is why team names such as “South Korea” appear as “SouthKorea” in the results below)

>>> import re
>>> js = jsonRDD.map(lambda x: re.sub(r"\s+", "", x, flags=re.UNICODE))

4. We’re now able to consume the RDD using the jsonRDD function of sqlContext

>>> wc_players = sqlContext.jsonRDD(js)

5. Let’s register the table and run a query

>>> wc_players.registerTempTable("players")
>>> sqlContext.sql("select distinct Team from players").show()
+--------------------+
|                Team|
+--------------------+
|              Mexico|
|            Portugal|
|            Colombia|
|          SouthKorea|
|         Netherlands|
|             Belgium|
|               Chile|
|              Brazil|
|BosniaandHerzegovina|
|          IvoryCoast|
|            Cameroon|
|             England|
|             Croatia|
|           Argentina|
|             Algeria|
|               Ghana|
|                Iran|
|             Nigeria|
|              Russia|
|              France|
+--------------------+

 

Featured image credit https://flic.kr/p/q57bEv

Spark SQL CSV Examples with Python

Spark SQL CSV Python

In this Spark tutorial, we will use Spark SQL with a CSV input data source using the Python API.  We will continue to use the Uber CSV source file as used in the Getting Started with Spark and Python tutorial presented earlier.

Also, this Spark SQL CSV tutorial assumes you are familiar with using SQL against relational databases, directly or from Python.  In other words, you have experience with SQL and would like to know how to use it with Spark.

Overview

Spark SQL uses a distributed, table-like data structure called a DataFrame, which evolved from the SchemaRDD of earlier Spark versions.  DataFrames are composed of Row objects accompanied by a schema which describes the data type of each column. A DataFrame may be considered similar to a table in a traditional relational database. As you might expect, DataFrames may be created from a variety of input sources, including CSV text files.
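
To make the Row-plus-schema idea concrete, here is a minimal sketch you could run once a pyspark shell is up; the two rows are made-up values borrowed from the Uber data purely for illustration.

from pyspark.sql import Row

# Hypothetical rows used only to illustrate the Row/schema relationship.
rows = [Row(dispatching_base_number="B02512", trips=1132),
        Row(dispatching_base_number="B02765", trips=1765)]

tiny_df = sqlContext.createDataFrame(rows)
tiny_df.printSchema()   # schema is inferred from the Row fields
tiny_df.show()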

This intro to Spark SQL post uses a CSV file from previous Spark Python tutorials found here:

https://raw.githubusercontent.com/fivethirtyeight/uber-tlc-foil-response/master/Uber-Jan-Feb-FOIL.csv

Methodology

We’re going to use the IPython notebook and the spark-csv package available from Spark Packages to make our lives easier.  The spark-csv package is described as a “library for parsing and querying CSV data with Apache Spark, for Spark SQL and DataFrames.”  This library is compatible with Spark 1.3 and above.
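
One way to launch pyspark under the IPython notebook is sketched below; the environment variables are the standard Spark 1.x ones, but treat the exact invocation as an assumption for your setup.  The --packages argument is the same one shown in step 1.

PYSPARK_DRIVER_PYTHON=ipython PYSPARK_DRIVER_PYTHON_OPTS=notebook \
  $SPARK_HOME/bin/pyspark --packages com.databricks:spark-csv_2.10:1.3.0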

Spark SQL CSV with Python Example Tutorial Part 1

1. Depending on the Scala version of your Spark build, start the pyspark shell with a --packages command line argument.

At the time of this writing, the Scala 2.10 version:

$SPARK_HOME/bin/pyspark --packages com.databricks:spark-csv_2.10:1.3.0

 

At the time of this writing, the Scala 2.11 version:

$SPARK_HOME/bin/pyspark --packages com.databricks:spark-csv_2.11:1.3.0

2. Using the sqlContext available from the shell, load the CSV with the read, format, options and load functions

>>> df = sqlContext.read.format('com.databricks.spark.csv').options(header='true', inferschema='true').load('Uber-Jan-Feb-FOIL.csv')

In the above code, we are specifying the desire to use the com.databricks.spark.csv format from the package we passed to the shell in step 1.  “header” set to true signifies that the first row has column names.  “inferSchema” instructs Spark to attempt to infer the schema of the CSV, and finally the load function passes in the path and name of the CSV source file.  In this example, the Uber-Jan-Feb-FOIL.csv file is assumed to be in the same directory from which pyspark was launched.

3. Register a temp table

>>> df.registerTempTable("uber")

4. We’re now ready to query using SQL, such as finding the distinct NYC Uber bases in the CSV

>>> distinct_bases = sqlContext.sql("select distinct dispatching_base_number from uber")
>>> for b in distinct_bases.collect(): print b
Row(dispatching_base_number=u'B02598')
Row(dispatching_base_number=u'B02764')
Row(dispatching_base_number=u'B02765')
Row(dispatching_base_number=u'B02617')
Row(dispatching_base_number=u'B02682')
Row(dispatching_base_number=u'B02512')

5. It might be handy to know the schema

>>> df.printSchema()

root
 |-- dispatching_base_number: string (nullable = true)
 |-- date: string (nullable = true)
 |-- active_vehicles: integer (nullable = true)
 |-- trips: integer (nullable = true)

 

Spark SQL CSV with Python Example Tutorial Part 2

But, let’s try some more advanced SQL, such as determining which Uber base is the busiest based on number of trips

>>> sqlContext.sql("""select distinct(`dispatching_base_number`), 
                                sum(`trips`) as cnt from uber group by `dispatching_base_number` 
                                order by cnt desc""").show()


+-----------------------+-------+
|dispatching_base_number|    cnt|
+-----------------------+-------+
|                 B02764|1914449|
|                 B02617| 725025|
|                 B02682| 662509|
|                 B02598| 540791|
|                 B02765| 193670|
|                 B02512|  93786|
+-----------------------+-------+

Or the 5 busiest days based on number of trips in the time range of the data:

>>> sqlContext.sql("""select distinct(`date`), 
                                sum(`trips`) as cnt from uber group by `date` 
                                order by cnt desc limit 5""").show()


+---------+------+
|     date|   cnt|
+---------+------+
|2/20/2015|100915|
|2/14/2015|100345|
|2/21/2015| 98380|
|2/13/2015| 98024|
|1/31/2015| 92257|
+---------+------+

 

Reference

ipython notebook file https://github.com/tmcgrath/spark-with-python-course/blob/master/Spark-SQL-CSV-with-Python.ipynb

Featured image credit https://flic.kr/p/4FbcGr

Spark SQL MySQL Example with JDBC

Spark SQL mySQL JDBC

In this tutorial, we will cover using Spark SQL with a mySQL database input data source.

Overview

Let’s show examples of using Spark SQL with MySQL.  We’re going to use MySQL in this tutorial, but you can apply the concepts presented here to any relational database which has a JDBC driver.

By the way, if you are not familiar with Spark SQL, there are a few Spark SQL tutorials on this site.

Requirements

1. MySQL instance

2. MySQL JDBC driver (download available https://dev.mysql.com/downloads/connector/j/)

3. The previously used baby_names.csv file as source data.

Quick Setup

The Spark SQL with MySQL JDBC example assumes a MySQL database named “sparksql” with a table called “baby_names”.  The “baby_names” table has been populated with the baby_names.csv data used in previous Spark tutorials.

Here’s a screencast on YouTube of how I setup my environment:

mysql setup for Spark SQL with MySQL (JDBC) examples

The SQL to create the baby_names table:

DROP TABLE IF EXISTS `baby_names`;

CREATE TABLE `baby_names` (
  `id` int(11) unsigned NOT NULL AUTO_INCREMENT,
  `year` int(11) DEFAULT NULL,
  `first_name` varchar(100) DEFAULT NULL,
  `county` varchar(100) DEFAULT NULL,
  `sex` varchar(5) DEFAULT NULL,
  `count` int(11) DEFAULT NULL,
  PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=latin1;

If you have any questions about the environment setup, leave comments on this post.

Methodology

We need to pass in the MySQL JDBC driver jar when we start up the Spark shell.  (In a packaged Spark application, third-party libraries such as a JDBC driver would be bundled with the application package or supplied at submit time.)

From the shell, we’re going to establish a connection to the MySQL database and then run some queries via Spark SQL.
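
For a packaged application, the driver jar can either be bundled into the application’s assembly jar or passed to spark-submit at launch time.  A hypothetical example of the latter (the class and application jar names are placeholders):

$SPARK_HOME/bin/spark-submit --class com.example.SparkSQLMySQL \
  --jars /home/example/jars/mysql-connector-java-5.1.26.jar my-spark-app.jar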

Spark SQL with MySQL (JDBC) Example Tutorial

1. Start the spark-shell with the --jars argument

$SPARK_HOME/bin/spark-shell  --jars mysql-connector-java-5.1.26.jar

This example assumes the mySQL connector JDBC jar file is located in the same directory as where you are calling spark-shell.  If it is not, you can specify the path location such as:

$SPARK_HOME/bin/spark-shell  --jars /home/example/jars/mysql-connector-java-5.1.26.jar

2. Once the shell is running, let’s establish a connection to the MySQL database and read the baby_names table:

Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /___/ .__/\_,_/_/ /_/\_\   version 1.5.2
      /_/

Using Scala version 2.10.4 (Java HotSpot(TM) 64-Bit Server VM, Java 1.7.0_79)
Type in expressions to have them evaluated.
Type :help for more information.
...
SQL context available as sqlContext.

scala> val dataframe_mysql = sqlContext.read.format("jdbc").option("url", "jdbc:mysql://localhost/sparksql").option("driver", "com.mysql.jdbc.Driver").option("dbtable", "baby_names").option("user", "root").option("password", "root").load()

Change the MySQL URL and the user/password values in the above code as appropriate for your environment.

3. Let’s confirm the DataFrame by showing the contents of the table

scala> dataframe_mysql.show

4. Register the data as a temp table for future SQL queries

scala> dataframe_mysql.registerTempTable("names")

5. We are now in a position to run some SQL, such as

scala> dataframe_mysql.sqlContext.sql("select * from names").collect.foreach(println)

Conclusion Spark SQL with MySQL (JDBC)

This example was designed to get you up and running with Spark SQL and MySQL or any JDBC-compliant database.  What other examples would you like to see with Spark SQL and JDBC?  Please leave ideas or questions in the comments below.

 

Featured Image credit: https://flic.kr/p/f8KB7L

Spark SQL JSON Examples

Spark SQL JSON

This tutorial covers using Spark SQL with a JSON file input data source.

Overview

We will show examples of JSON as an input source to Spark SQL’s SQLContext.  This Spark SQL tutorial with JSON has two parts.  Part 1 focuses on the “happy path” when using JSON with Spark SQL.  Part 2 covers a “gotcha” or something you might not expect when using the Spark SQL JSON data source.

By the way, if you are not familiar with Spark SQL, a couple of references include the Spark SQL chapter summary post and the first Spark SQL CSV tutorial.

Methodology

We build upon the previous baby_names.csv file, as well as a simple file to get us started, which I’ve called customers.json.  A gist of customers.json is available online; a rough sketch of its contents is shown below.
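
Based on the schema and query results shown later in this section, customers.json looks roughly like the following; the last names, streets, and zip codes here are placeholders.  Note that each record is a self-contained JSON object on its own line, which matters in Part 2.

{"first_name": "James", "last_name": "Doe", "address": {"street": "123 Main St", "city": "New Orleans", "state": "LA", "zip": "70112"}}
{"first_name": "Josephine", "last_name": "Doe", "address": {"street": "456 Oak Ave", "city": "Brighton", "state": "MI", "zip": "48116"}}
{"first_name": "Art", "last_name": "Doe", "address": {"street": "789 Pine Rd", "city": "Bridgeport", "state": "NJ", "zip": "08014"}}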

Spark SQL JSON Example Tutorial Part 1

1. Start the spark shell

$SPARK_HOME/bin/spark-shell

2. Load the JSON using the jsonFile function from the provided sqlContext.  The following assumes you have customers.json in the same directory from which the spark-shell script was called.

RBH12103:spark-1.4.1-bin-hadoop2.4 tmcgrath$ bin/spark-shell
Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /___/ .__/\_,_/_/ /_/\_\   version 1.4.1
      /_/

Using Scala version 2.10.4 (Java HotSpot(TM) 64-Bit Server VM, Java 1.7.0_79)
Type in expressions to have them evaluated.
Type :help for more information.
Spark context available as sc.
SQL context available as sqlContext.

scala> val customers = sqlContext.jsonFile("customers.json")
warning: there were 1 deprecation warning(s); re-run with -deprecation for details
customers: org.apache.spark.sql.DataFrame = [address: struct<city:string,state:string,street:string,zip:string>, first_name: string, last_name: string]

3. Register the data as a temp table to ease our future SQL queries

scala> customers.registerTempTable("customers")

4. We are now in a position to run some SQL

scala> val firstCityState = sqlContext.sql("SELECT first_name, address.city, address.state FROM customers")
firstCityState: org.apache.spark.sql.DataFrame = [first_name: string, city: string, state: string]

scala> firstCityState.collect.foreach(println)
[James,New Orleans,LA]
[Josephine,Brighton,MI]
[Art,Bridgeport,NJ]

Ok, we started with a simple example, but the real world is rarely this simple.  So, in part 2, we’ll cover a more complex example.

Spark SQL JSON Example Tutorial Part 2

If you run the customers.json from Part 1 through http://jsonlint.com, it will not validate.  You might be surprised to know that creating invalid JSON for Part 1 was intentional.  Why?  We needed a JSON source which works well with Spark SQL out of the box.

If you read the Spark SQL documentation closely:

“Note that the file that is offered as a json file is not a typical JSON file. Each line must contain a separate, self-contained valid JSON object. As a consequence, a regular multi-line JSON file will most often fail.”

But, what happens if we have valid JSON?

In this part of the Spark SQL JSON tutorial, we’ll cover how to use valid JSON as an input source for Spark SQL.

As input, we’re going to convert the baby_names.csv file to baby_names.json.  There are many CSV to JSON conversion tools available… just search for “CSV to JSON converter”.

I converted and reduced the baby_names.csv to the following:

[{
	"Year": "2013",
	"First Name": "DAVID",
	"County": "KINGS",
	"Sex": "M",
	"Count": "272"
}, {
	"Year": "2013",
	"First Name": "JAYDEN",
	"County": "KINGS",
	"Sex": "M",
	"Count": "268"
}, {
	"Year": "2013",
	"First Name": "JAYDEN",
	"County": "QUEENS",
	"Sex": "M",
	"Count": "219"
}, {
	"Year": "2013",
	"First Name": "MOSHE",
	"County": "KINGS",
	"Sex": "M",
	"Count": "219"
}, {
	"Year": "2013",
	"First Name": "ETHAN",
	"County": "QUEENS",
	"Sex": "M",
	"Count": "216"
}]

I saved this as a file called baby_names.json

Steps

1. Start the spark-shell from the same directory containing the baby_names.json file

2. Load the JSON using the Spark Context wholeTextFiles method which produces a PairRDD.  Use map to create the new RDD using the value portion of the pair.

scala> val jsonRDD = sc.wholeTextFiles("baby_names.json").map(x => x._2)
jsonRDD: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[10] at map at <console>:21

3. Read in this RDD as JSON and confirm the schema

scala> val namesJson = sqlContext.read.json(jsonRDD)
namesJson: org.apache.spark.sql.DataFrame = [Count: string, County: string, First Name: string, Sex: string, Year: string]

scala> namesJson.printSchema
root
 |-- Count: string (nullable = true)
 |-- County: string (nullable = true)
 |-- First Name: string (nullable = true)
 |-- Sex: string (nullable = true)
 |-- Year: string (nullable = true)

scala>

 

Featured image credit https://flic.kr/p/9pJKgA

Spark SQL CSV Examples

Spark SQL CSV Example

In this Spark tutorial, we will use Spark SQL with a CSV input data source.  We will continue to use the baby names CSV source file as used in the previous Spark tutorials.  This tutorial presumes the reader is familiar with using SQL with relational databases and would like to know how to use it with Spark.

Overview

Earlier versions of Spark SQL required a certain kind of Resilient Distributed Dataset called a SchemaRDD.  Starting with Spark 1.3, SchemaRDD was renamed to DataFrame.

DataFrames are composed of Row objects accompanied by a schema which describes the data types of each column. A DataFrame may be considered similar to a table in a traditional relational database. A DataFrame may be created from a variety of input sources, including CSV text files.

This intro to Spark SQL post uses a CSV file from previous Spark SQL tutorials.

Download the CSV version of baby names file here:

https://health.data.ny.gov/api/views/jxy9-yhdk/rows.csv?accessType=DOWNLOAD

For this and other Spark tutorials, the file has been named baby_names.csv

Methodology

We’re going to use the spark-shell and the spark-csv package available from Spark Packages to make our lives easier.  It is described as a “library for parsing and querying CSV data with Apache Spark, for Spark SQL and DataFrames.”  This library is compatible with Spark 1.3 and above.

Spark SQL CSV Example Tutorial Part 1

1. Depending on the Scala version of your Spark build, start the spark-shell with a --packages command line argument.

At the time of this writing, the Scala 2.10 version:

SPARK_HOME/bin/spark-shell --packages com.databricks:spark-csv_2.10:1.3.0

where SPARK_HOME is the root directory of your Spark installation; e.g. ~/Development/spark-1.4.1-bin-hadoop2.4 or c:/dev/spark-1.4.1-bin-hadoop2.4

At the time of this writing, the Scala 2.11 version:

SPARK_HOME/bin/spark-shell --packages com.databricks:spark-csv_2.11:1.3.0

 

2. Using the sqlContext available from the shell, load the CSV with the read, format, option and load functions

Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /___/ .__/\_,_/_/ /_/\_\   version 1.4.1
      /_/

Using Scala version 2.10.4 (Java HotSpot(TM) 64-Bit Server VM, Java 1.7.0_79)
Type in expressions to have them evaluated.
Type :help for more information.
Spark context available as sc.
SQL context available as sqlContext.

scala> val baby_names = sqlContext.read.format("com.databricks.spark.csv").option("header", "true").option("inferSchema", "true").load("baby_names.csv")
baby_names: org.apache.spark.sql.DataFrame = [Year: int, First Name: string, County: string, Sex: string, Count: int]


In the above code, we are specifying the desire to use the com.databricks.spark.csv format from the package we passed to the shell in step 1.  “header” set to true signifies that the first row has column names.  “inferSchema” instructs Spark to attempt to infer the schema of the CSV, and finally the load function passes in the path and name of the CSV source file.  In this example, the baby_names.csv file is assumed to be in the same directory from which the spark-shell script was launched.

3. Register a temp table

scala> baby_names.registerTempTable("names")

4. We’re now ready to query using SQL, such as finding the distinct years in the CSV

scala> val distinctYears = sqlContext.sql("select distinct Year from names")
distinctYears: org.apache.spark.sql.DataFrame = [Year: int]

scala> distinctYears.collect.foreach(println)
[2007]                                                                          
[2008]
[2009]
[2010]
[2011]
[2012]
[2013]

5. It might be handy to know the schema

scala> baby_names.printSchema
root
 |-- Year: integer (nullable = true)
 |-- First Name: string (nullable = true)
 |-- County: string (nullable = true)
 |-- Sex: string (nullable = true)
 |-- Count: integer (nullable = true)

 

Spark SQL CSV Example Tutorial Part 2

But, let’s try some more advanced SQL, such as determining the names which appear most often in the data

scala> val popular_names = sqlContext.sql("select distinct(`First Name`), count(County) as cnt from names group by `First Name` order by cnt desc LIMIT 10")
popular_names: org.apache.spark.sql.DataFrame = [First Name: string, cnt: bigint]

scala> popular_names.collect.foreach(println)
[JACOB,284]
[EMMA,284]
[LOGAN,270]
[OLIVIA,268]
[ISABELLA,259]
[SOPHIA,249]
[NOAH,247]
[MASON,245]
[ETHAN,239]
[AVA,234]

But as we learned from the Spark transformation and action tutorials, this doesn’t necessarily imply the most popular names; the query above counts how many rows each name appears in, not how many babies were given the name.  We need to utilize the Count column value:

scala> val popular_names = sqlContext.sql("select distinct(`First Name`), sum(Count) as cnt from names group by `First Name` order by cnt desc LIMIT 10")
popular_names: org.apache.spark.sql.DataFrame = [First Name: string, cnt: bigint]

scala> popular_names.collect.foreach(println)
[MICHAEL,10391]
[ISABELLA,9106]
[MATTHEW,9002]
[JAYDEN,8948]
[JACOB,8770]
[JOSEPH,8715]
[SOPHIA,8689]
[DANIEL,8412]
[ANTHONY,8399]
[RYAN,8187]


Featured image credit https://flic.kr/p/3CrmX