Apache Spark with Amazon S3 Examples

This post shows several options for accessing files stored on Amazon S3 from Apache Spark.  Examples of text file interaction with Amazon S3 are given in both Scala and Python, using the spark-shell for Scala and an IPython notebook for Python.

To begin, you should know there are multiple ways to access S3-based files.  Which option to use depends on a few factors, such as:

  • The version of Spark, because the version of the accompanying Hadoop libraries matters (a quick version check is sketched just after this list)
  • How the files were created on S3: were they written to S3 from Spark or Hadoop, or by some other third-party tool?
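
If you are not sure which versions you have, a quick check from the spark-shell looks like the following (a minimal sketch; the output shown assumes the Spark 1.4.1 with Hadoop 2.4 build used throughout this post):

scala> sc.version
res0: String = 1.4.1

scala> org.apache.hadoop.util.VersionInfo.getVersion
res1: String = 2.4.0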

All of these examples are based on the Scala console or pyspark, but they can be adapted to standalone driver programs relatively easily.  If you run into any issues, just leave a comment at the bottom of this page and I'll try to help you out.

Apache Spark with Amazon S3 Scala Examples

Example: Load a File from S3 Written by a Third-Party Amazon S3 Tool

Requirements:

  • Spark 1.4.1 pre-built using Hadoop 2.4
  • File on S3 was created by a third-party tool – see the Reference section below for specifics on how the file was created
scala> sc.hadoopConfiguration.set("fs.s3n.awsAccessKeyId", "AKIAJJRUVasdfasdf")
scala> sc.hadoopConfiguration.set("fs.s3n.awsSecretAccessKey", "LmuKE77fVLXJfasdfasdfxK2vj1nfA0Bp")
scala> val myRDD = sc.textFile("s3n://supergloospark/baby_names.csv")
myRDD: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[1] at textFile at <console>:21
scala> myRDD.count
res2: Long = 35218

Note how this example uses s3n instead of s3, both when setting the security credentials and in the protocol specified in the textFile call.  At the time of this writing, there are three different S3 options.  See the Reference section of this post for links to more information.
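
For reference, here is a sketch of how the credential property names line up with each of the three schemes.  The fs.s3a.* settings are included only for completeness; they assume a Spark build with Hadoop 2.6 or later, not the Hadoop 2.4 build used in these examples, and the key values are placeholders.

scala> // s3n (native), the scheme used in this example
scala> sc.hadoopConfiguration.set("fs.s3n.awsAccessKeyId", "YOUR_ACCESS_KEY")
scala> sc.hadoopConfiguration.set("fs.s3n.awsSecretAccessKey", "YOUR_SECRET_KEY")

scala> // s3 (block-based) reads its own property names
scala> sc.hadoopConfiguration.set("fs.s3.awsAccessKeyId", "YOUR_ACCESS_KEY")
scala> sc.hadoopConfiguration.set("fs.s3.awsSecretAccessKey", "YOUR_SECRET_KEY")

scala> // s3a, available in Hadoop 2.6+ builds only
scala> sc.hadoopConfiguration.set("fs.s3a.access.key", "YOUR_ACCESS_KEY")
scala> sc.hadoopConfiguration.set("fs.s3a.secret.key", "YOUR_SECRET_KEY")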

You could also store the AWS credentials outside your code.  For example, here's how to set them using environment variables:

~/Development/spark-1.4.1-bin-hadoop2.4 $ export AWS_ACCESS_KEY_ID=AKIAJJRUasdfasdfasdf33HPA
~/Development/spark-1.4.1-bin-hadoop2.4 $ export AWS_SECRET_ACCESS_KEY=LmuKE7afdasdfxK2vj1nfA0Bp

Then, if we restart the spark-shell, we don't have to set the AWS security credentials in code.  All we have to do is call textFile with the appropriate protocol specifier:

scala> val myRDD = sc.textFile("s3n://supergloospark/baby_names.csv")
myRDD: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[1] at textFile at <console>:21

scala> myRDD.count
res0: Long = 35218
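
Another option, sketched below, is to pass the Hadoop properties when launching the shell.  Spark forwards spark.hadoop.* configuration properties into the underlying Hadoop configuration, so the keys stay out of your code; the key values shown are placeholders.

~/Development/spark-1.4.1-bin-hadoop2.4 $ bin/spark-shell \
    --conf spark.hadoop.fs.s3n.awsAccessKeyId=YOUR_ACCESS_KEY \
    --conf spark.hadoop.fs.s3n.awsSecretAccessKey=YOUR_SECRET_KEY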

Example: Load a Text File from S3 Written by the Hadoop Library

Requirements:

  • Spark 1.4.1 pre-built using Hadoop 2.4
  • File on S3 was created from Hadoop
  • Amazon S3 credentials stored as environment variables before starting spark-shell
scala> val subset = myRDD.map(line => line.split(",")).map(n => (n(1), n(4)))
subset: org.apache.spark.rdd.RDD[(String, String)] = MapPartitionsRDD[21] at map at <console>:23

scala> subset.saveAsTextFile("s3://supergloospark/baby_names_s3_not_s3n.csv")
                                                                                
scala> sc.textFile("s3://supergloospark/baby_names_s3_not_s3n.csv").count()
res13: Long = 35218

Notice how s3 is used instead of s3n.  Also, we're not setting any AWS credentials in code because we set them as environment variables before starting the spark-shell; see the previous example where AWS_ACCESS_KEY_ID and AWS_SECRET_ACCESS_KEY were exported.  These variables work for either s3 or s3n.

S3 from Spark Text File Interoperability

Requirements:

  • Spark 1.4.1 pre-built using Hadoop 2.4
  • Run both Spark with Scala S3 examples above
  • Amazon S3 credentials stored as environment variables before starting spark-shell
scala> // the following will error because it uses s3 instead of s3n
scala> sc.textFile("s3://supergloospark/baby_names.csv").count()
org.apache.hadoop.mapred.InvalidInputException: Input path does not exist: s3://supergloospark/baby_names.csv
...
scala> sc.textFile("s3n://supergloospark/baby_names.csv").count()
res16: Long = 35218
scala> sc.textFile("s3://supergloospark/baby_names_s3_not_s3n.csv").count()
res19: Long = 35218                                                             
scala> sc.textFile("s3n://supergloospark/baby_names_s3_not_s3n.csv").count()
org.apache.hadoop.mapred.InvalidInputException: Input path does not exist: s3n://supergloospark/baby_names_s3_not_s3n.csv
...

These results show that a file must be read back with the same scheme that was used to write it.  baby_names.csv was uploaded by a third-party tool, so it is readable through s3n but not s3, while baby_names_s3_not_s3n.csv was written through the s3 block-based filesystem, so it is readable through s3 but not s3n.  At the time of this writing, there are three different S3 options; see the Reference section of this post for links to more information.

Apache Spark with Amazon S3 Python Examples

Python Example: Load a File from S3 Written by a Third-Party Amazon S3 Tool

Requirements:

  • Spark 1.4.1 pre-built using Hadoop 2.4
  • File on S3 was created by a third-party tool – see the Reference section below for specifics on how the file was created
>>> sc._jsc.hadoopConfiguration().set("fs.s3n.awsAccessKeyId", "AKIAnotrealPLUQGVOJWQ")
>>> sc._jsc.hadoopConfiguration().set("fs.s3n.awsSecretAccessKey", "+Uu9E4psBLnotrealyi+e7i1Z6kJANKt")
>>> sc.textFile("s3n://supergloospark/baby_names.csv").count()
35218

Note how this example uses s3n instead of s3, both when setting the security credentials and in the protocol specified in the textFile call.  At the time of this writing, there are three different S3 options; see the Reference section of this post for links to more information.

You can store the AWS credentials outside your code.  For example, here's how to set them using environment variables:

~/Development/spark-1.4.1-bin-hadoop2.4 $ export AWS_ACCESS_KEY_ID=AKIAJJRUasdfasdfasdf33HPA
~/Development/spark-1.4.1-bin-hadoop2.4 $ export AWS_SECRET_ACCESS_KEY=LmuKE7afdasdfxK2vj1nfA0Bp

Then, if we restart the IPython notebook, we don't have to set the AWS security credentials in code.  All we have to do is call textFile with the appropriate protocol specifier:

>>> sc.textFile("s3n://supergloospark/baby_names.csv").count()
35218

Python Example: Load a Text File from S3 Written by the Hadoop Library

Requirements:

  • Spark 1.4.1 pre-built using Hadoop 2.4
  • File on S3 was created from Hadoop
>>> myRDD = sc.textFile("s3n://supergloospark/baby_names.csv")
>>> subset = myRDD.filter(lambda line: "MICHAEL" in line)
>>> sc._jsc.hadoopConfiguration().set("fs.s3.awsAccessKeyId", "AKIAI74O5KPLUQGVOJWQ")
>>> sc._jsc.hadoopConfiguration().set("fs.s3.awsSecretAccessKey", "+Uu9E4psBLJgJPNFeV2GdevWcyi+e7i1Z6kJANKt")
>>> subset.saveAsTextFile("s3://supergloospark/python_example_baby_names_s3_not_s3n.csv")
>>> sc.textFile("s3://supergloospark/python_example_baby_names_s3_not_s3n.csv").count()
206

Note how this example uses s3 instead of s3n, both when setting the security credentials and in the protocol specified in the textFile call.  Unlike the comparable Scala example above, we set the AWS keys again in code because we are using s3 instead of s3n, and the s3 filesystem reads its credentials from the fs.s3.* properties rather than fs.s3n.*.  We can avoid setting either in code if we store these values in the external environment variables noted above.

Python with S3 from Spark Text File Interoperability

Requirements:

  • Spark 1.4.1 pre-built using Hadoop 2.4
  • Run both Spark with Python S3 examples above
>>> sc.textFile("s3n://supergloospark/baby_names.csv").count()
35218

>>> sc.textFile("s3://supergloospark/python_example_baby_names_s3_not_s3n.csv").count()
206

>>> sc.textFile("s3://supergloospark/baby_names.csv").count()
...
An error occurred while calling z:org.apache.spark.api.python.PythonRDD.collectAndServe.
: org.apache.hadoop.mapred.InvalidInputException: Input path does not exist: s3://supergloospark/baby_names.csv
...

The same interoperability rule from the Scala examples applies here: baby_names.csv, uploaded by a third-party tool, is readable through s3n but not s3, while the file written through the s3 block-based filesystem is only readable through s3.  See the Reference section of this post for links to more information.

References

1)  To create the files on S3 outside of Spark/Hadoop, I used a client called Forklift.  Forklift isn't a requirement, though; there are many S3 clients available.  Here's a screencast example of configuring Amazon S3 and copying the file up to the S3 bucket.

2)  For more information on the different S3 options, see the Amazon S3 page on the Hadoop wiki: http://wiki.apache.org/hadoop/AmazonS3

3)  Additional tutorials on Amazon AWS and Spark include the Spark Kinesis example, as well as the Spark with Scala tutorials and PySpark tutorials landing pages.

Featured Image Credit https://flic.kr/p/nvoqRm

4 thoughts on “Apache Spark with Amazon S3 Examples”

  1. I’m using the exact approach as yours (using Spark (Scala) to read CSV from S3).

    After the textFile() method, I’m chaining the toDF() method to convert RDD into Dataframe. Following this, I’m trying to printSchema() of the dataframe.

    I’m facing following problems:

    1. the textFile() method won’t succeed (read 2nd point for the error message) but the printSchema() would still generate output (single String type column called ‘value’)
    This is perplexing since I'm not aware of the textFile() method or the toDF() method being asynchronous (such that execution could advance without completion of the textFile().toDF() statement).

    2. Ultimately, the textFile() method fails with
    .. org.jets3t.service.S3ServiceException: Service Error Message. — ResponseCode: 403, ResponseStatus: Forbidden, .. Access denied ..

    Now before you ask, I can confirm that:
    (a) I’m able to use aws cli to display the contents of the s3 bucket (aws s3 ls s3://bucket-name..) using the same access-key-id and the secret-access-key
    (b) My secret access key contains only a plus '+' character and no slashes '/'. I've already tried the secretKey.replace("/", "%2F") method
    (c) My s3 bucket path contains directories (or keys, whatever they are called) with underscore '_' and period '.' Can this be a source of the problem (even though the error says Access Denied)?

    • The solution that worked for me was replacing ‘s3’ with ‘s3n’ everywhere (while setting hadoopConfiguration as well as while in textFile() method).

      I'm sure this isn't a general solution; the problem arose due to the particular configuration of the s3 instance that I was accessing.

  2. Hello Techmates,
    I have created my aws free account and uploaded a weather file in a bucket (region:: sa-east-1 :: South America).
    Afterwards, I have been trying to read a file from AWS S3 bucket by pyspark as below::
    from pyspark import SparkConf, SparkContext
    ak='*****'
    sk='*****'
    sc._jsc.hadoopConfiguration().set("fs.s3.impl","org.apache.hadoop.fs.s3.S3FileSystem")
    sc._jsc.hadoopConfiguration().set("fs.s3.awsAccessKeyId",ak)
    sc._jsc.hadoopConfiguration().set("fs.s3.awsSecretAccessKey",sk)
    a=sc.textFile("s3://bucket_name/weatherhistory.txt");
    a.collect()
    But it is showing :: /weatherhistory.txt does not exists.

    But when I am trying the same using python (boto3), I can easily read the file.
    import boto
    import boto.s3.connection
    access_key = '*****'
    secret_key = '*****'
    conn = boto.connect_s3(bucket_name,
    aws_access_key_id = access_key,
    aws_secret_access_key = secret_key)
    …..
    …..
    I have even listed the keys in spark-default.conf as well
    [default]
    aws_access_key_id=*****
    aws_secret_access_key=*****
    But, still the error is appearing as :: /weatherhistory.txt does not exists.

    I have tried this approach as well, but the error is the same.
    conf = (SparkConf()
    .setAppName("S3 Configuration Test")
    .set("spark.executor.instances", "1")
    .set("spark.executor.cores", 1)
    .set("spark.executor.memory", "2g")
    .set("fs.s3.awsAccessKeyId", "*****")
    .set("fs.s3.awsSecretAccessKey", "*****")
    .set("fs.s3.endpoint", "s3-sa-east-1.amazonaws.com")
    .set("com.amazonaws.services.s3.enableV4", "true")
    .set("fs.s3.impl", "org.apache.hadoop.fs.s3.S3FileSystem"))
    sc.conf=conf
    a=sc.textFile("s3://bucketname/weatherhistory.txt")

    I have even tried to write a file, thinking that my directory path was not correct and that a successful write would pinpoint where it is actually pointing, but still no progress; it says no path exists.

    If you could please guide us in this regard, it would really be helpful. Thanks in advance.

