当前位置:   article > 正文

聊聊flink TaskManager的memory大小设置

taskmanager.memory.preallocate

本文主要研究一下flink TaskManager的memory大小设置

flink-conf.yaml

flink-release-1.7.2/flink-dist/src/main/resources/flink-conf.yaml

  1. # The heap size for the TaskManager JVM
  2. taskmanager.heap.size: 1024m
  3. # The number of task slots that each TaskManager offers. Each slot runs one parallel pipeline.
  4. taskmanager.numberOfTaskSlots: 1
  5. # Specify whether TaskManager's managed memory should be allocated when starting
  6. # up (true) or when memory is requested.
  7. #
  8. # We recommend to set this value to 'true' only in setups for pure batch
  9. # processing (DataSet API). Streaming setups currently do not use the TaskManager's
  10. # managed memory: The 'rocksdb' state backend uses RocksDB's own memory management,
  11. # while the 'memory' and 'filesystem' backends explicitly keep data as objects
  12. # to save on serialization cost.
  13. #
  14. # taskmanager.memory.preallocate: false
  15. # The amount of memory going to the network stack. These numbers usually need
  16. # no tuning. Adjusting them may be necessary in case of an "Insufficient number
  17. # of network buffers" error. The default min is 64MB, teh default max is 1GB.
  18. #
  19. # taskmanager.network.memory.fraction: 0.1
  20. # taskmanager.network.memory.min: 64mb
  21. # taskmanager.network.memory.max: 1gb
  • flink-conf.yaml提供了taskmanager.heap.size来设置taskmanager的memory(heap及offHeap)大小
  • 提供了taskmanager.memory相关配置(taskmanager.memory.fraction、taskmanager.memory.off-heap、taskmanager.memory.preallocate、taskmanager.memory.segment-size、taskmanager.memory.size)用于设置memory
  • 提供了taskmanager.network.memory相关配置(taskmanager.network.detailed-metrics、taskmanager.network.memory.buffers-per-channel、taskmanager.network.memory.floating-buffers-per-gate、taskmanager.network.memory.fraction、taskmanager.network.memory.max、taskmanager.network.memory.min)用于设置taskmanager的network stack的内存

config.sh

flink-release-1.7.2/flink-dist/src/main/flink-bin/bin/config.sh

  1. #!/usr/bin/env bash
  2. # WARNING !!! , these values are only used if there is nothing else is specified in
  3. # conf/flink-conf.yaml
  4. DEFAULT_ENV_PID_DIR="/tmp" # Directory to store *.pid files to
  5. DEFAULT_ENV_LOG_MAX=5 # Maximum number of old log files to keep
  6. DEFAULT_ENV_JAVA_OPTS="" # Optional JVM args
  7. DEFAULT_ENV_JAVA_OPTS_JM="" # Optional JVM args (JobManager)
  8. DEFAULT_ENV_JAVA_OPTS_TM="" # Optional JVM args (TaskManager)
  9. DEFAULT_ENV_JAVA_OPTS_HS="" # Optional JVM args (HistoryServer)
  10. DEFAULT_ENV_SSH_OPTS="" # Optional SSH parameters running in cluster mode
  11. DEFAULT_YARN_CONF_DIR="" # YARN Configuration Directory, if necessary
  12. DEFAULT_HADOOP_CONF_DIR="" # Hadoop Configuration Directory, if necessary
  13. KEY_TASKM_MEM_SIZE="taskmanager.heap.size"
  14. KEY_TASKM_MEM_MB="taskmanager.heap.mb"
  15. KEY_TASKM_MEM_MANAGED_SIZE="taskmanager.memory.size"
  16. KEY_TASKM_MEM_MANAGED_FRACTION="taskmanager.memory.fraction"
  17. KEY_TASKM_OFFHEAP="taskmanager.memory.off-heap"
  18. KEY_TASKM_MEM_PRE_ALLOCATE="taskmanager.memory.preallocate"
  19. KEY_TASKM_NET_BUF_FRACTION="taskmanager.network.memory.fraction"
  20. KEY_TASKM_NET_BUF_MIN="taskmanager.network.memory.min"
  21. KEY_TASKM_NET_BUF_MAX="taskmanager.network.memory.max"
  22. KEY_TASKM_NET_BUF_NR="taskmanager.network.numberOfBuffers" # fallback
  23. KEY_TASKM_COMPUTE_NUMA="taskmanager.compute.numa"
  24. # Define FLINK_TM_HEAP if it is not already set
  25. if [ -z "${FLINK_TM_HEAP}" ]; then
  26. FLINK_TM_HEAP=$(readFromConfig ${KEY_TASKM_MEM_SIZE} 0 "${YAML_CONF}")
  27. fi
  28. # Try read old config key, if new key not exists
  29. if [ "${FLINK_TM_HEAP}" == 0 ]; then
  30. FLINK_TM_HEAP_MB=$(readFromConfig ${KEY_TASKM_MEM_MB} 0 "${YAML_CONF}")
  31. fi
  32. # Define FLINK_TM_MEM_MANAGED_SIZE if it is not already set
  33. if [ -z "${FLINK_TM_MEM_MANAGED_SIZE}" ]; then
  34. FLINK_TM_MEM_MANAGED_SIZE=$(readFromConfig ${KEY_TASKM_MEM_MANAGED_SIZE} 0 "${YAML_CONF}")
  35. if hasUnit ${FLINK_TM_MEM_MANAGED_SIZE}; then
  36. FLINK_TM_MEM_MANAGED_SIZE=$(getMebiBytes $(parseBytes ${FLINK_TM_MEM_MANAGED_SIZE}))
  37. else
  38. FLINK_TM_MEM_MANAGED_SIZE=$(getMebiBytes $(parseBytes ${FLINK_TM_MEM_MANAGED_SIZE}"m"))
  39. fi
  40. fi
  41. # Define FLINK_TM_MEM_MANAGED_FRACTION if it is not already set
  42. if [ -z "${FLINK_TM_MEM_MANAGED_FRACTION}" ]; then
  43. FLINK_TM_MEM_MANAGED_FRACTION=$(readFromConfig ${KEY_TASKM_MEM_MANAGED_FRACTION} 0.7 "${YAML_CONF}")
  44. fi
  45. # Define FLINK_TM_OFFHEAP if it is not already set
  46. if [ -z "${FLINK_TM_OFFHEAP}" ]; then
  47. FLINK_TM_OFFHEAP=$(readFromConfig ${KEY_TASKM_OFFHEAP} "false" "${YAML_CONF}")
  48. fi
  49. # Define FLINK_TM_MEM_PRE_ALLOCATE if it is not already set
  50. if [ -z "${FLINK_TM_MEM_PRE_ALLOCATE}" ]; then
  51. FLINK_TM_MEM_PRE_ALLOCATE=$(readFromConfig ${KEY_TASKM_MEM_PRE_ALLOCATE} "false" "${YAML_CONF}")
  52. fi
  53. # Define FLINK_TM_NET_BUF_FRACTION if it is not already set
  54. if [ -z "${FLINK_TM_NET_BUF_FRACTION}" ]; then
  55. FLINK_TM_NET_BUF_FRACTION=$(readFromConfig ${KEY_TASKM_NET_BUF_FRACTION} 0.1 "${YAML_CONF}")
  56. fi
  57. # Define FLINK_TM_NET_BUF_MIN and FLINK_TM_NET_BUF_MAX if not already set (as a fallback)
  58. if [ -z "${FLINK_TM_NET_BUF_MIN}" -a -z "${FLINK_TM_NET_BUF_MAX}" ]; then
  59. FLINK_TM_NET_BUF_MIN=$(readFromConfig ${KEY_TASKM_NET_BUF_NR} -1 "${YAML_CONF}")
  60. if [ $FLINK_TM_NET_BUF_MIN != -1 ]; then
  61. FLINK_TM_NET_BUF_MIN=$(parseBytes ${FLINK_TM_NET_BUF_MIN})
  62. FLINK_TM_NET_BUF_MAX=${FLINK_TM_NET_BUF_MIN}
  63. fi
  64. fi
  65. # Define FLINK_TM_NET_BUF_MIN if it is not already set
  66. if [ -z "${FLINK_TM_NET_BUF_MIN}" -o "${FLINK_TM_NET_BUF_MIN}" = "-1" ]; then
  67. # default: 64MB = 67108864 bytes (same as the previous default with 2048 buffers of 32k each)
  68. FLINK_TM_NET_BUF_MIN=$(readFromConfig ${KEY_TASKM_NET_BUF_MIN} 67108864 "${YAML_CONF}")
  69. FLINK_TM_NET_BUF_MIN=$(parseBytes ${FLINK_TM_NET_BUF_MIN})
  70. fi
  71. # Define FLINK_TM_NET_BUF_MAX if it is not already set
  72. if [ -z "${FLINK_TM_NET_BUF_MAX}" -o "${FLINK_TM_NET_BUF_MAX}" = "-1" ]; then
  73. # default: 1GB = 1073741824 bytes
  74. FLINK_TM_NET_BUF_MAX=$(readFromConfig ${KEY_TASKM_NET_BUF_MAX} 1073741824 "${YAML_CONF}")
  75. FLINK_TM_NET_BUF_MAX=$(parseBytes ${FLINK_TM_NET_BUF_MAX})
  76. fi
  • config.sh在相关变量没有设置的前提下,初始化了FLINK_TM_HEAP、FLINK_TM_MEM_MANAGED_SIZE、FLINK_TM_MEM_MANAGED_FRACTION、FLINK_TM_OFFHEAP、FLINK_TM_MEM_PRE_ALLOCATE、FLINK_TM_NET_BUF_FRACTION等变量

taskmanager.sh

flink-release-1.7.2/flink-dist/src/main/flink-bin/bin/taskmanager.sh

  1. #!/usr/bin/env bash
  2. # Start/stop a Flink TaskManager.
  3. USAGE="Usage: taskmanager.sh (start|start-foreground|stop|stop-all)"
  4. STARTSTOP=$1
  5. ARGS=("${@:2}")
  6. if [[ $STARTSTOP != "start" ]] && [[ $STARTSTOP != "start-foreground" ]] && [[ $STARTSTOP != "stop" ]] && [[ $STARTSTOP != "stop-all" ]]; then
  7. echo $USAGE
  8. exit 1
  9. fi
  10. bin=`dirname "$0"`
  11. bin=`cd "$bin"; pwd`
  12. . "$bin"/config.sh
  13. ENTRYPOINT=taskexecutor
  14. if [[ $STARTSTOP == "start" ]] || [[ $STARTSTOP == "start-foreground" ]]; then
  15. # if memory allocation mode is lazy and no other JVM options are set,
  16. # set the 'Concurrent Mark Sweep GC'
  17. if [[ $FLINK_TM_MEM_PRE_ALLOCATE == "false" ]] && [ -z "${FLINK_ENV_JAVA_OPTS}" ] && [ -z "${FLINK_ENV_JAVA_OPTS_TM}" ]; then
  18. export JVM_ARGS="$JVM_ARGS -XX:+UseG1GC"
  19. fi
  20. if [ ! -z "${FLINK_TM_HEAP_MB}" ] && [ "${FLINK_TM_HEAP}" == 0 ]; then
  21. echo "used deprecated key \`${KEY_TASKM_MEM_MB}\`, please replace with key \`${KEY_TASKM_MEM_SIZE}\`"
  22. else
  23. flink_tm_heap_bytes=$(parseBytes ${FLINK_TM_HEAP})
  24. FLINK_TM_HEAP_MB=$(getMebiBytes ${flink_tm_heap_bytes})
  25. fi
  26. if [[ ! ${FLINK_TM_HEAP_MB} =~ ${IS_NUMBER} ]] || [[ "${FLINK_TM_HEAP_MB}" -lt "0" ]]; then
  27. echo "[ERROR] Configured TaskManager JVM heap size is not a number. Please set '${KEY_TASKM_MEM_SIZE}' in ${FLINK_CONF_FILE}."
  28. exit 1
  29. fi
  30. if [ "${FLINK_TM_HEAP_MB}" -gt "0" ]; then
  31. TM_HEAP_SIZE=$(calculateTaskManagerHeapSizeMB)
  32. # Long.MAX_VALUE in TB: This is an upper bound, much less direct memory will be used
  33. TM_MAX_OFFHEAP_SIZE="8388607T"
  34. export JVM_ARGS="${JVM_ARGS} -Xms${TM_HEAP_SIZE}M -Xmx${TM_HEAP_SIZE}M -XX:MaxDirectMemorySize=${TM_MAX_OFFHEAP_SIZE}"
  35. fi
  36. # Add TaskManager-specific JVM options
  37. export FLINK_ENV_JAVA_OPTS="${FLINK_ENV_JAVA_OPTS} ${FLINK_ENV_JAVA_OPTS_TM}"
  38. # Startup parameters
  39. ARGS+=("--configDir" "${FLINK_CONF_DIR}")
  40. fi
  41. if [[ $STARTSTOP == "start-foreground" ]]; then
  42. exec "${FLINK_BIN_DIR}"/flink-console.sh $ENTRYPOINT "${ARGS[@]}"
  43. else
  44. if [[ $FLINK_TM_COMPUTE_NUMA == "false" ]]; then
  45. # Start a single TaskManager
  46. "${FLINK_BIN_DIR}"/flink-daemon.sh $STARTSTOP $ENTRYPOINT "${ARGS[@]}"
  47. else
  48. # Example output from `numactl --show` on an AWS c4.8xlarge:
  49. # policy: default
  50. # preferred node: current
  51. # physcpubind: 0 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35
  52. # cpubind: 0 1
  53. # nodebind: 0 1
  54. # membind: 0 1
  55. read -ra NODE_LIST <<< $(numactl --show | grep "^nodebind: ")
  56. for NODE_ID in "${NODE_LIST[@]:1}"; do
  57. # Start a TaskManager for each NUMA node
  58. numactl --membind=$NODE_ID --cpunodebind=$NODE_ID -- "${FLINK_BIN_DIR}"/flink-daemon.sh $STARTSTOP $ENTRYPOINT "${ARGS[@]}"
  59. done
  60. fi
  61. fi
  • taskmanager.sh首先调用config.sh初始化相关变量,之后计算并export了JVM_ARGS及FLINK_ENV_JAVA_OPTS,最后调用flink-console.sh启动相关类
  • 如果FLINK_TM_MEM_PRE_ALLOCATE为false且FLINK_ENV_JAVA_OPTS及FLINK_ENV_JAVA_OPTS_TM都没有设置,则追加-XX:+UseG1GC到JVM_ARGS;之后读取FLINK_TM_HEAP到FLINK_TM_HEAP_MB;如果FLINK_TM_HEAP_MB大于0则通过calculateTaskManagerHeapSizeMB计算TM_HEAP_SIZE,然后以TM_HEAP_SIZE设置xms及Xmx,以TM_MAX_OFFHEAP_SIZE设置MaxDirectMemorySize,追加到JVM_ARGS中;而FLINK_ENV_JAVA_OPTS_TM则会追加到FLINK_ENV_JAVA_OPTS
  • calculateTaskManagerHeapSizeMB在config.sh中有定义,另外其对应的java代码在TaskManagerServices.calculateHeapSizeMB

TaskManagerServices

flink-runtime_2.11-1.7.2-sources.jar!/org/apache/flink/runtime/taskexecutor/TaskManagerServices.java

  1. public class TaskManagerServices {
  2. //......
  3. /**
  4. * Calculates the amount of heap memory to use (to set via <tt>-Xmx</tt> and <tt>-Xms</tt>)
  5. * based on the total memory to use and the given configuration parameters.
  6. *
  7. * @param totalJavaMemorySizeMB
  8. * overall available memory to use (heap and off-heap)
  9. * @param config
  10. * configuration object
  11. *
  12. * @return heap memory to use (in megabytes)
  13. */
  14. public static long calculateHeapSizeMB(long totalJavaMemorySizeMB, Configuration config) {
  15. Preconditions.checkArgument(totalJavaMemorySizeMB > 0);
  16. // subtract the Java memory used for network buffers (always off-heap)
  17. final long networkBufMB =
  18. calculateNetworkBufferMemory(
  19. totalJavaMemorySizeMB << 20, // megabytes to bytes
  20. config) >> 20; // bytes to megabytes
  21. final long remainingJavaMemorySizeMB = totalJavaMemorySizeMB - networkBufMB;
  22. // split the available Java memory between heap and off-heap
  23. final boolean useOffHeap = config.getBoolean(TaskManagerOptions.MEMORY_OFF_HEAP);
  24. final long heapSizeMB;
  25. if (useOffHeap) {
  26. long offHeapSize;
  27. String managedMemorySizeDefaultVal = TaskManagerOptions.MANAGED_MEMORY_SIZE.defaultValue();
  28. if (!config.getString(TaskManagerOptions.MANAGED_MEMORY_SIZE).equals(managedMemorySizeDefaultVal)) {
  29. try {
  30. offHeapSize = MemorySize.parse(config.getString(TaskManagerOptions.MANAGED_MEMORY_SIZE), MEGA_BYTES).getMebiBytes();
  31. } catch (IllegalArgumentException e) {
  32. throw new IllegalConfigurationException(
  33. "Could not read " + TaskManagerOptions.MANAGED_MEMORY_SIZE.key(), e);
  34. }
  35. } else {
  36. offHeapSize = Long.valueOf(managedMemorySizeDefaultVal);
  37. }
  38. if (offHeapSize <= 0) {
  39. // calculate off-heap section via fraction
  40. double fraction = config.getFloat(TaskManagerOptions.MANAGED_MEMORY_FRACTION);
  41. offHeapSize = (long) (fraction * remainingJavaMemorySizeMB);
  42. }
  43. TaskManagerServicesConfiguration
  44. .checkConfigParameter(offHeapSize < remainingJavaMemorySizeMB, offHeapSize,
  45. TaskManagerOptions.MANAGED_MEMORY_SIZE.key(),
  46. "Managed memory size too large for " + networkBufMB +
  47. " MB network buffer memory and a total of " + totalJavaMemorySizeMB +
  48. " MB JVM memory");
  49. heapSizeMB = remainingJavaMemorySizeMB - offHeapSize;
  50. } else {
  51. heapSizeMB = remainingJavaMemorySizeMB;
  52. }
  53. return heapSizeMB;
  54. }
  55. /**
  56. * Calculates the amount of memory used for network buffers based on the total memory to use and
  57. * the according configuration parameters.
  58. *
  59. * <p>The following configuration parameters are involved:
  60. * <ul>
  61. * <li>{@link TaskManagerOptions#NETWORK_BUFFERS_MEMORY_FRACTION},</li>
  62. * <li>{@link TaskManagerOptions#NETWORK_BUFFERS_MEMORY_MIN},</li>
  63. * <li>{@link TaskManagerOptions#NETWORK_BUFFERS_MEMORY_MAX}, and</li>
  64. * <li>{@link TaskManagerOptions#NETWORK_NUM_BUFFERS} (fallback if the ones above do not exist)</li>
  65. * </ul>.
  66. *
  67. * @param totalJavaMemorySize
  68. * overall available memory to use (heap and off-heap, in bytes)
  69. * @param config
  70. * configuration object
  71. *
  72. * @return memory to use for network buffers (in bytes); at least one memory segment
  73. */
  74. @SuppressWarnings("deprecation")
  75. public static long calculateNetworkBufferMemory(long totalJavaMemorySize, Configuration config) {
  76. Preconditions.checkArgument(totalJavaMemorySize > 0);
  77. int segmentSize =
  78. checkedDownCast(MemorySize.parse(config.getString(TaskManagerOptions.MEMORY_SEGMENT_SIZE)).getBytes());
  79. final long networkBufBytes;
  80. if (TaskManagerServicesConfiguration.hasNewNetworkBufConf(config)) {
  81. // new configuration based on fractions of available memory with selectable min and max
  82. float networkBufFraction = config.getFloat(TaskManagerOptions.NETWORK_BUFFERS_MEMORY_FRACTION);
  83. long networkBufMin = MemorySize.parse(config.getString(TaskManagerOptions.NETWORK_BUFFERS_MEMORY_MIN)).getBytes();
  84. long networkBufMax = MemorySize.parse(config.getString(TaskManagerOptions.NETWORK_BUFFERS_MEMORY_MAX)).getBytes();
  85. TaskManagerServicesConfiguration
  86. .checkNetworkBufferConfig(segmentSize, networkBufFraction, networkBufMin, networkBufMax);
  87. networkBufBytes = Math.min(networkBufMax, Math.max(networkBufMin,
  88. (long) (networkBufFraction * totalJavaMemorySize)));
  89. TaskManagerServicesConfiguration
  90. .checkConfigParameter(networkBufBytes < totalJavaMemorySize,
  91. "(" + networkBufFraction + ", " + networkBufMin + ", " + networkBufMax + ")",
  92. "(" + TaskManagerOptions.NETWORK_BUFFERS_MEMORY_FRACTION.key() + ", " +
  93. TaskManagerOptions.NETWORK_BUFFERS_MEMORY_MIN.key() + ", " +
  94. TaskManagerOptions.NETWORK_BUFFERS_MEMORY_MAX.key() + ")",
  95. "Network buffer memory size too large: " + networkBufBytes + " >= " +
  96. totalJavaMemorySize + " (total JVM memory size)");
  97. TaskManagerServicesConfiguration
  98. .checkConfigParameter(networkBufBytes >= segmentSize,
  99. "(" + networkBufFraction + ", " + networkBufMin + ", " + networkBufMax + ")",
  100. "(" + TaskManagerOptions.NETWORK_BUFFERS_MEMORY_FRACTION.key() + ", " +
  101. TaskManagerOptions.NETWORK_BUFFERS_MEMORY_MIN.key() + ", " +
  102. TaskManagerOptions.NETWORK_BUFFERS_MEMORY_MAX.key() + ")",
  103. "Network buffer memory size too small: " + networkBufBytes + " < " +
  104. segmentSize + " (" + TaskManagerOptions.MEMORY_SEGMENT_SIZE.key() + ")");
  105. } else {
  106. // use old (deprecated) network buffers parameter
  107. int numNetworkBuffers = config.getInteger(TaskManagerOptions.NETWORK_NUM_BUFFERS);
  108. networkBufBytes = (long) numNetworkBuffers * (long) segmentSize;
  109. TaskManagerServicesConfiguration.checkNetworkConfigOld(numNetworkBuffers);
  110. TaskManagerServicesConfiguration
  111. .checkConfigParameter(networkBufBytes < totalJavaMemorySize,
  112. networkBufBytes, TaskManagerOptions.NETWORK_NUM_BUFFERS.key(),
  113. "Network buffer memory size too large: " + networkBufBytes + " >= " +
  114. totalJavaMemorySize + " (total JVM memory size)");
  115. TaskManagerServicesConfiguration
  116. .checkConfigParameter(networkBufBytes >= segmentSize,
  117. networkBufBytes, TaskManagerOptions.NETWORK_NUM_BUFFERS.key(),
  118. "Network buffer memory size too small: " + networkBufBytes + " < " +
  119. segmentSize + " (" + TaskManagerOptions.MEMORY_SEGMENT_SIZE.key() + ")");
  120. }
  121. return networkBufBytes;
  122. }
  123. //......
  124. }
  • FLINK_TM_HEAP设置的是taskmanager的memory(heap及offHeap)大小,而network buffers总是使用offHeap,因而这里首先要从FLINK_TM_HEAP扣减掉这部分offHeap然后重新计算Xms及Xmx
  • calculateHeapSizeMB先调用calculateNetworkBufferMemory计算networkBufMB,然后从totalJavaMemorySizeMB扣减掉networkBufMB得到remainingJavaMemorySizeMB
  • 之后读取taskmanager.memory.off-heap设置,默认为false,则直接以remainingJavaMemorySizeMB返回;如果为true,则需要计算offHeapSize的值,然后从remainingJavaMemorySizeMB扣减offHeapSize再返回

小结

  • flink-conf.yaml提供了taskmanager.heap.size来设置taskmanager的memory(heap及offHeap)大小;提供了taskmanager.memory相关配置(taskmanager.memory.fraction、taskmanager.memory.off-heap、taskmanager.memory.preallocate、taskmanager.memory.segment-size、taskmanager.memory.size)用于设置memory;提供了taskmanager.network.memory相关配置(taskmanager.network.detailed-metrics、taskmanager.network.memory.buffers-per-channel、taskmanager.network.memory.floating-buffers-per-gate、taskmanager.network.memory.fraction、taskmanager.network.memory.max、taskmanager.network.memory.min)用于设置taskmanager的network stack的内存
  • taskmanager.sh首先调用config.sh初始化相关变量,之后计算并export了JVM_ARGS及FLINK_ENV_JAVA_OPTS,最后调用flink-console.sh启动相关类;如果FLINK_TM_MEM_PRE_ALLOCATE为false且FLINK_ENV_JAVA_OPTS及FLINK_ENV_JAVA_OPTS_TM都没有设置,则追加-XX:+UseG1GC到JVM_ARGS;之后读取FLINK_TM_HEAP到FLINK_TM_HEAP_MB;如果FLINK_TM_HEAP_MB大于0则通过calculateTaskManagerHeapSizeMB计算TM_HEAP_SIZE,然后以TM_HEAP_SIZE设置xms及Xmx,以TM_MAX_OFFHEAP_SIZE设置MaxDirectMemorySize,追加到JVM_ARGS中;而FLINK_ENV_JAVA_OPTS_TM则会追加到FLINK_ENV_JAVA_OPTS;calculateTaskManagerHeapSizeMB在config.sh中有定义,另外其对应的java代码在TaskManagerServices.calculateHeapSizeMB
  • FLINK_TM_HEAP设置的是taskmanager的memory(heap及offHeap)大小,而network buffers总是使用offHeap,因而这里首先要从FLINK_TM_HEAP扣减掉这部分offHeap然后重新计算Xms及Xmx;calculateHeapSizeMB先调用calculateNetworkBufferMemory计算networkBufMB,然后从totalJavaMemorySizeMB扣减掉networkBufMB得到remainingJavaMemorySizeMB;之后读取taskmanager.memory.off-heap设置,默认为false,则直接以remainingJavaMemorySizeMB返回;如果为true,则需要计算offHeapSize的值,然后从remainingJavaMemorySizeMB扣减offHeapSize再返回
由此可见最后的jvm参数取决于JVM_ARGS及FLINK_ENV_JAVA_OPTS;其中注意不要设置内存相关参数到JVM_ARGS,因为taskmanager.sh在FLINK_TM_HEAP_MB大于0的时候,则使用该值计算TM_HEAP_SIZE设置Xms及Xmx追加到JVM_ARGS变量中,而FLINK_TM_HEAP_MB则取决于FLINK_TM_HEAP或者taskmanager.heap.size配置;FLINK_ENV_JAVA_OPTS的配置则取决于env.java.opts以及env.java.opts.taskmanager;因而要配置taskmanager的memory( heap及offHeap)大小,可以指定FLINK_TM_HEAP环境变量(比如FLINK_TM_HEAP=512m),或者在flink-conf.yaml中指定taskmanager.heap.size;而最终的Xms及Xmx则是FLINK_TM_HEAP扣减掉offHeap而来,确定使用offHeap为network buffers,其余的看是否开启taskmanager.memory.off-heap,默认为false

doc

本文内容由网友自发贡献,转载请注明出处:https://www.wpsshop.cn/w/在线问答5/article/detail/997445
推荐阅读
相关标签
  

闽ICP备14008679号