当前位置:   article > 正文

吃透Flink架构:一个新版Connector的实现_flink dynamictablefactory

flink dynamictablefactory

前言

Flink 可以说已经是流计算领域的事实标准,其开源社区发展迅速,提出了很多改进计划(Flink Improvement Proposals,简称 FLIP [1])并不断迭代,几乎每个新的版本在功能、性能和使用便捷性上都有所提高。Flink 提供了丰富的数据连接器(connecotr)来连接各种数据源,内置了 kafka [2]、jdbc [3]、hive [4]、hbase [5]、elasticsearch [6]、file system [7] 等常见的 connector,此外 Flink 还提供了灵活的机制方便开发者开发新的 connector。对于 source connector 的开发,有基于传统的 SourceFunction [8] 的方式和基于 Flink 改进计划 FLIP-27 [9] 的 Source [10] 新架构的方式。本文首先介绍基于 SourceFunction 方式的不足,接着介绍 Source 新架构以及其设计上的深层思考,然后基于 Flink 1.13 ,以从零开发一个简单的 FileSource connector 为例,介绍开发 source connector 的基本要素,尽量做到理论与实践相结合加深大家的理解。

流计算 Oceanus 是大数据产品生态体系的实时化分析利器,是基于 Apache Flink 构建的具备一站开发、无缝连接、亚秒延时、低廉成本、安全稳定等特点的企业级实时大数据分析平台。流计算 Oceanus 以实现企业数据价值最大化为目标,加速企业实时化数字化的建设进程。流计算 Oceanus 提供了便捷的控制台环境,方便用户编写 SQL 分析语句、ETL 作业或者上传运行自定义 JAR 包,支持作业运维管理。欢迎大家试用,目前还有新用户 1 元秒杀活动,机会难得,不容错过。

Source 旧架构

在 Flink 1.12 之前,开发一个 source connector 通过实现 SourceFunction [8] 接口来完成,官方给出的通用的实现模式如下。当 source 开始发送数据时,run 方法被调用,其参数 SourceContext 用于发送数据。run 方法是一个无限循环,通过一个标识 isRunning 来跳出循环结束 source。批模式和流模式通常需要不同的处理逻辑,例如示例的批模式通过一个计数器来结束批数据。此外,还需要通过 checkpoint 锁来保证状态更新和数据发送的原子性。值得一提的是,Flink 在 SourceFunction 之上抽象出了 InputFormatSourceFunction,开发者只需要实现 InputFormat,批模式 source connector(如 HBase)通常基于 InputFormat 实现,当然 InputFormat 也可以用于流模式,在一定程度上体现了批流融合的思想,但整体上来看至少在接口层面上流批并没有完全一致。

  1. public class ExampleCountSource implements SourceFunction<Long>, CheckpointedFunction {
  2. private long count = 0L;
  3. private volatile boolean isRunning = true;
  4. private transient ListState<Long> checkpointedCount;
  5. public void run(SourceContext<T> ctx) {
  6. while (isRunning && count < 1000) {
  7. // this synchronized block ensures that state checkpointing,
  8. // internal state updates and emission of elements are an atomic operation
  9. synchronized (ctx.getCheckpointLock()) {
  10. ctx.collect(count);
  11. count++;
  12. }
  13. }
  14. }
  15. public void cancel() {
  16. isRunning = false;
  17. }
  18. public void initializeState(FunctionInitializationContext context) {
  19. this.checkpointedCount = context
  20. .getOperatorStateStore()
  21. .getListState(new ListStateDescriptor<>("count", Long.class));
  22. if (context.isRestored()) {
  23. for (Long count : this.checkpointedCount.get()) {
  24. this.count += count;
  25. }
  26. }
  27. }
  28. public void snapshotState(
声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/盐析白兔/article/detail/429437
推荐阅读
相关标签
  

闽ICP备14008679号