赞
踩
大道至简,用简单的话来描述复杂的事,我是Antgeek,欢迎阅读.
在flink 3.0版本中,我们仅通过一个简单yaml文件就可以配置出一个复杂的数据同步任务,
然后再来一句 bash bin/flink-cdc.sh mysql-to-doris.yaml 就可以将任务提交,
本文就是来探索一下这个shell脚本,主要是研究如何通过一个shell命令+yaml文件将任务提交,其他的功能会在之后的文章中解读
大数据小菜鸡在努力学习中,文中内容有误多多指点.
目录
概述
流程图
flink-cdc.sh解读
完整代码
逐行解读
参考
首先需要思考一下,如果是自己来实现这一效果,那么应该如何设计,用什么技术?
我们知道flinkcdc的同步任务实际上也是一个flink任务,最终的提交的还是一个flink任务,而flink任务实际上就是个java任务,用jps命令都是可以查到的.
我们在编写flink streaming程序的时候,实际上主要的流程都是在一个main方法中,而main方法是可以接收参数的,所以这块设计起来其实很简单就是在shell脚本中获取到FLINK_HOME路径,然后将yaml文件通过命令行的方式传递到main方法中,然后再设计一个类来解析这个yaml文件形成一个任务实体类,然后根据这个实体类来生成一个flink任务,这就是一个大概的思路,里面肯定还有很多的细节,接下来就通过这个flink-cdc.sh脚本的解读来进一步看看大佬们是如何来实现这一功能的.
这里使用一个流程图来描述整个的流程,看完这个就知道这一脚本的大概内容了,如果有兴趣可以继续往下阅读,后面都是将脚本的一行一行的解读并配有中文注释.
源码路径 : flink-cdc-dist/src/main/flink-cdc-bin/bin/flink-cdc.sh
#!/usr/bin/env bash ################################################################################ # Copyright 2023 Ververica Inc. # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. ################################################################################ # Setup FLINK_HOME args=("$@") # Check if FLINK_HOME is set in command-line arguments by "--flink-home" for ((i=0; i < ${#args[@]}; i++)); do case "${args[i]}" in --flink-home) if [[ -n "${args[i+1]}" ]]; then FLINK_HOME="${args[i+1]}" break fi ;; esac done if [[ -z $FLINK_HOME ]]; then echo "[ERROR] Unable to find FLINK_HOME either in command-line argument \"--flink-home\" or environment variable \"FLINK_HOME\"." exit 1 fi # Setup Flink related configurations # Setting _FLINK_HOME_DETERMINED in order to avoid config.sh to overwrite it _FLINK_HOME_DETERMINED=1 # FLINK_CONF_DIR is required by config.sh FLINK_CONF_DIR=$FLINK_HOME/conf # Use config.sh to setup Flink related configurations . $FLINK_HOME/bin/config.sh # Define Flink CDC directories SCRIPT_DIR=$( cd -- "$( dirname -- "${BASH_SOURCE[0]}" )" &> /dev/null && pwd ) FLINK_CDC_HOME="$SCRIPT_DIR"/.. export FLINK_CDC_HOME=$FLINK_CDC_HOME FLINK_CDC_CONF="$FLINK_CDC_HOME"/conf FLINK_CDC_LIB="$FLINK_CDC_HOME"/lib FLINK_CDC_LOG="$FLINK_CDC_HOME"/log # Build Java classpath CLASSPATH="" # Add Flink libraries to the classpath for jar in "$FLINK_HOME"/lib/*.jar; do CLASSPATH=$CLASSPATH:$jar done # Add Flink CDC libraries to classpath for jar in "$FLINK_CDC_LIB"/*.jar; do CLASSPATH=$CLASSPATH:$jar done # Add Hadoop classpath, which is defined in config.sh CLASSPATH=$CLASSPATH:$INTERNAL_HADOOP_CLASSPATHS # Trim classpath CLASSPATH=${CLASSPATH#:} # Setup logging LOG=$FLINK_CDC_LOG/flink-cdc-cli-$HOSTNAME.log LOG_SETTINGS=(-Dlog.file="$LOG" -Dlog4j.configuration=file:"$FLINK_CDC_CONF"/log4j-cli.properties -Dlog4j.configurationFile=file:"$FLINK_CDC_CONF"/log4j-cli.properties) # JAVA_RUN should have been setup in config.sh exec "$JAVA_RUN" -classpath "$CLASSPATH" "${LOG_SETTINGS[@]}" com.ververica.cdc.cli.CliFrontend "$@"
参数传入
#!/usr/bin/env bash
# Setup FLINK_HOME
# 获取这个脚本的所有参数,然后存储到args变量中
# ${#args[@]} 获取数组长度
# ${args[i]} 获取数组第i个值
args=("$@")
设置FLINK_HOME这个变量
# Check if FLINK_HOME is set in command-line arguments by "--flink-home" # 遍历传入的参数检查是否FLINK_HOME这个环境变量是通过命令行参数 --flink-home传递进来的 # shell中case的语法 # case 值 in # 模式1) # 这里的模式指的是shell中的通配符模式不是正则表达式,例如 a*,就是a开头的任意字符串 # 代码块 # ;; # 模式2) # 代码块 # ;; # *) # 默认代码块 # ;; # esac for ((i=0; i < ${#args[@]}; i++)); do case "${args[i]}" in --flink-home) # 如果匹配到到了就取他的下一个值给FLINK_HOME赋值,取值之前要判断一下是否存在 # -n 就是检查字符串长度是否大于0,大于0返回true,否则false if [[ -n "${args[i+1]}" ]]; then FLINK_HOME="${args[i+1]}" break fi ;; esac done
校验FLINK_HOME这个变量是否设置成功
# 如果经过上面的循环还是没有给FLINK_HOME赋值就退出程序
# 提示 [错误] 不能够在命令行参数--flink-home 或者 环境变量FLINK_HOME 找到 FLINK_HOME的值
if [[ -z $FLINK_HOME ]]; then
echo "[ERROR] Unable to find FLINK_HOME either in command-line argument \"--flink-home\" or environment variable \"FLINK_HOME\"."
exit 1
fi
获取Flink的一些相关配置
# Setup Flink related configurations # 设置flink相关的配置 # Setting _FLINK_HOME_DETERMINED in order to avoid config.sh to overwrite it # 为了避免config.sh(这个文件在$FLINK_HOME/bin/config.sh)覆盖掉FLINK_HOME这个变量,所以这里将它置位1 # 为什么置为1呢,这里可以看一下config.sh中的相关代码,如下 # 可以看到如果变量_FLINK_HOME_DETERMINED为空那么就会把FLINK_HOME的值替换掉,所以这里将它的值赋值为1就是为了避免这个 # 具体FLINK_HOME会被替换成什么值呢 # dirname 就是要获取文件路径的路径,例如dirname /home/user/a.txt 返回 /home/user/ # $SYMLINK_RESOLVED_BIN 是什么值呢 # 是切换到$bin路径下,的绝对路径(pwd -P的意思就是获取实际文件系统路径,pwd是获取链接路径) # $bin是target的路径 # target="$0" # $0就是当前脚本的名称 # -L 判断是否是一个链接符号,判断target是否是一个链接符号 # 如果是一个链接符号,那么就执行循环的代码块 # 跳出的条件是target变量不是一个链接符号或者循环了100次跳出循环,-gt是大于 -ge是大于等于 # ls 就是列出目录信息 # -ld 有两个参数 -l和-d,-l是长格式进行显示,包括文件的属性和权限信息,相当于ll # -d是只显示目录自身的信息,而不列出目录中的文件,无论是文件还是目录,都不会进入它,仅是显示它自身的信息 # -- 是一个特殊的选项, 用于分隔选项与参数.它的作用是确保$target被视作参数,即使$target是 - 开头的,避免将其解析成选项 # 解释一下 target=`expr "$ls" : '.* -> \(.*\)$'` # 这行大概意思就是通过expr命令和正则表达式提取$ls变量中符号链接的目标路径或者目录,然后赋值给target # expr 是一个执行表达式的命令 # "$ls" 是作为参数传递给expr # : '.* -> \(.*\)$' 这是一个正则表达式,用于匹配符号链接中的目标文件或目录.通过使用圆括号 ( ) 捕获模式,可以将匹配到的部分提取出来 # target="$0" # # For the case, the executable has been directly symlinked, figure out # # the correct bin path by following its symlink up to an upper bound. # # Note: we can't use the readlink utility here if we want to be POSIX # # compatible. # iteration=0 # while [ -L "$target" ]; do # if [ "$iteration" -gt 100 ]; then # echo "Cannot resolve path: You have a cyclic symlink in $target." # break # fi # ls=`ls -ld -- "$target"` # target=`expr "$ls" : '.* -> \(.*\)$'` # iteration=$((iteration + 1)) # done # Convert relative path to absolute path and resolve directory symlinks # bin=`dirname "$target"` # SYMLINK_RESOLVED_BIN=`cd "$bin"; pwd -P` # if [ -z "$_FLINK_HOME_DETERMINED" ]; then # FLINK_HOME=`dirname "$SYMLINK_RESOLVED_BIN"` # fi _FLINK_HOME_DETERMINED=1 # FLINK_CONF_DIR is required by config.sh # config.sh 需要 FLINK_CONF_DIR 配置 FLINK_CONF_DIR=$FLINK_HOME/conf # Use config.sh to setup Flink related configurations # 使用config.sh来配置 Flink相关的配置 . $FLINK_HOME/bin/config.sh
定义Flink cdc 的一些路径
# Define Flink CDC directories # 定义Flink cdc 的路径 # SCRIPT_DIR=$( cd -- "$( dirname -- "${BASH_SOURCE[0]}" )" &> /dev/null && pwd ) # 这行的大概意思就是要获取脚本的绝对路径 # ${BASH_SOURCE[0]} bash的特殊变量,获取当前运行脚本的名称 # $(dirname -- ${BASH_SOURCE[0]}) 获取当前运行脚本的路径(不能直接用这个,因为可能会因为软连接或者其他情况导致路径获取不准确,最稳妥的方法就是cd 到这个路径然后pwd获取绝对路径),这里的 -- 就是防止后面的变量被识别成选项例如-开头 # cd -- "$( dirname -- "${BASH_SOURCE[0]}" )" 切换到这个路径下 # &> /dev/null 就是将一些标准输出和错误输出都重定向到/dev/null,这样可以使输出更清晰 # && 当前一个命令执行成功后执行后面的命令 # pwd 获取当前路径 # FLINK_CDC_HOME="$SCRIPT_DIR"/.. # SCRIPT_DIR 的上级路径就是FLINK_CDC_HOME的值,就是切换到了bin目录的根目录 SCRIPT_DIR=$( cd -- "$( dirname -- "${BASH_SOURCE[0]}" )" &> /dev/null && pwd ) FLINK_CDC_HOME="$SCRIPT_DIR"/.. export FLINK_CDC_HOME=$FLINK_CDC_HOME FLINK_CDC_CONF="$FLINK_CDC_HOME"/conf FLINK_CDC_LIB="$FLINK_CDC_HOME"/lib FLINK_CDC_LOG="$FLINK_CDC_HOME"/log
构建任务启动需要的classpath
# Build Java classpath # 构建 Java的calsspath CLASSPATH="" # Add Flink libraries to the classpath # 将flink路径下lib的jar包都添加到classpath中 for jar in "$FLINK_HOME"/lib/*.jar; do CLASSPATH=$CLASSPATH:$jar done # Add Flink CDC libraries to classpath # 将cdc下lib的jar包都添加到classpath for jar in "$FLINK_CDC_LIB"/*.jar; do CLASSPATH=$CLASSPATH:$jar done # Add Hadoop classpath, which is defined in config.sh # 添加hadoop 的classpath CLASSPATH=$CLASSPATH:$INTERNAL_HADOOP_CLASSPATHS # Trim classpath # 去掉字符串开头的冒号 ,如果要去掉结尾的冒号 ${CLASSPATH%:} CLASSPATH=${CLASSPATH#:}
设置日志相关的配置
# Setup logging
# 配置日志
LOG=$FLINK_CDC_LOG/flink-cdc-cli-$HOSTNAME.log
# 启动命令中将日志的配置参数拼接,指定日志文件以及日志配置文件
LOG_SETTINGS=(-Dlog.file="$LOG" -Dlog4j.configuration=file:"$FLINK_CDC_CONF"/log4j-cli.properties -Dlog4j.configurationFile=file:"$FLINK_CDC_CONF"/log4j-cli.properties)
启动任务
# JAVA_RUN should have been setup in config.sh
# exec 是一个用于替换当前进程的命令,一般用在脚本中,会将当前脚本的执行进程执行的内容替换成exec后面命令
# 有什么作用呢?
# 1.减少系统资源 : 不用创建一个新的进程
# 2.重定向标准输入/输出 : 通过使用 exec 命令执行新的命令.可以将标准输入,输出和错误重定向到新命令所指定的位置.
# 3.执行后续操作:在脚本中,使用 exec 命令可以执行一些命令或操作后,将控制权交给新的命令.这可以用于在脚本中完成某些初始化操作后,将脚本完全替换为另一个命令或程序.
# $JAVA_RUN 在config.sh就定义了,一般是java 或者 /bin/java
# -classpath 指定classpath路径
# "${LOG_SETTINGS[@]}" 日志的一些配置信息
# com.ververica.cdc.cli.CliFrontend 入口类
# "$@" 所有的命令行参数传到入口类中,通过String args[] 来接收
exec "$JAVA_RUN" -classpath "$CLASSPATH" "${LOG_SETTINGS[@]}" com.ververica.cdc.cli.CliFrontend "$@"
[1] : https://github.com/apache/flink
[2] : https://github.com/ververica/flink-cdc-connectors
[3] : https://blog.csdn.net/wang2leee/article/details/132521566
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。