当前位置:   article > 正文

StreamTask数据流:StreamTask能力概述、Flink处理网络数据逻辑

StreamTask数据流:StreamTask能力概述、Flink处理网络数据逻辑

先来看数据是如何经过网络写入下游Task节点并通过算子进行处理的,这里以OneInputStreamTask为例进行说明。

一. StreamTask核心组件与能力

如代码OneInputStreamTask.init()方法包含了初始化StreamTask主要核心组件的逻辑。

OneInputStreamTask
public void init() throws Exception {
   StreamConfig configuration = getConfiguration();
   int numberOfInputs = configuration.getNumberOfInputs();
   if (numberOfInputs > 0) {
      // 创建CheckpointedInputGate
      CheckpointedInputGate inputGate = createCheckpointedInputGate();
      TaskIOMetricGroup taskIOMetricGroup = getEnvironment()
          .getMetricGroup().getIOMetricGroup();
      taskIOMetricGroup.gauge("checkpointAlignmentTime", 
                              inputGate::getAlignmentDurationNanos);
      // 创建DataOutput组件
      DataOutput<IN> output = createDataOutput();
      StreamTaskInput<IN> input = createTaskInput(inputGate, output);
      // 创建StreamOneInputProcessor
      inputProcessor = new StreamOneInputProcessor<>(
         input,
         output,
         getCheckpointLock(),
         operatorChain);
   }
   headOperator.getMetricGroup().gauge(MetricNames.IO_CURRENT_INPUT_WATERMARK, 
                                       this.inputWatermarkGauge);
getEnvironment().getMetricGroup().gauge(MetricNames.IO_CURRENT_INPUT_WATERMARK, 
                                         this.inputWatermarkGauge::getValue);
                                     
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  1. 创建CheckpointedInputGate:CheckpointedInputGate是对InputGate进行封装,实现对CheckpointBarrier对齐的功能。此组件可以接入上游Task实例写入指定InputChannel中的Buffer数据。
  2. 创建DataOutput组件:在StreamTaskInput中会将 接入的数据 通过DataOutput组件输出到算子链的HeaderOperator中。
  3. 创建StreamTaskInput组件:用于接收数据,将InputGate和DataOutput作为内部成员,完成对数据的接入和输出。
  4. 创建StreamOneInputProcessor数据处理器:此组件会被Task线程模型调度并执行,实现周期性地从StreamTaskInput组件中读取数据元素并处理。

小结:

OneInputStreamTask初始化过程中,包括创建StreamTaskInput和DataOutput组件。

 
 
接下来了解StreamTask如何利用StreamTaskInput和DataOutput完成数据元素的接收并发送到算子链中进行处理。

二. OneInputStreamTask接入网络数据并处理

StreamTask.processInput()方法定义了处理数据的主要流程。

  1. 数据最终会通过MailboxProcessor调度与执行
  2. 调用StreamOneInputProcessor.processInput()方法完成数据元素的获取和处理
  3. 调度StreamOneInputProcessor组件,串联并运行StreamTaskInput组件、DataOutput组件和OperatorChain组件,最终完成数据元素的处理操作。
StreamTask.processInput()
protected void processInput(MailboxDefaultAction.Controller controller) 
    throws Exception {
   InputStatus status = inputProcessor.processInput();
   // 上游如果还有数据,则继续等待执行
   if (status == InputStatus.MORE_AVAILABLE && recordWriter.isAvailable()) {
      return;
   }
   // 上游如果没有数据,则发送控制消息到控制器
   if (status == InputStatus.END_OF_INPUT) {
      controller.allActionsCompleted();
      return;
   }
   CompletableFuture<?> jointFuture = getInputOutputJointFuture(status);
   MailboxDefaultAction.Suspension suspendedDefaultAction = 
      controller.suspendDefaultAction();
   jointFuture.thenRun(suspendedDefaultAction::resume);
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18

 
接下来详细看StreamOneInputProcessor.processInput()

emitNext():通过StreamTaskNetworkInput接收数据元素,并返回InputStatus判断数据元素是否全部消费完毕。emitNext()会将DataOutput作为参数传递到方法内部,用于将数据元素输出到算子链中

public InputStatus processInput() throws Exception {
   InputStatus status = input.emitNext(output);
   if (status == InputStatus.END_OF_INPUT) {
      synchronized (lock) {
         operatorChain.endHeadOperatorInput(1);
      }
   }
   return status;
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9

StreamTaskNetworkInput.emitNext():处理数据逻辑。


//BufferOrEvent代表数据元素可以是Buffer类型,也可以是事件类型,
//比如CheckpointBarrier、TaskEvent等事件。

public InputStatus emitNext(DataOutput<T> output) throws Exception {
   while (true) {
      // 从Deserializer中获取数据元素
      if (currentRecordDeserializer != null) {
         DeserializationResult result = 
             currentRecordDeserializer.getNextRecord(deserializationDelegate);
         // 如果DeserializationResult对应的Buffer数据已经被消费,则回收Buffer 
         if (result.isBufferConsumed()) {
            currentRecordDeserializer.getCurrentBuffer().recycleBuffer();
            currentRecordDeserializer = null;
         }
         // 如果result是完整的数据元素,则调用processElement()方法进行处理
         if (result.isFullRecord()) {
            processElement(deserializationDelegate.getInstance(), output);
            return InputStatus.MORE_AVAILABLE;
         }
      }
      // 从checkpointedInputGate中拉取数据
      //如果bufferOrEvent为空,则判断checkpointedInputGate是否已经关闭,如果已经关闭了则直接返回END_OF_INPUT状态,否则返回NOTHING_AVAILABLE状态。
      Optional<BufferOrEvent> bufferOrEvent = checkpointedInputGate.pollNext();
      // 如果有数据则调用processBufferOrEvent()方法进行处理
      if (bufferOrEvent.isPresent()) {
         processBufferOrEvent(bufferOrEvent.get());
      } else {
         // 如果checkpointedInputGate已关闭,则返回END_OF_INPUT
         if (checkpointedInputGate.isFinished()) {
            checkState(checkpointedInputGate.getAvailableFuture().isDone(), 
                       "Finished BarrierHandler should be available");
            if (!checkpointedInputGate.isEmpty()) {
               throw new IllegalStateException(
                   "Trailing data in checkpoint barrier handler.");
            }
            return InputStatus.END_OF_INPUT;
         }
         return InputStatus.NOTHING_AVAILABLE;
      }
   }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42

 

三. 处理数据

1. StreamElement类别

StreamElement具体类别有StreamRecord、StreamStatus以及Watermark,其中StreamRecord就是需要处理的业务数据,Watermark则是上游传递下来的Watermark事件。

//StreamTaskNetworkInput.processElement()
private void processElement(StreamElement recordOrMark, DataOutput<T> output) 
   throws Exception {
   // StreamRecord类型
   if (recordOrMark.isRecord()){
      output.emitRecord(recordOrMark.asRecord());
   // Watermark类型
   } else if (recordOrMark.isWatermark()) {
      statusWatermarkValve.inputWatermark(recordOrMark.asWatermark(), lastChannel);
   // LatencyMarker类型
   } else if (recordOrMark.isLatencyMarker()) {
      output.emitLatencyMarker(recordOrMark.asLatencyMarker());
   // StreamStatus类型
   } else if (recordOrMark.isStreamStatus()) {
      statusWatermarkValve.inputStreamStatus(recordOrMark.asStreamStatus(), 
         lastChannel);
   } else {
      throw new UnsupportedOperationException("Unknown type of StreamElement");
   }
}

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21

 

2. 业务数据处理逻辑

对于业务数据,调用output.emitRecord(recordOrMark.asRecord())方法进行数据元素的输出操作,然后通过DataOutput输出到算子链中进行处理。

如下方法调用operator处理,实际就是在创建StreamTaskNetworkOutput时指定的算子链HeaderOperator

OneInputStreamTask.StreamTaskNetworkOutput.emitRecord()
public void emitRecord(StreamRecord<IN> record) throws Exception {
   synchronized (lock) {
      //累加器计算消费数量
      numRecordsIn.inc();
      //通过算子链处理
      operator.setKeyContextElement1(record);
      operator.processElement(record);
   }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10

 

四. 小结

Flink从InputGate中拉取数据元素并进行反序列化操作,转换成StreamElement类型后,再调用StreamTaskNetworkOutput.emitRecord()方法将数据元素推送到OperatorChain的HeaderOperator中进行处理。

 
 
《Flink设计与实现:核心原理与源码解析》 – 张利兵

声明:本文内容由网友自发贡献,转载请注明出处:【wpsshop】
推荐阅读
相关标签
  

闽ICP备14008679号