赞
踩
对于StreamPark,任务的启动的时间线涉及到以下几个部分:
目前文件上传涉及用户的jar包。目前完全使用XOS进行资源管理。对于使用StreamPark的用户来说,目前只涉及到PUT、GET(下载到网页端)。一般情况下,不涉及fail over相关问题。
完成文件上传后,用户将进行参数配置。对任务选择flink镜像版本、sql语句、依赖jar包等。在这一步,StreamPark实际上将会将选中的XOS中的依赖jar包下载到本地供后续使用。
目前将会对镜像构建过程中的详细步骤进行分析,如果需要进行
(FAILED OVER需要考虑)在StreamPark本机环境中构建工作空间环境,这里主要就是新建一个目录,用于存储用户数据,简单地通过目录进行隔离。
// Step-1: init build workspace of flink job
// the sub workspace dir like: APP_WORKSPACE/k8s-clusterId@k8s-namespace/
val buildWorkspace =
execStep(1) {
val buildWorkspace = s"${request.workspace}/${request.clusterId}@${request.k8sNamespace}"
LfsOperator.mkCleanDirs(buildWorkspace)
logInfo(s"recreate building workspace: $buildWorkspace")
buildWorkspace
}.getOrElse(throw getError.exception)
(FAILED OVER需要考虑)将用户的k8s Podtemplate导出至翼flink-StreamPark端的相应存储目录。
在镜像构建步骤中,只会讲podtemplate文件进行导出为本地文件,在后续的任务的执行的过程中将会使用。
// Step-2: export k8s pod template files
val podTemplatePaths = request.flinkPodTemplate match {
case podTemplate if podTemplate.isEmpty =>
skipStep(2)
Map[String, String]()
case podTemplate =>
execStep(2) {
val podTemplateFiles = PodTemplateTool.preparePodTemplateFiles(buildWorkspace, podTemplate).tmplFiles
logInfo(s"export flink podTemplates: ${podTemplateFiles.values.mkString(",")}")
podTemplateFiles
}.getOrElse(throw getError.exception)
}
(FAILED OVER需要考虑)打包生成任务的运行时jar包
在这里,是一个复杂的过程,打包运行时jar包主要将如下一些要素打包成一个fatjar(所有依赖打包在一起,防止依赖缺失,直接通过java -jar即可调用)。在这里,依赖jar包应该为创建任务时已经从XOS下载完成的,因此可能存在问题
最终地,将生成能够动态解析用户sql的flink运行jar包,如下本jar包的入口信息(MANIFEST.MF),可见其标记了入口的主类,如此Flink加载此jar包时候将优先使用sqlclient作为flink任务的入口。
Manifest-Version: 1.0
Implementation-Title: StreamPark : Flink Shims 1.14
Implementation-Version: 2.0.0
Specification-Vendor: Apache Software Foundation
Specification-Title: StreamPark : Flink Shims 1.14
Build-Jdk-Spec: 1.8
Created-By: Maven JAR Plugin 3.2.2
Specification-Version: 2.0
Implementation-Vendor: Apache Software Foundation
Main-Class: org.apache.streampark.flink.cli.SqlClient
任务镜像的生成
目前而言,任务的镜像主要作用为将上述第3步的运行时jar包加入到镜像目录中,详细地请看如下的dockerfile,主要使用flink官方提供的基础镜像基础上将用户的jar包添加到/usr/local/flink/usrlib
目录下。之后此镜像将作为我们任务的运行时镜像,目前阶段通过镜像仓保存。
FROM apache/flink:1.14.3-scala_2.12
RUN mkdir -p $FLINK_HOME/usrlib
COPY lib $FLINK_HOME/lib/
COPY streampark-flinkjob_test-local.jar $FLINK_HOME/usrlib/streampark-flinkjob_test-local.jar
对于翼flink而言,最终的任务执行端将是上述jar包中的org.apache.streampark.flink.cli.SqlClient
类的主函数。
任务镜像的推送
镜像的推送主要地将通过翼flink-StreamPark所在的宿主机上的docker将上一步生成的镜像推送到目标镜像仓,在这里相关的镜像仓的设定应该提前设定好,并且需要注意的,该任务镜像包含了用户自定义的UDF可能存在一定的隐私性,最好能够提供一个私有的镜像仓(如自建harbor)。当镜像推送成功后,云原生底座才能进行调用
一般地,在翼flink-StreamPark启动前,应该在其相应的数据库中初始化相应的docker信息,如下所示。
Ingress的生成(目前不涉及,可选)
目前地,对于在StreamPark中的flink任务启动,主要是用K8s Application模式对任务进行启动。主要地,其步骤和本文中介绍的手动启动相似,主要则是通过flink官方原生java任务客户端。对于StreamPark中的代码,唯一做的事就是把用户的相关信息解析成flink支持的参数,与K8S交互部分完全使用flink端原生的方法。
最终地与手动启动类似,通过参数进行控制。为加强理解,下面将会把目前经常使用到的StreamPark的最终执行方式进行解析,如下:
./bin/flink run-application \ --target kubernetes-application \ -Dkubernetes.cluster-id=flink-cluster \ # 指定容器启动的镜像(与之前提交的保持一致) -Dkubernetes.container.image=registry.cn.hangzhou.aliyuncs.com/dockerxiahu/flink:1.15.1-app-test-05 \ -Dkubernetes.jobmanager.replicas=1 \ # 指定容器运行的命名空间 -Dkubernetes.namespace=flink \ -Dkubernetes.jobmanager.service-account=flink-service-account \ -Dkubernetes.taskmanager.cpu=1 \ -Dtaskmanager.memory.process.size=4096mb \ -Dkubernetes.jobmanager.cpu=1 \ -Djobmanager.memory.process.size=4096mb \ -Dkubernetes.rest-service.exposed.type=NodePort \ -Dclassloader.resolve-order=parent-first \ # yaml 模板,为解决hosts映射,后续可以通过编排此yaml文件,实现动态替换启动jar包和配置文件 -Dkubernetes.pod-template-file=/opt/flink-1.14.2/flink-templeta.yaml \ # Main方法 -c com.clb.hadoop.hub.flink.realtime.launch.FlinkConsumeKafkaToHdfs \ # 启动Jar包和启动配置文件的绝对路径(容器内部,不是宿主机) local:///opt/flink/lib/flink-realtime-1.0-SNAPSHOT.jar /opt/flink/usrlib/flink-realtime-hdfs.properties
通过分析可以知晓,一般情况下,StreamPark只会指定PodTemplate这一文件作为外部文件,如果出现容器重启情况,可能会导致该文件丢失,无法顺利启动flink任务。
在以上步骤中,如果在其中一些关键步骤中发生容器重启(重启后本地文件缺失),则会导致后续问题的出现。因此,以下将会对各种时间点发生问题的情况进行分开分析。
【任务创建】成功后,容器重启
【任务创建】成功后,相关依赖JAR包将会被下载到本地的工作空间,如果容器重启后,已经下载完成的jar包将会消失。导致【镜像创建】步骤中发生找不到依赖JAR而构建失败。
【镜像创建】过程中,容器重启
【镜像创建】过程中,如果发生了容器重启,则会使任务显示一直在【发布中】状态,如果需要构建的话则需要选择(强制构建选项)。在这一步骤中容器重启,将会将构建过程停止在某一个步骤中不再更新。
【镜像创建】成功后,容器重启
【镜像创建】成功后,如果容器进行了重启,只会导致新启动的SP容器中工作空间目录、podTemplate文件缺失,如果使用podtemplate,则会因文件确认导致后续的【任务执行】失败。
【任务执行】过程中,容器重启
【任务执行】过程中,如果发生了容器重启。则会导致任务一直处于【STARTING(启动中)】状态。
【任务执行】成功后,容器重启
【任务执行】成功后,容器发生重启,则不会影响现有任务的执行。并且新启动的容器中的【监听器】将会继续监听各个任务。总之,影响不大。
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。