Preface
This article looks at how the heap size of the Flink JobManager is configured.
JobManagerOptions
flink-core-1.7.1-sources.jar!/org/apache/flink/configuration/JobManagerOptions.java
- @PublicEvolving
- public class JobManagerOptions {
- //......
-
- /**
- * JVM heap size for the JobManager with memory size.
- */
- @Documentation.CommonOption(position = Documentation.CommonOption.POSITION_MEMORY)
- public static final ConfigOption<String> JOB_MANAGER_HEAP_MEMORY =
- key("jobmanager.heap.size")
- .defaultValue("1024m")
- .withDescription("JVM heap size for the JobManager.");
-
- /**
- * JVM heap size (in megabytes) for the JobManager.
- * @deprecated use {@link #JOB_MANAGER_HEAP_MEMORY}
- */
- @Deprecated
- public static final ConfigOption<Integer> JOB_MANAGER_HEAP_MEMORY_MB =
- key("jobmanager.heap.mb")
- .defaultValue(1024)
- .withDescription("JVM heap size (in megabytes) for the JobManager.");
-
- //......
- }
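- The jobmanager.heap.size option sets the JVM heap size of the JobManager and defaults to 1024m. The older jobmanager.heap.mb option, an integer number of megabytes, is deprecated.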
ConfigurationUtils
flink-core-1.7.1-sources.jar!/org/apache/flink/configuration/ConfigurationUtils.java
- public class ConfigurationUtils {
-
- private static final String[] EMPTY = new String[0];
-
- /**
- * Get job manager's heap memory. This method will check the new key
- * {@link JobManagerOptions#JOB_MANAGER_HEAP_MEMORY} and
- * the old key {@link JobManagerOptions#JOB_MANAGER_HEAP_MEMORY_MB} for backwards compatibility.
- *
- * @param configuration the configuration object
- * @return the memory size of job manager's heap memory.
- */
- public static MemorySize getJobManagerHeapMemory(Configuration configuration) {
- if (configuration.containsKey(JobManagerOptions.JOB_MANAGER_HEAP_MEMORY.key())) {
- return MemorySize.parse(configuration.getString(JobManagerOptions.JOB_MANAGER_HEAP_MEMORY));
- } else if (configuration.containsKey(JobManagerOptions.JOB_MANAGER_HEAP_MEMORY_MB.key())) {
- return MemorySize.parse(configuration.getInteger(JobManagerOptions.JOB_MANAGER_HEAP_MEMORY_MB) + "m");
- } else {
- //use default value
- return MemorySize.parse(configuration.getString(JobManagerOptions.JOB_MANAGER_HEAP_MEMORY));
- }
- }
-
- //......
- }
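- ConfigurationUtils.getJobManagerHeapMemory reads the heap setting from the Configuration and parses it into a MemorySize. It uses jobmanager.heap.size if that key is present, otherwise the deprecated jobmanager.heap.mb with an "m" suffix appended, and if neither key is set, the default value of jobmanager.heap.size.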
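The lookup order in getJobManagerHeapMemory can be reproduced without Flink on the classpath. The sketch below is hypothetical: a plain Map<String, String> stands in for Flink's Configuration, and the class and method names are invented for illustration. It returns the size expression that would then be handed to MemorySize.parse.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch: a Map<String, String> stands in for Flink's Configuration.
class JobManagerHeapKeys {
    static final String HEAP_SIZE_KEY = "jobmanager.heap.size"; // current key
    static final String HEAP_MB_KEY = "jobmanager.heap.mb";     // deprecated key, plain integer in MB
    static final String DEFAULT_HEAP_SIZE = "1024m";            // default of the current key

    // Returns the size expression that would then be handed to a parser such as MemorySize.parse.
    static String resolveHeapExpression(Map<String, String> conf) {
        if (conf.containsKey(HEAP_SIZE_KEY)) {
            // 1. The new key wins whenever it is present.
            return conf.get(HEAP_SIZE_KEY);
        } else if (conf.containsKey(HEAP_MB_KEY)) {
            // 2. Fall back to the deprecated key and append the "m" unit.
            return Integer.parseInt(conf.get(HEAP_MB_KEY).trim()) + "m";
        } else {
            // 3. Neither key set: use the default of the new key.
            return DEFAULT_HEAP_SIZE;
        }
    }

    public static void main(String[] args) {
        Map<String, String> conf = new HashMap<>();
        System.out.println(resolveHeapExpression(conf)); // 1024m
        conf.put(HEAP_MB_KEY, "2048");
        System.out.println(resolveHeapExpression(conf)); // 2048m
        conf.put(HEAP_SIZE_KEY, "4g");
        System.out.println(resolveHeapExpression(conf)); // 4g
    }
}
```

The new key is checked first, so once jobmanager.heap.size is present, a leftover jobmanager.heap.mb entry has no effect.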
MemorySize
flink-core-1.7.1-sources.jar!/org/apache/flink/configuration/MemorySize.java
- @PublicEvolving
- public class MemorySize implements java.io.Serializable {
-
- private static final long serialVersionUID = 1L;
-
- // ------------------------------------------------------------------------
-
- /** The memory size, in bytes. */
- private final long bytes;
-
- /**
- * Constructs a new MemorySize.
- *
- * @param bytes The size, in bytes. Must be zero or larger.
- */
- public MemorySize(long bytes) {
- checkArgument(bytes >= 0, "bytes must be >= 0");
- this.bytes = bytes;
- }
-
- // ------------------------------------------------------------------------
-
- /**
- * Gets the memory size in bytes.
- */
- public long getBytes() {
- return bytes;
- }
-
- /**
- * Gets the memory size in Kibibytes (= 1024 bytes).
- */
- public long getKibiBytes() {
- return bytes >> 10;
- }
-
- /**
- * Gets the memory size in Mebibytes (= 1024 Kibibytes).
- */
- public int getMebiBytes() {
- return (int) (bytes >> 20);
- }
-
- /**
- * Gets the memory size in Gibibytes (= 1024 Mebibytes).
- */
- public long getGibiBytes() {
- return bytes >> 30;
- }
-
- /**
- * Gets the memory size in Tebibytes (= 1024 Gibibytes).
- */
- public long getTebiBytes() {
- return bytes >> 40;
- }
-
- // ------------------------------------------------------------------------
-
- @Override
- public int hashCode() {
- return (int) (bytes ^ (bytes >>> 32));
- }
-
- @Override
- public boolean equals(Object obj) {
- return obj == this ||
- (obj != null && obj.getClass() == this.getClass() && ((MemorySize) obj).bytes == this.bytes);
- }
-
- @Override
- public String toString() {
- return bytes + " bytes";
- }
-
- // ------------------------------------------------------------------------
- // Parsing
- // ------------------------------------------------------------------------
-
- /**
- * Parses the given string as as MemorySize.
- *
- * @param text The string to parse
- * @return The parsed MemorySize
- *
- * @throws IllegalArgumentException Thrown, if the expression cannot be parsed.
- */
- public static MemorySize parse(String text) throws IllegalArgumentException {
- return new MemorySize(parseBytes(text));
- }
-
- /**
- * Parses the given string with a default unit.
- *
- * @param text The string to parse.
- * @param defaultUnit specify the default unit.
- * @return The parsed MemorySize.
- *
- * @throws IllegalArgumentException Thrown, if the expression cannot be parsed.
- */
- public static MemorySize parse(String text, MemoryUnit defaultUnit) throws IllegalArgumentException {
- if (!hasUnit(text)) {
- return parse(text + defaultUnit.getUnits()[0]);
- }
-
- return parse(text);
- }
-
- /**
- * Parses the given string as bytes.
- * The supported expressions are listed under {@link MemorySize}.
- *
- * @param text The string to parse
- * @return The parsed size, in bytes.
- *
- * @throws IllegalArgumentException Thrown, if the expression cannot be parsed.
- */
- public static long parseBytes(String text) throws IllegalArgumentException {
- checkNotNull(text, "text");
-
- final String trimmed = text.trim();
- checkArgument(!trimmed.isEmpty(), "argument is an empty- or whitespace-only string");
-
- final int len = trimmed.length();
- int pos = 0;
-
- char current;
- while (pos < len && (current = trimmed.charAt(pos)) >= '0' && current <= '9') {
- pos++;
- }
-
- final String number = trimmed.substring(0, pos);
- final String unit = trimmed.substring(pos).trim().toLowerCase(Locale.US);
-
- if (number.isEmpty()) {
- throw new NumberFormatException("text does not start with a number");
- }
-
- final long value;
- try {
- value = Long.parseLong(number); // this throws a NumberFormatException on overflow
- }
- catch (NumberFormatException e) {
- throw new IllegalArgumentException("The value '" + number +
- "' cannot be re represented as 64bit number (numeric overflow).");
- }
-
- final long multiplier;
- if (unit.isEmpty()) {
- multiplier = 1L;
- }
- else {
- if (matchesAny(unit, BYTES)) {
- multiplier = 1L;
- }
- else if (matchesAny(unit, KILO_BYTES)) {
- multiplier = 1024L;
- }
- else if (matchesAny(unit, MEGA_BYTES)) {
- multiplier = 1024L * 1024L;
- }
- else if (matchesAny(unit, GIGA_BYTES)) {
- multiplier = 1024L * 1024L * 1024L;
- }
- else if (matchesAny(unit, TERA_BYTES)) {
- multiplier = 1024L * 1024L * 1024L * 1024L;
- }
- else {
- throw new IllegalArgumentException("Memory size unit '" + unit +
- "' does not match any of the recognized units: " + MemoryUnit.getAllUnits());
- }
- }
-
- final long result = value * multiplier;
-
- // check for overflow
- if (result / multiplier != value) {
- throw new IllegalArgumentException("The value '" + text +
- "' cannot be re represented as 64bit number of bytes (numeric overflow).");
- }
-
- return result;
- }
-
- private static boolean matchesAny(String str, MemoryUnit unit) {
- for (String s : unit.getUnits()) {
- if (s.equals(str)) {
- return true;
- }
- }
- return false;
- }
-
- //......
- }
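- MemorySize keeps the size in a single bytes field. getBytes, getKibiBytes, getMebiBytes, getGibiBytes and getTebiBytes provide quick conversion, and all but getBytes are right shifts by 10, 20, 30 and 40 bits. The static parse methods create a MemorySize from text. The overload that takes a MemoryUnit applies that unit only when the text carries no unit of its own. Both overloads end up calling parseBytes, which splits the leading digits from the unit suffix, multiplies by the matching power of 1024, and rejects results that overflow a 64-bit long.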
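To make the parseBytes steps concrete, here is a simplified, stdlib-only re-implementation. It is a sketch rather than Flink's code. The class name MemorySizeSketch and the switch-based unit table are assumptions. The digit/suffix split, the lower-casing with Locale.US, the powers of 1024 and the divide-back overflow check follow the source quoted above.

```java
import java.util.Locale;

// Hypothetical, simplified re-implementation of the parseBytes steps (stdlib only).
class MemorySizeSketch {

    static long parseBytes(String text) {
        if (text == null) {
            throw new NullPointerException("text");
        }
        String trimmed = text.trim();
        if (trimmed.isEmpty()) {
            throw new IllegalArgumentException("argument is an empty- or whitespace-only string");
        }
        // Split the leading ASCII digits from the (optional) unit suffix.
        int pos = 0;
        while (pos < trimmed.length() && trimmed.charAt(pos) >= '0' && trimmed.charAt(pos) <= '9') {
            pos++;
        }
        String number = trimmed.substring(0, pos);
        String unit = trimmed.substring(pos).trim().toLowerCase(Locale.US);
        if (number.isEmpty()) {
            throw new NumberFormatException("text does not start with a number");
        }
        long value = Long.parseLong(number); // throws NumberFormatException if the digits overflow a long
        long multiplier = multiplierFor(unit);
        long result = value * multiplier;
        // Same overflow check as the original: dividing back must recover the value.
        if (result / multiplier != value) {
            throw new IllegalArgumentException("The value '" + text + "' overflows a 64-bit number of bytes");
        }
        return result;
    }

    // Binary multipliers for the suffixes listed in MemoryUnit; an empty suffix means bytes.
    private static long multiplierFor(String unit) {
        switch (unit) {
            case "": case "b": case "bytes": return 1L;
            case "k": case "kb": case "kibibytes": return 1L << 10;
            case "m": case "mb": case "mebibytes": return 1L << 20;
            case "g": case "gb": case "gibibytes": return 1L << 30;
            case "t": case "tb": case "tebibytes": return 1L << 40;
            default: throw new IllegalArgumentException("Memory size unit '" + unit + "' is not recognized");
        }
    }

    // Same conversion as MemorySize.getMebiBytes(): shift right by 20 bits and truncate to int.
    static int mebiBytes(long bytes) {
        return (int) (bytes >> 20);
    }

    public static void main(String[] args) {
        System.out.println(parseBytes("1024m"));            // 1073741824
        System.out.println(mebiBytes(parseBytes("1536k"))); // 1 (the half MiB is dropped)
    }
}
```

For example, parseBytes(" 2 GB ") returns 2147483648 because the whitespace around and inside the expression is trimmed and the suffix is lower-cased before matching.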
MemoryUnit
flink-core-1.7.1-sources.jar!/org/apache/flink/configuration/MemorySize.java
- /**
- * Enum which defines memory unit, mostly used to parse value from configuration file.
- *
- * <p>To make larger values more compact, the common size suffixes are supported:
- *
- * <ul>
- * <li>q or 1b or 1bytes (bytes)
- * <li>1k or 1kb or 1kibibytes (interpreted as kibibytes = 1024 bytes)
- * <li>1m or 1mb or 1mebibytes (interpreted as mebibytes = 1024 kibibytes)
- * <li>1g or 1gb or 1gibibytes (interpreted as gibibytes = 1024 mebibytes)
- * <li>1t or 1tb or 1tebibytes (interpreted as tebibytes = 1024 gibibytes)
- * </ul>
- *
- */
- public enum MemoryUnit {
-
- BYTES(new String[] { "b", "bytes" }),
- KILO_BYTES(new String[] { "k", "kb", "kibibytes" }),
- MEGA_BYTES(new String[] { "m", "mb", "mebibytes" }),
- GIGA_BYTES(new String[] { "g", "gb", "gibibytes" }),
- TERA_BYTES(new String[] { "t", "tb", "tebibytes" });
-
- private String[] units;
-
- MemoryUnit(String[] units) {
- this.units = units;
- }
-
- public String[] getUnits() {
- return units;
- }
-
- public static String getAllUnits() {
- return concatenateUnits(BYTES.getUnits(), KILO_BYTES.getUnits(), MEGA_BYTES.getUnits(), GIGA_BYTES.getUnits(), TERA_BYTES.getUnits());
- }
-
- public static boolean hasUnit(String text) {
- checkNotNull(text, "text");
-
- final String trimmed = text.trim();
- checkArgument(!trimmed.isEmpty(), "argument is an empty- or whitespace-only string");
-
- final int len = trimmed.length();
- int pos = 0;
-
- char current;
- while (pos < len && (current = trimmed.charAt(pos)) >= '0' && current <= '9') {
- pos++;
- }
-
- final String unit = trimmed.substring(pos).trim().toLowerCase(Locale.US);
-
- return unit.length() > 0;
- }
-
- private static String concatenateUnits(final String[]... allUnits) {
- final StringBuilder builder = new StringBuilder(128);
-
- for (String[] units : allUnits) {
- builder.append('(');
-
- for (String unit : units) {
- builder.append(unit);
- builder.append(" | ");
- }
-
- builder.setLength(builder.length() - 3);
- builder.append(") / ");
- }
-
- builder.setLength(builder.length() - 3);
- return builder.toString();
- }
-
- }
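- The MemoryUnit enum defines BYTES, KILO_BYTES, MEGA_BYTES, GIGA_BYTES and TERA_BYTES. Each constant has a units field, a String array listing the textual suffixes accepted for that unit. Because the suffix is lower-cased (with Locale.US) before matching, suffixes are case-insensitive.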
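The hypothetical sketch below shows the effect of the default-unit overload of parse together with case-insensitive suffix matching. The enum mirrors the MemoryUnit constants and suffix arrays. The multiplier field, the fromSuffix helper and the MemoryUnitSketch class are invented for illustration. Like the original, it appends the first (short) suffix of the default unit only when the text has no unit.

```java
import java.util.Locale;

// Hypothetical sketch of the MemoryUnit idea, stdlib only.
class MemoryUnitSketch {

    enum Unit {
        BYTES(1L, "b", "bytes"),
        KILO_BYTES(1L << 10, "k", "kb", "kibibytes"),
        MEGA_BYTES(1L << 20, "m", "mb", "mebibytes"),
        GIGA_BYTES(1L << 30, "g", "gb", "gibibytes"),
        TERA_BYTES(1L << 40, "t", "tb", "tebibytes");

        final long multiplier;
        final String[] units; // accepted suffixes; units[0] is the short form

        Unit(long multiplier, String... units) {
            this.multiplier = multiplier;
            this.units = units;
        }

        // Lower-cases with Locale.US before comparing, so "G", "Gb" and "g" all match GIGA_BYTES.
        static Unit fromSuffix(String suffix) {
            String s = suffix.trim().toLowerCase(Locale.US);
            for (Unit u : values()) {
                for (String candidate : u.units) {
                    if (candidate.equals(s)) {
                        return u;
                    }
                }
            }
            throw new IllegalArgumentException("Unrecognized memory unit '" + suffix + "'");
        }
    }

    // Index of the first character after the leading ASCII digits.
    private static int digitsEnd(String s) {
        int pos = 0;
        while (pos < s.length() && s.charAt(pos) >= '0' && s.charAt(pos) <= '9') {
            pos++;
        }
        return pos;
    }

    // True if anything other than whitespace follows the leading digits.
    static boolean hasUnit(String text) {
        String trimmed = text.trim();
        return !trimmed.substring(digitsEnd(trimmed)).trim().isEmpty();
    }

    // Appends the default unit's short suffix only when the text has no unit of its own.
    static long parse(String text, Unit defaultUnit) {
        String expr = hasUnit(text) ? text.trim() : text.trim() + defaultUnit.units[0];
        int end = digitsEnd(expr);
        long value = Long.parseLong(expr.substring(0, end));
        return Math.multiplyExact(value, Unit.fromSuffix(expr.substring(end)).multiplier);
    }
}
```

With this sketch, parse("64", Unit.MEGA_BYTES) yields 64 MiB. In contrast, parse("64k", Unit.MEGA_BYTES) keeps the explicit k and yields 64 KiB.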
FlinkYarnSessionCli
flink-1.7.1/flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java
- public class FlinkYarnSessionCli extends AbstractCustomCommandLine<ApplicationId> {
- //......
-
- private ClusterSpecification createClusterSpecification(Configuration configuration, CommandLine cmd) {
- if (cmd.hasOption(container.getOpt())) { // number of containers is required option!
- LOG.info("The argument {} is deprecated in will be ignored.", container.getOpt());
- }
-
- // TODO: The number of task manager should be deprecated soon
- final int numberTaskManagers;
-
- if (cmd.hasOption(container.getOpt())) {
- numberTaskManagers = Integer.valueOf(cmd.getOptionValue(container.getOpt()));
- } else {
- numberTaskManagers = 1;
- }
-
- // JobManager Memory
- final int jobManagerMemoryMB = ConfigurationUtils.getJobManagerHeapMemory(configuration).getMebiBytes();
-
- // Task Managers memory
- final int taskManagerMemoryMB = ConfigurationUtils.getTaskManagerHeapMemory(configuration).getMebiBytes();
-
- int slotsPerTaskManager = configuration.getInteger(TaskManagerOptions.NUM_TASK_SLOTS);
-
- return new ClusterSpecification.ClusterSpecificationBuilder()
- .setMasterMemoryMB(jobManagerMemoryMB)
- .setTaskManagerMemoryMB(taskManagerMemoryMB)
- .setNumberTaskManagers(numberTaskManagers)
- .setSlotsPerTaskManager(slotsPerTaskManager)
- .createClusterSpecification();
- }
-
- //......
- }
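- FlinkYarnSessionCli.createClusterSpecification calls ConfigurationUtils.getJobManagerHeapMemory(configuration).getMebiBytes() to obtain jobManagerMemoryMB. It then passes that value to the ClusterSpecification builder as the master memory.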
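Because getMebiBytes shifts the byte count right by 20 bits and casts to int, the master memory handed to the cluster specification is always a whole number of MiB, rounded down. The hypothetical ClusterSpecSketch class below is not Flink's ClusterSpecification. It only models the fields filled in above to show that truncation.

```java
// Hypothetical stand-in for ClusterSpecification: only the fields the YARN CLI fills in above.
class ClusterSpecSketch {
    final int masterMemoryMB;
    final int taskManagerMemoryMB;
    final int numberTaskManagers;
    final int slotsPerTaskManager;

    ClusterSpecSketch(long jobManagerHeapBytes, long taskManagerHeapBytes,
                      int numberTaskManagers, int slotsPerTaskManager) {
        this.masterMemoryMB = toMebiBytes(jobManagerHeapBytes);
        this.taskManagerMemoryMB = toMebiBytes(taskManagerHeapBytes);
        this.numberTaskManagers = numberTaskManagers;
        this.slotsPerTaskManager = slotsPerTaskManager;
    }

    // Same arithmetic as MemorySize.getMebiBytes(): shift right by 20, truncate to int.
    static int toMebiBytes(long bytes) {
        return (int) (bytes >> 20);
    }

    public static void main(String[] args) {
        // jobmanager.heap.size = 1g, and a hypothetical TaskManager heap of "1000000k".
        ClusterSpecSketch spec = new ClusterSpecSketch(1L << 30, 1_000_000L * 1024, 1, 1);
        System.out.println(spec.masterMemoryMB);      // 1024
        System.out.println(spec.taskManagerMemoryMB); // 976
    }
}
```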
config.sh
flink-1.7.1/flink-dist/src/main/flink-bin/bin/config.sh
- //......
-
- DEFAULT_ENV_PID_DIR="/tmp" # Directory to store *.pid files to
- DEFAULT_ENV_LOG_MAX=5 # Maximum number of old log files to keep
- DEFAULT_ENV_JAVA_OPTS="" # Optional JVM args
- DEFAULT_ENV_JAVA_OPTS_JM="" # Optional JVM args (JobManager)
- DEFAULT_ENV_JAVA_OPTS_TM="" # Optional JVM args (TaskManager)
- DEFAULT_ENV_JAVA_OPTS_HS="" # Optional JVM args (HistoryServer)
- DEFAULT_ENV_SSH_OPTS="" # Optional SSH parameters running in cluster mode
- DEFAULT_YARN_CONF_DIR="" # YARN Configuration Directory, if necessary
- DEFAULT_HADOOP_CONF_DIR="" # Hadoop Configuration Directory, if necessary
-
- //......
-
- # Define FLINK_JM_HEAP if it is not already set
- if [ -z "${FLINK_JM_HEAP}" ]; then
- FLINK_JM_HEAP=$(readFromConfig ${KEY_JOBM_MEM_SIZE} 0 "${YAML_CONF}")
- fi
-
- # Try read old config key, if new key not exists
- if [ "${FLINK_JM_HEAP}" == 0 ]; then
- FLINK_JM_HEAP_MB=$(readFromConfig ${KEY_JOBM_MEM_MB} 0 "${YAML_CONF}")
- fi
-
- //......
-
- if [ -z "${FLINK_ENV_JAVA_OPTS}" ]; then
- FLINK_ENV_JAVA_OPTS=$(readFromConfig ${KEY_ENV_JAVA_OPTS} "${DEFAULT_ENV_JAVA_OPTS}" "${YAML_CONF}")
-
- # Remove leading and ending double quotes (if present) of value
- FLINK_ENV_JAVA_OPTS="$( echo "${FLINK_ENV_JAVA_OPTS}" | sed -e 's/^"//' -e 's/"$//' )"
- fi
-
- if [ -z "${FLINK_ENV_JAVA_OPTS_JM}" ]; then
- FLINK_ENV_JAVA_OPTS_JM=$(readFromConfig ${KEY_ENV_JAVA_OPTS_JM} "${DEFAULT_ENV_JAVA_OPTS_JM}" "${YAML_CONF}")
- # Remove leading and ending double quotes (if present) of value
- FLINK_ENV_JAVA_OPTS_JM="$( echo "${FLINK_ENV_JAVA_OPTS_JM}" | sed -e 's/^"//' -e 's/"$//' )"
- fi
-
- //......
-
- # Arguments for the JVM. Used for job and task manager JVMs.
- # DO NOT USE FOR MEMORY SETTINGS! Use conf/flink-conf.yaml with keys
- # KEY_JOBM_MEM_SIZE and KEY_TASKM_MEM_SIZE for that!
- if [ -z "${JVM_ARGS}" ]; then
- JVM_ARGS=""
- fi
-
- //......
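- config.sh first checks whether the environment variable FLINK_JM_HEAP is set. If it is not, config.sh reads jobmanager.heap.size from flink-conf.yaml into FLINK_JM_HEAP, defaulting to 0. If FLINK_JM_HEAP is then 0, config.sh reads the deprecated jobmanager.heap.mb into FLINK_JM_HEAP_MB.
- If FLINK_ENV_JAVA_OPTS is not set, it is read from env.java.opts in flink-conf.yaml, falling back to DEFAULT_ENV_JAVA_OPTS (empty by default). Likewise, if FLINK_ENV_JAVA_OPTS_JM is not set, it is read from env.java.opts.jobmanager, falling back to DEFAULT_ENV_JAVA_OPTS_JM (also empty). In both cases, surrounding double quotes are stripped from the configured value.
- JVM_ARGS is used by both the JobManager and TaskManager JVMs and is initialized to empty if unset. Do not put memory settings in JVM_ARGS. Configure them with jobmanager.heap.size and taskmanager.heap.size in flink-conf.yaml instead.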
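The config.sh lookups can be modeled in Java to make the precedence explicit. This is a hypothetical sketch with several simplifications. Maps stand in for the process environment and the parsed flink-conf.yaml. The YAML parsing done by readFromConfig is not modeled. A FLINK_JM_HEAP_MB value inherited from the environment is ignored.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical Java rendering of the config.sh lookups; maps stand in for the environment and flink-conf.yaml.
class ConfigShSketch {

    // Returns {FLINK_JM_HEAP, FLINK_JM_HEAP_MB}; the second entry is null when config.sh does not set it.
    static String[] resolveJobManagerHeap(Map<String, String> env, Map<String, String> yaml) {
        String heap = env.get("FLINK_JM_HEAP");
        if (heap == null || heap.isEmpty()) { // [ -z "${FLINK_JM_HEAP}" ]
            heap = yaml.getOrDefault("jobmanager.heap.size", "0");
        }
        String heapMb = null;
        if ("0".equals(heap)) { // [ "${FLINK_JM_HEAP}" == 0 ] is a string comparison
            heapMb = yaml.getOrDefault("jobmanager.heap.mb", "0");
        }
        return new String[] {heap, heapMb};
    }

    // Env value wins if non-empty; otherwise the YAML value (default ""), with one leading
    // and one trailing double quote removed, like the two sed expressions in config.sh.
    static String resolveJavaOpts(String envValue, Map<String, String> yaml, String key) {
        if (envValue != null && !envValue.isEmpty()) {
            return envValue;
        }
        String value = yaml.getOrDefault(key, "");
        return value.replaceFirst("^\"", "").replaceFirst("\"$", "");
    }

    public static void main(String[] args) {
        Map<String, String> env = new HashMap<>();
        Map<String, String> yaml = new HashMap<>();
        yaml.put("jobmanager.heap.size", "1024m");
        yaml.put("env.java.opts.jobmanager", "\"-XX:+UseG1GC\"");
        System.out.println(resolveJobManagerHeap(env, yaml)[0]);                     // 1024m
        System.out.println(resolveJavaOpts(null, yaml, "env.java.opts.jobmanager")); // -XX:+UseG1GC
        env.put("FLINK_JM_HEAP", "512m");
        System.out.println(resolveJobManagerHeap(env, yaml)[0]);                     // 512m
    }
}
```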
jobmanager.sh
flink-1.7.1/flink-dist/src/main/flink-bin/bin/jobmanager.sh
- #!/usr/bin/env bash
- ################################################################################
- # Licensed to the Apache Software Foundation (ASF) under one
- # or more contributor license agreements. See the NOTICE file
- # distributed with this work for additional information
- # regarding copyright ownership. The ASF licenses this file
- # to you under the Apache License, Version 2.0 (the
- # "License"); you may not use this file except in compliance
- # with the License. You may obtain a copy of the License at
- #
- # http://www.apache.org/licenses/LICENSE-2.0
- #
- # Unless required by applicable law or agreed to in writing, software
- # distributed under the License is distributed on an "AS IS" BASIS,
- # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- # See the License for the specific language governing permissions and
- # limitations under the License.
- ################################################################################
-
- # Start/stop a Flink JobManager.
- USAGE="Usage: jobmanager.sh ((start|start-foreground) [host] [webui-port])|stop|stop-all"
-
- STARTSTOP=$1
- HOST=$2 # optional when starting multiple instances
- WEBUIPORT=$3 # optional when starting multiple instances
-
- if [[ $STARTSTOP != "start" ]] && [[ $STARTSTOP != "start-foreground" ]] && [[ $STARTSTOP != "stop" ]] && [[ $STARTSTOP != "stop-all" ]]; then
- echo $USAGE
- exit 1
- fi
-
- bin=`dirname "$0"`
- bin=`cd "$bin"; pwd`
-
- . "$bin"/config.sh
-
- ENTRYPOINT=standalonesession
-
- if [[ $STARTSTOP == "start" ]] || [[ $STARTSTOP == "start-foreground" ]]; then
- if [ ! -z "${FLINK_JM_HEAP_MB}" ] && [ "${FLINK_JM_HEAP}" == 0 ]; then
- echo "used deprecated key \`${KEY_JOBM_MEM_MB}\`, please replace with key \`${KEY_JOBM_MEM_SIZE}\`"
- else
- flink_jm_heap_bytes=$(parseBytes ${FLINK_JM_HEAP})
- FLINK_JM_HEAP_MB=$(getMebiBytes ${flink_jm_heap_bytes})
- fi
-
- if [[ ! ${FLINK_JM_HEAP_MB} =~ $IS_NUMBER ]] || [[ "${FLINK_JM_HEAP_MB}" -lt "0" ]]; then
- echo "[ERROR] Configured JobManager memory size is not a valid value. Please set '${KEY_JOBM_MEM_SIZE}' in ${FLINK_CONF_FILE}."
- exit 1
- fi
-
- if [ "${FLINK_JM_HEAP_MB}" -gt "0" ]; then
- export JVM_ARGS="$JVM_ARGS -Xms"$FLINK_JM_HEAP_MB"m -Xmx"$FLINK_JM_HEAP_MB"m"
- fi
-
- # Add JobManager-specific JVM options
- export FLINK_ENV_JAVA_OPTS="${FLINK_ENV_JAVA_OPTS} ${FLINK_ENV_JAVA_OPTS_JM}"
-
- # Startup parameters
- args=("--configDir" "${FLINK_CONF_DIR}" "--executionMode" "cluster")
- if [ ! -z $HOST ]; then
- args+=("--host")
- args+=("${HOST}")
- fi
-
- if [ ! -z $WEBUIPORT ]; then
- args+=("--webui-port")
- args+=("${WEBUIPORT}")
- fi
- fi
-
- if [[ $STARTSTOP == "start-foreground" ]]; then
- exec "${FLINK_BIN_DIR}"/flink-console.sh $ENTRYPOINT "${args[@]}"
- else
- "${FLINK_BIN_DIR}"/flink-daemon.sh $STARTSTOP $ENTRYPOINT "${args[@]}"
- fi
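- jobmanager.sh first sources config.sh to initialize FLINK_JM_HEAP, FLINK_JM_HEAP_MB, FLINK_ENV_JAVA_OPTS, FLINK_ENV_JAVA_OPTS_JM and JVM_ARGS.
- If FLINK_JM_HEAP is 0 and FLINK_JM_HEAP_MB is set, the script uses the deprecated jobmanager.heap.mb value and prints a warning. Otherwise, it parses FLINK_JM_HEAP into bytes and converts the result to FLINK_JM_HEAP_MB. After checking that FLINK_JM_HEAP_MB is a valid non-negative number, it appends -Xms and -Xmx with that value to JVM_ARGS if the value is greater than 0. It then appends FLINK_ENV_JAVA_OPTS_JM (from env.java.opts.jobmanager) to FLINK_ENV_JAVA_OPTS (from env.java.opts).
- Finally, jobmanager.sh starts the StandaloneSessionClusterEntrypoint. It uses flink-console.sh for start-foreground and flink-daemon.sh for start.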
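The heap branch of jobmanager.sh can be rendered as a hypothetical Java sketch. It takes the FLINK_JM_HEAP and FLINK_JM_HEAP_MB values left by config.sh and returns the resulting JVM_ARGS. The compact parseBytes helper stands in for the shell parseBytes/getMebiBytes functions, whose exact behavior is not shown in the excerpt. The numeric check assumes IS_NUMBER is an unsigned-integer pattern.

```java
import java.util.Locale;

// Hypothetical Java rendering of the heap handling in jobmanager.sh.
class JobManagerShSketch {

    // Returns JVM_ARGS after the -Xms/-Xmx step, given the values config.sh left behind.
    static String appendHeapArgs(String jvmArgs, String flinkJmHeap, String flinkJmHeapMb) {
        String heapMb;
        if (flinkJmHeapMb != null && !flinkJmHeapMb.isEmpty() && "0".equals(flinkJmHeap)) {
            // Deprecated jobmanager.heap.mb path: warn and use the value as-is.
            System.out.println("used deprecated key `jobmanager.heap.mb`, please replace with key `jobmanager.heap.size`");
            heapMb = flinkJmHeapMb;
        } else {
            // Normal path: size expression -> bytes -> MiB.
            heapMb = String.valueOf(parseBytes(flinkJmHeap) >> 20);
        }
        // Assumes IS_NUMBER in config.sh is an unsigned-integer pattern such as ^[0-9]+$.
        if (!heapMb.matches("[0-9]+")) {
            throw new IllegalArgumentException("[ERROR] Configured JobManager memory size is not a valid value.");
        }
        if (Long.parseLong(heapMb) > 0) {
            jvmArgs = jvmArgs + " -Xms" + heapMb + "m -Xmx" + heapMb + "m";
        }
        return jvmArgs;
    }

    // Compact parser for this sketch only: digits plus an optional b/k/m/g/t suffix (case-insensitive).
    private static long parseBytes(String text) {
        String t = text.trim().toLowerCase(Locale.US);
        int pos = 0;
        while (pos < t.length() && t.charAt(pos) >= '0' && t.charAt(pos) <= '9') {
            pos++;
        }
        long value = Long.parseLong(t.substring(0, pos));
        int shift;
        switch (t.substring(pos).trim()) {
            case "": case "b": case "bytes": shift = 0; break;
            case "k": case "kb": shift = 10; break;
            case "m": case "mb": shift = 20; break;
            case "g": case "gb": shift = 30; break;
            case "t": case "tb": shift = 40; break;
            default: throw new IllegalArgumentException("unknown unit in '" + text + "'");
        }
        return Math.multiplyExact(value, 1L << shift);
    }
}
```

Consider the case where neither jobmanager.heap.size nor jobmanager.heap.mb is configured and FLINK_JM_HEAP is not exported. config.sh then leaves FLINK_JM_HEAP at 0 and sets FLINK_JM_HEAP_MB to 0. The sketch, like the script, prints the deprecation warning and adds no -Xms/-Xmx flags.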
flink-console.sh
flink-1.7.1/flink-dist/src/main/flink-bin/bin/flink-console.sh
- #!/usr/bin/env bash
- ################################################################################
- # Licensed to the Apache Software Foundation (ASF) under one
- # or more contributor license agreements. See the NOTICE file
- # distributed with this work for additional information
- # regarding copyright ownership. The ASF licenses this file
- # to you under the Apache License, Version 2.0 (the
- # "License"); you may not use this file except in compliance
- # with the License. You may obtain a copy of the License at
- #
- # http://www.apache.org/licenses/LICENSE-2.0
- #
- # Unless required by applicable law or agreed to in writing, software
- # distributed under the License is distributed on an "AS IS" BASIS,
- # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- # See the License for the specific language governing permissions and
- # limitations under the License.
- ################################################################################
-
- # Start a Flink service as a console application. Must be stopped with Ctrl-C
- # or with SIGTERM by kill or the controlling process.
- USAGE="Usage: flink-console.sh (taskexecutor|zookeeper|historyserver|standalonesession|standalonejob) [args]"
-
- SERVICE=$1
- ARGS=("${@:2}") # get remaining arguments as array
-
- bin=`dirname "$0"`
- bin=`cd "$bin"; pwd`
-
- . "$bin"/config.sh
-
- case $SERVICE in
- (taskexecutor)
- CLASS_TO_RUN=org.apache.flink.runtime.taskexecutor.TaskManagerRunner
- ;;
-
- (historyserver)
- CLASS_TO_RUN=org.apache.flink.runtime.webmonitor.history.HistoryServer
- ;;
-
- (zookeeper)
- CLASS_TO_RUN=org.apache.flink.runtime.zookeeper.FlinkZooKeeperQuorumPeer
- ;;
-
- (standalonesession)
- CLASS_TO_RUN=org.apache.flink.runtime.entrypoint.StandaloneSessionClusterEntrypoint
- ;;
-
- (standalonejob)
- CLASS_TO_RUN=org.apache.flink.container.entrypoint.StandaloneJobClusterEntryPoint
- ;;
-
- (*)
- echo "Unknown service '${SERVICE}'. $USAGE."
- exit 1
- ;;
- esac
-
- FLINK_TM_CLASSPATH=`constructFlinkClassPath`
-
- log_setting=("-Dlog4j.configuration=file:${FLINK_CONF_DIR}/log4j-console.properties" "-Dlogback.configurationFile=file:${FLINK_CONF_DIR}/logback-console.xml")
-
- JAVA_VERSION=$(${JAVA_RUN} -version 2>&1 | sed 's/.*version "\(.*\)\.\(.*\)\..*"/\1\2/; 1q')
-
- # Only set JVM 8 arguments if we have correctly extracted the version
- if [[ ${JAVA_VERSION} =~ ${IS_NUMBER} ]]; then
- if [ "$JAVA_VERSION" -lt 18 ]; then
- JVM_ARGS="$JVM_ARGS -XX:MaxPermSize=256m"
- fi
- fi
-
- echo "Starting $SERVICE as a console application on host $HOSTNAME."
- exec $JAVA_RUN $JVM_ARGS ${FLINK_ENV_JAVA_OPTS} "${log_setting[@]}" -classpath "`manglePathList "$FLINK_TM_CLASSPATH:$INTERNAL_HADOOP_CLASSPATHS"`" ${CLASS_TO_RUN} "${ARGS[@]}"
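- flink-console.sh turns the first line of the java -version output into a two-digit number, such as 17 or 18. For Java versions below 8 (a value below 18), it appends -XX:MaxPermSize=256m to JVM_ARGS. It then launches CLASS_TO_RUN with JVM_ARGS followed by FLINK_ENV_JAVA_OPTS as JVM options.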
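The two flink-console.sh steps that matter for memory settings are deriving the two-digit version and assembling the java command. Both can be sketched in Java. This is hypothetical: the class and method names are invented, and the log settings and classpath are omitted. Java's regex engine stands in for sed, and both reduce 1.8.0_191 to 18 with this pattern.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical Java rendering of two steps in flink-console.sh (log settings and classpath omitted).
class FlinkConsoleSketch {

    // Mirrors: sed 's/.*version "\(.*\)\.\(.*\)\..*"/\1\2/' on the first line of `java -version`,
    // e.g. java version "1.8.0_191" -> "18".
    static String extractVersion(String firstLine) {
        return firstLine.replaceFirst(".*version \"(.*)\\.(.*)\\..*\"", "$1$2");
    }

    static List<String> buildCommand(String javaRun, String jvmArgs, String envJavaOpts,
                                     String versionLine, String classToRun) {
        String version = extractVersion(versionLine);
        // Only touch JVM_ARGS when a numeric version was extracted and it is below 18 (Java 7 or older).
        if (version.matches("[0-9]+") && Integer.parseInt(version) < 18) {
            jvmArgs = jvmArgs + " -XX:MaxPermSize=256m";
        }
        List<String> cmd = new ArrayList<>();
        cmd.add(javaRun);
        // $JVM_ARGS and ${FLINK_ENV_JAVA_OPTS} are expanded unquoted, so the shell splits them on whitespace.
        cmd.addAll(splitWords(jvmArgs));
        cmd.addAll(splitWords(envJavaOpts));
        cmd.add(classToRun);
        return cmd;
    }

    private static List<String> splitWords(String s) {
        List<String> words = new ArrayList<>();
        String trimmed = s.trim();
        if (!trimmed.isEmpty()) {
            words.addAll(Arrays.asList(trimmed.split("\\s+")));
        }
        return words;
    }
}
```

JVM_ARGS is expanded before FLINK_ENV_JAVA_OPTS. As a result, options from env.java.opts and env.java.opts.jobmanager appear after the -Xms/-Xmx flags added by jobmanager.sh.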
Summary
- jobmanager.heap.size sets the JVM heap size of the JobManager (default 1024m); jobmanager.heap.mb is deprecated. ConfigurationUtils.getJobManagerHeapMemory reads the setting from the Configuration and parses it into a MemorySize. MemorySize stores the size in bytes and provides getBytes, getKibiBytes, getMebiBytes, getGibiBytes and getTebiBytes for conversion. Its static parse methods create a MemorySize from text. One overload takes a default MemoryUnit, which applies only when the text has no unit. Both overloads delegate to parseBytes.
- FlinkYarnSessionCli.createClusterSpecification uses ConfigurationUtils.getJobManagerHeapMemory(configuration) to obtain jobManagerMemoryMB.
- config.sh checks the FLINK_JM_HEAP environment variable first and otherwise reads jobmanager.heap.size from flink-conf.yaml. If FLINK_JM_HEAP is 0, it reads jobmanager.heap.mb into FLINK_JM_HEAP_MB. When FLINK_ENV_JAVA_OPTS and FLINK_ENV_JAVA_OPTS_JM are unset, config.sh fills them from env.java.opts and env.java.opts.jobmanager, both empty by default. JVM_ARGS is shared by the JobManager and TaskManager and starts out empty. Memory settings do not belong in JVM_ARGS; set them with jobmanager.heap.size and taskmanager.heap.size in flink-conf.yaml instead.
- jobmanager.sh sources config.sh and derives FLINK_JM_HEAP_MB from FLINK_JM_HEAP (or from the deprecated key). When that value is greater than 0, it appends -Xms/-Xmx with the value to JVM_ARGS. It also appends FLINK_ENV_JAVA_OPTS_JM to FLINK_ENV_JAVA_OPTS. It then starts the entrypoint through flink-console.sh (start-foreground) or flink-daemon.sh (start).
- flink-console.sh appends -XX:MaxPermSize=256m to JVM_ARGS on Java versions below 8 and launches CLASS_TO_RUN with JVM_ARGS and FLINK_ENV_JAVA_OPTS as JVM options.
The final JVM options therefore come from JVM_ARGS and FLINK_ENV_JAVA_OPTS. Keep memory settings out of JVM_ARGS, because jobmanager.sh itself appends -Xms and -Xmx to JVM_ARGS when FLINK_JM_HEAP_MB is greater than 0. FLINK_JM_HEAP_MB is derived from the FLINK_JM_HEAP environment variable or the jobmanager.heap.size option, and FLINK_ENV_JAVA_OPTS is built from env.java.opts and env.java.opts.jobmanager. To set the JobManager heap size, either set the FLINK_JM_HEAP environment variable (for example FLINK_JM_HEAP=512m) or set jobmanager.heap.size in flink-conf.yaml.
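As a how-to for the environment-variable option, the hypothetical launcher below uses java.lang.ProcessBuilder to start bin/jobmanager.sh with FLINK_JM_HEAP=512m in the child environment. The install directory /opt/flink-1.7.1 is an assumption; pass your own Flink directory as the first argument.

```java
import java.io.File;

// Hypothetical launcher; the Flink install directory is an assumption, pass your own as args[0].
class StartJobManagerWithHeap {

    // Builds (but does not start) a process that runs `bin/jobmanager.sh start` with FLINK_JM_HEAP set.
    static ProcessBuilder jobManagerStart(File flinkHome, String heap) {
        ProcessBuilder pb = new ProcessBuilder(new File(flinkHome, "bin/jobmanager.sh").getPath(), "start");
        // config.sh only reads jobmanager.heap.size when FLINK_JM_HEAP is unset or empty.
        pb.environment().put("FLINK_JM_HEAP", heap);
        pb.inheritIO();
        return pb;
    }

    public static void main(String[] args) throws Exception {
        File flinkHome = new File(args.length > 0 ? args[0] : "/opt/flink-1.7.1"); // hypothetical default path
        Process process = jobManagerStart(flinkHome, "512m").start();
        System.exit(process.waitFor());
    }
}
```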