
Flink TaskManager Memory Model and Related Memory Configuration Parameters

 

The TaskManager memory model in Flink is fairly complex. This article explains the Flink memory model and the related configuration options (based on Flink 1.10).

 

 

Memory Layout Overview

First, here is the memory diagram from the official Flink documentation:

https://ci.apache.org/projects/flink/flink-docs-release-1.10/ops/memory/mem_setup.html#set-up-task-executor-memory

 

In most cases, adjusting the TaskManager memory with just one of the following three options is enough:

  1. taskmanager.memory.flink.size
  2. taskmanager.memory.process.size
  3. taskmanager.memory.task.heap.size & taskmanager.memory.managed.size

Combined with the diagram above, the official documentation describes the relationship between these three options:

The total process memory of Flink JVM processes consists of memory consumed by Flink application (total Flink memory) and by the JVM to run the process. The total Flink memory consumption includes usage of JVM heap, managed memory (managed by Flink) and other direct (or native) memory.

As the quote shows, the three options form a containment relationship:

taskmanager.memory.process.size
  > taskmanager.memory.flink.size
  > taskmanager.memory.task.heap.size + taskmanager.memory.managed.size
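To make the containment concrete, here is a small sketch in Python. The component sizes are hypothetical example values in MB chosen only for illustration, not Flink defaults:

```python
# Hypothetical component sizes in MB (illustration only, not Flink defaults).
metaspace, jvm_overhead = 256, 192            # outside Total Flink Memory
framework_heap, framework_offheap = 128, 128
task_heap, task_offheap = 512, 0
managed, network = 409, 102

# Total Flink Memory = sum of all components except Metaspace and JVM Overhead.
flink_size = (framework_heap + framework_offheap
              + task_heap + task_offheap + managed + network)

# Total Process Memory additionally includes Metaspace and JVM Overhead.
process_size = flink_size + metaspace + jvm_overhead

# The containment relationship from the text holds:
assert process_size > flink_size > task_heap + managed
```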

 

 

Memory Layout in Detail

 

Sometimes, however, the size of each component needs to be tuned individually. The documentation provides a more detailed TaskManager memory layout:

https://ci.apache.org/projects/flink/flink-docs-release-1.10/ops/memory/mem_detail.html#overview

 

The following table lists all memory components depicted above, together with the Flink configuration options that affect the size of each component:

- Framework Heap Memory (taskmanager.memory.framework.heap.size): JVM heap memory dedicated to the Flink framework (advanced option)
- Task Heap Memory (taskmanager.memory.task.heap.size): JVM heap memory dedicated to the Flink application to run operators and user code
- Managed Memory (taskmanager.memory.managed.size, taskmanager.memory.managed.fraction): Native memory managed by Flink, reserved for sorting, hash tables, caching of intermediate results and the RocksDB state backend
- Framework Off-heap Memory (taskmanager.memory.framework.off-heap.size): Off-heap direct (or native) memory dedicated to the Flink framework (advanced option)
- Task Off-heap Memory (taskmanager.memory.task.off-heap.size): Off-heap direct (or native) memory dedicated to the Flink application to run operators
- Network Memory (taskmanager.memory.network.min, taskmanager.memory.network.max, taskmanager.memory.network.fraction): Direct memory reserved for data record exchange between tasks (e.g. buffering for transfer over the network); a capped fractionated component of the total Flink memory
- JVM Metaspace (taskmanager.memory.jvm-metaspace.size): Metaspace size of the Flink JVM process
- JVM Overhead (taskmanager.memory.jvm-overhead.min, taskmanager.memory.jvm-overhead.max, taskmanager.memory.jvm-overhead.fraction): Native memory reserved for other JVM overhead, e.g. thread stacks, code cache, garbage collection space; a capped fractionated component of the total process memory

 

As you can see, the size of some memory components can be set directly by the respective option, while other components are tuned using multiple options together.
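Network Memory and JVM Overhead are both "capped fractionated" components: a fraction of the relevant total, clamped between the configured min and max. A minimal sketch of that derivation rule (my own illustration, not Flink's actual code):

```python
def capped_fraction(total_mb, fraction, min_mb, max_mb):
    """Derive a capped fractionated component: fraction of the total,
    clamped into the configured [min, max] range."""
    return min(max(total_mb * fraction, min_mb), max_mb)

# With the network defaults (fraction 0.1, min 64mb, max 1gb) and
# 1 GB of Total Flink Memory, the derived size is 102.4 MB:
network = capped_fraction(1024, 0.1, 64, 1024)
```

Setting min and max to the same value pins the component to an exact size regardless of the fraction.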

 

 

 

 

Default Values of the Memory Options

 

Memory Configuration:

These configuration values control the way that TaskManagers and JobManagers use memory.

Flink tries to shield users as much as possible from the complexity of configuring the JVM for data-intensive processing. In most cases, users should only need to set taskmanager.memory.process.size or taskmanager.memory.flink.size (depending on the setup), and possibly adjust the ratio of JVM heap and Managed Memory via taskmanager.memory.managed.fraction. The other options below can be used for performance tuning and fixing memory-related errors.

For a detailed explanation of how these options interact, see the documentation on TaskManager memory configuration.

taskmanager.memory.flink.size
    Default: (none)    Type: MemorySize
    Total Flink Memory size for the TaskExecutors. This includes all the memory that a TaskExecutor consumes, except for JVM Metaspace and JVM Overhead. It consists of Framework Heap Memory, Task Heap Memory, Task Off-Heap Memory, Managed Memory, and Network Memory. See also 'taskmanager.memory.process.size' for total process memory size configuration.

taskmanager.memory.framework.heap.size
    Default: 128 mb    Type: MemorySize
    Framework Heap Memory size for TaskExecutors. This is the size of JVM heap memory reserved for the TaskExecutor framework, which will not be allocated to task slots.

taskmanager.memory.framework.off-heap.size
    Default: 128 mb    Type: MemorySize
    Framework Off-Heap Memory size for TaskExecutors. This is the size of off-heap memory (JVM direct memory and native memory) reserved for the TaskExecutor framework, which will not be allocated to task slots. The configured value will be fully counted when Flink calculates the JVM max direct memory size parameter.

taskmanager.memory.jvm-metaspace.size
    Default: 256 mb    Type: MemorySize
    JVM Metaspace size for the TaskExecutors.

taskmanager.memory.jvm-overhead.fraction
    Default: 0.1    Type: Float
    Fraction of Total Process Memory to be reserved for JVM Overhead. This is off-heap memory reserved for JVM overhead, such as thread stack space, compile cache, etc. This includes native memory but not direct memory, and will not be counted when Flink calculates the JVM max direct memory size parameter. The size of JVM Overhead is derived to make up the configured fraction of the Total Process Memory. If the derived size is less/greater than the configured min/max size, the min/max size will be used. The exact size of JVM Overhead can be explicitly specified by setting the min/max size to the same value.

taskmanager.memory.jvm-overhead.max
    Default: 1 gb    Type: MemorySize
    Max JVM Overhead size for the TaskExecutors. See 'taskmanager.memory.jvm-overhead.fraction' above for how JVM Overhead is derived and capped.

taskmanager.memory.jvm-overhead.min
    Default: 192 mb    Type: MemorySize
    Min JVM Overhead size for the TaskExecutors. See 'taskmanager.memory.jvm-overhead.fraction' above for how JVM Overhead is derived and capped.

taskmanager.memory.managed.fraction
    Default: 0.4    Type: Float
    Fraction of Total Flink Memory to be used as Managed Memory, if the Managed Memory size is not explicitly specified.

taskmanager.memory.managed.size
    Default: (none)    Type: MemorySize
    Managed Memory size for TaskExecutors. This is the size of off-heap memory managed by the memory manager, reserved for sorting, hash tables, caching of intermediate results and the RocksDB state backend. Memory consumers can either allocate memory from the memory manager in the form of MemorySegments, or reserve bytes from the memory manager and keep their memory usage within that boundary. If unspecified, it will be derived to make up the configured fraction of the Total Flink Memory.

taskmanager.memory.network.fraction
    Default: 0.1    Type: Float
    Fraction of Total Flink Memory to be used as Network Memory. Network Memory is off-heap memory reserved for the ShuffleEnvironment (e.g., network buffers). Network Memory size is derived to make up the configured fraction of the Total Flink Memory. If the derived size is less/greater than the configured min/max size, the min/max size will be used. The exact size of Network Memory can be explicitly specified by setting the min/max size to the same value.

taskmanager.memory.network.max
    Default: 1 gb    Type: MemorySize
    Max Network Memory size for TaskExecutors. See 'taskmanager.memory.network.fraction' above for how Network Memory is derived and capped.

taskmanager.memory.network.min
    Default: 64 mb    Type: MemorySize
    Min Network Memory size for TaskExecutors. See 'taskmanager.memory.network.fraction' above for how Network Memory is derived and capped.

taskmanager.memory.process.size
    Default: (none)    Type: MemorySize
    Total Process Memory size for the TaskExecutors. This includes all the memory that a TaskExecutor consumes, consisting of Total Flink Memory, JVM Metaspace, and JVM Overhead. On containerized setups, this should be set to the container memory. See also 'taskmanager.memory.flink.size' for total Flink memory size configuration.

taskmanager.memory.task.heap.size
    Default: (none)    Type: MemorySize
    Task Heap Memory size for TaskExecutors. This is the size of JVM heap memory reserved for tasks. If not specified, it will be derived as Total Flink Memory minus Framework Heap Memory, Task Off-Heap Memory, Managed Memory and Network Memory.

taskmanager.memory.task.off-heap.size
    Default: 0 bytes    Type: MemorySize
    Task Off-Heap Memory size for TaskExecutors. This is the size of off-heap memory (JVM direct memory and native memory) reserved for tasks. The configured value will be fully counted when Flink calculates the JVM max direct memory size parameter.

 

 

Related JVM Parameters

Flink explicitly adds the following memory related JVM arguments while starting the task executor process, based on the configured or derived memory component sizes:

  JVM Arguments            | Value
  -Xmx and -Xms            | Framework + Task Heap Memory
  -XX:MaxDirectMemorySize  | Framework + Task Off-Heap + Network Memory
  -XX:MaxMetaspaceSize     | JVM Metaspace
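A sketch of how those JVM arguments follow from the component sizes. The values below are hypothetical example sizes in MB, not the defaults or an actual derivation from Flink's code:

```python
# Hypothetical component sizes in MB (illustration only).
framework_heap, task_heap = 128, 512
framework_offheap, task_offheap, network = 128, 0, 128
metaspace = 256

xmx_xms = framework_heap + task_heap                     # -Xmx / -Xms
max_direct = framework_offheap + task_offheap + network  # -XX:MaxDirectMemorySize
max_metaspace = metaspace                                # -XX:MaxMetaspaceSize

print(f"-Xmx{xmx_xms}m -Xms{xmx_xms}m "
      f"-XX:MaxDirectMemorySize={max_direct}m "
      f"-XX:MaxMetaspaceSize={max_metaspace}m")
```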

 

 

 

Common Errors and a Simple Example

 

Since I run Flink in a virtual machine with limited memory, the default values above do not fit, so I tried to scale these values down.

 

Being unfamiliar with the memory model at first, I ran into the following problem:

 exceed configured Total Flink Memory (256.000mb (268435456 bytes)).

[root@cdh-manager bin]# ./start-cluster.sh
Starting cluster.
[INFO] 1 instance(s) of standalonesession are already running on cdh-manager.
Starting standalonesession daemon on host cdh-manager.
[ERROR] Unexpected result: at org.apache.flink.runtime.util.BashJavaUtils.main(BashJavaUtils.java:46)
[ERROR] The last line of the BashJavaUtils outputs is expected to be the execution result, following the prefix 'BASH_JAVA_UTILS_EXEC_RESULT:'
- Loading configuration property: jobmanager.rpc.address, localhost
- Loading configuration property: jobmanager.rpc.port, 6123
- Loading configuration property: jobmanager.heap.size, 256m
- Loading configuration property: taskmanager.memory.flink.size, 256m
- Loading configuration property: taskmanager.numberOfTaskSlots, 1
- Loading configuration property: parallelism.default, 1
- Loading configuration property: jobmanager.execution.failover-strategy, region
- Loading configuration property: taskmanager.memory.network.fraction, 0.1
- Loading configuration property: taskmanager.memory.network.min, 1mb
- Loading configuration property: taskmanager.memory.network.max, 256mb
Exception in thread "main" org.apache.flink.configuration.IllegalConfigurationException: Sum of configured Framework Heap Memory (128.000mb (134217728 bytes)), Framework Off-Heap Memory (128.000mb (134217728 bytes)), Task Off-Heap Memory (0 bytes), Managed Memory (102.400mb (107374184 bytes)) and Network Memory (25.600mb (26843546 bytes)) exceed configured Total Flink Memory (256.000mb (268435456 bytes)).
    at org.apache.flink.runtime.clusterframework.TaskExecutorProcessUtils.deriveInternalMemoryFromTotalFlinkMemory(TaskExecutorProcessUtils.java:320)
    at org.apache.flink.runtime.clusterframework.TaskExecutorProcessUtils.deriveProcessSpecWithTotalFlinkMemory(TaskExecutorProcessUtils.java:221)
    at org.apache.flink.runtime.clusterframework.TaskExecutorProcessUtils.processSpecFromConfig(TaskExecutorProcessUtils.java:143)
    at org.apache.flink.runtime.util.BashJavaUtils.getTmResourceJvmParams(BashJavaUtils.java:62)
    at org.apache.flink.runtime.util.BashJavaUtils.main(BashJavaUtils.java:46)
[ERROR] Could not get JVM parameters properly.
 

 

I eventually arrived at the following configuration for my machine, with a TaskManager process memory of 512m. The memory-related properties:

taskmanager.memory.process.size: 512m
taskmanager.memory.framework.heap.size: 64m
taskmanager.memory.framework.off-heap.size: 64m
taskmanager.memory.jvm-metaspace.size: 64m
taskmanager.memory.jvm-overhead.fraction: 0.2
taskmanager.memory.jvm-overhead.min: 16m
taskmanager.memory.jvm-overhead.max: 64m
taskmanager.memory.network.fraction: 0.1
taskmanager.memory.network.min: 1mb
taskmanager.memory.network.max: 256mb
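A quick sanity check that this configuration actually fits, sketching the derivation by hand according to the rules described earlier:

```python
process = 512.0                             # taskmanager.memory.process.size
overhead = min(max(process * 0.2, 16), 64)  # jvm-overhead: 102.4 clamped to max 64
metaspace = 64.0                            # jvm-metaspace.size
flink = process - overhead - metaspace      # Total Flink Memory = 384 MB

framework_heap = 64.0
framework_offheap = 64.0
task_offheap = 0.0
network = min(max(flink * 0.1, 1), 256)     # 38.4 MB
managed = flink * 0.4                       # 153.6 MB

# Task Heap is the remainder; it must stay positive for the config to be valid.
task_heap = (flink - framework_heap - framework_offheap
             - task_offheap - managed - network)
assert task_heap > 0   # about 64 MB left for tasks
```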

 

 

 

 

The complete configuration file flink-conf.yaml:

################################################################################
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
################################################################################
#=================================================================
#=================================================================
env.yarn.conf.dir: /etc/hadoop/conf.cloudera.yarn
env.hadoop.conf.dir: /etc/hadoop/conf.cloudera.hdfs
#==============================================================================
# Common
#==============================================================================
# The external address of the host on which the JobManager runs and can be
# reached by the TaskManagers and any clients which want to connect. This setting
# is only used in Standalone mode and may be overwritten on the JobManager side
# by specifying the --host <hostname> parameter of the bin/jobmanager.sh executable.
# In high availability mode, if you use the bin/start-cluster.sh script and setup
# the conf/masters file, this will be taken care of automatically. Yarn/Mesos
# automatically configure the host name based on the hostname of the node where the
# JobManager runs.
jobmanager.rpc.address: cdh-manager
# The RPC port where the JobManager is reachable.
jobmanager.rpc.port: 6123
# The heap size for the JobManager JVM
jobmanager.heap.size: 64m
# The total process memory size for the TaskManager.
#
# Note this accounts for all memory usage within the TaskManager process, including JVM metaspace and other overhead.
taskmanager.memory.process.size: 512m
taskmanager.memory.framework.heap.size: 64m
taskmanager.memory.framework.off-heap.size: 64m
taskmanager.memory.jvm-metaspace.size: 64m
taskmanager.memory.jvm-overhead.fraction: 0.2
taskmanager.memory.jvm-overhead.min: 16m
taskmanager.memory.jvm-overhead.max: 64m
# To exclude JVM metaspace and overhead, please, use total Flink memory size instead of 'taskmanager.memory.process.size'.
# It is not recommended to set both 'taskmanager.memory.process.size' and Flink memory.
#
# taskmanager.memory.flink.size: 1280m
# The number of task slots that each TaskManager offers. Each slot runs one parallel pipeline.
taskmanager.numberOfTaskSlots: 1
# The parallelism used for programs that did not specify and other parallelism.
parallelism.default: 1
# The default file system scheme and authority.
#
# By default file paths without scheme are interpreted relative to the local
# root file system 'file:///'. Use this to override the default and interpret
# relative paths relative to a different file system,
# for example 'hdfs://mynamenode:12345'
#
# fs.default-scheme
#==============================================================================
# High Availability
#==============================================================================
# The high-availability mode. Possible options are 'NONE' or 'zookeeper'.
#
# high-availability: zookeeper
# The path where metadata for master recovery is persisted. While ZooKeeper stores
# the small ground truth for checkpoint and leader election, this location stores
# the larger objects, like persisted dataflow graphs.
#
# Must be a durable file system that is accessible from all nodes
# (like HDFS, S3, Ceph, nfs, ...)
#
# high-availability.storageDir: hdfs:///flink/ha/
# The list of ZooKeeper quorum peers that coordinate the high-availability
# setup. This must be a list of the form:
# "host1:clientPort,host2:clientPort,..." (default clientPort: 2181)
#
# high-availability.zookeeper.quorum: localhost:2181
# ACL options are based on https://zookeeper.apache.org/doc/r3.1.2/zookeeperProgrammers.html#sc_BuiltinACLSchemes
# It can be either "creator" (ZOO_CREATE_ALL_ACL) or "open" (ZOO_OPEN_ACL_UNSAFE)
# The default value is "open" and it can be changed to "creator" if ZK security is enabled
#
# high-availability.zookeeper.client.acl: open
#==============================================================================
# Fault tolerance and checkpointing
#==============================================================================
# The backend that will be used to store operator state checkpoints if
# checkpointing is enabled.
#
# Supported backends are 'jobmanager', 'filesystem', 'rocksdb', or the
# <class-name-of-factory>.
#
# state.backend: filesystem
# Directory for checkpoints filesystem, when using any of the default bundled
# state backends.
#
# state.checkpoints.dir: hdfs://namenode-host:port/flink-checkpoints
# Default target directory for savepoints, optional.
#
# state.savepoints.dir: hdfs://namenode-host:port/flink-checkpoints
# Flag to enable/disable incremental checkpoints for backends that
# support incremental checkpoints (like the RocksDB state backend).
#
# state.backend.incremental: false
# The failover strategy, i.e., how the job computation recovers from task failures.
# Only restart tasks that may have been affected by the task failure, which typically includes
# downstream tasks and potentially upstream tasks if their produced data is no longer available for consumption.
jobmanager.execution.failover-strategy: region
#==============================================================================
# Rest & web frontend
#==============================================================================
# The port to which the REST client connects to. If rest.bind-port has
# not been specified, then the server will bind to this port as well.
#
#rest.port: 8081
# The address to which the REST client will connect to
#
#rest.address: 0.0.0.0
# Port range for the REST and web server to bind to.
#
#rest.bind-port: 8080-8090
# The address that the REST & web server binds to
#
#rest.bind-address: 0.0.0.0
# Flag to specify whether job submission is enabled from the web-based
# runtime monitor. Uncomment to disable.
#web.submit.enable: false
#==============================================================================
# Advanced
#==============================================================================
# Override the directories for temporary files. If not specified, the
# system-specific Java temporary directory (java.io.tmpdir property) is taken.
#
# For framework setups on Yarn or Mesos, Flink will automatically pick up the
# containers' temp directories without any need for configuration.
#
# Add a delimited list for multiple directories, using the system directory
# delimiter (colon ':' on unix) or a comma, e.g.:
# /data1/tmp:/data2/tmp:/data3/tmp
#
# Note: Each directory entry is read from and written to by a different I/O
# thread. You can include the same directory multiple times in order to create
# multiple I/O threads against that directory. This is for example relevant for
# high-throughput RAIDs.
#
io.tmp.dirs: /tmp
# The classloading resolve order. Possible values are 'child-first' (Flink's default)
# and 'parent-first' (Java's default).
#
# Child first classloading allows users to use different dependency/library
# versions in their application than those in the classpath. Switching back
# to 'parent-first' may help with debugging dependency issues.
#
# classloader.resolve-order: child-first
# The amount of memory going to the network stack. These numbers usually need
# no tuning. Adjusting them may be necessary in case of an "Insufficient number
# of network buffers" error. The default min is 64MB, the default max is 1GB.
#
# taskmanager.memory.network.fraction: 0.1
# taskmanager.memory.network.min: 64mb
# taskmanager.memory.network.max: 1gb
taskmanager.memory.network.fraction: 0.1
taskmanager.memory.network.min: 1mb
taskmanager.memory.network.max: 256mb
#==============================================================================
# Flink Cluster Security Configuration
#==============================================================================
# Kerberos authentication for various components - Hadoop, ZooKeeper, and connectors -
# may be enabled in four steps:
# 1. configure the local krb5.conf file
# 2. provide Kerberos credentials (either a keytab or a ticket cache w/ kinit)
# 3. make the credentials available to various JAAS login contexts
# 4. configure the connector to use JAAS/SASL
# The below configure how Kerberos credentials are provided. A keytab will be used instead of
# a ticket cache if the keytab path and principal are set.
# security.kerberos.login.use-ticket-cache: true
# security.kerberos.login.keytab: /path/to/kerberos/keytab
# security.kerberos.login.principal: flink-user
# The configuration below defines which JAAS login contexts
# security.kerberos.login.contexts: Client,KafkaClient
#==============================================================================
# ZK Security Configuration
#==============================================================================
# Below configurations are applicable if ZK ensemble is configured for security
# Override below configuration to provide custom ZK service name if configured
# zookeeper.sasl.service-name: zookeeper
# The configuration below must match one of the values set in "security.kerberos.login.contexts"
# zookeeper.sasl.login-context-name: Client
#==============================================================================
# HistoryServer
#==============================================================================
# The HistoryServer is started and stopped via bin/historyserver.sh (start|stop)
# Directory to upload completed jobs to. Add this directory to the list of
# monitored directories of the HistoryServer as well (see below).
#jobmanager.archive.fs.dir: hdfs:///completed-jobs/
# The address under which the web-based HistoryServer listens.
#historyserver.web.address: 0.0.0.0
# The port under which the web-based HistoryServer listens.
#historyserver.web.port: 8082
# Comma separated list of directories to monitor for completed jobs.
#historyserver.archive.fs.dir: hdfs:///completed-jobs/
# Interval in milliseconds for refreshing the monitored directories.
#historyserver.archive.fs.refresh-interval: 10000

 

 

 

 
