当前位置:   article > 正文

FlinkOnYarn 监控 flink任务_yarn任务状态监控

yarn任务状态监控

Flink任务一般为实时不断运行的任务,如果没有任务监控,
任务异常时无法第一时间处理会比较麻烦。
这里通过调用API接口方式来获取参数,实现任务监控。

Flink任务监控(基于API接口编写shell脚本)
一 flink-on-yarn 模式
二 编写shell 脚本 

监控集群指标

http://rm-http-address:port/ws/v1/cluster/metrics

 响应正文

<clusterMetrics>

<appsSubmitted>**</appsSubmitted>

<appsCompleted>**</appsCompleted>

<appsPending>0</appsPending>

<appsRunning>**</appsRunning>

<appsFailed>**</appsFailed>

<appsKilled>**</appsKilled>

<reservedMB>0</reservedMB>

<availableMB>**</availableMB>

<allocatedMB>**</allocatedMB>

<pendingMB>0</pendingMB>

<reservedVirtualCores>0</reservedVirtualCores>

<availableVirtualCores>**</availableVirtualCores>

<allocatedVirtualCores>**</allocatedVirtualCores>

<pendingVirtualCores>0</pendingVirtualCores>

<containersAllocated>**</containersAllocated>

<containersReserved>0</containersReserved>

<containersPending>0</containersPending>

<totalMB>**</totalMB>

<totalVirtualCores>**</totalVirtualCores>

<utilizedMBPercent>53</utilizedMBPercent>

<utilizedVirtualCoresPercent>**</utilizedVirtualCoresPercent>

<rmSchedulerBusyPercent>0</rmSchedulerBusyPercent>

<totalNodes>**</totalNodes>

<lostNodes>0</lostNodes>

<unhealthyNodes>**</unhealthyNodes>

<decommissioningNodes>0</decommissioningNodes>

<decommissionedNodes>0</decommissionedNodes>

<rebootedNodes>0</rebootedNodes>

<activeNodes>**</activeNodes>

<shutdownNodes>0</shutdownNodes>

<totalAllocatedContainersAcrossPartition>0</totalAllocatedContainersAcrossPartition>

<crossPartitionMetricsAvailable>false</crossPartitionMetricsAvailable>

</clusterMetrics>

clusterMetrics 对象的元素

项目数据类型描述
apps已提交int提交的申请数量
应用已完成int完成的申请数量
apps待定int待处理的申请数量
应用程序正在运行int正在运行的应用程序数
apps失败int失败的应用程序数
应用已杀死int被终止的应用程序数
保留MB预留的内存量(以 MB 为单位)
可用MB可用内存量(以 MB 为单位)
已分配MB分配的内存量(以 MB 为单位)
总MB总内存量(以 MB 为单位)
保留虚拟核心保留的虚拟核心数
可用虚拟核心可用虚拟核心数
分配的虚拟核心分配的虚拟核心数
totalVirtualCores 虚拟核心数虚拟核心总数
容器已分配int分配的容器数
容器保留int保留的容器数
容器挂起int待处理的容器数
总节点数int节点总数
活动节点int活动节点数
丢失节点int丢失的节点数
不健康的节点int不正常的节点数
停用节点int停用的节点数
已停用节点int停用的节点数
rebooted节点int重新启动的节点数
shutdown节点int关闭的节点数

获取所有application

curl -s http://XXX:8088/ws/v1/cluster/apps

获取 state值为 RUNNING 的application任务

curl -s http://XXX:8088/ws/v1/cluster/apps?state=RUNNING 

获取这个任务单个信息 

curl -s http://XXX:8088/ws/v1/cluster/apps/application_1619074605427_0063 |jq .app.state

请注意,根据安全设置,用户可能无法看到所有字段。 

项目数据类型描述
编号字符串应用程序 ID
用户字符串启动应用程序的用户
名字字符串应用程序名称
队列字符串提交应用程序的队列
字符串根据 ResourceManager 的应用程序状态 - 有效值是 YarnApplicationState 枚举的成员:NEW、NEW_SAVING、SUBMITTED、ACCEPTED、RUNNING、FINISHED、FAILED、KILLED
finalStatus字符串应用程序的最终状态(如果已完成)(由应用程序本身报告)有效值是 FinalApplicationStatus 枚举的成员:UNDEFINED、SUCCEEDED、FAILED、KILLED
进展以百分比表示的申请进度
trackingUI字符串跟踪 URL 当前指向的位置 - 历史记录(用于历史记录服务器)或 ApplicationMaster
trackingUrl字符串可用于跟踪应用程序的 Web URL
诊断字符串详细的诊断信息
clusterId集群 ID
应用程序类型字符串应用程序类型
application标签字符串应用程序的逗号分隔标记
优先权字符串所提交申请的优先权
开始时间应用程序启动的时间(自纪元以来的毫秒)
完成时间应用程序完成的时间(以纪元以来的毫秒数为单位)
elapsedTime自应用程序启动以来经过的时间(以毫秒为单位)
amContainer日志字符串应用程序主容器日志的 URL
amHostHttp地址字符串应用程序主机的节点 http 地址
amRPCAddress字符串应用程序主机的 RPC 地址
已分配MBint分配给应用程序正在运行的容器的内存总和(以 MB 为单位)
已分配VCoresint分配给应用程序正在运行的容器的虚拟核心的总和
running容器int当前为应用程序运行的容器数
memorySeconds应用程序分配的内存量(兆字节-秒)
vcore秒数应用程序分配的 CPU 资源量(虚拟内核 - 秒)
queueUsagePercentage应用正在使用的队列资源的百分比
clusterUsage百分比应用正在使用的群集资源的百分比。
抢占ResourceMB抢占式容器使用的内存
preemptedResourceVCores抢占容器使用的虚拟核心数
numNonAMContainer抢占int抢占的标准容器数
numAMContainer抢占int抢占的应用程序主容器数
logAggregationStatus字符串日志聚合的状态 - 有效值是 LogAggregationStatus 枚举的成员:DISABLED、NOT_START、RUNNING、RUNNING_WITH_FAILURE、SUCCEEDED、FAILED、TIME_OUT
unmanaged应用程序布尔应用程序是否处于非托管状态。
appNodeLabelExpression字符串节点标签表达式,用于标识默认情况下应在其上运行应用程序容器的节点。
amNodeLabel表达式字符串节点标签表达式,用于标识应用程序的 AM 容器预期在其上运行的节点。

jq,是linux一个很方便的json处理工具

通俗的说就是一个能够接受json,处理json,输出json的程序,反正很好用。

安装起来也非常的方便,直接使用yum即可安装。linux下离线安装jq工具 - 代码天地 (codetd.com)

yum install jq

编写shell脚本

由于公司离线yarn和实时yarn 采用是分开的方式。
只需要监控实时yarn 任务有没有处于RUNNING,达到监控的目的
这里shell脚本也只记录,flink-on-yarn 这种部署方式任务监控
shell脚本水平有限,大家多多谅解,欢迎指导

shell脚本实现功能:
获取线运行job任务,记录到日志文件。下一次脚本调用时候读取日志文件,判断状态。
不是RUNNING,就告警同时重新记录日志。

  1. #!/bin/bash
  2. Joblist=`cat /opt/shell/logs/flink_job.log` #获取记录job的log文件
  3. let i=0 #获取任务数
  4. let log_count=0 #获取日志中的任务数
  5. start_count=RUNNING #判断任务是否存在异常
  6. ############## 1 判断日志文件内容是否为空,为空时自动读取flink任务并记录到日志文件 #########
  7. if [ -z "$Joblist" ]
  8. then
  9. while :
  10. do
  11. job_id[$i]=`curl -s http://XXX:8088/ws/v1/cluster/apps?state=RUNNING |jq .apps.app[$i].id`
  12. if [ ${job_id[$i]} = "null" ];then
  13. break
  14. else
  15. echo ${job_id[$i]}
  16. echo ${job_id[$i]}>>/opt/shell/logs/flink_job.log
  17. let i++
  18. fi
  19. done
  20. fi
  21. ############## 2 读取文件中JOB任务 ##################
  22. let i=0
  23. while read line
  24. do
  25. JOB[$i]=$line
  26. let i++
  27. done</opt/shell/logs/flink_job.log
  28. log_count=$i #获取日志中的任务数
  29. ########### 3 判断任务状态,是否为RUNNIG,不是则邮件告警 ###############
  30. for ((j=0;j<i;j++))
  31. do
  32. JOB_ID=${JOB[$j]//\"}
  33. JOB_status=`curl -s http://XXXX:8088/ws/v1/cluster/apps/$JOB_ID | jq .app.state`
  34. JOB_NAME=`curl -s http://XXX:8088/ws/v1/cluster/apps/$JOB_ID | jq .app.name`
  35. START=$[`curl -s http://XXX:8088/ws/v1/cluster/apps/$JOB_ID | jq .app.startedTime` / 1000]
  36. # echo "JOB_NAME: "$JOB_NAME
  37. # echo 启动时间: `date -d @$START +"%F %H:%M:%S"`
  38. # echo "JOB_status: " ${JOB_status//\"}
  39. #echo -e "【$JOB_NAME】 \n JOB_ID: $JOB_ID \n 启动时间: `date -d @$START +"%F %H:%M:%S"` \n 检查时间: `date "+%Y-%m-%d %H:%M:%S"` \n 目前状态: $JOB_status"
  40. #echo "=============================================="
  41. if [ ${JOB_status//\"} != "RUNNING" ];then
  42. SUBJECT="【异常告警】Flink任务异常"
  43. TEXT="Flink任务 【$JOB_NAME】 异常故障 \n\nJOB_ID: $JOB_ID\n\n启动时间: `date -d @$START +"%F %H:%M:%S"` \n\n检查时间: `date "+%Y-%m-%d %H:%M:%S"` \n\n目前状态: $JOB_status"
  44. echo -e $TEXT | mail -s $SUBJECT 邮箱地址
  45. start_count=erron
  46. fi
  47. done
  48. ########### 4 出现任务异常,重新读取job 任务记录到日志文件 ###############
  49. let i=0
  50. if [ $start_count == "erron" ];then
  51. echo '重新写入日志文件'
  52. while :
  53. do
  54. job_id[$i]=`curl -s http://XXXX:8088/ws/v1/cluster/apps?state=RUNNING |jq .apps.app[$i].id`
  55. if [ ${job_id[$i]} = "null" ];then
  56. break
  57. elif [ $i == 0 ]; then
  58. echo ${job_id[$i]}>/opt/shell/logs/flink_job.log
  59. else
  60. echo ${job_id[$i]}>>/opt/shell/logs/flink_job.log
  61. fi
  62. let i++
  63. done
  64. start_count=RUNNING
  65. fi
  66. ########### 5 判断线上任务数是否一致,是否有新任务增加 ###############
  67. let i=0
  68. while :
  69. do
  70. job_id[$i]=`curl -s http://XXX:8088/ws/v1/cluster/apps?state=RUNNING |jq .apps.app[$i].id`
  71. if [ ${job_id[$i]} = "null" ];then
  72. break
  73. else
  74. let i++
  75. fi
  76. done
  77. let count=$i #线上任务数
  78. echo "==========================线上最新RUNNING状态任务数: "$count
  79. echo "==========================日志RUNNING状态任务数: "$log_count
  80. if [ ! $count -eq $log_count ]; then
  81. echo "现有RUNNING状态任务数不相等于已记录的任务数"
  82. echo ${job_id[0]} >/opt/shell/logs/flink_job.log
  83. for ((i=1;i<count;i++))
  84. do
  85. echo "重新写入JOB: "${job_id[$i]}
  86. echo ${job_id[$i]}>> /opt/shell/logs/flink_job.log
  87. done
  88. fi
  89. echo "======================当前时间: `date "+%Y-%m-%d %H:%M:%S"`======================================="
  90. echo ================================================================================================
  91. echo =====================================本次crontab监控结束========================================
  92. echo ================================================================================================

Yarn REST API 使用指南-阿里云开发者社区

Apache Hadoop 3.0.1 – ResourceManager REST API。

监控jobId

 http://***.**.**.**:****/proxy/applicationId/jobs

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/小舞很执着/article/detail/958757
推荐阅读
相关标签
  

闽ICP备14008679号