
How to Quickly Deploy an Apache Flink Cluster on Kubernetes


1. Overview

At its core, Flink is a streaming dataflow execution engine: one and the same Flink runtime supports both stream-processing and batch-processing applications. For distributed computation over data streams it provides data distribution, data communication, and fault-tolerance mechanisms.

  • Flink website: https://flink.apache.org/

  • Documentation for each release: https://nightlies.apache.org/flink/

  • Flink on Kubernetes (native) documentation: https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/deployment/resource-providers/native_kubernetes/

  • GitHub: https://github.com/apache/flink/tree/release-1.14.6/

2. Flink Deployment Modes

Official documentation: https://nightlies.apache.org/flink/flink-docs-release-1.15/zh/docs/deployment/overview/

Flink on YARN has three deployment modes:

  • yarn-session (Session mode)

  • yarn-cluster (Per-Job mode)

  • Application mode


[Note] Per-Job mode is deprecated: it is supported only on YARN and was deprecated in Flink 1.15. It is slated for removal in FLINK-26000.

3. Flink on Kubernetes in Practice


1) Download Flink

Download page: https://flink.apache.org/downloads.html

wget https://dlcdn.apache.org/flink/flink-1.14.6/flink-1.14.6-bin-scala_2.12.tgz

2) Build the base image

docker pull apache/flink:1.14.6-scala_2.12
docker tag apache/flink:1.14.6-scala_2.12 myharbor.com/bigdata/flink:1.14.6-scala_2.12
docker push myharbor.com/bigdata/flink:1.14.6-scala_2.12

3) Session mode

A Flink Session cluster runs as a long-lived Kubernetes Deployment, and several Flink jobs can share one Session cluster; each job is submitted after the cluster has been deployed. A Flink Session cluster deployment on Kubernetes comprises at least three components:

  • a Deployment that runs the JobManager

  • a Deployment for the pool of TaskManagers

  • a Service exposing the JobManager's REST and UI ports

1. Native Kubernetes mode

Configuration reference: https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/deployment/config/#kubernetes-namespace

[1] Build the image. Dockerfile:

FROM myharbor.com/bigdata/flink:1.14.6-scala_2.12
RUN rm -f /etc/localtime && ln -sv /usr/share/zoneinfo/Asia/Shanghai /etc/localtime && echo "Asia/Shanghai" > /etc/timezone
ENV LANG zh_CN.UTF-8

Build the image:

docker build -t myharbor.com/bigdata/flink-session:1.14.6-scala_2.12 . --no-cache
# push the image
docker push myharbor.com/bigdata/flink-session:1.14.6-scala_2.12
[2] Create the namespace and ServiceAccount

# create the namespace
kubectl create ns flink
# create the serviceaccount
kubectl create serviceaccount flink-service-account -n flink
# grant permissions
kubectl create clusterrolebinding flink-role-binding-flink --clusterrole=edit --serviceaccount=flink:flink-service-account
[3] Create the Flink cluster

./bin/kubernetes-session.sh \
    -Dkubernetes.cluster-id=my-first-flink-cluster \
    -Dkubernetes.container.image=myharbor.com/bigdata/flink-session:1.14.6-scala_2.12 \
    -Dkubernetes.namespace=flink \
    -Dkubernetes.jobmanager.service-account=flink-service-account \
    -Dkubernetes.rest-service.exposed.type=NodePort
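The session-cluster flags above tend to be reused across environments; a small wrapper can assemble and print the command for review before it is run. The `build_session_cmd` helper and its defaults are illustrative, not part of Flink:

```shell
#!/usr/bin/env bash
# Sketch: assemble the kubernetes-session.sh invocation from variables,
# so registry, namespace, and service account live in one place.
CLUSTER_ID=${CLUSTER_ID:-my-first-flink-cluster}
IMAGE=${IMAGE:-myharbor.com/bigdata/flink-session:1.14.6-scala_2.12}
NAMESPACE=${NAMESPACE:-flink}
SERVICE_ACCOUNT=${SERVICE_ACCOUNT:-flink-service-account}

build_session_cmd() {
    printf '%s ' ./bin/kubernetes-session.sh \
        "-Dkubernetes.cluster-id=${CLUSTER_ID}" \
        "-Dkubernetes.container.image=${IMAGE}" \
        "-Dkubernetes.namespace=${NAMESPACE}" \
        "-Dkubernetes.jobmanager.service-account=${SERVICE_ACCOUNT}" \
        "-Dkubernetes.rest-service.exposed.type=NodePort"
    echo
}

build_session_cmd   # prints the full command for review
```

Printing the command first (rather than executing it) makes it easy to eyeball the flags before pointing them at a real cluster.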


[4] Submit a job

./bin/flink run \
    --target kubernetes-session \
    -Dkubernetes.cluster-id=my-first-flink-cluster \
    -Dkubernetes.namespace=flink \
    -Dkubernetes.jobmanager.service-account=flink-service-account \
    ./examples/streaming/TopSpeedWindowing.jar

# with explicit resource configuration (flags must come before the jar)
./bin/flink run \
    --target kubernetes-session \
    -Dkubernetes.cluster-id=my-first-flink-cluster \
    -Dkubernetes.namespace=flink \
    -Dkubernetes.jobmanager.service-account=flink-service-account \
    -Dkubernetes.taskmanager.cpu=2000m \
    -Dexternal-resource.limits.kubernetes.cpu=4000m \
    -Dexternal-resource.limits.kubernetes.memory=10Gi \
    -Dexternal-resource.requests.kubernetes.cpu=2000m \
    -Dexternal-resource.requests.kubernetes.memory=8Gi \
    ./examples/streaming/WordCount.jar
[Note] Mind the JDK version; JDK 8 is known to work here.

[5] Check the deployment

kubectl get pods -n flink
kubectl logs -f my-first-flink-cluster-taskmanager-1-1 -n flink
[6] Delete the Flink cluster

kubectl delete deployment/my-first-flink-cluster -n flink
kubectl delete ns flink --force
2. Standalone mode
[1] Build the image

The official image runs as the flink user by default; here it is changed to admin (substitute whatever user your organization requires). The scripts below can be copied out of a pod running the image used above.

Startup script docker-entrypoint.sh:

#!/usr/bin/env bash
###############################################################################
#  Licensed to the Apache Software Foundation (ASF) under one
#  or more contributor license agreements.  See the NOTICE file
#  distributed with this work for additional information
#  regarding copyright ownership.  The ASF licenses this file
#  to you under the Apache License, Version 2.0 (the
#  "License"); you may not use this file except in compliance
#  with the License.  You may obtain a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
#  Unless required by applicable law or agreed to in writing, software
#  distributed under the License is distributed on an "AS IS" BASIS,
#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
#  See the License for the specific language governing permissions and
#  limitations under the License.
###############################################################################

COMMAND_STANDALONE="standalone-job"
COMMAND_HISTORY_SERVER="history-server"

# If unspecified, the hostname of the container is taken as the JobManager address
JOB_MANAGER_RPC_ADDRESS=${JOB_MANAGER_RPC_ADDRESS:-$(hostname -f)}
CONF_FILE="${FLINK_HOME}/conf/flink-conf.yaml"

drop_privs_cmd() {
    if [ $(id -u) != 0 ]; then
        # Don't need to drop privs if EUID != 0
        return
    elif [ -x /sbin/su-exec ]; then
        # Alpine
        echo su-exec admin
    else
        # Others
        echo gosu admin
    fi
}

copy_plugins_if_required() {
  if [ -z "$ENABLE_BUILT_IN_PLUGINS" ]; then
    return 0
  fi
  echo "Enabling required built-in plugins"
  for target_plugin in $(echo "$ENABLE_BUILT_IN_PLUGINS" | tr ';' ' '); do
    echo "Linking ${target_plugin} to plugin directory"
    plugin_name=${target_plugin%.jar}
    mkdir -p "${FLINK_HOME}/plugins/${plugin_name}"
    if [ ! -e "${FLINK_HOME}/opt/${target_plugin}" ]; then
      echo "Plugin ${target_plugin} does not exist. Exiting."
      exit 1
    else
      ln -fs "${FLINK_HOME}/opt/${target_plugin}" "${FLINK_HOME}/plugins/${plugin_name}"
      echo "Successfully enabled ${target_plugin}"
    fi
  done
}

set_config_option() {
  local option=$1
  local value=$2
  # escape periods for usage in regular expressions
  local escaped_option=$(echo ${option} | sed -e "s/\./\\\./g")
  # either override an existing entry, or append a new one
  if grep -E "^${escaped_option}:.*" "${CONF_FILE}" > /dev/null; then
    sed -i -e "s/${escaped_option}:.*/$option: $value/g" "${CONF_FILE}"
  else
    echo "${option}: ${value}" >> "${CONF_FILE}"
  fi
}

prepare_configuration() {
    set_config_option jobmanager.rpc.address ${JOB_MANAGER_RPC_ADDRESS}
    set_config_option blob.server.port 6124
    set_config_option query.server.port 6125
    if [ -n "${TASK_MANAGER_NUMBER_OF_TASK_SLOTS}" ]; then
        set_config_option taskmanager.numberOfTaskSlots ${TASK_MANAGER_NUMBER_OF_TASK_SLOTS}
    fi
    if [ -n "${FLINK_PROPERTIES}" ]; then
        echo "${FLINK_PROPERTIES}" >> "${CONF_FILE}"
    fi
    envsubst < "${CONF_FILE}" > "${CONF_FILE}.tmp" && mv "${CONF_FILE}.tmp" "${CONF_FILE}"
}

maybe_enable_jemalloc() {
    if [ "${DISABLE_JEMALLOC:-false}" == "false" ]; then
        JEMALLOC_PATH="/usr/lib/$(uname -m)-linux-gnu/libjemalloc.so"
        JEMALLOC_FALLBACK="/usr/lib/x86_64-linux-gnu/libjemalloc.so"
        if [ -f "$JEMALLOC_PATH" ]; then
            export LD_PRELOAD=$LD_PRELOAD:$JEMALLOC_PATH
        elif [ -f "$JEMALLOC_FALLBACK" ]; then
            export LD_PRELOAD=$LD_PRELOAD:$JEMALLOC_FALLBACK
        else
            if [ "$JEMALLOC_PATH" = "$JEMALLOC_FALLBACK" ]; then
                MSG_PATH=$JEMALLOC_PATH
            else
                MSG_PATH="$JEMALLOC_PATH and $JEMALLOC_FALLBACK"
            fi
            echo "WARNING: attempted to load jemalloc from $MSG_PATH but the library couldn't be found. glibc will be used instead."
        fi
    fi
}

maybe_enable_jemalloc
copy_plugins_if_required
prepare_configuration

args=("$@")
if [ "$1" = "help" ]; then
    printf "Usage: $(basename "$0") (jobmanager|${COMMAND_STANDALONE}|taskmanager|${COMMAND_HISTORY_SERVER})\n"
    printf "    Or $(basename "$0") help\n\n"
    printf "By default, Flink image adopts jemalloc as default memory allocator. This behavior can be disabled by setting the 'DISABLE_JEMALLOC' environment variable to 'true'.\n"
    exit 0
elif [ "$1" = "jobmanager" ]; then
    args=("${args[@]:1}")
    echo "Starting Job Manager"
    exec $(drop_privs_cmd) "$FLINK_HOME/bin/jobmanager.sh" start-foreground "${args[@]}"
elif [ "$1" = ${COMMAND_STANDALONE} ]; then
    args=("${args[@]:1}")
    echo "Starting Job Manager"
    exec $(drop_privs_cmd) "$FLINK_HOME/bin/standalone-job.sh" start-foreground "${args[@]}"
elif [ "$1" = ${COMMAND_HISTORY_SERVER} ]; then
    args=("${args[@]:1}")
    echo "Starting History Server"
    exec $(drop_privs_cmd) "$FLINK_HOME/bin/historyserver.sh" start-foreground "${args[@]}"
elif [ "$1" = "taskmanager" ]; then
    args=("${args[@]:1}")
    echo "Starting Task Manager"
    exec $(drop_privs_cmd) "$FLINK_HOME/bin/taskmanager.sh" start-foreground "${args[@]}"
fi

args=("${args[@]}")
# Running command in pass-through mode
exec $(drop_privs_cmd) "${args[@]}"
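The set_config_option helper above either rewrites an existing key in flink-conf.yaml or appends a new one. Its grep/sed logic can be exercised on its own against a scratch file (the temp file below stands in for ${FLINK_HOME}/conf/flink-conf.yaml):

```shell
#!/usr/bin/env bash
# Standalone exercise of the set_config_option logic from the entrypoint.
CONF_FILE=$(mktemp)
echo "jobmanager.rpc.address: localhost" > "${CONF_FILE}"

set_config_option() {
  local option=$1
  local value=$2
  # escape periods for usage in regular expressions
  local escaped_option=$(echo ${option} | sed -e "s/\./\\\./g")
  # either override an existing entry, or append a new one
  if grep -E "^${escaped_option}:.*" "${CONF_FILE}" > /dev/null; then
    sed -i -e "s/${escaped_option}:.*/$option: $value/g" "${CONF_FILE}"
  else
    echo "${option}: ${value}" >> "${CONF_FILE}"
  fi
}

set_config_option jobmanager.rpc.address flink-jobmanager   # overrides the existing key
set_config_option blob.server.port 6124                     # appends a new key
cat "${CONF_FILE}"
```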

Dockerfile:

FROM myharbor.com/bigdata/centos:7.9.2009
USER root
# install common tools
RUN yum install -y vim tar wget curl rsync bzip2 iptables tcpdump less telnet net-tools lsof
# set the timezone (the default is UTC)
RUN rm -f /etc/localtime && ln -sv /usr/share/zoneinfo/Asia/Shanghai /etc/localtime && echo "Asia/Shanghai" > /etc/timezone
RUN mkdir -p /opt/apache
ADD jdk-8u212-linux-x64.tar.gz /opt/apache/
ADD flink-1.14.6-bin-scala_2.12.tgz /opt/apache/
ENV FLINK_HOME /opt/apache/flink-1.14.6
ENV JAVA_HOME /opt/apache/jdk1.8.0_212
ENV PATH $JAVA_HOME/bin:$PATH
# directory for user application jars
RUN mkdir $FLINK_HOME/usrlib/
COPY docker-entrypoint.sh /opt/apache/
RUN chmod +x /opt/apache/docker-entrypoint.sh
RUN groupadd --system --gid=9999 admin && useradd --system --home-dir $FLINK_HOME --uid=9999 --gid=admin admin
RUN chown -R admin:admin /opt/apache
# working directory
WORKDIR $FLINK_HOME
# expose ports
EXPOSE 6123 8081
# the entrypoint runs when a container starts, not at build time
ENTRYPOINT ["/opt/apache/docker-entrypoint.sh"]
CMD ["help"]

Build the image:

docker build -t myharbor.com/bigdata/flink-centos-admin:1.14.6-scala_2.12 . --no-cache
# push the image
docker push myharbor.com/bigdata/flink-centos-admin:1.14.6-scala_2.12
# delete the image locally
docker rmi myharbor.com/bigdata/flink-centos-admin:1.14.6-scala_2.12
crictl rmi myharbor.com/bigdata/flink-centos-admin:1.14.6-scala_2.12
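Since the same image reference appears in the build, push, and delete commands, keeping it in one variable avoids tag drift between them. The `image_cmds` helper below is illustrative and only prints the commands for review:

```shell
#!/usr/bin/env bash
# Keep the image reference in one variable so build/push/clean stay in sync.
REGISTRY=${REGISTRY:-myharbor.com/bigdata}
TAG=${TAG:-1.14.6-scala_2.12}
IMAGE="${REGISTRY}/flink-centos-admin:${TAG}"

image_cmds() {
    echo "docker build -t ${IMAGE} . --no-cache"
    echo "docker push ${IMAGE}"
    echo "docker rmi ${IMAGE}"
}

image_cmds   # review the commands, then run them (e.g. image_cmds | sh)
```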
[2] Create the namespace and ServiceAccount

# create the namespace
kubectl create ns flink
# create the serviceaccount
kubectl create serviceaccount flink-service-account -n flink
# grant permissions
kubectl create clusterrolebinding flink-role-binding-flink --clusterrole=edit --serviceaccount=flink:flink-service-account
[3] Compose the YAML manifests
  • flink-configuration-configmap.yaml

apiVersion: v1
kind: ConfigMap
metadata:
  name: flink-config
  labels:
    app: flink
data:
  flink-conf.yaml: |+
    jobmanager.rpc.address: flink-jobmanager
    taskmanager.numberOfTaskSlots: 2
    blob.server.port: 6124
    jobmanager.rpc.port: 6123
    taskmanager.rpc.port: 6122
    queryable-state.proxy.ports: 6125
    jobmanager.memory.process.size: 3200m
    taskmanager.memory.process.size: 2728m
    taskmanager.memory.flink.size: 2280m
    parallelism.default: 2
  log4j-console.properties: |+
    # This affects logging for both user code and Flink
    rootLogger.level = INFO
    rootLogger.appenderRef.console.ref = ConsoleAppender
    rootLogger.appenderRef.rolling.ref = RollingFileAppender
    # Uncomment this if you want to _only_ change Flink's logging
    #logger.flink.name = org.apache.flink
    #logger.flink.level = INFO
    # The following lines keep the log level of common libraries/connectors on
    # log level INFO. The root logger does not override this. You have to manually
    # change the log levels here.
    logger.akka.name = akka
    logger.akka.level = INFO
    logger.kafka.name = org.apache.kafka
    logger.kafka.level = INFO
    logger.hadoop.name = org.apache.hadoop
    logger.hadoop.level = INFO
    logger.zookeeper.name = org.apache.zookeeper
    logger.zookeeper.level = INFO
    # Log all infos to the console
    appender.console.name = ConsoleAppender
    appender.console.type = CONSOLE
    appender.console.layout.type = PatternLayout
    appender.console.layout.pattern = %d{yyyy-MM-dd HH:mm:ss,SSS} %-5p %-60c %x - %m%n
    # Log all infos in the given rolling file
    appender.rolling.name = RollingFileAppender
    appender.rolling.type = RollingFile
    appender.rolling.append = false
    appender.rolling.fileName = ${sys:log.file}
    appender.rolling.filePattern = ${sys:log.file}.%i
    appender.rolling.layout.type = PatternLayout
    appender.rolling.layout.pattern = %d{yyyy-MM-dd HH:mm:ss,SSS} %-5p %-60c %x - %m%n
    appender.rolling.policies.type = Policies
    appender.rolling.policies.size.type = SizeBasedTriggeringPolicy
    appender.rolling.policies.size.size = 100MB
    appender.rolling.strategy.type = DefaultRolloverStrategy
    appender.rolling.strategy.max = 10
    # Suppress the irrelevant (wrong) warnings from the Netty channel handler
    logger.netty.name = org.jboss.netty.channel.DefaultChannelPipeline
    logger.netty.level = OFF
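Note that taskmanager.memory.flink.size has to fit inside taskmanager.memory.process.size, since the process size additionally covers JVM metaspace and overhead. A quick check of the values above (the `strip_m` helper is hypothetical and only handles the plain `<n>m` form):

```shell
#!/usr/bin/env bash
# Sanity-check the TaskManager memory settings from flink-conf.yaml.
PROCESS_SIZE="2728m"   # taskmanager.memory.process.size
FLINK_SIZE="2280m"     # taskmanager.memory.flink.size

strip_m() { echo "${1%m}"; }   # drop the trailing "m" (megabytes)

process_mb=$(strip_m "$PROCESS_SIZE")
flink_mb=$(strip_m "$FLINK_SIZE")

if [ "$flink_mb" -ge "$process_mb" ]; then
    echo "invalid: flink.size must be smaller than process.size" >&2
    exit 1
fi
echo "headroom for JVM metaspace/overhead: $(( process_mb - flink_mb ))m"
```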
  • jobmanager-service.yaml (optional; only required in non-HA mode)

apiVersion: v1
kind: Service
metadata:
  name: flink-jobmanager
spec:
  type: ClusterIP
  ports:
  - name: rpc
    port: 6123
  - name: blob-server
    port: 6124
  - name: webui
    port: 8081
  selector:
    app: flink
    component: jobmanager
  • jobmanager-rest-service.yaml (optional; exposes the JobManager REST port as a NodePort on the Kubernetes nodes)

apiVersion: v1
kind: Service
metadata:
  name: flink-jobmanager-rest
spec:
  type: NodePort
  ports:
  - name: rest
    port: 8081
    targetPort: 8081
    nodePort: 30081
  selector:
    app: flink
    component: jobmanager
  • taskmanager-query-state-service.yaml (optional; exposes the TaskManager queryable-state port as a NodePort on the Kubernetes nodes)

apiVersion: v1
kind: Service
metadata:
  name: flink-taskmanager-query-state
spec:
  type: NodePort
  ports:
  - name: query-state
    port: 6125
    targetPort: 6125
    nodePort: 30025
  selector:
    app: flink
    component: taskmanager

The configuration files above are shared; the deployments below are mode-specific.

  • jobmanager-session-deployment-non-ha.yaml

apiVersion: apps/v1
kind: Deployment
metadata:
  name: flink-jobmanager
spec:
  replicas: 1
  selector:
    matchLabels:
      app: flink
      component: jobmanager
  template:
    metadata:
      labels:
        app: flink
        component: jobmanager
    spec:
      containers:
      - name: jobmanager
        image: myharbor.com/bigdata/flink-centos-admin:1.14.6-scala_2.12
        args: ["jobmanager"]
        ports:
        - containerPort: 6123
          name: rpc
        - containerPort: 6124
          name: blob-server
        - containerPort: 8081
          name: webui
        livenessProbe:
          tcpSocket:
            port: 6123
          initialDelaySeconds: 30
          periodSeconds: 60
        volumeMounts:
        - name: flink-config-volume
          mountPath: /opt/apache/flink-1.14.6/conf/
        securityContext:
          runAsUser: 9999  # the admin user created in the image; change if necessary
      volumes:
      - name: flink-config-volume
        configMap:
          name: flink-config
          items:
          - key: flink-conf.yaml
            path: flink-conf.yaml
          - key: log4j-console.properties
            path: log4j-console.properties
  • taskmanager-session-deployment.yaml

apiVersion: apps/v1
kind: Deployment
metadata:
  name: flink-taskmanager
spec:
  replicas: 2
  selector:
    matchLabels:
      app: flink
      component: taskmanager
  template:
    metadata:
      labels:
        app: flink
        component: taskmanager
    spec:
      containers:
      - name: taskmanager
        image: myharbor.com/bigdata/flink-centos-admin:1.14.6-scala_2.12
        args: ["taskmanager"]
        ports:
        - containerPort: 6122
          name: rpc
        - containerPort: 6125
          name: query-state
        livenessProbe:
          tcpSocket:
            port: 6122
          initialDelaySeconds: 30
          periodSeconds: 60
        volumeMounts:
        - name: flink-config-volume
          mountPath: /opt/apache/flink-1.14.6/conf/
        securityContext:
          runAsUser: 9999  # the admin user created in the image; change if necessary
      volumes:
      - name: flink-config-volume
        configMap:
          name: flink-config
          items:
          - key: flink-conf.yaml
            path: flink-conf.yaml
          - key: log4j-console.properties
            path: log4j-console.properties
[4] Create the Flink cluster

kubectl create ns flink
# configuration
kubectl create -f flink-configuration-configmap.yaml -n flink
# services
kubectl create -f jobmanager-service.yaml -n flink
kubectl create -f jobmanager-rest-service.yaml -n flink
kubectl create -f taskmanager-query-state-service.yaml -n flink
# create the deployments for the cluster
kubectl create -f jobmanager-session-deployment-non-ha.yaml -n flink
kubectl create -f taskmanager-session-deployment.yaml -n flink
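The kubectl create calls above can also run in a loop; overriding the kubectl command (here via a KUBECTL variable, an illustrative convention rather than a kubectl feature) gives a cheap dry run that prints what would be created:

```shell
#!/usr/bin/env bash
# Apply all session-cluster manifests in order.
MANIFESTS="flink-configuration-configmap.yaml \
jobmanager-service.yaml \
jobmanager-rest-service.yaml \
taskmanager-query-state-service.yaml \
jobmanager-session-deployment-non-ha.yaml \
taskmanager-session-deployment.yaml"

apply_manifests() {
    for f in $MANIFESTS; do
        # KUBECTL defaults to the real binary; override it to preview.
        ${KUBECTL:-kubectl} create -f "$f" -n flink "$@"
    done
}

# Preview first, then apply for real:
#   KUBECTL="echo kubectl" apply_manifests --dry-run=client
#   apply_manifests
```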

Reverse-engineering a Dockerfile from an image:

alias whaler="docker run -t --rm -v /var/run/docker.sock:/var/run/docker.sock:ro pegleg/whaler"
whaler flink:1.14.6-scala_2.12

Check:

kubectl get pods,svc -n flink -owide

Web UI: http://192.168.182.110:30081/#/overview

[5] Submit a job

./bin/flink run -m local-168-182-110:30081 ./examples/streaming/WordCount.jar
kubectl logs flink-taskmanager-54649bf96c-zjtkh -n flink
[6] Delete the Flink cluster

kubectl delete -f jobmanager-service.yaml -n flink
kubectl delete -f flink-configuration-configmap.yaml -n flink
kubectl delete -f taskmanager-session-deployment.yaml -n flink
kubectl delete -f jobmanager-session-deployment-non-ha.yaml -n flink
kubectl delete ns flink --force
[7] Access the Flink web UI

The port is the NodePort defined in jobmanager-rest-service.yaml.

http://192.168.182.110:30081/#/overview

4) Application mode (recommended)

A basic Flink Application cluster deployment on Kubernetes consists of three components:

  • an application that runs the JobManager

  • a Deployment for the pool of TaskManagers

  • a Service exposing the JobManager's REST and UI ports

1. Native Kubernetes mode (commonly used)
[1] Build the image. Dockerfile:

FROM myharbor.com/bigdata/flink:1.14.6-scala_2.12
RUN rm -f /etc/localtime && ln -sv /usr/share/zoneinfo/Asia/Shanghai /etc/localtime && echo "Asia/Shanghai" > /etc/timezone
ENV LANG zh_CN.UTF-8
RUN mkdir -p $FLINK_HOME/usrlib
COPY TopSpeedWindowing.jar $FLINK_HOME/usrlib/

Build the image:

docker build -t myharbor.com/bigdata/flink-application:1.14.6-scala_2.12 . --no-cache
# push the image
docker push myharbor.com/bigdata/flink-application:1.14.6-scala_2.12
# delete the image locally
docker rmi myharbor.com/bigdata/flink-application:1.14.6-scala_2.12
crictl rmi myharbor.com/bigdata/flink-application:1.14.6-scala_2.12
[2] Create the namespace and ServiceAccount

# create the namespace
kubectl create ns flink
# create the serviceaccount
kubectl create serviceaccount flink-service-account -n flink
# grant permissions
kubectl create clusterrolebinding flink-role-binding-flink --clusterrole=edit --serviceaccount=flink:flink-service-account
[3] Create the Flink cluster and submit the job

./bin/flink run-application \
    --target kubernetes-application \
    -Dkubernetes.cluster-id=my-first-application-cluster \
    -Dkubernetes.container.image=myharbor.com/bigdata/flink-application:1.14.6-scala_2.12 \
    -Dkubernetes.jobmanager.replicas=1 \
    -Dkubernetes.namespace=flink \
    -Dkubernetes.jobmanager.service-account=flink-service-account \
    -Dexternal-resource.limits.kubernetes.cpu=2000m \
    -Dexternal-resource.limits.kubernetes.memory=2Gi \
    -Dexternal-resource.requests.kubernetes.cpu=1000m \
    -Dexternal-resource.requests.kubernetes.memory=1Gi \
    -Dkubernetes.rest-service.exposed.type=NodePort \
    local:///opt/flink/usrlib/TopSpeedWindowing.jar
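Because application mode accepts only local:// jar URIs, a small guard in a submit script can catch the common mistake of passing a host path. The `validate_app_jar` helper below is hypothetical:

```shell
#!/usr/bin/env bash
# Application mode requires the job jar to be addressed inside the image.
validate_app_jar() {
    case "$1" in
        local:///*)
            echo "ok: $1" ;;
        *)
            echo "error: application mode needs a local:// URI, got: $1" >&2
            return 1 ;;
    esac
}

validate_app_jar local:///opt/flink/usrlib/TopSpeedWindowing.jar
```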

[Note] local:// is the only scheme supported in application mode. Here "local" means local to the pod/container environment, not the host machine.

Check:

kubectl get pods,svc -n flink
kubectl logs -f my-first-application-cluster-taskmanager-1-1 -n flink
[4] Delete the Flink cluster

kubectl delete deployment/my-first-application-cluster -n flink
kubectl delete ns flink --force
2. Standalone mode
[1] Build the image

Startup script docker-entrypoint.sh: identical to the script shown in the Session-mode Standalone section above.

Dockerfile:

FROM myharbor.com/bigdata/centos:7.9.2009
USER root
# install common tools
RUN yum install -y vim tar wget curl rsync bzip2 iptables tcpdump less telnet net-tools lsof
# set the timezone (the default is UTC)
RUN rm -f /etc/localtime && ln -sv /usr/share/zoneinfo/Asia/Shanghai /etc/localtime && echo "Asia/Shanghai" > /etc/timezone
RUN mkdir -p /opt/apache
ADD jdk-8u212-linux-x64.tar.gz /opt/apache/
ADD flink-1.14.6-bin-scala_2.12.tgz /opt/apache/
ENV FLINK_HOME /opt/apache/flink-1.14.6
ENV JAVA_HOME /opt/apache/jdk1.8.0_212
ENV PATH $JAVA_HOME/bin:$PATH
# directory for user application jars
RUN mkdir $FLINK_HOME/usrlib/
COPY docker-entrypoint.sh /opt/apache/
RUN groupadd --system --gid=9999 admin && useradd --system --home-dir $FLINK_HOME --uid=9999 --gid=admin admin
RUN chown -R admin:admin /opt/apache
RUN chmod +x /opt/apache/docker-entrypoint.sh
# working directory
WORKDIR $FLINK_HOME
# expose ports
EXPOSE 6123 8081
# the entrypoint runs when a container starts, not at build time
ENTRYPOINT ["/opt/apache/docker-entrypoint.sh"]
CMD ["help"]

Build the image:

docker build -t myharbor.com/bigdata/flink-centos-admin:1.14.6-scala_2.12 . --no-cache
# push the image
docker push myharbor.com/bigdata/flink-centos-admin:1.14.6-scala_2.12
# delete the image locally
docker rmi myharbor.com/bigdata/flink-centos-admin:1.14.6-scala_2.12
[2] Create the namespace and ServiceAccount

# create the namespace
kubectl create ns flink
# create the serviceaccount
kubectl create serviceaccount flink-service-account -n flink
# grant permissions
kubectl create clusterrolebinding flink-role-binding-flink --clusterrole=edit --serviceaccount=flink:flink-service-account
【3】编排yaml文件
flink-configuration-configmap.yaml

apiVersion: v1
kind: ConfigMap
metadata:
  name: flink-config
  labels:
    app: flink
data:
  flink-conf.yaml: |+
    jobmanager.rpc.address: flink-jobmanager
    taskmanager.numberOfTaskSlots: 2
    blob.server.port: 6124
    jobmanager.rpc.port: 6123
    taskmanager.rpc.port: 6122
    queryable-state.proxy.ports: 6125
    jobmanager.memory.process.size: 3200m
    taskmanager.memory.process.size: 2728m
    taskmanager.memory.flink.size: 2280m
    parallelism.default: 2
  log4j-console.properties: |+
    # This affects logging for both user code and Flink
    rootLogger.level = INFO
    rootLogger.appenderRef.console.ref = ConsoleAppender
    rootLogger.appenderRef.rolling.ref = RollingFileAppender
    # Uncomment this if you want to _only_ change Flink's logging
    #logger.flink.name = org.apache.flink
    #logger.flink.level = INFO
    # The following lines keep the log level of common libraries/connectors on
    # log level INFO. The root logger does not override this. You have to manually
    # change the log levels here.
    logger.akka.name = akka
    logger.akka.level = INFO
    logger.kafka.name = org.apache.kafka
    logger.kafka.level = INFO
    logger.hadoop.name = org.apache.hadoop
    logger.hadoop.level = INFO
    logger.zookeeper.name = org.apache.zookeeper
    logger.zookeeper.level = INFO
    # Log all infos to the console
    appender.console.name = ConsoleAppender
    appender.console.type = CONSOLE
    appender.console.layout.type = PatternLayout
    appender.console.layout.pattern = %d{yyyy-MM-dd HH:mm:ss,SSS} %-5p %-60c %x - %m%n
    # Log all infos in the given rolling file
    appender.rolling.name = RollingFileAppender
    appender.rolling.type = RollingFile
    appender.rolling.append = false
    appender.rolling.fileName = ${sys:log.file}
    appender.rolling.filePattern = ${sys:log.file}.%i
    appender.rolling.layout.type = PatternLayout
    appender.rolling.layout.pattern = %d{yyyy-MM-dd HH:mm:ss,SSS} %-5p %-60c %x - %m%n
    appender.rolling.policies.type = Policies
    appender.rolling.policies.size.type = SizeBasedTriggeringPolicy
    appender.rolling.policies.size.size = 100MB
    appender.rolling.strategy.type = DefaultRolloverStrategy
    appender.rolling.strategy.max = 10
    # Suppress the irrelevant (wrong) warnings from the Netty channel handler
    logger.netty.name = org.jboss.netty.channel.DefaultChannelPipeline
    logger.netty.level = OFF
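The three TaskManager memory settings in the ConfigMap are related by Flink's memory model: process size = Flink size + JVM metaspace + JVM overhead, where metaspace defaults to 256m and overhead is clamped to [192m, 1g]. A quick sanity check of the configured values (the model constants below are standard Flink 1.14 defaults, not values set in this file):

```python
# Values copied from flink-conf.yaml above, all in MB.
process_mb = 2728        # taskmanager.memory.process.size
flink_mb = 2280          # taskmanager.memory.flink.size

# Flink 1.14 defaults for the remaining components (assumed, not configured here).
metaspace_mb = 256                       # taskmanager.memory.jvm-metaspace.size
overhead_min_mb, overhead_max_mb = 192, 1024  # jvm-overhead min/max bounds

# With both sizes pinned, the implied JVM overhead is whatever is left over;
# it must fall inside Flink's allowed overhead range or the TaskManager
# refuses to start with a configuration error.
overhead_mb = process_mb - flink_mb - metaspace_mb
print(f"implied JVM overhead: {overhead_mb} MB")
assert overhead_min_mb <= overhead_mb <= overhead_max_mb, "memory settings are inconsistent"
```

Here the implied overhead is exactly the 192 MB minimum, so the configuration is self-consistent.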

jobmanager-service.yaml — an optional service, needed only in non-HA mode.

apiVersion: v1
kind: Service
metadata:
  name: flink-jobmanager
spec:
  type: ClusterIP
  ports:
  - name: rpc
    port: 6123
  - name: blob-server
    port: 6124
  - name: webui
    port: 8081
  selector:
    app: flink
    component: jobmanager

jobmanager-rest-service.yaml — an optional service that exposes the JobManager's REST port as a NodePort on the Kubernetes nodes.

apiVersion: v1
kind: Service
metadata:
  name: flink-jobmanager-rest
spec:
  type: NodePort
  ports:
  - name: rest
    port: 8081
    targetPort: 8081
    nodePort: 30081
  selector:
    app: flink
    component: jobmanager

taskmanager-query-state-service.yaml — an optional service that exposes the TaskManager's queryable-state port as a NodePort on the Kubernetes nodes.

apiVersion: v1
kind: Service
metadata:
  name: flink-taskmanager-query-state
spec:
  type: NodePort
  ports:
  - name: query-state
    port: 6125
    targetPort: 6125
    nodePort: 30025
  selector:
    app: flink
    component: taskmanager

jobmanager-application-non-ha.yaml — JobManager Job, non-high-availability.

apiVersion: batch/v1
kind: Job
metadata:
  name: flink-jobmanager
spec:
  template:
    metadata:
      labels:
        app: flink
        component: jobmanager
    spec:
      restartPolicy: OnFailure
      containers:
        - name: jobmanager
          image: myharbor.com/bigdata/flink-centos-admin:1.14.6-scala_2.12
          env:
          args: ["standalone-job", "--job-classname", "org.apache.flink.examples.java.wordcount.WordCount", "--output", "/tmp/result"]
          ports:
            - containerPort: 6123
              name: rpc
            - containerPort: 6124
              name: blob-server
            - containerPort: 8081
              name: webui
          livenessProbe:
            tcpSocket:
              port: 6123
            initialDelaySeconds: 30
            periodSeconds: 60
          volumeMounts:
            - name: flink-config-volume
              mountPath: /opt/apache/flink-1.14.6/conf
            - name: job-artifacts-volume
              mountPath: /opt/apache/flink-1.14.6/usrlib
          securityContext:
            runAsUser: 9999  # refers to user _flink_ from official flink image, change if necessary
      volumes:
        - name: flink-config-volume
          configMap:
            name: flink-config
            items:
              - key: flink-conf.yaml
                path: flink-conf.yaml
              - key: log4j-console.properties
                path: log4j-console.properties
        - name: job-artifacts-volume
          hostPath:
            path: /mnt/nfsdata/flink/application/job-artifacts

【Tip】Note the job-artifacts mount at /mnt/nfsdata/flink/application/job-artifacts: it should be a shared directory, so that JobManager and TaskManager pods on different nodes all see the same job artifacts.
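One way to make the mount shared, as the tip suggests, is to replace hostPath with an NFS-backed PersistentVolume and claim; a hedged sketch (the server address and capacity here are placeholders, not from the original):

```yaml
apiVersion: v1
kind: PersistentVolume
metadata:
  name: flink-job-artifacts-pv
spec:
  capacity:
    storage: 10Gi
  accessModes: ["ReadOnlyMany"]
  nfs:
    server: 192.168.0.100          # placeholder NFS server address
    path: /mnt/nfsdata/flink/application/job-artifacts
---
apiVersion: v1
kind: PersistentVolumeClaim
metadata:
  name: flink-job-artifacts-pvc
  namespace: flink
spec:
  accessModes: ["ReadOnlyMany"]
  storageClassName: ""
  resources:
    requests:
      storage: 10Gi
```

The `job-artifacts-volume` in the JobManager and TaskManager manifests could then reference `persistentVolumeClaim: {claimName: flink-job-artifacts-pvc}` instead of `hostPath`.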

taskmanager-job-deployment.yaml

apiVersion: apps/v1
kind: Deployment
metadata:
  name: flink-taskmanager
spec:
  replicas: 2
  selector:
    matchLabels:
      app: flink
      component: taskmanager
  template:
    metadata:
      labels:
        app: flink
        component: taskmanager
    spec:
      containers:
      - name: taskmanager
        image: myharbor.com/bigdata/flink-centos-admin:1.14.6-scala_2.12
        env:
        args: ["taskmanager"]
        ports:
        - containerPort: 6122
          name: rpc
        - containerPort: 6125
          name: query-state
        livenessProbe:
          tcpSocket:
            port: 6122
          initialDelaySeconds: 30
          periodSeconds: 60
        volumeMounts:
        - name: flink-config-volume
          mountPath: /opt/apache/flink-1.14.6/conf
        - name: job-artifacts-volume
          mountPath: /opt/apache/flink-1.14.6/usrlib
        securityContext:
          runAsUser: 9999  # refers to user _flink_ from official flink image, change if necessary
      volumes:
      - name: flink-config-volume
        configMap:
          name: flink-config
          items:
          - key: flink-conf.yaml
            path: flink-conf.yaml
          - key: log4j-console.properties
            path: log4j-console.properties
      - name: job-artifacts-volume
        hostPath:
          path: /mnt/nfsdata/flink/application/job-artifacts
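With `replicas: 2` in this Deployment and `taskmanager.numberOfTaskSlots: 2` in the ConfigMap, the cluster offers 4 task slots in total, which comfortably covers the default parallelism of 2. The capacity arithmetic as a trivial sketch:

```python
# Capacity of the cluster defined by the manifests above.
replicas = 2               # spec.replicas in taskmanager-job-deployment.yaml
slots_per_tm = 2           # taskmanager.numberOfTaskSlots in flink-conf.yaml
default_parallelism = 2    # parallelism.default in flink-conf.yaml

total_slots = replicas * slots_per_tm
print(f"total task slots: {total_slots}")

# A job needs at least `parallelism` free slots to be scheduled.
assert default_parallelism <= total_slots
```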
【4】Create the Flink cluster and submit the job
kubectl create ns flink
# Configuration and service definition
kubectl create -f flink-configuration-configmap.yaml -n flink
# Services
kubectl create -f jobmanager-service.yaml -n flink
kubectl create -f jobmanager-rest-service.yaml -n flink
kubectl create -f taskmanager-query-state-service.yaml -n flink
# Create the deployments for the cluster
kubectl create -f jobmanager-application-non-ha.yaml -n flink
kubectl create -f taskmanager-job-deployment.yaml -n flink

Check:

kubectl get pods,svc -n flink
56dee8c4cc903a0a7e95ec3815096d04.png
【5】Delete the Flink cluster
kubectl delete -f flink-configuration-configmap.yaml -n flink
kubectl delete -f jobmanager-service.yaml -n flink
kubectl delete -f jobmanager-rest-service.yaml -n flink
kubectl delete -f taskmanager-query-state-service.yaml -n flink
kubectl delete -f jobmanager-application-non-ha.yaml -n flink
kubectl delete -f taskmanager-job-deployment.yaml -n flink
kubectl delete ns flink --force
【6】Verify
kubectl get pods,svc -n flink
kubectl exec -it flink-taskmanager-54cb7fc57c-g484q -n flink -- bash
7b54feb75aa6a621aff733e5fc708363.png

That wraps up this walkthrough of Flink on k8s, which uses only the official example for demonstration; related enterprise cases will follow. If you have any questions, feel free to leave me a message. I will keep sharing cloud-native + big data tutorials, so please stay tuned.

This article is reposted from 大数据老司机; original: https://www.cnblogs.com/liugp/p/16755095.html. Copyright belongs to the original author.

