
Several Ways to Deploy Flink on Kubernetes

Before starting, deploy a private Docker registry (see the separate tutorial on setting up a Docker image registry).

Note: on every node, daemon.json must whitelist the registry via "insecure-registries": ["<host-IP>:<port>"] (for the registry used in the examples below, "192.168.20.62:2333"), and the Docker daemon must be restarted afterwards.

1. Session mode

Session mode starts a shared Flink cluster on Kubernetes (a JobManager plus a pool of TaskManagers) to which multiple Flink jobs are then submitted. The cluster keeps running until it is stopped manually. This mode suits scenarios where many jobs are started and stopped frequently and high utilization of cluster resources matters.


A Flink session cluster deployment on Kubernetes consists of at least three components:

  • a Deployment running the JobManager

  • a Deployment for the pool of TaskManagers

  • a Service exposing the JobManager's REST and UI ports

1.1 Native Kubernetes mode

Flink's native Kubernetes mode integrates Apache Flink directly with Kubernetes, so that Flink jobs and applications run on the Kubernetes cluster itself. Its main advantage is that Flink can use Kubernetes' resource orchestration and management capabilities, which simplifies deploying and operating a Flink cluster.

In native Kubernetes mode, cluster deployment and management are handled either by the Flink Kubernetes Operator or directly by the Flink client (as in the examples below); every Flink component is created and managed as a Kubernetes resource (Pods, Services, ConfigMaps, and so on).
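Once the session cluster from 1.1.3 below is running, these Kubernetes-managed resources can be inspected with ordinary kubectl commands (the namespace matches the examples below):

  # Deployment, TaskManager pods, and services created by the Flink client
  kubectl get deployments,pods,services -n flink

  # ConfigMaps holding the generated flink-conf.yaml and logging config
  kubectl get configmaps -n flink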

1.1.1 Build the image (Dockerfile)

  # 1. Create the Dockerfile
  FROM flink:1.16.2
  RUN rm -f /etc/localtime && ln -sv /usr/share/zoneinfo/Asia/Shanghai /etc/localtime && echo "Asia/Shanghai" > /etc/timezone
  # Set the default locale (ENV persists into the final image, unlike export in a RUN layer)
  ENV LANG zh_CN.UTF-8

  # 2. Build the image (the trailing "." is the build context)
  docker build -t 192.168.20.62:2333/bigdata/flink-session:1.16.2 .

  # 3. Push the image
  docker push 192.168.20.62:2333/bigdata/flink-session:1.16.2
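If the push succeeded, the private registry should list the repository. Assuming the registry serves the standard Docker Registry v2 HTTP API, this can be checked with curl:

  # List repositories known to the private registry
  curl http://192.168.20.62:2333/v2/_catalog

  # List tags of the flink-session repository
  curl http://192.168.20.62:2333/v2/bigdata/flink-session/tags/list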

1.1.2 Create the namespace and serviceaccount

  # Create the namespace
  kubectl create ns flink

  # Create the serviceaccount
  kubectl create serviceaccount flink-service-account -n flink

  # Bind the edit role to the serviceaccount
  kubectl create clusterrolebinding flink-role-binding-flink --clusterrole=edit --serviceaccount=flink:flink-service-account
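Before starting the cluster, it is worth verifying that the binding took effect; kubectl can answer this directly:

  # Should print "yes" if the serviceaccount may create pods in the flink namespace
  kubectl auth can-i create pods -n flink --as=system:serviceaccount:flink:flink-service-account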

1.1.3 Create the Flink cluster

  ./bin/kubernetes-session.sh \
      -Dkubernetes.cluster-id=my-first-flink-cluster \
      -Dkubernetes.container.image=192.168.20.62:2333/bigdata/flink-session:1.16.2 \
      -Dkubernetes.namespace=flink \
      -Dkubernetes.jobmanager.service-account=flink-service-account \
      -Dkubernetes.rest-service.exposed.type=NodePort
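On success the client prints the JobManager Web UI address. Because kubernetes.rest-service.exposed.type=NodePort, the address can also be recovered later from the REST service, which the native integration names <cluster-id>-rest:

  # Shows the NodePort mapped to the session cluster's REST/UI port 8081
  kubectl get svc my-first-flink-cluster-rest -n flink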

1.1.4 Submit a job

  ./bin/flink run \
      --target kubernetes-session \
      -Dkubernetes.cluster-id=my-first-flink-cluster \
      -Dkubernetes.namespace=flink \
      -Dkubernetes.jobmanager.service-account=flink-service-account \
      -Dkubernetes.taskmanager.cpu=2000m \
      -Dexternal-resource.limits.kubernetes.cpu=4000m \
      -Dexternal-resource.limits.kubernetes.memory=10Gi \
      -Dexternal-resource.requests.kubernetes.cpu=2000m \
      -Dexternal-resource.requests.kubernetes.memory=8Gi \
      ./examples/streaming/TopSpeedWindowing.jar

Note that all -D options must come before the jar path; anything after the jar is passed to the program as arguments rather than interpreted as Flink configuration.
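Jobs on the session cluster can later be listed and cancelled from the same client, using the same cluster-id and namespace:

  # List the jobs running on the session cluster
  ./bin/flink list --target kubernetes-session \
      -Dkubernetes.cluster-id=my-first-flink-cluster \
      -Dkubernetes.namespace=flink

  # Cancel a job by the JobID printed by "flink list"
  ./bin/flink cancel --target kubernetes-session \
      -Dkubernetes.cluster-id=my-first-flink-cluster \
      -Dkubernetes.namespace=flink \
      <jobId>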

1.1.5 Delete the Flink cluster

  kubectl delete deployment/my-first-flink-cluster -n flink
  kubectl delete ns flink --force

1.2 Standalone mode

Standalone mode here means running a self-contained Flink cluster on Kubernetes, without using anything Kubernetes-specific in Flink itself. You set up the Flink cluster (JobManager and TaskManagers) by hand instead of letting the native integration or an operator create the resources for you. In other words, the Flink components do run inside Kubernetes Pods, but their lifecycle is not managed through Flink's native Kubernetes support; they are deployed the same way Flink is traditionally deployed in any other environment.

1.2.1 Create the docker-entrypoint.sh script

  #!/usr/bin/env bash
  ###############################################################################
  # Licensed to the Apache Software Foundation (ASF) under one
  # or more contributor license agreements. See the NOTICE file
  # distributed with this work for additional information
  # regarding copyright ownership. The ASF licenses this file
  # to you under the Apache License, Version 2.0 (the
  # "License"); you may not use this file except in compliance
  # with the License. You may obtain a copy of the License at
  #
  #     http://www.apache.org/licenses/LICENSE-2.0
  #
  # Unless required by applicable law or agreed to in writing, software
  # distributed under the License is distributed on an "AS IS" BASIS,
  # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  # See the License for the specific language governing permissions and
  # limitations under the License.
  ###############################################################################

  COMMAND_STANDALONE="standalone-job"
  COMMAND_HISTORY_SERVER="history-server"

  # If unspecified, the hostname of the container is taken as the JobManager address
  JOB_MANAGER_RPC_ADDRESS=${JOB_MANAGER_RPC_ADDRESS:-$(hostname -f)}
  CONF_FILE="${FLINK_HOME}/conf/flink-conf.yaml"

  drop_privs_cmd() {
      if [ $(id -u) != 0 ]; then
          # Don't need to drop privs if EUID != 0
          return
      elif [ -x /sbin/su-exec ]; then
          # Alpine
          echo su-exec admin
      else
          # Others
          echo gosu admin
      fi
  }

  copy_plugins_if_required() {
      if [ -z "$ENABLE_BUILT_IN_PLUGINS" ]; then
          return 0
      fi

      echo "Enabling required built-in plugins"
      for target_plugin in $(echo "$ENABLE_BUILT_IN_PLUGINS" | tr ';' ' '); do
          echo "Linking ${target_plugin} to plugin directory"
          plugin_name=${target_plugin%.jar}

          mkdir -p "${FLINK_HOME}/plugins/${plugin_name}"
          if [ ! -e "${FLINK_HOME}/opt/${target_plugin}" ]; then
              echo "Plugin ${target_plugin} does not exist. Exiting."
              exit 1
          else
              ln -fs "${FLINK_HOME}/opt/${target_plugin}" "${FLINK_HOME}/plugins/${plugin_name}"
              echo "Successfully enabled ${target_plugin}"
          fi
      done
  }

  set_config_option() {
      local option=$1
      local value=$2

      # escape periods for usage in regular expressions
      local escaped_option=$(echo ${option} | sed -e "s/\./\\\./g")

      # either override an existing entry, or append a new one
      if grep -E "^${escaped_option}:.*" "${CONF_FILE}" > /dev/null; then
          sed -i -e "s/${escaped_option}:.*/$option: $value/g" "${CONF_FILE}"
      else
          echo "${option}: ${value}" >> "${CONF_FILE}"
      fi
  }

  prepare_configuration() {
      set_config_option jobmanager.rpc.address ${JOB_MANAGER_RPC_ADDRESS}
      set_config_option blob.server.port 6124
      set_config_option query.server.port 6125

      if [ -n "${TASK_MANAGER_NUMBER_OF_TASK_SLOTS}" ]; then
          set_config_option taskmanager.numberOfTaskSlots ${TASK_MANAGER_NUMBER_OF_TASK_SLOTS}
      fi

      if [ -n "${FLINK_PROPERTIES}" ]; then
          echo "${FLINK_PROPERTIES}" >> "${CONF_FILE}"
      fi
      envsubst < "${CONF_FILE}" > "${CONF_FILE}.tmp" && mv "${CONF_FILE}.tmp" "${CONF_FILE}"
  }

  maybe_enable_jemalloc() {
      if [ "${DISABLE_JEMALLOC:-false}" == "false" ]; then
          JEMALLOC_PATH="/usr/lib/$(uname -m)-linux-gnu/libjemalloc.so"
          JEMALLOC_FALLBACK="/usr/lib/x86_64-linux-gnu/libjemalloc.so"
          if [ -f "$JEMALLOC_PATH" ]; then
              export LD_PRELOAD=$LD_PRELOAD:$JEMALLOC_PATH
          elif [ -f "$JEMALLOC_FALLBACK" ]; then
              export LD_PRELOAD=$LD_PRELOAD:$JEMALLOC_FALLBACK
          else
              if [ "$JEMALLOC_PATH" = "$JEMALLOC_FALLBACK" ]; then
                  MSG_PATH=$JEMALLOC_PATH
              else
                  MSG_PATH="$JEMALLOC_PATH and $JEMALLOC_FALLBACK"
              fi
              echo "WARNING: attempted to load jemalloc from $MSG_PATH but the library couldn't be found. glibc will be used instead."
          fi
      fi
  }

  maybe_enable_jemalloc
  copy_plugins_if_required
  prepare_configuration

  args=("$@")
  if [ "$1" = "help" ]; then
      printf "Usage: $(basename "$0") (jobmanager|${COMMAND_STANDALONE}|taskmanager|${COMMAND_HISTORY_SERVER})\n"
      printf "    Or $(basename "$0") help\n\n"
      printf "By default, Flink image adopts jemalloc as default memory allocator. This behavior can be disabled by setting the 'DISABLE_JEMALLOC' environment variable to 'true'.\n"
      exit 0
  elif [ "$1" = "jobmanager" ]; then
      args=("${args[@]:1}")
      echo "Starting Job Manager"
      exec $(drop_privs_cmd) "$FLINK_HOME/bin/jobmanager.sh" start-foreground "${args[@]}"
  elif [ "$1" = ${COMMAND_STANDALONE} ]; then
      args=("${args[@]:1}")
      echo "Starting Job Manager"
      exec $(drop_privs_cmd) "$FLINK_HOME/bin/standalone-job.sh" start-foreground "${args[@]}"
  elif [ "$1" = ${COMMAND_HISTORY_SERVER} ]; then
      args=("${args[@]:1}")
      echo "Starting History Server"
      exec $(drop_privs_cmd) "$FLINK_HOME/bin/historyserver.sh" start-foreground "${args[@]}"
  elif [ "$1" = "taskmanager" ]; then
      args=("${args[@]:1}")
      echo "Starting Task Manager"
      exec $(drop_privs_cmd) "$FLINK_HOME/bin/taskmanager.sh" start-foreground "${args[@]}"
  fi

  args=("${args[@]}")

  # Running command in pass-through mode
  exec $(drop_privs_cmd) "${args[@]}"

1.2.2 Write the Dockerfile

  FROM centos:7.9.2009
  USER root

  # Install common tools
  RUN yum install -y vim tar wget curl rsync bzip2 iptables tcpdump less telnet net-tools lsof

  # Set the time zone (the default is UTC)
  RUN rm -f /etc/localtime && ln -sv /usr/share/zoneinfo/Asia/Shanghai /etc/localtime && echo "Asia/Shanghai" > /etc/timezone

  RUN mkdir -p /opt/apache
  ADD jdk-8u231-linux-x64.tar.gz /opt/apache/
  ADD flink-1.16.2-bin-scala_2.12.tgz /opt/apache/

  ENV FLINK_HOME /opt/apache/flink-1.16.2
  ENV JAVA_HOME /opt/apache/jdk1.8.0_231
  ENV PATH $JAVA_HOME/bin:$PATH

  # Directory for user application jars
  RUN mkdir $FLINK_HOME/usrlib/

  COPY docker-entrypoint.sh /opt/apache/
  RUN chmod +x /opt/apache/docker-entrypoint.sh

  RUN groupadd --system --gid=9999 admin && useradd --system --home-dir $FLINK_HOME --uid=9999 --gid=admin admin
  RUN chown -R admin:admin /opt/apache

  # Working directory
  WORKDIR $FLINK_HOME

  # Exposed ports
  EXPOSE 6123 8081

  # The entrypoint runs when a container starts, not at build time
  ENTRYPOINT ["/opt/apache/docker-entrypoint.sh"]
  CMD ["help"]

1.2.3 Build the image

  docker build -t 192.168.20.62:2333/bigdata/flink-centos-admin:1.16.2 . --no-cache

  # Push the image
  docker push 192.168.20.62:2333/bigdata/flink-centos-admin:1.16.2

1.2.4 Create the namespace and serviceaccount

  # Create the namespace
  kubectl create ns flink

  # Create the serviceaccount
  kubectl create serviceaccount flink-service-account -n flink

  # Bind the edit role to the serviceaccount
  kubectl create clusterrolebinding flink-role-binding-flink --clusterrole=edit --serviceaccount=flink:flink-service-account

1.2.5 Write the YAML manifests

1.2.5.1 flink-configuration-configmap.yaml

  apiVersion: v1
  kind: ConfigMap
  metadata:
    name: flink-config
    labels:
      app: flink
  data:
    flink-conf.yaml: |+
      jobmanager.rpc.address: flink-jobmanager
      taskmanager.numberOfTaskSlots: 2
      blob.server.port: 6124
      jobmanager.rpc.port: 6123
      taskmanager.rpc.port: 6122
      queryable-state.proxy.ports: 6125
      jobmanager.memory.process.size: 3200m
      taskmanager.memory.process.size: 2728m
      taskmanager.memory.flink.size: 2280m
      parallelism.default: 2
    log4j-console.properties: |+
      # This affects logging for both user code and Flink
      rootLogger.level = INFO
      rootLogger.appenderRef.console.ref = ConsoleAppender
      rootLogger.appenderRef.rolling.ref = RollingFileAppender
      # Uncomment this if you want to _only_ change Flink's logging
      #logger.flink.name = org.apache.flink
      #logger.flink.level = INFO
      # The following lines keep the log level of common libraries/connectors on
      # log level INFO. The root logger does not override this. You have to manually
      # change the log levels here.
      logger.akka.name = akka
      logger.akka.level = INFO
      logger.kafka.name = org.apache.kafka
      logger.kafka.level = INFO
      logger.hadoop.name = org.apache.hadoop
      logger.hadoop.level = INFO
      logger.zookeeper.name = org.apache.zookeeper
      logger.zookeeper.level = INFO
      # Log all infos to the console
      appender.console.name = ConsoleAppender
      appender.console.type = CONSOLE
      appender.console.layout.type = PatternLayout
      appender.console.layout.pattern = %d{yyyy-MM-dd HH:mm:ss,SSS} %-5p %-60c %x - %m%n
      # Log all infos in the given rolling file
      appender.rolling.name = RollingFileAppender
      appender.rolling.type = RollingFile
      appender.rolling.append = false
      appender.rolling.fileName = ${sys:log.file}
      appender.rolling.filePattern = ${sys:log.file}.%i
      appender.rolling.layout.type = PatternLayout
      appender.rolling.layout.pattern = %d{yyyy-MM-dd HH:mm:ss,SSS} %-5p %-60c %x - %m%n
      appender.rolling.policies.type = Policies
      appender.rolling.policies.size.type = SizeBasedTriggeringPolicy
      appender.rolling.policies.size.size = 100MB
      appender.rolling.strategy.type = DefaultRolloverStrategy
      appender.rolling.strategy.max = 10
      # Suppress the irrelevant (wrong) warnings from the Netty channel handler
      logger.netty.name = org.jboss.netty.channel.DefaultChannelPipeline
      logger.netty.level = OFF
1.2.5.2 jobmanager-service.yaml
  apiVersion: v1
  kind: Service
  metadata:
    name: flink-jobmanager
  spec:
    type: ClusterIP
    ports:
    - name: rpc
      port: 6123
    - name: blob-server
      port: 6124
    - name: webui
      port: 8081
    selector:
      app: flink
      component: jobmanager
1.2.5.3 jobmanager-rest-service.yaml

Exposes the JobManager's REST port as a NodePort on the Kubernetes nodes.

  apiVersion: v1
  kind: Service
  metadata:
    name: flink-jobmanager-rest
  spec:
    type: NodePort
    ports:
    - name: rest
      port: 8081
      targetPort: 8081
      nodePort: 30081
    selector:
      app: flink
      component: jobmanager
1.2.5.4 taskmanager-query-state-service.yaml

Exposes the TaskManager queryable-state port as a NodePort on the Kubernetes nodes.

  apiVersion: v1
  kind: Service
  metadata:
    name: flink-taskmanager-query-state
  spec:
    type: NodePort
    ports:
    - name: query-state
      port: 6125
      targetPort: 6125
      nodePort: 30025
    selector:
      app: flink
      component: taskmanager
1.2.5.5 jobmanager-session-deployment-non-ha.yaml

  apiVersion: apps/v1
  kind: Deployment
  metadata:
    name: flink-jobmanager
  spec:
    replicas: 1
    selector:
      matchLabels:
        app: flink
        component: jobmanager
    template:
      metadata:
        labels:
          app: flink
          component: jobmanager
      spec:
        containers:
        - name: jobmanager
          image: 192.168.20.62:2333/bigdata/flink-centos-admin:1.16.2
          args: ["jobmanager"]
          ports:
          - containerPort: 6123
            name: rpc
          - containerPort: 6124
            name: blob-server
          - containerPort: 8081
            name: webui
          livenessProbe:
            tcpSocket:
              port: 6123
            initialDelaySeconds: 30
            periodSeconds: 60
          volumeMounts:
          - name: flink-config-volume
            mountPath: /opt/apache/flink-1.16.2/conf/
          securityContext:
            runAsUser: 9999  # the admin user (uid 9999) created in the custom image; change if necessary
        volumes:
        - name: flink-config-volume
          configMap:
            name: flink-config
            items:
            - key: flink-conf.yaml
              path: flink-conf.yaml
            - key: log4j-console.properties
              path: log4j-console.properties
1.2.5.6 taskmanager-session-deployment.yaml

  apiVersion: apps/v1
  kind: Deployment
  metadata:
    name: flink-taskmanager
  spec:
    replicas: 2
    selector:
      matchLabels:
        app: flink
        component: taskmanager
    template:
      metadata:
        labels:
          app: flink
          component: taskmanager
      spec:
        containers:
        - name: taskmanager
          image: 192.168.20.62:2333/bigdata/flink-centos-admin:1.16.2
          args: ["taskmanager"]
          ports:
          - containerPort: 6122
            name: rpc
          - containerPort: 6125
            name: query-state
          livenessProbe:
            tcpSocket:
              port: 6122
            initialDelaySeconds: 30
            periodSeconds: 60
          volumeMounts:
          - name: flink-config-volume
            mountPath: /opt/apache/flink-1.16.2/conf/
          securityContext:
            runAsUser: 9999  # the admin user (uid 9999) created in the custom image; change if necessary
        volumes:
        - name: flink-config-volume
          configMap:
            name: flink-config
            items:
            - key: flink-conf.yaml
              path: flink-conf.yaml
            - key: log4j-console.properties
              path: log4j-console.properties

1.2.6 Create the Flink cluster

  kubectl create ns flink

  # Configuration and service definition
  kubectl create -f flink-configuration-configmap.yaml -n flink

  # Services
  kubectl create -f jobmanager-service.yaml -n flink
  kubectl create -f jobmanager-rest-service.yaml -n flink
  kubectl create -f taskmanager-query-state-service.yaml -n flink

  # Create the deployments for the cluster
  kubectl create -f jobmanager-session-deployment-non-ha.yaml -n flink
  kubectl create -f taskmanager-session-deployment.yaml -n flink
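Before submitting anything, check that both deployments are up and that the REST service is bound to NodePort 30081:

  # Both flink-jobmanager and flink-taskmanager pods should reach Running
  kubectl get pods,svc -n flink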

1.2.7 Submit a job

./bin/flink run -m 192.168.20.62:30081 ./examples/streaming/TopSpeedWindowing.jar
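The submission can be confirmed through the same NodePort with Flink's REST API (the /jobs endpoint returns the IDs and states of all jobs):

  # Should list the submitted job with state RUNNING
  curl http://192.168.20.62:30081/jobs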

1.2.8 Delete the cluster

  kubectl delete -f jobmanager-rest-service.yaml -n flink
  kubectl delete -f taskmanager-query-state-service.yaml -n flink
  kubectl delete -f jobmanager-service.yaml -n flink
  kubectl delete -f flink-configuration-configmap.yaml -n flink
  kubectl delete -f taskmanager-session-deployment.yaml -n flink
  kubectl delete -f jobmanager-session-deployment-non-ha.yaml -n flink
  kubectl delete ns flink --force

2. Application mode (recommended)

A basic Flink application cluster deployment on Kubernetes consists of three components:

  • an Application which runs the JobManager

  • a Deployment for the pool of TaskManagers

  • a Service exposing the JobManager's REST and UI ports

2.1 Native Kubernetes mode (commonly used)

2.1.1 Build the image (Dockerfile)

  # 1. Create the Dockerfile
  FROM flink:1.16.2
  RUN rm -f /etc/localtime && ln -sv /usr/share/zoneinfo/Asia/Shanghai /etc/localtime && echo "Asia/Shanghai" > /etc/timezone
  # Set the default locale (ENV persists into the final image, unlike export in a RUN layer)
  ENV LANG zh_CN.UTF-8
  # Bake the job jar into the image; application mode loads it from inside the container
  RUN mkdir -p $FLINK_HOME/usrlib
  COPY ./flink-1.16.2/examples/streaming/TopSpeedWindowing.jar /opt/flink/usrlib/

  # 2. Build and push the image
  docker build -t 192.168.20.62:2333/bigdata/flink-application:1.16.2 . --no-cache
  docker push 192.168.20.62:2333/bigdata/flink-application:1.16.2

2.1.2 Create the namespace and serviceaccount

  # Create the namespace
  kubectl create ns flink

  # Create the serviceaccount
  kubectl create serviceaccount flink-service-account -n flink

  # Bind the edit role to the serviceaccount
  kubectl create clusterrolebinding flink-role-binding-flink --clusterrole=edit --serviceaccount=flink:flink-service-account

2.1.3 Create the Flink cluster and submit the job

  ./bin/flink run-application \
      --target kubernetes-application \
      -Dkubernetes.cluster-id=my-first-application-cluster \
      -Dkubernetes.container.image=192.168.20.62:2333/bigdata/flink-application:1.16.2 \
      -Dkubernetes.jobmanager.replicas=1 \
      -Dkubernetes.namespace=flink \
      -Dkubernetes.jobmanager.service-account=flink-service-account \
      -Dexternal-resource.limits.kubernetes.cpu=2000m \
      -Dexternal-resource.limits.kubernetes.memory=2Gi \
      -Dexternal-resource.requests.kubernetes.cpu=1000m \
      -Dexternal-resource.requests.kubernetes.memory=1Gi \
      -Dkubernetes.rest-service.exposed.type=NodePort \
      local:///opt/flink/usrlib/TopSpeedWindowing.jar

local:// is the only scheme supported in application mode. The path refers to the filesystem inside the pod/container, not the host machine, so the job jar must already be present in the image (as done in the Dockerfile above).
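To confirm the jar really is inside the running JobManager, you can list the usrlib directory (the native integration creates a deployment named after the cluster-id):

  # Should show TopSpeedWindowing.jar baked into the image
  kubectl exec -n flink deploy/my-first-application-cluster -- ls /opt/flink/usrlib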

2.1.4 Delete the Flink cluster

  kubectl delete deployment/my-first-application-cluster -n flink
  kubectl delete ns flink --force

2.2 Standalone mode

Before starting, set up an NFS share (see the separate tutorial on NFS shared storage). The manifests below assume the shared directory is mounted at /mnt/bigdata/flink/usrlib on every node.
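A minimal sketch of that setup, assuming an NFS export at 192.168.20.63:/data/nfs/flink (substitute your own server and export path):

  # On every Kubernetes node -- example values, replace with your NFS export
  mkdir -p /mnt/bigdata/flink/usrlib
  mount -t nfs 192.168.20.63:/data/nfs/flink /mnt/bigdata/flink/usrlib

  # Put the job jar on the share so the hostPath volumes below can serve it to every pod
  cp flink-1.16.2/examples/streaming/TopSpeedWindowing.jar /mnt/bigdata/flink/usrlib/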

2.2.1 Create the docker-entrypoint.sh startup script

The startup script is identical to the docker-entrypoint.sh shown in section 1.2.1 above.

2.2.2 Write the Dockerfile

  FROM centos:7.9.2009
  USER root

  # Install common tools
  RUN yum install -y vim tar wget curl rsync bzip2 iptables tcpdump less telnet net-tools lsof

  # Set the time zone (the default is UTC)
  RUN rm -f /etc/localtime && ln -sv /usr/share/zoneinfo/Asia/Shanghai /etc/localtime && echo "Asia/Shanghai" > /etc/timezone

  RUN mkdir -p /opt/apache
  ADD jdk-8u231-linux-x64.tar.gz /opt/apache/
  ADD flink-1.16.2-bin-scala_2.12.tgz /opt/apache/

  ENV FLINK_HOME /opt/apache/flink-1.16.2
  ENV JAVA_HOME /opt/apache/jdk1.8.0_231
  ENV PATH $JAVA_HOME/bin:$PATH

  # Directory for user application jars (populated at runtime through the NFS-backed volume)
  RUN mkdir $FLINK_HOME/usrlib/

  COPY docker-entrypoint.sh /opt/apache/
  RUN groupadd --system --gid=9999 admin && useradd --system --home-dir $FLINK_HOME --uid=9999 --gid=admin admin
  RUN chown -R admin:admin /opt/apache
  RUN chmod +x /opt/apache/docker-entrypoint.sh

  # Working directory
  WORKDIR $FLINK_HOME

  # Exposed ports
  EXPOSE 6123 8081

  # The entrypoint runs when a container starts, not at build time
  ENTRYPOINT ["/opt/apache/docker-entrypoint.sh"]
  CMD ["help"]

Build and push the image:

  docker build -t 192.168.20.62:2333/bigdata/flink-centos-admin-application:1.16.2 . --no-cache
  docker push 192.168.20.62:2333/bigdata/flink-centos-admin-application:1.16.2

2.2.3 Create the namespace and serviceaccount

  # Create the namespace
  kubectl create ns flink

  # Create the serviceaccount
  kubectl create serviceaccount flink-service-account -n flink

  # Bind the edit role to the serviceaccount
  kubectl create clusterrolebinding flink-role-binding-flink --clusterrole=edit --serviceaccount=flink:flink-service-account

2.2.4 flink-configuration-configmap.yaml

Identical to the flink-configuration-configmap.yaml in section 1.2.5.1 above.

2.2.5 jobmanager-service.yaml

Identical to the jobmanager-service.yaml in section 1.2.5.2 above.

2.2.6 jobmanager-rest-service.yaml

Exposes the JobManager's REST port as a NodePort on the Kubernetes nodes.

Identical to the jobmanager-rest-service.yaml in section 1.2.5.3 above.

2.2.7 taskmanager-query-state-service.yaml

Exposes the TaskManager queryable-state port as a NodePort on the Kubernetes nodes.

Identical to the taskmanager-query-state-service.yaml in section 1.2.5.4 above.

2.2.8 jobmanager-application-non-ha.yaml

  apiVersion: batch/v1
  kind: Job
  metadata:
    name: flink-jobmanager
  spec:
    template:
      metadata:
        labels:
          app: flink
          component: jobmanager
      spec:
        restartPolicy: OnFailure
        containers:
        - name: jobmanager
          image: 192.168.20.62:2333/bigdata/flink-centos-admin-application:1.16.2
          args: ["standalone-job", "--job-classname", "org.apache.flink.streaming.examples.windowing.TopSpeedWindowing", "--output", "/tmp/result"]
          ports:
          - containerPort: 6123
            name: rpc
          - containerPort: 6124
            name: blob-server
          - containerPort: 8081
            name: webui
          livenessProbe:
            tcpSocket:
              port: 6123
            initialDelaySeconds: 30
            periodSeconds: 60
          volumeMounts:
          - name: flink-config-volume
            mountPath: /opt/apache/flink-1.16.2/conf
          - name: job-artifacts-volume
            mountPath: /opt/apache/flink-1.16.2/usrlib
          securityContext:
            runAsUser: 9999  # the admin user (uid 9999) created in the custom image; change if necessary
        volumes:
        - name: flink-config-volume
          configMap:
            name: flink-config
            items:
            - key: flink-conf.yaml
              path: flink-conf.yaml
            - key: log4j-console.properties
              path: log4j-console.properties
        - name: job-artifacts-volume
          hostPath:
            path: /mnt/bigdata/flink/usrlib

2.2.9 taskmanager-job-deployment.yaml

  apiVersion: apps/v1
  kind: Deployment
  metadata:
    name: flink-taskmanager
  spec:
    replicas: 2
    selector:
      matchLabels:
        app: flink
        component: taskmanager
    template:
      metadata:
        labels:
          app: flink
          component: taskmanager
      spec:
        containers:
        - name: taskmanager
          image: 192.168.20.62:2333/bigdata/flink-centos-admin-application:1.16.2
          args: ["taskmanager"]
          ports:
          - containerPort: 6122
            name: rpc
          - containerPort: 6125
            name: query-state
          livenessProbe:
            tcpSocket:
              port: 6122
            initialDelaySeconds: 30
            periodSeconds: 60
          volumeMounts:
          - name: flink-config-volume
            mountPath: /opt/apache/flink-1.16.2/conf
          - name: job-artifacts-volume
            mountPath: /opt/apache/flink-1.16.2/usrlib
          securityContext:
            runAsUser: 9999  # the admin user (uid 9999) created in the custom image; change if necessary
        volumes:
        - name: flink-config-volume
          configMap:
            name: flink-config
            items:
            - key: flink-conf.yaml
              path: flink-conf.yaml
            - key: log4j-console.properties
              path: log4j-console.properties
        - name: job-artifacts-volume
          hostPath:
            path: /mnt/bigdata/flink/usrlib

2.2.10 Create the Flink cluster and submit the job

  kubectl create ns flink

  # Configuration and service definition
  kubectl create -f flink-configuration-configmap.yaml -n flink

  # Services
  kubectl create -f jobmanager-service.yaml -n flink
  kubectl create -f jobmanager-rest-service.yaml -n flink
  kubectl create -f taskmanager-query-state-service.yaml -n flink

  # Create the job and the deployment for the cluster
  kubectl create -f jobmanager-application-non-ha.yaml -n flink
  kubectl create -f taskmanager-job-deployment.yaml -n flink
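Since the JobManager runs as a Kubernetes Job here, its logs show the Flink job's progress, and the Job completes when the Flink job finishes:

  # Follow the JobManager output
  kubectl logs -f job/flink-jobmanager -n flink

  # The Web UI is reachable on NodePort 30081, as in the session setup
  kubectl get pods,svc -n flink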

2.2.11 Delete the Flink cluster

  kubectl delete -f flink-configuration-configmap.yaml -n flink
  kubectl delete -f jobmanager-service.yaml -n flink
  kubectl delete -f jobmanager-rest-service.yaml -n flink
  kubectl delete -f taskmanager-query-state-service.yaml -n flink
  kubectl delete -f jobmanager-application-non-ha.yaml -n flink
  kubectl delete -f taskmanager-job-deployment.yaml -n flink
  kubectl delete ns flink --force
