
Flink Speculative Execution


1. Configuration

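    execution.batch.speculative.enabled: default false. Master switch for speculative execution; it only takes effect with the AdaptiveBatchScheduler.

    execution.batch.speculative.max-concurrent-executions: default 2. Maximum number of execution attempts of the same subtask that may run at the same time, the original attempt included.

    execution.batch.speculative.block-slow-node-duration: default 1 min. Nodes detected as slow are added to a blocklist; this option controls how long they stay on it.

    slow-task-detector.check-interval: default 1 s. Interval between two slow-task checks.

    slow-task-detector.execution-time.baseline-lower-bound: default 1 min. Lower bound of the slow-task detection baseline.

    slow-task-detector.execution-time.baseline-ratio: default 0.75. Fraction of a vertex's tasks that must have finished before slow-task detection starts: once 75% of the tasks have finished, the remaining ones are checked for slowness.

    slow-task-detector.execution-time.baseline-multiplier: default 1.5. Multiplier applied to the median execution time of the finished tasks to obtain the baseline.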
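To make the keys and defaults above concrete, here is a minimal sketch that reads them from a plain key/value map, falling back to the listed defaults. SpeculativeOptionsSketch is a hypothetical class, not a Flink API, and its duration parser only understands a simplified "<number> <unit>" form with ms, s or min (an assumption; Flink's own duration syntax is richer).

```java
import java.time.Duration;
import java.util.HashMap;
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical typed view of the options listed above (NOT a Flink API).
// Keys and defaults come from the list; values are read from a plain key/value map.
final class SpeculativeOptionsSketch {
    // Simplified duration syntax (assumption): "<number> <unit>" with unit ms, s or min.
    private static final Pattern DURATION = Pattern.compile("(\\d+)\\s*(ms|s|min)");

    final boolean enabled;
    final int maxConcurrentExecutions;
    final Duration blockSlowNodeDuration;
    final Duration checkInterval;
    final Duration baselineLowerBound;
    final double baselineRatio;
    final double baselineMultiplier;

    SpeculativeOptionsSketch(Map<String, String> conf) {
        enabled = Boolean.parseBoolean(
                conf.getOrDefault("execution.batch.speculative.enabled", "false"));
        maxConcurrentExecutions = Integer.parseInt(conf.getOrDefault(
                "execution.batch.speculative.max-concurrent-executions", "2"));
        blockSlowNodeDuration = parseDuration(conf.getOrDefault(
                "execution.batch.speculative.block-slow-node-duration", "1 min"));
        checkInterval = parseDuration(
                conf.getOrDefault("slow-task-detector.check-interval", "1 s"));
        baselineLowerBound = parseDuration(conf.getOrDefault(
                "slow-task-detector.execution-time.baseline-lower-bound", "1 min"));
        baselineRatio = Double.parseDouble(conf.getOrDefault(
                "slow-task-detector.execution-time.baseline-ratio", "0.75"));
        baselineMultiplier = Double.parseDouble(conf.getOrDefault(
                "slow-task-detector.execution-time.baseline-multiplier", "1.5"));
    }

    // Options with every value left at its default.
    static SpeculativeOptionsSketch defaults() {
        return new SpeculativeOptionsSketch(new HashMap<>());
    }

    // The original attempt counts toward max-concurrent-executions,
    // so at most (max - 1) speculative attempts can run beside it.
    int maxSpeculativeAttemptsPerSubtask() {
        return maxConcurrentExecutions - 1;
    }

    private static Duration parseDuration(String text) {
        Matcher m = DURATION.matcher(text.trim());
        if (!m.matches()) {
            throw new IllegalArgumentException("Unsupported duration: " + text);
        }
        long amount = Long.parseLong(m.group(1));
        switch (m.group(2)) {
            case "ms":
                return Duration.ofMillis(amount);
            case "s":
                return Duration.ofSeconds(amount);
            default: // "min"
                return Duration.ofMinutes(amount);
        }
    }
}
```

With every option at its default, maxSpeculativeAttemptsPerSubtask() returns 1: one speculative copy may run next to the original attempt.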

2. SpeculativeScheduler

    Speculative execution is only available with the AdaptiveBatchScheduler. When AdaptiveBatchSchedulerFactory creates the scheduler and speculative execution is enabled, it creates a SpeculativeScheduler:

    if (enableSpeculativeExecution) {
        return new SpeculativeScheduler(
                log,
                jobGraph,
                ioExecutor,
                jobMasterConfiguration,
                // ... (remaining constructor arguments not shown in this excerpt)

2.1 Startup

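    On startup the scheduler does three things: 1) registers its metrics; 2) runs the common startup logic of the parent class, which includes some operator initialization; 3) starts the slow task detector.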

    protected void startSchedulingInternal() {
        // 1. register metrics
        registerMetrics(jobManagerJobMetricGroup);
        // 2. common startup logic of the parent scheduler
        super.startSchedulingInternal();
        // 3. start periodic slow task detection
        slowTaskDetector.start(getExecutionGraph(), this, getMainThreadExecutor());
    }

2.2 SlowTaskDetector

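    SlowTaskDetector is responsible for detecting slow tasks. Its implementation, ExecutionTimeBasedSlowTaskDetector, runs the detection as a scheduled task on the main thread executor: after check-interval it looks for slow tasks, notifies the listener, and then schedules its next run.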

    this.scheduledDetectionFuture =
            mainThreadExecutor.schedule(
                    () -> {
                        // find slow tasks and notify the listener (the SpeculativeScheduler)
                        listener.notifySlowTasks(findSlowTasks(executionGraph));
                        // schedule the next check
                        scheduleTask(executionGraph, listener, mainThreadExecutor);
                    },
                    checkIntervalMillis,
                    TimeUnit.MILLISECONDS);
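The detector re-arms itself from inside the scheduled task rather than using a fixed-rate timer. Below is a minimal JDK-only sketch of that pattern; ScheduledExecutorService stands in for Flink's main-thread executor, and the slow-task lookup and listener are plain functional interfaces (assumptions, not Flink types). PeriodicDetectorSketch and runsAtLeast are hypothetical names.

```java
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import java.util.function.Supplier;

// Self-rescheduling periodic check: each run reports its result, then schedules the next run.
// JDK types stand in for Flink's main-thread executor, ExecutionGraph and listener (assumption).
final class PeriodicDetectorSketch {
    private final ScheduledExecutorService executor;
    private final long checkIntervalMillis;
    private volatile ScheduledFuture<?> scheduledDetectionFuture;
    private volatile boolean stopped;

    PeriodicDetectorSketch(ScheduledExecutorService executor, long checkIntervalMillis) {
        this.executor = executor;
        this.checkIntervalMillis = checkIntervalMillis;
    }

    void start(Supplier<List<String>> findSlowTasks, Consumer<List<String>> listener) {
        scheduleTask(findSlowTasks, listener);
    }

    private void scheduleTask(Supplier<List<String>> findSlowTasks, Consumer<List<String>> listener) {
        if (stopped) {
            return;
        }
        scheduledDetectionFuture =
                executor.schedule(
                        () -> {
                            listener.accept(findSlowTasks.get()); // notify with this run's result
                            scheduleTask(findSlowTasks, listener); // then re-arm for the next run
                        },
                        checkIntervalMillis,
                        TimeUnit.MILLISECONDS);
    }

    void stop() {
        stopped = true;
        ScheduledFuture<?> future = scheduledDetectionFuture;
        if (future != null) {
            future.cancel(false);
        }
    }

    // Demo helper: true if at least `runs` checks complete within `timeoutMillis`.
    static boolean runsAtLeast(int runs, long intervalMillis, long timeoutMillis) {
        ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor();
        CountDownLatch latch = new CountDownLatch(runs);
        PeriodicDetectorSketch detector = new PeriodicDetectorSketch(executor, intervalMillis);
        detector.start(() -> Collections.singletonList("slow-subtask"), slow -> latch.countDown());
        try {
            return latch.await(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        } finally {
            detector.stop();
            executor.shutdownNow();
        }
    }
}
```

With this pattern the next check is only scheduled once the current one has finished, so two checks never overlap and the interval runs from the end of one check to the start of the next.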

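    The core is findSlowTasks. It first collects the job vertices to check, in topological order: those that are initialized, not yet FINISHED, and whose finished ratio has reached baseline-ratio: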

    private List<ExecutionJobVertex> getJobVerticesToCheck(final ExecutionGraph executionGraph) {
        return IterableUtils.toStream(executionGraph.getVerticesTopologically())
                .filter(ExecutionJobVertex::isInitialized)
                .filter(ejv -> ejv.getAggregateState() != ExecutionState.FINISHED)
                .filter(ejv -> getFinishedRatio(ejv) >= baselineRatio)
                .collect(Collectors.toList());
    }
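The three filters can be mirrored on simplified types. In this sketch (hypothetical types, not Flink classes) a vertex counts as FINISHED only when every one of its subtasks has finished, which stands in for getAggregateState().

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

// Hypothetical, simplified types (not Flink classes) mirroring getJobVerticesToCheck/getFinishedRatio.
enum SubtaskState { CREATED, RUNNING, FINISHED }

final class JobVertexSketch {
    final String name;
    final boolean initialized;
    final SubtaskState[] subtasks;

    JobVertexSketch(String name, boolean initialized, SubtaskState... subtasks) {
        this.name = name;
        this.initialized = initialized;
        this.subtasks = subtasks;
    }

    // Stand-in for getAggregateState() == FINISHED: every subtask has finished.
    boolean isFinished() {
        return Arrays.stream(subtasks).allMatch(s -> s == SubtaskState.FINISHED);
    }

    // Finished subtasks divided by total subtasks, as in getFinishedRatio.
    double finishedRatio() {
        if (subtasks.length == 0) {
            throw new IllegalStateException("vertex has no subtasks");
        }
        long finished = Arrays.stream(subtasks).filter(s -> s == SubtaskState.FINISHED).count();
        return (double) finished / subtasks.length;
    }
}

final class VerticesToCheckSketch {
    // Keeps vertices that are initialized, not finished, and past the baseline ratio.
    static List<String> select(List<JobVertexSketch> verticesInTopologicalOrder, double baselineRatio) {
        return verticesInTopologicalOrder.stream()
                .filter(v -> v.initialized)
                .filter(v -> !v.isFinished())
                .filter(v -> v.finishedRatio() >= baselineRatio)
                .map(v -> v.name)
                .collect(Collectors.toList());
    }
}
```

For example, with baseline-ratio 0.75 a fully finished source, a sink with 1 of 4 subtasks finished, and an uninitialized vertex are all skipped, while a map vertex with 3 of 4 subtasks finished is checked.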

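    getFinishedRatio returns, for a job vertex, the number of finished subtasks divided by its total number of subtasks; only vertices whose ratio is at least baseline-ratio are checked: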

    private double getFinishedRatio(final ExecutionJobVertex executionJobVertex) {
        checkState(executionJobVertex.getTaskVertices().length > 0);
        long finishedCount =
                Arrays.stream(executionJobVertex.getTaskVertices())
                        .filter(ev -> ev.getExecutionState() == ExecutionState.FINISHED)
                        .count();
        return (double) finishedCount / executionJobVertex.getTaskVertices().length;
    }
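As worked arithmetic for the ratio check, a hypothetical helper (not a Flink method) can find the smallest number of finished subtasks for which finishedCount / parallelism >= baselineRatio holds. It deliberately reuses the same floating-point comparison as getFinishedRatio instead of a closed-form ceil() formula, so it stays exactly consistent with the check.

```java
// Hypothetical helper (not a Flink method): smallest number of FINISHED subtasks for which
// finishedCount / parallelism >= baselineRatio, i.e. when a vertex starts being checked.
final class RatioThresholdSketch {
    static int minFinishedToStartChecking(int parallelism, double baselineRatio) {
        if (parallelism <= 0) {
            throw new IllegalArgumentException("parallelism must be positive");
        }
        for (int finished = 0; finished <= parallelism; finished++) {
            // Same comparison as getFinishedRatio(ejv) >= baselineRatio.
            if ((double) finished / parallelism >= baselineRatio) {
                return finished;
            }
        }
        return -1; // ratio above 1.0: the vertex is never checked
    }
}
```

With the default 0.75 and a parallelism of 10, checking starts only after 8 subtasks have finished, so at most the remaining 2 subtasks can be flagged as slow; with a parallelism of 100 the threshold is 75.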

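    Next come computing the baseline and finding the tasks that exceed it, in getBaseline and findExecutionsExceedingBaseline. In essence this compares execution time with the baseline, but input bytes are used as well: as the comparison below shows, what is compared is execution time per input byte, so slow-task detection is effectively based on throughput rather than on raw execution time.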

    private ExecutionTimeWithInputBytes getBaseline(
            final ExecutionJobVertex executionJobVertex, final long currentTimeMillis) {
        // median execution time of the finished tasks, together with its input bytes
        final ExecutionTimeWithInputBytes weightedExecutionTimeMedian =
                calculateFinishedTaskExecutionTimeMedian(executionJobVertex, currentTimeMillis);
        // baseline execution time = median * baseline-multiplier
        long multipliedBaseline =
                (long) (weightedExecutionTimeMedian.getExecutionTime() * baselineMultiplier);
        return new ExecutionTimeWithInputBytes(
                multipliedBaseline, weightedExecutionTimeMedian.getInputBytes());
    }

    // Excerpt from the comparison of two ExecutionTimeWithInputBytes values:
    // execution time per input byte is compared, not raw execution time.
    return Double.compare(
            (double) executionTime / Math.max(inputBytes, Double.MIN_VALUE),
            (double) other.getExecutionTime()
                    / Math.max(other.getInputBytes(), Double.MIN_VALUE));
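A simplified sketch of the whole baseline check follows, on hypothetical types (not Flink classes). Two points are assumptions not confirmed by the excerpt: the median is taken as the middle finished task ordered by time per byte, and baseline-lower-bound is treated as a minimum raw execution time before a task can count as slow.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

// Simplified sketch of baseline-based slow-task detection (hypothetical types, not Flink classes).
// Assumptions: the median is the middle finished task ordered by time per byte, and
// baseline-lower-bound acts as a minimum raw execution time before a task can count as slow.
final class TimeWithBytes {
    final long executionTimeMillis;
    final long inputBytes;

    TimeWithBytes(long executionTimeMillis, long inputBytes) {
        this.executionTimeMillis = executionTimeMillis;
        this.inputBytes = inputBytes;
    }

    // Execution time per input byte; Double.MIN_VALUE guards against division by zero,
    // as in the comparison excerpt above.
    double millisPerByte() {
        return (double) executionTimeMillis / Math.max(inputBytes, Double.MIN_VALUE);
    }
}

final class BaselineSketch {
    // Baseline = median finished task, with its execution time scaled by the multiplier.
    static TimeWithBytes baseline(List<TimeWithBytes> finished, double multiplier) {
        if (finished.isEmpty()) {
            throw new IllegalArgumentException("need at least one finished task");
        }
        List<TimeWithBytes> sorted = new ArrayList<>(finished);
        sorted.sort(Comparator.comparingDouble(TimeWithBytes::millisPerByte));
        TimeWithBytes median = sorted.get(sorted.size() / 2);
        return new TimeWithBytes(
                (long) (median.executionTimeMillis * multiplier), median.inputBytes);
    }

    // A running task is slow if it has run at least the lower bound and its
    // time per byte is not below the baseline's time per byte.
    static boolean isSlow(TimeWithBytes running, TimeWithBytes baseline, long lowerBoundMillis) {
        return running.executionTimeMillis >= lowerBoundMillis
                && Double.compare(running.millisPerByte(), baseline.millisPerByte()) >= 0;
    }
}
```

Because the comparison is per byte, in this sketch a subtask that received twice the median's input is only flagged once its execution time reaches 2 × multiplier × the median execution time.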

2.3 notifySlowTasks

    Once slow tasks are found, SlowTaskDetector notifies its listener; the listener logic is implemented in SpeculativeScheduler.notifySlowTasks.

    First, the nodes running the slow tasks are added to the blocklist:

    // add slow nodes to blocklist before scheduling new speculative executions
    blockSlowNodes(slowTasks, currentTimestamp);
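The blocklist is time-bounded by block-slow-node-duration. A minimal sketch of such a blocklist follows; TimedBlocklistSketch is hypothetical and is not Flink's blocklist API, and computing the end time as currentTimestamp + duration, as well as keeping the later end time when a node is detected again, are assumptions of this sketch.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical time-bounded blocklist (not Flink's blocklist API): a node hosting a slow task
// stays blocked until currentTimestamp + block-slow-node-duration (assumption).
final class TimedBlocklistSketch {
    private final long blockDurationMillis;
    private final Map<String, Long> blockedUntil = new HashMap<>();

    TimedBlocklistSketch(long blockDurationMillis) {
        this.blockDurationMillis = blockDurationMillis;
    }

    // Block every given node; re-detecting a node keeps the later end time (design choice of this sketch).
    void blockSlowNodes(Iterable<String> slowNodes, long currentTimestamp) {
        long until = currentTimestamp + blockDurationMillis;
        for (String node : slowNodes) {
            blockedUntil.merge(node, until, Long::max);
        }
    }

    // New (speculative) attempts should not be placed on a node while this returns true.
    boolean isBlocked(String node, long now) {
        Long until = blockedUntil.get(node);
        return until != null && now < until;
    }

    // Forget entries whose block period has ended.
    void removeExpired(long now) {
        blockedUntil.values().removeIf(until -> until <= now);
    }
}
```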

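    It then checks whether the vertex supports concurrent execution attempts, which is the default; vertices that do not are skipped: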

    if (!executionVertex.isSupportsConcurrentExecutionAttempts()) {
        continue;
    }

    For each slow task, new speculative Executions are created, stamped with the current timestamp:

    final Collection<Execution> attempts =
            IntStream.range(0, newSpeculativeExecutionsToDeploy)
                    .mapToObj(
                            i ->
                                    executionVertex.createNewSpeculativeExecution(
                                            currentTimestamp))
                    .collect(Collectors.toList());
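The excerpt does not show how newSpeculativeExecutionsToDeploy is computed. The sketch below assumes it is max-concurrent-executions minus the attempts already running for the subtask, which matches the meaning of that option; SpeculativeAttemptsSketch and its Attempt type, including the attempt numbering, are hypothetical.

```java
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

// Hypothetical sketch (not Flink code) of how many speculative attempts to create for one slow subtask,
// ASSUMING newSpeculativeExecutionsToDeploy = max-concurrent-executions - attempts already running.
final class SpeculativeAttemptsSketch {
    static final class Attempt {
        final int attemptNumber;     // hypothetical: numbered after the existing attempts
        final long createTimestamp;  // the currentTimestamp passed in, as in the excerpt

        Attempt(int attemptNumber, long createTimestamp) {
            this.attemptNumber = attemptNumber;
            this.createTimestamp = createTimestamp;
        }
    }

    static List<Attempt> createSpeculativeAttempts(
            int maxConcurrentExecutions, int currentAttempts, long currentTimestamp) {
        int newSpeculativeExecutionsToDeploy = Math.max(0, maxConcurrentExecutions - currentAttempts);
        return IntStream.range(0, newSpeculativeExecutionsToDeploy)
                .mapToObj(i -> new Attempt(currentAttempts + i, currentTimestamp))
                .collect(Collectors.toList());
    }
}
```

Under this assumption, with the default of 2 and only the original attempt running, one speculative attempt is created; once two attempts are running, a further slow-task report creates none.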

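    The new attempts are then set up and registered: subtask gateways are configured for them, the vertex is recorded for deployment, and the attempts are collected: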

    setupSubtaskGatewayForAttempts(executionVertex, attempts);
    verticesToDeploy.add(executionVertexId);
    newSpeculativeExecutions.addAll(attempts);

    Finally, slots are allocated and the new attempts are deployed:

    executionDeployer.allocateSlotsAndDeploy(
            newSpeculativeExecutions,
            executionVertexVersioner.getExecutionVertexVersions(verticesToDeploy));

3. Task Completion

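    Job completion ultimately happens in DefaultExecutionGraph's jobFinished. The decision is prepared in ExecutionJobVertex.executionVertexFinished, which counts finished subtasks against the vertex's parallelism: once all subtasks of the job vertex have finished, the job vertex is finished and notifies the ExecutionGraph.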

    void executionVertexFinished() {
        checkState(isInitialized());
        numExecutionVertexFinished++;
        if (numExecutionVertexFinished == parallelismInfo.getParallelism()) {
            getGraph().jobVertexFinished();
        }
    }
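The per-vertex logic is a plain counter that fires exactly once. A minimal sketch on a hypothetical type (not a Flink class), with a Runnable standing in for getGraph().jobVertexFinished():

```java
// Simplified sketch of the per-job-vertex counter in executionVertexFinished (hypothetical type):
// the graph is notified exactly once, when the count reaches the parallelism.
final class JobVertexCounterSketch {
    private final int parallelism;
    private final Runnable jobVertexFinished;
    private int numExecutionVertexFinished;

    JobVertexCounterSketch(int parallelism, Runnable jobVertexFinished) {
        this.parallelism = parallelism;
        this.jobVertexFinished = jobVertexFinished;
    }

    // Called once per finished subtask.
    void executionVertexFinished() {
        numExecutionVertexFinished++;
        if (numExecutionVertexFinished == parallelism) {
            jobVertexFinished.run(); // corresponds to getGraph().jobVertexFinished()
        }
    }
}
```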

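    This method is triggered by Execution: when an execution transitions to FINISHED, it notifies its vertex in the finally block, so the method runs once for every finished subtask.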

    if (transitionState(current, FINISHED)) {
        try {
            finishPartitionsAndUpdateConsumers();
            updateAccumulatorsAndMetrics(userAccumulators, metrics);
            releaseAssignedResource(null);
            vertex.getExecutionGraphAccessor().deregisterExecution(this);
        } finally {
            vertex.executionFinished(this);
        }
        return;
    }

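    Finally, each finished job vertex (one task of the job, split into as many subtasks as its parallelism) notifies the ExecutionGraph. When all job vertices have finished, the graph waits for all executions to terminate and then calls jobFinished():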

    public void jobVertexFinished() {
        assertRunningInJobMasterMainThread();
        final int numFinished = ++numFinishedJobVertices;
        if (numFinished == numJobVerticesTotal) {
            FutureUtils.assertNoException(
                    waitForAllExecutionsTermination().thenAccept(ignored -> jobFinished()));
        }
    }
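The graph-level step adds a wait on top of the counting. A minimal sketch with CompletableFuture follows; the list of termination futures and the jobFinished callback are hypothetical stand-ins for Flink internals, and allOf plays the role of waitForAllExecutionsTermination().

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;

// Simplified sketch of jobVertexFinished (hypothetical types): count finished job vertices and,
// once all have finished, wait for every execution's termination future before calling jobFinished.
final class GraphCompletionSketch {
    private final int numJobVerticesTotal;
    private final List<CompletableFuture<Void>> executionTerminationFutures;
    private final Runnable jobFinished;
    private int numFinishedJobVertices;

    GraphCompletionSketch(
            int numJobVerticesTotal,
            List<CompletableFuture<Void>> executionTerminationFutures,
            Runnable jobFinished) {
        this.numJobVerticesTotal = numJobVerticesTotal;
        this.executionTerminationFutures = executionTerminationFutures;
        this.jobFinished = jobFinished;
    }

    void jobVertexFinished() {
        final int numFinished = ++numFinishedJobVertices;
        if (numFinished == numJobVerticesTotal) {
            // Counterpart of waitForAllExecutionsTermination().thenAccept(ignored -> jobFinished())
            CompletableFuture.allOf(
                            executionTerminationFutures.toArray(new CompletableFuture<?>[0]))
                    .thenAccept(ignored -> jobFinished.run());
        }
    }
}
```

Counting alone is not enough here: jobFinished only runs after every tracked execution has terminated, even if all job vertices were already reported as finished.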

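Disclaimer: This article was contributed by a user and does not represent the position of the wpsshop blog; copyright belongs to the original author, and the site assumes no legal liability. If you find infringing content, please contact us. When reposting, please cite the source: https://www.wpsshop.cn/w/IT小白/article/detail/824800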