目录
- 累加器(Accumulator)简介
累加器使用陷阱
- 原因分析&解决方案
累加器(Accumulator)简介
累加器(Accumulator)是Spark提供的累加器,顾名思义,该变量只能够增加。由Driver端进行初始变量,Task再对声明的变量进行累加操作。
可以为Accumulator命名,这样就会在Spark web ui中看到每个节点的计数,以及累加后的值,可以帮助你了解程序运行的情况。
累加器使用的陷阱
在前段时间写项目时用累加器稽核数据量,结果发现稽核的数据输入量和输出量明显不同,此时要么是程序存在问题,要么是累加器使用有问题,从最终生成的结果文件中可以看出,是累加器的使用问题
下面来看一个Demo
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
val conf = new SparkConf()
.setAppName("Accumulator Demo")
.setMaster("local")
val sc = new SparkContext(conf)
sc.setLogLevel("WARN")
val example = sc.longAccumulator("Example")
val byKey = sc
.parallelize(1 to 10)
.map(x=>{
if(x%2==1){
example.add(-1)
("奇数",1)
}else{
("偶数",1)
}
})
byKey.foreach(println(_))
println("累加后的值:"+example.value)
println(byKey.count())
println("累加后的值:"+example.value)
结果:
可以看出,如果一个算子在最终计算两次,则累加器也会同样增加两次
那我们如果将涉及到累加的算子缓存会怎么样呢,修改部分代码
1
2
3
4
5
6
7
8
9
10
val byKey = sc
.parallelize(1 to 10)
.map(x=>{
if(x%2==1){
example.add(1)
("奇数",1)
}else{
("偶数",1)
}
}).persist() //将计算结果进行缓存
结果:
原因分析&解决方案
官方对这个问题的解释如下描述:
For accumulator updates performed inside actions only, Spark guarantees that each task’s update to the accumulator will only be applied once, i.e. restarted tasks will not update the value. In transformations, users should be aware of that each task’s update may be applied more than once if tasks or job stages are re-executed.
我们都知道,spark中的一系列transform操作会构成一串长的任务链,此时需要通过一个action操作来触发,accumulator也是一样。因此在一个action操作之前,你调用value方法查看其数值,肯定是没有任何变化的。
所以在第一次foreach(action操作)之后,我们发现累加器的数值变成了5,是我们要的答案。
之后又对新产生的的byKey进行了一次count(action操作),其实这个时候又执行了一次map(transform)操作,所以累加器又增加了5。最终获得的结果变成了10。
既然已经知道了造成的原因,那就是使用累加器的过程中只能使用一次action的操作才能保证结果的准确性。当然也可以通过切断依赖关系,例如触发一次Shuffle,Spark 会自动缓存Shuffle后生成的RDD(使用的Spark2.1,其他版本暂时不清楚),当然也可以通过Cache()、Persist()进行切断
参考