
Flink Memory Optimization

Flink provides both coarse-grained and fine-grained memory configuration options to meet the needs of different users.

1. The Flink Memory Model

[Figure: Flink TaskManager memory model]

  • Total Process Memory: the memory consumed by the Flink application (Total Flink Memory) plus the memory consumed by the JVM itself
  • Total Flink Memory: the JVM heap, managed memory, and direct memory

If you only run Flink in local mode (for example from an IDE such as IntelliJ IDEA), only some of the memory settings take effect; the specifics are covered below.

For most programs running on a cluster, the simplest approach is to configure either of the following:

  • Total Flink memory (taskmanager.memory.flink.size)
  • Total process memory (taskmanager.memory.process.size)

The remaining memory components are then adjusted automatically based on defaults or additionally configured options. The next sections cover the other memory components in more detail.

For standalone deployments, configuring Total Flink memory is more appropriate, because there you declare how much memory is given to Flink itself.

For containerized deployments, configuring Total Process Memory is better: it represents the total memory used by the Flink JVM process, which is also the upper limit the container allows the TaskManager to occupy; exceeding it gets the process killed by the container.
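As a minimal flink-conf.yaml sketch (the sizes below are placeholders, not recommendations), you would set exactly one of the two totals depending on the deployment mode:

  # Standalone: declare only the memory that belongs to Flink itself
  taskmanager.memory.flink.size: 4096m

  # Containerized (e.g. YARN/Kubernetes): declare only the full JVM process
  # budget, which must stay within the container's memory limit
  # taskmanager.memory.process.size: 4608m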

An alternative is to configure the task heap (taskmanager.memory.task.heap.size) and managed memory (taskmanager.memory.managed.size). This is the fine-grained configuration approach.

Note: at least one of the three options above must be configured (the official recommendation is to pick exactly one), otherwise Flink will fail to start. None of them has a default value, so one must be chosen explicitly:

  • taskmanager.memory.flink.size
  • taskmanager.memory.process.size
  • taskmanager.memory.task.heap.size and taskmanager.memory.managed.size

Note: explicitly configuring both Total Flink memory and Total Process Memory at the same time is not recommended; potential memory configuration conflicts between them can make the deployment fail. Extra configuration of the other memory components also calls for caution, since it can introduce further conflicts.
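To illustrate the rule (sizes hypothetical): the first snippet below is a valid minimal configuration, while the second sets both totals explicitly and risks a memory configuration conflict at startup:

  # Valid: exactly one of the three alternatives is set
  taskmanager.memory.process.size: 4096m

  # Risky: both totals set explicitly -- may fail due to conflicting derived sizes
  # taskmanager.memory.flink.size: 3584m
  # taskmanager.memory.process.size: 4096m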


2. Configuring Task Heap and Managed Memory

Besides the total-memory options described in the previous section, you can explicitly specify the task heap and managed memory. This is useful when you need these components to have a fixed, known size, instead of deriving the task heap from a configured total as before.
The remaining memory components are then adjusted automatically based on defaults or additionally configured options.



2.1 Task (Operator) Heap Memory

If you want to guarantee that a certain amount of JVM heap is available to your code, set the task heap size explicitly (taskmanager.memory.task.heap.size). It is added to the JVM heap size and dedicated to the Flink operators that run user code.
The setting directly affects the task executor's -Xmx and -Xms parameters:

-Xmx and -Xms = framework heap (default 128m) + task heap
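As a worked sketch (the task heap size here is hypothetical), with the default framework heap of 128m:

  taskmanager.memory.task.heap.size: 2048m
  # framework heap (default):          128m
  # resulting JVM heap: -Xmx = -Xms = 2048m + 128m = 2176m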



2.2 Managed Memory

Managed memory is managed by Flink and allocated as native (off-heap) memory. It is used by:

  • the RocksDB state backend in streaming jobs
  • batch jobs, for sorting, hash tables, and caching of intermediate results

Managed memory can be configured in two ways:

  • directly, via taskmanager.memory.managed.size
  • derived, as Total Flink Memory * taskmanager.memory.managed.fraction (default 0.4)

An explicitly configured taskmanager.memory.managed.size overrides the size derived from the fraction.
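A sketch of both styles (all sizes hypothetical); if both appear, the explicit size wins:

  # Derived: with taskmanager.memory.flink.size: 4096m and the default
  # fraction of 0.4, managed memory comes out to roughly 1638m (4096m * 0.4)
  # taskmanager.memory.managed.fraction: 0.4

  # Explicit: overrides whatever the fraction would produce
  taskmanager.memory.managed.size: 1024m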


3. Configuring Off-Heap Memory (Direct or Native)

Off-heap memory allocated by user code is accounted as task off-heap memory and is configured via taskmanager.memory.task.off-heap.size.
Note: you can also tune the framework off-heap memory, i.e. the off-heap memory used by the Flink framework itself. This is an advanced option; adjust it only when you are certain it is necessary.

Flink counts the framework off-heap memory and the task off-heap memory (together with the network memory) toward the JVM's direct memory limit:

-XX:MaxDirectMemorySize = framework off-heap + task off-heap + network memory
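As a worked sketch (the task off-heap and network sizes are hypothetical), assuming the default framework off-heap of 128m:

  taskmanager.memory.task.off-heap.size: 256m
  # framework off-heap (default): 128m
  # network memory (example):     512m
  # -XX:MaxDirectMemorySize = 128m + 256m + 512m = 896m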

4. Example flink-conf.yaml

  # JobManager runs.
  jobmanager.rpc.address: www.slave2.asiainfo.com

  # The RPC port where the JobManager is reachable.
  jobmanager.rpc.port: 6123

  # The heap size for the JobManager JVM
  #jobmanager.heap.size: 2048m

  # The total process memory size for the TaskManager.
  #
  # Note this accounts for all memory usage within the TaskManager process, including JVM metaspace and other overhead.
  #taskmanager.memory.process.size: 4096m

  # To exclude JVM metaspace and overhead, please, use total Flink memory size instead of 'taskmanager.memory.process.size'.
  # It is not recommended to set both 'taskmanager.memory.process.size' and Flink memory.
  #
  taskmanager.memory.flink.size: 16384m
  taskmanager.memory.jvm-metaspace.size: 1024m

  # The number of task slots that each TaskManager offers. Each slot runs one parallel pipeline.
  taskmanager.numberOfTaskSlots: 4

  # The parallelism used for programs that did not specify and other parallelism.
  parallelism.default: 4

  # The default file system scheme and authority.
  #
  # By default file paths without scheme are interpreted relative to the local
  # root file system 'file:///'. Use this to override the default and interpret
  # relative paths relative to a different file system,
  # for example 'hdfs://mynamenode:12345'
  #
  # fs.default-scheme

  #==============================================================================
  # High Availability
  #==============================================================================

  # The high-availability mode. Possible options are 'NONE' or 'zookeeper'.
  #
  # high-availability: zookeeper

  # The path where metadata for master recovery is persisted. While ZooKeeper stores
  # the small ground truth for checkpoint and leader election, this location stores
  # the larger objects, like persisted dataflow graphs.
  #
  # Must be a durable file system that is accessible from all nodes
  # (like HDFS, S3, Ceph, nfs, ...)
  #
  # high-availability.storageDir: hdfs:///flink/ha/

  # The list of ZooKeeper quorum peers that coordinate the high-availability
  # setup. This must be a list of the form:
  # "host1:clientPort,host2:clientPort,..." (default clientPort: 2181)
  #
  # high-availability.zookeeper.quorum: localhost:2181

  # ACL options are based on https://zookeeper.apache.org/doc/r3.1.2/zookeeperProgrammers.html#sc_BuiltinACLSchemes
  # It can be either "creator" (ZOO_CREATE_ALL_ACL) or "open" (ZOO_OPEN_ACL_UNSAFE)
  # The default value is "open" and it can be changed to "creator" if ZK security is enabled
  #
  # high-availability.zookeeper.client.acl: open

  #==============================================================================
  # Fault tolerance and checkpointing
  #==============================================================================

  # The backend that will be used to store operator state checkpoints if
  # checkpointing is enabled.
  #
  # Supported backends are 'jobmanager', 'filesystem', 'rocksdb', or the
  # <class-name-of-factory>.
  #
  # state.backend: filesystem

  # Directory for checkpoints filesystem, when using any of the default bundled
  # state backends.
  #
  # state.checkpoints.dir: hdfs://namenode-host:port/flink-checkpoints

  # Default target directory for savepoints, optional.
  #
  # state.savepoints.dir: hdfs://namenode-host:port/flink-checkpoints

  # Flag to enable/disable incremental checkpoints for backends that
  # support incremental checkpoints (like the RocksDB state backend).
  #
  # state.backend.incremental: false

  # The failover strategy, i.e., how the job computation recovers from task failures.
  # Only restart tasks that may have been affected by the task failure, which typically includes
  # downstream tasks and potentially upstream tasks if their produced data is no longer available for consumption.
  jobmanager.execution.failover-strategy: region

  #==============================================================================
  # Rest & web frontend
  #==============================================================================

  # The port to which the REST client connects to. If rest.bind-port has
  # not been specified, then the server will bind to this port as well.
  #
  #rest.port: 8081

  # The address to which the REST client will connect to
  #
  #rest.address: 0.0.0.0

  # Port range for the REST and web server to bind to.
  #
  #rest.bind-port: 8080-8090

  # The address that the REST & web server binds to
  #
  #rest.bind-address: 0.0.0.0

  # Flag to specify whether job submission is enabled from the web-based
  # runtime monitor. Uncomment to disable.
  #web.submit.enable: false

  #==============================================================================
  # Advanced
  #==============================================================================

  # Override the directories for temporary files. If not specified, the
  # system-specific Java temporary directory (java.io.tmpdir property) is taken.
  #
  # For framework setups on Yarn or Mesos, Flink will automatically pick up the
  # containers' temp directories without any need for configuration.
  #
  # Add a delimited list for multiple directories, using the system directory
  # delimiter (colon ':' on unix) or a comma, e.g.:
  # /data1/tmp:/data2/tmp:/data3/tmp
  #
  # Note: Each directory entry is read from and written to by a different I/O
  # thread. You can include the same directory multiple times in order to create
  # multiple I/O threads against that directory. This is for example relevant for
  # high-throughput RAIDs.
  #
  # io.tmp.dirs: /tmp

  # The classloading resolve order. Possible values are 'child-first' (Flink's default)
  # and 'parent-first' (Java's default).
  #
  # Child first classloading allows users to use different dependency/library
  # versions in their application than those in the classpath. Switching back
  # to 'parent-first' may help with debugging dependency issues.
  #
  # classloader.resolve-order: child-first

  # The amount of memory going to the network stack. These numbers usually need
  # no tuning. Adjusting them may be necessary in case of an "Insufficient number
  # of network buffers" error. The default min is 64MB, the default max is 1GB.
  #
  taskmanager.memory.network.fraction: 0.5
  # taskmanager.memory.network.min: 256mb
  # taskmanager.memory.network.max: 4gb

  #==============================================================================
  # Flink Cluster Security Configuration
  #==============================================================================

  # Kerberos authentication for various components - Hadoop, ZooKeeper, and connectors -
  # may be enabled in four steps:
  # 1. configure the local krb5.conf file
  # 2. provide Kerberos credentials (either a keytab or a ticket cache w/ kinit)
  # 3. make the credentials available to various JAAS login contexts
  # 4. configure the connector to use JAAS/SASL

  # The below configure how Kerberos credentials are provided. A keytab will be used instead of
  # a ticket cache if the keytab path and principal are set.
  # security.kerberos.login.use-ticket-cache: true
  # security.kerberos.login.keytab: /path/to/kerberos/keytab
  # security.kerberos.login.principal: flink-user

  # The configuration below defines which JAAS login contexts
  # security.kerberos.login.contexts: Client,KafkaClient

  #==============================================================================
  # ZK Security Configuration
  #==============================================================================

  # Below configurations are applicable if ZK ensemble is configured for security
  # Override below configuration to provide custom ZK service name if configured
  # zookeeper.sasl.service-name: zookeeper

  # The configuration below must match one of the values set in "security.kerberos.login.contexts"
  # zookeeper.sasl.login-context-name: Client

  #==============================================================================
  # HistoryServer
  #==============================================================================

  # The HistoryServer is started and stopped via bin/historyserver.sh (start|stop)

  # Directory to upload completed jobs to. Add this directory to the list of
  # monitored directories of the HistoryServer as well (see below).
  #jobmanager.archive.fs.dir: hdfs:///completed-jobs/

  # The address under which the web-based HistoryServer listens.
  #historyserver.web.address: 0.0.0.0

  # The port under which the web-based HistoryServer listens.
  #historyserver.web.port: 8082

  # Comma separated list of directories to monitor for completed jobs.
  #historyserver.archive.fs.dir: hdfs:///completed-jobs/

  # Interval in milliseconds for refreshing the monitored directories.
  #historyserver.archive.fs.refresh-interval: 10000
