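Flink can run distributed cluster computation in its own standalone mode, or it can rely on a third-party resource manager. Popular third-party resource managers currently include Hadoop YARN, Apache Mesos, and Kubernetes. Because YARN can also serve other big data frameworks such as Hadoop MapReduce and Spark, YARN mode is the common choice for managing cluster resources, so this article focuses on it.
Flink currently offers two ways to submit an application to YARN: yarn session mode and single job mode. For the differences between the two, see this article.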
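As a rough illustration of the two submission modes, the sketch below only prints the commands that each mode would use with the Flink 1.7 CLI, so it runs without a Flink installation. The install path /opt/flink-1.7.2, the memory and slot values, and the jar name my-job.jar are assumptions chosen to match this article's setup, not required values.

```shell
# Sketch only: prints the commands instead of running them.
# FLINK_HOME and the jar name are assumptions for illustration.
FLINK_HOME="${FLINK_HOME:-/opt/flink-1.7.2}"

# yarn session mode: start one long-running Flink cluster on YARN first
# (-d detaches it), then submit jobs to that running session.
session_mode_commands() {
    jar="$1"
    echo "$FLINK_HOME/bin/yarn-session.sh -jm 1024m -tm 1024m -s 2 -d"
    echo "$FLINK_HOME/bin/flink run $jar"
}

# single job mode: every submission starts its own Flink cluster on YARN.
single_job_command() {
    jar="$1"
    echo "$FLINK_HOME/bin/flink run -m yarn-cluster -yjm 1024m -ytm 1024m -ys 2 $jar"
}

session_mode_commands ./my-job.jar
single_job_command ./my-job.jar
```

In session mode the first command is run once and the second is repeated per job; in single job mode the one command is all that is needed for each job.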
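The cluster here is built on three virtual machines, which must have a Hadoop environment configured in advance; see this article for reference.
Because the JDK and passwordless SSH login were already set up when building the Hadoop cluster, setting up the Flink YARN-mode cluster only requires configuring Flink itself.
First download the https://archive.apache.org/dist/flink/flink-1.7.2/flink-1.7.2-bin-hadoop27-scala_2.12.tgz archive and upload it to the server. Extracting it produces the flink-1.7.2 folder. Next, configure three files in its conf folder: masters, slaves, and flink-conf.yaml.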
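The download URL above follows a regular naming pattern, which can be sketched as a small helper. The function name flink_archive_url is hypothetical, and the pattern is inferred from the single URL used in this article, so other version combinations may not exist on the archive server.

```shell
# Build the archive.apache.org download URL for a Flink binary release.
# Arguments: Flink version, Hadoop tag (e.g. 27), Scala version.
# The naming pattern is inferred from the one URL used in this article.
flink_archive_url() {
    version="$1"; hadoop="$2"; scala="$3"
    echo "https://archive.apache.org/dist/flink/flink-${version}/flink-${version}-bin-hadoop${hadoop}-scala_${scala}.tgz"
}

url="$(flink_archive_url 1.7.2 27 2.12)"
echo "$url"

# To actually download and unpack (requires network access and wget):
#   wget "$url"
#   tar -xzf "$(basename "$url")"    # creates the flink-1.7.2 directory
```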
First configure the masters file, which specifies the hostname and web UI port of the Flink JobManager:
- # This file specifies the master node and its web UI port, i.e. the cluster's JobManager
- flink1:8081
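Then configure the slaves file (the /etc/hosts file was set up when configuring passwordless login, so host names can be used here and resolve directly to IPs):
- #localhost
- # Worker nodes, i.e. the cluster's TaskManagers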
- flink1
- flink2
- flink3
Finally, configure the flink-conf.yaml file:
- ################################################################################
- # Licensed to the Apache Software Foundation (ASF) under one
- # or more contributor license agreements. See the NOTICE file
- # distributed with this work for additional information
- # regarding copyright ownership. The ASF licenses this file
- # to you under the Apache License, Version 2.0 (the
- # "License"); you may not use this file except in compliance
- # with the License. You may obtain a copy of the License at
- #
- # http://www.apache.org/licenses/LICENSE-2.0
- #
- # Unless required by applicable law or agreed to in writing, software
- # distributed under the License is distributed on an "AS IS" BASIS,
- # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- # See the License for the specific language governing permissions and
- # limitations under the License.
- ################################################################################
-
-
- #==============================================================================
- # Common
- #==============================================================================
-
- # The external address of the host on which the JobManager runs and can be
- # reached by the TaskManagers and any clients which want to connect. This setting
- # is only used in Standalone mode and may be overwritten on the JobManager side
- # by specifying the --host <hostname> parameter of the bin/jobmanager.sh executable.
- # In high availability mode, if you use the bin/start-cluster.sh script and setup
- # the conf/masters file, this will be taken care of automatically. Yarn/Mesos
- # automatically configure the host name based on the hostname of the node where the
- # JobManager runs.
-
- # Address (hostname or IP) of the JobManager, i.e. the master node.
- jobmanager.rpc.address: flink1
-
- # The RPC port where the JobManager is reachable.
-
- jobmanager.rpc.port: 6123
-
-
- # The heap size for the JobManager JVM
- # JobManager heap size (in MB). Increase it for long-running programs with many operators; the right value can only be found by testing and tuning.
- jobmanager.heap.size: 1024m
-
-
- # The heap size for the TaskManager JVM
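- # Heap size of each TaskManager (in MB). Each TaskManager runs the operators' functions
- # (Map, Reduce, CoGroup, etc., including sorting, hashing, and caching), so this value should be as large as possible.
- # If the cluster runs only Flink programs, set it to the machine's memory minus 1-2 GB, leaving that 1-2 GB for the operating system.
- # In YARN mode this value is given to the container through the tm parameter; again subtract what the operating system needs (1-2 GB).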
- taskmanager.heap.size: 1024m
-
-
- # The number of task slots that each TaskManager offers. Each slot runs one parallel pipeline.
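- # Number of slots per TaskManager (default: 1). One slot typically corresponds to one CPU core, and each unit of parallelism runs in its own thread. The TaskManager's total memory is divided among its slots.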
- taskmanager.numberOfTaskSlots: 2
-
- # The parallelism used for programs that did not specify any other parallelism.
-
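- # Default parallelism of each operator (default: 1). It is overridden when the program calls setParallelism on an operator
- # or when the -p parameter is given at submission. If only one job runs at a time, this value can be set to
- # the number of TaskManagers times the slots per TaskManager, i.e. NumTaskManagers * NumSlotsPerTaskManager.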
- parallelism.default: 6
-
- # The default file system scheme and authority.
- #
- # By default file paths without scheme are interpreted relative to the local
- # root file system 'file:///'. Use this to override the default and interpret
- # relative paths relative to a different file system,
- # for example 'hdfs://mynamenode:12345'
- #
- # fs.default-scheme
-
- #==============================================================================
- # High Availability
- #==============================================================================
-
- # The high-availability mode. Possible options are 'NONE' or 'zookeeper'.
- #
- # high-availability: zookeeper
-
- # The path where metadata for master recovery is persisted. While ZooKeeper stores
- # the small ground truth for checkpoint and leader election, this location stores
- # the larger objects, like persisted dataflow graphs.
- #
- # Must be a durable file system that is accessible from all nodes
- # (like HDFS, S3, Ceph, nfs, ...)
- #
- # high-availability.storageDir: hdfs:///flink/ha/
-
- # The list of ZooKeeper quorum peers that coordinate the high-availability
- # setup. This must be a list of the form:
- # "host1:clientPort,host2:clientPort,..." (default clientPort: 2181)
- #
- # high-availability.zookeeper.quorum: localhost:2181
-
-
- # ACL options are based on https://zookeeper.apache.org/doc/r3.1.2/zookeeperProgrammers.html#sc_BuiltinACLSchemes
- # It can be either "creator" (ZOO_CREATE_ALL_ACL) or "open" (ZOO_OPEN_ACL_UNSAFE)
- # The default value is "open" and it can be changed to "creator" if ZK security is enabled
- #
- # high-availability.zookeeper.client.acl: open
-
- #==============================================================================
- # Fault tolerance and checkpointing
- #==============================================================================
-
- # The backend that will be used to store operator state checkpoints if
- # checkpointing is enabled.
- #
- # Supported backends are 'jobmanager', 'filesystem', 'rocksdb', or the
- # <class-name-of-factory>.
- #
- # state.backend: filesystem
-
- # Directory for checkpoints filesystem, when using any of the default bundled
- # state backends.
- #
- # state.checkpoints.dir: hdfs://namenode-host:port/flink-checkpoints
-
- # Default target directory for savepoints, optional.
- #
- # state.savepoints.dir: hdfs://namenode-host:port/flink-checkpoints
-
- # Flag to enable/disable incremental checkpoints for backends that
- # support incremental checkpoints (like the RocksDB state backend).
- #
- # state.backend.incremental: false
-
- #==============================================================================
- # Web Frontend
- #==============================================================================
-
- # The address under which the web-based runtime monitor listens.
- #
- #web.address: 0.0.0.0
-
- # The port under which the web-based runtime monitor listens.
- # A value of -1 deactivates the web server.
-
- rest.port: 8081
-
- # Flag to specify whether job submission is enabled from the web-based
- # runtime monitor. Uncomment to disable.
-
- #web.submit.enable: false
-
- #==============================================================================
- # Advanced
- #==============================================================================
-
- # Override the directories for temporary files. If not specified, the
- # system-specific Java temporary directory (java.io.tmpdir property) is taken.
- # (Temporary file directory; falls back to the system default java.io.tmpdir when unset.)
- io.tmp.dirs: /tmp/flink
- # For framework setups on Yarn or Mesos, Flink will automatically pick up the
- # containers' temp directories without any need for configuration.
- #
- # Add a delimited list for multiple directories, using the system directory
- # delimiter (colon ':' on unix) or a comma, e.g.:
- # /data1/tmp:/data2/tmp:/data3/tmp
- #
- # Note: Each directory entry is read from and written to by a different I/O
- # thread. You can include the same directory multiple times in order to create
- # multiple I/O threads against that directory. This is for example relevant for
- # high-throughput RAIDs.
- #
- # io.tmp.dirs: /tmp
- # Specify whether TaskManager's managed memory should be allocated when starting
- # up (true) or when memory is requested.
- #
- # We recommend to set this value to 'true' only in setups for pure batch
- # processing (DataSet API). Streaming setups currently do not use the TaskManager's
- # managed memory: The 'rocksdb' state backend uses RocksDB's own memory management,
- # while the 'memory' and 'filesystem' backends explicitly keep data as objects
- # to save on serialization cost.
- #
- # taskmanager.memory.preallocate: false
-
- # The classloading resolve order. Possible values are 'child-first' (Flink's default)
- # and 'parent-first' (Java's default).
- #
- # Child first classloading allows users to use different dependency/library
- # versions in their application than those in the classpath. Switching back
- # to 'parent-first' may help with debugging dependency issues.
- #
- # classloader.resolve-order: child-first
-
- # The amount of memory going to the network stack. These numbers usually need
- # no tuning. Adjusting them may be necessary in case of an "Insufficient number
- # of network buffers" error. The default min is 64MB, the default max is 1GB.
- #
- # taskmanager.network.memory.fraction: 0.1
- # taskmanager.network.memory.min: 64mb
- # taskmanager.network.memory.max: 1gb
-
- #==============================================================================
- # Flink Cluster Security Configuration
- #==============================================================================
-
- # Kerberos authentication for various components - Hadoop, ZooKeeper, and connectors -
- # may be enabled in four steps:
- # 1. configure the local krb5.conf file
- # 2. provide Kerberos credentials (either a keytab or a ticket cache w/ kinit)
- # 3. make the credentials available to various JAAS login contexts
- # 4. configure the connector to use JAAS/SASL
-
- # The below configure how Kerberos credentials are provided. A keytab will be used instead of
- # a ticket cache if the keytab path and principal are set.
-
- # security.kerberos.login.use-ticket-cache: true
- # security.kerberos.login.keytab: /path/to/kerberos/keytab
- # security.kerberos.login.principal: flink-user
-
- # The configuration below defines which JAAS login contexts
-
- # security.kerberos.login.contexts: Client,KafkaClient
-
- #==============================================================================
- # ZK Security Configuration
- #==============================================================================
-
- # Below configurations are applicable if ZK ensemble is configured for security
-
- # Override below configuration to provide custom ZK service name if configured
- # zookeeper.sasl.service-name: zookeeper
-
- # The configuration below must match one of the values set in "security.kerberos.login.contexts"
- # zookeeper.sasl.login-context-name: Client
-
- #==============================================================================
- # HistoryServer
- #==============================================================================
-
- # The HistoryServer is started and stopped via bin/historyserver.sh (start|stop)
-
- # Directory to upload completed jobs to. Add this directory to the list of
- # monitored directories of the HistoryServer as well (see below).
- #jobmanager.archive.fs.dir: hdfs:///completed-jobs/
-
- # The address under which the web-based HistoryServer listens.
- #historyserver.web.address: 0.0.0.0
-
- # The port under which the web-based HistoryServer listens.
- #historyserver.web.port: 8082
-
- # Comma separated list of directories to monitor for completed jobs.
- #historyserver.archive.fs.dir: hdfs:///completed-jobs/
-
- # Interval in milliseconds for refreshing the monitored directories.
- #historyserver.archive.fs.refresh-interval: 10000
-
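The parallelism.default value of 6 in the configuration above follows the NumTaskManagers * NumSlotsPerTaskManager rule from its comment: three hosts in slaves times two slots each. As a sketch, the same number can be derived from the two files. The helper names are hypothetical, and the demo writes throwaway copies of the relevant lines rather than reading a real conf directory.

```shell
# Count the TaskManager hosts listed in a slaves file,
# skipping blank lines and '#' comments.
count_task_managers() {
    grep -v -e '^[[:space:]]*#' -e '^[[:space:]]*$' "$1" | wc -l | tr -d '[:space:]'
}

# Read taskmanager.numberOfTaskSlots from a flink-conf.yaml file.
slots_per_task_manager() {
    sed -n 's/^taskmanager\.numberOfTaskSlots:[[:space:]]*\([0-9][0-9]*\).*/\1/p' "$1" | head -n 1
}

# Demo with throwaway copies of the relevant lines from this article.
demo_dir="$(mktemp -d)"
printf '%s\n' '#localhost' 'flink1' 'flink2' 'flink3' > "$demo_dir/slaves"
printf '%s\n' 'taskmanager.numberOfTaskSlots: 2' > "$demo_dir/flink-conf.yaml"

tms="$(count_task_managers "$demo_dir/slaves")"
slots="$(slots_per_task_manager "$demo_dir/flink-conf.yaml")"
echo "parallelism.default: $((tms * slots))"   # prints "parallelism.default: 6"
rm -rf "$demo_dir"
```

Note that this rule only makes sense when a single job uses the whole cluster; with several concurrent jobs a smaller default leaves slots free for the others.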
Use scp to copy the entire configured flink-1.7.2 folder to the same directory on the other two machines.
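The copy step can be sketched as a small loop. The host names flink2 and flink3, the root user, and the /opt location are taken from this article's setup; DRY_RUN is a hypothetical switch that defaults to printing the scp commands instead of running them.

```shell
# Copy the configured Flink directory to the same parent directory on
# each listed host. With DRY_RUN=1 (the default in this sketch) the
# commands are only printed.
DRY_RUN="${DRY_RUN:-1}"

distribute_flink() {
    src="$1"; shift
    dest="$(dirname "$src")"
    for host in "$@"; do
        if [ "$DRY_RUN" = "1" ]; then
            echo "scp -r $src root@$host:$dest/"
        else
            scp -r "$src" "root@$host:$dest/"
        fi
    done
}

distribute_flink /opt/flink-1.7.2 flink2 flink3
```

Running it with DRY_RUN=0 performs the actual copy and relies on the passwordless SSH login configured earlier.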
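One more step is critical: on the master node (the flink1 machine here), set the HADOOP_CONF_DIR variable so that Flink can pick up the Hadoop configuration at startup and connect to the YARN ResourceManager and HDFS. Its value is the path of the directory that holds Hadoop's configuration files:
export HADOOP_CONF_DIR=/opt/hadoop-2.7.5/etc/hadoop
(Remember to run source so the environment variable takes effect.)
Here is a more detailed description of the lookup strategy, written by someone else:
- 1. Flink checks, in this order, whether YARN_CONF_DIR, HADOOP_CONF_DIR, or HADOOP_CONF_PATH is set. If one is set, the configuration is read from that directory.
-
- 2. If none of these environment variables is set, the HADOOP_HOME environment variable is used. For Hadoop 2 the configuration path searched is $HADOOP_HOME/etc/hadoop; for Hadoop 1 it is $HADOOP_HOME/conf.
According to this strategy, HADOOP_CONF_DIR does not need to be set separately here, because HADOOP_HOME was already configured when Hadoop was installed.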
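To check which directory this lookup order would select on a given machine, the strategy can be mirrored in a few lines of shell. This is only a sketch of the order as described above, not Flink's actual implementation, and the function name resolve_hadoop_conf_dir is hypothetical.

```shell
# Mirror the lookup order described above and print the directory that
# would be used; returns non-zero if nothing is configured.
resolve_hadoop_conf_dir() {
    # Step 1: the three explicit variables, checked in order.
    for candidate in "${YARN_CONF_DIR:-}" "${HADOOP_CONF_DIR:-}" "${HADOOP_CONF_PATH:-}"; do
        if [ -n "$candidate" ]; then
            echo "$candidate"
            return 0
        fi
    done
    # Step 2: fall back to HADOOP_HOME.
    if [ -n "${HADOOP_HOME:-}" ]; then
        if [ -d "$HADOOP_HOME/etc/hadoop" ]; then     # Hadoop 2 layout
            echo "$HADOOP_HOME/etc/hadoop"
            return 0
        elif [ -d "$HADOOP_HOME/conf" ]; then         # Hadoop 1 layout
            echo "$HADOOP_HOME/conf"
            return 0
        fi
    fi
    return 1
}

resolve_hadoop_conf_dir || echo "no Hadoop configuration found"
```

On the flink1 machine from this article, with only HADOOP_HOME set, the function would be expected to print the etc/hadoop directory under the Hadoop install path.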
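Now you can test your own program on the cluster. Here I use the test program from another article (on configuring a Flink development environment in IDEA and testing code on a local cluster), submitted in single job mode: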
- [root@flink1 opt]# ./flink-1.7.2/bin/flink run -m yarn-cluster -yn 2 ./flinkLearn-1.0-SNAPSHOT-jar-with-dependencies.jar
- 2019-10-20 15:30:11,491 INFO org.apache.hadoop.yarn.client.RMProxy - Connecting to ResourceManager at flink1/192.168.89.128:8032
- 2019-10-20 15:30:11,922 INFO org.apache.flink.yarn.cli.FlinkYarnSessionCli - No path for the flink jar passed. Using the location of class org.apache.flink.yarn.YarnClusterDescriptor to locate the jar
- 2019-10-20 15:30:11,922 INFO org.apache.flink.yarn.cli.FlinkYarnSessionCli - No path for the flink jar passed. Using the location of class org.apache.flink.yarn.YarnClusterDescriptor to locate the jar
- 2019-10-20 15:30:12,004 INFO org.apache.flink.yarn.cli.FlinkYarnSessionCli - The argument yn is deprecated in will be ignored.
- 2019-10-20 15:30:12,004 INFO org.apache.flink.yarn.cli.FlinkYarnSessionCli - The argument yn is deprecated in will be ignored.
- 2019-10-20 15:30:13,554 INFO org.apache.flink.yarn.AbstractYarnClusterDescriptor - Cluster specification: ClusterSpecification{masterMemoryMB=1024, taskManagerMemoryMB=1024, numberTaskManagers=2, slotsPerTaskManager=2}
- 2019-10-20 15:30:15,444 WARN org.apache.flink.yarn.AbstractYarnClusterDescriptor - The configuration directory ('/opt/flink-1.7.2/conf') contains both LOG4J and Logback configuration files. Please delete or rename one of them.
- 2019-10-20 15:30:27,872 INFO org.apache.flink.yarn.AbstractYarnClusterDescriptor - Submitting application master application_1571555933107_0001
- 2019-10-20 15:30:29,165 INFO org.apache.hadoop.yarn.client.api.impl.YarnClientImpl - Submitted application application_1571555933107_0001
- 2019-10-20 15:30:29,167 INFO org.apache.flink.yarn.AbstractYarnClusterDescriptor - Waiting for the cluster to be allocated
- 2019-10-20 15:30:29,199 INFO org.apache.flink.yarn.AbstractYarnClusterDescriptor - Deploying cluster, current state ACCEPTED
- 2019-10-20 15:30:48,257 INFO org.apache.flink.yarn.AbstractYarnClusterDescriptor - YARN application has been deployed successfully.
- Starting execution of program
- Program execution finished
- Job with JobID 9102fbc05daf96f38a53e29d4823e35b has finished.
- Job Runtime: 46309 ms
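You may notice that although the job ran successfully, the Flink web UI shows no information about it. That is because in YARN mode Flink first requests resources from YARN to start a yarn-session and then runs the submitted job on that yarn-session. The job is therefore tracked by YARN, and its execution details appear in the YARN web UI.
The YARN web UI shows that the application's state has changed to FINISHED. In other words, single job submission automatically shuts down the yarn-session once the job completes and releases its resources. Compared with the first, long-running yarn session mode, this has a clear advantage: resources are requested only when needed and released automatically when the job ends, so nothing stays occupied. In yarn session mode, the resources remain allocated until they are released manually with yarn application -kill applicationID.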
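For the manual release in yarn session mode, the application ID has to be found first. One way, sketched here, is to pull it out of the client log output; the sample line is copied from the submission log earlier in this article, and the helper name extract_application_id is hypothetical. The sketch prints the kill command rather than running it.

```shell
# Pull the first YARN application ID out of Flink client log output
# read from standard input.
extract_application_id() {
    grep -o 'application_[0-9]*_[0-9]*' | head -n 1
}

log_line='2019-10-20 15:30:29,165 INFO org.apache.hadoop.yarn.client.api.impl.YarnClientImpl - Submitted application application_1571555933107_0001'
app_id="$(printf '%s\n' "$log_line" | extract_application_id)"

# Printed, not executed, so the sketch is safe to run anywhere.
echo "yarn application -kill $app_id"
```

If the log is no longer available, yarn application -list shows the running applications together with their IDs.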