This recipe shows how to use accumulators and broadcast variables. Accumulators aggregate values from the worker nodes back to the driver program. One of their most common uses is counting events that occur during job execution, for debugging purposes. The other type of shared variable is the broadcast variable, which lets the program efficiently send a large, read-only value to all the worker nodes for use in one or more Spark operations. A typical case is a large, read-only lookup table that every node needs.
To step through this recipe, you will need a running Spark cluster, either in pseudo-distributed mode or in one of the distributed modes, that is, standalone, YARN, or Mesos.
Take a worker log file from the Spark log directory, $SPARK_HOME/logs. Its filename looks like this: spark-padma-org.apache.spark.deploy.worker.Worker-1-blrrndtipdl19. Place this file in HDFS. The log file contains Spark log information at different trace levels, such as DEBUG, INFO, WARN, and ERROR. The sample data looks as follows:
[Sample log excerpt not included here.]
Let's work with an accumulator now:
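import org.apache.spark.SparkContext

// Master and application name are expected to come from spark-submit
val sc = new SparkContext()
val logFile = sc.textFile("hdfs://namenodeHostName:8020/data/spark-worker-Worker1.out")
// Accumulator[Int] starting at 0
val errorLines = sc.accumulator(0)
val debugLines = logFile.map { line =>
  // Side effect: count ERROR lines while passing every line through
  if (line.contains("ERROR")) errorLines += 1
  line
}.filter(line => line.contains("DEBUG")) // keep only the DEBUG lines
debugLines.saveAsTextFile("hdfs://namenodeHostName:8020/data/out/debugLines.txt")
// Read the accumulator only after the action above has run
println("ERROR Lines: " + errorLines.value)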
Now create a broadcast variable and use it in the workers as follows:
import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._ // pair-RDD implicits; needed before Spark 1.3

val sc = new SparkContext()
val broadCastedTemperatures = sc.broadcast(Map("KOCHI" -> 22, "BNGLR" -> 22, "HYD" -> 24, "MUMBAI" -> 21, "DELHI" -> 17, "NOIDA" -> 19, "SIMLA" -> 9))
val inputRdd = sc.parallelize(Array(("BNGLR", 20), ("BNGLR", 16), ("KOCHI", -999), ("SIMLA", -999), ("DELHI", 19), ("DELHI", -999), ("MUMBAI", 27), ("MUMBAI", -999), ("HYD", 19), ("HYD", 25), ("NOIDA", -999)))
// Replace the -999 placeholder with the broadcast standard temperature;
// drop a reading whose location has no standard value
val replacedRdd = inputRdd.flatMap { case (location, temperature) =>
  val standardTemperatures = broadCastedTemperatures.value
  if (temperature != -999) Some((location, temperature))
  else standardTemperatures.get(location).map(standard => (location, standard))
}
val locationsWithMaxTemperatures = replacedRdd.reduceByKey((temp1, temp2) => math.max(temp1, temp2))
When working with accumulators, we first created an Accumulator[Int] called errorLines and added 1 to it whenever we saw a line that contained ERROR. The correct count for errorLines is available only after the saveAsTextFile() action runs: the map() transformation is lazy, so its side effect, incrementing the accumulator, happens only when saveAsTextFile() forces map() to execute. Creating the accumulator returns an org.apache.spark.Accumulator[T] object, where T is the type of the value.
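The lazy behaviour described above can be observed directly. The following is a minimal sketch, assuming a Spark version that still provides sc.accumulator (1.x or 2.x) and a local master; the object name AccumulatorLazinessSketch, the method countErrors, and the sample lines are hypothetical and only serve the illustration.

```scala
import org.apache.spark.{SparkConf, SparkContext}

object AccumulatorLazinessSketch {
  // Returns the accumulator value read before and after an action runs.
  def countErrors(sc: SparkContext, lines: Seq[String]): (Int, Int) = {
    val errorLines = sc.accumulator(0)
    val tagged = sc.parallelize(lines).map { line =>
      if (line.contains("ERROR")) errorLines += 1
      line
    }
    val before = errorLines.value // map() has not executed yet
    tagged.count()                // the action forces map() to run
    val after = errorLines.value  // updates from the tasks are now merged
    (before, after)
  }

  def main(args: Array[String]): Unit = {
    // local[2] is an assumption for trying this out without a cluster
    val conf = new SparkConf().setAppName("AccumulatorLazinessSketch").setMaster("local[2]")
    val sc = new SparkContext(conf)
    try {
      val sample = Seq("INFO started", "ERROR disk full", "DEBUG heartbeat", "ERROR timeout")
      val (before, after) = countErrors(sc, sample)
      println("before action: " + before + ", after action: " + after)
    } finally {
      sc.stop()
    }
  }
}
```

With the four sample lines in main, the value read before count() is 0 and the value read afterwards is 2, since two of the lines contain ERROR.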
As for broadcast variables, SparkContext.broadcast creates a broadcast variable of type Broadcast[T], where T can be any serializable type. The value of the broadcast variable is accessed using the value property. The variable is sent to each node only once and is read-only.
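To illustrate that one broadcast variable can serve several Spark operations, here is a small sketch that reads the same Broadcast[Map[String, Int]] from two different closures. The object name BroadcastReuseSketch, the method fillMissing, and the "PUNE" reading are hypothetical; the -999 placeholder and the standard temperatures follow the recipe's example.

```scala
import org.apache.spark.{SparkConf, SparkContext}

object BroadcastReuseSketch {
  // Returns (number of fillable placeholder readings, readings after filling).
  def fillMissing(sc: SparkContext, readings: Seq[(String, Int)]): (Long, Seq[(String, Int)]) = {
    val standard = sc.broadcast(Map("KOCHI" -> 22, "SIMLA" -> 9))
    val rdd = sc.parallelize(readings)

    // Operation 1: count placeholder readings that have a standard value
    val fillable = rdd.filter { case (location, temperature) =>
      temperature == -999 && standard.value.contains(location)
    }.count()

    // Operation 2: reuse the same broadcast to replace those placeholders;
    // readings for unknown locations are left unchanged
    val filled = rdd.map { case (location, temperature) =>
      if (temperature == -999) (location, standard.value.getOrElse(location, temperature))
      else (location, temperature)
    }.collect().toSeq

    (fillable, filled)
  }

  def main(args: Array[String]): Unit = {
    // local[2] is an assumption for trying this out without a cluster
    val conf = new SparkConf().setAppName("BroadcastReuseSketch").setMaster("local[2]")
    val sc = new SparkContext(conf)
    try {
      val (fillable, filled) = fillMissing(sc, Seq(("KOCHI", -999), ("SIMLA", 12), ("PUNE", -999)))
      println("fillable: " + fillable)
      filled.foreach(println)
    } finally {
      sc.stop()
    }
  }
}
```

Both closures only read standard.value; nothing in the tasks tries to modify the map, which matches the read-only contract.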
Spark has support for custom accumulator types. They need to extend AccumulatorParam.
Tip
For additional information on this, please visit: http://spark.apache.org/docs/latest/programming-guide.html#accumulators-a-nameaccumlinka.
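As an illustration of a custom accumulator type, the sketch below extends AccumulatorParam to keep a count per trace level in a Map[String, Int]. It assumes the Spark 1.x/2.x accumulator API used in this recipe; the names LevelCountsParam, CustomAccumulatorSketch, and countLevels, as well as the sample lines, are hypothetical.

```scala
import org.apache.spark.{AccumulatorParam, SparkConf, SparkContext}

// Merges two per-level count maps by adding the counts key by key.
object LevelCountsParam extends AccumulatorParam[Map[String, Int]] {
  def zero(initialValue: Map[String, Int]): Map[String, Int] = Map.empty[String, Int]

  def addInPlace(left: Map[String, Int], right: Map[String, Int]): Map[String, Int] =
    right.foldLeft(left) { case (acc, (level, n)) =>
      acc.updated(level, acc.getOrElse(level, 0) + n)
    }
}

object CustomAccumulatorSketch {
  private val levels = Seq("DEBUG", "INFO", "WARN", "ERROR")

  def countLevels(sc: SparkContext, lines: Seq[String]): Map[String, Int] = {
    val counts = sc.accumulator(Map.empty[String, Int])(LevelCountsParam)
    // foreach is an action, so the updates are applied when it returns
    sc.parallelize(lines).foreach { line =>
      levels.find(level => line.contains(level)).foreach { level =>
        counts += Map(level -> 1)
      }
    }
    counts.value
  }

  def main(args: Array[String]): Unit = {
    // local[2] is an assumption for trying this out without a cluster
    val conf = new SparkConf().setAppName("CustomAccumulatorSketch").setMaster("local[2]")
    val sc = new SparkContext(conf)
    try {
      val sample = Seq("INFO started", "ERROR disk full", "ERROR timeout", "DEBUG heartbeat")
      println(countLevels(sc, sample))
    } finally {
      sc.stop()
    }
  }
}
```

The merge in addInPlace is associative and commutative, which matters because partial results from different tasks can arrive in any order.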
Also, when working with broadcast variables, it is essential to choose a serialization format which is fast and compact.
Tip
For more information on broadcast variables, please refer to: http://spark.apache.org/docs/latest/programming-guide.html#broadcast-variables.
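One possible way to act on the serialization advice above is to switch Spark's serializer to Kryo through the spark.serializer setting. The recipe itself does not prescribe Kryo, so treat that choice as an assumption; the object name KryoConfSketch and the method kryoConf are hypothetical.

```scala
import org.apache.spark.SparkConf

object KryoConfSketch {
  // Builds a SparkConf that asks Spark to use the Kryo serializer.
  def kryoConf(appName: String): SparkConf =
    new SparkConf()
      .setAppName(appName)
      .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")

  def main(args: Array[String]): Unit = {
    val conf = kryoConf("BroadcastWithKryo")
    println(conf.get("spark.serializer"))
  }
}
```

A SparkContext created from this configuration would then be used exactly as in the broadcast example; whether Kryo is actually faster or more compact for a given lookup table is worth measuring rather than assuming.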