
Flink Cluster Setup

This article explains how to set up a Flink cluster in three ways:

 

Single-node cluster

Standalone cluster

YARN session cluster

 

I recently started learning Flink. After many days of reading, I ran into quite a few problems as soon as I tried to build a cluster. This article collects the issues I hit during cluster setup.

 

 

Single-node cluster

A single-node cluster is not hard to set up. The catch in my case is that I run on virtual machines with very little memory, so the TaskManager memory parameters have to be tuned down.

TaskManager memory management and its configuration parameters are a big topic in their own right; I wrote a dedicated article about them:

https://blog.csdn.net/u010003835/article/details/106294342

Since my environment runs on virtual machines, the memory of the cluster nodes has to be reduced.

The adjusted memory settings are as follows:

jobmanager.heap.size: 64m
taskmanager.memory.process.size: 512m
taskmanager.memory.framework.heap.size: 64m
taskmanager.memory.framework.off-heap.size: 64m
taskmanager.memory.jvm-metaspace.size: 64m
taskmanager.memory.jvm-overhead.fraction: 0.2
taskmanager.memory.jvm-overhead.min: 16m
taskmanager.memory.jvm-overhead.max: 64m
taskmanager.memory.network.fraction: 0.1
taskmanager.memory.network.min: 1mb
taskmanager.memory.network.max: 256mb
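As a sanity check, here is how these values add up under my reading of the Flink 1.10 TaskManager memory model (FLIP-49) — treat the arithmetic as a sketch, with managed memory left at its default fraction of 0.4:

    process size                                       = 512m
    - JVM overhead: min(max(0.2 * 512m, 16m), 64m)     = 64m
    - JVM metaspace                                    = 64m
    = total Flink memory                               = 384m
        framework heap                                 = 64m
        framework off-heap                             = 64m
        network: 0.1 * 384m (within [1mb, 256mb])      ≈ 38m
        managed: 0.4 * 384m                            ≈ 154m
        task heap (what is left)                       ≈ 64m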

After adjusting the memory to the values above, I was able to start the single-node cluster.
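Starting and checking the single-node cluster then comes down to the following (a minimal sketch; /opt/flink-1.10.0 is a hypothetical install path):

    cd /opt/flink-1.10.0
    ./bin/start-cluster.sh   # launches a local JobManager and TaskManager
    jps                      # should show StandaloneSessionClusterEntrypoint and TaskManagerRunner
    ./bin/stop-cluster.sh    # stops the cluster again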

Full configuration:

################################################################################
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
################################################################################

#=================================================================
#=================================================================
env.yarn.conf.dir: /etc/hadoop/conf.cloudera.yarn
env.hadoop.conf.dir: /etc/hadoop/conf.cloudera.hdfs

#==============================================================================
# Common
#==============================================================================

# The external address of the host on which the JobManager runs and can be
# reached by the TaskManagers and any clients which want to connect. This setting
# is only used in Standalone mode and may be overwritten on the JobManager side
# by specifying the --host <hostname> parameter of the bin/jobmanager.sh executable.
# In high availability mode, if you use the bin/start-cluster.sh script and setup
# the conf/masters file, this will be taken care of automatically. Yarn/Mesos
# automatically configure the host name based on the hostname of the node where the
# JobManager runs.

jobmanager.rpc.address: cdh-manager

# The RPC port where the JobManager is reachable.

jobmanager.rpc.port: 6123

# The heap size for the JobManager JVM

jobmanager.heap.size: 64m

# The total process memory size for the TaskManager.
#
# Note this accounts for all memory usage within the TaskManager process, including JVM metaspace and other overhead.

taskmanager.memory.process.size: 512m
taskmanager.memory.framework.heap.size: 64m
taskmanager.memory.framework.off-heap.size: 64m
taskmanager.memory.jvm-metaspace.size: 64m
taskmanager.memory.jvm-overhead.fraction: 0.2
taskmanager.memory.jvm-overhead.min: 16m
taskmanager.memory.jvm-overhead.max: 64m

# To exclude JVM metaspace and overhead, please, use total Flink memory size instead of 'taskmanager.memory.process.size'.
# It is not recommended to set both 'taskmanager.memory.process.size' and Flink memory.
#
# taskmanager.memory.flink.size: 1280m

# The number of task slots that each TaskManager offers. Each slot runs one parallel pipeline.

taskmanager.numberOfTaskSlots: 1

# The parallelism used for programs that did not specify and other parallelism.

parallelism.default: 1

# The default file system scheme and authority.
#
# By default file paths without scheme are interpreted relative to the local
# root file system 'file:///'. Use this to override the default and interpret
# relative paths relative to a different file system,
# for example 'hdfs://mynamenode:12345'
#
# fs.default-scheme

#==============================================================================
# High Availability
#==============================================================================

# The high-availability mode. Possible options are 'NONE' or 'zookeeper'.
#
# high-availability: zookeeper

# The path where metadata for master recovery is persisted. While ZooKeeper stores
# the small ground truth for checkpoint and leader election, this location stores
# the larger objects, like persisted dataflow graphs.
#
# Must be a durable file system that is accessible from all nodes
# (like HDFS, S3, Ceph, nfs, ...)
#
# high-availability.storageDir: hdfs:///flink/ha/

# The list of ZooKeeper quorum peers that coordinate the high-availability
# setup. This must be a list of the form:
# "host1:clientPort,host2:clientPort,..." (default clientPort: 2181)
#
# high-availability.zookeeper.quorum: localhost:2181

# ACL options are based on https://zookeeper.apache.org/doc/r3.1.2/zookeeperProgrammers.html#sc_BuiltinACLSchemes
# It can be either "creator" (ZOO_CREATE_ALL_ACL) or "open" (ZOO_OPEN_ACL_UNSAFE)
# The default value is "open" and it can be changed to "creator" if ZK security is enabled
#
# high-availability.zookeeper.client.acl: open

#==============================================================================
# Fault tolerance and checkpointing
#==============================================================================

# The backend that will be used to store operator state checkpoints if
# checkpointing is enabled.
#
# Supported backends are 'jobmanager', 'filesystem', 'rocksdb', or the
# <class-name-of-factory>.
#
# state.backend: filesystem

# Directory for checkpoints filesystem, when using any of the default bundled
# state backends.
#
# state.checkpoints.dir: hdfs://namenode-host:port/flink-checkpoints

# Default target directory for savepoints, optional.
#
# state.savepoints.dir: hdfs://namenode-host:port/flink-checkpoints

# Flag to enable/disable incremental checkpoints for backends that
# support incremental checkpoints (like the RocksDB state backend).
#
# state.backend.incremental: false

# The failover strategy, i.e., how the job computation recovers from task failures.
# Only restart tasks that may have been affected by the task failure, which typically includes
# downstream tasks and potentially upstream tasks if their produced data is no longer available for consumption.

jobmanager.execution.failover-strategy: region

#==============================================================================
# Rest & web frontend
#==============================================================================

# The port to which the REST client connects to. If rest.bind-port has
# not been specified, then the server will bind to this port as well.
#
#rest.port: 8081

# The address to which the REST client will connect to
#
#rest.address: 0.0.0.0

# Port range for the REST and web server to bind to.
#
#rest.bind-port: 8080-8090

# The address that the REST & web server binds to
#
#rest.bind-address: 0.0.0.0

# Flag to specify whether job submission is enabled from the web-based
# runtime monitor. Uncomment to disable.
#web.submit.enable: false

#==============================================================================
# Advanced
#==============================================================================

# Override the directories for temporary files. If not specified, the
# system-specific Java temporary directory (java.io.tmpdir property) is taken.
#
# For framework setups on Yarn or Mesos, Flink will automatically pick up the
# containers' temp directories without any need for configuration.
#
# Add a delimited list for multiple directories, using the system directory
# delimiter (colon ':' on unix) or a comma, e.g.:
# /data1/tmp:/data2/tmp:/data3/tmp
#
# Note: Each directory entry is read from and written to by a different I/O
# thread. You can include the same directory multiple times in order to create
# multiple I/O threads against that directory. This is for example relevant for
# high-throughput RAIDs.
#
io.tmp.dirs: /tmp

# The classloading resolve order. Possible values are 'child-first' (Flink's default)
# and 'parent-first' (Java's default).
#
# Child first classloading allows users to use different dependency/library
# versions in their application than those in the classpath. Switching back
# to 'parent-first' may help with debugging dependency issues.
#
# classloader.resolve-order: child-first

# The amount of memory going to the network stack. These numbers usually need
# no tuning. Adjusting them may be necessary in case of an "Insufficient number
# of network buffers" error. The default min is 64MB, the default max is 1GB.
#
# taskmanager.memory.network.fraction: 0.1
# taskmanager.memory.network.min: 64mb
# taskmanager.memory.network.max: 1gb
taskmanager.memory.network.fraction: 0.1
taskmanager.memory.network.min: 1mb
taskmanager.memory.network.max: 256mb

#==============================================================================
# Flink Cluster Security Configuration
#==============================================================================

# Kerberos authentication for various components - Hadoop, ZooKeeper, and connectors -
# may be enabled in four steps:
# 1. configure the local krb5.conf file
# 2. provide Kerberos credentials (either a keytab or a ticket cache w/ kinit)
# 3. make the credentials available to various JAAS login contexts
# 4. configure the connector to use JAAS/SASL

# The below configure how Kerberos credentials are provided. A keytab will be used instead of
# a ticket cache if the keytab path and principal are set.

# security.kerberos.login.use-ticket-cache: true
# security.kerberos.login.keytab: /path/to/kerberos/keytab
# security.kerberos.login.principal: flink-user

# The configuration below defines which JAAS login contexts

# security.kerberos.login.contexts: Client,KafkaClient

#==============================================================================
# ZK Security Configuration
#==============================================================================

# Below configurations are applicable if ZK ensemble is configured for security

# Override below configuration to provide custom ZK service name if configured
# zookeeper.sasl.service-name: zookeeper

# The configuration below must match one of the values set in "security.kerberos.login.contexts"
# zookeeper.sasl.login-context-name: Client

#==============================================================================
# HistoryServer
#==============================================================================

# The HistoryServer is started and stopped via bin/historyserver.sh (start|stop)

# Directory to upload completed jobs to. Add this directory to the list of
# monitored directories of the HistoryServer as well (see below).
#jobmanager.archive.fs.dir: hdfs:///completed-jobs/

# The address under which the web-based HistoryServer listens.
#historyserver.web.address: 0.0.0.0

# The port under which the web-based HistoryServer listens.
#historyserver.web.port: 8082

# Comma separated list of directories to monitor for completed jobs.
#historyserver.archive.fs.dir: hdfs:///completed-jobs/

# Interval in milliseconds for refreshing the monitored directories.
#historyserver.archive.fs.refresh-interval: 10000

 

 

Standalone cluster

 

Modify the masters and slaves files

In standalone mode the Flink processes run directly on the cluster hosts, so besides editing the configuration file we also need to edit the masters and slaves files under the conf directory.

Content of the masters file:

cdh-manager:8081

Content of the slaves file:

cdh-node1
cdh-node2

The masters and slaves files need to be synced to every node, as sketched below.
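A minimal sketch of the sync, assuming passwordless SSH and an identical install path (hypothetical here) on every node, using the hostnames from the files above:

    for host in cdh-node1 cdh-node2; do
        scp conf/masters conf/slaves conf/flink-conf.yaml ${host}:/opt/flink-1.10.0/conf/
    done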

 

 

Tune the settings in conf/flink-conf.yaml:

jobmanager.rpc.address

The RPC address of the Flink cluster's JobManager. It normally has to be set to the JobManager's IP address or hostname; the default of localhost is not suitable for a multi-node cluster.

 

jobmanager.heap.mb

The JVM heap size of the JobManager, 1024 MB by default; it can be increased as the cluster scale grows.

 

taskmanager.heap.mb

The JVM heap size of each TaskManager, 1024 MB by default; tune it according to the data volume being processed and the size of the job state.

 

taskmanager.numberOfTaskSlots

The number of slots each TaskManager offers, usually chosen according to the number of CPU cores the machine can dedicate to Flink.

 

parallelism.default  

The default parallelism of Flink jobs, related to the total number of CPU cores in the cluster. Raising the parallelism increases the number of parallel task instances and improves throughput, but also occupies more slots.

 

taskmanager.tmp.dirs

The temporary directories of the cluster; Flink writes intermediate computation data under these paths.
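Putting the items above together, a standalone flink-conf.yaml fragment might look like this (illustrative values only; note that the full config earlier in this article spells some of these keys in their newer forms, e.g. jobmanager.heap.size and io.tmp.dirs):

    jobmanager.rpc.address: cdh-manager
    jobmanager.heap.size: 1024m
    taskmanager.heap.size: 1024m
    taskmanager.numberOfTaskSlots: 4
    parallelism.default: 2
    io.tmp.dirs: /tmp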

 

These defaults are loaded into the cluster at startup. When submitting a job, a user can set system parameters dynamically with the -D flag, in which case the corresponding values in flink-conf.yaml are overridden — for example, the dynamic parameter -Dfs.overwrite-files=true, as sketched below.
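A sketch of such an override, using only flags that appear in the 1.10 CLI help texts reproduced later in this article (the jar name is a placeholder):

    # When starting a YARN session, -D sets a single config property:
    ./bin/yarn-session.sh -d -D fs.overwrite-files=true

    # When submitting with the "run" action in executor mode, -D works likewise:
    ./bin/flink run -e yarn-per-job -D fs.overwrite-files=true ./my-job.jar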

 

 

Run the start script:

Then simply run start-cluster.sh under the bin directory.

 

 

Related processes

JobManager process:

16988 org.apache.flink.runtime.entrypoint.StandaloneSessionClusterEntrypoint

TaskManager process:

17329 org.apache.flink.runtime.taskexecutor.TaskManagerRunner
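The running cluster can also be verified through Flink's monitoring REST API (a sketch; host and port come from the masters file above):

    curl http://cdh-manager:8081/overview
    # returns a JSON summary: taskmanagers, slots-total, slots-available, running jobs, ...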

 

 

YARN cluster

 

Flink on YARN comes in two flavors:

1) Yarn Session Model

2) Single Job Model

 

Yarn Session Model

In this mode Flink requests sufficient resources from Hadoop YARN and starts a long-running Flink session cluster on YARN. Users can then submit Flink jobs to that session cluster through the REST API or the web UI.

 

 

Basic configuration

1) Dependency jars

To launch this kind of cluster we first need the Hadoop/YARN dependency jars. They can be downloaded from the official site and placed in the lib directory.

Reference: https://blog.csdn.net/Alex_Sheng_Sea/article/details/102607937

Without the Hadoop jars, the following error appears:

NoClassDefFoundError: org/apache/hadoop/yarn/exceptions/YarnException

 

Hadoop jar download: pick the bundle that matches your Hadoop version!

https://flink.apache.org/downloads.html#apache-flink-1101
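A sketch of the install step, assuming the pre-bundled shaded Hadoop jar from that page (file name pattern flink-shaded-hadoop-2-uber-*; the exact file and version must match your Hadoop, and the install path is hypothetical):

    # after downloading the jar that matches your Hadoop version:
    cp flink-shaded-hadoop-2-uber-*.jar /opt/flink-1.10.0/lib/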

 

2) Environment configuration

So that Flink can find the Hadoop configuration, I put the following into flink-conf.yaml:

env.yarn.conf.dir: /etc/hadoop/conf.cloudera.yarn
env.hadoop.conf.dir: /etc/hadoop/conf.cloudera.hdfs
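Alternatively — an equivalent mechanism that Flink's YARN client also honors, to the best of my knowledge — the directories can be exported as environment variables before starting the session (a sketch):

    export HADOOP_CONF_DIR=/etc/hadoop/conf.cloudera.hdfs
    export YARN_CONF_DIR=/etc/hadoop/conf.cloudera.yarn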

 


 

Session startup parameters

Before starting a yarn-session, let's look at which parameters can be tuned:

Usage:
   Optional
     -at,--applicationType <arg>     Set a custom application type for the application on YARN
     -D <property=value>             use value for given property
     -d,--detached                   If present, runs the job in detached mode
     -h,--help                       Help for the Yarn session CLI.
     -id,--applicationId <arg>       Attach to running YARN session
     -j,--jar <arg>                  Path to Flink jar file
     -jm,--jobManagerMemory <arg>    Memory for JobManager Container with optional unit (default: MB)
     -m,--jobmanager <arg>           Address of the JobManager (master) to which to connect. Use this flag to connect to a different JobManager than the one specified in the configuration.
     -nl,--nodeLabel <arg>           Specify YARN node label for the YARN application
     -nm,--name <arg>                Set a custom name for the application on YARN
     -q,--query                      Display available YARN resources (memory, cores)
     -qu,--queue <arg>               Specify YARN queue.
     -s,--slots <arg>                Number of slots per TaskManager
     -t,--ship <arg>                 Ship files in the specified directory (t for transfer)
     -tm,--taskManagerMemory <arg>   Memory per TaskManager Container with optional unit (default: MB)
     -yd,--yarndetached              If present, runs the job in detached mode (deprecated; use non-YARN specific option instead)
     -z,--zookeeperNamespace <arg>   Namespace to create the Zookeeper sub-paths for high availability mode
 

 

Using these parameters, start an on-YARN session cluster:

./yarn-session.sh -d  -n 4 -jm 64m -tm 512m -s 4

-n 4: request 4 YARN containers (i.e. 4 TaskManagers)

-jm: JVM memory for the JobManager

-tm: memory per TaskManager

-s: slots per TaskManager — with 4 TaskManagers and 4 slots each, the cluster offers 16 slots in total for task instances

 

Note that the detached flag -d must come first on the command line:

 -d,--detached                   If present, runs the job in detached mode
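Once the detached session is running, it can be reattached to or torn down (a sketch; the application id below is a placeholder for the one printed at startup):

    # Reattach the session CLI to a running YARN session (see -id in the help above)
    ./yarn-session.sh -id application_1590000000000_0001

    # Or stop the whole session through YARN
    yarn application -kill application_1590000000000_0001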

 

 

 

 

 

Single Job Model

 

In the Single Job Model, each job is submitted with the flink command; the relevant parameters are:

Action "run" compiles and runs a program.

  Syntax: run [OPTIONS] <jar-file> <arguments>
  "run" action options:
     -c,--class <classname>               Class with the program entry point
                                          ("main()" method). Only needed if the
                                          JAR file does not specify the class in
                                          its manifest.
     -C,--classpath <url>                 Adds a URL to each user code
                                          classloader on all nodes in the
                                          cluster. The paths must specify a
                                          protocol (e.g. file://) and be
                                          accessible on all nodes (e.g. by means
                                          of a NFS share). You can use this
                                          option multiple times for specifying
                                          more than one URL. The protocol must
                                          be supported by the {@link
                                          java.net.URLClassLoader}.
     -d,--detached                        If present, runs the job in detached
                                          mode
     -n,--allowNonRestoredState           Allow to skip savepoint state that
                                          cannot be restored. You need to allow
                                          this if you removed an operator from
                                          your program that was part of the
                                          program when the savepoint was
                                          triggered.
     -p,--parallelism <parallelism>       The parallelism with which to run the
                                          program. Optional flag to override the
                                          default value specified in the
                                          configuration.
     -py,--python <pythonFile>            Python script with the program entry
                                          point. The dependent resources can be
                                          configured with the `--pyFiles`
                                          option.
     -pyarch,--pyArchives <arg>           Add python archive files for job. The
                                          archive files will be extracted to the
                                          working directory of python UDF
                                          worker. Currently only zip-format is
                                          supported. For each archive file, a
                                          target directory be specified. If the
                                          target directory name is specified,
                                          the archive file will be extracted to
                                          a name can directory with the
                                          specified name. Otherwise, the archive
                                          file will be extracted to a directory
                                          with the same name of the archive
                                          file. The files uploaded via this
                                          option are accessible via relative
                                          path. '#' could be used as the
                                          separator of the archive file path and
                                          the target directory name. Comma (',')
                                          could be used as the separator to
                                          specify multiple archive files. This
                                          option can be used to upload the
                                          virtual environment, the data files
                                          used in Python UDF (e.g.: --pyArchives
                                          file:///tmp/py37.zip,file:///tmp/data.
                                          zip#data --pyExecutable
                                          py37.zip/py37/bin/python). The data
                                          files could be accessed in Python UDF,
                                          e.g.: f = open('data/data.txt', 'r').
     -pyexec,--pyExecutable <arg>         Specify the path of the python
                                          interpreter used to execute the python
                                          UDF worker (e.g.: --pyExecutable
                                          /usr/local/bin/python3). The python
                                          UDF worker depends on Python 3.5+,
                                          Apache Beam (version == 2.15.0), Pip
                                          (version >= 7.1.0) and SetupTools
                                          (version >= 37.0.0). Please ensure
                                          that the specified environment meets
                                          the above requirements.
     -pyfs,--pyFiles <pythonFiles>        Attach custom python files for job.
                                          These files will be added to the
                                          PYTHONPATH of both the local client
                                          and the remote python UDF worker. The
                                          standard python resource file suffixes
                                          such as .py/.egg/.zip or directory are
                                          all supported. Comma (',') could be
                                          used as the separator to specify
                                          multiple files (e.g.: --pyFiles
                                          file:///tmp/myresource.zip,hdfs:///$na
                                          menode_address/myresource2.zip).
     -pym,--pyModule <pythonModule>       Python module with the program entry
                                          point. This option must be used in
                                          conjunction with `--pyFiles`.
     -pyreq,--pyRequirements <arg>        Specify a requirements.txt file which
                                          defines the third-party dependencies.
                                          These dependencies will be installed
                                          and added to the PYTHONPATH of the
                                          python UDF worker. A directory which
                                          contains the installation packages of
                                          these dependencies could be specified
                                          optionally. Use '#' as the separator
                                          if the optional parameter exists
                                          (e.g.: --pyRequirements
                                          file:///tmp/requirements.txt#file:///t
                                          mp/cached_dir).
     -s,--fromSavepoint <savepointPath>   Path to a savepoint to restore the job
                                          from (for example
                                          hdfs:///flink/savepoint-1537).
     -sae,--shutdownOnAttachedExit        If the job is submitted in attached
                                          mode, perform a best-effort cluster
                                          shutdown when the CLI is terminated
                                          abruptly, e.g., in response to a user
                                          interrupt, such as typing Ctrl + C.
  Options for executor mode:
     -D <property=value>   Generic configuration options for
                           execution/deployment and for the configured executor.
                           The available options can be found at
                           https://ci.apache.org/projects/flink/flink-docs-stabl
                           e/ops/config.html
     -e,--executor <arg>   The name of the executor to be used for executing the
                           given job, which is equivalent to the
                           "execution.target" config option. The currently
                           available executors are: "remote", "local",
                           "kubernetes-session", "yarn-per-job", "yarn-session".
  Options for yarn-cluster mode:
     -d,--detached                        If present, runs the job in detached
                                          mode
     -m,--jobmanager <arg>                Address of the JobManager (master) to
                                          which to connect. Use this flag to
                                          connect to a different JobManager than
                                          the one specified in the
                                          configuration.
     -yat,--yarnapplicationType <arg>     Set a custom application type for the
                                          application on YARN
     -yD <property=value>                 use value for given property
     -yd,--yarndetached                   If present, runs the job in detached
                                          mode (deprecated; use non-YARN
                                          specific option instead)
     -yh,--yarnhelp                       Help for the Yarn session CLI.
     -yid,--yarnapplicationId <arg>       Attach to running YARN session
     -yj,--yarnjar <arg>                  Path to Flink jar file
     -yjm,--yarnjobManagerMemory <arg>    Memory for JobManager Container with
                                          optional unit (default: MB)
     -ynl,--yarnnodeLabel <arg>           Specify YARN node label for the YARN
                                          application
     -ynm,--yarnname <arg>                Set a custom name for the application
                                          on YARN
     -yq,--yarnquery                      Display available YARN resources
                                          (memory, cores)
     -yqu,--yarnqueue <arg>               Specify YARN queue.
     -ys,--yarnslots <arg>                Number of slots per TaskManager
     -yt,--yarnship <arg>                 Ship files in the specified directory
                                          (t for transfer)
     -ytm,--yarntaskManagerMemory <arg>   Memory per TaskManager Container with
                                          optional unit (default: MB)
     -yz,--yarnzookeeperNamespace <arg>   Namespace to create the Zookeeper
                                          sub-paths for high availability mode
     -z,--zookeeperNamespace <arg>        Namespace to create the Zookeeper
                                          sub-paths for high availability mode
  Options for default mode:
     -m,--jobmanager <arg>           Address of the JobManager (master) to which
                                     to connect. Use this flag to connect to a
                                     different JobManager than the one specified
                                     in the configuration.
     -z,--zookeeperNamespace <arg>   Namespace to create the Zookeeper sub-paths
                                     for high availability mode
 

Yarn Single Job mode:

./flink run -m yarn-cluster -yn 2 ./***.jar

-yn specifies the number of TaskManagers the job needs (the flag was removed in 1.10; a 1.10-style submission is sketched below)
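For 1.10, where -yn is gone and the number of TaskManagers is derived from parallelism and slots, a submission might instead look like this (a sketch using flags from the "run" help above; the class and jar names are placeholders):

    ./flink run -m yarn-cluster \
        -p 8 -ys 4 \
        -yjm 1024m -ytm 2048m \
        -c com.example.MyJob ./my-job.jar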

 

 
