赞
踩
目的:为了解决kafka数据积压而造成的影响数据合并准确性问题
方式:用check1和check2两个脚本,check1只启动一个,做判断并给值,如果当前表有今天的ts说明可以合并昨天数据,则标志位设置为1,否则设置为0,如果满足条件的表到一定比例就可以大致确定没有积压,数据合并是没有问题的,则填入一个表名为all的表,此时所有表都可参与合并工作。check2有多少个ods表就启动多少个,查询check1所输出的表,判断是否执行ods合并,或等到一个表名为all的表,强制其开始合并。
- CREATE EXTERNAL TABLE tmp.check1 (
- tablename STRING,
- check1 INT,
- ctime STRING)
- PARTITIONED BY (dt STRING)
- STORED AS PARQUET;
说明:
每天一个分区,check1记录当天合并前每张表的最终状态值(是否有当天凌晨ts),为1或0,ctime记录合并前最后一次赋值入表时的时间
#!/bin/bash value=0.8 time=03 while ((1)) do hive -e " set mapreduce.map.memory.mb=512; set mapreduce.reduce.memory.mb=10240; set mapred.max.split.size=300000000; set mapred.min.split.size.per.node=300000000; set mapred.min.split.size.per.rack=300000000; set mapred.reduce.tasks=1; set hive.exec.dynamic.partition.mode=nonstrict; insert overwrite table tmp.check1 partition(dt) select \`table\`, if(from_unixtime(bigts, 'yyyy-MM-dd')=dt, 1, 0), from_unixtime(unix_timestamp()), current_date() dt from ( select \`table\`, dt, max(cast(get_json_object(content,'$.ts') as bigint)) bigts from ods_binlog.ods_binlog where dt = current_date() group by \`table\`,dt )t " QUAN=$(hive -S -e " select count(if(check1=1, 1, null))/count(*) from tmp.check1 ") if [[ `echo "${QUAN} > ${value}" | bc` -eq 1 || $(date "+%Y-%m-%d-%H") = $(date "+%Y-%m-%d")-${time} ]] then hive -e" set mapreduce.map.memory.mb=512; set hive.exec.dynamic.partition.mode=nonstrict; insert into table tmp.check1 partition(dt) select 'ALL', 1, from_unixtime(unix_timestamp()), current_date() dt " exit fi sleep 1m done
说明:
1.value为全部开始合并的比例阈值,例如有十个表,九个表已经有ts为今天Binlog,则比例为0.9,若阈值为0.8,则剩余的一个表也开始合并,
该参数用来避免有些表变化小一直没有今天Binlog而陷入死等状态
2.time为等待的最大忍受限度,例如有十个表,七个表已经有ts为今天Binlog,则比例为0.7,达不到阈值0.8,假如time为凌晨三点,只要过了这个点,剩余的三个表也开始合并,
该参数用来避免哪天突发情况,阈值设置不合适导致的死等状态,是value的补充
补充:
1.QUAN=$(hive -S -e "sql语句")可以将hive-e产生的结果集返回给变量QUAN
2.table为分区名,但也为hive中的关键字,需要使用飘号引起来,但因为飘号也为shell中包起来命令的关键字,为避免冲突,飘号前使用反斜杠\`table\`
3.group by分组后,select中只能是分组字段或聚合函数,如果想要对max值在判断,应在外边嵌套一层
4.浮点数比较大小应该先通过管道符传给bc并用飘号引起来`echo "${QUAN} > ${value}" | bc`,返回结果为真则返回1,否则返回0,再和1做等值整型eq判断即可
5.shell中日期函数为date后面可接格式化类型date "+%Y-%m-%d-%H",加号写在外面效果一样,此时返回格式为年-月-日-时用$()引起来后可做变量的等值判断,后面可拼接其他字符串
6.如果if条件里有且或关系等,应该用[[ ]]双层中括号,while ((1))为死循环,sleep 1m为睡眠一分钟,每隔1m重新写一次分区表
#!/bin/bash tablename=$1 while ((1)) do QUAN=$(hive -S -e " select check1 from tmp.check1 where (tablename = 'ods_binlog_${tablename}_di' or tablename = 'ALL') and dt = current_date() ") for check in ${QUAN[*]} do if [ ${check} -eq 1 ] then echo "退出" exit fi done echo "下次循环" sleep 1m done
说明:
tablename为脚本传入参数,因为内部有拼接前后缀,故只需要传入例如account_avatar_accountitems即可,就可以做该ods表是否开始合并的判断
补充:
tablename为脚本传入参数,因为内部有拼接前后缀,故只需要传入例如account_avatar_accountitems即可,就可以做该ods表是否开始合并的判断
补充:
1.因为我们的输出是双行单列的,故不牵扯取第几列的问题,如果多行多列又要取第二列,则QUAN=$(hive -S -e "sql语句" | cut -d' ' -f 2),意为每行都按制表符切分,取第二列,需要注意不能用\t,
由于sql输出的结果集不同列之间是制表符,而默认cut就是按照制表符切分的,所以此时-d参数可以省略
2.对于变量接受到结果集的遍历,应当使用for check in${QUAN[*]},类似于java的增强for循环,为什么要遍历所有,因为该表可能返回标志位为0,但ALL作为保底可能此时返回的是1,此时就会有0和1两个结果,我们不能盲目的只取第一个结果
[*]代表依次遍历结果集中所有行,[2]代表只遍历下标为2的行,不写方括号默认下标为0的行,forcheck in QUAN则会传入值"QUAN"给check
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。