Preface
This article takes a look at the managed memory of the Flink TaskManager.
TaskManagerOptions
flink-core-1.7.2-sources.jar!/org/apache/flink/configuration/TaskManagerOptions.java
```java
@PublicEvolving
public class TaskManagerOptions {
    //......

    /**
     * JVM heap size for the TaskManagers with memory size.
     */
    @Documentation.CommonOption(position = Documentation.CommonOption.POSITION_MEMORY)
    public static final ConfigOption<String> TASK_MANAGER_HEAP_MEMORY =
            key("taskmanager.heap.size")
                    .defaultValue("1024m")
                    .withDescription("JVM heap size for the TaskManagers, which are the parallel workers of" +
                            " the system. On YARN setups, this value is automatically configured to the size of the TaskManager's" +
                            " YARN container, minus a certain tolerance value.");

    /**
     * Amount of memory to be allocated by the task manager's memory manager. If not
     * set, a relative fraction will be allocated, as defined by {@link #MANAGED_MEMORY_FRACTION}.
     */
    public static final ConfigOption<String> MANAGED_MEMORY_SIZE =
            key("taskmanager.memory.size")
                    .defaultValue("0")
                    .withDescription("Amount of memory to be allocated by the task manager's memory manager." +
                            " If not set, a relative fraction will be allocated.");

    /**
     * Fraction of free memory allocated by the memory manager if {@link #MANAGED_MEMORY_SIZE} is
     * not set.
     */
    public static final ConfigOption<Float> MANAGED_MEMORY_FRACTION =
            key("taskmanager.memory.fraction")
                    .defaultValue(0.7f)
                    .withDescription("The relative amount of memory (after subtracting the amount of memory used by network" +
                            " buffers) that the task manager reserves for sorting, hash tables, and caching of intermediate results." +
                            " For example, a value of `0.8` means that a task manager reserves 80% of its memory" +
                            " for internal data buffers, leaving 20% of free memory for the task manager's heap for objects" +
                            " created by user-defined functions. This parameter is only evaluated, if " + MANAGED_MEMORY_SIZE.key() +
                            " is not set.");

    /**
     * Memory allocation method (JVM heap or off-heap), used for managed memory of the TaskManager
     * as well as the network buffers.
     **/
    public static final ConfigOption<Boolean> MEMORY_OFF_HEAP =
            key("taskmanager.memory.off-heap")
                    .defaultValue(false)
                    .withDescription("Memory allocation method (JVM heap or off-heap), used for managed memory of the" +
                            " TaskManager as well as the network buffers.");

    /**
     * Whether TaskManager managed memory should be pre-allocated when the TaskManager is starting.
     */
    public static final ConfigOption<Boolean> MANAGED_MEMORY_PRE_ALLOCATE =
            key("taskmanager.memory.preallocate")
                    .defaultValue(false)
                    .withDescription("Whether TaskManager managed memory should be pre-allocated when the TaskManager is starting.");

    //......
}
```
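- taskmanager.memory.size sets the amount of memory managed by the TaskManager's memory manager (used mainly for sorting, hashing and caching) and defaults to 0; taskmanager.heap.size sets the TaskManager's total memory, covering both heap and off-heap memory.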
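The rule that the default value "0" means "not set" can be illustrated with a small, self-contained sketch. It assumes a simplified unit syntax (a bare number is read as megabytes; only the b, k/kb, m/mb and g/gb suffixes are accepted), whereas Flink itself delegates to MemorySize.parse(..., MEGA_BYTES).getMebiBytes(), as the calculateHeapSizeMB and fromConfiguration listings in this article show. ManagedMemorySizeSketch and its methods are hypothetical names for illustration, not Flink API.

```java
import java.util.Locale;

// Hypothetical, simplified sketch for illustration only; it is NOT Flink's
// MemorySize class and accepts fewer unit spellings than Flink does.
final class ManagedMemorySizeSketch {

    // Same default as taskmanager.memory.size in Flink 1.7.2.
    static final String MANAGED_MEMORY_SIZE_DEFAULT = "0";

    /** Parses e.g. "512", "512m", "1g" into MiB; a bare number is taken as megabytes. */
    static long parseMebiBytes(String text) {
        String s = text.trim().toLowerCase(Locale.ROOT);
        int i = 0;
        while (i < s.length() && Character.isDigit(s.charAt(i))) {
            i++;
        }
        if (i == 0) {
            throw new IllegalArgumentException("No number in: " + text);
        }
        long value = Long.parseLong(s.substring(0, i));
        String unit = s.substring(i).trim();
        long bytes;
        switch (unit) {
            case "":   // no unit: default to megabytes, like parse(..., MEGA_BYTES)
            case "m":
            case "mb":
                bytes = value << 20;
                break;
            case "b":
                bytes = value;
                break;
            case "k":
            case "kb":
                bytes = value << 10;
                break;
            case "g":
            case "gb":
                bytes = value << 30;
                break;
            default:
                throw new IllegalArgumentException("Unknown unit: " + unit);
        }
        return bytes >> 20; // bytes to MiB, truncating like getMebiBytes()
    }

    /** Returns the configured managed memory in MiB, or 0 if the option is left at its default. */
    static long configuredManagedMemoryMB(String configuredValue) {
        if (configuredValue.equals(MANAGED_MEMORY_SIZE_DEFAULT)) {
            return 0L; // not set: taskmanager.memory.fraction will be used instead
        }
        return parseMebiBytes(configuredValue);
    }

    public static void main(String[] args) {
        System.out.println(configuredManagedMemoryMB("0"));   // 0 -> fall back to the fraction
        System.out.println(configuredManagedMemoryMB("512")); // 512
        System.out.println(configuredManagedMemoryMB("1g"));  // 1024
    }
}
```

Only the literal string "0" is treated as unset here, mirroring the equals(defaultValue) comparison in Flink's code; a value such as "0m" goes through the parsing path instead.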
TaskManagerServices.calculateHeapSizeMB
flink-runtime_2.11-1.7.2-sources.jar!/org/apache/flink/runtime/taskexecutor/TaskManagerServices.java
```java
public class TaskManagerServices {
    //......

    /**
     * Calculates the amount of heap memory to use (to set via <tt>-Xmx</tt> and <tt>-Xms</tt>)
     * based on the total memory to use and the given configuration parameters.
     *
     * @param totalJavaMemorySizeMB
     *         overall available memory to use (heap and off-heap)
     * @param config
     *         configuration object
     *
     * @return heap memory to use (in megabytes)
     */
    public static long calculateHeapSizeMB(long totalJavaMemorySizeMB, Configuration config) {
        Preconditions.checkArgument(totalJavaMemorySizeMB > 0);

        // subtract the Java memory used for network buffers (always off-heap)
        final long networkBufMB =
                calculateNetworkBufferMemory(
                        totalJavaMemorySizeMB << 20, // megabytes to bytes
                        config) >> 20; // bytes to megabytes
        final long remainingJavaMemorySizeMB = totalJavaMemorySizeMB - networkBufMB;

        // split the available Java memory between heap and off-heap

        final boolean useOffHeap = config.getBoolean(TaskManagerOptions.MEMORY_OFF_HEAP);

        final long heapSizeMB;
        if (useOffHeap) {

            long offHeapSize;
            String managedMemorySizeDefaultVal = TaskManagerOptions.MANAGED_MEMORY_SIZE.defaultValue();
            if (!config.getString(TaskManagerOptions.MANAGED_MEMORY_SIZE).equals(managedMemorySizeDefaultVal)) {
                try {
                    offHeapSize = MemorySize.parse(config.getString(TaskManagerOptions.MANAGED_MEMORY_SIZE), MEGA_BYTES).getMebiBytes();
                } catch (IllegalArgumentException e) {
                    throw new IllegalConfigurationException(
                            "Could not read " + TaskManagerOptions.MANAGED_MEMORY_SIZE.key(), e);
                }
            } else {
                offHeapSize = Long.valueOf(managedMemorySizeDefaultVal);
            }

            if (offHeapSize <= 0) {
                // calculate off-heap section via fraction
                double fraction = config.getFloat(TaskManagerOptions.MANAGED_MEMORY_FRACTION);
                offHeapSize = (long) (fraction * remainingJavaMemorySizeMB);
            }

            TaskManagerServicesConfiguration
                    .checkConfigParameter(offHeapSize < remainingJavaMemorySizeMB, offHeapSize,
                            TaskManagerOptions.MANAGED_MEMORY_SIZE.key(),
                            "Managed memory size too large for " + networkBufMB +
                                    " MB network buffer memory and a total of " + totalJavaMemorySizeMB +
                                    " MB JVM memory");

            heapSizeMB = remainingJavaMemorySizeMB - offHeapSize;
        } else {
            heapSizeMB = remainingJavaMemorySizeMB;
        }

        return heapSizeMB;
    }

    //......
}
```
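- If taskmanager.memory.size is less than or equal to 0 (the default of 0 means it is not set), the managed memory is sized via taskmanager.memory.fraction, which defaults to 0.7.
- If taskmanager.memory.off-heap is enabled and no explicit size is set, taskmanager.memory.fraction * (taskmanager.heap.size - networkBufMB) becomes the offHeapSize managed by the TaskManager's memory manager; an explicitly set taskmanager.memory.size is used as offHeapSize directly.
- If taskmanager.memory.off-heap is enabled, the TaskManager's Xmx is taskmanager.heap.size - networkBufMB - offHeapSize; otherwise it is taskmanager.heap.size - networkBufMB, because the managed memory then lives on the heap.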
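The arithmetic of calculateHeapSizeMB can be reproduced in plain Java to see what these rules mean in numbers. The sketch below is hypothetical (HeapSizeSketch is not a Flink class). It takes the network buffer size as an input because calculateNetworkBufferMemory is not covered in this article, and the 102 MB used in the example is an assumed, illustrative value rather than one computed by Flink. An IllegalArgumentException stands in for Flink's IllegalConfigurationException.

```java
// Hypothetical sketch mirroring the arithmetic of TaskManagerServices.calculateHeapSizeMB
// (Flink 1.7.2). Network buffer memory is an input here, not computed.
final class HeapSizeSketch {

    /**
     * @param totalJavaMemoryMB taskmanager.heap.size in MB (heap plus off-heap)
     * @param networkBufMB      memory reserved for network buffers in MB (assumed input)
     * @param offHeap           taskmanager.memory.off-heap
     * @param managedSizeMB     taskmanager.memory.size in MB, 0 if not set
     * @param fraction          taskmanager.memory.fraction
     * @return the heap size (-Xmx/-Xms) in MB
     */
    static long heapSizeMB(long totalJavaMemoryMB, long networkBufMB, boolean offHeap,
                           long managedSizeMB, float fraction) {
        if (totalJavaMemoryMB <= 0) {
            throw new IllegalArgumentException("total memory must be positive");
        }
        long remainingMB = totalJavaMemoryMB - networkBufMB;
        if (!offHeap) {
            // Managed memory stays on the heap: only network buffers are subtracted.
            return remainingMB;
        }
        long offHeapSizeMB = managedSizeMB;
        if (offHeapSizeMB <= 0) {
            // No explicit size: take the fraction of what is left after network buffers.
            double f = fraction;
            offHeapSizeMB = (long) (f * remainingMB);
        }
        if (offHeapSizeMB >= remainingMB) {
            // Flink reports this through checkConfigParameter instead.
            throw new IllegalArgumentException("Managed memory size too large");
        }
        return remainingMB - offHeapSizeMB;
    }

    public static void main(String[] args) {
        // 1024 MB total, 102 MB network buffers (illustrative), default fraction 0.7
        System.out.println(heapSizeMB(1024, 102, false, 0, 0.7f));  // 922
        System.out.println(heapSizeMB(1024, 102, true, 0, 0.7f));   // 277
        System.out.println(heapSizeMB(1024, 102, true, 300, 0.7f)); // 622
    }
}
```

With 1024 MB in total and an assumed 102 MB of network buffers, switching to off-heap shrinks the heap from 922 MB to 277 MB, because 645 MB (0.7 of 922, truncated) is handed to the memory manager as off-heap memory instead.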
TaskManagerServices.createMemoryManager
flink-runtime_2.11-1.7.2-sources.jar!/org/apache/flink/runtime/taskexecutor/TaskManagerServices.java
```java
public class TaskManagerServices {
    //......

    /**
     * Creates a {@link MemoryManager} from the given {@link TaskManagerServicesConfiguration}.
     *
     * @param taskManagerServicesConfiguration to create the memory manager from
     * @param freeHeapMemoryWithDefrag an estimate of the size of the free heap memory
     * @param maxJvmHeapMemory the maximum JVM heap size
     * @return Memory manager
     * @throws Exception
     */
    private static MemoryManager createMemoryManager(
            TaskManagerServicesConfiguration taskManagerServicesConfiguration,
            long freeHeapMemoryWithDefrag,
            long maxJvmHeapMemory) throws Exception {
        // computing the amount of memory to use depends on how much memory is available
        // it strictly needs to happen AFTER the network stack has been initialized

        // check if a value has been configured
        long configuredMemory = taskManagerServicesConfiguration.getConfiguredMemory();

        MemoryType memType = taskManagerServicesConfiguration.getMemoryType();

        final long memorySize;

        boolean preAllocateMemory = taskManagerServicesConfiguration.isPreAllocateMemory();

        if (configuredMemory > 0) {
            if (preAllocateMemory) {
                LOG.info("Using {} MB for managed memory." , configuredMemory);
            } else {
                LOG.info("Limiting managed memory to {} MB, memory will be allocated lazily." , configuredMemory);
            }
            memorySize = configuredMemory << 20; // megabytes to bytes
        } else {
            // similar to #calculateNetworkBufferMemory(TaskManagerServicesConfiguration tmConfig)
            float memoryFraction = taskManagerServicesConfiguration.getMemoryFraction();

            if (memType == MemoryType.HEAP) {
                // network buffers allocated off-heap -> use memoryFraction of the available heap:
                long relativeMemSize = (long) (freeHeapMemoryWithDefrag * memoryFraction);
                if (preAllocateMemory) {
                    LOG.info("Using {} of the currently free heap space for managed heap memory ({} MB)." ,
                            memoryFraction , relativeMemSize >> 20);
                } else {
                    LOG.info("Limiting managed memory to {} of the currently free heap space ({} MB), " +
                            "memory will be allocated lazily." , memoryFraction , relativeMemSize >> 20);
                }
                memorySize = relativeMemSize;
            } else if (memType == MemoryType.OFF_HEAP) {
                // The maximum heap memory has been adjusted according to the fraction (see
                // calculateHeapSizeMB(long totalJavaMemorySizeMB, Configuration config)), i.e.
                // maxJvmHeap = jvmTotalNoNet - jvmTotalNoNet * memoryFraction = jvmTotalNoNet * (1 - memoryFraction)
                // directMemorySize = jvmTotalNoNet * memoryFraction
                long directMemorySize = (long) (maxJvmHeapMemory / (1.0 - memoryFraction) * memoryFraction);
                if (preAllocateMemory) {
                    LOG.info("Using {} of the maximum memory size for managed off-heap memory ({} MB)." ,
                            memoryFraction, directMemorySize >> 20);
                } else {
                    LOG.info("Limiting managed memory to {} of the maximum memory size ({} MB)," +
                            " memory will be allocated lazily.", memoryFraction, directMemorySize >> 20);
                }
                memorySize = directMemorySize;
            } else {
                throw new RuntimeException("No supported memory type detected.");
            }
        }

        // now start the memory manager
        final MemoryManager memoryManager;
        try {
            memoryManager = new MemoryManager(
                    memorySize,
                    taskManagerServicesConfiguration.getNumberOfSlots(),
                    taskManagerServicesConfiguration.getNetworkConfig().networkBufferSize(),
                    memType,
                    preAllocateMemory);
        } catch (OutOfMemoryError e) {
            if (memType == MemoryType.HEAP) {
                throw new Exception("OutOfMemory error (" + e.getMessage() +
                        ") while allocating the TaskManager heap memory (" + memorySize + " bytes).", e);
            } else if (memType == MemoryType.OFF_HEAP) {
                throw new Exception("OutOfMemory error (" + e.getMessage() +
                        ") while allocating the TaskManager off-heap memory (" + memorySize +
                        " bytes).Try increasing the maximum direct memory (-XX:MaxDirectMemorySize)", e);
            } else {
                throw e;
            }
        }
        return memoryManager;
    }

    //......
}
```
- TaskManagerServices provides the private static method createMemoryManager, which creates the MemoryManager from the configuration. If taskmanager.memory.size is set (configuredMemory > 0), memorySize is simply that value converted to bytes; otherwise memorySize is recomputed according to the MemoryType. The result is then passed to the MemoryManager constructor.
- When memType is MemoryType.HEAP, memorySize is relativeMemSize = (long) (freeHeapMemoryWithDefrag * memoryFraction).
- When memType is MemoryType.OFF_HEAP, memorySize is directMemorySize = jvmTotalNoNet * memoryFraction. Since calculateHeapSizeMB has already set maxJvmHeap = jvmTotalNoNet - jvmTotalNoNet * memoryFraction = jvmTotalNoNet * (1 - memoryFraction), it follows that directMemorySize = (long) (maxJvmHeapMemory / (1.0 - memoryFraction) * memoryFraction).
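The sizing part of createMemoryManager reduces to a pure function from a few inputs to a byte count. The following is a hypothetical sketch: ManagedMemorySketch and its nested MemType enum merely stand in for Flink's classes, and freeHeapMemoryWithDefrag and maxJvmHeapMemory are simply passed in as parameters. It keeps Flink's expressions as written, including the detail that freeHeapMemoryWithDefrag * memoryFraction multiplies a long by a float and is therefore evaluated in float precision.

```java
// Hypothetical sketch of how createMemoryManager (Flink 1.7.2) derives memorySize in bytes.
// The enum is a stand-in for Flink's MemoryType; nothing here calls Flink.
final class ManagedMemorySketch {

    enum MemType { HEAP, OFF_HEAP }

    static long memorySizeBytes(long configuredMemoryMB, MemType memType, float memoryFraction,
                                long freeHeapMemoryWithDefrag, long maxJvmHeapMemory) {
        if (configuredMemoryMB > 0) {
            // An explicit taskmanager.memory.size wins regardless of the memory type.
            return configuredMemoryMB << 20; // megabytes to bytes
        }
        switch (memType) {
            case HEAP:
                // Fraction of the currently free heap; long * float is evaluated in float.
                return (long) (freeHeapMemoryWithDefrag * memoryFraction);
            case OFF_HEAP:
                // maxJvmHeap = jvmTotalNoNet * (1 - f), so jvmTotalNoNet * f = maxJvmHeap / (1 - f) * f
                return (long) (maxJvmHeapMemory / (1.0 - memoryFraction) * memoryFraction);
            default:
                throw new IllegalStateException("No supported memory type detected.");
        }
    }

    public static void main(String[] args) {
        System.out.println(memorySizeBytes(512, MemType.HEAP, 0.7f, 0L, 0L));              // 536870912
        System.out.println(memorySizeBytes(0, MemType.HEAP, 0.7f, 1L << 30, 0L));          // 751619264
        System.out.println(memorySizeBytes(0, MemType.OFF_HEAP, 0.75f, 0L, 100L << 20));   // 314572800
    }
}
```

The OFF_HEAP example uses a fraction of 0.75 so that the floating-point arithmetic is exact: a 100 MB max heap divided by 0.25 and multiplied by 0.75 gives exactly 300 MB of managed off-heap memory.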
TaskManagerServicesConfiguration
flink-runtime_2.11-1.7.2-sources.jar!/org/apache/flink/runtime/taskexecutor/TaskManagerServicesConfiguration.java
```java
public class TaskManagerServicesConfiguration {
    //......

    /**
     * Utility method to extract TaskManager config parameters from the configuration and to
     * sanity check them.
     *
     * @param configuration The configuration.
     * @param remoteAddress identifying the IP address under which the TaskManager will be accessible
     * @param localCommunication True, to skip initializing the network stack.
     *                                      Use only in cases where only one task manager runs.
     * @return TaskExecutorConfiguration that wrappers InstanceConnectionInfo, NetworkEnvironmentConfiguration, etc.
     */
    public static TaskManagerServicesConfiguration fromConfiguration(
            Configuration configuration,
            InetAddress remoteAddress,
            boolean localCommunication) throws Exception {

        // we need this because many configs have been written with a "-1" entry
        int slots = configuration.getInteger(TaskManagerOptions.NUM_TASK_SLOTS, 1);
        if (slots == -1) {
            slots = 1;
        }

        final String[] tmpDirs = ConfigurationUtils.parseTempDirectories(configuration);
        String[] localStateRootDir = ConfigurationUtils.parseLocalStateDirectories(configuration);

        if (localStateRootDir.length == 0) {
            // default to temp dirs.
            localStateRootDir = tmpDirs;
        }

        boolean localRecoveryMode = configuration.getBoolean(
                CheckpointingOptions.LOCAL_RECOVERY.key(),
                CheckpointingOptions.LOCAL_RECOVERY.defaultValue());

        final NetworkEnvironmentConfiguration networkConfig = parseNetworkEnvironmentConfiguration(
                configuration,
                localCommunication,
                remoteAddress,
                slots);

        final QueryableStateConfiguration queryableStateConfig =
                parseQueryableStateConfiguration(configuration);

        // extract memory settings
        long configuredMemory;
        String managedMemorySizeDefaultVal = TaskManagerOptions.MANAGED_MEMORY_SIZE.defaultValue();
        if (!configuration.getString(TaskManagerOptions.MANAGED_MEMORY_SIZE).equals(managedMemorySizeDefaultVal)) {
            try {
                configuredMemory = MemorySize.parse(configuration.getString(TaskManagerOptions.MANAGED_MEMORY_SIZE), MEGA_BYTES).getMebiBytes();
            } catch (IllegalArgumentException e) {
                throw new IllegalConfigurationException(
                        "Could not read " + TaskManagerOptions.MANAGED_MEMORY_SIZE.key(), e);
            }
        } else {
            configuredMemory = Long.valueOf(managedMemorySizeDefaultVal);
        }

        checkConfigParameter(
                configuration.getString(TaskManagerOptions.MANAGED_MEMORY_SIZE).equals(TaskManagerOptions.MANAGED_MEMORY_SIZE.defaultValue()) ||
                        configuredMemory > 0, configuredMemory,
                TaskManagerOptions.MANAGED_MEMORY_SIZE.key(),
                "MemoryManager needs at least one MB of memory. " +
                        "If you leave this config parameter empty, the system automatically " +
                        "pick a fraction of the available memory.");

        // check whether we use heap or off-heap memory
        final MemoryType memType;
        if (configuration.getBoolean(TaskManagerOptions.MEMORY_OFF_HEAP)) {
            memType = MemoryType.OFF_HEAP;
        } else {
            memType = MemoryType.HEAP;
        }

        boolean preAllocateMemory = configuration.getBoolean(TaskManagerOptions.MANAGED_MEMORY_PRE_ALLOCATE);

        float memoryFraction = configuration.getFloat(TaskManagerOptions.MANAGED_MEMORY_FRACTION);
        checkConfigParameter(memoryFraction > 0.0f && memoryFraction < 1.0f, memoryFraction,
                TaskManagerOptions.MANAGED_MEMORY_FRACTION.key(),
                "MemoryManager fraction of the free memory must be between 0.0 and 1.0");

        long timerServiceShutdownTimeout = AkkaUtils.getTimeout(configuration).toMillis();

        return new TaskManagerServicesConfiguration(
                remoteAddress,
                tmpDirs,
                localStateRootDir,
                localRecoveryMode,
                networkConfig,
                queryableStateConfig,
                slots,
                configuredMemory,
                memType,
                preAllocateMemory,
                memoryFraction,
                timerServiceShutdownTimeout,
                ConfigurationUtils.getSystemResourceMetricsProbingInterval(configuration));
    }

    //......
}
```
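- TaskManagerServicesConfiguration provides the static method fromConfiguration, which creates a TaskManagerServicesConfiguration from a Configuration. memType is determined by taskmanager.memory.off-heap: MemoryType.OFF_HEAP if it is true, otherwise MemoryType.HEAP. The method also sanity-checks the memory settings: an explicitly set taskmanager.memory.size must amount to at least 1 MB, and taskmanager.memory.fraction must lie strictly between 0.0 and 1.0.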
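The memory-related part of fromConfiguration boils down to one mapping and two sanity checks. The sketch below restates them with plain Java types; MemoryConfigSketch, its MemType enum and the boolean helpers are hypothetical names, and where Flink throws an IllegalConfigurationException via checkConfigParameter, these helpers simply return false.

```java
// Hypothetical restatement of the memory checks in
// TaskManagerServicesConfiguration.fromConfiguration (Flink 1.7.2); not Flink code.
final class MemoryConfigSketch {

    // Stand-in for Flink's MemoryType.
    enum MemType { HEAP, OFF_HEAP }

    // Default value of taskmanager.memory.size.
    static final String MANAGED_MEMORY_SIZE_DEFAULT = "0";

    /** taskmanager.memory.off-heap = true selects OFF_HEAP, false selects HEAP. */
    static MemType memoryType(boolean offHeap) {
        return offHeap ? MemType.OFF_HEAP : MemType.HEAP;
    }

    /** Valid if the option was left at its default, or if it resolves to at least 1 MB. */
    static boolean isManagedSizeValid(String rawSize, long configuredMemoryMB) {
        return rawSize.equals(MANAGED_MEMORY_SIZE_DEFAULT) || configuredMemoryMB > 0;
    }

    /** taskmanager.memory.fraction must lie strictly between 0.0 and 1.0. */
    static boolean isFractionValid(float memoryFraction) {
        return memoryFraction > 0.0f && memoryFraction < 1.0f;
    }

    public static void main(String[] args) {
        System.out.println(memoryType(true));            // OFF_HEAP
        System.out.println(isManagedSizeValid("0", 0));  // true  (not set)
        System.out.println(isManagedSizeValid("0m", 0)); // false (explicit, but below 1 MB)
        System.out.println(isFractionValid(0.7f));       // true
        System.out.println(isFractionValid(1.0f));       // false
    }
}
```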
Summary
- The TaskManager's managed memory comes in two types, heap and off-heap. taskmanager.memory.size sets the amount of memory managed by the TaskManager's memory manager (used mainly for sorting, hashing and caching) and defaults to 0; taskmanager.heap.size sets the TaskManager's total memory, covering both heap and off-heap memory. If taskmanager.memory.size is less than or equal to 0, the managed memory is sized via taskmanager.memory.fraction, which defaults to 0.7. If taskmanager.memory.off-heap is enabled and no explicit size is set, taskmanager.memory.fraction * (taskmanager.heap.size - networkBufMB) becomes the offHeapSize managed by the memory manager, and the TaskManager's Xmx is taskmanager.heap.size - networkBufMB - offHeapSize.
- TaskManagerServices provides the private static method createMemoryManager, which creates the MemoryManager from the configuration. When taskmanager.memory.size is not set, it recomputes memorySize according to the MemoryType and passes the result to the MemoryManager constructor. For MemoryType.HEAP, memorySize is relativeMemSize = (long) (freeHeapMemoryWithDefrag * memoryFraction). For MemoryType.OFF_HEAP, memorySize is directMemorySize = jvmTotalNoNet * memoryFraction; since maxJvmHeap = jvmTotalNoNet - jvmTotalNoNet * memoryFraction = jvmTotalNoNet * (1 - memoryFraction), this is computed as directMemorySize = (long) (maxJvmHeapMemory / (1.0 - memoryFraction) * memoryFraction).
- TaskManagerServicesConfiguration provides the static method fromConfiguration, which creates a TaskManagerServicesConfiguration from a Configuration; memType follows taskmanager.memory.off-heap: MemoryType.OFF_HEAP if it is true, otherwise MemoryType.HEAP.