# Flink Cluster on Kubernetes Deployment

Flink version: 1.10.1

Kubernetes version: 1.16.5

Flink can be deployed on Kubernetes in two modes: Job cluster and Session cluster. In Job cluster mode our own JAR has to be built into the Flink image and deployed together with it; in Session mode we first start the cluster and then submit JARs to the session cluster.

1 Environment Preparation

In the article on YARN deployment we simply downloaded the pre-built package from the official site. Since Job Cluster mode requires us to rebuild the image, in this environment-preparation part we will pull the Flink source code from GitHub and compile it manually.

1.1 Pull the Flink Source Code from GitHub

  1. Go to the Flink GitHub repository: https://github.com/apache/flink

  2. Click [Releases] and look for the version you need. For example, to get the 1.10.1 source code, download the release-1.10.1 archive from the Releases page:

    wget https://github.com/apache/flink/archive/release-1.10.1.tar.gz
    

    Tip: the download may fail for network reasons. You can register a Gitee account (https://gitee.com/), create a new repository there, and use the "import existing repository" option with the Flink GitHub address; downloading the code from Gitee is then much faster. An example is sketched below.
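
    For example, assuming the GitHub repository has been imported into a Gitee repository under your own account (the account name below is a placeholder), the release tag can be cloned directly:

    git clone -b release-1.10.1 https://gitee.com/<your-gitee-account>/flink.git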

  3. Extract the archive after downloading:

    tar -zxvf flink-release-1.10.1.tar.gz
    

1.2 Build

  1. Enter the extracted directory and run the following Maven command to build:

    mvn clean package -DskipTests -Dfast
    

    The following error may come up during the build:

    Failure to find io.confluent:kafka-schema-registry-client:jar:3.3.1

    [ERROR] Failed to execute goal on project flink-avro-confluent-registry: Could not resolve dependencies for project org.apache.flink:flink-avro-confluent-registry:jar:1.8.2: Failure to find io.confluent:kafka-schema-registry-client:jar:3.3.1 in https://maven.aliyun.com/repository/public was cached in the local repository, resolution will not be reattempted until the update interval of aliyunmaven has elapsed or updates are forced -> [Help 1]
    [ERROR]
    [ERROR] To see the full stack trace of the errors, re-run Maven with the -e switch.
    [ERROR] Re-run Maven using the -X switch to enable full debug logging.
    [ERROR]
    [ERROR] For more information about the errors and possible solutions, please read the following articles:
    [ERROR] [Help 1] http://cwiki.apache.org/confluence/display/MAVEN/DependencyResolutionException
    [ERROR]
    [ERROR] After correcting the problems, you can resume the build with the command
    [ERROR]   mvn <goals> -rf :flink-avro-confluent-registry

    This is simply because kafka-schema-registry-client:jar:3.3.1 cannot be found in the Maven repository; neither the central repository nor the Aliyun mirror has it. The workaround is to download the JAR manually and install it into the local Maven repository.

    Create a lib directory under the Flink source directory to hold the downloaded JAR, then download the matching version from the io.confluent package site:

    mkdir lib
    cd lib
    wget http://packages.confluent.io/maven/io/confluent/kafka-schema-registry-client/3.3.1/kafka-schema-registry-client-3.3.1.jar

    Then install the JAR into the local repository with Maven:

    mvn install:install-file -DgroupId=io.confluent -DartifactId=kafka-schema-registry-client -Dversion=3.3.1 -Dpackaging=jar -Dfile=kafka-schema-registry-client-3.3.1.jar
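
    To confirm that the artifact landed in the local repository (the path below assumes the default ~/.m2 location), list it:

    ls ~/.m2/repository/io/confluent/kafka-schema-registry-client/3.3.1/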
    

    Retry the build.


  2. The compiled distribution is under the build-target directory; a quick check is sketched below.
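
    As a quick sanity check, list the directory; a successful 1.10.1 build is expected to produce the usual distribution layout (a sketch only, contents may vary slightly):

    ls build-target
    # bin/  conf/  examples/  lib/  opt/  plugins/  LICENSE  README.txt ...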

1.3 Kubernetes Environment

We also need a Kubernetes environment. You can install Kubernetes on your own machine with Docker Desktop (https://www.docker.com/products/docker-desktop); this is not covered in detail here, see https://blog.csdn.net/shirukai/article/details/103512497 for how to set up a local Kubernetes environment.
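
Before deploying anything, it helps to confirm that kubectl can reach the cluster; a minimal check (the output varies per environment):

kubectl cluster-info
kubectl get nodes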

2 Flink Session Cluster on Kubernetes

A Flink Session cluster runs as a long-lived Kubernetes Deployment. Multiple Flink jobs can be submitted to one session cluster; after the cluster is deployed, jobs still have to be submitted to it.

A basic Flink session cluster consists of the following Kubernetes resources:

Flink Conf ConfigMap

  • Stores configuration such as flink-conf.yaml and log4j-console.properties
  • The Flink JobManager and TaskManager Deployments pick up this configuration automatically at startup

JobManager Service

  • Exposes the JobManager via a Service name and port so that TaskManagers can connect to it

JobManager Deployment

  • Defines the JobManager Pod replica count, version, etc., ensuring that at least one replica is running

TaskManager Deployment

  • Defines the TaskManager Pod replica count, version, etc., ensuring that at least one replica is running.

2.1 Deploy a Flink Session Cluster on Kubernetes

The resource definitions below follow the official documentation: https://ci.apache.org/projects/flink/flink-docs-release-1.10/ops/deployment/kubernetes.html#session-cluster-resource-definitions

2.1.1 Flink ConfigMap

Holds the Flink configuration files such as flink-conf.yaml and log4j.properties.

  1. Create a file named flink-configuration-configmap.yaml.

    Its content is as follows:

    apiVersion: v1
    kind: ConfigMap
    metadata:
      name: flink-config
      labels:
        app: flink
    data:
      flink-conf.yaml: |+
        jobmanager.rpc.address: flink-jobmanager
        taskmanager.numberOfTaskSlots: 1
        blob.server.port: 6124
        jobmanager.rpc.port: 6123
        taskmanager.rpc.port: 6122
        jobmanager.heap.size: 1024m
        taskmanager.memory.process.size: 1024m
      log4j.properties: |+
        log4j.rootLogger=INFO, file
        log4j.logger.akka=INFO
        log4j.logger.org.apache.kafka=INFO
        log4j.logger.org.apache.hadoop=INFO
        log4j.logger.org.apache.zookeeper=INFO
        log4j.appender.file=org.apache.log4j.FileAppender
        log4j.appender.file.file=${log.file}
        log4j.appender.file.layout=org.apache.log4j.PatternLayout
        log4j.appender.file.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss,SSS} %-5p %-60c %x - %m%n
        log4j.logger.org.apache.flink.shaded.akka.org.jboss.netty.channel.DefaultChannelPipeline=ERROR, file
  2. Create the Flink Conf ConfigMap

    kubectl create -f flink-configuration-configmap.yaml 
    
  3. Check the created ConfigMap

    kubectl get configmap
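
    To double-check that both configuration files made it into the ConfigMap, you can dump it as YAML (an optional verification step):

    kubectl get configmap flink-config -o yaml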
    

2.1.2 JobManager Service

  1. Edit the jobmanager-service.yaml file

    apiVersion: v1
    kind: Service
    metadata:
      name: flink-jobmanager
    spec:
      type: ClusterIP
      ports:
      - name: rpc
        port: 6123
      - name: blob
        port: 6124
      - name: ui
        port: 8081
      selector:
        app: flink
        component: jobmanager
  2. Create the Flink jobmanager-service

    kubectl create -f jobmanager-service.yaml
    

2.1.3 JobManager Deployment

  1. Edit jobmanager-deployment.yaml

    apiVersion: apps/v1
    kind: Deployment
    metadata:
      name: flink-jobmanager
    spec:
      replicas: 1
      selector:
        matchLabels:
          app: flink
          component: jobmanager
      template:
        metadata:
          labels:
            app: flink
            component: jobmanager
        spec:
          containers:
          - name: jobmanager
            image: flink:1.10.1
            workingDir: /opt/flink
            command: ["/bin/bash", "-c", "$FLINK_HOME/bin/jobmanager.sh start;\
              while :;
              do
                if [[ -f $(find log -name '*jobmanager*.log' -print -quit) ]];
                  then tail -f -n +1 log/*jobmanager*.log;
                fi;
              done"]
            ports:
            - containerPort: 6123
              name: rpc
            - containerPort: 6124
              name: blob
            - containerPort: 8081
              name: ui
            livenessProbe:
              tcpSocket:
                port: 6123
              initialDelaySeconds: 30
              periodSeconds: 60
            volumeMounts:
            - name: flink-config-volume
              mountPath: /opt/flink/conf
            securityContext:
              runAsUser: 9999  # refers to user _flink_ from official flink image, change if necessary
          volumes:
          - name: flink-config-volume
            configMap:
              name: flink-config
              items:
              - key: flink-conf.yaml
                path: flink-conf.yaml
              - key: log4j.properties
                path: log4j.properties
  2. Create the Flink jobmanager-deployment

    kubectl create -f jobmanager-deployment.yaml
    

2.1.4 TaskManager Deployment

  1. Edit taskmanager-deployment.yaml

    apiVersion: apps/v1
    kind: Deployment
    metadata:
      name: flink-taskmanager
    spec:
      replicas: 2
      selector:
        matchLabels:
          app: flink
          component: taskmanager
      template:
        metadata:
          labels:
            app: flink
            component: taskmanager
        spec:
          containers:
          - name: taskmanager
            image: flink:1.10.1
            workingDir: /opt/flink
            command: ["/bin/bash", "-c", "$FLINK_HOME/bin/taskmanager.sh start; \
              while :;
              do
                if [[ -f $(find log -name '*taskmanager*.log' -print -quit) ]];
                  then tail -f -n +1 log/*taskmanager*.log;
                fi;
              done"]
            ports:
            - containerPort: 6122
              name: rpc
            livenessProbe:
              tcpSocket:
                port: 6122
              initialDelaySeconds: 30
              periodSeconds: 60
            volumeMounts:
            - name: flink-config-volume
              mountPath: /opt/flink/conf/
            securityContext:
              runAsUser: 9999  # refers to user _flink_ from official flink image, change if necessary
          volumes:
          - name: flink-config-volume
            configMap:
              name: flink-config
              items:
              - key: flink-conf.yaml
                path: flink-conf.yaml
              - key: log4j.properties
                path: log4j.properties
  2. Create the Flink taskmanager-deployment

    kubectl create -f taskmanager-deployment.yaml
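
    Once the TaskManager Deployment is applied, it is worth confirming that all Pods come up before moving on; a quick check (Pod names will differ in your environment):

    kubectl get pods -l app=flink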
    

2.1.5 Access the Flink UI (Three Ways)

2.1.5.1 kubectl proxy

  1. Run kubectl proxy in a terminal

  2. Then open http://localhost:8001/api/v1/namespaces/default/services/flink-jobmanager:ui/proxy in your browser

2.1.5.2 kubectl port-forward

  1. Run kubectl port-forward ${flink-jobmanager-pod} 8081:8081 to forward your jobmanager’s web ui port to local 8081.
  2. Navigate to http://localhost:8081 in your browser.
  3. Moreover, the forwarded port can also be used to submit jobs to the cluster (see section 2.1.5.4).

List all Pods:

  (base) shirukai@shirukaideMacBook-Pro session % kubectl get pods
  NAME                                READY   STATUS    RESTARTS   AGE
  flink-jobmanager-6b67975bc6-rshdt   1/1     Running   0          23m
  flink-taskmanager-9dbb6dbc4-lhdhr   1/1     Running   0          4m58s
  flink-taskmanager-9dbb6dbc4-mx7rn   1/1     Running   0          4m58s

Inspect the jobmanager Pod:

kubectl describe pod flink-jobmanager-6b67975bc6-rshdt

Port forward:

kubectl port-forward flink-jobmanager-6b67975bc6-rshdt 8081:8081
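
If you prefer not to look up the Pod name by hand, the JobManager Pod can also be selected by its labels; a small convenience sketch (assuming the labels defined in the Deployments above):

kubectl port-forward $(kubectl get pods -l app=flink,component=jobmanager -o name | head -n 1) 8081:8081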

2.1.5.3 NodePort

  1. Edit jobmanager-rest-service.yaml

    apiVersion: v1
    kind: Service
    metadata:
      name: flink-jobmanager-rest
    spec:
      type: NodePort
      ports:
      - name: rest
        port: 8081        # port the Service listens on (cluster IP)
        targetPort: 8081  # port the Pod listens on
        nodePort: 30081   # port exposed on each Kubernetes node
      selector:
        app: flink
        component: jobmanager
  2. Create the jobmanager-rest-service

    kubectl create -f jobmanager-rest-service.yaml
    
  3. Check the exposed port with kubectl get svc

    kubectl get svc
    


    Visit: http://localhost:30081

2.1.5.4 Submit a Job to the Session Cluster

If you have a Flink client locally, go to its bin directory and submit the job with the client:

./flink run -m localhost:30081 ../examples/streaming/WordCount.jar
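
The same address can be used to inspect or cancel jobs from the CLI; a brief sketch (the job ID below is a placeholder to replace with one listed by the first command):

./flink list -m localhost:30081
./flink cancel -m localhost:30081 <jobId>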


2.1.5.5 Stop the Cluster

  kubectl delete -f jobmanager-rest-service.yaml
  kubectl delete -f jobmanager-service.yaml
  kubectl delete -f jobmanager-deployment.yaml
  kubectl delete -f taskmanager-deployment.yaml
  kubectl delete -f flink-configuration-configmap.yaml

2.2 Flink Per-Job on Kubernetes

Per-Job mode requires the JAR to be available inside the container ahead of time. There are two ways to do this: rebuild the image with the user JAR baked in, or mount the storage holding the user JAR into the container at /opt/flink/usrlib via a volume.

2.2.1 Rebuild the Image

There are also two ways to rebuild the image: build it from the locally compiled source distribution, or start from the official image and add our JAR to it.

2.2.1.1 Rebuild the Image from the Source Build

After the source build is complete, go to the flink/flink-container/docker directory and build the image with:

sh build.sh --from-local-dist --image-name flink-job:1.10.1-source --job-artifacts ../../build-target/examples/streaming/WordCount.jar
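
After the script finishes, you can confirm that the image exists locally (the image name and tag follow the build command above):

docker images | grep flink-job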

2.2.1.2 Rebuild Based on the Official Image

Write the Dockerfile:

  FROM flink:1.10.1
  ADD *.jar /opt/flink/usrlib/

Build the image:

docker build -t flink-job:1.10.1-image .

2.2.1.3 Define the Kubernetes Resources

flink-configuration-configmap.yaml

  apiVersion: v1
  kind: ConfigMap
  metadata:
    name: flink-config
    labels:
      app: flink
  data:
    flink-conf.yaml: |+
      jobmanager.rpc.address: flink-jobmanager
      taskmanager.numberOfTaskSlots: 1
      blob.server.port: 6124
      jobmanager.rpc.port: 6123
      taskmanager.rpc.port: 6122
      jobmanager.heap.size: 1024m
      taskmanager.memory.process.size: 1024m
    log4j.properties: |+
      log4j.rootLogger=INFO, file
      log4j.logger.akka=INFO
      log4j.logger.org.apache.kafka=INFO
      log4j.logger.org.apache.hadoop=INFO
      log4j.logger.org.apache.zookeeper=INFO
      log4j.appender.file=org.apache.log4j.FileAppender
      log4j.appender.file.file=${log.file}
      log4j.appender.file.layout=org.apache.log4j.PatternLayout
      log4j.appender.file.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss,SSS} %-5p %-60c %x - %m%n
      log4j.logger.org.apache.flink.shaded.akka.org.jboss.netty.channel.DefaultChannelPipeline=ERROR, file
    log4j-console.properties: |+
      # This affects logging for both user code and Flink
      log4j.rootLogger=INFO, console
      # Uncomment this if you want to _only_ change Flink's logging
      #log4j.logger.org.apache.flink=INFO
      # The following lines keep the log level of common libraries/connectors on
      # log level INFO. The root logger does not override this. You have to manually
      # change the log levels here.
      log4j.logger.akka=INFO
      log4j.logger.org.apache.kafka=INFO
      log4j.logger.org.apache.hadoop=INFO
      log4j.logger.org.apache.zookeeper=INFO
      # Log all infos to the console
      log4j.appender.console=org.apache.log4j.ConsoleAppender
      log4j.appender.console.layout=org.apache.log4j.PatternLayout
      log4j.appender.console.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss,SSS} %-5p %-60c %x - %m%n
      # Suppress the irrelevant (wrong) warnings from the Netty channel handler
      log4j.logger.org.apache.flink.shaded.akka.org.jboss.netty.channel.DefaultChannelPipeline=ERROR, console

jobmanager-service.yaml

  apiVersion: v1
  kind: Service
  metadata:
    name: flink-jobmanager
  spec:
    type: ClusterIP
    ports:
    - name: rpc
      port: 6123
    - name: blob-server
      port: 6124
    - name: webui
      port: 8081
    selector:
      app: flink
      component: jobmanager

jobmanager-rest-service.yaml

  apiVersion: v1
  kind: Service
  metadata:
    name: flink-jobmanager-rest
  spec:
    type: NodePort
    ports:
    - name: rest
      port: 8081
      targetPort: 8081
      nodePort: 30081
    selector:
      app: flink
      component: jobmanager

jobmanager-job-deployment.yaml

  apiVersion: apps/v1
  kind: Deployment
  metadata:
    name: flink-jobmanager
  spec:
    replicas: 1
    selector:
      matchLabels:
        app: flink
        component: jobmanager
    template:
      metadata:
        labels:
          app: flink
          component: jobmanager
      spec:
        # restartPolicy: OnFailure
        containers:
        - name: jobmanager
          image: flink-job:1.10.1-image
          args: ["standalone-job", "--job-classname", "org.apache.flink.streaming.examples.wordcount.WordCount"] # optional arguments: ["--job-id", "<job id>", "--fromSavepoint", "/path/to/savepoint", "--allowNonRestoredState"]
          ports:
          - containerPort: 6123
            name: rpc
          - containerPort: 6124
            name: blob-server
          - containerPort: 8081
            name: webui
          livenessProbe:
            tcpSocket:
              port: 6123
            initialDelaySeconds: 30
            periodSeconds: 60
          volumeMounts:
          - name: flink-config-volume
            mountPath: /opt/flink/conf
          # - name: job-artifacts-volume
          #   mountPath: /opt/flink/usrlib
          securityContext:
            runAsUser: 9999  # refers to user _flink_ from official flink image, change if necessary
        volumes:
        - name: flink-config-volume
          configMap:
            name: flink-config
            items:
            - key: flink-conf.yaml
              path: flink-conf.yaml
            - key: log4j.properties
              path: log4j.properties
            - key: log4j-console.properties
              path: log4j-console.properties
        # - name: job-artifacts-volume
        #   hostPath:
        #     path: /host/path/to/job/artifacts

taskmanager-job-deployment.yaml

  apiVersion: apps/v1
  kind: Deployment
  metadata:
    name: flink-taskmanager
  spec:
    replicas: 2
    selector:
      matchLabels:
        app: flink
        component: taskmanager
    template:
      metadata:
        labels:
          app: flink
          component: taskmanager
      spec:
        containers:
        - name: taskmanager
          image: flink-job:1.10.1-image
          args: ["taskmanager"]
          ports:
          - containerPort: 6122
            name: rpc
          - containerPort: 6125
            name: query-state
          livenessProbe:
            tcpSocket:
              port: 6122
            initialDelaySeconds: 30
            periodSeconds: 60
          volumeMounts:
          - name: flink-config-volume
            mountPath: /opt/flink/conf/
          # - name: job-artifacts-volume
          #   mountPath: /opt/flink/usrlib
          securityContext:
            runAsUser: 9999  # refers to user _flink_ from official flink image, change if necessary
        volumes:
        - name: flink-config-volume
          configMap:
            name: flink-config
            items:
            - key: flink-conf.yaml
              path: flink-conf.yaml
            - key: log4j-console.properties
              path: log4j-console.properties
        # - name: job-artifacts-volume
        #   hostPath:
        #     path: /host/path/to/job/artifacts

2.2.1.4 Deploy the Per-Job Cluster

  kubectl create -f flink-configuration-configmap.yaml
  kubectl create -f jobmanager-rest-service.yaml
  kubectl create -f jobmanager-service.yaml
  kubectl create -f jobmanager-job-deployment.yaml
  kubectl create -f taskmanager-job-deployment.yaml
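
To confirm that the Per-Job cluster actually started and ran the job, you can follow the Pod logs (an optional check; for the WordCount example the printed results typically show up in the TaskManager output):

  kubectl logs -f deployment/flink-jobmanager
  kubectl logs deployment/flink-taskmanager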

2.2.1.5 Stop the Cluster

  kubectl delete -f flink-configuration-configmap.yaml
  kubectl delete -f jobmanager-rest-service.yaml
  kubectl delete -f jobmanager-service.yaml
  kubectl delete -f jobmanager-job-deployment.yaml
  kubectl delete -f taskmanager-job-deployment.yaml

To clean up containers that have exited on the Docker host, you can run:

docker rm $(docker ps -a | grep Exit | awk '{print $1}')
