当前位置:   article > 正文

Spark 的共享变量之累加器和广播变量_spark 是否可以广播一个自定义函数

spark 是否可以广播一个自定义函数

前言

本期将介绍下 Spark 编程中两种类型的共享变量:累加器和广播变量。
简单说,累加器是用来对信息进行聚合的,而广播变量则是用来高效分发较大对象的。

学习目标

  • 闭包的概念
  • 累加器的原理
  • 广播变量的原理

1. 闭包的概念

在讲共享变量之前,我们先了解下啥是闭包,代码如下。

var n = 1
val func = (i:Int) => i + n
  • 1
  • 2

函数 func 中有两个变量 n 和 i ,其中 i 为该函数的形式参数,也就是入参,在 func 函数被调用时, i 会被赋予一个新的值,我们称之为绑定变量(bound variable)。而 n 则是定义在了函数 func 外面的,该函数并没有赋予其任何值,我们称之为自由变量(free variable)。

像 func 函数这样,返回结果依赖于声明在函数外部的一个或多个变量,将这些自由变量捕获并构成的封闭函数,我们称之为“闭包”。

先看一个累加求和的栗子,如果在集群模式下运行以下代码,会发现结果并非我们所期待的累计求和。

var sum = 0
val arr = Array(1,2,3,4,5)
sc.parallelize(arr).foreach(x => sum + x)
println(sum)
  • 1
  • 2
  • 3
  • 4

sum 的结果为0,导致这个结果的原因就是存在闭包。

在集群中 Spark 会将对 RDD 的操作处理分解为 Tasks ,每个 Task 由 Executor 执行。而在执行之前,Spark会计算 task 的闭包(也就是 foreach() )。闭包会被序列化并发送给每个 Executor,但是发送给 Executor 的是副本,所以在 Driver 上输出的依然是 sum 本身。

在这里插入图片描述

如果想对 sum 变量进行更新,则就要用到接下来我们要讲的累加器。

2. 累加器的原理

累加器是对信息进行聚合的,通常在向 Spark 传递函数时,比如使用 map() 或者 filter() 传条件时,可以使用 Driver 中定义的变量,但是集群中运行的每个任务都会得到这些变量的一份新的副本,然而,正如前面所述,更新这些副本的值,并不会影响到 Driver 中对应的变量。

累加器则突破了这个限制,可以将工作节点中的值聚合到 Driver 中。它的一个典型用途就是对作业执行过程中的特定事件进行计数。

举个栗子,给了一个日志记录,需要统计这个文件中有多少空行。

val sc = new SparkContext(...)
val logs = sc.textFile(...)
val blanklines = sc.accumulator(0)
val callSigns = logs.flatMap(line => {
	if(line == ""){
		blanklines += 1
	}
	line.split("")
})
callSigns.count()
println("日志中的空行数为:" + blanklines.value)
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11

总结下累加器的使用,首先 Driver 调用了 SparkContext.accumulator(initialValue) 方法,创建一个名为 blanklines 且初始值为0的累加器。然后在遇到空行时,Spark 闭包里的执行器代码就会对其 +1 。执行完成之后,Driver 可以调用累加器的 value 属性来访问累加器的值。

需要说明的是,只有在行动算子 count() 运行之后,才可以 println 出正确的值,因为我们之前讲过 flatMap() 是惰性计算的,只有遇到行动操作之后才会出发强制执行运算进行求值。

另外,工作节点上的任务是不可以访问累加器的值,在这些任务看来,累加器是一个只写的变量。

对于累加器的使用,不仅可以进行数据的 sum 加法,也可以跟踪数据的最大值 max、最小值 min等。

3. 广播变量的原理

前面说了,Spark 会自动把闭包中所有引用到的自由变量发送到工作节点上,那么每个 Task 的闭包都会持有自由变量的副本。如果自由变量的内容很大且 Task 很多的情况下,为每个 Task 分发这样的自由变量的代价将会巨大,必然会对网络 IO 造成压力。

广播变量则突破了这个限制,不是把变量副本发给所有的 Task ,而是将其分发给所有的工作节点一次,这样节点上的 Task 可以共享一个变量副本。

Spark 使用的是一种高效的类似 BitTorrent 的通信机制,可以降低通信成本。广播的数据只会被发动各个节点一次,除了 Driver 可以修改,其他节点都是只读,并且广播数据是以序列化形式缓存在系统中的,当 Task 需要数据时对其反序列化操作即可。

在使用中,Spark 可以通过调用 SparkContext.broadcast(v) 创建广播变量,并通过调用 value 来访问其值,举栗代码如下:

val broadcastVar = sc.broadcast(Array(1,2,3))
broadcastVar.value
  • 1
  • 2

以上是本期分享,如有帮助请 点赞+关注+收藏 支持下哦~
下期继续讲解 RDD 内容。

往期精彩内容回顾:

1 - Spark 概述(入门必看)
2 - Spark 的模块组成
3 - Spark 的运行原理
4 - RDD 概念以及核心结构
5 - Spark RDD 的宽窄依赖关系
6 - 详解 Spark RDD 的转换操作与行动操作
7 - Spark RDD 中常用的操作算子
可扫码关注

在这里插入图片描述

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/繁依Fanyi0/article/detail/679948
推荐阅读
相关标签
  

闽ICP备14008679号