赞
踩
本flink版本基于1.12
在做flink开发过程中会遇到内存参数设置问题,如果设置过小会造成flink任务无法提交,那到底我们应该把flink的taskmanager设置多大才不会报错呢?我们先看下FlinkUI,taskmanager组成如图所示:
图一:flink taskmanager内存模型
JVM 特定内存: JVM 本身使用的内存,包含 JVM 的 metaspace 和 over-head1)JVM metaspace:JVM 元空间 :taskmanager.memory.jvm-metaspace.size,默认 256mb2)JVM over-head 执行开销:JVM 执行时自身所需要的内容,包括线程堆栈、IO、编译缓存等所使用的内存。taskmanager.memory.jvm-overhead.fraction,默认 0.1taskmanager.memory.jvm-overhead.min,默认 192mbtaskmanager.memory.jvm-overhead.max,默认 1gb总进程内存*fraction,如果小于配置的 min(或大于配置的 max)大小,则使用 min/max大小⚫ 框架内存: Flink 框架,即 TaskManager 本身所占用的内存, 不计入 Slot 的资源中。Framework Heap( 堆内):taskmanager.memory.framework.heap.size,默认 128MBFramework Off-Heap( 堆外):taskmanager.memory.framework.off-heap.size,默认 128MB⚫ Task 内存: Task 执行用户代码时所使用的内存Task Heap( 堆内):taskmanager.memory.task.heap.size,默认 none,由 Flink 内存扣除掉其他部分的内存得到。Task Off-Heap( 堆外):taskmanager.memory.task.off-heap.size,默认 0,表示不使用堆外内存⚫ Network( 网络内存):网络数据交换所使用的堆外内存大小,如网络数据交换缓冲区堆外:taskmanager.memory.network.fraction,默认 0.1taskmanager.memory.network.min,默认 64mbtaskmanager.memory.network.max,默认 1gbFlink 内存*fraction,如果小于配置的 min(或大于配置的 max)大小,则使用 min/max大小⚫ Managed Memory( 托管内存):用于 RocksDB State Backend 的本地内存和批的排序、哈希表、缓存中间结果。堆外:taskmanager.memory.managed.fraction,默认 0.4taskmanager.memory.managed.size,默认 none如果 size 没指定,则等于 Flink 内存*fraction
我们先看一个例子,我设置taskmanager.memory.process.size=800mb,就是taskmanager进程总内存为800M,如果不做其他配置的情况下是报错的
flink run -t yarn-per-job -D yarn.application.queue=test -Dtaskmanager.memory.process.size=800mb -c com.test.Test /data/shan/flink-example-1.0-SNAPSHOT.jar
报错如下,从错误上我们得知分配的taskmanager进程内存不足。
The program finished with the following exception:
org.apache.flink.client.program.ProgramInvocationException: The main method caused an error: TaskManager memory configuration failed: Sum of configured Framework Heap Memory (128.000mb (134217728 bytes)), Framework Off-Heap Memory (128.000mb (134217728 bytes)), Task Off-Heap Memory (0 bytes), Managed Memory (140.800mb (147639503 bytes)) and Network Memory (64.000mb (67108864 bytes)) exceed configured Total Flink Memory (352.000mb (369098752 bytes)).
那我们到底需要设置多少的内存才行呢?我们分析下上述错误的内存,首先Total Flink Memory为352M,如图一所示,由taskmanager进程内存800M减去 JVM metaspace再减去JVM over-head 执行开销,计算为 800M - 256M - 192M = 352M
Total Flink Memory 中内存为Framework Heap(128M) + Framework Off-Heap(128M) + Off-Heap Memory(0M) + Network Memory(64M) + (Total Flink Memory(352M) * 0.4 = 140.800M) > 352M的 ,所以最后报错
所以我们分配内存就有如下公式 ,假设taskmanager.memory.process.size为x
(x- JVM metaspace【256M】 - JVM over-head【192M】)* 0.4 +Framework Heap(128M) + Framework Off-Heap(128M) + Off-Heap Memory(0M) + Network Memory(64M) <(x- JVM metaspace【256M】 - JVM over-head【192M】)
最后算出x为981,最少taskmanager.memory.process.size设置为1000M才行 。
flink run -t yarn-per-job -D yarn.application.queue=test -Dtaskmanager.memory.process.size=1000mb -c com.test.Test /data/shan/flink-example-1.0-SNAPSHOT.jar
或者如果不需要使用RocksDB State Backend通过参数设置taskmanager.memory.managed.size=0,最后taskmanager.memory.process.size为800M也行
flink run -t yarn-per-job -D yarn.application.queue=test -Dtaskmanager.memory.process.size=800m -Dtaskmanager.memory.managed.size=0 -c com.test.Test /data/shan/flink-example-1.0-SNAPSHOT.jar
在实际开发中我们不建议taskmanager设置太小,通过下面的参数设置,使用Generic CLI 模式提交,通过设置下面的这个参数
jobmanager.memory.process.size:TaskExecutors 的总进程内存大小
通常-Dtaskmanager.memory.process.size=4096mb (2G到8G就够了)
如果想更精确,先预估一个,去flinkUI上看taskmanager的使用率,如果数据量不大的时候使用率达到90%左右,就需要调大taskmanager,需要压测,模拟(预估)高峰的数据,看高峰的时候内存使用率,最后设置合适的内存
最后的提交参数为
bin/flink run \ -t yarn-per-job \ -d \ -p 5 \ 指定并行度
-Dyarn.application.queue=test \ 指定 yarn 队列
-Djobmanager.memory.process.size=2048mb \ 指定 JM 的总进程大小
-Dtaskmanager.memory.process.size=4096mb \ 指定每个 TM 的总进程大小
-Dtaskmanager.numberOfTaskSlots=2 \ 指定每个 TM 的 slot 数 -c com.test.flink.Uv \
/opt/module/flink-1.12.1/myjar/flink-test-1.0-SNAPSHOT.jar
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。