当前位置:   article > 正文

用shell脚本自动监测Kafka积压_kafka消费积压shell监控

kafka消费积压shell监控

目的:为了解决kafka数据积压而造成的影响数据合并准确性问题

方式:用check1和check2两个脚本,check1只启动一个,做判断并给值,如果当前表有今天的ts说明可以合并昨天数据,则标志位设置为1,否则设置为0,如果满足条件的表到一定比例就可以大致确定没有积压,数据合并是没有问题的,则填入一个表名为all的表,此时所有表都可参与合并工作。check2有多少个ods表就启动多少个,查询check1所输出的表,判断是否执行ods合并,或等到一个表名为all的表,强制其开始合并。

  1. 建表语句

  1. CREATE EXTERNAL TABLE tmp.check1 (
  2. tablename STRING,
  3. check1 INT,
  4. ctime STRING)
  5. PARTITIONED BY (dt STRING)
  6. STORED AS PARQUET;

说明:

每天一个分区,check1记录当天合并前每张表的最终状态值(是否有当天凌晨ts),为1或0,ctime记录合并前最后一次赋值入表时的时间

  1. check1

  1. #!/bin/bash
  2. value=0.8
  3. time=03
  4. while ((1))
  5. do
  6. hive -e "
  7. set mapreduce.map.memory.mb=512;
  8. set mapreduce.reduce.memory.mb=10240;
  9. set mapred.max.split.size=300000000;
  10. set mapred.min.split.size.per.node=300000000;
  11. set mapred.min.split.size.per.rack=300000000;
  12. set mapred.reduce.tasks=1;
  13. set hive.exec.dynamic.partition.mode=nonstrict;
  14. insert overwrite table tmp.check1 partition(dt)
  15. select \`table\`, if(from_unixtime(bigts, 'yyyy-MM-dd')=dt, 1, 0), from_unixtime(unix_timestamp()), current_date() dt
  16. from
  17. (
  18. select \`table\`, dt, max(cast(get_json_object(content,'$.ts') as bigint)) bigts
  19. from ods_binlog.ods_binlog where dt = current_date() group by \`table\`,dt
  20. )t
  21. "
  22. QUAN=$(hive -S -e "
  23. select count(if(check1=1, 1, null))/count(*) from tmp.check1
  24. ")
  25. if [[ `echo "${QUAN} > ${value}" | bc` -eq 1 || $(date "+%Y-%m-%d-%H") = $(date "+%Y-%m-%d")-${time} ]]
  26. then
  27. hive -e"
  28. set mapreduce.map.memory.mb=512;
  29. set hive.exec.dynamic.partition.mode=nonstrict;
  30. insert into table tmp.check1 partition(dt)
  31. select 'ALL', 1, from_unixtime(unix_timestamp()), current_date() dt
  32. "
  33. exit
  34. fi
  35. sleep 1m
  36. done

说明:

1.value为全部开始合并的比例阈值,例如有十个表,九个表已经有ts为今天Binlog,则比例为0.9,若阈值为0.8,则剩余的一个表也开始合并,

该参数用来避免有些表变化小一直没有今天Binlog而陷入死等状态

2.time为等待的最大忍受限度,例如有十个表,七个表已经有ts为今天Binlog,则比例为0.7,达不到阈值0.8,假如time为凌晨三点,只要过了这个点,剩余的三个表也开始合并,

该参数用来避免哪天突发情况,阈值设置不合适导致的死等状态,是value的补充

补充:

1.QUAN=$(hive -S -e "sql语句")可以将hive-e产生的结果集返回给变量QUAN

2.table为分区名,但也为hive中的关键字,需要使用飘号引起来,但因为飘号也为shell中包起来命令的关键字,为避免冲突,飘号前使用反斜杠\`table\`

3.group by分组后,select中只能是分组字段或聚合函数,如果想要对max值在判断,应在外边嵌套一层

4.浮点数比较大小应该先通过管道符传给bc并用飘号引起来`echo "${QUAN} > ${value}" | bc`,返回结果为真则返回1,否则返回0,再和1做等值整型eq判断即可

5.shell中日期函数为date后面可接格式化类型date "+%Y-%m-%d-%H",加号写在外面效果一样,此时返回格式为年-月-日-时用$()引起来后可做变量的等值判断,后面可拼接其他字符串

6.如果if条件里有且或关系等,应该用[[ ]]双层中括号,while ((1))为死循环,sleep 1m为睡眠一分钟,每隔1m重新写一次分区表

  1. check2

  1. #!/bin/bash
  2. tablename=$1
  3. while ((1))
  4. do
  5. QUAN=$(hive -S -e "
  6. select check1 from tmp.check1 where (tablename = 'ods_binlog_${tablename}_di' or tablename = 'ALL') and dt = current_date() ")
  7. for check in ${QUAN[*]}
  8. do
  9. if [ ${check} -eq 1 ]
  10. then
  11. echo "退出"
  12. exit
  13. fi
  14. done
  15. echo "下次循环"
  16. sleep 1m
  17. done

说明:

tablename为脚本传入参数,因为内部有拼接前后缀,故只需要传入例如account_avatar_accountitems即可,就可以做该ods表是否开始合并的判断

补充:

tablename为脚本传入参数,因为内部有拼接前后缀,故只需要传入例如account_avatar_accountitems即可,就可以做该ods表是否开始合并的判断

补充:

1.因为我们的输出是双行单列的,故不牵扯取第几列的问题,如果多行多列又要取第二列,则QUAN=$(hive -S -e "sql语句" | cut -d' ' -f 2),意为每行都按制表符切分,取第二列,需要注意不能用\t,

由于sql输出的结果集不同列之间是制表符,而默认cut就是按照制表符切分的,所以此时-d参数可以省略

2.对于变量接受到结果集的遍历,应当使用for check in${QUAN[*]},类似于java的增强for循环,为什么要遍历所有,因为该表可能返回标志位为0,但ALL作为保底可能此时返回的是1,此时就会有0和1两个结果,我们不能盲目的只取第一个结果

[*]代表依次遍历结果集中所有行,[2]代表只遍历下标为2的行,不写方括号默认下标为0的行,forcheck in QUAN则会传入值"QUAN"给check

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/Gausst松鼠会/article/detail/712248
推荐阅读
相关标签
  

闽ICP备14008679号