Apache Spark with Amazon S3 Examples of Text Files Tutorial

Apache Spark with Amazon S3 setup

This post shows ways and options for accessing files stored on Amazon S3 from Apache Spark. Text file interaction with Amazon S3 is demonstrated in both Scala and Python, using the spark-shell for Scala and an iPython notebook for Python.

To begin, you should know there are multiple ways to access S3 based files.  The options depend on a few factors such as:

  • Which version of Spark you are using, because the version of Hadoop it was built against matters
  • How the files were created on S3: were they written to S3 from Spark, from Hadoop, or by some other third-party tool?

All of these examples are based on the Scala console (spark-shell) or pyspark, but they can be translated to other driver programs relatively easily.  If you run into any issues, just leave a comment at the bottom of this page and I’ll try to help you out.
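
For instance, here is a minimal sketch of what the first Scala example below might look like as a standalone driver program instead of the spark-shell. It assumes Spark 1.4.1 on the classpath; the object name and the use of environment variables for the credentials are illustrative only, not something the original examples require.

import org.apache.spark.{SparkConf, SparkContext}

object S3TextFileCount {
  def main(args: Array[String]): Unit = {
    // spark-shell builds a SparkContext for you; a standalone driver builds its own
    val conf = new SparkConf().setAppName("S3TextFileCount")
    val sc = new SparkContext(conf)

    // same properties the spark-shell examples below set interactively
    sc.hadoopConfiguration.set("fs.s3n.awsAccessKeyId", sys.env("AWS_ACCESS_KEY_ID"))
    sc.hadoopConfiguration.set("fs.s3n.awsSecretAccessKey", sys.env("AWS_SECRET_ACCESS_KEY"))

    val count = sc.textFile("s3n://supergloospark/baby_names.csv").count()
    println(s"line count: $count")

    sc.stop()
  }
}

You would package a driver like this with sbt and run it with spark-submit rather than pasting it into the shell.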

Apache Spark with Amazon S3 Scala Examples

Example: Load a File from S3 Written by a Third-Party Amazon S3 Tool

Requirements:

  • Spark 1.4.1 pre-built using Hadoop 2.4
  • File on S3 was created by a third-party tool. See the Reference section below for specifics on how the file was created.
scala> sc.hadoopConfiguration.set("fs.s3n.awsAccessKeyId", "AKIAJJRUVasdfasdf")
scala> sc.hadoopConfiguration.set("fs.s3n.awsSecretAccessKey", "LmuKE77fVLXJfasdfasdfxK2vj1nfA0Bp")
scala> val myRDD = sc.textFile("s3n://supergloospark/baby_names.csv")
myRDD: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[1] at textFile at <console>:21
scala> myRDD.count
res2: Long = 35218

Note how this example uses s3n instead of s3, both when setting the security credentials and in the protocol specifier of the textFile call.  At the time of this writing, there are three different S3 options.  See the Reference section of this post for links with more information.
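
As a side note, if you want both the s3 and s3n schemes usable in the same spark-shell session without relying on environment variables, one option is to set the credentials under both property prefixes. This is only a sketch; the setS3Credentials helper is not part of Spark, and the key values are placeholders:

scala> def setS3Credentials(accessKey: String, secretKey: String): Unit = {
     |   // properties read by the s3n:// scheme
     |   sc.hadoopConfiguration.set("fs.s3n.awsAccessKeyId", accessKey)
     |   sc.hadoopConfiguration.set("fs.s3n.awsSecretAccessKey", secretKey)
     |   // properties read by the s3:// scheme
     |   sc.hadoopConfiguration.set("fs.s3.awsAccessKeyId", accessKey)
     |   sc.hadoopConfiguration.set("fs.s3.awsSecretAccessKey", secretKey)
     | }
setS3Credentials: (accessKey: String, secretKey: String)Unit

scala> setS3Credentials("AKIAEXAMPLEKEYID", "exampleSecretAccessKey")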

You can also store the AWS credentials outside of your code.  For example, here’s how to set them using environment variables:

~/Development/spark-1.4.1-bin-hadoop2.4 $ export AWS_ACCESS_KEY_ID=AKIAJJRUasdfasdfasdf33HPA
~/Development/spark-1.4.1-bin-hadoop2.4 $ export AWS_SECRET_ACCESS_KEY=LmuKE7afdasdfxK2vj1nfA0Bp

Then, if we restart the spark-shell, we don’t have to set the AWS security credentials in code.  All we have to do is call textFile with the appropriate protocol specifier:

scala> val myRDD = sc.textFile("s3n://supergloospark/baby_names.csv")
myRDD: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[1] at textFile at <console>:21

scala> myRDD.count
res0: Long = 35218

Example: Load a Text File from S3 Written from the Hadoop Library

Requirements:

  • Spark 1.4.1 pre-built using Hadoop 2.4
  • File on S3 was created from Hadoop
  • Amazon S3 credentials stored as environment variables before starting spark-shell
scala> val subset = myRDD.map(line => line.split(",")).map(n => (n(1), n(4)))
subset: org.apache.spark.rdd.RDD[(String, String)] = MapPartitionsRDD[21] at map at <console>:23

scala> subset.saveAsTextFile("s3://supergloospark/baby_names_s3_not_s3n.csv")
                                                                                
scala> sc.textFile("s3://supergloospark/baby_names_s3_not_s3n.csv").count()
res13: Long = 35218

Notice how s3 is used here instead of s3n.  Also, we’re not setting any AWS credentials in code, because we set them as environment variables before starting the spark-shell; see the previous example where AWS_SECRET_ACCESS_KEY and AWS_ACCESS_KEY_ID were exported.  These environment variables work for either s3 or s3n.
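
If you ever want to double-check that those variables actually reached the shell, or copy them into the Hadoop configuration explicitly, a quick sketch from the spark-shell might look like the following; it is roughly equivalent to letting Spark pick the values up on its own:

scala> (sys.env.get("AWS_ACCESS_KEY_ID"), sys.env.get("AWS_SECRET_ACCESS_KEY")) match {
     |   case (Some(key), Some(secret)) =>
     |     // mirror the environment variables into the s3n properties
     |     sc.hadoopConfiguration.set("fs.s3n.awsAccessKeyId", key)
     |     sc.hadoopConfiguration.set("fs.s3n.awsSecretAccessKey", secret)
     |   case _ => println("AWS credentials are not set in the environment")
     | }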

 

S3 Text File Interoperability from Spark

Requirements:

  • Spark 1.4.1 pre-built using Hadoop 2.4
  • Run both of the Spark with Scala S3 examples above
  • Amazon S3 credentials stored as environment variables before starting spark-shell
scala> // the following will error because it uses s3 instead of s3n for a file that was written under s3n
scala> sc.textFile("s3://supergloospark/baby_names.csv").count()
org.apache.hadoop.mapred.InvalidInputException: Input path does not exist: s3://supergloospark/baby_names.csv
...
scala> sc.textFile("s3n://supergloospark/baby_names.csv").count()
res16: Long = 35218
scala> sc.textFile("s3://supergloospark/baby_names_s3_not_s3n.csv").count()
res19: Long = 35218                                                             
scala> sc.textFile("s3n://supergloospark/baby_names_s3_not_s3n.csv").count()
org.apache.hadoop.mapred.InvalidInputException: Input path does not exist: s3n://supergloospark/baby_names_s3_not_s3n.csv
...

Note how the protocol has to match the way the file was written: the file created by the third-party tool is readable with s3n but not s3, while the file written with saveAsTextFile over s3 is readable with s3 but not s3n.  At the time of this writing, there are three different S3 options; see the Reference section of this post for links with more information.
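
To make that relationship explicit in code, here is a small illustrative helper (not part of the post’s original examples) that takes the scheme as a parameter, so it is obvious which protocol a given file was written under:

scala> def countUnder(scheme: String, key: String): Long =
     |   sc.textFile(s"$scheme://supergloospark/$key").count()
countUnder: (scheme: String, key: String)Long

scala> countUnder("s3n", "baby_names.csv")              // written by a third-party S3 client
scala> countUnder("s3", "baby_names_s3_not_s3n.csv")    // written by saveAsTextFile over s3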

 

Apache Spark with Amazon S3 Python Examples

Python Example: Load a File from S3 Written by a Third-Party Amazon S3 Tool

Requirements:

  • Spark 1.4.1 pre-built using Hadoop 2.4
  • File on S3 was created by a third-party tool. See the Reference section below for specifics on how the file was created.
>>> sc._jsc.hadoopConfiguration().set("fs.s3n.awsAccessKeyId", "AKIAnotrealPLUQGVOJWQ")
>>> sc._jsc.hadoopConfiguration().set("fs.s3n.awsSecretAccessKey", "+Uu9E4psBLnotrealyi+e7i1Z6kJANKt")
>>> sc.textFile("s3n://supergloospark/baby_names.csv").count()
35218

There are three different S3 options at the time of this writing.  Note how this example uses s3n instead of s3, both when setting the security credentials and in the protocol specifier of the textFile call.  See the Reference section of this post for links with more information.

You can store the AWS credentials outside of your code.  For example, here’s how to set them using environment variables:

~/Development/spark-1.4.1-bin-hadoop2.4 $ export AWS_ACCESS_KEY_ID=AKIAJJRUasdfasdfasdf33HPA
~/Development/spark-1.4.1-bin-hadoop2.4 $ export AWS_SECRET_ACCESS_KEY=LmuKE7afdasdfxK2vj1nfA0Bp

Then, if we restart the iPython notebook, we don’t have to set the AWS security credentials in code.  All we have to do is call textFile with the appropriate protocol specifier:

>>> sc.textFile("s3n://supergloospark/baby_names.csv").count()
35218

Python Example: Load a Text File from S3 Written from the Hadoop Library

Requirements:

  • Spark 1.4.1 pre-built using Hadoop 2.4
  • File on S3 was created from Hadoop
>>> myRDD = sc.textFile("s3n://supergloospark/baby_names.csv")
>>> subset = myRDD.filter(lambda line: "MICHAEL" in line)
>>> sc._jsc.hadoopConfiguration().set("fs.s3.awsAccessKeyId", "AKIAI74O5KPLUQGVOJWQ")
>>> sc._jsc.hadoopConfiguration().set("fs.s3.awsSecretAccessKey", "+Uu9E4psBLJgJPNFeV2GdevWcyi+e7i1Z6kJANKt")
>>> subset.saveAsTextFile("s3://supergloospark/python_example_baby_names_s3_not_s3n.csv")
>>> sc.textFile("s3://supergloospark/python_example_baby_names_s3_not_s3n.csv").count()
206

Note how this example uses s3 instead of s3n, both when setting the security credentials and in the protocol specifier of the textFile call.  Unlike the comparable Scala example above, we set the AWS keys in code again, this time under the fs.s3 properties, because we are using s3 instead of s3n.  We can avoid setting either if we store these values in environment variables as noted above.

Python S3 Text File Interoperability from Spark

Requirements:

  • Spark 1.4.1 pre-built using Hadoop 2.4
  • Run both of the Spark with Python S3 examples above
>>>  sc.textFile("s3n://supergloospark/baby_names.csv").count()
35218

>>> sc.textFile("s3://supergloospark/python_example_baby_names_s3_not_s3n.csv").count()
206

>>> sc.textFile("s3://supergloospark/baby_names.csv").count()
...
An error occurred while calling z:org.apache.spark.api.python.PythonRDD.collectAndServe.
: org.apache.hadoop.mapred.InvalidInputException: Input path does not exist: s3://supergloospark/baby_names.csv
...

As in the Scala interoperability example above, the protocol has to match the way the file was written: the third-party file is readable with s3n but not s3, while the file written from Python with saveAsTextFile over s3 is readable with s3.  At the time of this writing, there are three different S3 options; see the Reference section of this post for links with more information.

 

References

1) To create the files on S3 outside of Spark/Hadoop, I used a client called Forklift.  But Forklift isn’t a requirement, as there are many S3 clients available; a scripted alternative is sketched just after this reference list.  Here’s a screencast example of configuring Amazon S3 and copying the file up to the S3 bucket.

Apache Spark with Amazon S3 setup

2) For more information on the different S3 options, see the Amazon S3 page on the Hadoop wiki: http://wiki.apache.org/hadoop/AmazonS3

3) The iPython notebook file is available on GitHub.
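
As a scripted alternative to a GUI client like Forklift, here is a rough sketch of uploading the same CSV with the AWS SDK for Java. It assumes the SDK jar is on the classpath; the object name and the local file path are placeholders.

import java.io.File
import com.amazonaws.auth.BasicAWSCredentials
import com.amazonaws.services.s3.AmazonS3Client

object UploadBabyNames {
  def main(args: Array[String]): Unit = {
    // credentials pulled from the same environment variables used elsewhere in this post
    val credentials = new BasicAWSCredentials(
      sys.env("AWS_ACCESS_KEY_ID"),
      sys.env("AWS_SECRET_ACCESS_KEY"))
    val s3 = new AmazonS3Client(credentials)

    // copy the local CSV up to the bucket used throughout this post
    s3.putObject("supergloospark", "baby_names.csv", new File("baby_names.csv"))
  }
}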

 

 

Featured Image Credit https://flic.kr/p/nvoqRm

2 thoughts on “Apache Spark with Amazon S3 Examples of Text Files Tutorial”

  1. I’m using the exact same approach as yours (using Spark (Scala) to read a CSV from S3).

    After the textFile() method, I’m chaining the toDF() method to convert the RDD into a DataFrame. Following this, I’m calling printSchema() on the DataFrame.

    I’m facing following problems:

    1. The textFile() method won’t succeed (see the 2nd point for the error message), but printSchema() still generates output (a single String-type column called ‘value’).
    This is perplexing, since I’m not aware of the textFile() method or the toDF() method being asynchronous (such that execution could advance without completion of the textFile().toDF() statement).

    2. Ultimately, the textFile() method fails with
    .. org.jets3t.service.S3ServiceException: Service Error Message. — ResponseCode: 403, ResponseStatus: Forbidden, .. Access denied ..

    Now before you ask, I can confirm that:
    (a) I’m able to use aws cli to display the contents of the s3 bucket (aws s3 ls s3://bucket-name..) using the same access-key-id and the secret-access-key
    (b) My secret access key contains only a plus ‘+’ character and no slashes ‘/’. I’ve already tried the secretKey.replace("/", "%2F") method
    (c) My s3 bucket path contains directories (or keys, whatever they are called) with underscore ‘_’ and period ‘.’ Can this be a source of the problem (even though the error says Access Denied)?

    1. The solution that worked for me was replacing ‘s3’ with ‘s3n’ everywhere (while setting hadoopConfiguration as well as in the textFile() method).

      I’m sure this isn’t a general solution; this problem arose due to the particular configuration of the S3 instance that I was accessing.
