We submitted a Flink on YARN job (per-job mode) with the following parameters:
```shell
/opt/flink-1.9.0/bin/flink run \
  --detached \
  --jobmanager yarn-cluster \
  --yarnname "x.y.z" \
  --yarnjobManagerMemory 2048 \
  --yarntaskManagerMemory 4096 \
  --yarnslots 2 \
  --parallelism 20 \
  --class x.y.z \
  xyz-1.0.jar
```
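The number of TaskManagers follows from the parallelism and the slots per TaskManager. A minimal sketch, assuming all operators stay in the default slot-sharing group so the job needs as many slots as its maximum parallelism; the class and method names are hypothetical:

```java
// Hypothetical helper: TaskManagers needed by a per-job cluster, assuming the
// job requires exactly `parallelism` slots (default slot sharing).
final class TaskManagerCount {

    static int numTaskManagers(int parallelism, int slotsPerTaskManager) {
        if (parallelism <= 0 || slotsPerTaskManager <= 0) {
            throw new IllegalArgumentException("parallelism and slots must be positive");
        }
        // Ceiling division: a partially used TaskManager still has to be started.
        return (parallelism + slotsPerTaskManager - 1) / slotsPerTaskManager;
    }

    public static void main(String[] args) {
        // --parallelism 20, --yarnslots 2
        System.out.println(numTaskManagers(20, 2)); // prints 10
    }
}
```

With --parallelism 21 the same rule would give 11 TaskManagers, the last one only half used.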
The job launched 10 TaskManagers and ran normally. In the job's Web UI, open any TaskManager page and look at its memory.
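Although we set the TaskManager memory to 4 GB in the parameters, the figure above shows a JVM heap of only 2.47 GB, plus an extra item, "Flink Managed Memory", of 1.78 GB. Monitoring YarnTaskExecutorRunner with VisualVM shows that its JVM memory parameters are set as follows:
Clearly, it is Xmx + MaxDirectMemorySize that adds up to the TM memory we set at launch (4 GB). So why are they set this way, and what on earth is "Flink Managed Memory"? Let's work through these questions.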
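As shown in the figure below.
To reduce object overhead, Flink mainly stores objects in serialized form. The smallest unit of serialized storage is called a MemorySegment; it is backed by a byte array, and its size is set by the taskmanager.memory.segment-size parameter (32 KB by default). The individual memory regions are described below: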
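```yaml
# Fraction of TM memory used for network buffers (default 0.1)
taskmanager.network.memory.fraction: 0.15
# Minimum and maximum network buffer size (defaults 64MB and 1GB)
taskmanager.network.memory.min: 128mb
taskmanager.network.memory.max: 1gb
# Fraction of the TM heap used for on-heap managed memory (default 0.7)
taskmanager.memory.fraction: 0.7
# Whether managed memory may be allocated off-heap (disabled by default)
taskmanager.memory.off-heap: false
```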
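This also shows that, unlike Spark, Flink does not split memory into Storage and Execution regions; it merges the two into one, which is more flexible.
To find out where the sizes of the memory regions in the opening question come from, the best way is naturally to read the source. Below we explore it using the Flink 1.9.0 source code.
The entry point for launching a YARN per-job cluster is in the o.a.f.yarn.YarnClusterDescriptor class.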
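To see how the settings in the configuration snippet above combine with their defaults, here is a minimal sketch that reads flat "key: value" lines, skips comments, and falls back to the defaults quoted in the snippet's comments. The parser (TmMemoryConfig) is hypothetical and only understands the mb/gb suffixes used here; Flink itself loads flink-conf.yaml through its own configuration classes.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical mini-parser for flat "key: value" lines (not Flink's loader).
final class TmMemoryConfig {
    // Defaults as quoted in the comments of the configuration snippet.
    static final Map<String, String> DEFAULTS = Map.of(
            "taskmanager.network.memory.fraction", "0.1",
            "taskmanager.network.memory.min", "64mb",
            "taskmanager.network.memory.max", "1gb",
            "taskmanager.memory.fraction", "0.7",
            "taskmanager.memory.off-heap", "false");

    static Map<String, String> parse(List<String> lines) {
        Map<String, String> conf = new HashMap<>(DEFAULTS);
        for (String line : lines) {
            String trimmed = line.trim();
            if (trimmed.isEmpty() || trimmed.startsWith("#")) {
                continue; // skip blank lines and comments
            }
            int colon = trimmed.indexOf(':');
            if (colon > 0) {
                // Explicit settings override the defaults.
                conf.put(trimmed.substring(0, colon).trim(), trimmed.substring(colon + 1).trim());
            }
        }
        return conf;
    }

    /** Converts sizes such as "128mb" or "1gb" to megabytes. */
    static long toMegabytes(String size) {
        String s = size.trim().toLowerCase();
        if (s.endsWith("gb")) {
            return Long.parseLong(s.substring(0, s.length() - 2).trim()) * 1024;
        }
        if (s.endsWith("mb")) {
            return Long.parseLong(s.substring(0, s.length() - 2).trim());
        }
        throw new IllegalArgumentException("unsupported unit: " + size);
    }

    public static void main(String[] args) {
        Map<String, String> conf = parse(List.of(
                "taskmanager.network.memory.fraction: 0.15",
                "taskmanager.network.memory.min: 128mb"));
        System.out.println(conf.get("taskmanager.network.memory.fraction")); // 0.15 (overridden)
        System.out.println(toMegabytes(conf.get("taskmanager.network.memory.max"))); // 1024 (default)
    }
}
```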
```java
public ClusterClient<ApplicationId> deployJobCluster(
        ClusterSpecification clusterSpecification,
        JobGraph jobGraph,
        boolean detached) throws ClusterDeploymentException {

    // this is required because the slots are allocated lazily
    jobGraph.setAllowQueuedScheduling(true);

    try {
        return deployInternal(
            clusterSpecification,
            "Flink per-job cluster",
            getYarnJobClusterEntrypoint(),
            jobGraph,
            detached);
    } catch (Exception e) {
        throw new ClusterDeploymentException("Could not deploy Yarn job cluster.", e);
    }
}
```
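Here, the ClusterSpecification object holds the cluster's four basic parameters: JobManager memory, TaskManager memory, number of TaskManagers, and slots per TaskManager. At its start, deployInternal() calls the validateClusterSpecification() method of the abstract class o.a.f.yarn.AbstractYarnClusterDescriptor to check that the ClusterSpecification is valid.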
```java
private void validateClusterSpecification(ClusterSpecification clusterSpecification) throws FlinkException {
    try {
        final long taskManagerMemorySize = clusterSpecification.getTaskManagerMemoryMB();
        // We do the validation by calling the calculation methods here
        // Internally these methods will check whether the cluster can be started with the provided
        // ClusterSpecification and the configured memory requirements
        final long cutoff = ContaineredTaskManagerParameters.calculateCutoffMB(flinkConfiguration, taskManagerMemorySize);
        TaskManagerServices.calculateHeapSizeMB(taskManagerMemorySize - cutoff, flinkConfiguration);
    } catch (IllegalArgumentException iae) {
        throw new FlinkException("Cannot fulfill the minimum memory requirements with the provided " +
            "cluster specification. Please increase the memory of the cluster.", iae);
    }
}
```
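ClusterSpecification.getTaskManagerMemoryMB() returns the memory set with -ytm/--yarntaskManagerMemory; inside Flink's code this always ends up as the value of the taskmanager.heap.size option.
The first call is to ContaineredTaskManagerParameters.calculateCutoffMB(), which computes how much of the YARN container hosting the TM must be reserved for logic outside the TM.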
```java
public static long calculateCutoffMB(Configuration config, long containerMemoryMB) {
    Preconditions.checkArgument(containerMemoryMB > 0);

    // (1) check cutoff ratio
    final float memoryCutoffRatio = config.getFloat(
        ResourceManagerOptions.CONTAINERIZED_HEAP_CUTOFF_RATIO);

    if (memoryCutoffRatio >= 1 || memoryCutoffRatio <= 0) {
        throw new IllegalArgumentException("The configuration value '"
            + ResourceManagerOptions.CONTAINERIZED_HEAP_CUTOFF_RATIO.key() + "' must be between 0 and 1. Value given="
            + memoryCutoffRatio);
    }

    // (2) check min cutoff value
    final int minCutoff = config.getInteger(
        ResourceManagerOptions.CONTAINERIZED_HEAP_CUTOFF_MIN);

    if (minCutoff >= containerMemoryMB) {
        throw new IllegalArgumentException("The configuration value '"
            + ResourceManagerOptions.CONTAINERIZED_HEAP_CUTOFF_MIN.key() + "'='" + minCutoff
            + "' is larger than the total container memory " + containerMemoryMB);
    }

    // (3) check between heap and off-heap
    long cutoff = (long) (containerMemoryMB * memoryCutoffRatio);
    if (cutoff < minCutoff) {
        cutoff = minCutoff;
    }
    return cutoff;
}
```
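The method works as follows: it first checks that containerized.heap-cutoff-ratio (default 0.25) lies strictly between 0 and 1, then checks that containerized.heap-cutoff-min (default 600 MB) is smaller than the container memory, and finally returns the larger of container memory × containerized.heap-cutoff-ratio and containerized.heap-cutoff-min.
So when running Flink on YARN, the TM memory we set is actually the container's memory. In other words, the total memory a TM can use (heap and off-heap combined) is: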
tm_total_memory = taskmanager.heap.size - max[containerized.heap-cutoff-min, taskmanager.heap.size * containerized.heap-cutoff-ratio]
Plugging in the parameters from the beginning of the article:
tm_total_memory = 4096 - max[600, 4096 * 0.25] = 3072
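The cutoff rule is easy to replay outside Flink. This sketch mirrors the checks and arithmetic of calculateCutoffMB() shown above, with the ratio and minimum passed in explicitly instead of read from a Configuration (the class name ContainerCutoff is hypothetical):

```java
// Re-implementation sketch of the cutoff rule in calculateCutoffMB() (Flink 1.9).
// Defaults in Flink: containerized.heap-cutoff-ratio = 0.25,
// containerized.heap-cutoff-min = 600 (MB).
final class ContainerCutoff {

    static long cutoffMB(long containerMemoryMB, float cutoffRatio, int minCutoffMB) {
        if (containerMemoryMB <= 0) {
            throw new IllegalArgumentException("container memory must be positive");
        }
        if (cutoffRatio >= 1 || cutoffRatio <= 0) {
            throw new IllegalArgumentException("cutoff ratio must be between 0 and 1: " + cutoffRatio);
        }
        if (minCutoffMB >= containerMemoryMB) {
            throw new IllegalArgumentException("min cutoff " + minCutoffMB
                    + " is larger than the total container memory " + containerMemoryMB);
        }
        // Ratio-based cutoff, but never less than the configured minimum.
        return Math.max((long) (containerMemoryMB * cutoffRatio), minCutoffMB);
    }

    /** tm_total_memory: what is left for the TaskManager JVM (heap + network buffers). */
    static long tmTotalMemoryMB(long containerMemoryMB, float cutoffRatio, int minCutoffMB) {
        return containerMemoryMB - cutoffMB(containerMemoryMB, cutoffRatio, minCutoffMB);
    }

    public static void main(String[] args) {
        System.out.println(tmTotalMemoryMB(4096, 0.25f, 600)); // prints 3072
    }
}
```

With -ytm 2048 the minimum wins, max(600, 2048 * 0.25 = 512) = 600, leaving 1448 MB; with -ytm 512 the containerized.heap-cutoff-min check throws, which validateClusterSpecification() turns into the "Please increase the memory of the cluster." error.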
Next, let's look at TaskManagerServices.calculateHeapSizeMB().
```java
public static long calculateHeapSizeMB(long totalJavaMemorySizeMB, Configuration config) {
    Preconditions.checkArgument(totalJavaMemorySizeMB > 0);

    // all values below here are in bytes

    final long totalProcessMemory = megabytesToBytes(totalJavaMemorySizeMB);
    final long networkReservedMemory = getReservedNetworkMemory(config, totalProcessMemory);
    final long heapAndManagedMemory = totalProcessMemory - networkReservedMemory;

    if (config.getBoolean(TaskManagerOptions.MEMORY_OFF_HEAP)) {
        final long managedMemorySize = getManagedMemoryFromHeapAndManaged(config, heapAndManagedMemory);

        ConfigurationParserUtils.checkConfigParameter(managedMemorySize < heapAndManagedMemory, managedMemorySize,
            TaskManagerOptions.MANAGED_MEMORY_SIZE.key(),
            "Managed memory size too large for " + (networkReservedMemory >> 20) +
                " MB network buffer memory and a total of " + totalJavaMemorySizeMB +
                " MB JVM memory");

        return bytesToMegabytes(heapAndManagedMemory - managedMemorySize);
    }
    else {
        return bytesToMegabytes(heapAndManagedMemory);
    }
}
```
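To keep things simple and match our actual setup, we will not consider the case where off-heap managed memory is enabled. This involves the method that computes the network buffer size, NettyShuffleEnvironmentConfiguration.calculateNetworkBufferMemory().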
```java
public static long calculateNetworkBufferMemory(long totalJavaMemorySize, Configuration config) {
    final int segmentSize = ConfigurationParserUtils.getPageSize(config);

    final long networkBufBytes;
    if (hasNewNetworkConfig(config)) {
        float networkBufFraction = config.getFloat(NettyShuffleEnvironmentOptions.NETWORK_BUFFERS_MEMORY_FRACTION);
        long networkBufSize = (long) (totalJavaMemorySize * networkBufFraction);
        networkBufBytes = calculateNewNetworkBufferMemory(config, networkBufSize, totalJavaMemorySize);
    } else {
        // use old (deprecated) network buffers parameter
        // legacy logic, not covered here
    }

    return networkBufBytes;
}

private static long calculateNewNetworkBufferMemory(Configuration config, long networkBufSize, long maxJvmHeapMemory) {
    float networkBufFraction = config.getFloat(NettyShuffleEnvironmentOptions.NETWORK_BUFFERS_MEMORY_FRACTION);
    long networkBufMin = MemorySize.parse(config.getString(NettyShuffleEnvironmentOptions.NETWORK_BUFFERS_MEMORY_MIN)).getBytes();
    long networkBufMax = MemorySize.parse(config.getString(NettyShuffleEnvironmentOptions.NETWORK_BUFFERS_MEMORY_MAX)).getBytes();

    int pageSize = ConfigurationParserUtils.getPageSize(config);
    checkNewNetworkConfig(pageSize, networkBufFraction, networkBufMin, networkBufMax);
    long networkBufBytes = Math.min(networkBufMax, Math.max(networkBufMin, networkBufSize));

    ConfigurationParserUtils.checkConfigParameter(/*...*/);

    return networkBufBytes;
}
```
So the network buffer size is determined as follows:
network_buffer_memory = min[taskmanager.network.memory.max, max(taskmanager.network.memory.min, tm_total_memory * taskmanager.network.memory.fraction)]
Plugging in the values:
network_buffer_memory = min[1024, max(128, 3072 * 0.15)] = 460.8
That is, the heap memory the TM actually uses is:
tm_heap_memory = tm_total_memory - network_buffer_memory = 3072 - 460.8 ≈ 2611
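This matches the -Xms/-Xmx setting in the VisualVM screenshot exactly.
Likewise, we can check the number of network buffer MemorySegments in the TaskManager UI.
Multiplying that count by the segment size shows that the actual network buffer size is very close to the network_buffer_memory value computed above.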
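The same numbers can be reproduced in code. Below is a sketch of the on-heap path through calculateHeapSizeMB() and calculateNewNetworkBufferMemory(), working in bytes with a float fraction as those methods do. It assumes taskmanager.memory.off-heap is false and that the direct-memory limit is simply container memory minus heap, as the 4 GB total of Xmx + MaxDirectMemorySize in VisualVM suggests. The class and method names are hypothetical.

```java
// Sketch of the on-heap memory split (Flink 1.9 formulas, hypothetical names).
// Assumes taskmanager.memory.off-heap = false.
final class TmMemorySplit {

    static long mbToBytes(long mb) {
        return mb << 20;
    }

    static long bytesToMb(long bytes) {
        return bytes >> 20; // drops the remainder, i.e. rounds down
    }

    /** network_buffer_memory = min(max, max(min, total * fraction)), in bytes. */
    static long networkBufferBytes(long totalJavaBytes, float fraction, long minBytes, long maxBytes) {
        return Math.min(maxBytes, Math.max(minBytes, (long) (totalJavaBytes * fraction)));
    }

    /** tm_heap_memory = tm_total_memory - network_buffer_memory, in MB. */
    static long heapMb(long tmTotalMb, float fraction, long minMb, long maxMb) {
        long total = mbToBytes(tmTotalMb);
        long network = networkBufferBytes(total, fraction, mbToBytes(minMb), mbToBytes(maxMb));
        return bytesToMb(total - network);
    }

    public static void main(String[] args) {
        long containerMb = 4096;  // -ytm 4096
        long tmTotalMb = 3072;    // after the 1024 MB cutoff
        long heap = heapMb(tmTotalMb, 0.15f, 128, 1024);
        long direct = containerMb - heap; // assumed: Xmx + MaxDirectMemorySize = container memory
        System.out.println("-Xmx" + heap + "m -XX:MaxDirectMemorySize=" + direct + "m");
        // prints: -Xmx2611m -XX:MaxDirectMemorySize=1485m
    }
}
```

The cutoff memory is not lost: under this assumption it lands in the direct-memory limit together with the network buffers (1024 + 460.8 ≈ 1485 MB).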
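The comparison comes down to dividing the network buffer size by the 32 KB segment size. A sketch assuming the count is simply floor(bytes / segment size); the running TaskManager derives its own figure from its JVM limits, so the number in the UI can differ slightly (NetworkSegments is a hypothetical name):

```java
// Sketch: how many 32 KB MemorySegments a network-buffer budget holds.
// Assumes count = floor(bytes / segmentSize).
final class NetworkSegments {

    static final long SEGMENT_SIZE = 32 * 1024; // taskmanager.memory.segment-size default

    static long segmentCount(long networkBufferBytes) {
        return networkBufferBytes / SEGMENT_SIZE;
    }

    public static void main(String[] args) {
        long networkBytes = (long) ((3072L << 20) * 0.15); // the 460.8 MB from the text
        long segments = segmentCount(networkBytes);
        double backToMB = segments * SEGMENT_SIZE / (1024.0 * 1024.0);
        System.out.println(segments + " segments = " + backToMB + " MB");
        // prints: 14745 segments = 460.78125 MB
    }
}
```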
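So how is the on-heap managed memory size computed? As mentioned earlier, managed memory is handled by the MemoryManager, so let's look at TaskManagerServices.createMemoryManager(), which initializes a MemoryManager from the configured parameters.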
```java
private static MemoryManager createMemoryManager(
        TaskManagerServicesConfiguration taskManagerServicesConfiguration) throws Exception {
    long configuredMemory = taskManagerServicesConfiguration.getConfiguredMemory();
    MemoryType memType = taskManagerServicesConfiguration.getMemoryType();

    final long memorySize;

    boolean preAllocateMemory = taskManagerServicesConfiguration.isPreAllocateMemory();

    if (configuredMemory > 0) {
        if (preAllocateMemory) {
            LOG.info(/*...*/);
        } else {
            LOG.info(/*...*/);
        }
        memorySize = configuredMemory << 20; // megabytes to bytes
    } else {
        // similar to #calculateNetworkBufferMemory(TaskManagerServicesConfiguration tmConfig)
        float memoryFraction = taskManagerServicesConfiguration.getMemoryFraction();

        if (memType == MemoryType.HEAP) {
            long freeHeapMemoryWithDefrag = taskManagerServicesConfiguration.getFreeHeapMemoryWithDefrag();
            // network buffers allocated off-heap -> use memoryFraction of the available heap:
            long relativeMemSize = (long) (freeHeapMemoryWithDefrag * memoryFraction);
            if (preAllocateMemory) {
                LOG.info(/*...*/);
            } else {
                LOG.info(/*...*/);
            }
            memorySize = relativeMemSize;
        } else if (memType == MemoryType.OFF_HEAP) {
            long maxJvmHeapMemory = taskManagerServicesConfiguration.getMaxJvmHeapMemory();
            // The maximum heap memory has been adjusted according to the fraction (see
            // calculateHeapSizeMB(long totalJavaMemorySizeMB, Configuration config)), i.e.
            // maxJvmHeap = jvmTotalNoNet - jvmTotalNoNet * memoryFraction = jvmTotalNoNet * (1 - memoryFraction)
            // directMemorySize = jvmTotalNoNet * memoryFraction
            long directMemorySize = (long) (maxJvmHeapMemory / (1.0 - memoryFraction) * memoryFraction);
            if (preAllocateMemory) {
                LOG.info(/*...*/);
            } else {
                LOG.info(/*...*/);
            }
            memorySize = directMemorySize;
        } else {
            throw new RuntimeException("No supported memory type detected.");
        }
    }

    // now start the memory manager
    final MemoryManager memoryManager;
    try {
        memoryManager = new MemoryManager(
            memorySize,
            taskManagerServicesConfiguration.getNumberOfSlots(),
            taskManagerServicesConfiguration.getPageSize(),
            memType,
            preAllocateMemory);
    } catch (OutOfMemoryError e) {
        // ...
    }
    return memoryManager;
}
```
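Briefly, the flow is: if taskmanager.memory.size is set (configuredMemory > 0), that value is used directly. Otherwise, with on-heap managed memory the size is taskmanager.memory.fraction of the free heap returned by getFreeHeapMemoryWithDefrag(); with off-heap managed memory it is derived back from the already shrunken maximum heap as maxJvmHeap / (1 - fraction) × fraction.
In general we don't bluntly set taskmanager.memory.size ourselves, so: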
flink_managed_memory = tm_heap_memory * taskmanager.memory.fraction = 2611 * 0.7 ≈ 1827
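This is the managed memory size shown in the TaskManager UI (1.78 GB ≈ 1827 MB). The result is approximate, because the code actually applies the fraction to the free heap reported by getFreeHeapMemoryWithDefrag() at startup rather than to tm_heap_memory itself.
Good night!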
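The branch logic of createMemoryManager() can be condensed into one small function. This is a sketch with hypothetical names (ManagedMemorySize and a local MemoryType enum), working in megabytes instead of bytes; as in the formula above, tm_heap_memory stands in for the free heap that Flink measures at startup.

```java
// Sketch of the size decision in createMemoryManager() (Flink 1.9).
// Hypothetical types; megabytes instead of bytes for readability.
final class ManagedMemorySize {

    enum MemoryType { HEAP, OFF_HEAP }

    static long managedMemoryMB(long configuredMB, MemoryType type, float fraction,
                                long freeHeapMB, long maxJvmHeapMB) {
        if (configuredMB > 0) {
            return configuredMB; // an explicit taskmanager.memory.size wins
        }
        switch (type) {
            case HEAP:
                // taskmanager.memory.fraction of the free heap
                return (long) (freeHeapMB * fraction);
            case OFF_HEAP:
                // the heap was already shrunk to jvmTotalNoNet * (1 - fraction)
                return (long) (maxJvmHeapMB / (1.0 - fraction) * fraction);
            default:
                throw new IllegalStateException("No supported memory type detected.");
        }
    }

    public static void main(String[] args) {
        // taskmanager.memory.size unset (0), on-heap, fraction 0.7, ~2611 MB heap
        System.out.println(managedMemoryMB(0, MemoryType.HEAP, 0.7f, 2611, 2611)); // prints 1827
    }
}
```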