A Look at Flink JobManager Heap Size Configuration

This article looks at how the JVM heap size of the Flink JobManager is configured, based on the Flink 1.7.1 sources.

JobManagerOptions

flink-core-1.7.1-sources.jar!/org/apache/flink/configuration/JobManagerOptions.java

    @PublicEvolving
    public class JobManagerOptions {
        //......
        /**
         * JVM heap size for the JobManager with memory size.
         */
        @Documentation.CommonOption(position = Documentation.CommonOption.POSITION_MEMORY)
        public static final ConfigOption<String> JOB_MANAGER_HEAP_MEMORY =
            key("jobmanager.heap.size")
            .defaultValue("1024m")
            .withDescription("JVM heap size for the JobManager.");
        /**
         * JVM heap size (in megabytes) for the JobManager.
         * @deprecated use {@link #JOB_MANAGER_HEAP_MEMORY}
         */
        @Deprecated
        public static final ConfigOption<Integer> JOB_MANAGER_HEAP_MEMORY_MB =
            key("jobmanager.heap.mb")
            .defaultValue(1024)
            .withDescription("JVM heap size (in megabytes) for the JobManager.");
        //......
    }
  • The jobmanager.heap.size option specifies the JVM heap size of the JobManager and defaults to 1024m; the jobmanager.heap.mb option is deprecated.

ConfigurationUtils

flink-core-1.7.1-sources.jar!/org/apache/flink/configuration/ConfigurationUtils.java

    public class ConfigurationUtils {
        private static final String[] EMPTY = new String[0];
        /**
         * Get job manager's heap memory. This method will check the new key
         * {@link JobManagerOptions#JOB_MANAGER_HEAP_MEMORY} and
         * the old key {@link JobManagerOptions#JOB_MANAGER_HEAP_MEMORY_MB} for backwards compatibility.
         *
         * @param configuration the configuration object
         * @return the memory size of job manager's heap memory.
         */
        public static MemorySize getJobManagerHeapMemory(Configuration configuration) {
            if (configuration.containsKey(JobManagerOptions.JOB_MANAGER_HEAP_MEMORY.key())) {
                return MemorySize.parse(configuration.getString(JobManagerOptions.JOB_MANAGER_HEAP_MEMORY));
            } else if (configuration.containsKey(JobManagerOptions.JOB_MANAGER_HEAP_MEMORY_MB.key())) {
                return MemorySize.parse(configuration.getInteger(JobManagerOptions.JOB_MANAGER_HEAP_MEMORY_MB) + "m");
            } else {
                //use default value
                return MemorySize.parse(configuration.getString(JobManagerOptions.JOB_MANAGER_HEAP_MEMORY));
            }
        }
        //......
    }
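  • ConfigurationUtils.getJobManagerHeapMemory reads the setting from the Configuration and parses it into a MemorySize. It checks jobmanager.heap.size first, falls back to the deprecated jobmanager.heap.mb (appending "m" to the integer value), and otherwise uses the default value of jobmanager.heap.size.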
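
The lookup order can be tried out without Flink on the classpath. The following is a minimal sketch, assuming a plain Map<String, String> stands in for Flink's Configuration; the class HeapKeyFallbackSketch and the method resolveHeapSetting are hypothetical names used only for illustration.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch: a plain Map stands in for Flink's Configuration.
final class HeapKeyFallbackSketch {
    static final String NEW_KEY = "jobmanager.heap.size";
    static final String OLD_KEY = "jobmanager.heap.mb";
    static final String DEFAULT_SIZE = "1024m";

    // Returns the size expression that would be handed to the memory-size parser.
    static String resolveHeapSetting(Map<String, String> conf) {
        if (conf.containsKey(NEW_KEY)) {
            return conf.get(NEW_KEY);
        } else if (conf.containsKey(OLD_KEY)) {
            // the deprecated key holds a bare number of megabytes
            return conf.get(OLD_KEY) + "m";
        }
        return DEFAULT_SIZE;
    }

    public static void main(String[] args) {
        Map<String, String> conf = new HashMap<>();
        System.out.println(resolveHeapSetting(conf)); // neither key set: default
        conf.put(OLD_KEY, "2048");
        System.out.println(resolveHeapSetting(conf)); // only the deprecated key set
        conf.put(NEW_KEY, "4g");
        System.out.println(resolveHeapSetting(conf)); // new key wins over the old one
    }
}
```

When both keys are present, the new key takes precedence and the deprecated one is ignored.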

MemorySize

flink-core-1.7.1-sources.jar!/org/apache/flink/configuration/MemorySize.java

    @PublicEvolving
    public class MemorySize implements java.io.Serializable {
        private static final long serialVersionUID = 1L;
        // ------------------------------------------------------------------------
        /** The memory size, in bytes. */
        private final long bytes;
        /**
         * Constructs a new MemorySize.
         *
         * @param bytes The size, in bytes. Must be zero or larger.
         */
        public MemorySize(long bytes) {
            checkArgument(bytes >= 0, "bytes must be >= 0");
            this.bytes = bytes;
        }
        // ------------------------------------------------------------------------
        /**
         * Gets the memory size in bytes.
         */
        public long getBytes() {
            return bytes;
        }
        /**
         * Gets the memory size in Kibibytes (= 1024 bytes).
         */
        public long getKibiBytes() {
            return bytes >> 10;
        }
        /**
         * Gets the memory size in Mebibytes (= 1024 Kibibytes).
         */
        public int getMebiBytes() {
            return (int) (bytes >> 20);
        }
        /**
         * Gets the memory size in Gibibytes (= 1024 Mebibytes).
         */
        public long getGibiBytes() {
            return bytes >> 30;
        }
        /**
         * Gets the memory size in Tebibytes (= 1024 Gibibytes).
         */
        public long getTebiBytes() {
            return bytes >> 40;
        }
        // ------------------------------------------------------------------------
        @Override
        public int hashCode() {
            return (int) (bytes ^ (bytes >>> 32));
        }
        @Override
        public boolean equals(Object obj) {
            return obj == this ||
                (obj != null && obj.getClass() == this.getClass() && ((MemorySize) obj).bytes == this.bytes);
        }
        @Override
        public String toString() {
            return bytes + " bytes";
        }
        // ------------------------------------------------------------------------
        // Parsing
        // ------------------------------------------------------------------------
        /**
         * Parses the given string as a MemorySize.
         *
         * @param text The string to parse
         * @return The parsed MemorySize
         *
         * @throws IllegalArgumentException Thrown, if the expression cannot be parsed.
         */
        public static MemorySize parse(String text) throws IllegalArgumentException {
            return new MemorySize(parseBytes(text));
        }
        /**
         * Parses the given string with a default unit.
         *
         * @param text The string to parse.
         * @param defaultUnit specify the default unit.
         * @return The parsed MemorySize.
         *
         * @throws IllegalArgumentException Thrown, if the expression cannot be parsed.
         */
        public static MemorySize parse(String text, MemoryUnit defaultUnit) throws IllegalArgumentException {
            if (!hasUnit(text)) {
                return parse(text + defaultUnit.getUnits()[0]);
            }
            return parse(text);
        }
        /**
         * Parses the given string as bytes.
         * The supported expressions are listed under {@link MemorySize}.
         *
         * @param text The string to parse
         * @return The parsed size, in bytes.
         *
         * @throws IllegalArgumentException Thrown, if the expression cannot be parsed.
         */
        public static long parseBytes(String text) throws IllegalArgumentException {
            checkNotNull(text, "text");
            final String trimmed = text.trim();
            checkArgument(!trimmed.isEmpty(), "argument is an empty- or whitespace-only string");
            final int len = trimmed.length();
            int pos = 0;
            char current;
            while (pos < len && (current = trimmed.charAt(pos)) >= '0' && current <= '9') {
                pos++;
            }
            final String number = trimmed.substring(0, pos);
            final String unit = trimmed.substring(pos).trim().toLowerCase(Locale.US);
            if (number.isEmpty()) {
                throw new NumberFormatException("text does not start with a number");
            }
            final long value;
            try {
                value = Long.parseLong(number); // this throws a NumberFormatException on overflow
            }
            catch (NumberFormatException e) {
                throw new IllegalArgumentException("The value '" + number +
                    "' cannot be re represented as 64bit number (numeric overflow).");
            }
            final long multiplier;
            if (unit.isEmpty()) {
                multiplier = 1L;
            }
            else {
                if (matchesAny(unit, BYTES)) {
                    multiplier = 1L;
                }
                else if (matchesAny(unit, KILO_BYTES)) {
                    multiplier = 1024L;
                }
                else if (matchesAny(unit, MEGA_BYTES)) {
                    multiplier = 1024L * 1024L;
                }
                else if (matchesAny(unit, GIGA_BYTES)) {
                    multiplier = 1024L * 1024L * 1024L;
                }
                else if (matchesAny(unit, TERA_BYTES)) {
                    multiplier = 1024L * 1024L * 1024L * 1024L;
                }
                else {
                    throw new IllegalArgumentException("Memory size unit '" + unit +
                        "' does not match any of the recognized units: " + MemoryUnit.getAllUnits());
                }
            }
            final long result = value * multiplier;
            // check for overflow
            if (result / multiplier != value) {
                throw new IllegalArgumentException("The value '" + text +
                    "' cannot be re represented as 64bit number of bytes (numeric overflow).");
            }
            return result;
        }
        private static boolean matchesAny(String str, MemoryUnit unit) {
            for (String s : unit.getUnits()) {
                if (s.equals(str)) {
                    return true;
                }
            }
            return false;
        }
        //......
    }
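  • MemorySize stores the size in a single bytes field and offers getBytes, getKibiBytes, getMebiBytes, getGibiBytes and getTebiBytes for quick conversion. The static parse method creates a MemorySize from text; the overload that takes a MemoryUnit applies that unit only when the text carries no unit of its own. Both variants end up calling parseBytes.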
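
To see the parsing steps in isolation, here is a simplified, self-contained sketch of the same idea: split off the leading digits, lower-case the remaining suffix, pick a multiplier, and guard against overflow. It is not Flink's class; MemoryTextSketch is a hypothetical name, and the unit aliases are copied from the MemoryUnit listing in this article.

```java
import java.util.Locale;

// Hypothetical, simplified re-implementation of the parsing steps; not Flink's MemorySize.
final class MemoryTextSketch {

    static long parseBytes(String text) {
        final String trimmed = text.trim();
        int pos = 0;
        // consume the leading ASCII digits
        while (pos < trimmed.length() && trimmed.charAt(pos) >= '0' && trimmed.charAt(pos) <= '9') {
            pos++;
        }
        if (pos == 0) {
            throw new NumberFormatException("text does not start with a number");
        }
        final long value = Long.parseLong(trimmed.substring(0, pos));
        final String unit = trimmed.substring(pos).trim().toLowerCase(Locale.US);

        final long multiplier;
        switch (unit) {
            case "": case "b": case "bytes":     multiplier = 1L;       break;
            case "k": case "kb": case "kibibytes": multiplier = 1L << 10; break;
            case "m": case "mb": case "mebibytes": multiplier = 1L << 20; break;
            case "g": case "gb": case "gibibytes": multiplier = 1L << 30; break;
            case "t": case "tb": case "tebibytes": multiplier = 1L << 40; break;
            default:
                throw new IllegalArgumentException("unrecognized unit '" + unit + "'");
        }

        final long result = value * multiplier;
        // if the multiplication wrapped around, dividing back no longer yields the input
        if (result / multiplier != value) {
            throw new IllegalArgumentException("numeric overflow for '" + text + "'");
        }
        return result;
    }

    public static void main(String[] args) {
        System.out.println(parseBytes("1024m"));  // 1073741824
        System.out.println(parseBytes(" 2 GB ")); // whitespace and upper case are tolerated
        System.out.println(parseBytes("512"));    // no unit means plain bytes
    }
}
```

Note that all units are binary: "1m" is 1024 * 1024 bytes, not 1,000,000.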

MemoryUnit

flink-core-1.7.1-sources.jar!/org/apache/flink/configuration/MemorySize.java

    /**
     * Enum which defines memory unit, mostly used to parse value from configuration file.
     *
     * <p>To make larger values more compact, the common size suffixes are supported:
     *
     * <ul>
     * <li>1 or 1b or 1bytes (bytes)
     * <li>1k or 1kb or 1kibibytes (interpreted as kibibytes = 1024 bytes)
     * <li>1m or 1mb or 1mebibytes (interpreted as mebibytes = 1024 kibibytes)
     * <li>1g or 1gb or 1gibibytes (interpreted as gibibytes = 1024 mebibytes)
     * <li>1t or 1tb or 1tebibytes (interpreted as tebibytes = 1024 gibibytes)
     * </ul>
     *
     */
    public enum MemoryUnit {
        BYTES(new String[] { "b", "bytes" }),
        KILO_BYTES(new String[] { "k", "kb", "kibibytes" }),
        MEGA_BYTES(new String[] { "m", "mb", "mebibytes" }),
        GIGA_BYTES(new String[] { "g", "gb", "gibibytes" }),
        TERA_BYTES(new String[] { "t", "tb", "tebibytes" });
        private String[] units;
        MemoryUnit(String[] units) {
            this.units = units;
        }
        public String[] getUnits() {
            return units;
        }
        public static String getAllUnits() {
            return concatenateUnits(BYTES.getUnits(), KILO_BYTES.getUnits(), MEGA_BYTES.getUnits(), GIGA_BYTES.getUnits(), TERA_BYTES.getUnits());
        }
        public static boolean hasUnit(String text) {
            checkNotNull(text, "text");
            final String trimmed = text.trim();
            checkArgument(!trimmed.isEmpty(), "argument is an empty- or whitespace-only string");
            final int len = trimmed.length();
            int pos = 0;
            char current;
            while (pos < len && (current = trimmed.charAt(pos)) >= '0' && current <= '9') {
                pos++;
            }
            final String unit = trimmed.substring(pos).trim().toLowerCase(Locale.US);
            return unit.length() > 0;
        }
        private static String concatenateUnits(final String[]... allUnits) {
            final StringBuilder builder = new StringBuilder(128);
            for (String[] units : allUnits) {
                builder.append('(');
                for (String unit : units) {
                    builder.append(unit);
                    builder.append(" | ");
                }
                builder.setLength(builder.length() - 3);
                builder.append(") / ");
            }
            builder.setLength(builder.length() - 3);
            return builder.toString();
        }
    }
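  • The MemoryUnit enum, nested in MemorySize, defines BYTES, KILO_BYTES, MEGA_BYTES, GIGA_BYTES and TERA_BYTES. Each constant has a units property, a String array listing the text suffixes accepted for that unit; input is converted to lower case before matching.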
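
The "default unit" behaviour is easy to get wrong, so here is a small sketch of it under simplifying assumptions: the default is passed as a plain suffix string rather than a MemoryUnit constant, and DefaultUnitSketch, unitOf and withDefaultUnit are hypothetical names that do not exist in Flink.

```java
import java.util.Locale;

// Hypothetical sketch of "apply a default unit only when the text has none".
final class DefaultUnitSketch {

    // Returns the lower-cased text that follows the leading digits ("" when there is none).
    static String unitOf(String text) {
        final String trimmed = text.trim();
        int pos = 0;
        while (pos < trimmed.length() && trimmed.charAt(pos) >= '0' && trimmed.charAt(pos) <= '9') {
            pos++;
        }
        return trimmed.substring(pos).trim().toLowerCase(Locale.US);
    }

    static boolean hasUnit(String text) {
        return !unitOf(text).isEmpty();
    }

    // Appends the default suffix (for example "m") to unit-less values and leaves others untouched.
    static String withDefaultUnit(String text, String defaultSuffix) {
        return hasUnit(text) ? text : text + defaultSuffix;
    }

    public static void main(String[] args) {
        System.out.println(withDefaultUnit("512", "m"));  // unit-less, so the default is appended
        System.out.println(withDefaultUnit("512K", "m")); // already has a unit, returned unchanged
        System.out.println(unitOf("1024 MB"));            // suffix is lower-cased before matching
    }
}
```

The check only tests whether anything follows the digits; whether that suffix is a recognized unit is decided later, during parsing.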

FlinkYarnSessionCli

flink-1.7.1/flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java

    public class FlinkYarnSessionCli extends AbstractCustomCommandLine<ApplicationId> {
        //......
        private ClusterSpecification createClusterSpecification(Configuration configuration, CommandLine cmd) {
            if (cmd.hasOption(container.getOpt())) { // number of containers is required option!
                LOG.info("The argument {} is deprecated in will be ignored.", container.getOpt());
            }
            // TODO: The number of task manager should be deprecated soon
            final int numberTaskManagers;
            if (cmd.hasOption(container.getOpt())) {
                numberTaskManagers = Integer.valueOf(cmd.getOptionValue(container.getOpt()));
            } else {
                numberTaskManagers = 1;
            }
            // JobManager Memory
            final int jobManagerMemoryMB = ConfigurationUtils.getJobManagerHeapMemory(configuration).getMebiBytes();
            // Task Managers memory
            final int taskManagerMemoryMB = ConfigurationUtils.getTaskManagerHeapMemory(configuration).getMebiBytes();
            int slotsPerTaskManager = configuration.getInteger(TaskManagerOptions.NUM_TASK_SLOTS);
            return new ClusterSpecification.ClusterSpecificationBuilder()
                .setMasterMemoryMB(jobManagerMemoryMB)
                .setTaskManagerMemoryMB(taskManagerMemoryMB)
                .setNumberTaskManagers(numberTaskManagers)
                .setSlotsPerTaskManager(slotsPerTaskManager)
                .createClusterSpecification();
        }
        //......
    }
  • The createClusterSpecification method of FlinkYarnSessionCli calls ConfigurationUtils.getJobManagerHeapMemory(configuration) and converts the result to mebibytes to obtain jobManagerMemoryMB.
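
The conversion to mebibytes truncates, which matters for values that are not whole megabytes. A minimal sketch of just that arithmetic, mirroring the getMebiBytes method shown in the MemorySize listing (MebiBytesSketch is a hypothetical name):

```java
// Hypothetical sketch; mirrors only the arithmetic of getMebiBytes from the MemorySize listing.
final class MebiBytesSketch {

    static int toMebiBytes(long bytes) {
        // shifting right by 20 bits divides by 1024 * 1024 and drops the remainder
        return (int) (bytes >> 20);
    }

    public static void main(String[] args) {
        System.out.println(toMebiBytes(1024L * 1024L * 1024L)); // 1 GiB is 1024 MiB
        System.out.println(toMebiBytes(1_536_000L));            // 1500 KiB is truncated to 1 MiB
        System.out.println(toMebiBytes(1_048_575L));            // one byte short of 1 MiB gives 0
    }
}
```

A size below one mebibyte therefore ends up as 0 MB in the cluster specification.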

config.sh

flink-1.7.1/flink-dist/src/main/flink-bin/bin/config.sh

    # ......
    DEFAULT_ENV_PID_DIR="/tmp" # Directory to store *.pid files to
    DEFAULT_ENV_LOG_MAX=5 # Maximum number of old log files to keep
    DEFAULT_ENV_JAVA_OPTS="" # Optional JVM args
    DEFAULT_ENV_JAVA_OPTS_JM="" # Optional JVM args (JobManager)
    DEFAULT_ENV_JAVA_OPTS_TM="" # Optional JVM args (TaskManager)
    DEFAULT_ENV_JAVA_OPTS_HS="" # Optional JVM args (HistoryServer)
    DEFAULT_ENV_SSH_OPTS="" # Optional SSH parameters running in cluster mode
    DEFAULT_YARN_CONF_DIR="" # YARN Configuration Directory, if necessary
    DEFAULT_HADOOP_CONF_DIR="" # Hadoop Configuration Directory, if necessary
    # ......
    # Define FLINK_JM_HEAP if it is not already set
    if [ -z "${FLINK_JM_HEAP}" ]; then
        FLINK_JM_HEAP=$(readFromConfig ${KEY_JOBM_MEM_SIZE} 0 "${YAML_CONF}")
    fi
    # Try read old config key, if new key not exists
    if [ "${FLINK_JM_HEAP}" == 0 ]; then
        FLINK_JM_HEAP_MB=$(readFromConfig ${KEY_JOBM_MEM_MB} 0 "${YAML_CONF}")
    fi
    # ......
    if [ -z "${FLINK_ENV_JAVA_OPTS}" ]; then
        FLINK_ENV_JAVA_OPTS=$(readFromConfig ${KEY_ENV_JAVA_OPTS} "${DEFAULT_ENV_JAVA_OPTS}" "${YAML_CONF}")
        # Remove leading and ending double quotes (if present) of value
        FLINK_ENV_JAVA_OPTS="$( echo "${FLINK_ENV_JAVA_OPTS}" | sed -e 's/^"//' -e 's/"$//' )"
    fi
    if [ -z "${FLINK_ENV_JAVA_OPTS_JM}" ]; then
        FLINK_ENV_JAVA_OPTS_JM=$(readFromConfig ${KEY_ENV_JAVA_OPTS_JM} "${DEFAULT_ENV_JAVA_OPTS_JM}" "${YAML_CONF}")
        # Remove leading and ending double quotes (if present) of value
        FLINK_ENV_JAVA_OPTS_JM="$( echo "${FLINK_ENV_JAVA_OPTS_JM}" | sed -e 's/^"//' -e 's/"$//' )"
    fi
    # ......
    # Arguments for the JVM. Used for job and task manager JVMs.
    # DO NOT USE FOR MEMORY SETTINGS! Use conf/flink-conf.yaml with keys
    # KEY_JOBM_MEM_SIZE and KEY_TASKM_MEM_SIZE for that!
    if [ -z "${JVM_ARGS}" ]; then
        JVM_ARGS=""
    fi
    # ......
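  • config.sh first checks whether the environment variable FLINK_JM_HEAP is set; if not, it reads jobmanager.heap.size from flink-conf.yaml into FLINK_JM_HEAP, using 0 when the key is absent. If FLINK_JM_HEAP is 0, it reads jobmanager.heap.mb into FLINK_JM_HEAP_MB.
  • If FLINK_ENV_JAVA_OPTS is not set, it is read from env.java.opts in flink-conf.yaml, falling back to DEFAULT_ENV_JAVA_OPTS (empty by default). FLINK_ENV_JAVA_OPTS_JM is handled the same way, using env.java.opts.jobmanager and DEFAULT_ENV_JAVA_OPTS_JM (also empty by default).
  • JVM_ARGS is used by both the JobManager and TaskManager JVMs and is initialized to an empty string if unset. Memory settings should not be placed in JVM_ARGS; use jobmanager.heap.size and taskmanager.heap.size in flink-conf.yaml instead.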
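
The precedence between the environment variable and the two configuration keys can be modelled as follows. This is a sketch in Java rather than shell, under two assumptions: plain maps stand in for the process environment and the parsed flink-conf.yaml, and readFromConfig returns the supplied default when the key is missing or empty. ConfigShPrecedenceSketch and resolve are hypothetical names.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical model of the FLINK_JM_HEAP handling in config.sh; maps replace env and YAML.
final class ConfigShPrecedenceSketch {

    // Assumed behaviour of readFromConfig: configured value, or the default when absent or empty.
    static String readFromConfig(Map<String, String> yaml, String key, String defaultValue) {
        final String value = yaml.get(key);
        return (value == null || value.isEmpty()) ? defaultValue : value;
    }

    // Returns { FLINK_JM_HEAP, FLINK_JM_HEAP_MB }; the second entry stays "" if the old key is not consulted.
    static String[] resolve(Map<String, String> env, Map<String, String> yaml) {
        String heap = env.getOrDefault("FLINK_JM_HEAP", "");
        if (heap.isEmpty()) {
            heap = readFromConfig(yaml, "jobmanager.heap.size", "0");
        }
        String heapMb = env.getOrDefault("FLINK_JM_HEAP_MB", "");
        if (heap.equals("0")) {
            // new key absent: fall back to the deprecated key
            heapMb = readFromConfig(yaml, "jobmanager.heap.mb", "0");
        }
        return new String[] { heap, heapMb };
    }

    public static void main(String[] args) {
        Map<String, String> env = new HashMap<>();
        Map<String, String> yaml = new HashMap<>();
        yaml.put("jobmanager.heap.size", "2g");
        System.out.println(String.join(" / ", resolve(env, yaml))); // value comes from the YAML file

        env.put("FLINK_JM_HEAP", "512m");
        System.out.println(String.join(" / ", resolve(env, yaml))); // environment variable wins
    }
}
```

The environment variable, when set, overrides whatever flink-conf.yaml contains; the deprecated key is only consulted when neither the variable nor the new key yields a value.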

jobmanager.sh

flink-1.7.1/flink-dist/src/main/flink-bin/bin/jobmanager.sh

    #!/usr/bin/env bash
    ################################################################################
    # Licensed to the Apache Software Foundation (ASF) under one
    # or more contributor license agreements. See the NOTICE file
    # distributed with this work for additional information
    # regarding copyright ownership. The ASF licenses this file
    # to you under the Apache License, Version 2.0 (the
    # "License"); you may not use this file except in compliance
    # with the License. You may obtain a copy of the License at
    #
    # http://www.apache.org/licenses/LICENSE-2.0
    #
    # Unless required by applicable law or agreed to in writing, software
    # distributed under the License is distributed on an "AS IS" BASIS,
    # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    # See the License for the specific language governing permissions and
    # limitations under the License.
    ################################################################################
    # Start/stop a Flink JobManager.
    USAGE="Usage: jobmanager.sh ((start|start-foreground) [host] [webui-port])|stop|stop-all"
    STARTSTOP=$1
    HOST=$2 # optional when starting multiple instances
    WEBUIPORT=$3 # optional when starting multiple instances
    if [[ $STARTSTOP != "start" ]] && [[ $STARTSTOP != "start-foreground" ]] && [[ $STARTSTOP != "stop" ]] && [[ $STARTSTOP != "stop-all" ]]; then
        echo $USAGE
        exit 1
    fi
    bin=`dirname "$0"`
    bin=`cd "$bin"; pwd`
    . "$bin"/config.sh
    ENTRYPOINT=standalonesession
    if [[ $STARTSTOP == "start" ]] || [[ $STARTSTOP == "start-foreground" ]]; then
        if [ ! -z "${FLINK_JM_HEAP_MB}" ] && [ "${FLINK_JM_HEAP}" == 0 ]; then
            echo "used deprecated key \`${KEY_JOBM_MEM_MB}\`, please replace with key \`${KEY_JOBM_MEM_SIZE}\`"
        else
            flink_jm_heap_bytes=$(parseBytes ${FLINK_JM_HEAP})
            FLINK_JM_HEAP_MB=$(getMebiBytes ${flink_jm_heap_bytes})
        fi
        if [[ ! ${FLINK_JM_HEAP_MB} =~ $IS_NUMBER ]] || [[ "${FLINK_JM_HEAP_MB}" -lt "0" ]]; then
            echo "[ERROR] Configured JobManager memory size is not a valid value. Please set '${KEY_JOBM_MEM_SIZE}' in ${FLINK_CONF_FILE}."
            exit 1
        fi
        if [ "${FLINK_JM_HEAP_MB}" -gt "0" ]; then
            export JVM_ARGS="$JVM_ARGS -Xms"$FLINK_JM_HEAP_MB"m -Xmx"$FLINK_JM_HEAP_MB"m"
        fi
        # Add JobManager-specific JVM options
        export FLINK_ENV_JAVA_OPTS="${FLINK_ENV_JAVA_OPTS} ${FLINK_ENV_JAVA_OPTS_JM}"
        # Startup parameters
        args=("--configDir" "${FLINK_CONF_DIR}" "--executionMode" "cluster")
        if [ ! -z $HOST ]; then
            args+=("--host")
            args+=("${HOST}")
        fi
        if [ ! -z $WEBUIPORT ]; then
            args+=("--webui-port")
            args+=("${WEBUIPORT}")
        fi
    fi
    if [[ $STARTSTOP == "start-foreground" ]]; then
        exec "${FLINK_BIN_DIR}"/flink-console.sh $ENTRYPOINT "${args[@]}"
    else
        "${FLINK_BIN_DIR}"/flink-daemon.sh $STARTSTOP $ENTRYPOINT "${args[@]}"
    fi
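  • jobmanager.sh first sources config.sh to initialize the relevant variables (FLINK_JM_HEAP, FLINK_JM_HEAP_MB, FLINK_ENV_JAVA_OPTS, FLINK_ENV_JAVA_OPTS_JM, JVM_ARGS).
  • When starting, if FLINK_JM_HEAP is 0 and FLINK_JM_HEAP_MB is set, the script only prints a notice that the deprecated key was used; otherwise it parses FLINK_JM_HEAP into bytes and converts the result into FLINK_JM_HEAP_MB. If FLINK_JM_HEAP_MB is not a non-negative number, the script exits with an error. If FLINK_JM_HEAP_MB is greater than 0, -Xms and -Xmx with that value are appended to JVM_ARGS. FLINK_ENV_JAVA_OPTS_JM (from env.java.opts.jobmanager) is then appended to FLINK_ENV_JAVA_OPTS (from env.java.opts).
  • Finally, jobmanager.sh launches the entrypoint through flink-console.sh for start-foreground, and through flink-daemon.sh otherwise.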
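
The effect on JVM_ARGS can be reproduced with a few lines. The sketch below models only the string that the start branch builds once a megabyte value is known; JvmHeapArgsSketch and appendHeapArgs are hypothetical names, and the validation is reduced to rejecting negative values.

```java
// Hypothetical model of how the start branch of jobmanager.sh extends JVM_ARGS.
final class JvmHeapArgsSketch {

    static String appendHeapArgs(String jvmArgs, int heapMb) {
        if (heapMb < 0) {
            throw new IllegalArgumentException("JobManager memory size is not a valid value: " + heapMb);
        }
        if (heapMb > 0) {
            // initial and maximum heap get the same value, so the heap size is fixed
            return jvmArgs + " -Xms" + heapMb + "m -Xmx" + heapMb + "m";
        }
        // 0 means "not configured": leave the arguments untouched
        return jvmArgs;
    }

    public static void main(String[] args) {
        System.out.println("[" + appendHeapArgs("", 1024) + "]");
        System.out.println("[" + appendHeapArgs("-XX:+UseG1GC", 0) + "]");
    }
}
```

Because the flags are appended after whatever JVM_ARGS already contained, heap flags placed there by hand would be followed by a second, competing pair, which is one reason the script comments advise against putting memory settings in JVM_ARGS.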

flink-console.sh

flink-1.7.1/flink-dist/src/main/flink-bin/bin/flink-console.sh

    #!/usr/bin/env bash
    ################################################################################
    # Licensed to the Apache Software Foundation (ASF) under one
    # or more contributor license agreements. See the NOTICE file
    # distributed with this work for additional information
    # regarding copyright ownership. The ASF licenses this file
    # to you under the Apache License, Version 2.0 (the
    # "License"); you may not use this file except in compliance
    # with the License. You may obtain a copy of the License at
    #
    # http://www.apache.org/licenses/LICENSE-2.0
    #
    # Unless required by applicable law or agreed to in writing, software
    # distributed under the License is distributed on an "AS IS" BASIS,
    # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    # See the License for the specific language governing permissions and
    # limitations under the License.
    ################################################################################
    # Start a Flink service as a console application. Must be stopped with Ctrl-C
    # or with SIGTERM by kill or the controlling process.
    USAGE="Usage: flink-console.sh (taskexecutor|zookeeper|historyserver|standalonesession|standalonejob) [args]"
    SERVICE=$1
    ARGS=("${@:2}") # get remaining arguments as array
    bin=`dirname "$0"`
    bin=`cd "$bin"; pwd`
    . "$bin"/config.sh
    case $SERVICE in
        (taskexecutor)
            CLASS_TO_RUN=org.apache.flink.runtime.taskexecutor.TaskManagerRunner
        ;;
        (historyserver)
            CLASS_TO_RUN=org.apache.flink.runtime.webmonitor.history.HistoryServer
        ;;
        (zookeeper)
            CLASS_TO_RUN=org.apache.flink.runtime.zookeeper.FlinkZooKeeperQuorumPeer
        ;;
        (standalonesession)
            CLASS_TO_RUN=org.apache.flink.runtime.entrypoint.StandaloneSessionClusterEntrypoint
        ;;
        (standalonejob)
            CLASS_TO_RUN=org.apache.flink.container.entrypoint.StandaloneJobClusterEntryPoint
        ;;
        (*)
            echo "Unknown service '${SERVICE}'. $USAGE."
            exit 1
        ;;
    esac
    FLINK_TM_CLASSPATH=`constructFlinkClassPath`
    log_setting=("-Dlog4j.configuration=file:${FLINK_CONF_DIR}/log4j-console.properties" "-Dlogback.configurationFile=file:${FLINK_CONF_DIR}/logback-console.xml")
    JAVA_VERSION=$(${JAVA_RUN} -version 2>&1 | sed 's/.*version "\(.*\)\.\(.*\)\..*"/\1\2/; 1q')
    # Only set JVM 8 arguments if we have correctly extracted the version
    if [[ ${JAVA_VERSION} =~ ${IS_NUMBER} ]]; then
        if [ "$JAVA_VERSION" -lt 18 ]; then
            JVM_ARGS="$JVM_ARGS -XX:MaxPermSize=256m"
        fi
    fi
    echo "Starting $SERVICE as a console application on host $HOSTNAME."
    exec $JAVA_RUN $JVM_ARGS ${FLINK_ENV_JAVA_OPTS} "${log_setting[@]}" -classpath "`manglePathList "$FLINK_TM_CLASSPATH:$INTERNAL_HADOOP_CLASSPATHS"`" ${CLASS_TO_RUN} "${ARGS[@]}"
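  • flink-console.sh appends -XX:MaxPermSize=256m to JVM_ARGS when the Java version is lower than 8 (the extracted version number is below 18, that is, below 1.8), and then launches CLASS_TO_RUN with JVM_ARGS and FLINK_ENV_JAVA_OPTS as JVM arguments.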
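
The version check relies on a sed expression that keeps the first two components of the version string and drops the dot between them. The following sketch reproduces that extraction with a Java regular expression. It assumes that IS_NUMBER in config.sh is the pattern ^[0-9]+$ and that the input is the first line printed by java -version; JavaVersionCheckSketch and its methods are hypothetical names.

```java
import java.util.regex.Pattern;

// Hypothetical model of the Java version check in flink-console.sh.
final class JavaVersionCheckSketch {
    // Assumption: config.sh defines IS_NUMBER as this pattern.
    private static final Pattern IS_NUMBER = Pattern.compile("^[0-9]+$");

    // Same idea as the sed expression: "1.8.0_181" in quotes becomes "18".
    static String extractVersion(String firstLineOfJavaVersion) {
        return firstLineOfJavaVersion.replaceFirst(".*version \"(.*)\\.(.*)\\..*\"", "$1$2");
    }

    // True when the extracted value is numeric and below 18, i.e. older than Java 8.
    static boolean needsMaxPermSize(String firstLineOfJavaVersion) {
        final String version = extractVersion(firstLineOfJavaVersion);
        return IS_NUMBER.matcher(version).matches() && Integer.parseInt(version) < 18;
    }

    public static void main(String[] args) {
        System.out.println(needsMaxPermSize("java version \"1.7.0_80\""));     // Java 7
        System.out.println(needsMaxPermSize("openjdk version \"1.8.0_181\"")); // Java 8
    }
}
```

If the extraction does not yield a plain number, the flag is simply not added, which matches the guard around the comparison in the script.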

Summary

  • jobmanager.heap.size sets the JVM heap size of the JobManager (default 1024m); jobmanager.heap.mb is deprecated. ConfigurationUtils.getJobManagerHeapMemory reads the setting from the Configuration and parses it into a MemorySize, which stores the value in bytes, offers getters for the larger units, and routes every parse variant through parseBytes.
  • FlinkYarnSessionCli.createClusterSpecification uses ConfigurationUtils.getJobManagerHeapMemory(configuration) to obtain jobManagerMemoryMB.
  • config.sh takes FLINK_JM_HEAP from the environment if set, otherwise from jobmanager.heap.size in flink-conf.yaml; if the result is 0, it reads jobmanager.heap.mb into FLINK_JM_HEAP_MB. FLINK_ENV_JAVA_OPTS and FLINK_ENV_JAVA_OPTS_JM are read from env.java.opts and env.java.opts.jobmanager when unset, both defaulting to empty. JVM_ARGS is shared by the JobManager and TaskManager JVMs and should not carry memory settings.
  • jobmanager.sh sources config.sh, derives FLINK_JM_HEAP_MB from FLINK_JM_HEAP unless the deprecated key was used, appends -Xms and -Xmx to JVM_ARGS when FLINK_JM_HEAP_MB is greater than 0, appends FLINK_ENV_JAVA_OPTS_JM to FLINK_ENV_JAVA_OPTS, and then launches the entrypoint through flink-console.sh (start-foreground) or flink-daemon.sh.
  • flink-console.sh appends -XX:MaxPermSize=256m to JVM_ARGS for Java versions lower than 8, and then launches CLASS_TO_RUN with JVM_ARGS and FLINK_ENV_JAVA_OPTS as JVM arguments.

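As a result, the final JVM arguments are determined by JVM_ARGS and FLINK_ENV_JAVA_OPTS. Memory settings should not be put into JVM_ARGS directly, because jobmanager.sh appends -Xms and -Xmx to JVM_ARGS whenever FLINK_JM_HEAP_MB is greater than 0, and FLINK_JM_HEAP_MB is derived from FLINK_JM_HEAP or the jobmanager.heap.size setting. FLINK_ENV_JAVA_OPTS is derived from env.java.opts and env.java.opts.jobmanager. To configure the JobManager heap size, either set the FLINK_JM_HEAP environment variable (for example FLINK_JM_HEAP=512m) or set jobmanager.heap.size in flink-conf.yaml.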
