当前位置:   article > 正文

(十一)Flink Datastream API 编程指南 数据源(Data Sources)_flink的数据源

flink的数据源

本页描述了Flink的数据源API及其背后的概念和架构。如果您对Flink中的数据源如何工作感兴趣,或者想实现一个新的数据源,请阅读本文。

如果您正在寻找预定义的源连接器,请查看连接器文档

数据源的概念

核心组件

一个数据源有三个核心组件:Splits、SplitEnumerator和SourceReader。

  • Split是源使用的数据的一部分,比如文件或日志分区。拆分是源分配工作和并行读取数据的粒度。
  • SourceReader请求Split并处理它们,例如通过读取Split表示的文件或日志分区。SourceReader在sourceoperator中的任务管理器上并行运行,并生成并行的事件/记录流。
  • SplitEnumerator生成split并将它们分配给sourcereader。它作为JobManager(作业管理器)上的单个实例运行,负责维护待处理的拆分积压,并以平衡的方式将它们分配给读者。

Source类是将上述三个组件联系在一起的API入口点。
在这里插入图片描述

统一跨流和批处理

数据源API以统一的方式同时支持无界流源和有界批处理源。
这两种情况之间的差别很小:在有界/批处理情况下,Enumerator生成固定的拆分集合,并且每个split必须是有限的。在无界流的情况下,两者之一不为真(splits不是有限的,或者枚举器不断生成新的splits)。

举例

下面是一些简化的概念性示例,以说明数据源组件在流和批处理情况下是如何交互的。
注意,这并没有准确描述Kafka和File source实现是如何工作的;为了便于说明,部分内容被简化。

有界的源文件

源具有要读取的目录的URI/Path,以及定义如何解析文件的Format。

  • Split是一个文件,或者文件的一个区域(如果数据格式支持对文件进行分割)。
  • SplitEnumerator列出给定目录路径下的所有文件。它将Split分配给下一个请求Split的读取器。一旦分配了所有的split,它就用NoMoreSplits响应请求。
  • SourceReader请求一个Split,并读取分配的Split(文件或文件区域),并使用给定的格式解析它。如果它没有得到另一个Split消息,而是一个NoMoreSplits消息,它将结束。

无界流源文件

这个源的工作方式与上面描述的相同,只是SplitEnumerator从不响应NoMoreSplits,并定期列出给定URI/Path下的内容,以检查新文件。一旦它找到了新文件,它就会为它们生成新的split,并将它们分配给可用的sourcereader。

无界流Kafka源

该源有一个Kafka Topic(或Topic或Topic regex列表)和一个反序列化器来解析记录。

  • Split是一个Kafka Topic分区。
  • SplitEnumerator连接到代理,列出订阅主题中涉及的所有主题分区。枚举器可以有选择地重复此操作以发现新添加的主题/分区。
  • SourceReader使用KafkaConsumer读取分配的Splits(Topic Partitions),并使用提供的反序列化器反序列化记录。拆分(Topic Partitions)没有结束,因此读取器永远不会到达数据的结尾。

有界流kafka源

与上面相同,不同的是每个Split (Topic Partition)都有一个定义好的结束偏移量。一旦SourceReader到达Split的结束偏移量,它就完成Split。一旦所有分配的拆分完成,SourceReader也完成。

数据源API

本节描述FLIP-27中引入的新Source API的主要接口,并为开发人员提供关于Source开发的提示。

Source

Source API是一个工厂风格的接口,用于创建以下组件。

  • Split Enumerator
  • Source Reader
  • Split Serializer
  • Enumerator Checkpoint Serializer

除此之外,Source还提供了Source的有界属性,以便Flink可以选择合适的模式来运行Flink作业。
Source实现应该是可序列化的,因为Source实例在运行时被序列化并上传到Flink集群。

SplitEnumerator

SplitEnumerator有望成为Source的“大脑”。SplitEnumerator的典型实现如下:

  • SourceReader登记处理
  • SourceReader失败处理
    • 当SourceReader失败时,将调用addSplitsBack()方法。SplitEnumerator应该收回未被失败的SourceReader确认的split赋值。
  • SourceEvent处理
    • SourceEvents是SplitEnumerator和SourceReader之间发送的自定义事件。实现可以利用这种机制来执行复杂的协调。
  • Split发现和分配
    • SplitEnumerator可以将split分配给SourceReader来响应各种事件,包括发现新的split、新SourceReader注册、SourceReader失败等。

SplitEnumerator可以在SplitEnumeratorContext的帮助下完成上述工作,SplitEnumerator在创建或恢复SplitEnumerator时被提供给Source。SplitEnumeratorContext允许SplitEnumerator检索阅读器的必要信息并执行协调操作。Source实现期望将SplitEnumeratorContext传递给SplitEnumerator实例。

当SplitEnumerator实现只在调用其方法时采取协调操作时,它可以很好地以反应式的方式工作,但有些SplitEnumerator实现可能希望采取主动的操作。例如,SplitEnumerator可能希望定期运行拆分发现,并将新的拆分分配给sourcereader。这样的实现可能会发现SplitEnumeratorContext的callAsync()方法很方便。下面的代码片段展示了SplitEnumerator实现如何在不维护自己线程的情况下实现这一点。

官网伪代码

class MySplitEnumerator implements SplitEnumerator<MySplit> {
    private final long DISCOVER_INTERVAL = 60_000L;

    /**
     * A method to discover the splits.
     */
    private List<MySplit> discoverSplits() {...}
    
    @Override
    public void start() {
        ...
        enumContext.callAsync(this::discoverSplits, splits -> {
            Map<Integer, List<MockSourceSplit>> assignments = new HashMap<>();
            int parallelism = enumContext.currentParallelism();
            for (MockSourceSplit split : splits) {
                int owner = split.splitId().hashCode() % parallelism;
                assignments.computeIfAbsent(owner, new ArrayList<>()).add(split);
            }
            enumContext.assignSplits(new SplitsAssignment<>(assignments));
        }, 0L, DISCOVER_INTERVAL);
        ...
    }
    ...
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24

SourceReader

SourceReader是在任务管理器中运行的一个组件,用于使用来自拆分的记录。

SourceReader公开了一个基于拉的消费接口。Flink任务在循环中不断调用pollNext(ReaderOutput)来轮询SourceReader中的记录。pollNext(ReaderOutput)方法的返回值指示源读取器的状态。

  • MORE_AVAILABLE - SourceReader立即有更多可用的记录。
  • NOTHING_AVAILABLE - SourceReader此时没有更多可用的记录,但将来可能有更多的记录。
  • END_OF_INPUT - SourceReader耗尽了所有记录,到达了数据的末端。这意味着SourceReader可以关闭。

出于性能的考虑,向pollNext(ReaderOutput)方法提供了一个ReaderOutput,因此如果需要,SourceReader可以在一次调用pollNext()中发出多条记录。例如,有时外部系统以块的粒度工作。一个块可以包含多个记录,但是源只能在块边界处检查点。在这种情况下,SourceReader可以一次将一个块中的所有记录发送给ReaderOutput。但是,除非必要,SourceReader实现应该避免在单个pollNext(ReaderOutput)调用中发出多条记录。这是因为从SourceReader轮询的任务线程在事件循环中工作,不能被阻塞。

SourceReader的所有状态都应该在snapshotState()调用时返回的SourceSplits中维护。这样做允许SourceSplits在需要时被重新分配给其他sourcereader。

SourceReaderContext在SourceReader创建时提供给Source。期望Source将上下文传递给SourceReader实例。SourceReader可以通过SourceReaderContext将SourceEvent发送到它的SplitEnumerator。Source的一个典型设计模式是让sourcereader向SplitEnumerator报告他们的本地信息,SplitEnumerator拥有全局视图来做出决策。

SourceReader API是一个低级API,它允许用户手动处理拆分,并拥有自己的线程模型来获取和移交记录。为了方便SourceReader的实现,Flink提供了一个SourceReaderBase类,它大大减少了编写SourceReader所需的工作量。强烈建议连接器开发人员利用SourceReaderBase,而不是从头编写sourcereader。更多细节请查看拆分阅读器API部分。

使用源

为了从源创建一个DataStream,需要将源传递给StreamExecutionEnvironment。例如,

final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

Source mySource = new MySource(...);

DataStream<Integer> stream = env.fromSource(
        mySource,
        WatermarkStrategy.noWatermarks(),
        "MySourceName");
...
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9

Split Reader API (拆分阅读器API)

核心SourceReader API是完全异步的,需要实现手动管理异步拆分读取。然而,在实践中,大多数源执行阻塞操作,如阻塞客户端poll()调用(例如KafkaConsumer),或阻塞分布式文件系统(HDFS, S3,…)上的I/O操作。为了使其与异步Source API兼容,这些阻塞(同步)操作需要在单独的线程中发生,这些线程将数据传递给读取器的异步部分。

SplitReader是用于简单的同步读取/轮询源实现的高级API,比如文件读取、Kafka等。

核心是SourceReaderBase类,它接受SplitReader并创建运行SplitReader的获取线程,支持不同的消耗线程模型。

SplitReader

SplitReader API只有三个方法:

  • 一个阻塞获取方法返回一个RecordsWithSplitIds
  • 处理拆分更改的非阻塞方法。
  • 一个非阻塞唤醒方法,用来唤醒阻塞获取操作。

SplitReader只专注于从外部系统读取记录,因此与SourceReader相比要简单得多。请查看该类的Java文档以获得更多详细信息。

SourceReaderBase

常见的SourceReader实现如下:

  • 拥有一个线程池,以阻塞的方式从外部系统的拆分中获取。
  • 处理内部获取线程和其他方法调用(如pollNext(ReaderOutput))之间的同步。
  • 保持每个分割的水印以使水印对齐。
  • 为检查点维护每个分割的状态。

为了减少编写一个新的SourceReader的工作量,Flink提供了一个SourceReaderBase类作为SourceReader的基本实现。SourceReaderBase已经开箱即用完成了上述所有工作。要编写一个新的SourceReader,只需让SourceReader实现从SourceReaderBase继承,填充一些方法并实现一个高级SplitReader。

SplitFetcherManager

SourceReaderBase支持一些开箱即用的线程模型,这取决于它所使用的SplitFetcherManager的行为。SplitFetcherManager帮助创建和维护一个splitfetcher池,每个splitfetcher都使用SplitReader进行抓取。它还决定了如何为每个拆分获取器分配拆分。

例如,如下所示,SplitFetcherManager可能有固定数量的线程,每个线程从分配给SourceReader的一些拆分中抓取。
在这里插入图片描述

下面的代码片段实现了这个线程模型。

/**
 * A SplitFetcherManager that has a fixed size of split fetchers and assign splits 
 * to the split fetchers based on the hash code of split IDs.
 */
public class FixedSizeSplitFetcherManager<E, SplitT extends SourceSplit> 
        extends SplitFetcherManager<E, SplitT> {
    private final int numFetchers;

    public FixedSizeSplitFetcherManager(
            int numFetchers,
            FutureNotifier futureNotifier,
            FutureCompletingBlockingQueue<RecordsWithSplitIds<E>> elementsQueue,
            Supplier<SplitReader<E, SplitT>> splitReaderSupplier) {
        super(futureNotifier, elementsQueue, splitReaderSupplier);
        this.numFetchers = numFetchers;
        // Create numFetchers split fetchers.
        for (int i = 0; i < numFetchers; i++) {
            startFetcher(createSplitFetcher());
        }
    }

    @Override
    public void addSplits(List<SplitT> splitsToAdd) {
        // Group splits by their owner fetchers.
        Map<Integer, List<SplitT>> splitsByFetcherIndex = new HashMap<>();
        splitsToAdd.forEach(split -> {
            int ownerFetcherIndex = split.hashCode() % numFetchers;
            splitsByFetcherIndex
                    .computeIfAbsent(ownerFetcherIndex, s -> new ArrayList<>())
                    .add(split);
        });
        // Assign the splits to their owner fetcher.
        splitsByFetcherIndex.forEach((fetcherIndex, splitsForFetcher) -> {
            fetchers.get(fetcherIndex).addSplits(splitsForFetcher);
        });
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37

使用这个线程模型的SourceReader可以像下面这样创建:

public class FixedFetcherSizeSourceReader<E, T, SplitT extends SourceSplit, SplitStateT>
        extends SourceReaderBase<E, T, SplitT, SplitStateT> {

    public FixedFetcherSizeSourceReader(
            FutureNotifier futureNotifier,
            FutureCompletingBlockingQueue<RecordsWithSplitIds<E>> elementsQueue,
            Supplier<SplitReader<E, SplitT>> splitFetcherSupplier,
            RecordEmitter<E, T, SplitStateT> recordEmitter,
            Configuration config,
            SourceReaderContext context) {
        super(
                futureNotifier,
                elementsQueue,
                new FixedSizeSplitFetcherManager<>(
                        config.getInteger(SourceConfig.NUM_FETCHERS),
                        futureNotifier,
                        elementsQueue,
                        splitFetcherSupplier),
                recordEmitter,
                config,
                context);
    }

    @Override
    protected void onSplitFinished(Collection<String> finishedSplitIds) {
        // Do something in the callback for the finished splits.
    }

    @Override
    protected SplitStateT initializedState(SplitT split) {
        ...
    }

    @Override
    protected SplitT toSplitType(String splitId, SplitStateT splitState) {
        ...
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38

SourceReader实现还可以在SplitFetcherManager和SourceReaderBase上轻松地实现它们自己的线程模型。

Event Time and Watermarks

事件时间分配和水印生成作为数据源的一部分发生。离开源读取器的事件流具有事件时间戳,并且(在流执行期间)包含水印。有关事件时间和水印的介绍,请参阅及时流处理。

基于遗留SourceFunction的应用程序通常通过stream. assigntimestampsandwatermark (WatermarkStrategy)在单独的后续步骤中生成时间戳和水印。这个函数不应该与新的源一起使用,因为时间戳已经被分配,并且它将覆盖以前的分裂感知水印。

API

在DataStream API的创建过程中,WatermarkStrategy被传递给Source,并创建TimestampAssigner和WatermarkGenerator。

environment.fromSource(
    Source<OUT, ?, ?> source,
    WatermarkStrategy<OUT> timestampsAndWatermarks,
    String sourceName);
  • 1
  • 2
  • 3
  • 4

TimestampAssigner和WatermarkGenerator作为ReaderOutput(或SourceOutput)的一部分透明地运行,因此源代码实现者不必实现任何时间戳提取和水印生成代码。

事件的时间戳

事件时间戳的分配分为两个步骤:

  • SourceReader可以通过调用SourceOutput将源记录的时间戳附加到事件上。收集(事件、时间戳)。这只适用于基于记录和有时间戳的数据源,如Kafka、Kinesis、Pulsar或Pravega。不基于带有时间戳的记录(如文件)的源没有源记录时间戳。此步骤是源连接器实现的一部分,并没有由使用源的应用程序参数化。
  • 由应用程序配置的TimestampAssigner分配最终的时间戳。TimestampAssigner会看到原始源记录的时间戳和事件。赋值器可以使用源记录时间戳或访问事件的某个字段,获得最终的事件时间戳。

这个两步方法允许用户引用源系统的时间戳和事件数据中的时间戳作为事件时间戳。

注意:当使用没有源记录时间戳(如文件)的数据源并选择源记录时间戳作为最终事件时间戳时,事件将获得一个默认时间戳等于LONG_MIN(=-9,223,372,036,854,775,808)。

水印生成

水印生成器仅在流执行期间活动。批处理执行使水印生成器失效;下面描述的所有相关操作都变成了有效的无操作操作。
数据源API支持每个分割单独运行水印生成器。这使得Flink可以观察每次拆分的事件时间进度,这对于正确处理事件时间倾斜和防止空闲分区阻碍整个应用程序的事件时间进度非常重要。

在这里插入图片描述
当使用Split Reader API实现源连接器时,这是自动处理的。所有基于Split Reader API的实现都具有分离感知的开箱即用水印。

对于使用分离感知水印生成的较低级SourceReader API的实现,该实现必须从不同的拆分输出事件到不同的输出:Split-local SourceOutputs。可以通过createOutputForSplit(splitId)和releaseOutputForSplit(splitId)方法在主ReaderOutput上创建和释放Split-local输出。详细信息请参考类和方法的javadoc。

声明:本文内容由网友自发贡献,转载请注明出处:【wpsshop】
推荐阅读
相关标签
  

闽ICP备14008679号