当前位置:   article > 正文

Flink1.19源码深度解析-ClusterEntrypoint_flink 1.19

flink 1.19

 源码视频课程(连载中):

https://edu.csdn.net/course/detail/39418

1.本节课目的

--javascripttypescriptbashsqljsonhtmlcssccppjavarubypythongorustmarkdown

  1. 1.Flink集群启动模式
  2. 2.深入解析StandaloneSessionClusterEntrypoint类、ClusterEntrypoint.runClusterEntrypoint()/startCluster()方法

2.本节内容

1.集群启动模式

2.集群启动和初始化

3、本节详细内容

1.集群启动模式

ClusterEntrypoint是Flink集群入口点的基类,该类是抽象类,实现着有

SessionClusterEntrypoint、JobClusterEntrypoint、ApplicationClusterEntryPoint

1.Per-job

每一个提交的Job单独创建一套完整集群环境,该Job独享使用的计算资源和组件服务。

任务之间的资源隔离性好。已经被标记为不推荐使用状态

2.Session

Session集群能够运行多个Flink作业,切这些作业共享运行中的Dispatcher、ResourceManager等组件服务。

集群资源使用率高

3.Application

对于per-job模式,jar包的解析、生成JobGraph是在客户端上执行的,如果任务特别多的话,那么这些生成JobGraph会对客户端服务器有压力。

Application 模式会在客户端将运行任务需要的依赖都上传到 Flink Master,然后在 Master 端进行任务的提交。

如果一个main()方法中有多个env.execute()/executeAsync(),在Application模式下,这些作业会被视为属于同一个应用,在同一个集群中执行(如果在Per-Job模式下,就会启动多个集群)。

Application模式弥补了per-job的不足。

config.yaml

--javascripttypescriptbashsqljsonhtmlcssccppjavarubypythongorustmarkdown

  1. # JobManager 的主机地址
  2. jobmanager.rpc.address: localhost
  3. # The RPC port where the JobManager is reachable.
  4. #可访问JobManager的RPC端口
  5. jobmanager.rpc.port: 6123
  6. #jobmanager绑定ip,
  7. jobmanager.bind-host: localhost
  8. #JVM 进程总内存
  9. jobmanager.memory.process.size: 1600m
  10. #taskmanager绑定ip,
  11. taskmanager.bind-host: localhost
  12. #唯一标识 window下必须配置
  13. taskmanager.resource-id: q1
  14. taskmanager.cpu.cores: 1
  15. #任务的堆内存
  16. taskmanager.memory.task.heap.size: 512m
  17. #托管内存
  18. taskmanager.memory.managed.size: 512m
  19. #网络内存(Network Memory)
  20. taskmanager.memory.network.min: 128m
  21. taskmanager.memory.network.max: 128m
  22. #任务堆外内存
  23. taskmanager.memory.task.off-heap.size: 0m
  24. #框架内存
  25. taskmanager.memory.framework.heap.size: 256m
  26. #框架堆外内存
  27. taskmanager.memory.framework.off-heap.size: 128m
  28. #JVM Metaspace
  29. taskmanager.memory.jvm-metaspace.size: 128m
  30. #JVM 开销
  31. taskmanager.memory.jvm-overhead.min: 128m
  32. taskmanager.memory.jvm-overhead.max: 128m
  33. #心跳参数
  34. heartbeat.timeout: 50000
  35. heartbeat.interval: 10000
  36. taskmanager.host: localhost
  37. # 进程总内存大小
  38. taskmanager.memory.process.size: 1728m
  39. #每个 TaskManager上并发的 slot 数
  40. taskmanager.numberOfTaskSlots: 1
  41. # The parallelism used for programs that did not specify and other parallelism.
  42. #用于未指定的程序的并行度和其他并行度。
  43. parallelism.default: 1
  44. jobmanager.execution.failover-strategy: region
  45. rest.address: localhost
  46. rest.bind-address: localhost

StandaloneSessionClusterEntrypoint入口类

--javascripttypescriptbashsqljsonhtmlcssccppjavarubypythongorustmarkdown

  1. 参数1:-Dlog.file=./log/flink-jobmanager-1.local.log -Dlog4j.configuration=./a_conf/log4j.properties -Dlog4j.configurationFile=./a_conf/log4j.properties -Dlogback.configurationFile=./a_conf/logback.xml
  2. 参数2:-c config
  3. 参数3:添加a_lib下的jar

image.png

--javascripttypescriptbashsqljsonhtmlcssccppjavarubypythongorustmarkdown

  1. package org.apache.flink.runtime.entrypoint;
  2. import org.apache.flink.configuration.Configuration;
  3. import org.apache.flink.runtime.entrypoint.component.DefaultDispatcherResourceManagerComponentFactory;
  4. import org.apache.flink.runtime.resourcemanager.StandaloneResourceManagerFactory;
  5. import org.apache.flink.runtime.util.EnvironmentInformation;
  6. import org.apache.flink.runtime.util.JvmShutdownSafeguard;
  7. import org.apache.flink.runtime.util.SignalHandler;
  8. /** Entry point for the standalone session cluster. */
  9. /** Session集群的入口点 */
  10. public class StandaloneSessionClusterEntrypoint extends SessionClusterEntrypoint {
  11. /** main函数中进行构造传入Configuration */
  12. public StandaloneSessionClusterEntrypoint(Configuration configuration) {
  13. super(configuration);
  14. }
  15. /**
  16. * 构造DefaultDispatcherResourceManagerComponentFactory对象
  17. * ClusterEntrypoint.createDispatcherResourceManagerComponentFactory 初始化
  18. * Dispatcher,ResourceManager时候会被调用
  19. * 这个在初始化组件的时候需要会被调用
  20. * @param configuration
  21. * @return
  22. */
  23. @Override
  24. protected DefaultDispatcherResourceManagerComponentFactory
  25. createDispatcherResourceManagerComponentFactory(Configuration configuration) {
  26. return DefaultDispatcherResourceManagerComponentFactory.createSessionComponentFactory(
  27. StandaloneResourceManagerFactory.getInstance());
  28. }
  29. /** args=["-c","a_conf"] */
  30. public static void main(String[] args) {
  31. // startup checks and logging
  32. /**检查参数和启动日志*/
  33. EnvironmentInformation.logEnvironmentInfo(
  34. LOG, StandaloneSessionClusterEntrypoint.class.getSimpleName(), args);
  35. /**
  36. * SignalHandler信号注册器
  37. * Flink 进程启动时的一部分初始化步骤以确保在接收到终止信号时能够优雅地关闭。
  38. * 如,当 Flink 作业管理器(JobManager)或任务管理器(TaskManager)接收到 SIGTERM 信号时,
  39. * 信号处理器可能会被触发,以开始关闭进程并释放资源。
  40. */
  41. SignalHandler.register(LOG);
  42. /**
  43. * 注册一个安全关闭钩子。
  44. * 当 JVM 接收到终止信号(如 SIGTERM 或 SIGINT)或调用 System.exit() 方法时,
  45. * 可以确保在 JVM 关闭前执行必要的清理和释放资源操作。
  46. * 注册一个安全关闭钩子。JVM允许花费的最长时间被杀死前的关机时间是5秒。
  47. * 5秒之内不关闭则强制调用Runtime.getRuntime().halt(EXIT_CODE)关闭see JvmShutdownSafeguard.run
  48. */
  49. JvmShutdownSafeguard.installAsShutdownHook(LOG);
  50. /**
  51. * 设置配置的目录
  52. * configDir=a_conf
  53. */
  54. final EntrypointClusterConfiguration entrypointClusterConfiguration =
  55. ClusterEntrypointUtils.parseParametersOrExit(
  56. args,
  57. new EntrypointClusterConfigurationParserFactory(),
  58. StandaloneSessionClusterEntrypoint.class);
  59. /**
  60. * 加载配置参数
  61. * flink-conf.yaml 中配置的参数
  62. */
  63. Configuration configuration = loadConfiguration(entrypointClusterConfiguration);
  64. /**configuration 传入 StandaloneSessionClusterEntrypoint 创建对象*/
  65. StandaloneSessionClusterEntrypoint entrypoint =
  66. new StandaloneSessionClusterEntrypoint(configuration);
  67. /**
  68. * 最终会调用 ClusterEntrypoint.runClusterEntrypoint
  69. * 所有的初始化最终都会在ClusterEntrypoint 进行
  70. */
  71. ClusterEntrypoint.runClusterEntrypoint(entrypoint);
  72. }
  73. }

ClusterEntrypoint源码分析

--javascripttypescriptbashsqljsonhtmlcssccppjavarubypythongorustmarkdown

  1. /**
  2. * ClusterEntrypoint Flink集群入口点的基类
  3. * 调用ClusterEntrypoint.startCluster方法进行初始化
  4. * @param clusterEntrypoint
  5. */
  6. public static void runClusterEntrypoint(ClusterEntrypoint clusterEntrypoint) {
  7. /** StandaloneSessionClusterEntrypoint */
  8. final String clusterEntrypointName = clusterEntrypoint.getClass().getSimpleName();
  9. try {
  10. clusterEntrypoint.startCluster();
  11. } catch (ClusterEntrypointException e) {
  12. LOG.error(
  13. String.format("Could not start cluster entrypoint %s.", clusterEntrypointName),
  14. e);
  15. System.exit(STARTUP_FAILURE_RETURN_CODE);
  16. }
  17. int returnCode;
  18. Throwable throwable = null;
  19. try {
  20. returnCode = clusterEntrypoint.getTerminationFuture().get().processExitCode();
  21. } catch (Throwable e) {
  22. throwable = ExceptionUtils.stripExecutionException(e);
  23. returnCode = RUNTIME_FAILURE_RETURN_CODE;
  24. }
  25. LOG.info(
  26. "Terminating cluster entrypoint process {} with exit code {}.",
  27. clusterEntrypointName,
  28. returnCode,
  29. throwable);
  30. System.exit(returnCode);
  31. }
  32. public void startCluster() throws ClusterEntrypointException {
  33. LOG.info("Starting {}.", getClass().getSimpleName());
  34. try {
  35. /**
  36. * FlinkSecurityManager Flink 做一些安全的检查
  37. *cluster.intercept-user-system-exit配置默认DISABLED
  38. *cluster.processes.halt-on-fatal-error默认为false
  39. *
  40. * 1.是否拦截用户作业中的 System.exit() 调用当这个配置选项被设置为 true 时,
  41. * Flink 会拦截用户作业中的 System.exit() 调用,并防止作业因为此类调用而意外终止。
  42. * 2.如果Flink遇到了不能恢复的错误,是否直接让JobManager进程终止。
  43. * 默认不做任何处理
  44. */
  45. FlinkSecurityManager.setFromConfiguration(configuration);
  46. /**
  47. * PluginManager flink插件管理类。在 Apache Flink 的上下文中,PluginManager 负责加载和管理 Flink 插件。
  48. * PluginManager负责管理集群插件,这些插件是使用单独的类加载器加载的,以便它们的依赖关系,不干涉Flink的依赖关系。
  49. * 比如我们自己实现一个写s3、或者我们自己定义数据源 插件以JAR的形式存在
  50. * 通过如下参数配置FLINK_PLUGINS_DIR、plugins
  51. * 总结:FLINK_PLUGINS_DIR、plugins 两个插件配置
  52. */
  53. PluginManager pluginManager =
  54. PluginUtils.createPluginManagerFromRootFolder(configuration);
  55. /**
  56. * 配置文件系统,configureFileSystems(configuration, pluginManager) 方法通常用于配置 Flink 集群中使用的文件系统(FileSystems)。
  57. * Flink 支持多种类型的文件系统,如 Hadoop 的分布式文件系统(HDFS)、本地文件系统、云存储服务等,用于读取和写入数据。
  58. * 初始化文件系统设置
  59. * 本地:file
  60. * hadoop:hdfs
  61. *FileSystem类进行初始化
  62. */
  63. configureFileSystems(configuration, pluginManager);
  64. /**
  65. * 初始化安全上下文环境 默认HadoopSecurityContext
  66. * Hadoop安全上下文,使用先前初始化的UGI(UserGroupInformation)和适当的安全凭据。比如Kerberos
  67. * 总结:初始化安全环境,创建安全环境的时候会做一系列的检查
  68. * security.module.factory.classes:包含哪些SecurityModuleFactory,org.apache.flink.runtime.security.modules.HadoopModuleFactory
  69. * 值为HadoopModuleFactory,JaasModuleFactory,ZookeeperModuleFactory
  70. * security.context.factory.classes:包含哪些 SecurityContextFactory,HadoopSecurityContextFactory 或者 NoOpSecurityContextFactory
  71. */
  72. SecurityContext securityContext = installSecurityContext(configuration);
  73. /**
  74. * cluster.uncaught-exception-handling 默认Log
  75. * 根据配置为当前线程设置未捕获的异常处理程序。 默认为LOG打印
  76. * 比如程序遇到了未捕获的异常则交给log 也就是打印出来
  77. */
  78. ClusterEntrypointUtils.configureUncaughtExceptionHandler(configuration);
  79. securityContext.runSecured(
  80. (Callable<Void>)
  81. () -> {
  82. /** 真正组件初始化的地方 */
  83. runCluster(configuration, pluginManager);
  84. return null;
  85. });
  86. } catch (Throwable t) {
  87. final Throwable strippedThrowable =
  88. ExceptionUtils.stripException(t, UndeclaredThrowableException.class);
  89. try {
  90. // clean up any partial state
  91. shutDownAsync(
  92. ApplicationStatus.FAILED,
  93. ShutdownBehaviour.GRACEFUL_SHUTDOWN,
  94. ExceptionUtils.stringifyException(strippedThrowable),
  95. false)
  96. .get(
  97. INITIALIZATION_SHUTDOWN_TIMEOUT.toMilliseconds(),
  98. TimeUnit.MILLISECONDS);
  99. } catch (InterruptedException | ExecutionException | TimeoutException e) {
  100. strippedThrowable.addSuppressed(e);
  101. }
  102. throw new ClusterEntrypointException(
  103. String.format(
  104. "Failed to initialize the cluster entrypoint %s.",
  105. getClass().getSimpleName()),
  106. strippedThrowable);
  107. }
  108. }

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/羊村懒王/article/detail/461095
推荐阅读
相关标签
  

闽ICP备14008679号