At its core, Flink is a streaming dataflow execution engine: the same Flink runtime supports both stream-processing and batch-processing applications. For distributed computation over data streams, it provides data distribution, data communication, and fault-tolerance mechanisms.
Flink website: https://flink.apache.org/
Documentation for all versions: https://nightlies.apache.org/flink/
Flink on Kubernetes (native) documentation: https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/deployment/resource-providers/native_kubernetes/
GitHub: https://github.com/apache/flink/tree/release-1.14.6/
Deployment overview: https://nightlies.apache.org/flink/flink-docs-release-1.15/zh/docs/deployment/overview/
Flink on YARN has three run modes:

yarn-session mode (Session Mode)
yarn-cluster mode (Per-Job Mode)
Application mode (Application Mode)

[Note] Per-Job mode is supported only by YARN and was deprecated in Flink 1.15; its removal is tracked in FLINK-26000. For orientation, the submission commands for the three modes are sketched below.
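A minimal sketch of how a job would be submitted in each mode (the application id and the example jar are illustrative, and Per-Job is shown only for completeness given its deprecation):

```bash
# Session mode: start a long-running YARN session, then submit jobs into it
# (yarn-session.sh prints the application id to use below)
./bin/yarn-session.sh -d
./bin/flink run -t yarn-session -Dyarn.application.id=application_XXXX_YY \
    ./examples/streaming/TopSpeedWindowing.jar

# Per-Job mode (deprecated in 1.15): a dedicated YARN cluster per job
./bin/flink run -t yarn-per-job ./examples/streaming/TopSpeedWindowing.jar

# Application mode: the job's main() runs inside the JobManager on YARN
./bin/flink run-application -t yarn-application ./examples/streaming/TopSpeedWindowing.jar
```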
Download page: https://flink.apache.org/downloads.html

```bash
wget https://dlcdn.apache.org/flink/flink-1.14.6/flink-1.14.6-bin-scala_2.12.tgz
```
```bash
docker pull apache/flink:1.14.6-scala_2.12
docker tag apache/flink:1.14.6-scala_2.12 myharbor.com/bigdata/flink:1.14.6-scala_2.12
docker push myharbor.com/bigdata/flink:1.14.6-scala_2.12
```
A Flink Session cluster runs as a long-lived Kubernetes Deployment. One Session cluster can run multiple Flink jobs; each job is submitted to the cluster after the cluster has been deployed. A Flink Session cluster deployment on Kubernetes has at least three components, listed here and checked in the sketch below:

a Deployment that runs the JobManager
a Deployment for the pool of TaskManagers
a Service exposing the JobManager's REST and UI ports
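Once such a cluster is up (the steps follow below), the components can be listed in one go; a quick sketch, assuming the flink namespace used throughout this post:

```bash
# Deployments, Services and Pods that make up the session cluster
kubectl -n flink get deploy,svc,pods
```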
Configuration parameters: https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/deployment/config/#kubernetes-namespace
```dockerfile
FROM myharbor.com/bigdata/flink:1.14.6-scala_2.12
RUN rm -f /etc/localtime && ln -sv /usr/share/zoneinfo/Asia/Shanghai /etc/localtime && echo "Asia/Shanghai" > /etc/timezone
# ENV rather than RUN export, so the locale setting persists in the image
ENV LANG zh_CN.UTF-8
```
Build the image:
```bash
docker build -t myharbor.com/bigdata/flink-session:1.14.6-scala_2.12 . --no-cache

# Push the image
docker push myharbor.com/bigdata/flink-session:1.14.6-scala_2.12
```
```bash
# Create the namespace
kubectl create ns flink
# Create the service account
kubectl create serviceaccount flink-service-account -n flink
# Grant permissions: bind the edit ClusterRole to the service account
kubectl create clusterrolebinding flink-role-binding-flink --clusterrole=edit --serviceaccount=flink:flink-service-account
```
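To confirm the binding took effect before starting the cluster, the API server can be asked whether the service account is allowed to manage resources; both checks should print "yes":

```bash
kubectl auth can-i create pods -n flink \
    --as=system:serviceaccount:flink:flink-service-account
kubectl auth can-i delete deployments -n flink \
    --as=system:serviceaccount:flink:flink-service-account
```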
Start the session cluster:

```bash
./bin/kubernetes-session.sh \
    -Dkubernetes.cluster-id=my-first-flink-cluster \
    -Dkubernetes.container.image=myharbor.com/bigdata/flink-session:1.14.6-scala_2.12 \
    -Dkubernetes.namespace=flink \
    -Dkubernetes.jobmanager.service-account=flink-service-account \
    -Dkubernetes.rest-service.exposed.type=NodePort
```
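The native deployment creates a REST Service named after the cluster id (by convention, <cluster-id>-rest); with exposed.type=NodePort, its node port is where the Web UI lives. A quick way to find it:

```bash
kubectl -n flink get svc my-first-flink-cluster-rest
# Web UI / REST endpoint: http://<any-node-ip>:<nodePort-shown-above>
```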
Submit jobs to the session:

```bash
./bin/flink run \
    --target kubernetes-session \
    -Dkubernetes.cluster-id=my-first-flink-cluster \
    -Dkubernetes.namespace=flink \
    -Dkubernetes.jobmanager.service-account=flink-service-account \
    ./examples/streaming/TopSpeedWindowing.jar

# With explicit resource parameters (CPU/memory requests and limits)
./bin/flink run \
    --target kubernetes-session \
    -Dkubernetes.cluster-id=my-first-flink-cluster \
    -Dkubernetes.namespace=flink \
    -Dkubernetes.jobmanager.service-account=flink-service-account \
    -Dkubernetes.taskmanager.cpu=2000m \
    -Dexternal-resource.limits.kubernetes.cpu=4000m \
    -Dexternal-resource.limits.kubernetes.memory=10Gi \
    -Dexternal-resource.requests.kubernetes.cpu=2000m \
    -Dexternal-resource.requests.kubernetes.memory=8Gi \
    ./examples/streaming/WordCount.jar
```
[Note] Mind the JDK version; JDK 8 currently works without issues.
```bash
kubectl get pods -n flink
kubectl logs -f my-first-flink-cluster-taskmanager-1-1 -n flink

# Tear down
kubectl delete deployment/my-first-flink-cluster -n flink
kubectl delete ns flink --force
```
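Alternatively, per the official docs, the session can be shut down gracefully through Flink itself, which also deletes the Kubernetes resources it created:

```bash
echo 'stop' | ./bin/kubernetes-session.sh \
    -Dkubernetes.cluster-id=my-first-flink-cluster \
    -Dkubernetes.namespace=flink \
    -Dexecution.attached=true
```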
The image's default user is flink; here I switch it to admin. Change the user to whatever your organization requires. The entrypoint script can be taken from the pods started above (or from the official image), as sketched below.
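A minimal sketch for grabbing the stock script, either from the official image (which ships it as /docker-entrypoint.sh) or from one of the pods started above (the pod name is from the earlier test and will differ in your cluster):

```bash
# From the official image...
docker run --rm apache/flink:1.14.6-scala_2.12 cat /docker-entrypoint.sh > docker-entrypoint.sh
# ...or from a running pod
kubectl cp flink/my-first-flink-cluster-taskmanager-1-1:/docker-entrypoint.sh ./docker-entrypoint.sh
```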
Startup script docker-entrypoint.sh (with admin substituted for the flink user):
```bash
#!/usr/bin/env bash

###############################################################################
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements.  See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership.  The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License.  You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
###############################################################################

COMMAND_STANDALONE="standalone-job"
COMMAND_HISTORY_SERVER="history-server"

# If unspecified, the hostname of the container is taken as the JobManager address
JOB_MANAGER_RPC_ADDRESS=${JOB_MANAGER_RPC_ADDRESS:-$(hostname -f)}
CONF_FILE="${FLINK_HOME}/conf/flink-conf.yaml"

drop_privs_cmd() {
    if [ $(id -u) != 0 ]; then
        # Don't need to drop privs if EUID != 0
        return
    elif [ -x /sbin/su-exec ]; then
        # Alpine
        echo su-exec admin
    else
        # Others
        echo gosu admin
    fi
}

copy_plugins_if_required() {
    if [ -z "$ENABLE_BUILT_IN_PLUGINS" ]; then
        return 0
    fi
    echo "Enabling required built-in plugins"
    for target_plugin in $(echo "$ENABLE_BUILT_IN_PLUGINS" | tr ';' ' '); do
        echo "Linking ${target_plugin} to plugin directory"
        plugin_name=${target_plugin%.jar}
        mkdir -p "${FLINK_HOME}/plugins/${plugin_name}"
        if [ ! -e "${FLINK_HOME}/opt/${target_plugin}" ]; then
            echo "Plugin ${target_plugin} does not exist. Exiting."
            exit 1
        else
            ln -fs "${FLINK_HOME}/opt/${target_plugin}" "${FLINK_HOME}/plugins/${plugin_name}"
            echo "Successfully enabled ${target_plugin}"
        fi
    done
}

set_config_option() {
    local option=$1
    local value=$2
    # escape periods for usage in regular expressions
    local escaped_option=$(echo ${option} | sed -e "s/\./\\\./g")
    # either override an existing entry, or append a new one
    if grep -E "^${escaped_option}:.*" "${CONF_FILE}" > /dev/null; then
        sed -i -e "s/${escaped_option}:.*/$option: $value/g" "${CONF_FILE}"
    else
        echo "${option}: ${value}" >> "${CONF_FILE}"
    fi
}

prepare_configuration() {
    set_config_option jobmanager.rpc.address ${JOB_MANAGER_RPC_ADDRESS}
    set_config_option blob.server.port 6124
    set_config_option query.server.port 6125
    if [ -n "${TASK_MANAGER_NUMBER_OF_TASK_SLOTS}" ]; then
        set_config_option taskmanager.numberOfTaskSlots ${TASK_MANAGER_NUMBER_OF_TASK_SLOTS}
    fi
    if [ -n "${FLINK_PROPERTIES}" ]; then
        echo "${FLINK_PROPERTIES}" >> "${CONF_FILE}"
    fi
    envsubst < "${CONF_FILE}" > "${CONF_FILE}.tmp" && mv "${CONF_FILE}.tmp" "${CONF_FILE}"
}

maybe_enable_jemalloc() {
    if [ "${DISABLE_JEMALLOC:-false}" == "false" ]; then
        JEMALLOC_PATH="/usr/lib/$(uname -m)-linux-gnu/libjemalloc.so"
        JEMALLOC_FALLBACK="/usr/lib/x86_64-linux-gnu/libjemalloc.so"
        if [ -f "$JEMALLOC_PATH" ]; then
            export LD_PRELOAD=$LD_PRELOAD:$JEMALLOC_PATH
        elif [ -f "$JEMALLOC_FALLBACK" ]; then
            export LD_PRELOAD=$LD_PRELOAD:$JEMALLOC_FALLBACK
        else
            if [ "$JEMALLOC_PATH" = "$JEMALLOC_FALLBACK" ]; then
                MSG_PATH=$JEMALLOC_PATH
            else
                MSG_PATH="$JEMALLOC_PATH and $JEMALLOC_FALLBACK"
            fi
            echo "WARNING: attempted to load jemalloc from $MSG_PATH but the library couldn't be found. glibc will be used instead."
        fi
    fi
}

maybe_enable_jemalloc

copy_plugins_if_required

prepare_configuration

args=("$@")
if [ "$1" = "help" ]; then
    printf "Usage: $(basename "$0") (jobmanager|${COMMAND_STANDALONE}|taskmanager|${COMMAND_HISTORY_SERVER})\n"
    printf "    Or $(basename "$0") help\n\n"
    printf "By default, Flink image adopts jemalloc as default memory allocator. This behavior can be disabled by setting the 'DISABLE_JEMALLOC' environment variable to 'true'.\n"
    exit 0
elif [ "$1" = "jobmanager" ]; then
    args=("${args[@]:1}")
    echo "Starting Job Manager"
    exec $(drop_privs_cmd) "$FLINK_HOME/bin/jobmanager.sh" start-foreground "${args[@]}"
elif [ "$1" = ${COMMAND_STANDALONE} ]; then
    args=("${args[@]:1}")
    echo "Starting Job Manager"
    exec $(drop_privs_cmd) "$FLINK_HOME/bin/standalone-job.sh" start-foreground "${args[@]}"
elif [ "$1" = ${COMMAND_HISTORY_SERVER} ]; then
    args=("${args[@]:1}")
    echo "Starting History Server"
    exec $(drop_privs_cmd) "$FLINK_HOME/bin/historyserver.sh" start-foreground "${args[@]}"
elif [ "$1" = "taskmanager" ]; then
    args=("${args[@]:1}")
    echo "Starting Task Manager"
    exec $(drop_privs_cmd) "$FLINK_HOME/bin/taskmanager.sh" start-foreground "${args[@]}"
fi

args=("${args[@]}")

# Running command in pass-through mode
exec $(drop_privs_cmd) "${args[@]}"
```
Write the Dockerfile:
```dockerfile
FROM myharbor.com/bigdata/centos:7.9.2009

USER root

# Install common tools
RUN yum install -y vim tar wget curl rsync bzip2 iptables tcpdump less telnet net-tools lsof

# Set the time zone (UTC by default)
RUN rm -f /etc/localtime && ln -sv /usr/share/zoneinfo/Asia/Shanghai /etc/localtime && echo "Asia/Shanghai" > /etc/timezone

RUN mkdir -p /opt/apache

ADD jdk-8u212-linux-x64.tar.gz /opt/apache/

ADD flink-1.14.6-bin-scala_2.12.tgz /opt/apache/

ENV FLINK_HOME /opt/apache/flink-1.14.6
ENV JAVA_HOME /opt/apache/jdk1.8.0_212
ENV PATH $JAVA_HOME/bin:$PATH

# Create the directory for user application jars
RUN mkdir $FLINK_HOME/usrlib/

COPY docker-entrypoint.sh /opt/apache/
RUN chmod +x /opt/apache/docker-entrypoint.sh

RUN groupadd --system --gid=9999 admin && useradd --system --home-dir $FLINK_HOME --uid=9999 --gid=admin admin

RUN chown -R admin:admin /opt/apache

# Working directory
WORKDIR $FLINK_HOME

# Expose JobManager RPC and Web UI ports
EXPOSE 6123 8081

# Entrypoint script: runs when a container starts, not at build time
ENTRYPOINT ["/opt/apache/docker-entrypoint.sh"]
CMD ["help"]
```
Build the image:
```bash
docker build -t myharbor.com/bigdata/flink-centos-admin:1.14.6-scala_2.12 . --no-cache

# Push the image
docker push myharbor.com/bigdata/flink-centos-admin:1.14.6-scala_2.12

# Remove the image (if needed)
docker rmi myharbor.com/bigdata/flink-centos-admin:1.14.6-scala_2.12
crictl rmi myharbor.com/bigdata/flink-centos-admin:1.14.6-scala_2.12
```
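Before wiring the image into manifests, it can be smoke-tested locally. A sketch (note that this CentOS image installs neither gosu nor su-exec, so anything except help must run as the admin uid to keep drop_privs_cmd from failing):

```bash
# The help branch exits before any privilege drop, so it works as root
docker run --rm myharbor.com/bigdata/flink-centos-admin:1.14.6-scala_2.12 help
# Pass-through mode as uid 9999: check Java and the time zone
docker run --rm --user 9999 myharbor.com/bigdata/flink-centos-admin:1.14.6-scala_2.12 \
    bash -c 'java -version && date'
```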
```bash
# Create the namespace
kubectl create ns flink
# Create the service account
kubectl create serviceaccount flink-service-account -n flink
# Grant permissions: bind the edit ClusterRole to the service account
kubectl create clusterrolebinding flink-role-binding-flink --clusterrole=edit --serviceaccount=flink:flink-service-account
```
flink-configuration-configmap.yaml
```yaml
apiVersion: v1
kind: ConfigMap
metadata:
  name: flink-config
  labels:
    app: flink
data:
  flink-conf.yaml: |+
    jobmanager.rpc.address: flink-jobmanager
    taskmanager.numberOfTaskSlots: 2
    blob.server.port: 6124
    jobmanager.rpc.port: 6123
    taskmanager.rpc.port: 6122
    queryable-state.proxy.ports: 6125
    jobmanager.memory.process.size: 3200m
    taskmanager.memory.process.size: 2728m
    taskmanager.memory.flink.size: 2280m
    parallelism.default: 2
  log4j-console.properties: |+
    # This affects logging for both user code and Flink
    rootLogger.level = INFO
    rootLogger.appenderRef.console.ref = ConsoleAppender
    rootLogger.appenderRef.rolling.ref = RollingFileAppender

    # Uncomment this if you want to _only_ change Flink's logging
    #logger.flink.name = org.apache.flink
    #logger.flink.level = INFO

    # The following lines keep the log level of common libraries/connectors on
    # log level INFO. The root logger does not override this. You have to manually
    # change the log levels here.
    logger.akka.name = akka
    logger.akka.level = INFO
    logger.kafka.name= org.apache.kafka
    logger.kafka.level = INFO
    logger.hadoop.name = org.apache.hadoop
    logger.hadoop.level = INFO
    logger.zookeeper.name = org.apache.zookeeper
    logger.zookeeper.level = INFO

    # Log all infos to the console
    appender.console.name = ConsoleAppender
    appender.console.type = CONSOLE
    appender.console.layout.type = PatternLayout
    appender.console.layout.pattern = %d{yyyy-MM-dd HH:mm:ss,SSS} %-5p %-60c %x - %m%n

    # Log all infos in the given rolling file
    appender.rolling.name = RollingFileAppender
    appender.rolling.type = RollingFile
    appender.rolling.append = false
    appender.rolling.fileName = ${sys:log.file}
    appender.rolling.filePattern = ${sys:log.file}.%i
    appender.rolling.layout.type = PatternLayout
    appender.rolling.layout.pattern = %d{yyyy-MM-dd HH:mm:ss,SSS} %-5p %-60c %x - %m%n
    appender.rolling.policies.type = Policies
    appender.rolling.policies.size.type = SizeBasedTriggeringPolicy
    appender.rolling.policies.size.size=100MB
    appender.rolling.strategy.type = DefaultRolloverStrategy
    appender.rolling.strategy.max = 10

    # Suppress the irrelevant (wrong) warnings from the Netty channel handler
    logger.netty.name = org.jboss.netty.channel.DefaultChannelPipeline
    logger.netty.level = OFF
```
jobmanager-service.yaml
Optional service, required only in non-HA mode.
```yaml
apiVersion: v1
kind: Service
metadata:
  name: flink-jobmanager
spec:
  type: ClusterIP
  ports:
  - name: rpc
    port: 6123
  - name: blob-server
    port: 6124
  - name: webui
    port: 8081
  selector:
    app: flink
    component: jobmanager
```
jobmanager-rest-service.yaml
Optional service, which exposes the JobManager REST port as a NodePort on the Kubernetes nodes.
```yaml
apiVersion: v1
kind: Service
metadata:
  name: flink-jobmanager-rest
spec:
  type: NodePort
  ports:
  - name: rest
    port: 8081
    targetPort: 8081
    nodePort: 30081
  selector:
    app: flink
    component: jobmanager
```
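If the NodePort range is not reachable from your workstation, a simple alternative is to port-forward the ClusterIP service defined above:

```bash
# Forward local port 8081 to the JobManager Web UI, then browse http://127.0.0.1:8081
kubectl port-forward -n flink svc/flink-jobmanager 8081:8081
```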
taskmanager-query-state-service.yaml
Optional service, which exposes the TaskManager queryable-state port as a NodePort on the Kubernetes nodes.
```yaml
apiVersion: v1
kind: Service
metadata:
  name: flink-taskmanager-query-state
spec:
  type: NodePort
  ports:
  - name: query-state
    port: 6125
    targetPort: 6125
    nodePort: 30025
  selector:
    app: flink
    component: taskmanager
```
The configuration files above are common to both the Session and Application deployments.
jobmanager-session-deployment-non-ha.yaml
```yaml
apiVersion: apps/v1
kind: Deployment
metadata:
  name: flink-jobmanager
spec:
  replicas: 1
  selector:
    matchLabels:
      app: flink
      component: jobmanager
  template:
    metadata:
      labels:
        app: flink
        component: jobmanager
    spec:
      containers:
      - name: jobmanager
        image: myharbor.com/bigdata/flink-centos-admin:1.14.6-scala_2.12
        args: ["jobmanager"]
        ports:
        - containerPort: 6123
          name: rpc
        - containerPort: 6124
          name: blob-server
        - containerPort: 8081
          name: webui
        livenessProbe:
          tcpSocket:
            port: 6123
          initialDelaySeconds: 30
          periodSeconds: 60
        volumeMounts:
        - name: flink-config-volume
          mountPath: /opt/apache/flink-1.14.6/conf/
        securityContext:
          runAsUser: 9999  # the admin user (uid 9999) created in the custom image; change if necessary
      volumes:
      - name: flink-config-volume
        configMap:
          name: flink-config
          items:
          - key: flink-conf.yaml
            path: flink-conf.yaml
          - key: log4j-console.properties
            path: log4j-console.properties
```
taskmanager-session-deployment.yaml
```yaml
apiVersion: apps/v1
kind: Deployment
metadata:
  name: flink-taskmanager
spec:
  replicas: 2
  selector:
    matchLabels:
      app: flink
      component: taskmanager
  template:
    metadata:
      labels:
        app: flink
        component: taskmanager
    spec:
      containers:
      - name: taskmanager
        image: myharbor.com/bigdata/flink-centos-admin:1.14.6-scala_2.12
        args: ["taskmanager"]
        ports:
        - containerPort: 6122
          name: rpc
        - containerPort: 6125
          name: query-state
        livenessProbe:
          tcpSocket:
            port: 6122
          initialDelaySeconds: 30
          periodSeconds: 60
        volumeMounts:
        - name: flink-config-volume
          mountPath: /opt/apache/flink-1.14.6/conf/
        securityContext:
          runAsUser: 9999  # the admin user (uid 9999) created in the custom image; change if necessary
      volumes:
      - name: flink-config-volume
        configMap:
          name: flink-config
          items:
          - key: flink-conf.yaml
            path: flink-conf.yaml
          - key: log4j-console.properties
            path: log4j-console.properties
```
```bash
kubectl create ns flink

# Configuration and service definition
kubectl create -f flink-configuration-configmap.yaml -n flink

# Services
kubectl create -f jobmanager-service.yaml -n flink
kubectl create -f jobmanager-rest-service.yaml -n flink
kubectl create -f taskmanager-query-state-service.yaml -n flink

# Create the deployments for the cluster
kubectl create -f jobmanager-session-deployment-non-ha.yaml -n flink
kubectl create -f taskmanager-session-deployment.yaml -n flink
```
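Because the TaskManagers are an ordinary Deployment, capacity can be adjusted later with a scale command; with taskmanager.numberOfTaskSlots: 2 as configured above, total slots = replicas x 2:

```bash
# Grow the session cluster from 2 to 3 TaskManagers (6 slots in total)
kubectl scale deployment flink-taskmanager --replicas=3 -n flink
kubectl get pods -n flink -l component=taskmanager
```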
Reverse-engineering a Dockerfile from an image:
```bash
alias whaler="docker run -t --rm -v /var/run/docker.sock:/var/run/docker.sock:ro pegleg/whaler"
whaler flink:1.14.6-scala_2.12
```
Check the resources:

```bash
kubectl get pods,svc -n flink -owide
```

Web UI: http://192.168.182.110:30081/#/overview

Submit a test job through the NodePort and check a TaskManager's logs:

```bash
./bin/flink run -m local-168-182-110:30081 ./examples/streaming/WordCount.jar
kubectl logs flink-taskmanager-54649bf96c-zjtkh -n flink
```
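To double-check the submission, the jobs known to this cluster can be listed against the same endpoint (hostname as above):

```bash
./bin/flink list -m local-168-182-110:30081
```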
```bash
kubectl delete -f jobmanager-service.yaml -n flink
kubectl delete -f flink-configuration-configmap.yaml -n flink
kubectl delete -f taskmanager-session-deployment.yaml -n flink
kubectl delete -f jobmanager-session-deployment-non-ha.yaml -n flink
kubectl delete ns flink --force
```
The port is the NodePort defined in jobmanager-rest-service.yaml: http://192.168.182.110:30081/#/overview
A basic Flink Application cluster deployment on Kubernetes consists of three components:

an Application that runs the JobManager
a Deployment for the pool of TaskManagers
a Service exposing the JobManager's REST and UI ports
```dockerfile
FROM myharbor.com/bigdata/flink:1.14.6-scala_2.12
RUN rm -f /etc/localtime && ln -sv /usr/share/zoneinfo/Asia/Shanghai /etc/localtime && echo "Asia/Shanghai" > /etc/timezone
# ENV rather than RUN export, so the locale setting persists in the image
ENV LANG zh_CN.UTF-8
RUN mkdir -p $FLINK_HOME/usrlib
COPY TopSpeedWindowing.jar $FLINK_HOME/usrlib/
```
Build the image:
```bash
docker build -t myharbor.com/bigdata/flink-application:1.14.6-scala_2.12 . --no-cache

# Push the image
docker push myharbor.com/bigdata/flink-application:1.14.6-scala_2.12

# Remove the image (if needed)
docker rmi myharbor.com/bigdata/flink-application:1.14.6-scala_2.12
crictl rmi myharbor.com/bigdata/flink-application:1.14.6-scala_2.12
```
```bash
# Create the namespace
kubectl create ns flink
# Create the service account
kubectl create serviceaccount flink-service-account -n flink
# Grant permissions: bind the edit ClusterRole to the service account
kubectl create clusterrolebinding flink-role-binding-flink --clusterrole=edit --serviceaccount=flink:flink-service-account
```
Launch the application cluster:

```bash
./bin/flink run-application \
    --target kubernetes-application \
    -Dkubernetes.cluster-id=my-first-application-cluster \
    -Dkubernetes.container.image=myharbor.com/bigdata/flink-application:1.14.6-scala_2.12 \
    -Dkubernetes.jobmanager.replicas=1 \
    -Dkubernetes.namespace=flink \
    -Dkubernetes.jobmanager.service-account=flink-service-account \
    -Dexternal-resource.limits.kubernetes.cpu=2000m \
    -Dexternal-resource.limits.kubernetes.memory=2Gi \
    -Dexternal-resource.requests.kubernetes.cpu=1000m \
    -Dexternal-resource.requests.kubernetes.memory=1Gi \
    -Dkubernetes.rest-service.exposed.type=NodePort \
    local:///opt/flink/usrlib/TopSpeedWindowing.jar
```
[Note] local:// is the only scheme supported in Application mode. It refers to a path inside the pod/container, not on the host machine.
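Once the application cluster is running, jobs can be listed and cancelled through the same CLI. A sketch following the official native-Kubernetes docs, with the namespace option added to match this setup (take <jobId> from the list output):

```bash
./bin/flink list \
    --target kubernetes-application \
    -Dkubernetes.cluster-id=my-first-application-cluster \
    -Dkubernetes.namespace=flink

# Cancelling the only job also shuts the application cluster down
./bin/flink cancel \
    --target kubernetes-application \
    -Dkubernetes.cluster-id=my-first-application-cluster \
    -Dkubernetes.namespace=flink \
    <jobId>
```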
Check:

```bash
kubectl get pods,svc -n flink
kubectl logs -f my-first-application-cluster-taskmanager-1-1 -n flink
```
```bash
kubectl delete deployment/my-first-application-cluster -n flink
kubectl delete ns flink --force
```
Startup script docker-entrypoint.sh: identical to the script shown in the Session mode section above.
Write the Dockerfile:
```dockerfile
FROM myharbor.com/bigdata/centos:7.9.2009

USER root

# Install common tools
RUN yum install -y vim tar wget curl rsync bzip2 iptables tcpdump less telnet net-tools lsof

# Set the time zone (UTC by default)
RUN rm -f /etc/localtime && ln -sv /usr/share/zoneinfo/Asia/Shanghai /etc/localtime && echo "Asia/Shanghai" > /etc/timezone

RUN mkdir -p /opt/apache

ADD jdk-8u212-linux-x64.tar.gz /opt/apache/

ADD flink-1.14.6-bin-scala_2.12.tgz /opt/apache/

ENV FLINK_HOME /opt/apache/flink-1.14.6
ENV JAVA_HOME /opt/apache/jdk1.8.0_212
ENV PATH $JAVA_HOME/bin:$PATH

# Create the directory for user application jars
RUN mkdir $FLINK_HOME/usrlib/

COPY docker-entrypoint.sh /opt/apache/

RUN groupadd --system --gid=9999 admin && useradd --system --home-dir $FLINK_HOME --uid=9999 --gid=admin admin

RUN chown -R admin:admin /opt/apache
# The script lives in /opt/apache, not $FLINK_HOME; the original chmod pointed at the wrong path
RUN chmod +x /opt/apache/docker-entrypoint.sh

# Working directory
WORKDIR $FLINK_HOME

# Expose JobManager RPC and Web UI ports
EXPOSE 6123 8081

# Entrypoint script: runs when a container starts, not at build time
ENTRYPOINT ["/opt/apache/docker-entrypoint.sh"]
CMD ["help"]
```
```bash
docker build -t myharbor.com/bigdata/flink-centos-admin:1.14.6-scala_2.12 . --no-cache

# Push the image
docker push myharbor.com/bigdata/flink-centos-admin:1.14.6-scala_2.12

# Remove the image (if needed)
docker rmi myharbor.com/bigdata/flink-centos-admin:1.14.6-scala_2.12
```
```bash
# Create the namespace
kubectl create ns flink
# Create the service account
kubectl create serviceaccount flink-service-account -n flink
# Grant permissions: bind the edit ClusterRole to the service account
kubectl create clusterrolebinding flink-role-binding-flink --clusterrole=edit --serviceaccount=flink:flink-service-account
```
The files flink-configuration-configmap.yaml, jobmanager-service.yaml, jobmanager-rest-service.yaml, and taskmanager-query-state-service.yaml are identical to the ones in the Session mode section above.
jobmanager-application-non-ha.yaml (non-HA):
```yaml
apiVersion: batch/v1
kind: Job
metadata:
  name: flink-jobmanager
spec:
  template:
    metadata:
      labels:
        app: flink
        component: jobmanager
    spec:
      restartPolicy: OnFailure
      containers:
      - name: jobmanager
        image: myharbor.com/bigdata/flink-centos-admin:1.14.6-scala_2.12
        args: ["standalone-job", "--job-classname", "org.apache.flink.examples.java.wordcount.WordCount", "--output", "/tmp/result"]
        ports:
        - containerPort: 6123
          name: rpc
        - containerPort: 6124
          name: blob-server
        - containerPort: 8081
          name: webui
        livenessProbe:
          tcpSocket:
            port: 6123
          initialDelaySeconds: 30
          periodSeconds: 60
        volumeMounts:
        - name: flink-config-volume
          mountPath: /opt/apache/flink-1.14.6/conf
        - name: job-artifacts-volume
          mountPath: /opt/apache/flink-1.14.6/usrlib
        securityContext:
          runAsUser: 9999  # the admin user (uid 9999) created in the custom image; change if necessary
      volumes:
      - name: flink-config-volume
        configMap:
          name: flink-config
          items:
          - key: flink-conf.yaml
            path: flink-conf.yaml
          - key: log4j-console.properties
            path: log4j-console.properties
      - name: job-artifacts-volume
        hostPath:
          path: /mnt/nfsdata/flink/application/job-artifacts
```
[Note] Mind the job-artifacts hostPath mount here (mounted into $FLINK_HOME/usrlib); it should ideally be a shared directory (e.g., NFS) so every node sees the same job jars. A PersistentVolume-based sketch follows below.
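A minimal sketch of that alternative, assuming an NFS server at nfs.example.com exporting /data/flink/job-artifacts (server, path, and sizes are all illustrative):

```bash
kubectl apply -n flink -f - <<'EOF'
apiVersion: v1
kind: PersistentVolume
metadata:
  name: flink-job-artifacts-pv
spec:
  capacity:
    storage: 5Gi
  accessModes: ["ReadOnlyMany"]
  nfs:
    server: nfs.example.com            # illustrative NFS server
    path: /data/flink/job-artifacts    # illustrative export path
---
apiVersion: v1
kind: PersistentVolumeClaim
metadata:
  name: flink-job-artifacts-pvc
spec:
  accessModes: ["ReadOnlyMany"]
  storageClassName: ""                 # bind statically to the PV above
  resources:
    requests:
      storage: 5Gi
EOF

# In the manifests, the hostPath volume would then be replaced with:
#   - name: job-artifacts-volume
#     persistentVolumeClaim:
#       claimName: flink-job-artifacts-pvc
```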
taskmanager-job-deployment.yaml
```yaml
apiVersion: apps/v1
kind: Deployment
metadata:
  name: flink-taskmanager
spec:
  replicas: 2
  selector:
    matchLabels:
      app: flink
      component: taskmanager
  template:
    metadata:
      labels:
        app: flink
        component: taskmanager
    spec:
      containers:
      - name: taskmanager
        image: myharbor.com/bigdata/flink-centos-admin:1.14.6-scala_2.12
        args: ["taskmanager"]
        ports:
        - containerPort: 6122
          name: rpc
        - containerPort: 6125
          name: query-state
        livenessProbe:
          tcpSocket:
            port: 6122
          initialDelaySeconds: 30
          periodSeconds: 60
        volumeMounts:
        - name: flink-config-volume
          mountPath: /opt/apache/flink-1.14.6/conf
        - name: job-artifacts-volume
          mountPath: /opt/apache/flink-1.14.6/usrlib
        securityContext:
          runAsUser: 9999  # the admin user (uid 9999) created in the custom image; change if necessary
      volumes:
      - name: flink-config-volume
        configMap:
          name: flink-config
          items:
          - key: flink-conf.yaml
            path: flink-conf.yaml
          - key: log4j-console.properties
            path: log4j-console.properties
      - name: job-artifacts-volume
        hostPath:
          path: /mnt/nfsdata/flink/application/job-artifacts
```
```bash
kubectl create ns flink

# Configuration and service definition
kubectl create -f flink-configuration-configmap.yaml -n flink

# Services
kubectl create -f jobmanager-service.yaml -n flink
kubectl create -f jobmanager-rest-service.yaml -n flink
kubectl create -f taskmanager-query-state-service.yaml -n flink

# Create the Job and the Deployment for the cluster
kubectl create -f jobmanager-application-non-ha.yaml -n flink
kubectl create -f taskmanager-job-deployment.yaml -n flink
```
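The JobManager here is a Kubernetes Job, so the WordCount run can be tracked like any batch workload; a sketch (the /tmp/result path matches the --output argument above, though the sink may actually run inside a TaskManager pod):

```bash
kubectl get jobs -n flink                        # COMPLETIONS becomes 1/1 when the job finishes
kubectl logs -f job/flink-jobmanager -n flink    # follow the JobManager log
```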
Check:

```bash
kubectl get pods,svc -n flink
```
```bash
kubectl delete -f flink-configuration-configmap.yaml -n flink
kubectl delete -f jobmanager-service.yaml -n flink
kubectl delete -f jobmanager-rest-service.yaml -n flink
kubectl delete -f taskmanager-query-state-service.yaml -n flink
kubectl delete -f jobmanager-application-non-ha.yaml -n flink
kubectl delete -f taskmanager-job-deployment.yaml -n flink

kubectl delete ns flink --force
```

```bash
kubectl get pods,svc -n flink
kubectl exec -it flink-taskmanager-54cb7fc57c-g484q -n flink -- bash
```
That concludes this walkthrough of Flink on Kubernetes, demonstrated here with the official examples; enterprise cases will follow in later posts. If you have any questions, feel free to leave me a message.

This article is reposted from 大数据老司机; original: https://www.cnblogs.com/liugp/p/16755095.html. Copyright belongs to the original author.