Joining two large tables (big-table-to-big-table join):
The divide-and-conquer tuning idea: break the complex task into multiple simple tasks, then merge the results of those simple tasks
The divide-and-conquer computation process:
Splitting the inner table: every sub-table should be roughly uniform in size, and small enough to be used as a broadcast variable
The key to the split: pick a column that makes every sub-table small enough (a sketch for vetting a candidate column follows)
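A minimal sketch for checking a candidate split column, assuming the orders schema shown below and using the same placeholder-path convention as the later snippets:
// Sketch: does splitting orders by date give sub-tables small enough to broadcast?
import org.apache.spark.sql.functions.desc

val orderFile: String = _          // placeholder path, as in the snippets below
val orders = spark.read.parquet(orderFile)

orders
  .groupBy("date")                 // the candidate split column
  .count()
  .orderBy(desc("count"))
  .show(10)                        // even the largest date buckets must stay comfortably broadcastable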
Repeated scanning of the outer table:
Avoiding the repeated scans:
The DPP (Dynamic Partition Pruning) mechanism: since the outer table transactions is partitioned by orderId (the Join Key), every broadcast-sized orders sub-table lets DPP prune the transactions scan down to just the matching orderId partitions
orders and transactions are both fact tables, both at the TB scale:
// orders: order table
orderId: Int
customerId: Int
status: String
date: Date // partition key
// transactions: transaction line-item table
orderId: Int // partition key
txId: Int
itemId: Int
price: Float
quantity: Int
Every so often, compute the transaction revenue of all orders in the previous quarter:
val query: String = """
select sum(tx.price * tx.quantity) as revenue,
o.orderId
from transactions as tx
inner join orders as o on tx.orderId = o.orderId
where o.status = 'COMPLETE'
and o.date between '2020-01-01' and '2020-03-31'
group by o.orderId
"
transactions is the outer table, orders is the inner (smaller) table
// Split the inner table by the date column
val query: String = """
select sum(tx.price * tx.quantity) as revenue,
o.orderId
from transactions as tx
inner join orders as o on tx.orderId = o.orderId
where o.status = 'COMPLETE'
and o.date = '2020-01-01'
group by o.orderId
"
After the inner table is split, join the outer table with every sub-table, then merge all the sub-join results
// Loop over the dates
val dates: Seq[String] = Seq("2020-01-01", "2020-01-02", ..., "2020-03-31")

for (date <- dates) {
  val query: String = s"""
    select sum(tx.price * tx.quantity) as revenue, o.orderId
    from transactions as tx
    inner join orders as o on tx.orderId = o.orderId
    where o.status = 'COMPLETE'
    and o.date = '${date}'
    group by o.orderId
  """
  val file: String = s"${outFile}/${date}"
  spark.sql(query).write.parquet(file)
}
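The final merge can be as simple as reading all the per-date outputs back together; a minimal sketch, assuming outFile is the same base path used in the loop above:
// Sketch: merge the sub-join results written by the loop
val merged: DataFrame = spark.read.parquet(s"${outFile}/*")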
The last line of defense: when the inner table cannot be split evenly, or the outer table has no partition key so DPP cannot be exploited, the only option left is a Shuffle Join to complete the big-table join
By default Spark uses a Shuffle Sort Merge Join
Convert it into a Shuffle Hash Join
Condition: after the shuffle, every data shard of the inner table must be small enough to build an in-memory hash table on it
Sizing each data shard: raise the shuffle parallelism (spark.sql.shuffle.partitions) until inner-table size divided by the partition count fits the per-task memory budget, as sketched below
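A minimal sketch of that sizing calculation; both byte figures are assumptions, not measured values:
// Sketch: choose spark.sql.shuffle.partitions so every inner-table shard fits in memory as a hash table
val innerTableSizeInBytes: Long = 200L * 1024 * 1024 * 1024   // assumed: inner table about 200 GB
val memoryPerTaskInBytes: Long = 512L * 1024 * 1024           // assumed: about 512 MB usable per task
val numPartitions: Int = math.ceil(innerTableSizeInBytes.toDouble / memoryPerTaskInBytes).toInt
spark.conf.set("spark.sql.shuffle.partitions", numPartitions.toString)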
Use a Join Hint to select the Shuffle Hash Join
select /*+ shuffle_hash(orders) */
sum(tx.price * tx.quantity) as revenue,
o.orderId
from transactions as tx
inner join orders as o on tx.orderId = o.orderId
where o.status = 'COMPLETE'
and o.date between '2020-01-01' and '2020-03-31'
group by o.orderId
Data skew when joining two large tables:
Use AQE's automatic skew handling. Configuration parameters:
spark.sql.adaptive.skewJoin.skewedPartitionFactor: the inflation factor used to judge a partition as skewed
spark.sql.adaptive.skewJoin.skewedPartitionThresholdInBytes: the minimum size threshold for judging skew
spark.sql.adaptive.advisoryPartitionSizeInBytes: the granularity of the split, in bytes
AQE's automatic skew handling splits a skewed partition into multiple data partitions at the granularity of spark.sql.adaptive.advisoryPartitionSizeInBytes
This only balances the load across Tasks (a config sketch follows):
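A minimal sketch of enabling it; the three values shown are Spark's documented defaults, spelled out only for illustration:
// Sketch: turn on AQE automatic skew-join handling
spark.conf.set("spark.sql.adaptive.enabled", "true")
spark.conf.set("spark.sql.adaptive.skewJoin.enabled", "true")
spark.conf.set("spark.sql.adaptive.skewJoin.skewedPartitionFactor", "5")
spark.conf.set("spark.sql.adaptive.skewJoin.skewedPartitionThresholdInBytes", "256MB")
spark.conf.set("spark.sql.adaptive.advisoryPartitionSizeInBytes", "64MB")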
Ways to fix data skew across Executors: divide and conquer / a two-stage Shuffle
Divide and conquer:
Shuffle Sort Merge Join
converted into a Shuffle Hash Join
Two-stage Shuffle:
Stage one: salt, Shuffle, join, aggregate
Stage two: de-salt, Shuffle, aggregate
In stage one, salt the skewed Join Keys (salt granularity: the total number of Executors)
Random salting for the outer table:
Replicated salting for the inner table (one copy per salt value, so every salted outer-table key still finds a match):
In stage two:
orders and transactions are both TB-scale fact tables; compute the transaction revenue of all orders in the previous quarter
// Computing order revenue
val txFile: String = _
val orderFile: String = _

val transactions: DataFrame = spark.read.parquet(txFile)
val orders: DataFrame = spark.read.parquet(orderFile)

transactions.createOrReplaceTempView("transactions")
orders.createOrReplaceTempView("orders")

val query: String = """
  select sum(tx.price * tx.quantity) as revenue, o.orderId
  from transactions as tx
  inner join orders as o on tx.orderId = o.orderId
  where o.status = 'COMPLETE'
  and o.date between '2020-01-01' and '2020-03-31'
  group by o.orderId
"""

val outFile: String = _
spark.sql(query).write.parquet(outFile)
Store the skewed orderIds in an array skewOrderIds, and the evenly distributed orderIds in an array evenOrderIds
// Split the inner and outer tables in two, based on whether the Join Keys are skewed
import org.apache.spark.sql.functions.{array_contains, lit}
import spark.implicits._

// Split the Join Keys into two groups: the skewed ones and the evenly distributed ones
val skewOrderIds: Array[Int] = _
val evenOrderIds: Array[Int] = _

val skewTx: DataFrame = transactions.filter(array_contains(lit(skewOrderIds), $"orderId"))
val evenTx: DataFrame = transactions.filter(array_contains(lit(evenOrderIds), $"orderId"))

val skewOrders: DataFrame = orders.filter(array_contains(lit(skewOrderIds), $"orderId"))
val evenOrders: DataFrame = orders.filter(array_contains(lit(evenOrderIds), $"orderId"))
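The notes leave the two placeholder arrays unfilled; a minimal sketch of one way to obtain them, where the threshold value is an assumption that would normally be derived from the key-count distribution:
// Sketch: classify orderIds as skewed or evenly distributed by their row counts in transactions
import org.apache.spark.sql.functions.col

val keyCounts = transactions.groupBy("orderId").count()
val skewThreshold: Long = 1000000L   // assumed cutoff; in practice derive it from the count distribution

val skewOrderIds: Array[Int] =
  keyCounts.filter(col("count") > skewThreshold).select("orderId").collect().map(_.getInt(0))
val evenOrderIds: Array[Int] =
  keyCounts.filter(col("count") <= skewThreshold).select("orderId").collect().map(_.getInt(0))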
For the evenly distributed data, switch to a Shuffle Hash Join:
// Register the evenly distributed data as temp views
evenTx.createOrReplaceTempView("evenTx")
evenOrders.createOrReplaceTempView("evenOrders")
val evenQuery: String = """
select /*+ shuffle_hash(evenOrders) */
sum(tx.price * tx.quantity) as revenue,
o.orderId
from evenTx as tx
inner join evenOrders as o on tx.orderId = o.orderId
where o.status = 'COMPLETE'
and o.date between '2020-01-01' and '2020-03-31'
group by o.orderId
"
val evenResults: DataFrame = spark.sql(evenQuery)
Apply random salting to the outer table and replicated salting to the inner table
// Define a UDF that returns a random salt value
import org.apache.spark.sql.functions.{udf, concat, lit}

val numExecutors: Int = _
val rand = () => scala.util.Random.nextInt(numExecutors)
val randUdf = udf(rand)

// Stage-one salting. Note: keep the orderId column, it is needed for de-salting in stage two

// Random salting for the outer table
val saltedSkewTx = skewTx.withColumn("joinKey", concat($"orderId", lit("_"), randUdf()))

// Replicated salting for the inner table: one copy per salt value
var saltedskewOrders = skewOrders.withColumn("joinKey", concat($"orderId", lit("_"), lit(1)))
for (i <- 2 to numExecutors) {
  saltedskewOrders = saltedskewOrders union skewOrders.withColumn("joinKey", concat($"orderId", lit("_"), lit(i)))
}
Run the query against the two salted tables:
// Register the salted data as temp views
saltedSkewTx.createOrReplaceTempView("saltedSkewTx")
saltedskewOrders.createOrReplaceTempView("saltedskewOrders")

val skewQuery: String = """
  select /*+ shuffle_hash(saltedskewOrders) */
  sum(tx.price * tx.quantity) as initialRevenue,
  o.orderId, o.joinKey
  from saltedSkewTx as tx
  inner join saltedskewOrders as o on tx.joinKey = o.joinKey
  where o.status = 'COMPLETE'
  and o.date between '2020-01-01' and '2020-03-31'
  group by o.orderId, o.joinKey
"""

// Stage one: the intermediate result after salting, Shuffle, join, and aggregation
val skewInitialResults: DataFrame = spark.sql(skewQuery)
The purpose of de-salting: restore the grain of the computation from the salted joinKey back to the original orderId
import org.apache.spark.sql.functions.{col, sum}

// Stage two: de-salt and aggregate back to the orderId grain
val skewResults: DataFrame = skewInitialResults
  .select("initialRevenue", "orderId")
  .groupBy(col("orderId"))
  .agg(sum(col("initialRevenue")).alias("revenue"))
Merging the skewed results with the even results balances the compute load across Executors
evenResults union skewResults