A Look at Flink TaskManager's Managed Memory

This article takes a look at the managed memory of the Flink TaskManager.

TaskManagerOptions

flink-core-1.7.2-sources.jar!/org/apache/flink/configuration/TaskManagerOptions.java

```java
@PublicEvolving
public class TaskManagerOptions {
    //......
    /**
     * JVM heap size for the TaskManagers with memory size.
     */
    @Documentation.CommonOption(position = Documentation.CommonOption.POSITION_MEMORY)
    public static final ConfigOption<String> TASK_MANAGER_HEAP_MEMORY =
        key("taskmanager.heap.size")
            .defaultValue("1024m")
            .withDescription("JVM heap size for the TaskManagers, which are the parallel workers of" +
                " the system. On YARN setups, this value is automatically configured to the size of the TaskManager's" +
                " YARN container, minus a certain tolerance value.");

    /**
     * Amount of memory to be allocated by the task manager's memory manager. If not
     * set, a relative fraction will be allocated, as defined by {@link #MANAGED_MEMORY_FRACTION}.
     */
    public static final ConfigOption<String> MANAGED_MEMORY_SIZE =
        key("taskmanager.memory.size")
            .defaultValue("0")
            .withDescription("Amount of memory to be allocated by the task manager's memory manager." +
                " If not set, a relative fraction will be allocated.");

    /**
     * Fraction of free memory allocated by the memory manager if {@link #MANAGED_MEMORY_SIZE} is
     * not set.
     */
    public static final ConfigOption<Float> MANAGED_MEMORY_FRACTION =
        key("taskmanager.memory.fraction")
            .defaultValue(0.7f)
            .withDescription("The relative amount of memory (after subtracting the amount of memory used by network" +
                " buffers) that the task manager reserves for sorting, hash tables, and caching of intermediate results." +
                " For example, a value of `0.8` means that a task manager reserves 80% of its memory" +
                " for internal data buffers, leaving 20% of free memory for the task manager's heap for objects" +
                " created by user-defined functions. This parameter is only evaluated, if " + MANAGED_MEMORY_SIZE.key() +
                " is not set.");

    /**
     * Memory allocation method (JVM heap or off-heap), used for managed memory of the TaskManager
     * as well as the network buffers.
     **/
    public static final ConfigOption<Boolean> MEMORY_OFF_HEAP =
        key("taskmanager.memory.off-heap")
            .defaultValue(false)
            .withDescription("Memory allocation method (JVM heap or off-heap), used for managed memory of the" +
                " TaskManager as well as the network buffers.");

    /**
     * Whether TaskManager managed memory should be pre-allocated when the TaskManager is starting.
     */
    public static final ConfigOption<Boolean> MANAGED_MEMORY_PRE_ALLOCATE =
        key("taskmanager.memory.preallocate")
            .defaultValue(false)
            .withDescription("Whether TaskManager managed memory should be pre-allocated when the TaskManager is starting.");
    //......
}
```
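  • taskmanager.memory.size sets how much memory the TaskManager's memory manager manages. This memory is used mainly for sorting, hashing and caching. The default is "0", which means "not set". taskmanager.heap.size sets the TaskManager's total memory, covering both heap and off-heap. taskmanager.memory.off-heap (default false) chooses whether managed memory and network buffers live on or off the heap. taskmanager.memory.preallocate (default false) controls whether managed memory is allocated when the TaskManager starts.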
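The "0 means not set" convention is easy to miss. Below is a minimal sketch of how a taskmanager.memory.size string might end up as a number of megabytes.

The class ManagedMemorySizeSketch and its methods are hypothetical. parseMebiBytes is a simplified stand-in for MemorySize.parse(text, MEGA_BYTES).getMebiBytes(). It assumes only a bare number (read as megabytes) and the k/kb, m/mb and g/gb suffixes need handling. Flink's real parser accepts more forms.

```java
import java.util.Locale;

final class ManagedMemorySizeSketch {

    /** Default value of taskmanager.memory.size; this exact string means "not set". */
    static final String DEFAULT_VALUE = "0";

    /**
     * Hypothetical, simplified stand-in for MemorySize.parse(text, MEGA_BYTES).getMebiBytes():
     * a bare number is read as megabytes; only the k/kb, m/mb and g/gb suffixes are supported.
     */
    static long parseMebiBytes(String text) {
        String s = text.trim().toLowerCase(Locale.ROOT);
        int digits = 0;
        while (digits < s.length() && Character.isDigit(s.charAt(digits))) {
            digits++;
        }
        if (digits == 0) {
            throw new IllegalArgumentException("Not a memory size: " + text);
        }
        long value = Long.parseLong(s.substring(0, digits));
        String unit = s.substring(digits).trim();
        long bytes;
        switch (unit) {
            case "":
            case "m":
            case "mb":
                bytes = value << 20; // default unit: megabytes
                break;
            case "k":
            case "kb":
                bytes = value << 10;
                break;
            case "g":
            case "gb":
                bytes = value << 30;
                break;
            default:
                throw new IllegalArgumentException("Unknown memory unit: " + unit);
        }
        return bytes >> 20; // bytes to mebibytes, rounding down
    }

    /** Mirrors the "only parse if the value differs from the default" pattern in the Flink code. */
    static long configuredManagedMemoryMB(String configured) {
        if (configured.equals(DEFAULT_VALUE)) {
            return Long.parseLong(DEFAULT_VALUE); // 0: fall back to taskmanager.memory.fraction
        }
        return parseMebiBytes(configured);
    }

    public static void main(String[] args) {
        System.out.println(configuredManagedMemoryMB("0"));     // 0
        System.out.println(configuredManagedMemoryMB("512"));   // 512
        System.out.println(configuredManagedMemoryMB("1024m")); // 1024
        System.out.println(configuredManagedMemoryMB("2g"));    // 2048
    }
}
```

Because of the default unit, a bare value such as 512 is read as 512 MB rather than 512 bytes.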

TaskManagerServices.calculateHeapSizeMB

flink-runtime_2.11-1.7.2-sources.jar!/org/apache/flink/runtime/taskexecutor/TaskManagerServices.java

```java
public class TaskManagerServices {
    //......
    /**
     * Calculates the amount of heap memory to use (to set via <tt>-Xmx</tt> and <tt>-Xms</tt>)
     * based on the total memory to use and the given configuration parameters.
     *
     * @param totalJavaMemorySizeMB
     *        overall available memory to use (heap and off-heap)
     * @param config
     *        configuration object
     *
     * @return heap memory to use (in megabytes)
     */
    public static long calculateHeapSizeMB(long totalJavaMemorySizeMB, Configuration config) {
        Preconditions.checkArgument(totalJavaMemorySizeMB > 0);

        // subtract the Java memory used for network buffers (always off-heap)
        final long networkBufMB =
            calculateNetworkBufferMemory(
                totalJavaMemorySizeMB << 20, // megabytes to bytes
                config) >> 20; // bytes to megabytes
        final long remainingJavaMemorySizeMB = totalJavaMemorySizeMB - networkBufMB;

        // split the available Java memory between heap and off-heap
        final boolean useOffHeap = config.getBoolean(TaskManagerOptions.MEMORY_OFF_HEAP);

        final long heapSizeMB;
        if (useOffHeap) {
            long offHeapSize;
            String managedMemorySizeDefaultVal = TaskManagerOptions.MANAGED_MEMORY_SIZE.defaultValue();
            if (!config.getString(TaskManagerOptions.MANAGED_MEMORY_SIZE).equals(managedMemorySizeDefaultVal)) {
                try {
                    offHeapSize = MemorySize.parse(config.getString(TaskManagerOptions.MANAGED_MEMORY_SIZE), MEGA_BYTES).getMebiBytes();
                } catch (IllegalArgumentException e) {
                    throw new IllegalConfigurationException(
                        "Could not read " + TaskManagerOptions.MANAGED_MEMORY_SIZE.key(), e);
                }
            } else {
                offHeapSize = Long.valueOf(managedMemorySizeDefaultVal);
            }

            if (offHeapSize <= 0) {
                // calculate off-heap section via fraction
                double fraction = config.getFloat(TaskManagerOptions.MANAGED_MEMORY_FRACTION);
                offHeapSize = (long) (fraction * remainingJavaMemorySizeMB);
            }

            TaskManagerServicesConfiguration
                .checkConfigParameter(offHeapSize < remainingJavaMemorySizeMB, offHeapSize,
                    TaskManagerOptions.MANAGED_MEMORY_SIZE.key(),
                    "Managed memory size too large for " + networkBufMB +
                        " MB network buffer memory and a total of " + totalJavaMemorySizeMB +
                        " MB JVM memory");

            heapSizeMB = remainingJavaMemorySizeMB - offHeapSize;
        } else {
            heapSizeMB = remainingJavaMemorySizeMB;
        }

        return heapSizeMB;
    }
    //......
}
```
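  • If taskmanager.memory.size is less than or equal to 0 (the default is 0), managed memory is sized from taskmanager.memory.fraction instead. That fraction defaults to 0.7.
  • If taskmanager.memory.off-heap is enabled, the memory manager's off-heap size (offHeapSize) is taskmanager.memory.fraction * (taskmanager.heap.size - networkBufMB). When taskmanager.memory.size is set explicitly, that value is used instead.
  • If taskmanager.memory.off-heap is enabled, the TaskManager's -Xmx is taskmanager.heap.size - networkBufMB - offHeapSize. If it is disabled, -Xmx is simply taskmanager.heap.size - networkBufMB, because network buffers are always off-heap.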
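The calculation above can be replayed with plain numbers. Below is a simplified restatement of calculateHeapSizeMB under a few assumptions:

- networkBufMB is passed in as a parameter, because calculateNetworkBufferMemory is not shown in this article.
- An IllegalArgumentException replaces the configuration exception raised by Flink's checkConfigParameter.
- HeapSizeSketch is a hypothetical name.

```java
final class HeapSizeSketch {

    /**
     * Simplified restatement of TaskManagerServices.calculateHeapSizeMB from Flink 1.7.2.
     * Assumption: networkBufMB is passed in, because calculateNetworkBufferMemory is not shown.
     *
     * @param totalJavaMemoryMB taskmanager.heap.size in MB (heap plus off-heap)
     * @param networkBufMB      memory taken by network buffers in MB (always off-heap)
     * @param offHeap           value of taskmanager.memory.off-heap
     * @param managedMemoryMB   parsed taskmanager.memory.size, 0 when not set
     * @param fraction          value of taskmanager.memory.fraction
     * @return the heap size in MB that would be used for -Xmx/-Xms
     */
    static long heapSizeMB(long totalJavaMemoryMB, long networkBufMB, boolean offHeap,
                           long managedMemoryMB, float fraction) {
        if (totalJavaMemoryMB <= 0) {
            throw new IllegalArgumentException("total Java memory must be positive");
        }
        long remainingMB = totalJavaMemoryMB - networkBufMB;
        if (!offHeap) {
            return remainingMB; // managed memory is carved out of the heap later, at runtime
        }
        long offHeapMB = managedMemoryMB;
        if (offHeapMB <= 0) {
            offHeapMB = (long) ((double) fraction * remainingMB); // same double arithmetic as Flink
        }
        if (offHeapMB >= remainingMB) {
            throw new IllegalArgumentException("Managed memory size too large for " + networkBufMB
                + " MB network buffer memory and a total of " + totalJavaMemoryMB + " MB JVM memory");
        }
        return remainingMB - offHeapMB;
    }

    public static void main(String[] args) {
        // Assumed numbers: 1024 MB taskmanager.heap.size, 100 MB of network buffers, fraction 0.7
        System.out.println(heapSizeMB(1024, 100, false, 0, 0.7f));  // 924
        System.out.println(heapSizeMB(1024, 100, true, 0, 0.7f));   // 278 (646 MB off-heap managed)
        System.out.println(heapSizeMB(1024, 100, true, 256, 0.7f)); // 668 (explicit 256 MB)
    }
}
```

Take a 1024 MB taskmanager.heap.size and an assumed 100 MB of network buffers. With on-heap managed memory, the heap stays at 924 MB. With off-heap managed memory, 70% of those 924 MB move off-heap, and the heap shrinks to 278 MB.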

TaskManagerServices.createMemoryManager

flink-runtime_2.11-1.7.2-sources.jar!/org/apache/flink/runtime/taskexecutor/TaskManagerServices.java

```java
public class TaskManagerServices {
    //......
    /**
     * Creates a {@link MemoryManager} from the given {@link TaskManagerServicesConfiguration}.
     *
     * @param taskManagerServicesConfiguration to create the memory manager from
     * @param freeHeapMemoryWithDefrag an estimate of the size of the free heap memory
     * @param maxJvmHeapMemory the maximum JVM heap size
     * @return Memory manager
     * @throws Exception
     */
    private static MemoryManager createMemoryManager(
            TaskManagerServicesConfiguration taskManagerServicesConfiguration,
            long freeHeapMemoryWithDefrag,
            long maxJvmHeapMemory) throws Exception {
        // computing the amount of memory to use depends on how much memory is available
        // it strictly needs to happen AFTER the network stack has been initialized

        // check if a value has been configured
        long configuredMemory = taskManagerServicesConfiguration.getConfiguredMemory();
        MemoryType memType = taskManagerServicesConfiguration.getMemoryType();
        final long memorySize;
        boolean preAllocateMemory = taskManagerServicesConfiguration.isPreAllocateMemory();

        if (configuredMemory > 0) {
            if (preAllocateMemory) {
                LOG.info("Using {} MB for managed memory.", configuredMemory);
            } else {
                LOG.info("Limiting managed memory to {} MB, memory will be allocated lazily.", configuredMemory);
            }
            memorySize = configuredMemory << 20; // megabytes to bytes
        } else {
            // similar to #calculateNetworkBufferMemory(TaskManagerServicesConfiguration tmConfig)
            float memoryFraction = taskManagerServicesConfiguration.getMemoryFraction();

            if (memType == MemoryType.HEAP) {
                // network buffers allocated off-heap -> use memoryFraction of the available heap:
                long relativeMemSize = (long) (freeHeapMemoryWithDefrag * memoryFraction);
                if (preAllocateMemory) {
                    LOG.info("Using {} of the currently free heap space for managed heap memory ({} MB).",
                        memoryFraction, relativeMemSize >> 20);
                } else {
                    LOG.info("Limiting managed memory to {} of the currently free heap space ({} MB), " +
                        "memory will be allocated lazily.", memoryFraction, relativeMemSize >> 20);
                }
                memorySize = relativeMemSize;
            } else if (memType == MemoryType.OFF_HEAP) {
                // The maximum heap memory has been adjusted according to the fraction (see
                // calculateHeapSizeMB(long totalJavaMemorySizeMB, Configuration config)), i.e.
                // maxJvmHeap = jvmTotalNoNet - jvmTotalNoNet * memoryFraction = jvmTotalNoNet * (1 - memoryFraction)
                // directMemorySize = jvmTotalNoNet * memoryFraction
                long directMemorySize = (long) (maxJvmHeapMemory / (1.0 - memoryFraction) * memoryFraction);
                if (preAllocateMemory) {
                    LOG.info("Using {} of the maximum memory size for managed off-heap memory ({} MB).",
                        memoryFraction, directMemorySize >> 20);
                } else {
                    LOG.info("Limiting managed memory to {} of the maximum memory size ({} MB)," +
                        " memory will be allocated lazily.", memoryFraction, directMemorySize >> 20);
                }
                memorySize = directMemorySize;
            } else {
                throw new RuntimeException("No supported memory type detected.");
            }
        }

        // now start the memory manager
        final MemoryManager memoryManager;
        try {
            memoryManager = new MemoryManager(
                memorySize,
                taskManagerServicesConfiguration.getNumberOfSlots(),
                taskManagerServicesConfiguration.getNetworkConfig().networkBufferSize(),
                memType,
                preAllocateMemory);
        } catch (OutOfMemoryError e) {
            if (memType == MemoryType.HEAP) {
                throw new Exception("OutOfMemory error (" + e.getMessage() +
                    ") while allocating the TaskManager heap memory (" + memorySize + " bytes).", e);
            } else if (memType == MemoryType.OFF_HEAP) {
                throw new Exception("OutOfMemory error (" + e.getMessage() +
                    ") while allocating the TaskManager off-heap memory (" + memorySize +
                    " bytes).Try increasing the maximum direct memory (-XX:MaxDirectMemorySize)", e);
            } else {
                throw e;
            }
        }
        return memoryManager;
    }
    //......
}
```
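  • TaskManagerServices provides the private static method createMemoryManager, which builds the MemoryManager from the configuration. If taskmanager.memory.size was set (configuredMemory > 0), that value is used directly. Otherwise memorySize is recomputed according to the MemoryType. Either way, memorySize is then passed to the MemoryManager constructor.
  • When memType is MemoryType.HEAP, memorySize is relativeMemSize = (long) (freeHeapMemoryWithDefrag * memoryFraction), that is, a fraction of the currently free heap.
  • When memType is MemoryType.OFF_HEAP, memorySize is directMemorySize = jvmTotalNoNet * memoryFraction. calculateHeapSizeMB has already set maxJvmHeap = jvmTotalNoNet - jvmTotalNoNet * memoryFraction = jvmTotalNoNet * (1 - memoryFraction). So the code recovers the size from the maximum heap as directMemorySize = (long) (maxJvmHeapMemory / (1.0 - memoryFraction) * memoryFraction).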
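Below is a sketch of just the sizing logic in createMemoryManager. It does not construct a real MemoryManager. ManagedMemorySizingSketch and its nested MemoryType enum are hypothetical stand-ins for the Flink classes. The arithmetic mirrors the source above, including the float multiplication in the HEAP branch.

```java
final class ManagedMemorySizingSketch {

    /** Hypothetical stand-in for Flink's MemoryType. */
    enum MemoryType { HEAP, OFF_HEAP }

    /**
     * Size in bytes that createMemoryManager would hand to the MemoryManager constructor.
     *
     * @param configuredMemoryMB       parsed taskmanager.memory.size, 0 when not set
     * @param memoryFraction           taskmanager.memory.fraction
     * @param freeHeapMemoryWithDefrag estimate of free heap memory in bytes (HEAP case)
     * @param maxJvmHeapMemory         maximum JVM heap in bytes (OFF_HEAP case)
     */
    static long managedMemoryBytes(long configuredMemoryMB, MemoryType memType, float memoryFraction,
                                   long freeHeapMemoryWithDefrag, long maxJvmHeapMemory) {
        if (configuredMemoryMB > 0) {
            return configuredMemoryMB << 20; // explicit size wins, megabytes to bytes
        }
        switch (memType) {
            case HEAP:
                // a fraction of the heap that is currently free
                return (long) (freeHeapMemoryWithDefrag * memoryFraction);
            case OFF_HEAP:
                // -Xmx was sized as jvmTotalNoNet * (1 - fraction); recover jvmTotalNoNet * fraction
                return (long) (maxJvmHeapMemory / (1.0 - memoryFraction) * memoryFraction);
            default:
                throw new IllegalStateException("No supported memory type detected.");
        }
    }

    public static void main(String[] args) {
        long mb = 1L << 20;
        System.out.println(managedMemoryBytes(256, MemoryType.HEAP, 0.7f, 0, 0) / mb);          // 256
        System.out.println(managedMemoryBytes(0, MemoryType.HEAP, 0.5f, 1000 * mb, 0) / mb);    // 500
        System.out.println(managedMemoryBytes(0, MemoryType.OFF_HEAP, 0.5f, 0, 300 * mb) / mb); // 300
    }
}
```

The OFF_HEAP branch relies on -Xmx having been derived from calculateHeapSizeMB, as the source comment says. Because that method truncates to whole megabytes, the recovered value only approximately equals jvmTotalNoNet * memoryFraction.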

TaskManagerServicesConfiguration

flink-runtime_2.11-1.7.2-sources.jar!/org/apache/flink/runtime/taskexecutor/TaskManagerServicesConfiguration.java

```java
public class TaskManagerServicesConfiguration {
    //......
    /**
     * Utility method to extract TaskManager config parameters from the configuration and to
     * sanity check them.
     *
     * @param configuration The configuration.
     * @param remoteAddress identifying the IP address under which the TaskManager will be accessible
     * @param localCommunication True, to skip initializing the network stack.
     *                           Use only in cases where only one task manager runs.
     * @return TaskExecutorConfiguration that wrappers InstanceConnectionInfo, NetworkEnvironmentConfiguration, etc.
     */
    public static TaskManagerServicesConfiguration fromConfiguration(
            Configuration configuration,
            InetAddress remoteAddress,
            boolean localCommunication) throws Exception {

        // we need this because many configs have been written with a "-1" entry
        int slots = configuration.getInteger(TaskManagerOptions.NUM_TASK_SLOTS, 1);
        if (slots == -1) {
            slots = 1;
        }

        final String[] tmpDirs = ConfigurationUtils.parseTempDirectories(configuration);
        String[] localStateRootDir = ConfigurationUtils.parseLocalStateDirectories(configuration);
        if (localStateRootDir.length == 0) {
            // default to temp dirs.
            localStateRootDir = tmpDirs;
        }

        boolean localRecoveryMode = configuration.getBoolean(
            CheckpointingOptions.LOCAL_RECOVERY.key(),
            CheckpointingOptions.LOCAL_RECOVERY.defaultValue());

        final NetworkEnvironmentConfiguration networkConfig = parseNetworkEnvironmentConfiguration(
            configuration,
            localCommunication,
            remoteAddress,
            slots);

        final QueryableStateConfiguration queryableStateConfig =
            parseQueryableStateConfiguration(configuration);

        // extract memory settings
        long configuredMemory;
        String managedMemorySizeDefaultVal = TaskManagerOptions.MANAGED_MEMORY_SIZE.defaultValue();
        if (!configuration.getString(TaskManagerOptions.MANAGED_MEMORY_SIZE).equals(managedMemorySizeDefaultVal)) {
            try {
                configuredMemory = MemorySize.parse(configuration.getString(TaskManagerOptions.MANAGED_MEMORY_SIZE), MEGA_BYTES).getMebiBytes();
            } catch (IllegalArgumentException e) {
                throw new IllegalConfigurationException(
                    "Could not read " + TaskManagerOptions.MANAGED_MEMORY_SIZE.key(), e);
            }
        } else {
            configuredMemory = Long.valueOf(managedMemorySizeDefaultVal);
        }

        checkConfigParameter(
            configuration.getString(TaskManagerOptions.MANAGED_MEMORY_SIZE).equals(TaskManagerOptions.MANAGED_MEMORY_SIZE.defaultValue()) ||
                configuredMemory > 0, configuredMemory,
            TaskManagerOptions.MANAGED_MEMORY_SIZE.key(),
            "MemoryManager needs at least one MB of memory. " +
                "If you leave this config parameter empty, the system automatically " +
                "pick a fraction of the available memory.");

        // check whether we use heap or off-heap memory
        final MemoryType memType;
        if (configuration.getBoolean(TaskManagerOptions.MEMORY_OFF_HEAP)) {
            memType = MemoryType.OFF_HEAP;
        } else {
            memType = MemoryType.HEAP;
        }

        boolean preAllocateMemory = configuration.getBoolean(TaskManagerOptions.MANAGED_MEMORY_PRE_ALLOCATE);

        float memoryFraction = configuration.getFloat(TaskManagerOptions.MANAGED_MEMORY_FRACTION);
        checkConfigParameter(memoryFraction > 0.0f && memoryFraction < 1.0f, memoryFraction,
            TaskManagerOptions.MANAGED_MEMORY_FRACTION.key(),
            "MemoryManager fraction of the free memory must be between 0.0 and 1.0");

        long timerServiceShutdownTimeout = AkkaUtils.getTimeout(configuration).toMillis();

        return new TaskManagerServicesConfiguration(
            remoteAddress,
            tmpDirs,
            localStateRootDir,
            localRecoveryMode,
            networkConfig,
            queryableStateConfig,
            slots,
            configuredMemory,
            memType,
            preAllocateMemory,
            memoryFraction,
            timerServiceShutdownTimeout,
            ConfigurationUtils.getSystemResourceMetricsProbingInterval(configuration));
    }
    //......
}
```
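  • TaskManagerServicesConfiguration provides the static method fromConfiguration, which builds a TaskManagerServicesConfiguration from a Configuration. memType follows taskmanager.memory.off-heap: true gives MemoryType.OFF_HEAP, anything else gives MemoryType.HEAP. The method also validates the memory settings:
    • An explicitly set taskmanager.memory.size must be at least 1 MB.
    • taskmanager.memory.fraction must lie strictly between 0.0 and 1.0.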
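The defaulting and validation rules in fromConfiguration can be isolated into a few small functions. This is a hypothetical sketch: ServicesConfigSketch is not a Flink class, and it takes already-extracted values rather than a Flink Configuration. It throws IllegalArgumentException where Flink's checkConfigParameter raises a configuration exception.

```java
final class ServicesConfigSketch {

    /** Hypothetical stand-in for Flink's MemoryType. */
    enum MemoryType { HEAP, OFF_HEAP }

    /** Many configs were written with -1 for the slot count; fromConfiguration maps it to 1. */
    static int normalizeSlots(int configuredSlots) {
        return configuredSlots == -1 ? 1 : configuredSlots;
    }

    /** taskmanager.memory.off-heap = true selects OFF_HEAP, anything else HEAP. */
    static MemoryType memoryType(boolean offHeap) {
        return offHeap ? MemoryType.OFF_HEAP : MemoryType.HEAP;
    }

    /**
     * Same conditions as the two checkConfigParameter calls in fromConfiguration.
     *
     * @param managedSizeIsDefault true when taskmanager.memory.size was left at its default "0"
     * @param configuredMemoryMB   parsed taskmanager.memory.size in MB
     * @param memoryFraction       taskmanager.memory.fraction
     */
    static void validate(boolean managedSizeIsDefault, long configuredMemoryMB, float memoryFraction) {
        if (!(managedSizeIsDefault || configuredMemoryMB > 0)) {
            throw new IllegalArgumentException("MemoryManager needs at least one MB of memory.");
        }
        if (!(memoryFraction > 0.0f && memoryFraction < 1.0f)) {
            throw new IllegalArgumentException(
                "MemoryManager fraction of the free memory must be between 0.0 and 1.0");
        }
    }

    public static void main(String[] args) {
        System.out.println(normalizeSlots(-1)); // 1
        System.out.println(memoryType(true));   // OFF_HEAP
        validate(true, 0, 0.7f);                // the defaults pass
        try {
            validate(false, 0, 0.7f);           // e.g. taskmanager.memory.size: 0m
        } catch (IllegalArgumentException e) {
            System.out.println(e.getMessage());
        }
    }
}
```

A consequence of the "differs from the default" check is that an explicit 0m is rejected, while the literal default "0" silently selects the fraction-based path.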

Summary

  • TaskManager managed memory comes in two types, heap and off-heap.
    • taskmanager.memory.size sets how much memory the TaskManager's memory manager manages (used mainly for sorting, hashing and caching). It defaults to 0.
    • taskmanager.heap.size sets the TaskManager's total heap plus off-heap memory.
    • If taskmanager.memory.size is less than or equal to 0, the size is derived from taskmanager.memory.fraction, which defaults to 0.7.
    • With taskmanager.memory.off-heap enabled, the memory manager's offHeapSize is taskmanager.memory.fraction * (taskmanager.heap.size - networkBufMB). The TaskManager's -Xmx is then taskmanager.heap.size - networkBufMB - offHeapSize.
  • TaskManagerServices provides the private static method createMemoryManager, which recomputes memorySize according to the MemoryType and passes it to the MemoryManager constructor.
    • For MemoryType.HEAP, memorySize is relativeMemSize = (long) (freeHeapMemoryWithDefrag * memoryFraction).
    • For MemoryType.OFF_HEAP, memorySize is directMemorySize = jvmTotalNoNet * memoryFraction. Since maxJvmHeap = jvmTotalNoNet * (1 - memoryFraction), it is computed as directMemorySize = (long) (maxJvmHeapMemory / (1.0 - memoryFraction) * memoryFraction).
  • TaskManagerServicesConfiguration provides the static method fromConfiguration, which builds a TaskManagerServicesConfiguration from a Configuration. memType is MemoryType.OFF_HEAP if taskmanager.memory.off-heap is true, otherwise MemoryType.HEAP.

Source: https://www.wpsshop.cn/w/小丑西瓜9/article/detail/696385 (a user contribution to the wpsshop blog; copyright belongs to the original author).