赞
踩
# 我这里使用得datastream api编写的任务,类class路径 MAIN_CLASS="com.flink.job.CommonFlinkStreamJob" # 我的代码包 JAR_PATH="/home/dev/flink/lib/flink-cdc-1.0.jar" FLINK_HOME="/home/dev/flink" FLINK_CHECKPOINT_PATH="/home/dev/flink/flink-checkpoints" # 配置新得任务时需要修改下方两个配置 # 这个是任务参数,你没有可以不写,取决于你的class任务里面是否设置 JOB_ARGS="--jobKey cdcmysql" # 这个是你给这个任务的名字 JOB_NAME="cdcmysql" # 获取正在运行的任务id,5秒*10次的容错。 get_job_id() { local retry_count=0 # 初始化重试计数 local max_retries=10 # 最大重试次数 local j_id="" # 用于存储Job ID while [ $retry_count -lt $max_retries ]; do # 当重试次数小于最大重试次数时 # 使用flink命令获取正在运行的任务,并截取到job_id j_id=$($FLINK_HOME/bin/flink list | grep -o '[0-9a-f]\{32\} : '"$JOB_NAME"' (RUNNING)') if [ -n "$j_id" ]; then # 如果成功获取到Job ID break # 跳出循环 fi sleep 5 # 等待5秒后重试 echo "尝试获取jobid $((retry_count + 1))/$max_retries " retry_count=$((retry_count + 1)) # 重试计数加1 done J_ID=$j_id # 将获取到的Job ID赋值给全局变量J_ID } start_job() { echo "Starting Flink job..." # 检查是否存在之前保存的作业ID文件 if [ -f "${FLINK_HOME}/job_id_$JOB_NAME.tmp" ]; then # 从文件中读取最新记录的job_id,空格是必须的 job_id=$(grep "${JOB_NAME} " ${FLINK_HOME}/job_id_$JOB_NAME.tmp | cut -d ':' -f 1 | sed 's/ //g') if [ -n "$job_id" ]; then # 查找该job_id的检查点目录 checkpoint_dirs=$(find "$FLINK_CHECKPOINT_PATH/$job_id" -type d -name 'chk-*') echo "job[$JOB_NAME]检查点查询,job_id: $job_id, 路径:$checkpoint_dirs" if [ -z "$checkpoint_dirs" ]; then # 路径为空,查找上一个可恢复的检查点路径 pre_job_id=$(grep "${JOB_NAME}bak" ${FLINK_HOME}/job_id_$JOB_NAME.tmp | cut -d ':' -f 1 | sed 's/ //g') if [ -n "$pre_job_id" ]; then pre_checkpoint_dirs=$(find "$FLINK_CHECKPOINT_PATH/$pre_job_id" -type d -name 'chk-*') if [ -z "$pre_checkpoint_dirs" ]; then echo "没有找到检查点路径,尝试从上一个检查点恢复:${pre_job_id},路径:$pre_checkpoint_dirs" echo "无法恢复任务,停止脚本,请手动设置检查点:从上一次成功执行并且$FLINK_CHECKPOINT_PATH/job_id目录下有chk开头的路径恢复" exit 1 else echo "没有找到检查点路径,尝试从上一个检查点恢复:${pre_job_id},路径:$pre_checkpoint_dirs" job_id=$pre_job_id echo "尝试成功,正在恢复任务进度,从Checkpoint path:$pre_checkpoint_dirs" $FLINK_HOME/bin/flink run -s $pre_checkpoint_dirs -c $MAIN_CLASS $JAR_PATH $JOB_ARGS >>$FLINK_HOME/log/startjob$JOB_NAME.out & fi else echo "没有找到检查点路径,尝试从上一个检查点恢复:${pre_job_id},路径:$pre_checkpoint_dirs" # 没有备份jobid echo "无法恢复任务,停止脚本,请手动设置检查点:从上一次成功执行并且$FLINK_CHECKPOINT_PATH/job_id目录下有chk开头的路径恢复" exit 1 fi else echo "正在恢复任务进度,从Checkpoint path:$checkpoint_dirs" $FLINK_HOME/bin/flink run -s $checkpoint_dirs -c $MAIN_CLASS $JAR_PATH $JOB_ARGS >>$FLINK_HOME/log/startjob$JOB_NAME.out & fi sleep 10 get_job_id if [ -n "$J_ID" ]; then echo "Job启动脚本执行成功. JobID: $J_ID" echo "$J_ID" >"${FLINK_HOME}/job_id_$JOB_NAME.tmp" # 将上一个可恢复的检查点job作为备份记录 echo "${job_id} : ${JOB_NAME}bak" >>"${FLINK_HOME}/job_id_$JOB_NAME.tmp" else echo "Failed to get JobID after $max_retries retries. Check if the job is started." fi fi else echo "job_id_$JOB_NAME.tmp 文件不存在,starting a new job!!!" $FLINK_HOME/bin/flink run -c $MAIN_CLASS $JAR_PATH $JOB_ARGS >>$FLINK_HOME/log/startjob$JOB_NAME.out & sleep 10 get_job_id if [ -n "$J_ID" ]; then echo "Job启动脚本执行成功. JobID: $J_ID" echo "$J_ID" >"${FLINK_HOME}/job_id_$JOB_NAME.tmp" else echo "Failed to get JobID after $max_retries retries. Check if the job is started." fi fi } watch_job() { while true; do status=$($FLINK_HOME/bin/flink list | grep -o '[0-9a-f]\{32\} : '"$JOB_NAME"' (RUNNING)') if [ -z "$status" ]; then echo "Flink job is not running. Restarting..." start_job else echo "$(date +"%Y-%m-%d %H:%M:%S") Flink job is running." fi sleep 180 # 每3分钟检查一次任务状态 done } watch_job
这个脚本的流程:从tmp文件里面读取jobID如果读取到就查找checkpoints路径从而恢复,如果第一个jobid没有找到tmp文件里面会记录上一次可用检查点的jobid,然后恢复;启动任务后会调用函数有容错的读取jobid从而写入到tmp文件中,并且会使用一个线程一直去监测任务的运行情况,执行上述的逻辑。
1、这个脚本在一些情况下会停止,使用linux定时任务检查这个脚本并在停止后重新拉起是个可行
的方案。
2、flink cdc是依赖于binlog运行的,如果你也使用了rds,请注意阿里云的日志过期策略,频繁的主动停止任务并恢复是我正在尝试的方案(公司不愿意花钱增加日志存储周期)
在flink cdc (mysql -> starrocks)的示例中运行,你在cdc过程中又遇到哪些问题,是如何解决的呢?可以留言讨论一下,给出常见问题的解决思路:
https://www.alibabacloud.com/help/zh/flink/support/faq-about-cdc
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。