What do we do when we need Spark worker tasks to share certain variables and values with each other? This is when Spark Broadcast and Spark Accumulators may come into play.
Think about it.
Imagine we want each task to know the state of variables or values instead of simply independently returning action results back to the driver program. If you are thinking of terms such as shared state or “stateful” vs. “stateless”, then you are on the right track.
Or, that’s how I think of the Spark Broadcast and Accumulators.
In this post, we’ll discuss two constructs of sharing variables across a Spark cluster and then review example Scala code.
Table of Contents
- Spark Shared Variables
- Spark Broadcast Variables
- Spark Accumulators
- Spark Broadcast and Spark Accumulators Examples
- Spark Broadcast Resources
Spark Shared Variables
When a function is passed to a Spark operation, it is executed on a remote cluster node. Usually, the function works on separate copies of the variables it uses. These variables are copied to each machine, and updates to the variables on those remote machines are not propagated back to the driver program. For this reason, supporting general, read-write shared variables across tasks would be inefficient.
Nevertheless, Spark does provide two limited types of shared variables for two common usage patterns.
- Broadcast variables
- Accumulators
Spark Broadcast Variables
Broadcast variables allow Spark developers to keep a read-only variable cached on each node, rather than shipping a copy of it with every task. For instance, they can be used to give every node a copy of a large input dataset without wasting time on network transfer I/O.
Spark distributes broadcast variables using efficient broadcast algorithms, which greatly reduce the cost of communication.
Spark actions are executed through a series of stages, separated by distributed “shuffle” operations. Within each stage, Spark automatically broadcasts the common data needed by tasks in a cached, serialized form, which is deserialized on each node before each task runs. For this reason, creating broadcast variables explicitly is only useful when tasks across multiple stages need the same data.
Broadcast variables are created by wrapping a value with the `SparkContext.broadcast`
function, as shown in the following Scala REPL session:
scala> val broadcastVar = sc.broadcast(Array(1, 2, 3))
broadcastVar: org.apache.spark.broadcast.Broadcast[Array[Int]] = Broadcast(0)
scala> broadcastVar.value
res2: Array[Int] = Array(1, 2, 3)
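To show how a broadcast variable is typically consumed inside a transformation, here is a small, hypothetical sketch (the variable names and data are invented for illustration) that filters an RDD against a broadcast lookup set. Each executor reads the cached copy through `.value` instead of receiving the set with every task:

```scala
// Hypothetical example: filter log levels against a broadcast whitelist.
val allowedLevels = sc.broadcast(Set("ERROR", "WARN"))

val logLevels = sc.parallelize(Seq("INFO", "ERROR", "DEBUG", "WARN", "ERROR"))

// The closure captures the Broadcast handle, not the Set itself;
// each executor resolves the cached value locally.
val filtered = logLevels.filter(level => allowedLevels.value.contains(level))

filtered.collect()  // Array(ERROR, WARN, ERROR)
```

For a three-element set the saving is negligible, but the same pattern pays off when the lookup structure is large relative to the task closure.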
For a more in-depth look, check out the Spark Broadcast Variables tutorial.
Spark Accumulators
As you might assume from the name, accumulators are variables that may be “added to” through an associative operation. They have many uses, including implementing counters and sums. Spark supports accumulators of numeric types out of the box, and programmers can add support for other types. An accumulator created with a name is displayed in the Spark UI, which is useful for understanding the progress of running stages.
Accumulators are created from an initial value `v`; i.e. `SparkContext.accumulator(v)`. Tasks running on the cluster can then add to it using the `add` method or the `+=` operator in Scala. They cannot, however, read its value. Only the driver program can read the accumulator's value, using its `value` method, as shown below:
scala> val accum = sc.accumulator(0, "Accumulator Example")
accum: spark.Accumulator[Int] = 0
scala> sc.parallelize(Array(1, 2, 3)).foreach(x => accum += x)
scala> accum.value
res4: Int = 6
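To illustrate the claim above that programmers can add support for other types, here is a hypothetical sketch using the `AccumulatorParam` trait from the same pre-2.0 API used throughout this post (the names `StringSetParam` and `seenCodes` are invented for this example):

```scala
// Hypothetical sketch: a custom accumulator that collects a Set of strings.
// zero() defines the identity value; addInPlace() defines how two
// partial results are merged.
object StringSetParam extends org.apache.spark.AccumulatorParam[Set[String]] {
  def zero(initial: Set[String]): Set[String] = Set.empty
  def addInPlace(a: Set[String], b: Set[String]): Set[String] = a ++ b
}

val seenCodes = sc.accumulator(Set.empty[String])(StringSetParam)
sc.parallelize(Seq("404", "500", "404")).foreach(code => seenCodes += Set(code))
seenCodes.value  // Set(404, 500)
```

Note that in Spark 2.0 and later this API was deprecated in favor of `AccumulatorV2`, but the concept is the same: define how values merge, and let the driver read the final result.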
Spark Broadcast and Spark Accumulators Examples
With this background on broadcast variables and accumulators, let’s take a look at a more extensive example in Scala. The context of the following example code is a web server log file analyzer that counts groups of HTTP status codes. We can easily imagine the advantages of using Spark when processing a large volume of log file data. See the Resources section below for source code download links.
Let’s start with an object containing the `main` method and definition of one broadcast variable and numerous accumulators:
object Boot {

  import org.apache.spark.{SparkConf, SparkContext}
  import utils.Utils._

  def main(args: Array[String]): Unit = {
    val sparkConf = new SparkConf(true)
      .setMaster("local[2]")
      .setAppName("SparkAnalyzer")

    val sparkContext = new SparkContext(sparkConf)

    /**
      * List of all HTTP status codes, divided into status groups.
      * The list is read-only and is used when parsing the access log file
      * in order to count status code groups.
      *
      * Broadcasting it ships one cached copy to each node instead of
      * a copy with every task.
      */
    val httpStatusList = sparkContext broadcast populateHttpStatusList

    /**
      * Accumulators for counting specific HTTP status codes.
      * Accumulators are used because updates from every executor are
      * relayed back to the driver. With plain local variables, each
      * executor would update its own copy and the driver's value would
      * never change.
      */
    val httpInfo = sparkContext accumulator(0, "HTTP 1xx")
    val httpSuccess = sparkContext accumulator(0, "HTTP 2xx")
    val httpRedirect = sparkContext accumulator(0, "HTTP 3xx")
    val httpClientError = sparkContext accumulator(0, "HTTP 4xx")
    val httpServerError = sparkContext accumulator(0, "HTTP 5xx")

    /**
      * Iterate over the access.log file, parse each line once to extract
      * its HTTP status code, and update the appropriate accumulator.
      */
    sparkContext.textFile(getClass.getResource("/access.log").getPath, 2).foreach { line =>
      val parsedStatus = AccessLogParser.parseHttpStatusCode(line)
      httpStatusList.value foreach {
        case httpInfoStatus: HttpInfoStatus if parsedStatus == Some(httpInfoStatus) => httpInfo += 1
        case httpSuccessStatus: HttpSuccessStatus if parsedStatus == Some(httpSuccessStatus) => httpSuccess += 1
        case httpRedirectStatus: HttpRedirectStatus if parsedStatus == Some(httpRedirectStatus) => httpRedirect += 1
        case httpClientErrorStatus: HttpClientErrorStatus if parsedStatus == Some(httpClientErrorStatus) => httpClientError += 1
        case httpServerErrorStatus: HttpServerErrorStatus if parsedStatus == Some(httpServerErrorStatus) => httpServerError += 1
        case _ =>
      }
    }

    println("########## START ##########")
    println("Printing HttpStatusCodes result from parsing access log")
    println(s"HttpStatusInfo : ${httpInfo.value}")
    println(s"HttpStatusSuccess : ${httpSuccess.value}")
    println(s"HttpStatusRedirect : ${httpRedirect.value}")
    println(s"HttpStatusClientError : ${httpClientError.value}")
    println(s"HttpStatusServerError : ${httpServerError.value}")
    println("########## END ##########")

    sparkContext.stop()
  }
}
As you can see above, we use the broadcast `httpStatusList` when determining which accumulator to update.
`populateHttpStatusList` comes from the `Utils` import and looks like:
object Utils {

  private val httpStatuses = List(
    "100", "101", "103",
    "200", "201", "202", "203", "204", "205", "206",
    "300", "301", "302", "303", "304", "305", "306", "307", "308",
    "400", "401", "402", "403", "404", "405", "406", "407", "408", "409", "410", "411", "412", "413", "414", "415", "416", "417",
    "500", "501", "502", "503", "504", "505", "511"
  )

  def populateHttpStatusList(): List[HttpStatus] = {
    httpStatuses map createHttpStatus
  }

  def createHttpStatus(status: String): HttpStatus = status match {
    case status if status.startsWith("1") => HttpInfoStatus(status)
    case status if status.startsWith("2") => HttpSuccessStatus(status)
    case status if status.startsWith("3") => HttpRedirectStatus(status)
    case status if status.startsWith("4") => HttpClientErrorStatus(status)
    case status if status.startsWith("5") => HttpServerErrorStatus(status)
  }
}
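The `HttpStatus` hierarchy that `Utils` and the pattern matches above rely on is not shown in the post. A minimal sealed-trait definition consistent with the usage here might look like the following sketch; the real project may define it differently:

```scala
// Minimal sketch of the HttpStatus hierarchy assumed by the examples.
// Serializable so instances can travel inside the broadcast variable.
sealed trait HttpStatus extends Serializable {
  def status: String
}

case class HttpInfoStatus(status: String) extends HttpStatus
case class HttpSuccessStatus(status: String) extends HttpStatus
case class HttpRedirectStatus(status: String) extends HttpStatus
case class HttpClientErrorStatus(status: String) extends HttpStatus
case class HttpServerErrorStatus(status: String) extends HttpStatus
```

Using case classes gives us the structural equality that the `parsedStatus == Some(...)` comparisons in `Boot` depend on.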
AccessLogParser can be considered a simple wrapper around a regular expression, shown next:
object AccessLogParser extends Serializable {

  import java.util.regex.Pattern

  import Utils._

  private val ddd = "\\d{1,3}"
  private val ip = s"($ddd\\.$ddd\\.$ddd\\.$ddd)?"
  private val client = "(\\S+)"
  private val user = "(\\S+)"
  private val dateTime = "(\\[.+?\\])"
  private val request = "\"(.*?)\""
  private val status = "(\\d{3})"
  private val bytes = "(\\S+)"
  private val referer = "\"(.*?)\""
  private val agent = "\"(.*?)\""
  private val accessLogRegex = s"$ip $client $user $dateTime $request $status $bytes $referer $agent"
  private val p = Pattern.compile(accessLogRegex)

  /**
    * Extract the HTTP status code from a log line and create an
    * HttpStatus instance for it.
    */
  def parseHttpStatusCode(logLine: String): Option[HttpStatus] = {
    val matcher = p.matcher(logLine)
    if (matcher.find) {
      Some(createHttpStatus(matcher.group(6)))
    } else {
      None
    }
  }
}
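Assuming log lines in the combined Apache access log format that the regex above targets, the parser can be exercised on a single line like this. The sample line is invented for illustration:

```scala
// Hypothetical sample line in combined Apache log format.
val line =
  """127.0.0.1 - frank [10/Oct/2000:13:55:36 -0700] "GET /index.html HTTP/1.0" 200 2326 "-" "Mozilla/4.08""""

// Group 6 of the regex is the status code, so this should yield
// Some(HttpSuccessStatus("200")).
AccessLogParser.parseHttpStatusCode(line)
```

Lines that do not match the pattern simply produce `None`, which is why `Boot` can iterate over a raw log file without any pre-filtering.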
The credit for this regex mastery goes to alvinj at https://github.com/alvinj and in particular https://github.com/alvinj/ScalaApacheAccessLogParser/blob/master/src/main/scala/AccessLogParser.scala
Spark Broadcast Resources
- All the source from above is available at Github: https://github.com/tmcgrath/spark-scala/tree/master/accessloganalyzer
- For the Spark Programming Guide’s perspective on broadcast variables and accumulators, see http://spark.apache.org/docs/latest/programming-guide.html#shared-variables
- For more tutorials in both Scala and Python, check out the Spark Tutorial landing page and, for Scala in particular, the Spark with Scala tutorials
Featured image credit https://flic.kr/p/wYHqe