赞
踩
Kubernetes 是 Google 开源的 容器集群管理系统,其提供应用部署、维护、扩展机制等功能,利用 Kubernetes 能方便地管理跨机器运行容器化的应用。Kubernetes 和 Yarn 相比,相当于下一代的资源管理系统,但是它的能力远远不止这些。
Kubernetes 中的 Master 节点,负责管理整个集群,含有一个集群的资源数据访问入口,还包含一个 Etcd
高可用键值存储服务。Master 中运行着 API Server
,Controller Manager
及 Scheduler
服务。
Node 为集群的一个操作单元,是 Pod 运行的宿主机。Node 节点里包含一个 Agent
进程,能够维护和管理该 Node 上的所有容器的创建、启停等。Node 还含有一个服务端 kube-proxy
,用于 服务发现、反向代理 和 负载均衡。Node 底层含有 docker engine
,docker 引擎主要负责本机容器的创建和管理工作。
Pod 运行于 Node 节点上,是若干相关容器的组合。在 K8s 里面 Pod 是创建、调度和管理的最小单位。
Kubernetes 的架构如图所示,从这个图里面能看出 Kubernetes 的整个运行过程。
API Server
相当于用户的一个请求入口,用户可以提交命令给 Etcd
,这时会将这些请求存储到 Etcd
里面去。Etcd
是一个键值存储,负责将任务分配给具体的机器,在每个节点上的 Kubelet 会找到对应的 container
在本机上运行。service
描述文件,并由 kube proxy
负责具体工作的流量转发。Kubernetes 中比较重要的概念有:
Replication Controller
(RC
)用来管理 Pod 的副本。RC 确保任何时候 Kubernetes 集群中有指定数量的 Pod 副本(replicas
)在运行, 如果少于指定数量的 Pod 副本,RC 会启动新的 Container,反之会杀死多余的以保证数量不变。Service
提供了一个统一的服务访问入口以及服务代理和发现机制Persistent Volume
(PV
)和 Persistent Volume Claim
(PVC
)用于数据的持久化存储。ConfigMap
是指存储用户程序的配置文件,其后端存储是基于 Etcd
。
Flink on Kubernetes 的架构如图所示,Flink 任务在 Kubernetes 上运行的步骤有:
container
。Flink-Container ResourceManager
、JobManager
和 Program Runner
。TaskManager
,并向负责资源管理的 ResourceManager
进行注册,注册完成之后,由 JobManager 将具体的任务分给 Container,再由 Container 去执行。JobManager 的执行过程分为两步:
flink-jobmanager
。service name
和 port
暴露 JobManager 服务,通过标签选择对应的 pods
。TaskManager 也是通过 Deployment 来进行描述,保证
n
n
n 个副本的 Container 运行 TaskManager,同时也需要定义一个标签,例如 flink-taskmanager
。
对于 JobManager 和 TaskManager 运行过程中需要的一些配置文件,如:flink-conf.yaml
、hdfs-site.xml
、core-site.xml
,可以通过将它们定义为 ConfigMap
来实现配置的传递和读取。
整个交互的流程比较简单,用户往 Kubernetes 集群提交定义好的资源描述文件即可,例如 deployment
、configmap
、service
等描述。后续的事情就交给 Kubernetes 集群自动完成。Kubernetes 集群会按照定义好的描述来启动 pod
,运行用户程序。各个组件的具体工作如下:
label selector
)找到 job manager
的 pod
暴露服务。container
运行 JM / TM,应用升级策略。pod
上通过挂载 /etc/flink
目录,包含 flink-conf.yaml
内容。接下来就讲一下 Flink on Kubernetes 的实践篇,即 K8s 上是怎么运行任务的。
# 启动
kubectl create -f jobmanager-service.yaml
kubectl create -f jobmanager-deployment.yaml
kubectl create -f taskmanager-deployment.yaml
# Submit job
kubectl port-forward service/flink-jobmanager 8081:8081
bin/flink run -d -m localhost:8081 ./examples/streaming/TopSpeedWindowing.jar
# 停止
kubectl delete -f jobmanager-deployment.yaml
kubectl delete -f taskmanager-deployment.yaml
kubectl delete -f jobmanager-service.yaml
首先启动 Session Cluster,执行上述三条启动命令就可以将 Flink 的 jobManager-service
、jobmanager-deployment
、taskmanager-deployment
启动起来。启动完成之后用户可以通过接口进行访问,然后通过端口进行提交任务。若想销毁集群,直接用 kubectl delete
即可,整个资源就可以销毁。
Flink 官方提供的例子如下图所示,图中左侧为 jobmanager-deployment.yaml
配置,右侧为 taskmanager-deployment.yaml
配置。
在 jobmanager-deployment.yaml
配置中:
apiVersion
,apiVersion
是 API 的一个版本号,版本号用的是 extensions/v1beta1
版本。kind
为 Deployment
。metadata
的名为 flink-jobmanager
。replicas
为
1
1
1。labels
标签用于 pod
的选取。containers
的镜像名为 jobmanager
,containers
包含从公共 docker
仓库下载的 image
,当然也可以使用公司内部的私有仓库。args
启动参数用于决定启动的是 jobmanager
还是 taskmanager
。ports
是服务端口,常见的服务端口为
8081
8081
8081 端口。env
是定义的环境变量,会传递给具体的启动脚本。右图为 taskmanager-deployment.yaml
配置,taskmanager-deployment.yaml
配置与 jobmanager-deployment.yaml
相似,但 taskmanager-deployment.yaml
的副本数是
2
2
2 个。
接下来是 jobmanager-service.yaml
的配置,jobmanager-service.yaml
的资源类型为 Service
,在 Service
中的配置相对少一些,spec
中配置需要暴露的服务端口的 port
,在 selector
中,通过标签选取 jobmanager
的 pod
。
除了 Session 模式,还有一种 Per Job 模式。在 Per Job 模式下,需要将用户代码都打到镜像里面,这样如果业务逻辑的变动涉及到 Jar 包的修改,都需要重新生成镜像,整个过程比较繁琐,因此在生产环境中使用的比较少。
以使用公用 Docker 仓库为例,Job Cluster 的运行步骤如下:
flink/flink-container/docker
目录下执行 build.sh
脚本,指定从哪个版本开始去构建镜像,成功后会输出 Successfully tagged topspeed:latest
的提示。sh build.sh --from-release --flink-version 1.7.0 --hadoop-version 2.8 --scala-version 2.11 --job-jar ~/flink/flink-1.7.1/examples/streaming/TopSpeedWindowing.jar --image-name topspeed
hub.docker.com
上需要注册账号和创建仓库进行上传镜像。docker tag topspeed zkb555/topspeedwindowing
docker push zkb555/topspeedwindowing
# 启动 Servive
kubectl create -f job-cluster-service.yaml
# 启动 JobManager
FLINK_IMAGE_NAME=zkb555/topspeedwindowing:latest FLINK_JOB=org.apache.flink.streaming.examples.windowing.TopSpeedWindowing FLINK_JOB_PARALLELISM=3 envsubst < job-cluster-job.yaml.template | kubectl create -f –
# 启动 TaskManager
FLINK_IMAGE_NAME=zkb555/topspeedwindowing:latest FLINK_JOB_PARALLELISM=4 envsubst < task-manager-deployment.yaml.template | kubectl create -f -
Flink 在 K8s 上可以通过 Operator 方式提交任务吗?
目前 Flink 官方还没有提供 Operator 的方式, L y f t Lyft Lyft 公司开源了自己的 Operator 实现:https://github.com/lyft/flinkk8soperator。
在 K8s 集群上如果不使用 Zookeeper 有没有其他高可用(HA)的方案?
Etcd 是一个类似于 Zookeeper 的高可用键值服务,目前 Flink 社区正在考虑基于 Etcd 实现高可用的方案(https://issues.apache.org/jira/browse/FLINK-11105)以及直接依赖 K8s API 的方案(https://issues.apache.org/jira/browse/FLINK-12884)。
Flink on K8s 在任务启动时需要指定 TaskManager 的个数,有和 Yarn 一样的动态资源申请方式吗?
Flink on K8s 目前的实现,在任务启动前就需要确定好 TaskManager 的个数,这样容易造成 TM 指定太少,任务无法启动,或者指定的太多,造成资源浪费。社区正在考虑实现和 Yarn 一样的任务启动时动态资源申请的方式。这是一种和 K8s 结合的更为 Nativey 的方式,称为 Active
模式。Active
意味着 ResourceManager 可以直接向 K8s 集群申请资源。具体设计方案和进展请关注:https://issues.apache.org/jira/browse/FLINK-9953。
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。