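Repost: Flink 1.12.2 Source Code Analysis: A Brief Look at StreamTask
In the doRun method of the Task class, a RuntimeEnvironment is built first. Then loadAndInstantiateInvokable is called to load and instantiate the task's executable code.
loadAndInstantiateInvokable takes three arguments: the class loader (userCodeClassLoader.asClassLoader()), the name of the class to instantiate (nameOfInvokableClass), and the RuntimeEnvironment that the instantiated task needs.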
org.apache.flink.runtime.taskmanager.Task#doRun
private void doRun() {
    // ... state initialization code omitted ...
    // ... load the various task-related resources needed to run the code ...
    // ... request and initialize the user code & methods ...

    // build the environment needed to execute the code
    Environment env =
            new RuntimeEnvironment(
                    jobId, vertexId, executionId, executionConfig, taskInfo,
                    jobConfiguration, taskConfiguration, userCodeClassLoader,
                    memoryManager, ioManager, broadcastVariableManager, taskStateManager,
                    aggregateManager, accumulatorRegistry, kvStateRegistry,
                    inputSplitProvider, distributedCacheEntries,
                    consumableNotifyingPartitionWriters, inputGates, taskEventDispatcher,
                    checkpointResponder, operatorCoordinatorEventGateway,
                    taskManagerConfig, metrics, this, externalResourceInfoProvider);

    // now load and instantiate the task's invokable code
    invokable =
            loadAndInstantiateInvokable(
                    userCodeClassLoader.asClassLoader(), nameOfInvokableClass, env);

    // run the code
    invokable.invoke();

    // ... remaining code omitted ...
}
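As a rough illustration of the loading step, the sketch below resolves a class by name through a given class loader and calls its one-argument constructor reflectively. Env, Invokable, DemoTask and InvokableLoader are hypothetical stand-ins for Environment, AbstractInvokable, a concrete task and the loading helper; this is not Flink's code, only the same technique under those assumptions.

```java
import java.lang.reflect.Constructor;

// Hypothetical stand-in for Flink's Environment.
class Env {
    final String taskName;
    Env(String taskName) { this.taskName = taskName; }
}

// Hypothetical stand-in for AbstractInvokable: it only stores the environment.
abstract class Invokable {
    private final Env environment;
    protected Invokable(Env environment) { this.environment = environment; }
    Env getEnvironment() { return environment; }
    abstract String invoke();
}

// A hypothetical concrete task; it must offer a public (Env) constructor.
class DemoTask extends Invokable {
    public DemoTask(Env env) { super(env); }
    @Override String invoke() { return "running " + getEnvironment().taskName; }
}

class InvokableLoader {
    // Load the class by name with the given loader, check it is an Invokable,
    // then call its (Env) constructor.
    static Invokable loadAndInstantiate(ClassLoader classLoader, String className, Env env)
            throws ReflectiveOperationException {
        Class<? extends Invokable> clazz =
                Class.forName(className, true, classLoader).asSubclass(Invokable.class);
        Constructor<? extends Invokable> ctor = clazz.getConstructor(Env.class);
        return ctor.newInstance(env);
    }
}
```

Because only the class name travels with the deployment descriptor, this reflective step is what lets the same Task code run a SourceStreamTask, a OneInputStreamTask, and so on.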
The main classes that nameOfInvokableClass can refer to are listed below: four stream task types plus two special tasks used for iterations.
Name | Description |
---|---|
org.apache.flink.streaming.runtime.tasks.SourceStreamTask | StreamTask for sources |
org.apache.flink.streaming.runtime.tasks.OneInputStreamTask | StreamTask with one input |
org.apache.flink.streaming.runtime.tasks.TwoInputStreamTask | StreamTask with two inputs |
org.apache.flink.streaming.runtime.tasks.MultipleInputStreamTask | StreamTask with multiple inputs |
org.apache.flink.streaming.runtime.tasks.StreamIterationHead | A special {@link StreamTask} that is used for executing feedback edges (iteration head). |
org.apache.flink.streaming.runtime.tasks.StreamIterationTail | A special {@link StreamTask} that is used for executing feedback edges (iteration tail). |
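AbstractInvokable is the abstract base class for every task that can be executed by a TaskManager.
Concrete tasks extend this class, for example the streaming and batch tasks.
The TaskManager invokes the {@link #invoke()} method when executing a task.
All operations of the task happen in this method (setting up input and output stream readers and writers as well as the task's core operation).
All classes that extend it must offer a constructor {@code MyTask(Environment, TaskStateSnapshot)}.
Tasks that are always stateless can, for convenience, also implement only the constructor {@code MyTask(Environment)}.
Developer note:
While constructors cannot be enforced at compile time, we have not yet ventured to introduce factories (it is only an internal API after all, and with Java 8 one can use {@code Class::new} almost like a factory lambda).
Note:
There is no constructor that accepts the initial task state snapshot and stores it in a variable.
That is on purpose: AbstractInvokable itself does not need the state snapshot (only subclasses such as StreamTask need the state), and we do not want to store a reference indefinitely, which would prevent the garbage collector from cleaning up the initial state structure.
Any subclass that supports recoverable state and participates in checkpointing needs to override
{@link #triggerCheckpointAsync(CheckpointMetaData, CheckpointOptions, boolean)},
{@link #triggerCheckpointOnBarrier(CheckpointMetaData, CheckpointOptions, CheckpointMetricsBuilder)},
{@link #abortCheckpointOnBarrier(long, Throwable)} and {@link #notifyCheckpointCompleteAsync(long)}.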
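The developer note's remark that `Class::new` can act like a factory lambda can be illustrated with a minimal sketch. TaskEnv, BaseInvokable, StatelessTask and Factories are hypothetical names; the point is only that a constructor reference satisfies a `Function<TaskEnv, BaseInvokable>`, so the "constructor contract" can be passed around like a factory.

```java
import java.util.function.Function;

// Hypothetical stand-in for Flink's Environment.
class TaskEnv {
    final String name;
    TaskEnv(String name) { this.name = name; }
}

// Hypothetical stand-in for AbstractInvokable: only the environment is stored,
// and it must not be null (like checkNotNull in the real constructor).
abstract class BaseInvokable {
    private final TaskEnv environment;
    protected BaseInvokable(TaskEnv environment) {
        if (environment == null) throw new NullPointerException("environment");
        this.environment = environment;
    }
    TaskEnv getEnvironment() { return environment; }
    abstract String invoke();
}

// A stateless task only needs the (TaskEnv) constructor.
class StatelessTask extends BaseInvokable {
    StatelessTask(TaskEnv env) { super(env); }
    @Override String invoke() { return "invoked " + getEnvironment().name; }
}

class Factories {
    // The constructor reference acts as a factory lambda: TaskEnv -> BaseInvokable.
    static final Function<TaskEnv, BaseInvokable> STATELESS = StatelessTask::new;
}
```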
The AbstractInvokable abstract class has only two fields: Environment environment and shouldInterruptOnCancel (initialized to true).
Fields
/**
 * The environment assigned to this invokable.
 */
private final Environment environment;
/**
 * Flag whether cancellation should interrupt the executing thread.
 */
private volatile boolean shouldInterruptOnCancel = true;
Constructor
The constructor simply takes an Environment object.
/**
* Create an Invokable task and set its environment.
*
* @param environment The environment assigned to this invokable.
*/
public AbstractInvokable(Environment environment) {
this.environment = checkNotNull(environment);
}
Environment is the constructor argument of the abstract class AbstractInvokable (and of its subclasses). In Task#doRun, the environment information is wrapped into an Environment implementation (RuntimeEnvironment)
and handed to the task implementation class (for example SourceStreamTask or OneInputStreamTask).
There is not much to say about it: it simply bundles a set of references to the task's environment. The fields of RuntimeEnvironment are:
private final JobID jobId;
private final JobVertexID jobVertexId;
private final ExecutionAttemptID executionId;
private final TaskInfo taskInfo;
private final Configuration jobConfiguration;
private final Configuration taskConfiguration;
private final ExecutionConfig executionConfig;
private final UserCodeClassLoader userCodeClassLoader;
private final MemoryManager memManager;
private final IOManager ioManager;
private final BroadcastVariableManager bcVarManager;
private final TaskStateManager taskStateManager;
private final GlobalAggregateManager aggregateManager;
private final InputSplitProvider splitProvider;
private final ExternalResourceInfoProvider externalResourceInfoProvider;
private final Map<String, Future<Path>> distCacheEntries;
private final ResultPartitionWriter[] writers;
private final IndexedInputGate[] inputGates;
private final TaskEventDispatcher taskEventDispatcher;
private final CheckpointResponder checkpointResponder;
private final TaskOperatorEventGateway operatorEventGateway;
private final AccumulatorRegistry accumulatorRegistry;
private final TaskKvStateRegistry kvStateRegistry;
private final TaskManagerRuntimeInfo taskManagerInfo;
private final TaskMetricGroup metrics;
private final Task containingTask;
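Core methods
Name | Description |
---|---|
invoke | Starts the execution. Must be overridden by the concrete task implementation; the TaskManager calls it when the actual execution of the task starts. |
cancel | Called when the task is canceled, either because the user aborted it or because execution failed. It can be overridden to shut down the user code properly. |
setShouldInterruptOnCancel / shouldInterruptOnCancel | Set (and query) whether the thread executing the {@link #invoke()} method should be interrupted during cancellation. The flag applies to both the initial interrupt and the repeated interrupts. |
dispatchOperatorEvent | Entry point through which external components influence task execution: it delivers an OperatorEvent to an operator of this task. |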
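To illustrate how the shouldInterruptOnCancel flag gates interruption, here is a simplified, hypothetical model (CancellableTask is not a Flink class). In Flink the owning Task, not the invokable, sends the interrupt after calling invokable.cancel() and consulting shouldInterruptOnCancel(); this sketch folds both roles into one class. Turning the flag off corresponds to what cleanUpInvoke does with setShouldInterruptOnCancel(false).

```java
// Hypothetical, simplified model of AbstractInvokable's cancellation flags.
class CancellableTask {
    private volatile boolean shouldInterruptOnCancel = true;
    private volatile boolean canceled;

    void setShouldInterruptOnCancel(boolean value) { this.shouldInterruptOnCancel = value; }
    boolean shouldInterruptOnCancel() { return shouldInterruptOnCancel; }
    boolean isCanceled() { return canceled; }

    // Mark the task as canceled, then interrupt the executing thread
    // only if the flag still allows it.
    void cancel(Thread executingThread) {
        canceled = true;
        if (shouldInterruptOnCancel) {
            executingThread.interrupt();
        }
    }
}
```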
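Checkpoint-related methods
Name | Description |
---|---|
triggerCheckpointAsync | Called asynchronously by the checkpoint coordinator to trigger a checkpoint. |
triggerCheckpointOnBarrier | Called when a checkpoint is triggered as a result of receiving checkpoint barriers on all input streams. |
abortCheckpointOnBarrier | Aborts a checkpoint as the result of receiving some checkpoint barriers … |
notifyCheckpointCompleteAsync | Notifies the task that a checkpoint has completed. |
notifyCheckpointAbortAsync | Notifies the task that a checkpoint has been aborted. |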
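StreamTask is the base class for all streaming tasks.
A task is the unit of local processing that is deployed and executed by the TaskManagers.
Each task runs one or more {@link StreamOperator}s, which form the task's operator chain.
Operators that are chained together execute synchronously in the same thread and hence on the same stream partition.
A common case for these chains are successive map/flatmap/filter tasks.
The task chain contains one "head" operator and multiple chained operators.
The StreamTask is specialized for the type of the head operator:
one-input tasks: OneInputStreamTask
two-input tasks: TwoInputStreamTask
sources: SourceStreamTask
iteration heads: StreamIterationHead
iteration tails: StreamIterationTail
The Task class deals with the setup of the streams read by the head operator, and of the streams produced by the operators at the ends of the operator chain.
Note that the chain may fork and thus have multiple ends.
The life cycle of the task is set up as follows:
<pre>{@code
-- setInitialState -> provides the state of all operators in the chain

-- invoke()
      |
      +----> Create basic utils (config, etc) and load the chain of operators
      +----> operators.setup()
      +----> task specific init()
      +----> initialize-operator-states()
      +----> open-operators()
      +----> run()
      +----> close-operators()
      +----> dispose-operators()
      +----> common cleanup
      +----> task specific cleanup()
}</pre>
The {@code StreamTask} has a lock object called {@code lock}.
All calls to methods on a {@code StreamOperator} must be synchronized on this lock object to ensure that no methods are called concurrently. (In 1.12 this checkpoint lock has largely been superseded by the mailbox model; see the comment on the actionExecutor field below.)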
Fields
/** The thread group that holds all trigger timer threads. */
public static final ThreadGroup TRIGGER_THREAD_GROUP = new ThreadGroup("Triggers");

/** The logger used by the StreamTask and its subclasses. */
protected static final Logger LOG = LoggerFactory.getLogger(StreamTask.class);

// ------------------------------------------------------------------------

/**
 * All actions outside of the task {@link #mailboxProcessor mailbox} (i.e. performed by another
 * thread) must be executed through this executor to ensure that we don't have concurrent method
 * calls that void consistent checkpoints.
 *
 * <p>CheckpointLock is superseded by {@link MailboxExecutor}, with {@link
 * StreamTaskActionExecutor.SynchronizedStreamTaskActionExecutor
 * SynchronizedStreamTaskActionExecutor} to provide lock to {@link SourceStreamTask}.
 */
private final StreamTaskActionExecutor actionExecutor;

/** The input processor. Initialized in {@link #init()} method. */
@Nullable protected StreamInputProcessor inputProcessor;

/** [Important] The main operator that consumes the input streams of this task. */
protected OP mainOperator;

/** The chain of operators executed by this task. */
protected OperatorChain<OUT, OP> operatorChain;

/** The configuration of this streaming task. */
protected final StreamConfig configuration;

/** Our state backend. We use this to create checkpoint streams and a keyed state backend. */
protected final StateBackend stateBackend;

/** Coordinates the checkpoints of this subtask. */
private final SubtaskCheckpointCoordinator subtaskCheckpointCoordinator;

/**
 * The internal {@link TimerService} used to define the current processing time (default =
 * {@code System.currentTimeMillis()}) and register timers for tasks to be executed in the
 * future.
 */
protected final TimerService timerService;

/** The currently active background materialization threads. */
private final CloseableRegistry cancelables = new CloseableRegistry();

/** Handles exceptions raised in asynchronous threads, e.g. while reading channel state. */
private final StreamTaskAsyncExceptionHandler asyncExceptionHandler;

/**
 * Flag to mark the task "in operation", in which case check needs to be initialized to true, so
 * that early cancel() before invoke() behaves correctly.
 */
private volatile boolean isRunning;

/** Flag to mark this task as canceled. */
private volatile boolean canceled;

/**
 * Flag to mark this task as failing, i.e. if an exception has occurred inside {@link
 * #invoke()}.
 */
private volatile boolean failing;

/** Whether the operators have already been disposed, so that they are not disposed twice. */
private boolean disposedOperators;

/** Thread pool for async snapshot workers. */
private final ExecutorService asyncOperationsThreadPool;

private final RecordWriterDelegate<SerializationDelegate<StreamRecord<OUT>>> recordWriter;

protected final MailboxProcessor mailboxProcessor;

final MailboxExecutor mainMailboxExecutor;

/** TODO it might be replaced by the global IO executor on TaskManager level future. */
private final ExecutorService channelIOExecutor;

private Long syncSavepointId = null;

private Long activeSyncSavepointId = null;

private long latestAsyncCheckpointStartDelayNanos;
The constructor consists mostly of plain field assignments. The line to note is this.mailboxProcessor = new MailboxProcessor(this::processInput, mailbox, actionExecutor);, which registers processInput as the mailbox's default action.
protected StreamTask(
        Environment environment,
        @Nullable TimerService timerService,
        Thread.UncaughtExceptionHandler uncaughtExceptionHandler,
        StreamTaskActionExecutor actionExecutor,
        TaskMailbox mailbox)
        throws Exception {

    super(environment);

    this.configuration = new StreamConfig(getTaskConfiguration());
    this.recordWriter = createRecordWriterDelegate(configuration, environment);
    this.actionExecutor = Preconditions.checkNotNull(actionExecutor);
    // processInput becomes the mailbox's default action
    this.mailboxProcessor = new MailboxProcessor(this::processInput, mailbox, actionExecutor);
    this.mailboxProcessor.initMetric(environment.getMetricGroup());
    this.mainMailboxExecutor = mailboxProcessor.getMainMailboxExecutor();
    this.asyncExceptionHandler = new StreamTaskAsyncExceptionHandler(environment);
    this.asyncOperationsThreadPool =
            Executors.newCachedThreadPool(
                    new ExecutorThreadFactory("AsyncOperations", uncaughtExceptionHandler));

    this.stateBackend = createStateBackend();

    // the coordinator that performs the checkpoints of this subtask
    this.subtaskCheckpointCoordinator =
            new SubtaskCheckpointCoordinatorImpl(
                    stateBackend.createCheckpointStorage(getEnvironment().getJobID()),
                    getName(),
                    actionExecutor,
                    getCancelables(),
                    getAsyncOperationsThreadPool(),
                    getEnvironment(),
                    this,
                    configuration.isUnalignedCheckpointsEnabled(),
                    this::prepareInputSnapshot);

    // if the clock is not already set, then assign a default TimeServiceProvider
    if (timerService == null) {
        ThreadFactory timerThreadFactory =
                new DispatcherThreadFactory(TRIGGER_THREAD_GROUP, "Time Trigger for " + getName());
        this.timerService =
                new SystemProcessingTimeService(this::handleTimerException, timerThreadFactory);
    } else {
        this.timerService = timerService;
    }

    this.channelIOExecutor =
            Executors.newSingleThreadExecutor(
                    new ExecutorThreadFactory("channel-state-unspilling"));

    injectChannelStateWriterIntoChannels();
}
invoke is the core method of the task. It consists of four steps:
Before invoke: beforeInvoke();
Invoke: runMailboxLoop();
After invoke: afterInvoke();
Cleanup: cleanUpInvoke();
// entry point that eventually runs the chained operators (map and the like)
@Override
public final void invoke() throws Exception {
    try {
        // initialization
        beforeInvoke();

        // final check to exit early before starting to run
        if (canceled) {
            throw new CancelTaskException();
        }

        // [core] let the task do its work
        runMailboxLoop();

        // if this left the run() method cleanly despite the fact that this was canceled,
        // make sure the "clean shutdown" is not attempted
        if (canceled) {
            throw new CancelTaskException();
        }

        afterInvoke();
    } catch (Throwable invokeException) {
        failing = !canceled;
        try {
            cleanUpInvoke();
        }
        // TODO: investigate why Throwable instead of Exception is used here.
        catch (Throwable cleanUpException) {
            Throwable throwable =
                    ExceptionUtils.firstOrSuppressed(cleanUpException, invokeException);
            ExceptionUtils.rethrowException(throwable);
        }
        ExceptionUtils.rethrowException(invokeException);
    }
    cleanUpInvoke();
}
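The control flow of invoke (and the life cycle listed earlier) can be reduced to a small template-method skeleton. LifecycleTask and TaskCanceledSketchException are hypothetical; the sketch keeps the original failure and attaches any cleanup failure to it as a suppressed exception, which is what ExceptionUtils.firstOrSuppressed(cleanUpException, invokeException) does in the code above.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for Flink's CancelTaskException.
class TaskCanceledSketchException extends RuntimeException {
    TaskCanceledSketchException() { super("task canceled"); }
}

// Hypothetical skeleton that mirrors the control flow of StreamTask#invoke.
class LifecycleTask {
    final List<String> calls = new ArrayList<>();
    volatile boolean canceled;
    volatile boolean failing;
    private final RuntimeException loopFailure; // null means the loop ends normally

    LifecycleTask(RuntimeException loopFailure) { this.loopFailure = loopFailure; }

    void beforeInvoke() { calls.add("beforeInvoke"); }
    void runMailboxLoop() {
        calls.add("runMailboxLoop");
        if (loopFailure != null) throw loopFailure;
    }
    void afterInvoke() { calls.add("afterInvoke"); }
    void cleanUpInvoke() { calls.add("cleanUpInvoke"); }

    final void invoke() {
        try {
            beforeInvoke();
            if (canceled) throw new TaskCanceledSketchException(); // exit early before running
            runMailboxLoop();
            if (canceled) throw new TaskCanceledSketchException(); // no "clean shutdown" after cancel
            afterInvoke();
        } catch (Throwable invokeException) {
            failing = !canceled; // a cancellation is not a failure
            try {
                cleanUpInvoke();
            } catch (Throwable cleanUpException) {
                // keep the original failure and attach the cleanup failure to it
                invokeException.addSuppressed(cleanUpException);
            }
            throw invokeException; // precise rethrow: only unchecked exceptions reach here
        }
        cleanUpInvoke();
    }
}
```

Note that cleanUpInvoke runs on every path, while afterInvoke runs only when the loop ended normally and the task was not canceled.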
beforeInvoke: builds the OperatorChain and calls the init method of the concrete task class.
protected void beforeInvoke() throws Exception {
    disposedOperators = false;
    // e.g. "Initializing Source: Socket Stream -> Flat Map (1/1)#0."
    // or "Initializing Window(TumblingProcessingTimeWindows(5000), ProcessingTimeTrigger,
    //     ReduceFunction$1, PassThroughWindowFunction) -> Sink: Print to Std. Out (1/1)#0."
    LOG.debug("Initializing {}.", getName());

    operatorChain = new OperatorChain<>(this, recordWriter);
    // In a debugger, for "Source: Socket Stream -> Flat Map" the main operator is a
    // StreamSource whose userFunction is a SocketTextStreamFunction and whose
    // chainingStrategy is HEAD; its container is the SourceStreamTask.
    mainOperator = operatorChain.getMainOperator();

    // task specific initialization
    init();

    // save the work of reloading state, etc, if the task is already canceled
    if (canceled) {
        throw new CancelTaskException();
    }

    // -------- Invoke --------
    // e.g. "Invoking Source: Socket Stream -> Flat Map (1/1)#0"
    LOG.debug("Invoking {}", getName());

    // we need to make sure that any triggers scheduled in open() cannot be
    // executed before all operators are opened
    actionExecutor.runThrowing(
            () -> {
                SequentialChannelStateReader reader =
                        getEnvironment().getTaskStateManager().getSequentialChannelStateReader();
                // TODO: for UC rescaling, reenable notifyAndBlockOnCompletion for non-iterative
                // jobs
                reader.readOutputData(getEnvironment().getAllWriters(), false);

                operatorChain.initializeStateAndOpenOperators(createStreamTaskStateInitializer());

                channelIOExecutor.execute(
                        () -> {
                            try {
                                reader.readInputData(getEnvironment().getAllInputGates());
                            } catch (Exception e) {
                                asyncExceptionHandler.handleAsyncException(
                                        "Unable to read channel state", e);
                            }
                        });

                for (InputGate inputGate : getEnvironment().getAllInputGates()) {
                    inputGate
                            .getStateConsumedFuture()
                            .thenRun(
                                    () ->
                                            mainMailboxExecutor.execute(
                                                    inputGate::requestPartitions,
                                                    "Input gate request partitions"));
                }
            });

    isRunning = true;
}
runMailboxLoop: the StreamTask method only delegates to the MailboxProcessor.
public void runMailboxLoop() throws Exception {
    // delegates to MailboxProcessor#runMailboxLoop, which runs the actual loop
    mailboxProcessor.runMailboxLoop();
}
afterInvoke: once the mailbox loop has ended, close the operators, stop timers and the mailbox, flush the outputs and dispose the operators.
protected void afterInvoke() throws Exception {
    LOG.debug("Finished task {}", getName());
    getCompletionFuture().exceptionally(unused -> null).join();

    final CompletableFuture<Void> timersFinishedFuture = new CompletableFuture<>();

    // close all operators in a chain effect way
    operatorChain.closeOperators(actionExecutor);

    // make sure no further checkpoint and notification actions happen.
    // at the same time, this makes sure that during any "regular" exit where still
    actionExecutor.runThrowing(
            () -> {
                // make sure no new timers can come
                FutureUtils.forward(timerService.quiesce(), timersFinishedFuture);

                // let mailbox execution reject all new letters from this point
                mailboxProcessor.prepareClose();

                // only set the StreamTask to not running after all operators have been closed!
                // See FLINK-7430
                isRunning = false;
            });
    // processes the remaining mails; no new mails can be enqueued
    mailboxProcessor.drain();

    // make sure all timers finish
    timersFinishedFuture.get();

    LOG.debug("Closed operators for task {}", getName());

    // make sure all buffered data is flushed
    operatorChain.flushOutputs();

    // make an attempt to dispose the operators such that failures in the dispose call
    // still let the computation fail
    disposeAllOperators();
}
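The close sequence in afterInvoke (prepareClose rejects new mails, drain runs the ones already queued) can be modelled with a single-threaded queue. ClosingMailbox is a hypothetical simplification, not TaskMailboxImpl; the RejectedExecutionException it throws is the same exception type that dispatchOperatorEvent swallows during shutdown.

```java
import java.util.ArrayDeque;
import java.util.Queue;
import java.util.concurrent.RejectedExecutionException;

// Hypothetical, single-threaded simplification of the mailbox close sequence.
class ClosingMailbox {
    private final Queue<Runnable> mails = new ArrayDeque<>();
    private boolean acceptingMails = true;

    void put(Runnable mail) {
        if (!acceptingMails) {
            throw new RejectedExecutionException("mailbox is closing");
        }
        mails.add(mail);
    }

    // Like prepareClose(): from now on, new mails are rejected.
    void prepareClose() { acceptingMails = false; }

    // Like drain(): run every mail enqueued before prepareClose(); returns how many ran.
    int drain() {
        int ran = 0;
        Runnable mail;
        while ((mail = mails.poll()) != null) {
            mail.run();
            ran++;
        }
        return ran;
    }
}
```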
cleanUpInvoke: releases all kinds of resources.
protected void cleanUpInvoke() throws Exception {
    getCompletionFuture().exceptionally(unused -> null).join();
    // clean up everything we initialized
    isRunning = false;

    // Now that we are outside the user code, we do not want to be interrupted further
    // upon cancellation. The shutdown logic below needs to make sure it does not issue calls
    // that block and stall shutdown.
    // Additionally, the cancellation watch dog will issue a hard-cancel (kill the TaskManager
    // process) as a backup in case some shutdown procedure blocks outside our control.
    setShouldInterruptOnCancel(false);

    // clear any previously issued interrupt for a more graceful shutdown
    Thread.interrupted();

    // stop all timers and threads
    Exception suppressedException =
            runAndSuppressThrowable(this::tryShutdownTimerService, null);

    // stop all asynchronous checkpoint threads
    suppressedException = runAndSuppressThrowable(cancelables::close, suppressedException);
    suppressedException =
            runAndSuppressThrowable(this::shutdownAsyncThreads, suppressedException);

    // we must! perform this cleanup
    suppressedException = runAndSuppressThrowable(this::cleanup, suppressedException);

    // if the operators were not disposed before, do a hard dispose
    suppressedException =
            runAndSuppressThrowable(this::disposeAllOperators, suppressedException);

    // release the output resources. this method should never fail.
    suppressedException =
            runAndSuppressThrowable(this::releaseOutputResources, suppressedException);

    suppressedException =
            runAndSuppressThrowable(channelIOExecutor::shutdown, suppressedException);

    suppressedException = runAndSuppressThrowable(mailboxProcessor::close, suppressedException);

    if (suppressedException != null) {
        throw suppressedException;
    }
}
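cleanUpInvoke relies on runAndSuppressThrowable so that every cleanup step runs even if an earlier one fails. A minimal sketch of that pattern, assuming a hypothetical SuppressingCleanup helper: the first failure is kept and later failures are attached to it as suppressed exceptions.

```java
// Hypothetical helper illustrating the run-and-suppress cleanup pattern.
class SuppressingCleanup {
    // A cleanup step that may throw a checked exception.
    interface ThrowingRunnable { void run() throws Exception; }

    // Runs one step; returns the first failure seen so far, with later
    // failures attached to it as suppressed exceptions.
    static Exception runAndSuppress(ThrowingRunnable step, Exception previous) {
        try {
            step.run();
        } catch (Throwable t) {
            Exception e = (t instanceof Exception) ? (Exception) t : new Exception(t);
            if (previous == null) {
                return e;
            }
            previous.addSuppressed(e);
        }
        return previous;
    }
}
```

Threading the returned exception through each call, as cleanUpInvoke does, means the caller throws only once at the end, and no failure is lost.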
Checkpoint-related methods
triggerCheckpointAsync: invoked when the checkpoint coordinator notifies the task (via Akka RPC); the actual work is enqueued into the task's mailbox.
@Override
public Future<Boolean> triggerCheckpointAsync(
        CheckpointMetaData checkpointMetaData, CheckpointOptions checkpointOptions) {
    CompletableFuture<Boolean> result = new CompletableFuture<>();
    mainMailboxExecutor.execute(
            () -> {
                latestAsyncCheckpointStartDelayNanos =
                        1_000_000
                                * Math.max(
                                        0,
                                        System.currentTimeMillis()
                                                - checkpointMetaData.getTimestamp());
                try {
                    // trigger the checkpoint
                    result.complete(triggerCheckpoint(checkpointMetaData, checkpointOptions));
                } catch (Exception ex) {
                    // Report the failure both via the Future result but also to the mailbox
                    result.completeExceptionally(ex);
                    throw ex;
                }
            },
            "checkpoint %s with %s",
            checkpointMetaData,
            checkpointOptions);
    return result;
}
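triggerCheckpointAsync is called from outside the task thread, so it only enqueues work and hands back a future. The sketch below reproduces that hand-off with a single-thread ExecutorService standing in for the mailbox thread; AsyncCheckpointTrigger and its triggerCheckpoint body are hypothetical. Unlike Flink, it does not rethrow the exception into the mailbox, because a plain executor would silently swallow it. It also shows the start-delay computation: the millisecond difference is clamped at zero and multiplied by 1,000,000 to get nanoseconds.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

// Hypothetical sketch: a caller thread hands the checkpoint trigger to the single task thread.
class AsyncCheckpointTrigger {
    // Stand-in for the mailbox thread (daemon, so it never blocks JVM exit).
    private final ExecutorService mailboxThread =
            Executors.newSingleThreadExecutor(r -> {
                Thread t = new Thread(r, "mailbox-sketch");
                t.setDaemon(true);
                return t;
            });
    volatile long latestStartDelayNanos;

    Future<Boolean> triggerCheckpointAsync(long checkpointId, long checkpointTimestampMillis) {
        CompletableFuture<Boolean> result = new CompletableFuture<>();
        mailboxThread.execute(() -> {
            // delay between the coordinator's timestamp and pickup by the task thread, ms -> ns
            latestStartDelayNanos =
                    1_000_000L * Math.max(0, System.currentTimeMillis() - checkpointTimestampMillis);
            try {
                result.complete(triggerCheckpoint(checkpointId));
            } catch (RuntimeException ex) {
                result.completeExceptionally(ex); // report the failure through the future
            }
        });
        return result;
    }

    // Hypothetical checkpoint body: a negative id simulates a failure.
    private boolean triggerCheckpoint(long checkpointId) {
        if (checkpointId < 0) throw new IllegalArgumentException("bad checkpoint id " + checkpointId);
        return true;
    }

    void shutdown() throws InterruptedException {
        mailboxThread.shutdown();
        mailboxThread.awaitTermination(5, TimeUnit.SECONDS);
    }
}
```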
triggerCheckpoint
triggerCheckpoint calls performCheckpoint to actually perform the checkpoint.
private boolean triggerCheckpoint(
        CheckpointMetaData checkpointMetaData, CheckpointOptions checkpointOptions)
        throws Exception {
    try {
        // No alignment if we inject a checkpoint
        CheckpointMetricsBuilder checkpointMetrics =
                new CheckpointMetricsBuilder()
                        .setAlignmentDurationNanos(0L)
                        .setBytesProcessedDuringAlignment(0L);

        // initialize the checkpoint
        subtaskCheckpointCoordinator.initCheckpoint(
                checkpointMetaData.getCheckpointId(), checkpointOptions);

        // perform the checkpoint
        boolean success =
                performCheckpoint(checkpointMetaData, checkpointOptions, checkpointMetrics);
        if (!success) {
            declineCheckpoint(checkpointMetaData.getCheckpointId());
        }
        return success;
    } catch (Exception e) {
        // propagate exceptions only if the task is still in "running" state
        if (isRunning) {
            throw new Exception(
                    "Could not perform checkpoint "
                            + checkpointMetaData.getCheckpointId()
                            + " for operator "
                            + getName()
                            + '.',
                    e);
        } else {
            LOG.debug(
                    "Could not perform checkpoint {} for operator {} while the "
                            + "invokable was not in state running.",
                    checkpointMetaData.getCheckpointId(),
                    getName(),
                    e);
            return false;
        }
    }
}
performCheckpoint: if the task is running, the snapshot is delegated to subtaskCheckpointCoordinator.checkpointState; otherwise a CancelCheckpointMarker is broadcast downstream.
private boolean performCheckpoint(
        CheckpointMetaData checkpointMetaData,
        CheckpointOptions checkpointOptions,
        CheckpointMetricsBuilder checkpointMetrics)
        throws Exception {

    LOG.debug(
            "Starting checkpoint ({}) {} on task {}",
            checkpointMetaData.getCheckpointId(),
            checkpointOptions.getCheckpointType(),
            getName());

    if (isRunning) {
        actionExecutor.runThrowing(
                () -> {
                    if (checkpointOptions.getCheckpointType().isSynchronous()) {
                        setSynchronousSavepointId(
                                checkpointMetaData.getCheckpointId(),
                                checkpointOptions.getCheckpointType().shouldIgnoreEndOfInput());

                        if (checkpointOptions.getCheckpointType().shouldAdvanceToEndOfTime()) {
                            advanceToEndOfEventTime();
                        }
                    } else if (activeSyncSavepointId != null
                            && activeSyncSavepointId < checkpointMetaData.getCheckpointId()) {
                        activeSyncSavepointId = null;
                        operatorChain.setIgnoreEndOfInput(false);
                    }

                    // hand the actual snapshot over to the subtaskCheckpointCoordinator
                    subtaskCheckpointCoordinator.checkpointState(
                            checkpointMetaData,
                            checkpointOptions,
                            checkpointMetrics,
                            operatorChain,
                            this::isRunning);
                });

        return true;
    } else {
        actionExecutor.runThrowing(
                () -> {
                    // we cannot perform our checkpoint - let the downstream operators know that
                    // they should not wait for any input from this operator

                    // we cannot broadcast the cancellation markers on the 'operator chain',
                    // because it may not yet be created
                    final CancelCheckpointMarker message =
                            new CancelCheckpointMarker(checkpointMetaData.getCheckpointId());
                    recordWriter.broadcastEvent(message);
                });

        return false;
    }
}
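The decision made by triggerCheckpoint and performCheckpoint can be modelled as follows: a running task snapshots its state, while a task that is not running broadcasts a CancelCheckpointMarker downstream and declines the checkpoint. CheckpointDecision is hypothetical; its lists merely record what the real code would send or snapshot.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical model of the triggerCheckpoint -> performCheckpoint decision.
class CheckpointDecision {
    final List<String> downstreamEvents = new ArrayList<>(); // what recordWriter would broadcast
    final List<Long> declined = new ArrayList<>();           // what declineCheckpoint would report
    final List<Long> snapshotted = new ArrayList<>();        // what checkpointState would snapshot
    volatile boolean isRunning;

    boolean triggerCheckpoint(long checkpointId) {
        boolean success = performCheckpoint(checkpointId);
        if (!success) {
            declined.add(checkpointId); // tell the coordinator this task cannot take part
        }
        return success;
    }

    private boolean performCheckpoint(long checkpointId) {
        if (isRunning) {
            snapshotted.add(checkpointId); // stands in for subtaskCheckpointCoordinator.checkpointState(...)
            return true;
        }
        // not running: tell downstream tasks not to wait for a barrier from this task
        downstreamEvents.add("CancelCheckpointMarker(" + checkpointId + ")");
        return false;
    }
}
```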
triggerCheckpointOnBarrier: called once barriers have been received on all inputs; it also goes through performCheckpoint.
@Override
public void triggerCheckpointOnBarrier(
        CheckpointMetaData checkpointMetaData,
        CheckpointOptions checkpointOptions,
        CheckpointMetricsBuilder checkpointMetrics)
        throws IOException {

    try {
        // perform the checkpoint
        if (performCheckpoint(checkpointMetaData, checkpointOptions, checkpointMetrics)) {
            if (isSynchronousSavepointId(checkpointMetaData.getCheckpointId())) {
                runSynchronousSavepointMailboxLoop();
            }
        }
    } catch (CancelTaskException e) {
        LOG.info(
                "Operator {} was cancelled while performing checkpoint {}.",
                getName(),
                checkpointMetaData.getCheckpointId());
        throw e;
    } catch (Exception e) {
        throw new IOException(
                "Could not perform checkpoint "
                        + checkpointMetaData.getCheckpointId()
                        + " for operator "
                        + getName()
                        + '.',
                e);
    }
}
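Aborting a checkpoint on barrier (abortCheckpointOnBarrier), followed by the completion/abort notifications and operator event dispatch: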
@Override
public void abortCheckpointOnBarrier(long checkpointId, Throwable cause) throws IOException {
resetSynchronousSavepointId(checkpointId, false);
subtaskCheckpointCoordinator.abortCheckpointOnBarrier(checkpointId, cause, operatorChain);
}
@Override
public Future<Void> notifyCheckpointCompleteAsync(long checkpointId) {
return notifyCheckpointOperation(
() -> notifyCheckpointComplete(checkpointId),
String.format("checkpoint %d complete", checkpointId));
}
@Override
public Future<Void> notifyCheckpointAbortAsync(long checkpointId) {
return notifyCheckpointOperation(
() -> {
resetSynchronousSavepointId(checkpointId, false);
subtaskCheckpointCoordinator.notifyCheckpointAborted(
checkpointId, operatorChain, this::isRunning);
},
String.format("checkpoint %d aborted", checkpointId));
}
@Override
public void dispatchOperatorEvent(OperatorID operator, SerializedValue<OperatorEvent> event)
throws FlinkException {
try {
mainMailboxExecutor.execute(
() -> operatorChain.dispatchOperatorEvent(operator, event),
"dispatch operator event");
} catch (RejectedExecutionException e) {
// this happens during shutdown, we can swallow this
}
}
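injectChannelStateWriterIntoChannels (called at the end of the constructor) hands the subtask's ChannelStateWriter to every input gate and to every result partition writer that implements ChannelStateHolder, so that in-flight channel data can be persisted during (unaligned) checkpoints.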
private void injectChannelStateWriterIntoChannels() {
final Environment env = getEnvironment();
final ChannelStateWriter channelStateWriter =
subtaskCheckpointCoordinator.getChannelStateWriter();
for (final InputGate gate : env.getAllInputGates()) {
gate.setChannelStateWriter(channelStateWriter);
}
for (ResultPartitionWriter writer : env.getAllWriters()) {
if (writer instanceof ChannelStateHolder) {
((ChannelStateHolder) writer).setChannelStateWriter(channelStateWriter);
}
}
}
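The injection loop treats input gates and result partition writers differently: every gate receives the writer, but a partition writer only receives it if it implements ChannelStateHolder. A self-contained sketch of that instanceof pattern, with hypothetical stand-in types (the names ending in Sketch, plus HolderPartitionWriter, PlainPartitionWriter and ChannelStateInjection):

```java
import java.util.List;

// Hypothetical stand-ins for ChannelStateWriter, ChannelStateHolder,
// InputGate and ResultPartitionWriter.
interface StateWriterSketch { String name(); }

interface StateHolderSketch { void setChannelStateWriter(StateWriterSketch writer); }

class InputGateSketch implements StateHolderSketch {
    StateWriterSketch writer;
    @Override public void setChannelStateWriter(StateWriterSketch w) { this.writer = w; }
}

interface PartitionWriterSketch {}

// A partition writer that can persist in-flight data.
class HolderPartitionWriter implements PartitionWriterSketch, StateHolderSketch {
    StateWriterSketch writer;
    @Override public void setChannelStateWriter(StateWriterSketch w) { this.writer = w; }
}

// A partition writer without channel state.
class PlainPartitionWriter implements PartitionWriterSketch {}

class ChannelStateInjection {
    static void inject(StateWriterSketch writer,
                       List<InputGateSketch> gates,
                       List<PartitionWriterSketch> writers) {
        for (InputGateSketch gate : gates) {
            gate.setChannelStateWriter(writer); // every input gate receives the writer
        }
        for (PartitionWriterSketch w : writers) {
            if (w instanceof StateHolderSketch) { // only writers that can hold channel state
                ((StateHolderSketch) w).setChannelStateWriter(writer);
            }
        }
    }
}
```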
The runMailboxLoop call in invoke delegates to mailboxProcessor.runMailboxLoop().
That loop repeatedly takes the default mailboxDefaultAction and runs it via runDefaultAction:
org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor#runMailboxLoop
/** Runs the mailbox processing loop. This is where the main work is done. */
public void runMailboxLoop() throws Exception {

    final TaskMailbox localMailbox = mailbox;

    Preconditions.checkState(
            localMailbox.isMailboxThread(),
            "Method must be executed by declared mailbox thread!");

    assert localMailbox.getState() == TaskMailbox.State.OPEN : "Mailbox must be opened!";

    final MailboxController defaultActionContext = new MailboxController(this);

    // first handle pending mails (e.g. checkpoint triggers, operator events),
    // then run the default action (record processing)
    while (isMailboxLoopRunning()) {
        // The blocking `processMail` call will not return until default action is available.
        processMail(localMailbox, false);
        if (isMailboxLoopRunning()) {
            // the default action is set in the StreamTask constructor: processInput
            mailboxDefaultAction.runDefaultAction(
                    // lock is acquired inside default action as needed
                    defaultActionContext);
        }
    }
}
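The essence of runMailboxLoop is: run the pending mails, run the default action once, and repeat until the default action reports that it is done. MiniMailboxProcessor below is a hypothetical, single-threaded simplification; unlike the real processMail, its processMail never blocks, so it does not model a suspended default action.

```java
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.function.Consumer;

// Hypothetical, simplified mailbox loop: mails (possibly from other threads)
// run on the task thread between two invocations of the default action.
class MiniMailboxProcessor {
    // Stand-in for MailboxDefaultAction.Controller.
    interface Controller { void allActionsCompleted(); }

    private final ConcurrentLinkedQueue<Runnable> mailbox = new ConcurrentLinkedQueue<>();
    private final Consumer<Controller> defaultAction;
    private boolean loopRunning = true;

    MiniMailboxProcessor(Consumer<Controller> defaultAction) { this.defaultAction = defaultAction; }

    // May be called from any thread (like mainMailboxExecutor.execute).
    void execute(Runnable mail) { mailbox.add(mail); }

    void runMailboxLoop() {
        Controller controller = () -> loopRunning = false;
        while (loopRunning) {
            processMail(); // run everything that is queued right now
            if (loopRunning) {
                defaultAction.accept(controller); // e.g. StreamTask#processInput
            }
        }
    }

    private void processMail() {
        Runnable mail;
        while ((mail = mailbox.poll()) != null) {
            mail.run();
        }
    }
}
```

With this structure, a mail enqueued while one record is being processed runs before the next record, which is how a checkpoint trigger can slip in between two records without any lock.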
This raises a question: what is runDefaultAction?
When the StreamTask constructor builds the MailboxProcessor, it specifies the default action:
// [key point]
this.mailboxProcessor = new MailboxProcessor(this::processInput, mailbox, actionExecutor);
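So the default action is processInput.
processInput implements the default action of the task (e.g. processing one event from the input). Implementations should (in general) be non-blocking.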
/**
 * This method implements the default action of the task (e.g. processing one event from the
 * input). Implementations should (in general) be non-blocking.
 *
 * @param controller controller object for collaborative interaction between the action and the
 *     stream task.
 * @throws Exception on any problems in the action.
 */
protected void processInput(MailboxDefaultAction.Controller controller) throws Exception {
    // the input processor is one of three kinds:
    //   StreamOneInputProcessor
    //   StreamTwoInputProcessor
    //   StreamMultipleInputProcessor
    InputStatus status = inputProcessor.processInput();
    if (status == InputStatus.MORE_AVAILABLE && recordWriter.isAvailable()) {
        return;
    }
    if (status == InputStatus.END_OF_INPUT) {
        controller.allActionsCompleted();
        return;
    }
    CompletableFuture<?> jointFuture = getInputOutputJointFuture(status);
    MailboxDefaultAction.Suspension suspendedDefaultAction = controller.suspendDefaultAction();
    assertNoException(jointFuture.thenRun(suspendedDefaultAction::resume));
}
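The branching at the end of processInput can be isolated into a small pure function. ProcessInputLogic and its enums are hypothetical: Outcome.CONTINUE means returning so that the loop calls processInput again, ALL_ACTIONS_COMPLETED corresponds to controller.allActionsCompleted(), and SUSPENDED corresponds to suspending the default action until the joint input/output future completes, which suspendUntil imitates with CompletableFuture.thenRun.

```java
import java.util.concurrent.CompletableFuture;

// Hypothetical mirror of the branching at the end of StreamTask#processInput.
class ProcessInputLogic {
    enum InputStatus { MORE_AVAILABLE, NOTHING_AVAILABLE, END_OF_INPUT }

    enum Outcome { CONTINUE, ALL_ACTIONS_COMPLETED, SUSPENDED }

    // outputAvailable stands in for recordWriter.isAvailable().
    static Outcome decide(InputStatus status, boolean outputAvailable) {
        if (status == InputStatus.MORE_AVAILABLE && outputAvailable) {
            return Outcome.CONTINUE; // return; the mailbox loop calls processInput again
        }
        if (status == InputStatus.END_OF_INPUT) {
            return Outcome.ALL_ACTIONS_COMPLETED; // controller.allActionsCompleted()
        }
        // no input yet, or output back-pressured: suspend until the joint future completes
        return Outcome.SUSPENDED;
    }

    // Imitates jointFuture.thenRun(suspendedDefaultAction::resume).
    static void suspendUntil(CompletableFuture<?> jointFuture, Runnable resume) {
        jointFuture.thenRun(resume);
    }
}
```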
org.apache.flink.streaming.runtime.io.StreamOneInputProcessor#processInput
@Override
public InputStatus processInput() throws Exception {
// StreamTaskInput#emitNext: the input pushes its next element directly to the output
// (implemented by org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput#emitNext)
InputStatus status = input.emitNext(output);
if (status == InputStatus.END_OF_INPUT) {
endOfInputAware.endInput(input.getInputIndex() + 1);
}
return status;
}
org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput#emitNext
@Override
public InputStatus emitNext(DataOutput<T> output) throws Exception {

    while (true) {
        // get the stream element from the deserializer
        if (currentRecordDeserializer != null) {
            DeserializationResult result =
                    currentRecordDeserializer.getNextRecord(deserializationDelegate);
            if (result.isBufferConsumed()) {
                currentRecordDeserializer.getCurrentBuffer().recycleBuffer();
                currentRecordDeserializer = null;
            }

            if (result.isFullRecord()) {
                // process the element
                processElement(deserializationDelegate.getInstance(), output);
                return InputStatus.MORE_AVAILABLE;
            }
        }

        Optional<BufferOrEvent> bufferOrEvent = checkpointedInputGate.pollNext();
        if (bufferOrEvent.isPresent()) {
            // return to the mailbox after receiving a checkpoint barrier to avoid processing of
            // data after the barrier before checkpoint is performed for unaligned checkpoint mode
            if (bufferOrEvent.get().isBuffer()) {
                processBuffer(bufferOrEvent.get());
            } else {
                processEvent(bufferOrEvent.get());
                return InputStatus.MORE_AVAILABLE;
            }
        } else {
            if (checkpointedInputGate.isFinished()) {
                checkState(
                        checkpointedInputGate.getAvailableFuture().isDone(),
                        "Finished BarrierHandler should be available");
                return InputStatus.END_OF_INPUT;
            }
            return InputStatus.NOTHING_AVAILABLE;
        }
    }
}
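The loop in emitNext emits at most one record per call and then returns to the mailbox. A simplified sketch, assuming hypothetical types in NetworkInputSketch: each buffer holds whole records (no record spans two buffers), and events and checkpoint barriers are not modelled.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.Iterator;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical, simplified shape of StreamTaskNetworkInput#emitNext:
// at most one record is emitted per call, then control returns to the caller.
class NetworkInputSketch {
    enum InputStatus { MORE_AVAILABLE, NOTHING_AVAILABLE, END_OF_INPUT }

    private final Deque<List<String>> pendingBuffers = new ArrayDeque<>(); // stand-in for the input gate
    private Iterator<String> currentDeserializer; // records left in the current buffer
    private boolean finished;

    void addBuffer(List<String> records) { pendingBuffers.add(records); }
    void finish() { finished = true; }

    InputStatus emitNext(Consumer<String> output) {
        while (true) {
            if (currentDeserializer != null) {
                if (currentDeserializer.hasNext()) {
                    output.accept(currentDeserializer.next()); // one full record
                    return InputStatus.MORE_AVAILABLE;
                }
                currentDeserializer = null; // buffer consumed ("recycled")
            }
            List<String> buffer = pendingBuffers.poll();
            if (buffer != null) {
                currentDeserializer = buffer.iterator();
            } else {
                return finished ? InputStatus.END_OF_INPUT : InputStatus.NOTHING_AVAILABLE;
            }
        }
    }
}
```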
org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput#processElement
// process one stream element
private void processElement(StreamElement recordOrMark, DataOutput<T> output) throws Exception {
    if (recordOrMark.isRecord()) {
        // [key point] a data record
        // for OneInputStreamTask this is StreamTaskNetworkOutput#emitRecord
        output.emitRecord(recordOrMark.asRecord());
    } else if (recordOrMark.isWatermark()) {
        // a watermark: passes through the status/watermark valve first
        statusWatermarkValve.inputWatermark(
                recordOrMark.asWatermark(), flattenedChannelIndices.get(lastChannel), output);
    } else if (recordOrMark.isLatencyMarker()) {
        // a latency marker (used for latency tracking, not late data)
        output.emitLatencyMarker(recordOrMark.asLatencyMarker());
    } else if (recordOrMark.isStreamStatus()) {
        // a StreamStatus (active/idle)
        statusWatermarkValve.inputStreamStatus(
                recordOrMark.asStreamStatus(),
                flattenedChannelIndices.get(lastChannel),
                output);
    } else {
        throw new UnsupportedOperationException("Unknown type of StreamElement");
    }
}
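processElement is a type dispatch over StreamElement. The sketch below reproduces that dispatch with hypothetical stand-in classes (Element, RecordElem, WatermarkElem, LatencyMarkerElem, StatusElem, ElementDispatcher); it only records which path each element would take.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-ins for StreamElement and its subtypes.
abstract class Element {}

final class RecordElem extends Element {
    final String value;
    RecordElem(String value) { this.value = value; }
}

final class WatermarkElem extends Element {
    final long timestamp;
    WatermarkElem(long timestamp) { this.timestamp = timestamp; }
}

final class LatencyMarkerElem extends Element {}

final class StatusElem extends Element {
    final boolean idle;
    StatusElem(boolean idle) { this.idle = idle; }
}

class ElementDispatcher {
    final List<String> actions = new ArrayList<>();

    void processElement(Element e) {
        if (e instanceof RecordElem) {
            actions.add("emitRecord " + ((RecordElem) e).value); // goes to the operator
        } else if (e instanceof WatermarkElem) {
            actions.add("valve.inputWatermark " + ((WatermarkElem) e).timestamp); // aligned across channels
        } else if (e instanceof LatencyMarkerElem) {
            actions.add("emitLatencyMarker"); // latency tracking, not late data
        } else if (e instanceof StatusElem) {
            actions.add("valve.inputStreamStatus " + (((StatusElem) e).idle ? "IDLE" : "ACTIVE"));
        } else {
            throw new UnsupportedOperationException("Unknown type of StreamElement");
        }
    }
}
```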
For a map operator, the emitRecord that is called is the one in OneInputStreamTask.java (its inner class StreamTaskNetworkOutput):
@Override
public void emitRecord(StreamRecord<IN> record) throws Exception {
numRecordsIn.inc();
operator.setKeyContextElement1(record);
// the transformation
// for map-like operators, this is StreamMap#processElement
operator.processElement(record);
}
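org.apache.flink.streaming.api.operators.StreamMap#processElement
@Override
public void processElement(StreamRecord<IN> element) throws Exception {
// userFunction.map(element.getValue()) calls the map method of the user-defined MapFunction;
// its result replaces the value of the incoming record, which is then passed downstream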
output.collect(element.replace(userFunction.map(element.getValue())));
}
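The last step, StreamMap#processElement, applies the user function and reuses the incoming record object via replace, keeping its timestamp. A minimal sketch with hypothetical SimpleStreamRecord and SimpleStreamMap classes, where a java.util.function.Function stands in for MapFunction and a List stands in for Output:

```java
import java.util.List;
import java.util.function.Function;

// Hypothetical stand-in for StreamRecord: replace() swaps the value, keeps the timestamp.
class SimpleStreamRecord<T> {
    private Object value;
    private final long timestamp;

    SimpleStreamRecord(T value, long timestamp) {
        this.value = value;
        this.timestamp = timestamp;
    }

    @SuppressWarnings("unchecked")
    T getValue() { return (T) value; }

    long getTimestamp() { return timestamp; }

    // Reuses this object for the new value instead of allocating a new record.
    @SuppressWarnings("unchecked")
    <X> SimpleStreamRecord<X> replace(X newValue) {
        this.value = newValue;
        return (SimpleStreamRecord<X>) this;
    }
}

// Hypothetical simplification of StreamMap: apply the user's map function and emit the result.
class SimpleStreamMap<IN, OUT> {
    private final Function<IN, OUT> userFunction;      // stands in for MapFunction#map
    private final List<SimpleStreamRecord<OUT>> output; // stands in for Output#collect

    SimpleStreamMap(Function<IN, OUT> userFunction, List<SimpleStreamRecord<OUT>> output) {
        this.userFunction = userFunction;
        this.output = output;
    }

    void processElement(SimpleStreamRecord<IN> element) {
        output.add(element.replace(userFunction.apply(element.getValue())));
    }
}
```

Reusing the record object avoids one allocation per element; it is safe here only because the downstream consumer handles the record synchronously before the next one arrives.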