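execution.batch.speculative.enabled: false. Switch for speculative execution; it only takes effect with the AdaptiveBatchScheduler.
execution.batch.speculative.max-concurrent-executions: 2. Maximum number of execution attempts of one subtask that may run at the same time, counting the original attempt.
execution.batch.speculative.block-slow-node-duration: 1 minute. Nodes hosting slow tasks are added to the blocklist; this controls how long they stay there.
slow-task-detector.check-interval: 1 second. Interval between slow task checks.
slow-task-detector.execution-time.baseline-lower-bound: 1 minute. Lower bound of the slow task detection baseline.
slow-task-detector.execution-time.baseline-ratio: 0.75. Ratio of finished subtasks at which detection starts, i.e. once 75% of the subtasks have finished, the remaining ones are checked for being slow.
slow-task-detector.execution-time.baseline-multiplier: 1.5. Multiplier applied to the slow task baseline.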
Speculative execution is only available with the AdaptiveBatchScheduler. When AdaptiveBatchSchedulerFactory creates the scheduler and speculative execution is enabled, it creates a SpeculativeScheduler:
- if (enableSpeculativeExecution) {
- return new SpeculativeScheduler(
- log,
- jobGraph,
- ioExecutor,
- jobMasterConfiguration,
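On startup the scheduler does three things: (1) it registers metrics; (2) it runs the parent class's common startup flow, which includes some operator initialization; (3) it starts the slow task detector: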
- protected void startSchedulingInternal() {
- registerMetrics(jobManagerJobMetricGroup);
-
- super.startSchedulingInternal();
- slowTaskDetector.start(getExecutionGraph(), this, getMainThreadExecutor());
- }
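SlowTaskDetector is responsible for detecting slow tasks. Its implementation is ExecutionTimeBasedSlowTaskDetector, which runs the check as a scheduled task that re-schedules itself after every run: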
- this.scheduledDetectionFuture =
- mainThreadExecutor.schedule(
- () -> {
- listener.notifySlowTasks(findSlowTasks(executionGraph));
- scheduleTask(executionGraph, listener, mainThreadExecutor);
- },
- checkIntervalMillis,
- TimeUnit.MILLISECONDS);
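The self-rescheduling pattern above can be sketched in plain Java. This is a minimal illustration only, not Flink code: `PeriodicCheckSketch`, `scheduleCheck` and `runChecks` are hypothetical names, and a plain `ScheduledExecutorService` stands in for the JobMaster's main thread executor. The point it shows is that the next check is armed only after the current one has finished, so checks never overlap even if one of them takes longer than the interval.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

/** Hypothetical stand-in for the detector's self-rescheduling check loop. */
final class PeriodicCheckSketch {
    private final ScheduledExecutorService executor;
    private final long checkIntervalMillis;

    PeriodicCheckSketch(ScheduledExecutorService executor, long checkIntervalMillis) {
        this.executor = executor;
        this.checkIntervalMillis = checkIntervalMillis;
    }

    /** Schedules one check; the check re-arms itself only after it has run. */
    void scheduleCheck(Runnable check) {
        executor.schedule(
                () -> {
                    check.run();
                    if (!executor.isShutdown()) {
                        scheduleCheck(check);
                    }
                },
                checkIntervalMillis,
                TimeUnit.MILLISECONDS);
    }

    /** Runs the loop until the check has fired {@code times} times or 5 seconds have passed. */
    static boolean runChecks(int times, long intervalMillis) {
        ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor();
        CountDownLatch latch = new CountDownLatch(times);
        new PeriodicCheckSketch(executor, intervalMillis).scheduleCheck(latch::countDown);
        try {
            return latch.await(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        } finally {
            executor.shutdownNow();
        }
    }
}
```

Calling `PeriodicCheckSketch.runChecks(3, 10)` waits until the check has run three times with a 10 ms interval. Compared with `scheduleAtFixedRate`, this form trades a slightly drifting period for the guarantee that a slow check delays the next one instead of piling up.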
The core is findSlowTasks. It first collects the job vertices that need to be checked:
- private List<ExecutionJobVertex> getJobVerticesToCheck(final ExecutionGraph executionGraph) {
- return IterableUtils.toStream(executionGraph.getVerticesTopologically())
- .filter(ExecutionJobVertex::isInitialized)
- .filter(ejv -> ejv.getAggregateState() != ExecutionState.FINISHED)
- .filter(ejv -> getFinishedRatio(ejv) >= baselineRatio)
- .collect(Collectors.toList());
- }
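getFinishedRatio returns the ratio of finished subtasks to the total number of subtasks of a job vertex. Only vertices whose ratio has reached baselineRatio are checked: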
- private double getFinishedRatio(final ExecutionJobVertex executionJobVertex) {
- checkState(executionJobVertex.getTaskVertices().length > 0);
- long finishedCount =
- Arrays.stream(executionJobVertex.getTaskVertices())
- .filter(ev -> ev.getExecutionState() == ExecutionState.FINISHED)
- .count();
- return (double) finishedCount / executionJobVertex.getTaskVertices().length;
- }
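To make the filter concrete, here is a small self-contained sketch of the same idea. It is an illustration under simplifying assumptions: `FinishedRatioSketch`, its `State` enum and `shouldCheck` are hypothetical names, a list of states stands in for the subtasks of one job vertex, and "not all subtasks finished" approximates the aggregate-state filter.

```java
import java.util.List;

/** Hypothetical, simplified model of the finished-ratio filter. */
final class FinishedRatioSketch {
    /** Simplified stand-in for Flink's ExecutionState. */
    enum State { RUNNING, FINISHED }

    /** Finished subtasks divided by all subtasks of one job vertex. */
    static double finishedRatio(List<State> subtaskStates) {
        if (subtaskStates.isEmpty()) {
            throw new IllegalArgumentException("a job vertex has at least one subtask");
        }
        long finished = subtaskStates.stream().filter(s -> s == State.FINISHED).count();
        return (double) finished / subtaskStates.size();
    }

    /** A vertex is checked once enough subtasks finished, but not after all of them did. */
    static boolean shouldCheck(List<State> subtaskStates, double baselineRatio) {
        double ratio = finishedRatio(subtaskStates);
        return ratio < 1.0 && ratio >= baselineRatio;
    }
}
```

With the default baseline ratio of 0.75, a vertex with parallelism 4 becomes eligible once three subtasks have finished; with only two finished it is skipped for that round.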
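Next come computing the baseline and finding the tasks that exceed it, in getBaseline and findExecutionsExceedingBaseline. In essence this compares execution time against the baseline. Note that the comparison uses not only the time but also the number of input bytes, so slow task detection appears to be based on throughput: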
- private ExecutionTimeWithInputBytes getBaseline(
- final ExecutionJobVertex executionJobVertex, final long currentTimeMillis) {
- final ExecutionTimeWithInputBytes weightedExecutionTimeMedian =
- calculateFinishedTaskExecutionTimeMedian(executionJobVertex, currentTimeMillis);
- long multipliedBaseline =
- (long) (weightedExecutionTimeMedian.getExecutionTime() * baselineMultiplier);
-
- return new ExecutionTimeWithInputBytes(
- multipliedBaseline, weightedExecutionTimeMedian.getInputBytes());
- }
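-
- // Fragment of the comparison between two ExecutionTimeWithInputBytes values (execution time per input byte):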
- return Double.compare(
- (double) executionTime / Math.max(inputBytes, Double.MIN_VALUE),
- (double) other.getExecutionTime()
- / Math.max(other.getInputBytes(), Double.MIN_VALUE));
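The baseline logic can be sketched with plain numbers. This is an illustration under stated assumptions rather than the Flink implementation: `BaselineSketch`, `baselineMillis` and `isSlow` are hypothetical names, the median is taken as the middle element of the sorted finished execution times, and the lower bound is assumed to be applied to the raw execution time before the per-byte comparison.

```java
import java.util.Arrays;

/** Hypothetical, simplified model of the execution-time baseline. */
final class BaselineSketch {
    /** Median of the finished subtasks' execution times, scaled by the multiplier. */
    static long baselineMillis(long[] finishedExecutionTimesMillis, double multiplier) {
        if (finishedExecutionTimesMillis.length == 0) {
            throw new IllegalArgumentException("need at least one finished subtask");
        }
        long[] sorted = finishedExecutionTimesMillis.clone();
        Arrays.sort(sorted);
        long median = sorted[sorted.length / 2]; // assumption: middle element
        return (long) (median * multiplier);
    }

    /** Compares time per input byte against the baseline, after the lower-bound check. */
    static boolean isSlow(
            long executionMillis,
            long inputBytes,
            long baselineMillis,
            long baselineInputBytes,
            long lowerBoundMillis) {
        if (executionMillis < lowerBoundMillis) {
            return false; // assumption: short-running subtasks are never reported
        }
        double timePerByte = (double) executionMillis / Math.max(inputBytes, Double.MIN_VALUE);
        double baselinePerByte =
                (double) baselineMillis / Math.max(baselineInputBytes, Double.MIN_VALUE);
        return Double.compare(timePerByte, baselinePerByte) >= 0;
    }
}
```

For finished times of 40 s, 50 s and 60 s and a multiplier of 1.5, the baseline is 75 s. A subtask that has run 120 s on the same amount of input is slow, while one that has run 120 s on four times the input is not, which is the throughput-style behaviour described above.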
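Once the slow tasks have been found, SlowTaskDetector notifies its listener. The handling is implemented in the notifySlowTasks method of SpeculativeScheduler.
First, the slow nodes are added to the blocklist: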
- // add slow nodes to blocklist before scheduling new speculative executions
- blockSlowNodes(slowTasks, currentTimestamp);
Next it checks whether the task supports speculative execution; by default it does:
- if (!executionVertex.isSupportsConcurrentExecutionAttempts()) {
- continue;
- }
Using the current timestamp, new Execution attempts are created for the slow task:
- final Collection<Execution> attempts =
- IntStream.range(0, newSpeculativeExecutionsToDeploy)
- .mapToObj(
- i ->
- executionVertex.createNewSpeculativeExecution(
- currentTimestamp))
- .collect(Collectors.toList());
Some setup follows, and the new attempts are recorded for tracking:
- setupSubtaskGatewayForAttempts(executionVertex, attempts);
- verticesToDeploy.add(executionVertexId);
- newSpeculativeExecutions.addAll(attempts);
Finally, deployment is triggered:
- executionDeployer.allocateSlotsAndDeploy(
- newSpeculativeExecutions,
- executionVertexVersioner.getExecutionVertexVersions(verticesToDeploy));
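The handling steps above can be condensed into a small model. It is a sketch under assumptions, not the scheduler's code: `SlowTaskHandlingSketch` and its methods are hypothetical, nodes are identified by plain strings, and the number of new attempts is assumed to be the configured maximum minus the attempts that are already running.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical, simplified model of how slow tasks are handled. */
final class SlowTaskHandlingSketch {
    private final int maxConcurrentExecutions;
    private final long blockDurationMillis;
    /** Node id mapped to the timestamp until which the node stays blocked. */
    private final Map<String, Long> blockedUntil = new HashMap<>();

    SlowTaskHandlingSketch(int maxConcurrentExecutions, long blockDurationMillis) {
        this.maxConcurrentExecutions = maxConcurrentExecutions;
        this.blockDurationMillis = blockDurationMillis;
    }

    /** Blocks the node hosting a slow attempt for the configured duration. */
    void blockNode(String nodeId, long nowMillis) {
        blockedUntil.put(nodeId, nowMillis + blockDurationMillis);
    }

    /** A node is blocked until its recorded expiry timestamp is reached. */
    boolean isBlocked(String nodeId, long nowMillis) {
        Long until = blockedUntil.get(nodeId);
        return until != null && nowMillis < until;
    }

    /** Attempts to add for one slow subtask; zero if speculation is unsupported. */
    int newAttemptsToDeploy(boolean supportsConcurrentAttempts, int currentAttempts) {
        if (!supportsConcurrentAttempts) {
            return 0;
        }
        return Math.max(0, maxConcurrentExecutions - currentAttempts);
    }
}
```

With the defaults (at most 2 concurrent executions, 1 minute block duration), a slow subtask with one running attempt gets exactly one speculative attempt, and the node it runs on is avoided for the next minute so the new attempt lands elsewhere.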
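Job completion ends in jobFinished of DefaultExecutionGraph. The check starts further up in ExecutionJobVertex.executionVertexFinished, which compares the number of finished subtasks with the parallelism: once all subtasks have finished, the job vertex is considered finished: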
- void executionVertexFinished() {
- checkState(isInitialized());
- numExecutionVertexFinished++;
- if (numExecutionVertexFinished == parallelismInfo.getParallelism()) {
- getGraph().jobVertexFinished();
- }
- }
This call is triggered by Execution, i.e. it happens once for every subtask that finishes:
- if (transitionState(current, FINISHED)) {
- try {
- finishPartitionsAndUpdateConsumers();
- updateAccumulatorsAndMetrics(userAccumulators, metrics);
- releaseAssignedResource(null);
- vertex.getExecutionGraphAccessor().deregisterExecution(this);
- } finally {
- vertex.executionFinished(this);
- }
- return;
- }
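When a job vertex (one task of the job, which has as many subtasks as its parallelism) finishes, it notifies the execution graph. Once all job vertices have finished and all executions have terminated, jobFinished is called: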
- public void jobVertexFinished() {
- assertRunningInJobMasterMainThread();
- final int numFinished = ++numFinishedJobVertices;
- if (numFinished == numJobVerticesTotal) {
- FutureUtils.assertNoException(
- waitForAllExecutionsTermination().thenAccept(ignored -> jobFinished()));
- }
- }
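The two counters in this completion path can be modelled in a few lines. The class below is a hypothetical simplification, not Flink code: `FinishCascadeSketch` uses an array index per job vertex and leaves out the wait for all executions to terminate.

```java
/** Hypothetical, simplified model of the subtask -> job vertex -> job cascade. */
final class FinishCascadeSketch {
    private final int[] parallelism;
    private final int[] finishedSubtasks;
    private int finishedVertices;
    private boolean jobFinished;

    FinishCascadeSketch(int... parallelismPerVertex) {
        this.parallelism = parallelismPerVertex.clone();
        this.finishedSubtasks = new int[parallelism.length];
    }

    /** Called once for every finished subtask of the given job vertex. */
    void subtaskFinished(int vertexIndex) {
        finishedSubtasks[vertexIndex]++;
        if (finishedSubtasks[vertexIndex] == parallelism[vertexIndex]) {
            vertexFinished();
        }
    }

    /** Called once per job vertex, when its last subtask has finished. */
    private void vertexFinished() {
        finishedVertices++;
        if (finishedVertices == parallelism.length) {
            jobFinished = true;
        }
    }

    boolean isJobFinished() {
        return jobFinished;
    }
}
```

For a job with two vertices of parallelism 2 and 1, the job only counts as finished after the third `subtaskFinished` call. Because each counter is bumped once per subtask rather than once per attempt, extra speculative attempts of the same subtask must not increment it a second time.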