赞
踩
应用场景:广播变量用于在集群的各个节点的executor 中高效的分发一个只读的变量副本
操作原理:创建一个广播变量时,spark会将变量序列化并发送到每一个executor,每一个executor存一个副本,而不需要每次执行任务重新重driver或其他节点来获取,spark任务涉及到对该变量的访问时就只需要从本地executor内存中来获取即可,避免了在网络中频繁的传输大量重复的数据
目的: 就是为了减少冗余数据在网络中的传输,提升了分布式计算环境下的访问速度和整体的性能
广播变量适合场景: 那些需要在多个任务中共享, 且数据量适中可以存在executor内存中的情况
应用场景: 在处理大规模的join操作时,其中一方的数据集明显较小,就是所谓的小表就可以通过广播join, 将小的数据集加载到每一个map task的executor的内存中,那种join操作中某些key对应的键的record数量远大于其他key的record数量,导致在reducer task 严重的负载不均衡问题
操作原理:广播join 发生在map端非reduce 端无需通过网络传输shuffle就可以到达reduce 端,减少I/O,hive将小表的数据全部加载到map task 的内存中作为广播变量,然后对每一条大表的record ,在本地内存中查找是否有匹配的小表record ,并立即完成join操作,
由于每个join 在每个map task内部独立完成 ,无需经过网络shuffle 就可以到达reduce 端,进而大大减少了网络传输和磁盘i/o,消除了因为数据倾斜造成的
目的: 主要目的是利用小表数据可以被广播到所有的map task 内存的特点 提前在map 端完成了join 操作,避免了常规的join操作中需要的昂贵的shuffle过程,特别是适合于数据倾斜问题,显著提高了join的操作效率
这种策略的前提是小表的数据必须要足够小,能够被全部加载到每个节点的内存中
代码:
-
- def main(args: Array[String]): Unit = {
- /**
- * rdd 处理数据倾斜-广播join
- */
- val spark = SparkSession
- .builder()
- .appName("MapSideBroadcastJoin")
- .getOrCreate()
-
- // 假设已经读取了大表和小表的数据,并转换为RDD[(String, String)]
- // 大表
- val bigTable: RDD[(String, String)] = spark.sparkContext.textFile("hdfs://path/to/big_table.csv")
- .map(line => line.split(",")).map(arr => (arr(0), arr(1)))
-
- // 小表
- val smallTable: RDD[(String, String)] = spark.sparkContext.textFile("hdfs://path/to/small_table.csv")
- .map(line => line.split(",")).map(arr => (arr(0), arr(1)))
-
- // 将小表转换为本地集合并广播
- val smallTableBroadcast: Broadcast[Map[String, String]] = spark.sparkContext.broadcast(
- smallTable.collectAsMap().toMap
- )
-
- // 定义Map函数,执行Map-Side JOIN
- def joinWithSmallTable(bigRecord: (String, String)): Option[(String, (String, String))] = {
- smallTableBroadcast.value.get(bigRecord._1).map { smallValue =>
- (bigRecord._1, (bigRecord._2, smallValue))
- }
- }
-
- // 应用Map函数,过滤掉没有匹配的小表记录
- val joinedData: RDD[(String, (String, String))] = bigTable.flatMap(joinWithSmallTable)
-
- // 打印或进一步处理joinedData
- joinedData.foreach(println)
-
- spark.stop()
都是基于优化分布式计算思想
关于使用广播join 使用map-side 函数在map端进行merage 在本地executor 中完成join过程
减少网络传输, 以及减少磁盘的I/O
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。