Spark Broadcast and Accumulators by Examples


What do we do when we need each Spark worker task to coordinate certain variables and values with each other?  This is when Spark Broadcast and Spark Accumulators may come into play.

Think about it.

Imagine we want each task to know the state of variables or values instead of simply independently returning action results back to the driver program.   If you are thinking of terms such as shared state or “stateful” vs. “stateless”, then you are on the right track.  

Or, that’s how I think of the Spark Broadcast and Accumulators.  

In this post, we’ll discuss two constructs for sharing variables across a Spark cluster and then review example Scala code.

Spark Shared Variables

When a function is passed to a Spark operation, it is executed on a remote cluster node, and it works on separate copies of all the variables used in the function. These variables are copied to each machine, and updates made to them on the remote machines are not propagated back to the driver program. Supporting general, read-write shared variables across tasks would be inefficient.

Nevertheless, Spark does provide two limited types of shared variables for two common usage patterns.

  • Broadcast variables
  • Accumulators
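
To make the copy semantics concrete, here is a minimal sketch that captures a plain `var` in a closure and compares it with the value returned by an action. The object name `ClosureCopyExample` and the method `run` are made up for illustration, and the sketch assumes a local master.

```scala
import org.apache.spark.{SparkConf, SparkContext}

// Hypothetical example object, not part of the original post.
object ClosureCopyExample {

  // Returns (the captured counter as the driver sees it, the sum returned by an action).
  def run(sc: SparkContext): (Int, Int) = {
    val numbers = sc.parallelize(Seq(1, 2, 3))

    // A plain driver-side variable captured by the closure below.
    var counter = 0

    // Each task increments its own copy of `counter`;
    // those increments are not sent back to the driver.
    numbers.foreach(x => counter += x)

    // An action such as `reduce` is the reliable way to get a result back to the driver.
    val total = numbers.reduce(_ + _)

    (counter, total)
  }

  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setMaster("local[2]").setAppName("ClosureCopyExample")
    val sc = new SparkContext(conf)
    val (counter, total) = run(sc)
    // Do not rely on `counter`: its value on the driver is not defined by Spark.
    println(s"captured counter: $counter, reduce result: $total")
    sc.stop()
  }
}
```

The value of `counter` printed here should not be relied on, which is exactly why the two shared variable types exist.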

Spark Broadcast Variables

Broadcast variables allow Spark developers to keep a read-only variable cached on each node, rather than shipping a copy of it with every task. For instance, they can be used to give every node a copy of a large input dataset in an efficient manner.

Spark also attempts to distribute broadcast variables using efficient broadcast algorithms to reduce communication cost.

Spark actions are executed through a set of stages, separated by distributed “shuffle” operations. Within each stage, Spark automatically broadcasts the common data that tasks need. Data broadcast this way is cached in serialized form and deserialized before each task runs. For this reason, explicitly creating broadcast variables is only useful when tasks across multiple stages need the same data, or when caching the data in deserialized form is important.

A broadcast variable is created from a value by calling `SparkContext.broadcast`, and its contents are read with the `value` method, as shown in the following Scala code:

scala> val broadcastVar = sc.broadcast(Array(1, 2, 3))
broadcastVar: org.apache.spark.broadcast.Broadcast[Array[Int]] = Broadcast(0)

scala> broadcastVar.value
res2: Array[Int] = Array(1, 2, 3)
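
The REPL session above only reads the value on the driver. The sketch below shows the more typical use, where tasks read the broadcast value inside a transformation. The lookup table, the object name `BroadcastLookupExample`, and the method `describe` are assumptions made for illustration.

```scala
import org.apache.spark.{SparkConf, SparkContext}

// Hypothetical example object, not part of the original post.
object BroadcastLookupExample {

  // A small read-only lookup table; in practice this would be much larger.
  val statusNames: Map[Int, String] = Map(
    200 -> "OK",
    404 -> "Not Found",
    500 -> "Internal Server Error"
  )

  def describe(sc: SparkContext, codes: Seq[Int]): Array[String] = {
    // Ship the table to each executor once instead of with every task.
    val lookup = sc.broadcast(statusNames)

    sc.parallelize(codes)
      // Tasks read the cached copy through `lookup.value`.
      .map(code => lookup.value.getOrElse(code, "Unknown"))
      .collect()
  }

  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setMaster("local[2]").setAppName("BroadcastLookupExample")
    val sc = new SparkContext(conf)
    describe(sc, Seq(200, 404, 418)).foreach(println)
    sc.stop()
  }
}
```

Note that the closure refers to `lookup` (the broadcast handle) and not to `statusNames` directly; referring to the original value would ship it with every task again.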

For a more in-depth look, check out the Spark Broadcast Variables tutorial.

Spark Accumulators

As you might assume from the name, accumulators are variables that are only added to, through an associative and commutative operation.  They have many uses, including implementing counters and sums.  Spark natively supports accumulators of numeric types, and programmers can add support for other types.  If an accumulator is created with a name, it is displayed in the Spark UI, which is useful for understanding the progress of running stages.

An accumulator is created from an initial value v by calling `SparkContext.accumulator(v)`.  Tasks running on the cluster can then add to it using the `add` method or the `+=` operator in Scala. They cannot, however, read its value. Only the driver program can read the accumulator's value, using its `value` method, as shown below:

scala> val accum = sc.accumulator(0, "Accumulator Example")
accum: spark.Accumulator[Int] = 0

scala> sc.parallelize(Array(1, 2, 3)).foreach(x => accum += x)

scala> accum.value
res4: Int = 6
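
One caveat: `SparkContext.accumulator` has been deprecated since Spark 2.0 in favor of `longAccumulator`, `doubleAccumulator`, and the `AccumulatorV2` class. If you are on Spark 2.x or later, the same sum might look like the sketch below. The object name `LongAccumulatorExample` and the method `sumWithAccumulator` are made up for illustration.

```scala
import org.apache.spark.{SparkConf, SparkContext}

// Hypothetical example object, not part of the original post. Requires Spark 2.0+.
object LongAccumulatorExample {

  def sumWithAccumulator(sc: SparkContext, values: Seq[Int]): Long = {
    // Named accumulators show up in the Spark UI.
    val accum = sc.longAccumulator("Accumulator Example")

    // Tasks can only add to the accumulator.
    sc.parallelize(values).foreach(x => accum.add(x))

    // Only the driver reads the merged result.
    accum.value
  }

  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setMaster("local[2]").setAppName("LongAccumulatorExample")
    val sc = new SparkContext(conf)
    println(sumWithAccumulator(sc, Seq(1, 2, 3)))
    sc.stop()
  }
}
```

The update happens inside `foreach`, which is an action. Updates made inside transformations such as `map` may be applied more than once if a task or stage is re-executed.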

Spark Broadcast and Spark Accumulators Examples

With this background on broadcast variables and accumulators, let’s take a look at more extensive examples in Scala.  The context of the following example code is a web server log file analyzer that counts certain types of HTTP status codes. We can easily imagine the advantages of using Spark when processing a large volume of log file data.  See the Resources section below for source code download links.

Let’s start with an object containing the `main` method and definition of one broadcast variable and numerous accumulators:

import org.apache.spark.{SparkConf, SparkContext}

object Boot {

 import utils.Utils._

 def main(args: Array[String]): Unit = {

   val sparkConf = new SparkConf(true)
     .setMaster("local[2]")
     .setAppName("SparkAnalyzer")

   val sparkContext = new SparkContext(sparkConf)

   /**
     * List of all HTTP status codes, divided into status groups.
     * The list is read-only and is used while parsing the access log file to count status code groups.
     *
     * This is an example of a broadcast variable: a read-only value made available to every task.
     */
   val httpStatusList = sparkContext.broadcast(populateHttpStatusList())

   /**
     * Accumulators for counting specific HTTP status code groups.
     * Accumulators are used because the updates made on every executor are relayed back to the driver.
     * A plain local variable would only be updated in the executor's own copy,
     * so the value on the driver would not change.
     */
   val httpInfo = sparkContext.accumulator(0, "HTTP 1xx")
   val httpSuccess = sparkContext.accumulator(0, "HTTP 2xx")
   val httpRedirect = sparkContext.accumulator(0, "HTTP 3xx")
   val httpClientError = sparkContext.accumulator(0, "HTTP 4xx")
   val httpServerError = sparkContext.accumulator(0, "HTTP 5xx")

   /**
     * Iterate over access.log file and parse every line
     * for every line extract HTTP status code from it and update appropriate accumulator variable
     */
   sparkContext.textFile(getClass.getResource("/access.log").getPath, 2).foreach { line =>
     httpStatusList.value foreach {
       case httpInfoStatus: HttpInfoStatus if (AccessLogParser.parseHttpStatusCode(line).equals(Some(httpInfoStatus))) => httpInfo += 1
       case httpSuccessStatus: HttpSuccessStatus if (AccessLogParser.parseHttpStatusCode(line).equals(Some(httpSuccessStatus))) => httpSuccess += 1
       case httpRedirectStatus: HttpRedirectStatus if (AccessLogParser.parseHttpStatusCode(line).equals(Some(httpRedirectStatus))) => httpRedirect += 1
       case httpClientErrorStatus: HttpClientErrorStatus if (AccessLogParser.parseHttpStatusCode(line).equals(Some(httpClientErrorStatus))) => httpClientError += 1
       case httpServerErrorStatus: HttpServerErrorStatus if (AccessLogParser.parseHttpStatusCode(line).equals(Some(httpServerErrorStatus))) => httpServerError += 1
       case _ =>
     }
   }

   println("########## START ##########")
   println("Printing HttpStatusCodes result from parsing access log")
   println(s"HttpStatusInfo : ${httpInfo.value}")
   println(s"HttpStatusSuccess : ${httpSuccess.value}")
   println(s"HttpStatusRedirect : ${httpRedirect.value}")
   println(s"HttpStatusClientError : ${httpClientError.value}")
   println(s"HttpStatusServerError : ${httpServerError.value}")
   println("########## END ##########")

   sparkContext.stop()
 }

}

As you can see above, the broadcast `httpStatusList` is used to determine which accumulator to update for each log line.

`populateHttpStatusList` is available through the `Utils` import and looks like this:

object Utils {

  private val httpStatuses = List(
    "100", "101", "103",
    "200", "201", "202", "203", "204", "205", "206",
    "300", "301", "302", "303", "304", "305", "306", "307", "308",
    "400", "401", "402", "403", "404", "405", "406", "407", "408", "409", "410", "411", "412", "413", "414", "415", "416", "417",
    "500", "501", "502", "503", "504", "505", "511"
  )

  def populateHttpStatusList(): List[HttpStatus] = {
      httpStatuses map createHttpStatus
  }

  def createHttpStatus(status: String): HttpStatus = status match {
    case status if (status.startsWith("1")) => HttpInfoStatus(status)
    case status if (status.startsWith("2")) => HttpSuccessStatus(status)
    case status if (status.startsWith("3")) => HttpRedirectStatus(status)
    case status if (status.startsWith("4")) => HttpClientErrorStatus(status)
    case status if (status.startsWith("5")) => HttpServerErrorStatus(status)
  }

}
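
The `HttpStatus` types used by `createHttpStatus` are not shown in this post. A minimal sketch of what they might look like follows; these definitions are an assumption based on how the types are used above, not the original source. Case classes fit because they are serializable (needed for broadcasting) and compare by value (needed for the `equals` checks in `Boot`).

```scala
// Hypothetical definitions: the post does not include these types.
sealed trait HttpStatus {
  def status: String
}

// One case class per status group, each wrapping the status code as a string.
final case class HttpInfoStatus(status: String) extends HttpStatus
final case class HttpSuccessStatus(status: String) extends HttpStatus
final case class HttpRedirectStatus(status: String) extends HttpStatus
final case class HttpClientErrorStatus(status: String) extends HttpStatus
final case class HttpServerErrorStatus(status: String) extends HttpStatus

object HttpStatusDemo {
  // Value equality: two instances with the same group and code are equal.
  def sameStatus(a: HttpStatus, b: HttpStatus): Boolean = a == b
}
```

Also note that `createHttpStatus` has no default case, so a status string that does not start with 1 through 5 would throw a `MatchError`.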

`AccessLogParser` is a simple wrapper around the regular expression shown next:

import java.util.regex.Pattern

object AccessLogParser extends Serializable {
  import Utils._

  private val ddd = "\\d{1,3}"
  private val ip = s"($ddd\\.$ddd\\.$ddd\\.$ddd)?"
  private val client = "(\\S+)"
  private val user = "(\\S+)"
  private val dateTime = "(\\[.+?\\])"
  private val request = "\"(.*?)\""
  private val status = "(\\d{3})"
  private val bytes = "(\\S+)"
  private val referer = "\"(.*?)\""
  private val agent = "\"(.*?)\""
  private val accessLogRegex = s"$ip $client $user $dateTime $request $status $bytes $referer $agent"
  private val p = Pattern.compile(accessLogRegex)

  /**
    * Extract HTTP status code and create HttpStatus instance for given status code
    */
  def parseHttpStatusCode(logLine: String): Option[HttpStatus] = {
    val matcher = p.matcher(logLine)
    if(matcher.find) {
      Some(createHttpStatus(matcher.group(6)))
    }
    else {
      None
    }
  }

}
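
To see what the regular expression extracts, here is a small standalone sketch that applies the same pattern to one log line and returns the raw status code (capture group 6). The object name `StatusCodeExtractor` and the sample log line are made up for illustration, and the sketch returns a `String` rather than an `HttpStatus` to stay self-contained.

```scala
import java.util.regex.Pattern

// Hypothetical standalone version of the parser, for experimenting without Spark.
object StatusCodeExtractor {
  private val ddd = "\\d{1,3}"
  private val ip = s"($ddd\\.$ddd\\.$ddd\\.$ddd)?"   // group 1
  private val quoted = "\"(.*?)\""                     // request, referer, agent

  // Groups: 1 ip, 2 client, 3 user, 4 dateTime, 5 request, 6 status, 7 bytes, 8 referer, 9 agent
  private val pattern = Pattern.compile(
    s"$ip (\\S+) (\\S+) (\\[.+?\\]) $quoted (\\d{3}) (\\S+) $quoted $quoted")

  // Made-up sample line in the combined log format.
  val sampleLine: String =
    "127.0.0.1 - frank [10/Oct/2000:13:55:36 -0700] \"GET /apache_pb.gif HTTP/1.0\" " +
      "200 2326 \"http://www.example.com/start.html\" \"Mozilla/4.08\""

  // Returns the three-digit status code, or None if the line does not match.
  def statusCode(logLine: String): Option[String] = {
    val matcher = pattern.matcher(logLine)
    if (matcher.find) Some(matcher.group(6)) else None
  }

  def main(args: Array[String]): Unit = {
    println(statusCode(sampleLine))        // the status field of the sample line
    println(statusCode("not a log line"))  // no match
  }
}
```

Lines that do not match the pattern yield `None`, which is why the `Boot` program simply skips them through its `case _ =>` branch.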

The credit for this regex mastery goes to alvinj at https://github.com/alvinj and in particular https://github.com/alvinj/ScalaApacheAccessLogParser/blob/master/src/main/scala/AccessLogParser.scala

Spark Broadcast Resources

Featured image credit https://flic.kr/p/wYHqe

About Todd M

Todd has held multiple software roles over his 20-year career. For the last 5 years, he has focused on helping organizations move from batch to data streaming. In addition to the free tutorials, he provides consulting and coaching for Data Engineers, Data Scientists, and Data Architects. Feel free to reach out directly or to connect on LinkedIn.
