赞
踩
本页描述了Flink的数据源API及其背后的概念和架构,不涉及代码。
source有三个核心的组件组成: Splits, SplitEnumerator,SourceReader.
**
总体流程:SplitEnumerator-> [split01,split02…] <-SourceReader发起请求
**
有界source读取的时候,由SplitEnumerator生成数据分片集合,集合的分片数量是有限的。
无解的source读取的时候,由SplitEnumerator生成数据分片的集合也是无限的,但是SplitEnumerator会源源不断生成分片,并将分片放入集合。
sourceReader去分片集合中读取数据,读取是并行的,见上图。
source代码具有要读取的目录的 URI/路径,以及定义如何解析文件的格式(format)。
此源的工作方式与上述相同,除了SplitEnumerator从不响应NoMoreSplits并定期列出给定 URI/Path 下的内容以检查新文件。一旦找到新文件,它就会为它们生成新的拆分,并可以将它们分配给可用的 SourceReader。
这意味着SplitEnumerator和SourceReader保持了类似于socket长链接的功能。
source需要指定订阅的topic列表(支持正则),且定义一个反序列化器来解析kafka中的数据。
与上面相同,只是每个split分片(主题分区)都有一个定义的结束偏移量。一旦SourceReader到达 Split 的结束偏移量,它就会完成该 Split。所有的split都达到偏移量位置的时候,读取结束。
Source API 是一个工厂风格的接口,用于创建以下组件。
1.Split Enumerator
2.Source Reader
3.Split Serializer
4.Enumerator Checkpoint Serialize
除此之外,Source 还提供了源的有界属性,以便 Flink 可以选择合适的模式来运行 Flink 作业。
Source 的实现是可序列化的,因为 Source 实例在运行时被序列化并上传到 Flink 集群。
首先应该明确一点,SplitEnumerator应该是 Source 的“大脑”。
class MySplitEnumerator implements SplitEnumerator<MySplit> { private final long DISCOVER_INTERVAL = 60_000L; /** * A method to discover the splits. */ private List<MySplit> discoverSplits() {...} @Override public void start() { ... enumContext.callAsync(this::discoverSplits, splits -> { Map<Integer, List<MockSourceSplit>> assignments = new HashMap<>(); int parallelism = enumContext.currentParallelism(); for (MockSourceSplit split : splits) { int owner = split.splitId().hashCode() % parallelism; assignments.computeIfAbsent(owner, new ArrayList<>()).add(split); } enumContext.assignSplits(new SplitsAssignment<>(assignments)); }, 0L, DISCOVER_INTERVAL); ... } ... }
SourceReader是一个在task Manager中运行的组件,用于处理一个分片split.
SourceReader公开了一个基于pull的消费接口。Flink task线程任务不断在循环中调用pollNext(ReaderOutput)获取sourceReade的数据. 从pollNext的返回值可以看出sourceReader的状态,状态如下:
1. MORE_AVAILABLE:SourceReader 可立即获得更多记录。
2. NOTHING_AVAILABLE-:SourceReader 目前没有更多可用记录,但将来可能会有更多记录。
3. END_OF_INPUT: SourceReader 到达数据末尾。这意味着可以关闭 SourceReader。
需要的话,SourceReader可以在一次 pollNext() 调用中发出多条记录的。例如,有时外部系统以块的粒度工作。一个块可能包含多条记录,但源只能在块边界处检查点。在这种情况下,SourceReader可以一次将一个块中的所有记录发送到ReaderOutput. 但是,除非必要,否则SourceReader实现应避免在单个pollNext(ReaderOutput)调用中发出多个记录。这是因为从SourceReader事件循环中轮询的任务线程不能阻塞。
SourceReader的所有的state都应该在snapshotState()调用时返回的SourceSplits中维护。 这样做允许SourceSplits在需要时被重新分配给其他sourcereader。
SourceReaderContext在SourceReader创建时提供给Source。Source将上下文传递给SourceReader实例。
SourceReader可以通过SourceReaderContext将SourceEvent发送到它的SplitEnumerator,
Source的一个典型设计模式是让sourcereader向SplitEnumerator报告他们的本地信息,SplitEnumerator拥有全局视图来做出决策。
SourceReader API是一个低级API,它允许用户手动处理拆分,并拥有自己的线程模型来获取和移交记录。为了方便SourceReader的实现,Flink提供了一个SourceReaderBase类,它大大减少了编写SourceReader所需的工作量。强烈建议连接器开发人员利用SourceReaderBase,而不是从头编写sourcereader。更多细节请查看 Split Reader AP部分。
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
Source mySource = new MySource(...);
DataStream<Integer> stream = env.fromSource(
mySource,
WatermarkStrategy.noWatermarks(),
"MySourceName");
...
注意:在DataStream API的创建过程中,WatermarkStrategy被传递给Source,并创建TimestampAssigner和WatermarkGenerator。
The TimestampAssigner and WatermarkGenerator run transparently as part of the ReaderOutput(or SourceOutput) so source implementors do not have to implement any timestamp extraction and watermark generation code.
事件事件的提取有两种:
核心 SourceReader API 是完全异步的,需要实现手动管理异步拆分读取。然而,在实践中,大多数源执行阻塞操作,例如阻塞客户端(例如)上的poll()KafkaConsumer调用,或阻塞分布式文件系统(HDFS、S3 等)上的 I/O 操作。为了使其与异步 Source API 兼容,这些阻塞(同步)操作需要在单独的线程中发生,这些线程将数据交给阅读器的异步部分。
SplitReader是用于简单同步读取/基于轮询的源实现的高级API,例如文件读取、Kafka 等。
核心是SourceReaderBase类,它接受SplitReader并创建运行 SplitReader 的 fetcher 线程,支持不同的消费线程模型。
SplitReader只有三个方法。
SourceReaderBase获取splits应该是多线程的方式,每个线程处理一个split分片,线程模型的选择依赖于SplitFetcherManager ,SplitFetcherManager 创建以及管理一个SplitFetchers 线程池,池中每个splitfetcher都使用一个SplitReader进行读取一个split分片。
SourceReaderBase的实现称为一个SplitReader。
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。