当前位置:   article > 正文

Flink任务自动恢复脚本

Flink任务自动恢复脚本

线上环境经常遇到flink任务挂掉得问题,这里写一个自动恢复脚本

# 我这里使用得datastream api编写的任务,类class路径
MAIN_CLASS="com.flink.job.CommonFlinkStreamJob"
# 我的代码包
JAR_PATH="/home/dev/flink/lib/flink-cdc-1.0.jar"
FLINK_HOME="/home/dev/flink"
FLINK_CHECKPOINT_PATH="/home/dev/flink/flink-checkpoints"
# 配置新得任务时需要修改下方两个配置
# 这个是任务参数,你没有可以不写,取决于你的class任务里面是否设置
JOB_ARGS="--jobKey cdcmysql"
# 这个是你给这个任务的名字
JOB_NAME="cdcmysql"

# 获取正在运行的任务id,5秒*10次的容错。
get_job_id() {
  local retry_count=0                         # 初始化重试计数
  local max_retries=10                        # 最大重试次数
  local j_id=""                               # 用于存储Job ID
  while [ $retry_count -lt $max_retries ]; do # 当重试次数小于最大重试次数时
    # 使用flink命令获取正在运行的任务,并截取到job_id
    j_id=$($FLINK_HOME/bin/flink list | grep -o '[0-9a-f]\{32\} : '"$JOB_NAME"' (RUNNING)')
    if [ -n "$j_id" ]; then # 如果成功获取到Job ID
      break                 # 跳出循环
    fi
    sleep 5 # 等待5秒后重试
    echo "尝试获取jobid $((retry_count + 1))/$max_retries "
    retry_count=$((retry_count + 1)) # 重试计数加1
  done
  J_ID=$j_id # 将获取到的Job ID赋值给全局变量J_ID
}
start_job() {
  echo "Starting Flink job..."
  # 检查是否存在之前保存的作业ID文件
  if [ -f "${FLINK_HOME}/job_id_$JOB_NAME.tmp" ]; then
    # 从文件中读取最新记录的job_id,空格是必须的
    job_id=$(grep "${JOB_NAME} " ${FLINK_HOME}/job_id_$JOB_NAME.tmp | cut -d ':' -f 1 | sed 's/ //g')

    if [ -n "$job_id" ]; then
      # 查找该job_id的检查点目录
      checkpoint_dirs=$(find "$FLINK_CHECKPOINT_PATH/$job_id" -type d -name 'chk-*')
      echo "job[$JOB_NAME]检查点查询,job_id: $job_id, 路径:$checkpoint_dirs"

      if [ -z "$checkpoint_dirs" ]; then
        # 路径为空,查找上一个可恢复的检查点路径
        pre_job_id=$(grep "${JOB_NAME}bak" ${FLINK_HOME}/job_id_$JOB_NAME.tmp | cut -d ':' -f 1 | sed 's/ //g')

        if [ -n "$pre_job_id" ]; then
          pre_checkpoint_dirs=$(find "$FLINK_CHECKPOINT_PATH/$pre_job_id" -type d -name 'chk-*')

          if [ -z "$pre_checkpoint_dirs" ]; then
            echo "没有找到检查点路径,尝试从上一个检查点恢复:${pre_job_id},路径:$pre_checkpoint_dirs"
            echo "无法恢复任务,停止脚本,请手动设置检查点:从上一次成功执行并且$FLINK_CHECKPOINT_PATH/job_id目录下有chk开头的路径恢复"
            exit 1
          else
            echo "没有找到检查点路径,尝试从上一个检查点恢复:${pre_job_id},路径:$pre_checkpoint_dirs"
            job_id=$pre_job_id
            echo "尝试成功,正在恢复任务进度,从Checkpoint path:$pre_checkpoint_dirs"
            $FLINK_HOME/bin/flink run -s $pre_checkpoint_dirs -c $MAIN_CLASS $JAR_PATH $JOB_ARGS >>$FLINK_HOME/log/startjob$JOB_NAME.out &
          fi
        else
          echo "没有找到检查点路径,尝试从上一个检查点恢复:${pre_job_id},路径:$pre_checkpoint_dirs"
          # 没有备份jobid
          echo "无法恢复任务,停止脚本,请手动设置检查点:从上一次成功执行并且$FLINK_CHECKPOINT_PATH/job_id目录下有chk开头的路径恢复"
          exit 1
        fi
      else
        echo "正在恢复任务进度,从Checkpoint path:$checkpoint_dirs"
        $FLINK_HOME/bin/flink run -s $checkpoint_dirs -c $MAIN_CLASS $JAR_PATH $JOB_ARGS >>$FLINK_HOME/log/startjob$JOB_NAME.out &
      fi
      sleep 10
      get_job_id
      if [ -n "$J_ID" ]; then
        echo "Job启动脚本执行成功. JobID: $J_ID"
        echo "$J_ID" >"${FLINK_HOME}/job_id_$JOB_NAME.tmp"
        # 将上一个可恢复的检查点job作为备份记录
        echo "${job_id} : ${JOB_NAME}bak" >>"${FLINK_HOME}/job_id_$JOB_NAME.tmp"
      else
        echo "Failed to get JobID after $max_retries retries. Check if the job is started."
      fi
    fi
  else
    echo "job_id_$JOB_NAME.tmp 文件不存在,starting a new job!!!"
    $FLINK_HOME/bin/flink run -c $MAIN_CLASS $JAR_PATH $JOB_ARGS >>$FLINK_HOME/log/startjob$JOB_NAME.out &
    sleep 10
    get_job_id
    if [ -n "$J_ID" ]; then
      echo "Job启动脚本执行成功. JobID: $J_ID"
      echo "$J_ID" >"${FLINK_HOME}/job_id_$JOB_NAME.tmp"
    else
      echo "Failed to get JobID after $max_retries retries. Check if the job is started."
    fi
  fi
}
watch_job() {
  while true; do
    status=$($FLINK_HOME/bin/flink list | grep -o '[0-9a-f]\{32\} : '"$JOB_NAME"' (RUNNING)')
    if [ -z "$status" ]; then
      echo "Flink job is not running. Restarting..."
      start_job
    else
      echo "$(date +"%Y-%m-%d %H:%M:%S") Flink job is running."
    fi
    sleep 180 # 每3分钟检查一次任务状态
  done
}
watch_job

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60
  • 61
  • 62
  • 63
  • 64
  • 65
  • 66
  • 67
  • 68
  • 69
  • 70
  • 71
  • 72
  • 73
  • 74
  • 75
  • 76
  • 77
  • 78
  • 79
  • 80
  • 81
  • 82
  • 83
  • 84
  • 85
  • 86
  • 87
  • 88
  • 89
  • 90
  • 91
  • 92
  • 93
  • 94
  • 95
  • 96
  • 97
  • 98
  • 99
  • 100
  • 101
  • 102
  • 103
  • 104
  • 105
  • 106

这个脚本的流程:从tmp文件里面读取jobID如果读取到就查找checkpoints路径从而恢复,如果第一个jobid没有找到tmp文件里面会记录上一次可用检查点的jobid,然后恢复;启动任务后会调用函数有容错的读取jobid从而写入到tmp文件中,并且会使用一个线程一直去监测任务的运行情况,执行上述的逻辑。

不足

1、这个脚本在一些情况下会停止,使用linux定时任务检查这个脚本并在停止后重新拉起是个可行
的方案。
2、flink cdc是依赖于binlog运行的,如果你也使用了rds,请注意阿里云的日志过期策略,频繁的主动停止任务并恢复是我正在尝试的方案(公司不愿意花钱增加日志存储周期)

讨论

在flink cdc (mysql -> starrocks)的示例中运行,你在cdc过程中又遇到哪些问题,是如何解决的呢?可以留言讨论一下,给出常见问题的解决思路:
https://www.alibabacloud.com/help/zh/flink/support/faq-about-cdc

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/我家自动化/article/detail/660015?site
推荐阅读
相关标签
  

闽ICP备14008679号