
Setting up a Flink 1.7.2 (flink-1.7.2-bin-hadoop27-scala_2.12) cluster in YARN mode on CentOS 7


Flink cluster

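Flink can run distributed computations on its own standalone cluster, or on top of a third-party resource manager. Popular resource managers include Hadoop YARN, Apache Mesos and Kubernetes. Because YARN can also host other big-data frameworks such as Hadoop MapReduce and Spark, it is the most common choice for managing cluster resources, so this article focuses on YARN mode.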

YARN mode

Flink offers two ways to submit applications to YARN: yarn session mode and single job (per-job) mode. For the differences between the two, see this article.

Setting up a YARN-mode cluster

The cluster here is built on three virtual machines, which must already have a working Hadoop environment; see this article for how to set one up.

The JDK and passwordless SSH login were already configured while building the Hadoop cluster, so setting up Flink in YARN mode only requires configuring Flink itself.

First download https://archive.apache.org/dist/flink/flink-1.7.2/flink-1.7.2-bin-hadoop27-scala_2.12.tgz, upload it to the server and extract it, which creates a flink-1.7.2 directory. Next, edit three files in its conf directory: masters, slaves and flink-conf.yaml.
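The extract step can be scripted. The sketch below makes a few assumptions. The tarball has already been fetched, for example with `wget` from the URL above. `/opt` is the install directory, matching the paths used later in this article. The function name `unpack_flink` is hypothetical. The function unpacks the archive and checks that the three files edited below are present:

```shell
#!/usr/bin/env bash
# Hypothetical helper: unpack a Flink distribution tarball and verify that the
# three config files edited in this article exist.
# Usage: unpack_flink <tarball> <dest-dir>
unpack_flink() {
  local tarball="$1" dest="$2"
  mkdir -p "$dest" || return 1
  # The archive contains a top-level flink-1.7.2/ directory.
  tar -xzf "$tarball" -C "$dest" || return 1
  local conf="$dest/flink-1.7.2/conf" f
  for f in masters slaves flink-conf.yaml; do
    if [ ! -f "$conf/$f" ]; then
      echo "missing $conf/$f" >&2
      return 1
    fi
  done
  # Print the conf directory so callers can reuse it.
  echo "$conf"
}
```

For example: `unpack_flink flink-1.7.2-bin-hadoop27-scala_2.12.tgz /opt` prints `/opt/flink-1.7.2/conf` on success.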

Start with the masters file, which specifies the hostname of the Flink JobManager and the port of its web UI:

```
#This file specifies the master node and its web UI port, i.e. the cluster's JobManager
flink1:8081
```
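The masters file can also be written non-interactively with a here-document. The port here is the web UI port, which matches `rest.port: 8081` in flink-conf.yaml. This is a minimal sketch. The function name `write_masters` is hypothetical. The default conf path `/opt/flink-1.7.2/conf` is an assumption based on where the archive was unpacked.

```shell
#!/usr/bin/env bash
# Hypothetical helper: (re)write conf/masters with a single "host:webui-port" entry.
# Usage: write_masters [conf-dir] [host] [port]
write_masters() {
  local conf="${1:-/opt/flink-1.7.2/conf}" host="${2:-flink1}" port="${3:-8081}"
  # Overwrites the file; the unquoted EOF lets ${host}/${port} expand.
  cat > "$conf/masters" <<EOF
${host}:${port}
EOF
}
```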

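Then configure the slaves file. Since /etc/hosts was already set up when configuring passwordless login, hostnames can be used here directly and are mapped to IP addresses: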

```
#localhost
#Worker nodes, i.e. the cluster's TaskManagers
flink1
flink2
flink3
```
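Because the slaves file relies on hostnames, every entry must resolve on each machine. The small sketch below reads the slaves file and skips comments and blank lines; as the `#localhost` line shows, `#` marks a comment. It then checks each name against a hosts file. `check_workers_in_hosts` is a hypothetical name. The function only inspects `/etc/hosts` (or a file you pass in), not DNS.

```shell
#!/usr/bin/env bash
# Hypothetical check: every worker listed in conf/slaves must appear in a hosts file.
# Usage: check_workers_in_hosts <slaves-file> [hosts-file]
check_workers_in_hosts() {
  local slaves="$1" hosts="${2:-/etc/hosts}" host missing=0
  # Drop comment and blank lines, keep the first field (the hostname).
  for host in $(grep -v '^[[:space:]]*#' "$slaves" | awk 'NF {print $1}'); do
    # In a hosts file, field 1 is the IP and fields 2..N are names/aliases.
    if ! awk -v h="$host" '$1 !~ /^#/ { for (i = 2; i <= NF; i++) if ($i == h) found = 1 } END { exit !found }' "$hosts"; then
      echo "not in $hosts: $host" >&2
      missing=1
    fi
  done
  return "$missing"
}
```

Run it on every node, e.g. `check_workers_in_hosts /opt/flink-1.7.2/conf/slaves`; a non-zero exit status lists the names that are missing.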

Finally, configure flink-conf.yaml:

```yaml
################################################################################
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
################################################################################
#==============================================================================
# Common
#==============================================================================
# The external address of the host on which the JobManager runs and can be
# reached by the TaskManagers and any clients which want to connect. This setting
# is only used in Standalone mode and may be overwritten on the JobManager side
# by specifying the --host <hostname> parameter of the bin/jobmanager.sh executable.
# In high availability mode, if you use the bin/start-cluster.sh script and setup
# the conf/masters file, this will be taken care of automatically. Yarn/Mesos
# automatically configure the host name based on the hostname of the node where the
# JobManager runs.
# Address of the JobManager, i.e. the master node.
jobmanager.rpc.address: flink1
# The RPC port where the JobManager is reachable.
jobmanager.rpc.port: 6123
# The heap size for the JobManager JVM
# JobManager heap size (with a unit suffix, e.g. 1024m). Increase it for long-running
# programs with many operators; the right value can only be found by testing and adjusting.
jobmanager.heap.size: 1024m
# The heap size for the TaskManager JVM
# Heap size of each TaskManager. Each TaskManager runs the operators' functions
# (Map, Reduce, CoGroup, etc., including sorting, hashing and caching), so this value
# should be as large as possible. If the cluster runs only Flink, a good value is the
# machine's memory minus some memory for the operating system (roughly 1-2 GB).
# In YARN mode, the TaskManager container size is set with the tm option (-tm for
# yarn-session.sh, -ytm for flink run), and the heap is derived from it after a
# tolerance value is subtracted.
taskmanager.heap.size: 1024m
# The number of task slots that each TaskManager offers. Each slot runs one parallel pipeline.
# Number of slots per TaskManager, commonly one per CPU core; the default is 1. Each slot
# runs one parallel pipeline in its own thread, and the TaskManager's managed memory is
# divided among its slots.
taskmanager.numberOfTaskSlots: 2
# The parallelism used for programs that did not specify any other parallelism.
# Default parallelism of every operator; the default is 1. It is overridden when the program
# calls setParallelism on an operator or when the job is submitted with -p. If only one job
# runs on the cluster, it can be set to the number of TaskManagers * slots per TaskManager,
# i.e. NumTaskManagers * NumSlotsPerTaskManager (here 3 * 2 = 6).
parallelism.default: 6
# The default file system scheme and authority.
#
# By default file paths without scheme are interpreted relative to the local
# root file system 'file:///'. Use this to override the default and interpret
# relative paths relative to a different file system,
# for example 'hdfs://mynamenode:12345'
#
# fs.default-scheme
#==============================================================================
# High Availability
#==============================================================================
# The high-availability mode. Possible options are 'NONE' or 'zookeeper'.
#
# high-availability: zookeeper
# The path where metadata for master recovery is persisted. While ZooKeeper stores
# the small ground truth for checkpoint and leader election, this location stores
# the larger objects, like persisted dataflow graphs.
#
# Must be a durable file system that is accessible from all nodes
# (like HDFS, S3, Ceph, nfs, ...)
#
# high-availability.storageDir: hdfs:///flink/ha/
# The list of ZooKeeper quorum peers that coordinate the high-availability
# setup. This must be a list of the form:
# "host1:clientPort,host2:clientPort,..." (default clientPort: 2181)
#
# high-availability.zookeeper.quorum: localhost:2181
# ACL options are based on https://zookeeper.apache.org/doc/r3.1.2/zookeeperProgrammers.html#sc_BuiltinACLSchemes
# It can be either "creator" (ZOO_CREATE_ALL_ACL) or "open" (ZOO_OPEN_ACL_UNSAFE)
# The default value is "open" and it can be changed to "creator" if ZK security is enabled
#
# high-availability.zookeeper.client.acl: open
#==============================================================================
# Fault tolerance and checkpointing
#==============================================================================
# The backend that will be used to store operator state checkpoints if
# checkpointing is enabled.
#
# Supported backends are 'jobmanager', 'filesystem', 'rocksdb', or the
# <class-name-of-factory>.
#
# state.backend: filesystem
# Directory for checkpoints filesystem, when using any of the default bundled
# state backends.
#
# state.checkpoints.dir: hdfs://namenode-host:port/flink-checkpoints
# Default target directory for savepoints, optional.
#
# state.savepoints.dir: hdfs://namenode-host:port/flink-checkpoints
# Flag to enable/disable incremental checkpoints for backends that
# support incremental checkpoints (like the RocksDB state backend).
#
# state.backend.incremental: false
#==============================================================================
# Web Frontend
#==============================================================================
# The address under which the web-based runtime monitor listens.
#
#web.address: 0.0.0.0
# The port under which the web-based runtime monitor listens.
# A value of -1 deactivates the web server.
rest.port: 8081
# Flag to specify whether job submission is enabled from the web-based
# runtime monitor. Uncomment to disable.
#web.submit.enable: false
#==============================================================================
# Advanced
#==============================================================================
# Override the directories for temporary files. If not specified, the
# system-specific Java temporary directory (java.io.tmpdir property) is taken.
io.tmp.dirs: /tmp/flink
# For framework setups on Yarn or Mesos, Flink will automatically pick up the
# containers' temp directories without any need for configuration.
#
# Add a delimited list for multiple directories, using the system directory
# delimiter (colon ':' on unix) or a comma, e.g.:
# /data1/tmp:/data2/tmp:/data3/tmp
#
# Note: Each directory entry is read from and written to by a different I/O
# thread. You can include the same directory multiple times in order to create
# multiple I/O threads against that directory. This is for example relevant for
# high-throughput RAIDs.
#
# io.tmp.dirs: /tmp
# Specify whether TaskManager's managed memory should be allocated when starting
# up (true) or when memory is requested.
#
# We recommend to set this value to 'true' only in setups for pure batch
# processing (DataSet API). Streaming setups currently do not use the TaskManager's
# managed memory: The 'rocksdb' state backend uses RocksDB's own memory management,
# while the 'memory' and 'filesystem' backends explicitly keep data as objects
# to save on serialization cost.
#
# taskmanager.memory.preallocate: false
# The classloading resolve order. Possible values are 'child-first' (Flink's default)
# and 'parent-first' (Java's default).
#
# Child first classloading allows users to use different dependency/library
# versions in their application than those in the classpath. Switching back
# to 'parent-first' may help with debugging dependency issues.
#
# classloader.resolve-order: child-first
# The amount of memory going to the network stack. These numbers usually need
# no tuning. Adjusting them may be necessary in case of an "Insufficient number
# of network buffers" error. The default min is 64MB, the default max is 1GB.
#
# taskmanager.network.memory.fraction: 0.1
# taskmanager.network.memory.min: 64mb
# taskmanager.network.memory.max: 1gb
#==============================================================================
# Flink Cluster Security Configuration
#==============================================================================
# Kerberos authentication for various components - Hadoop, ZooKeeper, and connectors -
# may be enabled in four steps:
# 1. configure the local krb5.conf file
# 2. provide Kerberos credentials (either a keytab or a ticket cache w/ kinit)
# 3. make the credentials available to various JAAS login contexts
# 4. configure the connector to use JAAS/SASL
# The below configure how Kerberos credentials are provided. A keytab will be used instead of
# a ticket cache if the keytab path and principal are set.
# security.kerberos.login.use-ticket-cache: true
# security.kerberos.login.keytab: /path/to/kerberos/keytab
# security.kerberos.login.principal: flink-user
# The configuration below defines which JAAS login contexts
# security.kerberos.login.contexts: Client,KafkaClient
#==============================================================================
# ZK Security Configuration
#==============================================================================
# Below configurations are applicable if ZK ensemble is configured for security
# Override below configuration to provide custom ZK service name if configured
# zookeeper.sasl.service-name: zookeeper
# The configuration below must match one of the values set in "security.kerberos.login.contexts"
# zookeeper.sasl.login-context-name: Client
#==============================================================================
# HistoryServer
#==============================================================================
# The HistoryServer is started and stopped via bin/historyserver.sh (start|stop)
# Directory to upload completed jobs to. Add this directory to the list of
# monitored directories of the HistoryServer as well (see below).
#jobmanager.archive.fs.dir: hdfs:///completed-jobs/
# The address under which the web-based HistoryServer listens.
#historyserver.web.address: 0.0.0.0
# The port under which the web-based HistoryServer listens.
#historyserver.web.port: 8082
# Comma separated list of directories to monitor for completed jobs.
#historyserver.archive.fs.dir: hdfs:///completed-jobs/
# Interval in milliseconds for refreshing the monitored directories.
#historyserver.archive.fs.refresh-interval: 10000
```
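The comment on `parallelism.default` suggests NumTaskManagers * NumSlotsPerTaskManager when a single job runs. The quick sketch below reads the two keys from flink-conf.yaml and compares them with the number of workers in conf/slaves. `check_parallelism` and `conf_value` are hypothetical helpers. They assume simple uncommented `key: value` lines, as in the file above. The slaves count mirrors the standalone layout; on YARN the TaskManagers are containers requested at submission time, so treat the comparison only as a reference point.

```shell
#!/usr/bin/env bash
# Print the value of an uncommented "key: value" line from a flink-conf.yaml file.
# Usage: conf_value <key> <file>
conf_value() {
  awk -v k="$1:" '$1 == k {print $2; exit}' "$2"
}

# Hypothetical check: parallelism.default <= (#workers in conf/slaves * slots per TaskManager).
# Usage: check_parallelism <conf-dir>
check_parallelism() {
  local conf="$1" slots par workers total
  slots=$(conf_value taskmanager.numberOfTaskSlots "$conf/flink-conf.yaml")
  par=$(conf_value parallelism.default "$conf/flink-conf.yaml")
  # Both keys default to 1 when absent, as noted in the comments above.
  slots=${slots:-1}; par=${par:-1}
  # Count non-comment, non-blank lines of conf/slaves.
  workers=$(grep -v '^[[:space:]]*#' "$conf/slaves" | awk 'NF {n++} END {print n+0}')
  total=$((workers * slots))
  echo "workers=$workers slots=$slots total_slots=$total parallelism.default=$par"
  [ "$par" -le "$total" ]
}
```

With the files from this article (3 workers, 2 slots, parallelism 6), it prints `workers=3 slots=2 total_slots=6 parallelism.default=6` and returns 0.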

Copy the entire configured flink-1.7.2 directory with scp to the same location on the other two machines.
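The copy can be looped over the worker hosts. This is a sketch that relies on the passwordless SSH configured earlier. The function name `distribute_flink` and the `DRY_RUN` switch are hypothetical. `DRY_RUN` only prints the commands instead of running them.

```shell
#!/usr/bin/env bash
# Hypothetical helper: copy the configured Flink directory to the same parent
# directory on each given host. Set DRY_RUN=1 to print the scp commands only.
# Usage: distribute_flink <flink-dir> <host>...
distribute_flink() {
  local dir="$1" host parent
  shift
  parent=$(dirname "$dir")
  for host in "$@"; do
    if [ "${DRY_RUN:-0}" = 1 ]; then
      echo "scp -r $dir $host:$parent/"
    else
      scp -r "$dir" "$host:$parent/" || return 1
    fi
  done
}
```

For this cluster: `distribute_flink /opt/flink-1.7.2 flink2 flink3`.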

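The next step is essential: on the master node (flink1 here), set the HADOOP_CONF_DIR variable so that Flink can find the Hadoop configuration at startup and connect to YARN's ResourceManager and to HDFS. Its value is the directory that holds Hadoop's configuration files:

```shell
export HADOOP_CONF_DIR=/opt/hadoop-2.7.5/etc/hadoop
```

(Don't forget to `source` the file you added it to, so that the variable takes effect.)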
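To make the variable survive new shells, it is usually appended to a profile file and then sourced. This sketch assumes `/etc/profile` as the target, since the article does not say which file it used. `persist_hadoop_conf_dir` is a hypothetical name. It appends the line only if it is not already there.

```shell
#!/usr/bin/env bash
# Hypothetical helper: persist HADOOP_CONF_DIR in a profile file (idempotent) and load it.
# Usage: persist_hadoop_conf_dir [hadoop-conf-dir] [profile-file]
persist_hadoop_conf_dir() {
  local dir="${1:-/opt/hadoop-2.7.5/etc/hadoop}" profile="${2:-/etc/profile}"
  local line="export HADOOP_CONF_DIR=$dir"
  # Append only if this exact line is missing, so reruns do not duplicate it.
  grep -qxF "$line" "$profile" 2>/dev/null || echo "$line" >> "$profile"
  # Load it into the current shell.
  . "$profile"
}
```

If this runs as a separate script, the export only lasts for that script; run `source /etc/profile` in your own shell afterwards.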

Below is a more detailed description of the lookup strategy, written by another author:

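1. Flink checks whether YARN_CONF_DIR, HADOOP_CONF_DIR or HADOOP_CONF_PATH is set, in that order. If one of them is set, the configuration is read from that directory.
2. If none of these variables is set, Flink uses the HADOOP_HOME environment variable: for Hadoop 2 it looks in $HADOOP_HOME/etc/hadoop, and for Hadoop 1 in $HADOOP_HOME/conf.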
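The lookup order above can be expressed as a small shell function. This is only an illustration of the described strategy, not Flink's actual code. `resolve_hadoop_conf_dir` is a hypothetical name. The HADOOP_HOME fallback is simplified to "first of the two directories that exists".

```shell
#!/usr/bin/env bash
# Illustration of the lookup order described above (not Flink's implementation).
# Prints the chosen Hadoop configuration directory, or fails if none is found.
resolve_hadoop_conf_dir() {
  local var
  # 1. The first of these variables that is set and non-empty wins.
  for var in YARN_CONF_DIR HADOOP_CONF_DIR HADOOP_CONF_PATH; do
    if [ -n "${!var:-}" ]; then
      echo "${!var}"
      return 0
    fi
  done
  # 2. Fall back to HADOOP_HOME: Hadoop 2 layout first, then Hadoop 1.
  if [ -n "${HADOOP_HOME:-}" ]; then
    if [ -d "$HADOOP_HOME/etc/hadoop" ]; then
      echo "$HADOOP_HOME/etc/hadoop"; return 0
    elif [ -d "$HADOOP_HOME/conf" ]; then
      echo "$HADOOP_HOME/conf"; return 0
    fi
  fi
  echo "no Hadoop configuration directory found" >&2
  return 1
}
```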

Given this strategy, setting HADOOP_CONF_DIR separately is actually optional here, because HADOOP_HOME was already configured when Hadoop was installed.

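Now you can test your own programs on the cluster. The test program used here comes from another article (Configuring a Flink development environment in IDEA and testing code on a local cluster) and is submitted in single job mode: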

```
[root@flink1 opt]# ./flink-1.7.2/bin/flink run -m yarn-cluster -yn 2 ./flinkLearn-1.0-SNAPSHOT-jar-with-dependencies.jar
2019-10-20 15:30:11,491 INFO org.apache.hadoop.yarn.client.RMProxy - Connecting to ResourceManager at flink1/192.168.89.128:8032
2019-10-20 15:30:11,922 INFO org.apache.flink.yarn.cli.FlinkYarnSessionCli - No path for the flink jar passed. Using the location of class org.apache.flink.yarn.YarnClusterDescriptor to locate the jar
2019-10-20 15:30:11,922 INFO org.apache.flink.yarn.cli.FlinkYarnSessionCli - No path for the flink jar passed. Using the location of class org.apache.flink.yarn.YarnClusterDescriptor to locate the jar
2019-10-20 15:30:12,004 INFO org.apache.flink.yarn.cli.FlinkYarnSessionCli - The argument yn is deprecated in will be ignored.
2019-10-20 15:30:12,004 INFO org.apache.flink.yarn.cli.FlinkYarnSessionCli - The argument yn is deprecated in will be ignored.
2019-10-20 15:30:13,554 INFO org.apache.flink.yarn.AbstractYarnClusterDescriptor - Cluster specification: ClusterSpecification{masterMemoryMB=1024, taskManagerMemoryMB=1024, numberTaskManagers=2, slotsPerTaskManager=2}
2019-10-20 15:30:15,444 WARN org.apache.flink.yarn.AbstractYarnClusterDescriptor - The configuration directory ('/opt/flink-1.7.2/conf') contains both LOG4J and Logback configuration files. Please delete or rename one of them.
2019-10-20 15:30:27,872 INFO org.apache.flink.yarn.AbstractYarnClusterDescriptor - Submitting application master application_1571555933107_0001
2019-10-20 15:30:29,165 INFO org.apache.hadoop.yarn.client.api.impl.YarnClientImpl - Submitted application application_1571555933107_0001
2019-10-20 15:30:29,167 INFO org.apache.flink.yarn.AbstractYarnClusterDescriptor - Waiting for the cluster to be allocated
2019-10-20 15:30:29,199 INFO org.apache.flink.yarn.AbstractYarnClusterDescriptor - Deploying cluster, current state ACCEPTED
2019-10-20 15:30:48,257 INFO org.apache.flink.yarn.AbstractYarnClusterDescriptor - YARN application has been deployed successfully.
Starting execution of program
Program execution finished
Job with JobID 9102fbc05daf96f38a53e29d4823e35b has finished.
Job Runtime: 46309 ms
```
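The log reports that `-yn` is deprecated and will be ignored in this version, so the option can be dropped. The container memory and slots can instead be set explicitly with the `flink run` YARN options `-yjm`, `-ytm` (both in MB here) and `-ys`. The sketch below wraps such a per-job submission. `submit_per_job` and the `DRY_RUN` switch are hypothetical. The 1024 MB values mirror the cluster specification in the log above, and the default parallelism of 6 mirrors `parallelism.default`.

```shell
#!/usr/bin/env bash
# Hypothetical wrapper for a per-job ("single job") submission to YARN.
# FLINK_HOME defaults to the install path used in this article; DRY_RUN=1 only prints the command.
# Usage: submit_per_job <job-jar> [parallelism]
submit_per_job() {
  local jar="$1" par="${2:-6}"
  local flink="${FLINK_HOME:-/opt/flink-1.7.2}/bin/flink"
  # -yjm/-ytm: JobManager/TaskManager container memory (MB); -ys: slots per TaskManager.
  local cmd=("$flink" run -m yarn-cluster -yjm 1024 -ytm 1024 -ys 2 -p "$par" "$jar")
  if [ "${DRY_RUN:-0}" = 1 ]; then
    echo "${cmd[*]}"
  else
    "${cmd[@]}"
  fi
}
```

For example: `submit_per_job ./flinkLearn-1.0-SNAPSHOT-jar-with-dependencies.jar`.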

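You may notice that, although the job ran successfully, the Flink web UI shows no information about it. This is because in YARN mode the client first requests resources from YARN to start a Flink cluster (a yarn-session) for the job, and then runs the submitted job on it. The job is therefore managed by YARN, and its execution details can be seen in the YARN web UI.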

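In the YARN web UI the application's state changes to FINISHED: in single job mode, the yarn-session is stopped automatically once the job completes, and its resources are released. Compared with the first, long-running yarn session mode, this is a big advantage: resources are requested only when needed and released automatically afterwards, so nothing stays allocated. With yarn session mode, the resources stay allocated until you release them manually with `yarn application -kill <applicationID>`.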
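For the yarn session case, the manual cleanup can be scripted with the standard `yarn application` CLI. This sketch assumes Flink registers its YARN applications with the application type `Apache Flink`, and the helper names are hypothetical. Note that `kill_flink_apps` kills every RUNNING Flink application visible to the current user, including per-job clusters still running. Review the output of `list_flink_apps` first, or pass a single ID to `yarn application -kill` yourself.

```shell
#!/usr/bin/env bash
# Hypothetical helpers for cleaning up long-running Flink applications on YARN.
# Assumes Flink's YARN applications have the application type "Apache Flink".
list_flink_apps() {
  # Data rows of "yarn application -list" start with the application ID.
  yarn application -list -appStates RUNNING -appTypes "Apache Flink" 2>/dev/null \
    | awk '$1 ~ /^application_/ {print $1}'
}

kill_flink_apps() {
  # WARNING: kills every RUNNING Flink application returned by list_flink_apps.
  local id
  for id in $(list_flink_apps); do
    yarn application -kill "$id" || return 1
  done
}
```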

Reference: https://www.jianshu.com/p/1b05202c4fb6

This article was contributed by a user; please credit the source when reposting: https://www.wpsshop.cn/w/从前慢现在也慢/article/detail/997448