当前位置:   article > 正文

记一次Spark读取多个kudu表的优化经历_spark同时读取多个表

spark同时读取多个表

程序原逻辑是从一张kudu表读数据进行后续统计,

需求是修改为从另外两张不同的kudu表读数据,合并两张表的数据进行后续统计。

于是修改的逻辑简化后是如下两个步骤:

1、从两张kudu表读数据,并且分别注册成临时表

  1. import org.apache.kudu.spark.kudu._
  2. val kudu_table1 = spark.read.options(
  3. Map("kudu.master" -> kuduMaster,
  4. "kudu.table" -> Table1)
  5. ).kudu
  6. .select("col1", "col2", "col3")
  7. .where(s"col3= substr(${Time},1,8)")
  8. .distinct()
  9. kudu_table1.createOrReplaceTempView("kudu_table1")
  10. val kudu_table2 = spark.read.options(
  11. Map("kudu.master" -> kuduMaster,
  12. "kudu.table" -> Table2)
  13. ).kudu
  14. .select("col1", "col2", "col3")
  15. .where(s"col3= substr(${Time},1,8)")
  16. .distinct()
  17. kudu_table2.createOrReplaceTempView("kudu_table2")

2、使用sparkSQL,合并两张表进行统计,通过phoenix写入Hbase

  1. val DF1 = spark.sql("""
  2. |with union_data as (
  3. | select col1,
  4. | col2,
  5. | col3
  6. | from kudu_table1
  7. | union all
  8. | select col1,
  9. | col2,
  10. | col3
  11. | from kudu_table2
  12. |)
  13. |select ...
  14. | from ...
  15. | where ...
  16. | group by ...
  17. | grouping sets(...)
  18. | )""".stripMargin
  19. ).persist()
  20. PhoenixJdbcUtil.saveToPhoenix(DF1 , phoenixTable, phoenixZk)

测试后发现,运行时间极慢,原本运行一个批次只需要十几秒,现在修改后需要八九分钟。

后优化如下:

1、从两张kudu表读数据,对两个DataFrame先进行DF的union,union后注册成一个临时表

  1. import org.apache.kudu.spark.kudu._
  2. val kudu_table1 = spark.read.options(
  3. Map("kudu.master" -> kuduMaster,
  4. "kudu.table" -> Table1)
  5. ).kudu
  6. .select("col1", "col2", "col3")
  7. .where(s"col3= substr(${Time},1,8)")
  8. .distinct()
  9. val kudu_table2 = spark.read.options(
  10. Map("kudu.master" -> kuduMaster,
  11. "kudu.table" -> Table2)
  12. ).kudu
  13. .select("col1", "col2", "col3")
  14. .where(s"col3= substr(${Time},1,8)")
  15. .distinct()
  16. //主要修改点:对DF进行union,并去重以减少后续sparkSQL计算的数据量,注册成一张临时表
  17. val union = kudu_table1.union(kudu_table2).distinct()
  18. union.createOrReplaceTempView("kudu_table")

2、使用sparkSQL,使用上述的一张临时表kudu_table直接进行统计,通过phoenix写入Hbase

  1. val DF1 = spark.sql("""
  2. |select ...
  3. | from kudu_table
  4. | where ...
  5. | group by ...
  6. | grouping sets(...)
  7. | )""".stripMargin
  8. ).persist()
  9. PhoenixJdbcUtil.saveToPhoenix(DF1 , phoenixTable, phoenixZk)

经测试,优化后的程序运行一个批次只需1分钟左右,运行时间减少好几倍。

具体原理不明(有知道原理的小伙伴可以在下面评论一下,嘿嘿)。但应该是SparkSQL由于要解析SQL,效率上不如dataFrame操作效率高。

 

2019.10.30补充:

运行慢有可能是sql的问题,之前where条件中使用到了从concat(day,min) = '...',经测试,此做法计算极慢。现在是where day='...' and min='...',该方式效率提高很多。

 

 

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/我家自动化/article/detail/417472
推荐阅读
相关标签
  

闽ICP备14008679号