Apache Spark is an open-source distributed computing system providing fast and general-purpose cluster-computing capabilities for big data processing. Amazon Simple Storage Service (S3) is a scalable, cloud storage service originally designed for online backup and archiving of data and applications on Amazon Web Services (AWS), but it has involved into basis of object storage for analytics. Integrating Spark with S3 can enable workloads which process large datasets efficiently, while also benefiting from the features, reliability, and performance offered by both.
Spark provides built-in libraries to read from and write data to S3, while also allowing optimization of this process through configuration modifications. Additionally, security is pivotal for data processing, and a combination of Spark and S3 offers various options to manage data access control, ensuring that sensitive data remains secure.
Key Takeaways
- Spark and S3 integration enables efficient management and processing of large datasets.
- A proper configuration and setup optimize data reading and writing performance.
- Security and access control measures can be implemented to protect sensitive data.
This post will show ways and options for accessing files stored on Amazon S3 from Apache Spark. Examples of text file interaction on Amazon S3 will be shown from both Scala and Python using the spark-shell from Scala or ipython notebook for Python.
To begin, you should know there are multiple ways to access S3 based files. The options depend on a few factors such as:
- The version of Spark, because of the version of accompanying Hadoop libraries matters
- How were the files were created on S3? Were they written from Spark or Hadoop to S3 or some other 3rd party tool?
All these examples are based on Scala console or pyspark, but they may be translated to different driver programs relatively easily. If you run into any issues, just leave a comment at the bottom of this page and I’ll try to help you out.
Table of Contents
- Configuration and Setup
- Scala Spark S3 Examples
- Apache Spark with Amazon S3 PySpark Examples
- Optimization and Performance
- Security and Access Control
- Error Handling and Troubleshooting
- References
Configuration and Setup
Setting Up Spark S3 Connectivity
To set up Spark S3 connectivity, users must first obtain the required credentials. These credentials are typically an Access Key and a Secret Access Key. Once the credentials are acquired, the configuration process can begin.
- Start by configuring the
spark.hadoop.fs.s3a.access.key
andspark.hadoop.fs.s3a.secret.key
properties in thespark-defaults.conf
file.
spark.hadoop.fs.s3a.access.key=YOUR_ACCESS_KEY
spark.hadoop.fs.s3a.secret.key=YOUR_SECRET_KEY
- Alternatively, the properties can also be set through Spark’s
SparkConf
object in the code:
from pyspark import SparkConf, SparkContext
conf = SparkConf().setAppName('S3 Connectivity Example')
conf.set('spark.hadoop.fs.s3a.access.key', 'YOUR_ACCESS_KEY')
conf.set('spark.hadoop.fs.s3a.secret.key', 'YOUR_SECRET_KEY')
sc = SparkContext(conf=conf)
S3 Libraries for Spark
Several libraries utilize full functionality with S3. They are as follows:
- Hadoop AWS: A library enabling support for the Hadoop
FileSystem
API for S3. - AWS SDK: Provides classes for using AWS services like S3, including the S3 client.
- AWS JDK: A library that implements the low-level requirements of the AWS SDK.
To add these libraries, include the following dependencies in your project’s build tool:
Maven:
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-aws</artifactId>
<version>3.2.0</version>
</dependency>
<dependency>
<groupId>com.amazonaws</groupId>
<artifactId>aws-java-sdk</artifactId>
<version>1.11.500</version>
</dependency>
sbt:
libraryDependencies += "org.apache.hadoop" % "hadoop-aws" % "3.2.0"
libraryDependencies += "com.amazonaws" % "aws-java-sdk" % "1.11.500"
Once these libraries are included, Spark will be able to interact with S3 and complete operations such as reading and writing data to and from S3 buckets.
Scala Spark S3 Examples
Example Load file from S3 Written By Third Party Amazon S3 tool
Requirements:
- Spark 1.4.1 pre-built using Hadoop 2.4
- File on S3 was created from Third Party – See Reference Section below for specifics on how the file was created
scala> sc.hadoopConfiguration.set("fs.s3n.awsAccessKeyId", "AKIAJJRUVasdfasdf")
scala> sc.hadoopConfiguration.set("fs.s3n.awsSecretAccessKey", "LmuKE77fVLXJfasdfasdfxK2vj1nfA0Bp")
scala> val myRDD = sc.textFile("s3n://supergloospark/baby_names.csv")
myRDD: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[1] at textFile at <console>:21
scala> myRDD.count
res2: Long = 35218
Note how this example is using s3n instead of s3 in setting security credentials and protocol specification in textFile call. At the time of this writing, there are three different S3 options. See Reference section in this post for links for more information.
A person could also store the AWS credentials outside your code. For example, here’s how to set using environment variables:
~/Development/spark-1.4.1-bin-hadoop2.4 $ export AWS_ACCESS_KEY_ID=AKIAJJRUasdfasdfasdf33HPA ~/Development/spark-1.4.1-bin-hadoop2.4 $ export AWS_SECRET_ACCESS_KEY=LmuKE7afdasdfxK2vj1nfA0Bp
And then, if we restart the spark console, we don’t have to set the AWS security credentials in code. All we have to do is call textFile with appropriate protocol specifier:
scala> val myRDD = sc.textFile("s3n://supergloospark/baby_names.csv")
myRDD: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[1] at textFile at <console>:21
scala> myRDD.count
res0: Long = 35218
Example Load Text File from S3 Written from Hadoop Library
Requirements:
- Spark 1.4.1 pre-built using Hadoop 2.4
- File on S3 was created from Hadoop
- Amazon S3 credentials stored as environment variables before starting spark-shell
scala> val subset = myRDD.map(line => line.split(",")).map(n => (n(1), n(4)))
subset: org.apache.spark.rdd.RDD[(String, String)] = MapPartitionsRDD[21] at map at <console>:23
scala> subset.saveAsTextFile("s3://supergloospark/baby_names_s3_not_s3n.csv")
scala> sc.textFile("s3://supergloospark/baby_names_s3_not_s3n.csv").count()
res13: Long = 35218
Notice how s3 instead of s3n is used. Also, we’re not setting any AWS credentials because we set them as environment variables before starting spark-shell. See the previous example where AWS_SECRET_ACCESS_KEY and AWS_ACCESS_KEY_ID were set. These vars will work for either s3 or s3n.
S3 from Spark Text File Interoperability
Requirements:
- Spark 1.4.1 pre-built using Hadoop 2.4
- Run both Spark with Scala S3 examples above
- Amazon S3 credentials stored as environment variables before starting spark-shell
scala> // the following will error, because using s3 instead of s3n
scala> sc.textFile("s3://supergloospark/baby_names.csv").count()
org.apache.hadoop.mapred.InvalidInputException: Input path does not exist: s3://supergloospark/baby_names.csv
...
scala> sc.textFile("s3n://supergloospark/baby_names.csv").count()
res16: Long = 35218
scala> sc.textFile("s3://supergloospark/baby_names_s3_not_s3n.csv").count()
res19: Long = 35218
scala> sc.textFile("s3n://supergloospark/baby_names_s3_not_s3n.csv").count()
org.apache.hadoop.mapred.InvalidInputException: Input path does not exist: s3n://supergloospark/baby_names_s3_not_s3n.csv
...
Note how this example is using s3n instead of s3 in setting security credentials and protocol specification in textFile call. At the time of this writing, there are three different S3 options. See Reference section in this post for links for more information.
Saving Data from Spark to S3
To save data from a Spark DataFrame to an S3 bucket, utilize the write
method on the DataFrame instance. The process is similar to loading data from S3:
dataframe.write.parquet("s3a://your-bucket-name/output.parquet")
In this example above, data is saved in parquet
format. You can replace the format in the write.<format>
method to save data in the desired format. Additionally, one can apply advanced options such as compression, partitioning, and overwrite modes:
- Compression: To save storage space and improve performance, enable data compression while saving:
dataframe.write.option("compression", "gzip").parquet("s3a://your-bucket-name/output.parquet")
- Partitioning: To optimize query performance on large datasets, partition data by one or more columns:
dataframe.write.partitionBy("date").parquet("s3a://your-bucket-name/output.parquet")
- Overwrite modes: Control the behavior while saving data if a file or directory already exists at the destination:
dataframe.write.mode("overwrite").parquet("s3a://your-bucket-name/output.parquet")
Apache Spark with Amazon S3 PySpark Examples
Python Example Load File from S3 Written By Third Party Amazon S3 tool
Requirements:
- Spark 1.4.1 pre-built using Hadoop 2.4
- File on S3 was created from Third Party – See Reference Section below for specifics on how the file was created
>>> sc._jsc.hadoopConfiguration().set("fs.s3n.awsAccessKeyId", "AKIAnotrealPLUQGVOJWQ")
>>> sc._jsc.hadoopConfiguration().set("fs.s3n.awsSecretAccessKey", "+Uu9E4psBLnotrealyi+e7i1Z6kJANKt")
>>> myRDD = sc.textFile("s3n://supergloospark/baby_names.csv").count()
35218
There are three different S3 options. Note how this example is using s3n instead of s3 in setting security credentials and protocol specification in textFile call. See Reference section in this post for links for more information.
You can store the AWS credentials outside your code. For example, here’s how to set using environment variables:
~/Development/spark-1.4.1-bin-hadoop2.4 $ export AWS_ACCESS_KEY_ID=AKIAJJRUasdfasdfasdf33HPA ~/Development/spark-1.4.1-bin-hadoop2.4 $ export AWS_SECRET_ACCESS_KEY=LmuKE7afdasdfxK2vj1nfA0Bp
And then, if we restart the ipython notebook, we don’t have to set the AWS security credentials in code. All we have to do is call textFile with appropriate protocol specifier:
>>> sc.textFile("s3n://supergloospark/baby_names.csv").count()
35218
Python Example Load Text File from S3 Written from Hadoop Library
Requirements:
- Spark 1.4.1 pre-built using Hadoop 2.4
- File on S3 was created from Hadoop
>>> myRDD = sc.textFile("s3n://supergloospark/baby_names.csv")
>>> subset = myRDD.filter(lambda line: "MICHAEL" in line)
>>> sc._jsc.hadoopConfiguration().set("fs.s3.awsAccessKeyId", "AKIAI74O5KPLUQGVOJWQ")
>>> sc._jsc.hadoopConfiguration().set("fs.s3.awsSecretAccessKey", "+Uu9E4psBLJgJPNFeV2GdevWcyi+e7i1Z6kJANKt")
>>> subset.saveAsTextFile("s3://supergloospark/python_example_baby_names_s3_not_s3n.csv")
>>> sc.textFile("s3://supergloospark/python_example_baby_names_s3_not_s3n.csv").count()
206
Note how this example is using s3 instead of s3n in setting security credentials and protocol specification in textFile call. Unlike comparable Scala example above, we are setting the AWS keys again because we are using s3 instead of s3n. We can avoid having to set either if we store these values in external environment vars as noted above.
Python with S3 from Spark Text File Interoperability
Requirements:
- Spark 1.4.1 pre-built using Hadoop 2.4
- Run both Spark with Python S3 examples above
>>> sc.textFile("s3n://supergloospark/baby_names.csv").count()
35218
>>> sc.textFile("s3://supergloospark/python_example_baby_names_s3_not_s3n.csv").count()
206
>>> sc.textFile("s3://supergloospark/baby_names.csv").count()
...
An error occurred while calling z:org.apache.spark.api.python.PythonRDD.collectAndServe.
: org.apache.hadoop.mapred.InvalidInputException: Input path does not exist: s3://supergloospark/baby_names.csv
...
Note how this example is using s3n instead of s3 in setting security credentials and protocol specification in textFile call. At the time of this writing, there are three different S3 options. See Reference section in this post for links for more information.
Optimization and Performance
Optimizing Spark S3 integration involves focusing on two key aspects: Improving Data Transfer Rates and Caching Strategies. This section will present ways of achieving better performance in both these areas.
Improving Data Transfer Rates
To improve data transfer rates when using Spark with S3, consider the following recommendations:
- Parallelize transfers: Utilize parallelization to transfer larger amounts of data in a shorter time. Configure Spark to use a higher number of executor cores, thereby increasing the number of tasks, resulting in faster data transfers.
- Tune the JVM: To improve Spark performance, ensure optimal JVM configurations by setting appropriate values for
spark.executor.memory
,spark.driver.memory
, andspark.driver.maxResultSize
. - Select an appropriate file format: Optimal file formats, such as Parquet or Avro, can result in significant performance improvements. These formats maximize compression and enable predicate pushdown, reducing I/O operations and improving data transfer.
Caching Strategies
Implementing effective caching strategies is crucial for enhancing Spark S3 performance. The following guidelines will help achieve this goal:
- Use RDD Caching: When reusing an RDD multiple times during processing, cache the RDD to avoid recomputing it every time it is accessed. Use the
persist()
orcache()
methods on RDDs to achieve this. - Leverage Spark Broadcast Variables: Broadcast variables are used to cache a specific set of data on each executor for efficient access. Use them to store frequently used data such as lookup tables. For more on Spark broadcast variables.
- Employ Data Partitioning: Partition data intelligently to enhance performance by minimizing data shuffling during Spark operations. This can be achieved with functions like
repartition()
andpartitionBy()
.
By focusing on these strategies, Spark S3 integration can yield significant performance improvements, leading to optimal processing efficiency and enhanced overall system performance.
Security and Access Control
When using Spark with Amazon S3, various aspects of security and access control must be considered. Let’s mention two key components of securing your data: Authentication Methods and Encryption Options.
Authentication Methods
There are several methods available for authentication when accessing data stored in S3:
- AWS Identity and Access Management (IAM): IAM is the primary method for managing and controlling access to S3 resources. You can create and manage fine-grained policies to control access to buckets and objects, allowing you to grant specific permissions to users, groups, or roles.
- Access Control Lists (ACLs): This method allows you to define overall access permissions for an object or bucket. ACLs grant different levels of access to users, groups, or accounts, such as read, write, or full control.
- Bucket Policies: These policies are used to adjust access permissions at the bucket level. They are JSON-based policies similar to IAM policies, but are attached directly to S3 buckets.
- S3 Presigned URLs: Presigned URLs provide a secure way to grant temporary access to an object in S3. They are used to generate a URL that includes a cryptographic signature to allow access to an object for a specified duration.
Encryption Options
There are two primary encryption options available in S3:
- Server-Side Encryption (SSE): With server-side encryption, S3 automatically encrypts data when it’s written to the infrastructure and decrypts it when it’s read. There are three types of server-side encryption:
- SSE-S3: Amazon S3 handles the encryption, decryption and key management using its own encryption keys. This is the simplest server-side encryption option.
- SSE-KMS: This option uses AWS Key Management Service (KMS) to manage the encryption keys. It provides additional control over the cryptographic operations, such as auditing, access control, and key rotation.
- SSE-C: With this option, the customer provides their own encryption key. Amazon S3 manages the encryption and decryption but uses the supplied key.
- Client-Side Encryption: In this approach, data is encrypted by the client before being sent to Amazon S3 for storage. The customer is responsible for the management and protection of the encryption keys.
By implementing strong authentication methods and the appropriate encryption options, Spark users can ensure that their data stored in S3 remains secure and accessible only to authorized users.
Error Handling and Troubleshooting
Common Spark S3 Errors
There are several Spark S3 errors that users might encounter during the execution of their jobs. Here is a list of some common errors and their possible causes:
- S3A connection error: This error may occur when Spark is unable to establish a connection to the S3 service. It can be caused by incorrect AWS credentials, firewall issues, or network connectivity problems.
- Read timeout error: This error happens when Spark takes too long to read data from S3, which could be a result of slow network speeds or large file sizes.
- Write timeout error: Similar to read timeout errors, write timeout errors may occur when Spark is unable to write data to S3 in a timely manner.
- NoSuchKey error: This error occurs when the requested object key does not exist in the specified bucket.
Log Analysis
Analyzing logs is an essential part of troubleshooting Spark S3 issues. Here are some tips for log analysis:
- Enable debug logging: It can give more insights into the internal workings of Spark S3, making it easier to identify errors and their causes.
- Search for error messages: Look for keywords such as “error”, “exception”, or “failure” in the logs to quickly locate potential issues.
- Correlate timestamps: Match the timestamps of errors with relevant events in the system to determine possible causes.
- Review stack traces: Stack traces provide detailed information about the execution path leading to an error, which can help pinpoint the root cause.
Debugging
Once potential issues have been identified through log analysis, proper debugging techniques can be employed to resolve them. Here are some general steps to follow for efficient debugging:
- Reproduce the error: It is crucial to consistently reproduce the error in order to accurately identify and address the root cause.
- Isolate the issue: Narrow down the specific operation, component, or configuration causing the issue by eliminating other potential factors.
- Test hypotheses: Formulate hypotheses on the possible causes and test them systematically until the root cause is found.
- Apply fixes and verify: Implement the necessary changes to resolve the issue and verify the fix by running the job again.
By following the steps mentioned above, one can effectively handle and troubleshoot Spark S3 errors while optimizing the overall performance and stability of their data processing workloads.
References
1) To create the files on S3 outside of Spark/Hadoop, I used a client called Forklift. But, Forklift isn’t a requirement as there are many S3 clients available. Here’s a screencast example of configuring Amazon S3 and copying the file up to the S3 bucket.
2) For more information on different S3 options, see Amazon S3 page on Hadoop wiki http://wiki.apache.org/hadoop/AmazonS3
3) Additional tutorials around Amazon AWS and Spark include Spark Kinesis example and be sure to watch Spark with Scala tutorials and PySpark tutorials landing pages.
I’m using the exact approach as yours (using Spark (Scala) to read CSV from S3).
After the textFile() method, I’m chaining the toDF() method to convert RDD into Dataframe. Following this, I’m trying to printSchema() of the dataframe.
I’m facing following problems:
1. the textFile() method won’t succeed (read 2nd point for the error message) but the printSchema() would still generate output (single String type column called ‘value’)
This is perplexing since i’m not aware of the textFile() method or the toDF() method to be asynchronous (so that the execution can advance without completion of the textFile().toDF() statement)
2. Ultimately, the textFile() method fails with
.. org.jets3t.service.S3ServiceException: Service Error Message. — ResponseCode: 403, ResponseStatus: Forbidden, .. Access denied ..
Now before you ask, I can confirm that:
(a) I’m able to use aws cli to display the contents of the s3 bucket (aws s3 ls s3://bucket-name..) using the same access-key-id and the secret-access-key
(b) My secret access key contains only a plus ‘+’ character and no slashes ‘/’. I’ve already tried secretKey.replace(“/”,”%2F”) method
(c) My s3 bucket path contains directories (or key, whatever it is called) with underscore ‘_’ and period ‘.’ Can this be a source of problem (even though the error says Access Deined)?
The solution that worked for me was replacing ‘s3’ with ‘s3n’ everywhere (while setting hadoopConfiguration as well as while in textFile() method).
I’m sure this isn’t a general solution and this problem arised due to the particular configuration of the s3 instance that I was accessing.
Hello Techmates,
I have created my aws free account and uploaded a weather file in a bucket (region:: sa-east-1 :: South America).
Afterwards, I have been trying to read a file from AWS S3 bucket by pyspark as below::
from pyspark import SparkConf, SparkContext
ak=’*****’
sk=’*****’
sc._jsc.hadoopConfiguration().set(“fs.s3.impl”,”org.apache.hadoop.fs.s3.S3FileSystem”)
sc._jsc.hadoopConfiguration().set(“fs.s3.awsAccessKeyId”,ak)
sc._jsc.hadoopConfiguration().set(“fs.s3.awsSecretAccessKey”,sk)
a=sc.textFile(“s3://bucket_name/weatherhistory.txt”);
a.collect()
But it is showing :: /weatherhistory.txt does not exists.
But, when am trying the same using python (boto3), I can easily read the file.
import boto
import boto.s3.connection
access_key = ‘*****’
secret_key = ‘*****’
conn = boto.connect_s3(bucket_name,
aws_access_key_id = access_key,
aws_secret_access_key = secret_key)
…..
…..
Even have listed the keys spark-default.conf as well
[default]
aws_access_key_id=*****
aws_secret_access_key=*****
But, still the error is appearing as :: /weatherhistory.txt does not exists.
have tried this approach as well but the error is same.
conf = (SparkConf()
.setAppName(“S3 Configuration Test”)
.set(“spark.executor.instances”, “1”)
.set(“spark.executor.cores”, 1)
.set(“spark.executor.memory”, “2g”)
.set(“fs.s3.awsAccessKeyId”, “*****”)
.set(“fs.s3.awsSecretAccessKey”, “*****”)
.set(“fs.s3.endpoint”, “s3-sa-east-1.amazonaws.com”)
.set(“com.amazonaws.services.s3.enableV4”, “true”)
.set(“fs.s3.impl”, “org.apache.hadoop.fs.s3.S3FileSystem”))
sc.conf=conf
a=sc.textFile(“s3://bucketname/weatherhistory.txt”)
Even have tried to write a file thinking that my directory pointing was not correct and if the file write is successful, could pin point the path where it is pointing now but still no progress and say no path exists.
If you please could guide us in this regard, it would really be helpful. Thanks in advance.
Is spark running on standalone here?