Apache Spark Advanced Cluster Deploy Troubleshooting


In this Apache Spark cluster troubleshooting tutorial, we'll review a few options for when your Scala Spark code does not deploy as anticipated. For example, does your Spark driver program rely on a third-party jar that is only compatible with Scala 2.11, while your Spark cluster is built on Scala 2.10? Maybe your code relies on a newer version of a third-party jar that Apache Spark also uses? Or maybe you want your code to use Spark's version of a particular jar instead of the version your code specifies.

In any of these cases, your deploy to the Spark cluster will not be smooth. So, in this post, we'll explore ways to address all three issues.


Overview

In this post, we're going to address three specific issues that can come up when deploying to a Spark cluster:

  1. Using Apache Spark with Scala 2.11
  2. Overriding jars used by Spark with newer versions
  3. Excluding jars from your code in order to use the Spark version instead

All of these issues will be addressed using the Spark Streaming code from a previous Spark Streaming Example tutorial. Links to the source code download and the screencasts are available at the end of this post.

Challenge 1 Apache Spark with Scala 2.11

I had no idea things could go so wrong with the following build.sbt file.

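name := "spark-streaming-example"
version := "1.0"

// Leave the Scala library out of the fat jar; the cluster's own Scala runtime is used instead
assemblyOption in assembly := (assemblyOption in assembly).value.copy(includeScala = false)

scalaVersion := "2.11.8"

resolvers += "jitpack" at "https://jitpack.io"

libraryDependencies ++= Seq(
  // Supplied by the Spark cluster at runtime, so not bundled
  "org.apache.spark" %% "spark-streaming" % "1.6.1" % "provided",
  // %% appends the Scala binary suffix, so this resolves to scalaj-http_2.11
  "org.scalaj" %% "scalaj-http" % "2.3.0",
  // Single %: no Scala suffix in the name, although wcs is built against Scala 2.11
  "org.jfarcand" % "wcs" % "1.5"
)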

Although it doesn't stand out, the problem is the "wcs" WebSocket client library, which does not work with an Apache Spark cluster compiled for Scala 2.10.


Here was the error:

Exception in thread "Thread-28" java.lang.NoSuchMethodError: scala.Predef$.ArrowAssoc(Ljava/lang/Object;)Ljava/lang/Object;
    at scalaj.http.HttpConstants$.liftedTree1$1(Http.scala:637)

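A NoSuchMethodError on a scala.Predef method, as in this trace, is a classic sign of a Scala binary version mismatch: code compiled against Scala 2.11 (here, the scalaj-http_2.11 classes) is running on the Scala 2.10 library supplied by the cluster, because includeScala = false keeps Scala out of the assembly.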
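One way to catch this kind of mismatch before deploying is to compare the "_2.xx" suffix of each jar in the assembly with the Scala version the cluster runs. The object below is a hypothetical sketch (not part of sbt, sbt-assembly, or Spark), and "2.10.5" is an assumed Scala version for a stock Spark 1.6 build. Note its limit: it cannot flag a jar such as wcs-1.5.jar, which is built against Scala 2.11 but published without a suffix.

```scala
import scala.util.Properties

// Hypothetical pre-deploy check (not part of sbt, sbt-assembly, or Spark):
// flags jars whose "_2.xx" Scala suffix differs from the cluster's Scala version.
object ScalaBinaryCheck {
  // "2.11.8" -> "2.11"
  def binaryVersion(fullVersion: String): String =
    fullVersion.split('.').take(2).mkString(".")

  // Matches names like "scalaj-http_2.11-2.3.0.jar" and captures "2.11".
  private val ScalaSuffix = """.*_(\d+\.\d+)-.*\.jar""".r

  def jarScalaSuffix(jarName: String): Option[String] = jarName match {
    case ScalaSuffix(v) => Some(v)
    case _              => None // no suffix, e.g. "wcs-1.5.jar"
  }

  // Returns only the jars whose suffix is present and differs from the cluster's.
  def mismatched(jarNames: Seq[String], clusterScalaVersion: String): Seq[String] = {
    val expected = binaryVersion(clusterScalaVersion)
    jarNames.filter(name => jarScalaSuffix(name).exists(_ != expected))
  }

  def main(args: Array[String]): Unit = {
    // Jar names as printed by "sbt assembly"; "2.10.5" is an assumed cluster Scala version.
    val jars = Seq("wcs-1.5.jar", "scalaj-http_2.11-2.3.0.jar", "netty-3.10.3.Final.jar")
    println(s"Mismatched: ${mismatched(jars, "2.10.5")}")
    println(s"This JVM is running Scala ${Properties.versionNumberString}")
  }
}
```

Printing Properties.versionNumberString from inside the driver is also a quick way to confirm which Scala library the cluster actually put on the classpath.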

wcs is compiled against Scala 2.11, and I couldn't find another WebSocket client to use in this project, so I explored compiling Spark for Scala 2.11. It turns out this isn't a big deal.

Build Apache Spark with Scala 2.11 Steps

  1. Download the Spark source (screencast below)
  2. Run the script that changes the Scala version to 2.11
  3. Run the make-distribution.sh script

Screencast of these three steps:

Commands run in the screencast:

./dev/change-scala-version.sh 2.11

./make-distribution.sh --name spark-1.6.1-scala-2.11 --tgz -Dscala-2.11 -Pyarn -Phadoop-2.4

After creating the distribution, I started the Spark master and worker and optimistically tried to deploy again.  That’s how I ran into the next challenge.

Challenge 2 Incompatible Jars between Spark and Scala program – Use Your Jar

When I tried deploying the assembly jar to the new Spark cluster custom-built for Scala 2.11, I ran into another issue, as shown in this screencast:

As you can see in the screencast, there were SSL issues in the Netty-based HTTP client.

After asking Dr. Googs (see reference links below), I determined that Spark uses Akka for RPC and messaging, which in turn uses Netty. And it turns out the Netty version used by Spark/Akka is incompatible with the Netty version this project's dependencies need.

Recall that the "sbt assembly" command assembled the following:

~/Development/spark-course/spark-streaming $ sbt assembly
[info] Loading project definition from /Users/toddmcgrath/Development/spark-course/spark-streaming/project
[info] Set current project to spark-streaming-example (in build file:/Users/toddmcgrath/Development/spark-course/spark-streaming/)
[info] Including from cache: wcs-1.5.jar
[info] Including from cache: slf4j-api-1.7.12.jar
[info] Including from cache: scalaj-http_2.11-2.3.0.jar
[info] Including from cache: async-http-client-1.9.28.jar
[info] Including from cache: netty-3.10.3.Final.jar
[info] Checking every *.class/*.jar file's SHA-1.

I needed a way to configure Spark to use my netty-3.10.3.Final.jar instead of the older version used in Akka.
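Before changing any configuration, it can help to confirm which jar a conflicting class is actually loaded from. The sketch below is a hypothetical diagnostic (WhichJar is not a Spark API) built on the standard Class.getProtectionDomain.getCodeSource call. Run from the driver, it shows whether Netty 3.x classes (package org.jboss.netty) and slf4j come from your assembly or from Spark's jars.

```scala
import scala.util.Try

// Hypothetical diagnostic helper: reports which jar (or directory) a class was loaded from.
object WhichJar {
  def locationOf(className: String): Option[String] =
    // initialize = false: look the class up without running its static initializers
    Try(Class.forName(className, false, getClass.getClassLoader)).toOption.flatMap { cls =>
      // JDK bootstrap classes have no CodeSource, so this yields None for them.
      Option(cls.getProtectionDomain.getCodeSource).map(_.getLocation.toString)
    }

  def main(args: Array[String]): Unit = {
    // org.jboss.netty is the Netty 3.x package; org.slf4j.Logger lives in slf4j-api.
    val classes = Seq("org.jboss.netty.channel.Channel", "org.slf4j.Logger")
    classes.foreach { name =>
      println(s"$name -> ${locationOf(name).getOrElse("not found (or a JDK class)")}")
    }
  }
}
```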


The "spark.driver.userClassPathFirst" configuration property provided the answer. The Spark configuration documentation describes it as:

"Whether to give user-added jars precedence over Spark's own jars when loading classes in the driver. This feature can be used to mitigate conflicts between Spark's dependencies and user dependencies. It is currently an experimental feature. This is used in cluster mode only."
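To make the description concrete: a JVM class loader normally asks its parent first, so classes on Spark's classpath win over the same classes in your jar. "User classpath first" flips that order for user-added jars. The class below is a minimal, illustrative child-first loader under that assumption, not Spark's actual implementation; the name ChildFirstLoader is hypothetical.

```scala
import java.net.{URL, URLClassLoader}

// Illustrative sketch of child-first ("parent-last") class loading.
// Passing a null parent to URLClassLoader means super.loadClass consults only
// the JDK bootstrap classes and then userJars, never the fallback loader.
class ChildFirstLoader(userJars: Array[URL], fallback: ClassLoader)
    extends URLClassLoader(userJars, null) {

  override protected def loadClass(name: String, resolve: Boolean): Class[_] =
    try super.loadClass(name, resolve) // 1. JDK classes, then the user's jars
    catch {
      case _: ClassNotFoundException =>
        fallback.loadClass(name) // 2. only then the fallback (e.g. Spark's) classpath
    }
}
```

With this ordering, a netty-3.10.3.Final.jar passed in userJars would shadow an older Netty visible to the fallback loader, which is the effect the property is meant to achieve.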

So, I tried deploying again with this conf variable set as shown in the following screencast:
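On the command line, the property is passed to spark-submit with --conf in cluster deploy mode. The helper below is hypothetical and only assembles the argument list; the master URL, main class, and jar path are placeholders (the path assumes sbt-assembly's default "name-assembly-version.jar" naming). Spark also offers spark.executor.userClassPathFirst for the executor side.

```scala
// Hypothetical helper (not a Spark API): assembles the spark-submit arguments
// for a cluster-mode deploy with extra --conf settings.
object SubmitArgs {
  def build(
      master: String,
      mainClass: String,
      appJar: String,
      conf: Map[String, String]): Seq[String] = {
    // Sort the conf keys so the generated command is deterministic.
    val confFlags = conf.toSeq.sortBy(_._1).flatMap { case (k, v) => Seq("--conf", s"$k=$v") }
    val base = Seq("spark-submit", "--master", master, "--deploy-mode", "cluster", "--class", mainClass)
    // Options first, application jar last, as spark-submit expects.
    base ++ confFlags :+ appJar
  }

  def main(args: Array[String]): Unit = {
    val cmd = build(
      master = "spark://localhost:7077",         // placeholder standalone master URL
      mainClass = "com.example.StreamingExample", // hypothetical main class
      appJar = "target/scala-2.11/spark-streaming-example-assembly-1.0.jar", // assumed default name
      conf = Map("spark.driver.userClassPathFirst" -> "true"))
    println(cmd.mkString(" "))
  }
}
```

Running main prints one command line that can be pasted into a shell. In cluster mode, the application jar path must be reachable from the cluster nodes (for example, an hdfs:// path or a file present on every node).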

Challenge 3 Incompatible Jars between Spark and Scala program – Use Spark Jar

What if you want to use Spark's version of a jar instead of your own? Essentially, this is the opposite of Challenge 2.

I reviewed the output from "sbt assembly" and saw that slf4j was included in the assembly. From the logs, we can see that Spark already uses slf4j, and our driver program bundles a second copy of it. Let's use Spark's slf4j instead.

To exclude certain jars from the fat jar, hook into the sbt-assembly plugin's "excludedJars" setting.

Update build.sbt with the following:

// Keep slf4j-api out of the fat jar so Spark's own copy is used at runtime
excludedJars in assembly := {
  val cp = (fullClasspath in assembly).value
  cp filter { _.data.getName == "slf4j-api-1.7.12.jar" }
}

And re-run “sbt assembly” and you’ll be ready to try another Spark deploy.
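On sbt 1.x, the deprecated "in" syntax gives way to slash syntax. The fragment below is a sketch assuming a recent sbt-assembly, where the key is named assemblyExcludedJars; matching on a name prefix rather than the exact file name is an illustrative choice so the rule survives an slf4j version bump.

```scala
// build.sbt (sbt 1.x slash syntax; assumes a recent sbt-assembly)
// Exclude any slf4j-api jar from the fat jar, whatever its version
assembly / assemblyExcludedJars := {
  val cp = (assembly / fullClasspath).value
  cp.filter(_.data.getName.startsWith("slf4j-api-"))
}
```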

Everything worked, as you can see in this screencast.

Spark Troubleshooting Conclusion

This post presented three challenges, and their solutions, that come up when deploying Apache Spark applications written in Scala to a Spark cluster:

  • Apache Spark with Scala 2.11
  • Giving your jar(s) precedence over Apache Spark's version of the same jar(s)
  • Giving Apache Spark's jar(s) precedence over yours by excluding them from the assembly

Further References

http://stackoverflow.com/questions/23330449/how-does-spark-use-netty/23333955#23333955

https://issues.apache.org/jira/browse/SPARK-4738

http://spark.apache.org/docs/latest/configuration.html

http://spark.apache.org/docs/latest/building-spark.html#building-for-scala-211

Featured Image credit https://flic.kr/p/jWwHNq

About Todd M

Todd has held multiple software roles over his 20-year career. For the last 5 years, he has focused on helping organizations move from batch to data streaming. In addition to the free tutorials, he provides consulting and coaching for Data Engineers, Data Scientists, and Data Architects. Feel free to reach out directly or to connect on LinkedIn.
