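# Deploying a Flink Cluster on Kubernetes
Flink version: 1.10.1
Kubernetes version: 1.16.5
Flink can be deployed on Kubernetes in two modes: Job cluster and Session cluster. A Job cluster requires us to package our own JAR into the Flink image and deploy them together, whereas in Session mode we start the cluster first and then submit JARs to it.
In the article on YARN deployment, we deployed directly from the pre-built package downloaded from the official site. Because Job cluster mode requires rebuilding the image, in this environment-preparation part we instead pull the Flink source code from GitHub and compile it ourselves.
Go to the Flink repository on GitHub: https://github.com/apache/flink
Click [Releases] and find the version you need. For example, to get the 1.10.1 source, download the release-1.10.1 package from the Releases page: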
wget https://github.com/apache/flink/archive/release-1.10.1.tar.gz
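Tip: if the download from GitHub fails because of network problems, you can register an account on Gitee (https://gitee.com/), create a new repository there, and enter Flink's GitHub address under the import-existing-repository option. Downloading the code from Gitee is then much faster.
Extract the archive after downloading (adjust the file name if wget saved it under a different name, such as release-1.10.1.tar.gz):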
tar -zxvf flink-release-1.10.1.tar.gz
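As a small aid for the download step, the sketch below derives the archive URL and the extracted directory name from a version number. The function names `flink_src_url` and `flink_src_dir` are hypothetical helpers, and the `flink-release-<version>` directory name is an assumption based on the archive name used above.

```shell
#!/bin/sh
# Hypothetical helpers for illustration; not part of Flink or GitHub tooling.

# Print the GitHub source archive URL for a Flink release tag.
flink_src_url() {
  printf 'https://github.com/apache/flink/archive/release-%s.tar.gz\n' "$1"
}

# Print the directory the archive is assumed to unpack into.
flink_src_dir() {
  printf 'flink-release-%s\n' "$1"
}

flink_src_url 1.10.1
flink_src_dir 1.10.1
```

Keeping the version in one place makes it easier to switch to another release tag later.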
Enter the extracted directory and build with the following Maven command:
mvn clean package -DskipTests -Dfast
The build may fail with the following error:
Failure to find io.confluent:kafka-schema-registry-client:jar:3.3.1
[ERROR] Failed to execute goal on project flink-avro-confluent-registry: Could not resolve dependencies for project org.apache.flink:flink-avro-confluent-registry:jar:1.8.2: Failure to find io.confluent:kafka-schema-registry-client:jar:3.3.1 in https://maven.aliyun.com/repository/public was cached in the local repository, resolution will not be reattempted until the update interval of aliyunmaven has elapsed or updates are forced -> [Help 1]
[ERROR]
[ERROR] To see the full stack trace of the errors, re-run Maven with the -e switch.
[ERROR] Re-run Maven using the -X switch to enable full debug logging.
[ERROR]
[ERROR] For more information about the errors and possible solutions, please read the following articles:
[ERROR] [Help 1] http://cwiki.apache.org/confluence/display/MAVEN/DependencyResolutionException
[ERROR]
[ERROR] After correcting the problems, you can resume the build with the command
[ERROR] mvn <goals> -rf :flink-avro-confluent-registry
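This happens because kafka-schema-registry-client:jar:3.3.1 cannot be found in the Maven repository. I tried both the official repository and the Aliyun repository, and neither had the package. The workaround is to download the JAR manually and install it into the local Maven repository.
Create a lib directory inside the Flink source directory to hold the downloaded JAR, then download the matching version from the Confluent package site:
mkdir lib
cd lib
wget http://packages.confluent.io/maven/io/confluent/kafka-schema-registry-client/3.3.1/kafka-schema-registry-client-3.3.1.jar
Then install the JAR into the local repository with Maven: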
mvn install:install-file -DgroupId=io.confluent -DartifactId=kafka-schema-registry-client -Dversion=3.3.1 -Dpackaging=jar -Dfile=kafka-schema-registry-client-3.3.1.jar
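To confirm that the install step worked before rebuilding, one option is to check for the JAR in the local repository. The sketch below assumes Maven's default local repository location (`~/.m2/repository`, not overridden in settings.xml); `m2_jar_path` is a hypothetical helper name.

```shell
#!/bin/sh
# Hypothetical helper: print where the default local Maven repository
# is expected to store an artifact (groupId, artifactId, version).
m2_jar_path() {
  # Maven lays out the groupId as nested directories: io.confluent -> io/confluent
  group_dir=$(printf '%s' "$1" | tr '.' '/')
  printf '%s/.m2/repository/%s/%s/%s/%s-%s.jar\n' \
    "$HOME" "$group_dir" "$2" "$3" "$2" "$3"
}

jar=$(m2_jar_path io.confluent kafka-schema-registry-client 3.3.1)
if [ -f "$jar" ]; then
  echo "installed: $jar"
else
  echo "missing: $jar"
fi
```

If the file is reported missing, the build will most likely fail on the same dependency again.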
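Retry the build.
The compiled distribution is in the build-target directory.
We also need a Kubernetes environment. One option is to enable Kubernetes in Docker Desktop (https://www.docker.com/products/docker-desktop) on your own machine. The setup is not covered in detail here; see https://blog.csdn.net/shirukai/article/details/103512497 for a walkthrough of setting up Kubernetes locally.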
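A Flink Session cluster runs as a long-running Kubernetes Deployment. Multiple Flink jobs can be submitted to one session cluster; once the cluster is deployed, each job has to be submitted to it.
A basic Flink session cluster consists of the following Kubernetes resources:
Flink Conf ConfigMap
JobManager Service
JobManager Deployment
TaskManager Deployment
The resource definitions here follow the official documentation: https://ci.apache.org/projects/flink/flink-docs-release-1.10/ops/deployment/kubernetes.html#session-cluster-resource-definitions
The Flink Conf ConfigMap stores Flink's configuration files, such as flink-conf.yaml and log4j.properties.
Create a file named flink-configuration-configmap.yaml with the following content: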
apiVersion: v1
kind: ConfigMap
metadata:
  name: flink-config
  labels:
    app: flink
data:
  flink-conf.yaml: |+
    jobmanager.rpc.address: flink-jobmanager
    taskmanager.numberOfTaskSlots: 1
    blob.server.port: 6124
    jobmanager.rpc.port: 6123
    taskmanager.rpc.port: 6122
    jobmanager.heap.size: 1024m
    taskmanager.memory.process.size: 1024m
  log4j.properties: |+
    log4j.rootLogger=INFO, file
    log4j.logger.akka=INFO
    log4j.logger.org.apache.kafka=INFO
    log4j.logger.org.apache.hadoop=INFO
    log4j.logger.org.apache.zookeeper=INFO
    log4j.appender.file=org.apache.log4j.FileAppender
    log4j.appender.file.file=${log.file}
    log4j.appender.file.layout=org.apache.log4j.PatternLayout
    log4j.appender.file.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss,SSS} %-5p %-60c %x - %m%n
    log4j.logger.org.apache.flink.shaded.akka.org.jboss.netty.channel.DefaultChannelPipeline=ERROR, file
Create the Flink Conf ConfigMap:
kubectl create -f flink-configuration-configmap.yaml
List the ConfigMaps to confirm it was created:
kubectl get configmap
Edit the jobmanager-service.yaml file:
apiVersion: v1
kind: Service
metadata:
  name: flink-jobmanager
spec:
  type: ClusterIP
  ports:
  - name: rpc
    port: 6123
  - name: blob
    port: 6124
  - name: ui
    port: 8081
  selector:
    app: flink
    component: jobmanager
Create the Flink jobmanager-service:
kubectl create -f jobmanager-service.yaml
Edit jobmanager-deployment.yaml:
apiVersion: apps/v1
kind: Deployment
metadata:
  name: flink-jobmanager
spec:
  replicas: 1
  selector:
    matchLabels:
      app: flink
      component: jobmanager
  template:
    metadata:
      labels:
        app: flink
        component: jobmanager
    spec:
      containers:
      - name: jobmanager
        image: flink:1.10.1
        workingDir: /opt/flink
        command: ["/bin/bash", "-c", "$FLINK_HOME/bin/jobmanager.sh start;\
          while :;
          do
            if [[ -f $(find log -name '*jobmanager*.log' -print -quit) ]];
              then tail -f -n +1 log/*jobmanager*.log;
            fi;
          done"]
        ports:
        - containerPort: 6123
          name: rpc
        - containerPort: 6124
          name: blob
        - containerPort: 8081
          name: ui
        livenessProbe:
          tcpSocket:
            port: 6123
          initialDelaySeconds: 30
          periodSeconds: 60
        volumeMounts:
        - name: flink-config-volume
          mountPath: /opt/flink/conf
        securityContext:
          runAsUser: 9999  # refers to user _flink_ from official flink image, change if necessary
      volumes:
      - name: flink-config-volume
        configMap:
          name: flink-config
          items:
          - key: flink-conf.yaml
            path: flink-conf.yaml
          - key: log4j.properties
            path: log4j.properties
Create the Flink jobmanager-deployment:
kubectl create -f jobmanager-deployment.yaml
Edit taskmanager-deployment.yaml:
apiVersion: apps/v1
kind: Deployment
metadata:
  name: flink-taskmanager
spec:
  replicas: 2
  selector:
    matchLabels:
      app: flink
      component: taskmanager
  template:
    metadata:
      labels:
        app: flink
        component: taskmanager
    spec:
      containers:
      - name: taskmanager
        image: flink:1.10.1
        workingDir: /opt/flink
        command: ["/bin/bash", "-c", "$FLINK_HOME/bin/taskmanager.sh start; \
          while :;
          do
            if [[ -f $(find log -name '*taskmanager*.log' -print -quit) ]];
              then tail -f -n +1 log/*taskmanager*.log;
            fi;
          done"]
        ports:
        - containerPort: 6122
          name: rpc
        livenessProbe:
          tcpSocket:
            port: 6122
          initialDelaySeconds: 30
          periodSeconds: 60
        volumeMounts:
        - name: flink-config-volume
          mountPath: /opt/flink/conf/
        securityContext:
          runAsUser: 9999  # refers to user _flink_ from official flink image, change if necessary
      volumes:
      - name: flink-config-volume
        configMap:
          name: flink-config
          items:
          - key: flink-conf.yaml
            path: flink-conf.yaml
          - key: log4j.properties
            path: log4j.properties
Create the Flink taskmanager-deployment:
kubectl create -f taskmanager-deployment.yaml
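The four create steps for the session cluster can be collected into one small script. This is only a sketch: `plan_session` is a hypothetical helper that prints the kubectl commands in the order used in this article instead of running them, so the plan can be reviewed first.

```shell
#!/bin/sh
# Manifests of the session cluster, in the order used in this article.
SESSION_MANIFESTS="flink-configuration-configmap.yaml jobmanager-service.yaml jobmanager-deployment.yaml taskmanager-deployment.yaml"

# Hypothetical helper: print one kubectl command per manifest.
# $1 is the kubectl verb, for example create or delete.
plan_session() {
  for manifest in $SESSION_MANIFESTS; do
    printf 'kubectl %s -f %s\n' "$1" "$manifest"
  done
}

plan_session create
```

To actually run the printed commands, the output can be piped to a shell, for example `plan_session create | sh`, from the directory that holds the manifest files.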
The Flink web UI can be reached in several ways. One is to run kubectl proxy on the command line,
then open http://localhost:8001/api/v1/namespaces/default/services/flink-jobmanager:ui/proxy in a browser.
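The proxy URL above follows a fixed pattern of namespace, service name, and named port. A minimal sketch of how it is assembled, assuming kubectl proxy listens on its default port 8001; `proxy_url` is a hypothetical helper name.

```shell
#!/bin/sh
# Hypothetical helper: build the kubectl proxy URL for a named service port.
# Arguments: namespace, service name, port name, optional local proxy port.
proxy_url() {
  proxy_port="${4:-8001}"
  printf 'http://localhost:%s/api/v1/namespaces/%s/services/%s:%s/proxy\n' \
    "$proxy_port" "$1" "$2" "$3"
}

# The JobManager service defined earlier names its web UI port "ui".
proxy_url default flink-jobmanager ui
```

If the service lives in another namespace or the port is named differently, only the arguments change.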
Alternatively, use
kubectl port-forward ${flink-jobmanager-pod} 8081:8081
to forward the JobManager's web UI port to local port 8081, replacing ${flink-jobmanager-pod} with the actual pod name. List all pods to find it:
kubectl get pods
NAME                                READY   STATUS    RESTARTS   AGE
flink-jobmanager-6b67975bc6-rshdt   1/1     Running   0          23m
flink-taskmanager-9dbb6dbc4-lhdhr   1/1     Running   0          4m58s
flink-taskmanager-9dbb6dbc4-mx7rn   1/1     Running   0          4m58s
Inspect the JobManager pod if needed:
kubectl describe pod flink-jobmanager-6b67975bc6-rshdt
Forward the port:
kubectl port-forward flink-jobmanager-6b67975bc6-rshdt 8081:8081
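Because the pod name changes whenever the Deployment recreates the pod, it can be convenient to look it up instead of copying it by hand. The sketch below filters the default `kubectl get pods` table with awk; `first_running_jobmanager` is a hypothetical helper, and it assumes the default column order (NAME, READY, STATUS, ...).

```shell
#!/bin/sh
# Hypothetical helper: read `kubectl get pods` output on stdin and print
# the name of the first JobManager pod whose STATUS column is Running.
first_running_jobmanager() {
  awk '$1 ~ /^flink-jobmanager-/ && $3 == "Running" { print $1; exit }'
}

# Demonstration with the pod listing shown above.
printf '%s\n' \
  'NAME READY STATUS RESTARTS AGE' \
  'flink-jobmanager-6b67975bc6-rshdt 1/1 Running 0 23m' \
  'flink-taskmanager-9dbb6dbc4-lhdhr 1/1 Running 0 4m58s' |
  first_running_jobmanager
```

Against a live cluster the same filter could be used as `kubectl port-forward "$(kubectl get pods | first_running_jobmanager)" 8081:8081`.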
Edit jobmanager-rest-service.yaml to expose the REST port through a NodePort service:
apiVersion: v1
kind: Service
metadata:
  name: flink-jobmanager-rest
spec:
  type: NodePort
  ports:
  - name: rest
    port: 8081        # port the ClusterIP listens on
    targetPort: 8081  # port the Pod listens on
    nodePort: 30081   # port opened on each Kubernetes node
  selector:
    app: flink
    component: jobmanager
Create the jobmanager-rest-service:
kubectl create -f jobmanager-rest-service.yaml
Check the exposed port with kubectl get svc:
kubectl get svc
Then open localhost:30081 in a browser.
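The value 30081 was chosen because Kubernetes only accepts nodePort values inside the configured NodePort range, which is 30000-32767 by default. A minimal sketch of that check, assuming the default range has not been changed on the API server; `is_valid_node_port` is a hypothetical helper name.

```shell
#!/bin/sh
# Hypothetical helper: succeed only if $1 is a number inside the default
# Kubernetes NodePort range (30000-32767).
is_valid_node_port() {
  case "$1" in
    ''|*[!0-9]*) return 1 ;;  # reject empty or non-numeric input
  esac
  [ "$1" -ge 30000 ] && [ "$1" -le 32767 ]
}

for port in 30081 8081; do
  if is_valid_node_port "$port"; then
    echo "$port: inside the default NodePort range"
  else
    echo "$port: outside the default NodePort range"
  fi
done
```

This is why the service maps port 8081 inside the cluster to 30081 on the node instead of reusing 8081.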
If a Flink client is available locally, go to its bin directory and submit a job with it:
./flink run -m localhost:30081 ../examples/streaming/WordCount.jar
To tear down the session cluster, delete the resources:
kubectl delete -f jobmanager-rest-service.yaml
kubectl delete -f jobmanager-service.yaml
kubectl delete -f jobmanager-deployment.yaml
kubectl delete -f taskmanager-deployment.yaml
kubectl delete -f flink-configuration-configmap.yaml
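Per-Job mode requires the JAR to be available inside the container in advance. There are two ways to do this: rebuild the image with the user JAR packaged into it, or mount the storage that holds the user JAR into the container at /opt/flink/usrlib.
Rebuilding the image can itself be done in two ways: build the image from the compiled source, or start from the official image and add our JAR to it.
After the source build finishes, go to the flink/flink-container/docker directory and run the following command to build the image: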
sh build.sh --from-local-dist --image-name flink-job:1.10.1-source --job-artifacts ../../build-target/examples/streaming/WordCount.jar
Alternatively, write a Dockerfile based on the official image:
FROM flink:1.10.1
ADD *.jar /opt/flink/usrlib/
Build the image:
docker build -t flink-job:1.10.1-image .
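The `ADD *.jar` line only picks up JAR files that sit next to the Dockerfile, so it can help to check the build context before running docker build. A small sketch under that assumption; `count_jars` is a hypothetical helper name.

```shell
#!/bin/sh
# Hypothetical helper: count the *.jar files directly inside directory $1.
count_jars() {
  count=0
  for jar in "$1"/*.jar; do
    # When nothing matches, the pattern stays unexpanded and -f fails.
    if [ -f "$jar" ]; then
      count=$((count + 1))
    fi
  done
  echo "$count"
}

if [ "$(count_jars .)" -eq 0 ]; then
  echo "no JAR files in the current directory; copy the job JAR here before building"
else
  echo "found $(count_jars .) JAR file(s) for the image"
fi
```

Running it in the directory that contains the Dockerfile shows whether the job JAR has been copied into place.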
flink-configuration-configmap.yaml
apiVersion: v1
kind: ConfigMap
metadata:
  name: flink-config
  labels:
    app: flink
data:
  flink-conf.yaml: |+
    jobmanager.rpc.address: flink-jobmanager
    taskmanager.numberOfTaskSlots: 1
    blob.server.port: 6124
    jobmanager.rpc.port: 6123
    taskmanager.rpc.port: 6122
    jobmanager.heap.size: 1024m
    taskmanager.memory.process.size: 1024m
  log4j.properties: |+
    log4j.rootLogger=INFO, file
    log4j.logger.akka=INFO
    log4j.logger.org.apache.kafka=INFO
    log4j.logger.org.apache.hadoop=INFO
    log4j.logger.org.apache.zookeeper=INFO
    log4j.appender.file=org.apache.log4j.FileAppender
    log4j.appender.file.file=${log.file}
    log4j.appender.file.layout=org.apache.log4j.PatternLayout
    log4j.appender.file.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss,SSS} %-5p %-60c %x - %m%n
    log4j.logger.org.apache.flink.shaded.akka.org.jboss.netty.channel.DefaultChannelPipeline=ERROR, file
  log4j-console.properties: |+
    # This affects logging for both user code and Flink
    log4j.rootLogger=INFO, console

    # Uncomment this if you want to _only_ change Flink's logging
    #log4j.logger.org.apache.flink=INFO
    # The following lines keep the log level of common libraries/connectors on
    # log level INFO. The root logger does not override this. You have to manually
    # change the log levels here.
    log4j.logger.akka=INFO
    log4j.logger.org.apache.kafka=INFO
    log4j.logger.org.apache.hadoop=INFO
    log4j.logger.org.apache.zookeeper=INFO
    # Log all infos to the console
    log4j.appender.console=org.apache.log4j.ConsoleAppender
    log4j.appender.console.layout=org.apache.log4j.PatternLayout
    log4j.appender.console.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss,SSS} %-5p %-60c %x - %m%n
    # Suppress the irrelevant (wrong) warnings from the Netty channel handler
    log4j.logger.org.apache.flink.shaded.akka.org.jboss.netty.channel.DefaultChannelPipeline=ERROR, console
jobmanager-service.yaml
apiVersion: v1
kind: Service
metadata:
  name: flink-jobmanager
spec:
  type: ClusterIP
  ports:
  - name: rpc
    port: 6123
  - name: blob-server
    port: 6124
  - name: webui
    port: 8081
  selector:
    app: flink
    component: jobmanager
jobmanager-rest-service.yaml
apiVersion: v1
kind: Service
metadata:
  name: flink-jobmanager-rest
spec:
  type: NodePort
  ports:
  - name: rest
    port: 8081
    targetPort: 8081
    nodePort: 30081
  selector:
    app: flink
    component: jobmanager
jobmanager-job-deployment.yaml
apiVersion: apps/v1
kind: Deployment
metadata:
  name: flink-jobmanager
spec:
  replicas: 1
  selector:
    matchLabels:
      app: flink
      component: jobmanager
  template:
    metadata:
      labels:
        app: flink
        component: jobmanager
    spec:
      # restartPolicy: OnFailure
      containers:
      - name: jobmanager
        image: flink-job:1.10.1-image
        env:
        args: ["standalone-job", "--job-classname", "org.apache.flink.streaming.examples.wordcount.WordCount"]  # optional arguments: ["--job-id", "<job id>", "--fromSavepoint", "/path/to/savepoint", "--allowNonRestoredState"]
        ports:
        - containerPort: 6123
          name: rpc
        - containerPort: 6124
          name: blob-server
        - containerPort: 8081
          name: webui
        livenessProbe:
          tcpSocket:
            port: 6123
          initialDelaySeconds: 30
          periodSeconds: 60
        volumeMounts:
        - name: flink-config-volume
          mountPath: /opt/flink/conf
        # - name: job-artifacts-volume
        #   mountPath: /opt/flink/usrlib
        securityContext:
          runAsUser: 9999  # refers to user _flink_ from official flink image, change if necessary
      volumes:
      - name: flink-config-volume
        configMap:
          name: flink-config
          items:
          - key: flink-conf.yaml
            path: flink-conf.yaml
          - key: log4j.properties
            path: log4j.properties
          - key: log4j-console.properties
            path: log4j-console.properties
      # - name: job-artifacts-volume
      #   hostPath:
      #     path: /host/path/to/job/artifacts
taskmanager-job-deployment.yaml
apiVersion: apps/v1
kind: Deployment
metadata:
  name: flink-taskmanager
spec:
  replicas: 2
  selector:
    matchLabels:
      app: flink
      component: taskmanager
  template:
    metadata:
      labels:
        app: flink
        component: taskmanager
    spec:
      containers:
      - name: taskmanager
        image: flink-job:1.10.1-image
        env:
        args: ["taskmanager"]
        ports:
        - containerPort: 6122
          name: rpc
        - containerPort: 6125
          name: query-state
        livenessProbe:
          tcpSocket:
            port: 6122
          initialDelaySeconds: 30
          periodSeconds: 60
        volumeMounts:
        - name: flink-config-volume
          mountPath: /opt/flink/conf/
        # - name: job-artifacts-volume
        #   mountPath: /opt/flink/usrlib
        securityContext:
          runAsUser: 9999  # refers to user _flink_ from official flink image, change if necessary
      volumes:
      - name: flink-config-volume
        configMap:
          name: flink-config
          items:
          - key: flink-conf.yaml
            path: flink-conf.yaml
          - key: log4j-console.properties
            path: log4j-console.properties
      # - name: job-artifacts-volume
      #   hostPath:
      #     path: /host/path/to/job/artifacts
Create the job cluster resources:
kubectl create -f flink-configuration-configmap.yaml
kubectl create -f jobmanager-rest-service.yaml
kubectl create -f jobmanager-service.yaml
kubectl create -f jobmanager-job-deployment.yaml
kubectl create -f taskmanager-job-deployment.yaml
Delete them when the job cluster is no longer needed:
kubectl delete -f flink-configuration-configmap.yaml
kubectl delete -f jobmanager-rest-service.yaml
kubectl delete -f jobmanager-service.yaml
kubectl delete -f jobmanager-job-deployment.yaml
kubectl delete -f taskmanager-job-deployment.yaml
docker rm $(docker ps -a | grep Exit | awk '{print $1}')