赞
踩
先来看数据是如何经过网络写入下游Task节点并通过算子进行处理的
,这里以OneInputStreamTask为例进行说明。
如代码OneInputStreamTask.init()方法包含了初始化StreamTask主要核心组件的逻辑。
OneInputStreamTask public void init() throws Exception { StreamConfig configuration = getConfiguration(); int numberOfInputs = configuration.getNumberOfInputs(); if (numberOfInputs > 0) { // 创建CheckpointedInputGate CheckpointedInputGate inputGate = createCheckpointedInputGate(); TaskIOMetricGroup taskIOMetricGroup = getEnvironment() .getMetricGroup().getIOMetricGroup(); taskIOMetricGroup.gauge("checkpointAlignmentTime", inputGate::getAlignmentDurationNanos); // 创建DataOutput组件 DataOutput<IN> output = createDataOutput(); StreamTaskInput<IN> input = createTaskInput(inputGate, output); // 创建StreamOneInputProcessor inputProcessor = new StreamOneInputProcessor<>( input, output, getCheckpointLock(), operatorChain); } headOperator.getMetricGroup().gauge(MetricNames.IO_CURRENT_INPUT_WATERMARK, this.inputWatermarkGauge); getEnvironment().getMetricGroup().gauge(MetricNames.IO_CURRENT_INPUT_WATERMARK, this.inputWatermarkGauge::getValue);
- 创建CheckpointedInputGate:CheckpointedInputGate是对InputGate进行封装,
实现对CheckpointBarrier对齐的功能
。此组件可以接入上游Task实例写入指定InputChannel中的Buffer数据。- 创建DataOutput组件:在StreamTaskInput中会将
接入的数据 通过DataOutput组件输出到算子链的HeaderOperator
中。- 创建StreamTaskInput组件:用于接收数据,将InputGate和DataOutput作为内部成员,完成对数据的接入和输出。
- 创建StreamOneInputProcessor数据处理器:此组件会被Task线程模型调度并执行,实现周期性地从StreamTaskInput组件中读取数据元素并处理。
小结:
OneInputStreamTask初始化过程中,包括创建StreamTaskInput和DataOutput组件。
接下来了解StreamTask如何利用StreamTaskInput和DataOutput完成数据元素的接收并发送到算子链中进行处理。
StreamTask.processInput()方法定义了处理数据的主要流程。
- 数据最终会通过MailboxProcessor调度与执行
- 调用StreamOneInputProcessor.processInput()方法完成数据元素的获取和处理
- 调度StreamOneInputProcessor组件,串联并运行StreamTaskInput组件、DataOutput组件和OperatorChain组件,最终完成数据元素的处理操作。
StreamTask.processInput() protected void processInput(MailboxDefaultAction.Controller controller) throws Exception { InputStatus status = inputProcessor.processInput(); // 上游如果还有数据,则继续等待执行 if (status == InputStatus.MORE_AVAILABLE && recordWriter.isAvailable()) { return; } // 上游如果没有数据,则发送控制消息到控制器 if (status == InputStatus.END_OF_INPUT) { controller.allActionsCompleted(); return; } CompletableFuture<?> jointFuture = getInputOutputJointFuture(status); MailboxDefaultAction.Suspension suspendedDefaultAction = controller.suspendDefaultAction(); jointFuture.thenRun(suspendedDefaultAction::resume); }
接下来详细看StreamOneInputProcessor.processInput()
emitNext():通过StreamTaskNetworkInput接收数据元素,并返回InputStatus判断数据元素是否全部消费完毕。emitNext()会将DataOutput作为参数传递到方法内部,用于将数据元素输出到
算子链中
。
public InputStatus processInput() throws Exception {
InputStatus status = input.emitNext(output);
if (status == InputStatus.END_OF_INPUT) {
synchronized (lock) {
operatorChain.endHeadOperatorInput(1);
}
}
return status;
}
StreamTaskNetworkInput.emitNext():处理数据逻辑。
//BufferOrEvent代表数据元素可以是Buffer类型,也可以是事件类型, //比如CheckpointBarrier、TaskEvent等事件。 public InputStatus emitNext(DataOutput<T> output) throws Exception { while (true) { // 从Deserializer中获取数据元素 if (currentRecordDeserializer != null) { DeserializationResult result = currentRecordDeserializer.getNextRecord(deserializationDelegate); // 如果DeserializationResult对应的Buffer数据已经被消费,则回收Buffer if (result.isBufferConsumed()) { currentRecordDeserializer.getCurrentBuffer().recycleBuffer(); currentRecordDeserializer = null; } // 如果result是完整的数据元素,则调用processElement()方法进行处理 if (result.isFullRecord()) { processElement(deserializationDelegate.getInstance(), output); return InputStatus.MORE_AVAILABLE; } } // 从checkpointedInputGate中拉取数据 //如果bufferOrEvent为空,则判断checkpointedInputGate是否已经关闭,如果已经关闭了则直接返回END_OF_INPUT状态,否则返回NOTHING_AVAILABLE状态。 Optional<BufferOrEvent> bufferOrEvent = checkpointedInputGate.pollNext(); // 如果有数据则调用processBufferOrEvent()方法进行处理 if (bufferOrEvent.isPresent()) { processBufferOrEvent(bufferOrEvent.get()); } else { // 如果checkpointedInputGate已关闭,则返回END_OF_INPUT if (checkpointedInputGate.isFinished()) { checkState(checkpointedInputGate.getAvailableFuture().isDone(), "Finished BarrierHandler should be available"); if (!checkpointedInputGate.isEmpty()) { throw new IllegalStateException( "Trailing data in checkpoint barrier handler."); } return InputStatus.END_OF_INPUT; } return InputStatus.NOTHING_AVAILABLE; } } }
StreamElement具体类别有StreamRecord、StreamStatus以及Watermark,其中StreamRecord就是需要处理的业务数据,Watermark则是上游传递下来的Watermark事件。
//StreamTaskNetworkInput.processElement() private void processElement(StreamElement recordOrMark, DataOutput<T> output) throws Exception { // StreamRecord类型 if (recordOrMark.isRecord()){ output.emitRecord(recordOrMark.asRecord()); // Watermark类型 } else if (recordOrMark.isWatermark()) { statusWatermarkValve.inputWatermark(recordOrMark.asWatermark(), lastChannel); // LatencyMarker类型 } else if (recordOrMark.isLatencyMarker()) { output.emitLatencyMarker(recordOrMark.asLatencyMarker()); // StreamStatus类型 } else if (recordOrMark.isStreamStatus()) { statusWatermarkValve.inputStreamStatus(recordOrMark.asStreamStatus(), lastChannel); } else { throw new UnsupportedOperationException("Unknown type of StreamElement"); } }
对于业务数据,调用output.emitRecord(recordOrMark.asRecord())方法进行数据元素的输出操作,然后通过DataOutput输出到算子链
中进行处理。
如下方法调用operator处理,实际就是在创建StreamTaskNetworkOutput时指定的算子链HeaderOperator
。
OneInputStreamTask.StreamTaskNetworkOutput.emitRecord()
public void emitRecord(StreamRecord<IN> record) throws Exception {
synchronized (lock) {
//累加器计算消费数量
numRecordsIn.inc();
//通过算子链处理
operator.setKeyContextElement1(record);
operator.processElement(record);
}
}
Flink从InputGate中拉取数据元素并进行反序列化操作,转换成StreamElement类型后,再调用StreamTaskNetworkOutput.emitRecord()方法将数据元素推送到OperatorChain的HeaderOperator中进行处理。
《Flink设计与实现:核心原理与源码解析》 – 张利兵
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。