赞
踩
程序原逻辑是从一张kudu表读数据进行后续统计,
需求是修改为从另外两张不同的kudu表读数据,合并两张表的数据进行后续统计。
于是修改的逻辑简化后是如下两个步骤:
1、从两张kudu表读数据,并且分别注册成临时表
- import org.apache.kudu.spark.kudu._
- val kudu_table1 = spark.read.options(
- Map("kudu.master" -> kuduMaster,
- "kudu.table" -> Table1)
- ).kudu
- .select("col1", "col2", "col3")
- .where(s"col3= substr(${Time},1,8)")
- .distinct()
- kudu_table1.createOrReplaceTempView("kudu_table1")
-
- val kudu_table2 = spark.read.options(
- Map("kudu.master" -> kuduMaster,
- "kudu.table" -> Table2)
- ).kudu
- .select("col1", "col2", "col3")
- .where(s"col3= substr(${Time},1,8)")
- .distinct()
- kudu_table2.createOrReplaceTempView("kudu_table2")
2、使用sparkSQL,合并两张表进行统计,通过phoenix写入Hbase
- val DF1 = spark.sql("""
- |with union_data as (
- | select col1,
- | col2,
- | col3
- | from kudu_table1
- | union all
- | select col1,
- | col2,
- | col3
- | from kudu_table2
- |)
- |select ...
- | from ...
- | where ...
- | group by ...
- | grouping sets(...)
- | )""".stripMargin
- ).persist()
- PhoenixJdbcUtil.saveToPhoenix(DF1 , phoenixTable, phoenixZk)
测试后发现,运行时间极慢,原本运行一个批次只需要十几秒,现在修改后需要八九分钟。
后优化如下:
1、从两张kudu表读数据,对两个DataFrame先进行DF的union,union后注册成一个临时表
- import org.apache.kudu.spark.kudu._
- val kudu_table1 = spark.read.options(
- Map("kudu.master" -> kuduMaster,
- "kudu.table" -> Table1)
- ).kudu
- .select("col1", "col2", "col3")
- .where(s"col3= substr(${Time},1,8)")
- .distinct()
-
- val kudu_table2 = spark.read.options(
- Map("kudu.master" -> kuduMaster,
- "kudu.table" -> Table2)
- ).kudu
- .select("col1", "col2", "col3")
- .where(s"col3= substr(${Time},1,8)")
- .distinct()
-
- //主要修改点:对DF进行union,并去重以减少后续sparkSQL计算的数据量,注册成一张临时表
- val union = kudu_table1.union(kudu_table2).distinct()
- union.createOrReplaceTempView("kudu_table")
2、使用sparkSQL,使用上述的一张临时表kudu_table直接进行统计,通过phoenix写入Hbase
- val DF1 = spark.sql("""
- |select ...
- | from kudu_table
- | where ...
- | group by ...
- | grouping sets(...)
- | )""".stripMargin
- ).persist()
- PhoenixJdbcUtil.saveToPhoenix(DF1 , phoenixTable, phoenixZk)
经测试,优化后的程序运行一个批次只需1分钟左右,运行时间减少好几倍。
具体原理不明(有知道原理的小伙伴可以在下面评论一下,嘿嘿)。但应该是SparkSQL由于要解析SQL,效率上不如dataFrame操作效率高。
2019.10.30补充:
运行慢有可能是sql的问题,之前where条件中使用到了从concat(day,min) = '...',经测试,此做法计算极慢。现在是where day='...' and min='...',该方式效率提高很多。
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。