
Spark SQL optimization: small-table/large-table joins, replacing OR with UNION, and broadcast joins

Optimizing a small-table-to-large-table join in Spark SQL

---- Original query (runs in 18 min)

```sql
SELECT
    bb.ip
FROM (
    SELECT
        ip,
        sum(click) click_num,
        round(sum(click) / sum(imp), 4) user_click_rate
    FROM schema.srctable1
    WHERE date = '20171020'
        AND ip IS NOT NULL
        AND imp > 0
    GROUP BY ip
) bb
LEFT OUTER JOIN (
    SELECT round(sum(click) / sum(imp), 4) avg_click_rate
    FROM schema.srctable1
    WHERE date = '20171020'
) aa
LEFT OUTER JOIN schema.dstable cc
    ON cc.ip = bb.ip
WHERE cc.ip IS NULL
    AND (
        bb.user_click_rate > aa.avg_click_rate * 3
        AND click_num > 500
    )
    OR (
        click_num > 1000
    )
```

Analysis:

1. Table aa holds a single aggregate metric (one row), so it counts as the small table.
2. Table bb holds per-ip aggregated detail data with many rows, so it counts as the large table.
3. Table cc is only used to filter out ips; it is also small and acts purely as a filter table.

Inspecting the execution plan shows that the LEFT OUTER JOIN between bb and aa triggers a shuffle, causing heavy disk and network I/O and hurting performance.
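The plan can be inspected directly from Spark SQL by prefixing the query with EXPLAIN. A minimal sketch against the same tables (operator names vary by Spark version; a SortMergeJoin or ShuffledHashJoin node indicates a shuffle, while a BroadcastHashJoin node indicates a map-side join):

```sql
-- Print the physical plan instead of running the query.
EXPLAIN
SELECT bb.ip
FROM (
    SELECT ip, sum(click) click_num
    FROM schema.srctable1
    WHERE date = '20171020'
    GROUP BY ip
) bb
LEFT OUTER JOIN schema.dstable cc
    ON cc.ip = bb.ip;
```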

Solution strategies

Optimization 1: swap the table order, putting the small table on the left; runtime drops to 29 s. (It is not entirely clear why this helps: the execution plan shows nothing but the two tables swapped, with no other difference from before.)

Optimization 2: rewrite the OR as a UNION; runtime drops to 35 s. (After much tuning, the OR became the suspect; replacing it with a UNION while leaving everything else unchanged did indeed change the efficiency. Yet optimization 1 only reordered the tables and still gained a lot, so it is unclear exactly which internal optimization Spark SQL applies here; this deserves further study.)

Optimization 3: use cache + broadcast; runtime drops to 20 s. (This caches the small table in memory and performs a map-side join.)

Implementing the solutions

---- Option 2: rewrite OR as UNION (runs in 35 s)

```sql
SELECT aa.ip
FROM (
    SELECT bb.ip ip
    FROM (
        SELECT
            ip,
            sum(click) click_num,
            round(sum(click) / sum(imp), 4) user_click_rate
        FROM schema.srctable1
        WHERE date = '20171020'
            AND ip IS NOT NULL
            AND imp > 0
        GROUP BY ip
    ) bb
    LEFT OUTER JOIN (
        SELECT round(sum(click) / sum(imp), 4) avg_click_rate
        FROM schema.srctable1
        WHERE date = '20171020'
    ) aa
    WHERE bb.user_click_rate > aa.avg_click_rate * 3
        AND click_num > 20
    UNION
    SELECT bb.ip ip
    FROM (
        SELECT
            ip,
            sum(click) click_num,
            round(sum(click) / sum(imp), 4) user_click_rate
        FROM schema.srctable1
        WHERE date = '20171020'
            AND ip IS NOT NULL
            AND imp > 0
        GROUP BY ip
    ) bb
    LEFT OUTER JOIN (
        SELECT round(sum(click) / sum(imp), 4) avg_click_rate
        FROM schema.srctable1
        WHERE date = '20171020'
    ) aa
    WHERE click_num > 40
) aa
LEFT OUTER JOIN schema.dstable cc
    ON aa.ip = cc.ip
WHERE cc.ip IS NULL
```

---- Option 3: cache + broadcast (runs in 20 s)

How it works: broadcasting ships the small table to every executor node, so the join is performed locally on each node. This essentially eliminates the shuffle and greatly improves runtime.

```sql
CACHE TABLE cta AS
SELECT round(sum(click) / sum(imp), 4) avg_click_rate
FROM schema.srctable1
WHERE date = '20171020';

INSERT INTO TABLE schema.dstable
SELECT bb.ip
FROM (
    SELECT
        ip,
        sum(click) click_num,
        round(sum(click) / sum(imp), 4) user_click_rate
    FROM schema.srctable1
    WHERE date = '20171020'
        AND ip IS NOT NULL
        AND imp > 0
    GROUP BY ip
) bb
LEFT OUTER JOIN cta aa
LEFT OUTER JOIN schema.dstable cc
    ON cc.ip = bb.ip
WHERE cc.ip IS NULL
    AND (
        bb.user_click_rate > aa.avg_click_rate * 3
        AND click_num > 500
    )
    OR (
        click_num > 1000
    )
```
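On Spark 2.2 and later, a broadcast can also be requested explicitly with a join hint rather than relying on CACHE TABLE plus the size threshold. A hedged sketch of the original query with the hint (the planner may still fall back to a shuffle join if the table cannot be broadcast):

```sql
-- /*+ BROADCAST(aa) */ (aliases: BROADCASTJOIN, MAPJOIN) asks the
-- planner to broadcast aa regardless of the size threshold.
SELECT /*+ BROADCAST(aa) */ bb.ip
FROM (
    SELECT
        ip,
        sum(click) click_num,
        round(sum(click) / sum(imp), 4) user_click_rate
    FROM schema.srctable1
    WHERE date = '20171020'
        AND ip IS NOT NULL
        AND imp > 0
    GROUP BY ip
) bb
LEFT OUTER JOIN (
    SELECT round(sum(click) / sum(imp), 4) avg_click_rate
    FROM schema.srctable1
    WHERE date = '20171020'
) aa
LEFT OUTER JOIN schema.dstable cc
    ON cc.ip = bb.ip
WHERE cc.ip IS NULL
    AND (
        bb.user_click_rate > aa.avg_click_rate * 3
        AND click_num > 500
    )
    OR (
        click_num > 1000
    );
```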

Note:
A cached table is not necessarily broadcast to the executors for a map-side join; this is also governed by the spark.sql.autoBroadcastJoinThreshold parameter, which decides whether a table gets broadcast.
spark.sql.autoBroadcastJoinThreshold defaults to 10 MB, so only cached tables smaller than 10 MB are broadcast to the executors for a map-side join.
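If the small table is slightly over the default, the threshold can be adjusted per session; for example:

```sql
-- Raise the automatic-broadcast threshold to 100 MB.
-- The value is in bytes; setting it to -1 disables broadcast joins.
SET spark.sql.autoBroadcastJoinThreshold = 104857600;
```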
