Spark Read CSV with Scala: A Comprehensive Guide


In this Spark Read CSV in Scala tutorial, we will create a DataFrame from a CSV source and query it with Spark SQL. Both simple and advanced examples will be explored, covering topics such as inferring the schema from the header row of a CSV file.

** Updated April 2023 **

Starting in Spark 2.0, Spark has a native API for reading CSV from Scala. This makes it convenient to read and process CSV files using Scala code. Prior to Spark 2.0, reading CSV files wasn’t as straightforward in Spark 1.x. The newer, built-in Spark CSV support provides a simple and efficient way to read and write CSV files with Spark.

In this Spark Read CSV tutorial, we will query a DataFrame using Spark SQL by reading in a CSV file data source.  This tutorial presumes the reader is familiar with SQL, but wants to know how to apply it with Spark SQL.

One of the key features of Spark is its ability to read and process large amounts of data from various sources, including CSV files, quickly and efficiently.

Let’s review reading and processing CSV files with Spark in Scala.

Before we begin, it’s important to note: the way to read CSV files from Scala in Spark has changed over the years, and this Spark read CSV tutorial covers the approaches over time. Feel free to skip ahead to the present-day section.


Overview

Earlier versions of Spark SQL required a special kind of Resilient Distributed Dataset (RDD) called a SchemaRDD. Starting in Spark 1.3, the SchemaRDD was renamed to DataFrame. The Spark read CSV examples below use DataFrames.

DataFrames are composed of Row objects accompanied by a schema describing the data types of each column. If this is new to you, a DataFrame is similar to a table in a traditional relational database.

A DataFrame may be created from a variety of input sources including CSV text files.

In this tutorial, I’ll download a CSV file of baby names found here:

https://health.data.ny.gov/api/views/jxy9-yhdk/rows.csv?accessType=DOWNLOAD

For this Spark read CSV from Scala tutorial, the downloaded file has been named baby_names.csv.

Spark Read CSV Methodology [Pre Spark 2.0, Original and Outdated]

In the early days of Spark, reading CSV files required downloading and adding a special CSV package.

In this example, I’ll start the spark shell and load the spark-csv package available from Spark Packages.  It is described as a “library for parsing and querying CSV data with Apache Spark, for Spark SQL and DataFrames”.  This library is compatible with Spark 1.3 and above, but it is no longer necessary starting in Spark 2.0. As mentioned, feel free to skip ahead if you are not interested in the older way to read CSV files in Spark.

Spark 1 SQL Read CSV Example Tutorial Part 1

1. Load Spark-CSV package when starting Spark Shell

Depending on your version of Scala, start the spark shell with the --packages command line argument.

For the Scala 2.10 version:

SPARK_HOME/bin/spark-shell --packages com.databricks:spark-csv_2.10:1.3.0

where SPARK_HOME is the root directory of your Spark installation; e.g. ~/Development/spark-1.4.1-bin-hadoop2.4 or c:/dev/spark-1.4.1-bin-hadoop2.4

Or when using the Scala 2.11 version:

SPARK_HOME/bin/spark-shell --packages com.databricks:spark-csv_2.11:1.3.0
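
If you were building a standalone application rather than using the shell, the same package could be pulled in through your build instead; a minimal build.sbt sketch under that assumption (the version numbers are just illustrative values matching the era of this example):

// build.sbt sketch for the Spark 1.x era; versions are illustrative
libraryDependencies ++= Seq(
  "org.apache.spark" %% "spark-sql" % "1.4.1" % "provided",
  "com.databricks"   %% "spark-csv" % "1.3.0"
)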

2. Spark Read CSV with Header

Here we will show how to read in our example CSV file, which includes a header row. The header row provides the means to refer to columns by name rather than by index location. It’s another way of specifying the schema during the read.

In early versions, we used the sqlContext available from the shell to load the CSV with the read, format, option, and load functions, as shown here:

Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /___/ .__/\_,_/_/ /_/\_\   version 1.4.1
      /_/

Using Scala version 2.10.4 (Java HotSpot(TM) 64-Bit Server VM, Java 1.7.0_79)
Type in expressions to have them evaluated.
Type :help for more information.
Spark context available as sc.
SQL context available as sqlContext.

scala> val baby_names = sqlContext.read.format("com.databricks.spark.csv").option("header", "true").option("inferSchema", "true").load("baby_names.csv")
baby_names: org.apache.spark.sql.DataFrame = [Year: int, First Name: string, County: string, Sex: string, Count: int]

In the above code, we specify the com.databricks.spark.csv format from the package we passed to the spark-shell in step 1 and create a DataFrame from the source CSV file.

“header” set to true signifies the first row has column names.  

“inferSchema” instructs Spark to attempt to infer the schema of the CSV when reading.

The “load” function simply takes the path and name of the CSV source file.

This is an early example of how to read a CSV file with a header row in Spark.

In this example, the baby_names.csv file is in the same directory as where the spark-shell script was launched.
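
Incidentally, if you leave the inferSchema option off, the read still works, but every column comes back typed as a string; a quick sketch of that behavior (the variable name is just for illustration):

// Without inferSchema, spark-csv treats every column as a string
val namesAllStrings = sqlContext.read
  .format("com.databricks.spark.csv")
  .option("header", "true")
  .load("baby_names.csv")

// printSchema would now show Year and Count as string rather than int
namesAllStrings.printSchema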

3. Register a temp table to query with SQL

Next, let’s register a temporary table with a specific name, so we can use it in SQL queries.

scala> baby_names.registerTempTable("names")

4. Spark SQL CSV Examples

Ok, we’re now ready to query our CSV-based DataFrame using SQL, such as finding the distinct years shown here:

scala> val distinctYears = sqlContext.sql("select distinct Year from names")
distinctYears: org.apache.spark.sql.DataFrame = [Year: int]

scala> distinctYears.collect.foreach(println)
[2007]                                                                          
[2008]
[2009]
[2010]
[2011]
[2012]
[2013]
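
For comparison, the same distinct-years result can be produced with the DataFrame API instead of SQL; a quick sketch using the baby_names DataFrame created earlier:

// DataFrame API equivalent of the distinct-years SQL above
val distinctYearsDF = baby_names.select("Year").distinct
distinctYearsDF.collect.foreach(println)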

5. Spark SQL CSV Print Schema

It might be handy to know the schema, which is easily displayed with printSchema as shown:

scala> baby_names.printSchema
root
|-- Year: integer (nullable = true)
|-- First Name: string (nullable = true)
|-- County: string (nullable = true)
|-- Sex: string (nullable = true)
|-- Count: integer (nullable = true)

Spark SQL CSV with Header Example Tutorial Part 2

Let’s try some more advanced SQL, such as determining the names which appear most often in the data:

scala> val popular_names = sqlContext.sql("select distinct(`First Name`), count(County) as cnt from names group by `First Name` order by cnt desc LIMIT 10")
popular_names: org.apache.spark.sql.DataFrame = [First Name: string, cnt: bigint]

scala> popular_names.collect.foreach(println)
[JACOB,284]
[EMMA,284]
[LOGAN,270]
[OLIVIA,268]
[ISABELLA,259]
[SOPHIA,249]
[NOAH,247]
[MASON,245]
[ETHAN,239]
[AVA,234]

But, as we have already learned from the Spark transformations in Scala and Spark actions in Scala tutorials, this doesn’t necessarily give the most popular names: the query above only counts how many rows each name appears in, not how many babies received the name.  We need to sum the Count column value instead:

scala> val popular_names = sqlContext.sql("select distinct(`First Name`), sum(Count) as cnt from names group by `First Name` order by cnt desc LIMIT 10")
popular_names: org.apache.spark.sql.DataFrame = [First Name: string, cnt: bigint]

scala> popular_names.collect.foreach(println)
[MICHAEL,10391]
[ISABELLA,9106]
[MATTHEW,9002]
[JAYDEN,8948]
[JACOB,8770]
[JOSEPH,8715]
[SOPHIA,8689]
[DANIEL,8412]
[ANTHONY,8399]
[RYAN,8187]
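
The same aggregation can also be expressed with the DataFrame API rather than SQL; a minimal sketch (the column names are the ones from the file above, and the variable name is just for illustration):

// DataFrame API version of the popularity query above
import org.apache.spark.sql.functions.{sum, desc}

val popularNamesDF = baby_names
  .groupBy("First Name")
  .agg(sum("Count").as("cnt"))
  .orderBy(desc("cnt"))
  .limit(10)

popularNamesDF.collect.foreach(println)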

Spark Read CSV [2023 Version]

The spark-csv package was merged into Spark itself starting with version 2.0. This makes our lives easier because it removes the need to load the package as shown above.

All the previous examples still work, as I will show next with Spark 3.4.

Spark 3.4 with the baby_names.csv file in the current working directory

1. Read CSV with Schema from Spark

$ bin/spark-shell
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
23/05/03 08:23:42 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Spark context Web UI available at http://192.168.1.16:4040
Spark context available as 'sc' (master = local[*], app id = local-1683120223560).
Spark session available as 'spark'.
Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /___/ .__/\_,_/_/ /_/\_\   version 3.4.0
      /_/

Using Scala version 2.12.17 (OpenJDK 64-Bit Server VM, Java 16.0.2)
Type in expressions to have them evaluated.
Type :help for more information.

scala> val baby_names = spark.read.option("header", "true").option("inferSchema", "true").csv("baby_names.csv")
baby_names: org.apache.spark.sql.DataFrame = [Year: int, First Name: string ... 3 more fields]

By using the spark.read method as shown above, we can easily read CSV files into a DataFrame for further analysis. The option parameter can be used to specify options such as the delimiter, header, and encoding of the CSV file.
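
For example, a read with a few of these options spelled out might look like the following sketch (the delimiter and encoding values shown are simply the defaults, and the variable name is just for illustration):

// Reading a CSV with a few options made explicit; the values shown are the defaults
val namesWithOptions = spark.read
  .option("header", "true")        // first row contains column names
  .option("inferSchema", "true")   // extra pass over the data to infer column types
  .option("delimiter", ",")        // field separator (the option "sep" is equivalent)
  .option("encoding", "UTF-8")     // character encoding of the file
  .csv("baby_names.csv")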

The callouts from the above example are the chained option calls, which indicate that the CSV file has a header row and that Spark should attempt to infer the schema from the data. We can see the schema has been inferred in the output above, where “Year” is an int and “First Name” is a string.
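
If you would rather avoid the extra pass over the data that inferSchema requires, the schema can also be supplied explicitly; a sketch using the column names and types we saw from printSchema earlier (the variable names are just for illustration):

// Supplying the schema up front instead of inferring it
import org.apache.spark.sql.types.{StructType, StructField, IntegerType, StringType}

val babyNamesSchema = StructType(Seq(
  StructField("Year", IntegerType, nullable = true),
  StructField("First Name", StringType, nullable = true),
  StructField("County", StringType, nullable = true),
  StructField("Sex", StringType, nullable = true),
  StructField("Count", IntegerType, nullable = true)
))

val babyNamesExplicit = spark.read
  .option("header", "true")
  .schema(babyNamesSchema)
  .csv("baby_names.csv")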

Now, let’s run some queries on this CSV based DataFrame.

2. Spark SQL with CSV Examples

Now that we have the CSV-based DataFrame, let’s continue using the SparkSession instead of the SQLContext and show some example queries.

The way to register a temporary table, so we can use it from SQL in the upcoming examples, has also changed, as shown:

scala> baby_names.createOrReplaceTempView("names")

Now, let’s run some SQL on our CSV:

scala> spark.sql("select distinct(`First Name`), sum(Count) as cnt from names group by `First Name` order by cnt desc LIMIT 10").show
+----------+-----+
|First Name|  cnt|
+----------+-----+
|   MICHAEL|17118|
|     JACOB|16135|
|  ISABELLA|15527|
|    SOPHIA|15467|
|    OLIVIA|15361|
|   MATTHEW|15117|
|    JOSEPH|15080|
|      EMMA|14797|
|     ETHAN|14780|
|      LIAM|14481|
+----------+-----+
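
Since Spark handles writing CSV as well as reading it, the result of a query can be written back out; a minimal sketch (the output path is just an example, and Spark writes a directory of part files rather than a single file):

// Write the query result back out as CSV (output path is illustrative)
val topNames = spark.sql(
  "select `First Name`, sum(Count) as cnt from names group by `First Name` order by cnt desc limit 10")

topNames.write
  .option("header", "true")
  .mode("overwrite")
  .csv("top_baby_names_csv")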

Spark Read CSV with Spark SQL Conclusion

As we saw above, reading CSV files has changed from Spark 1 to Spark 2 and beyond because the spark-csv package is now available by default. In this Spark read CSV tutorial, we covered examples of using SQL to query the DataFrame both the older way and the newer way. Hope this helped, and if you have any questions or suggestions for me, just let me know.

Before you go, be sure to bookmark or check out the Spark SQL with Scala tutorials for more Spark SQL with Scala, including Spark SQL with JSON and Spark SQL with JDBC.  And for tutorials in Scala, see the Spark Tutorials in Scala page.

About Todd M

Todd has held multiple software roles over his 20-year career. For the last 5 years, he has focused on helping organizations move from batch to data streaming. In addition to the free tutorials, he provides consulting and coaching for Data Engineers, Data Scientists, and Data Architects. Feel free to reach out directly or to connect on LinkedIn.
