赞
踩
本文以流式数据入库的场景为基础,介绍引入 Iceberg 作为落地格式和嵌入 Flink sink 的收益,并分析了当前可实现的框架及要点。
流式数据入库,是大数据和数据湖的典型应用场景。上游的流式数据,如日志,或增量修改,通过数据总线,经过必要的处理后,汇聚并存储于数据湖,供下游的应用(如报表或者商业智能分析)使用。
上述的应用场景通常有如下的痛点,需要整个流程不断的优化:
为了解决上述痛点,我们引入了 Iceberg 作为数据落地的格式。Iceberg 支持 ACID 事务、修改和删除、独立于计算引擎、支持表结构和分区方式动态变更等特性,很好的满足我们的需求。
同时,为了支持流式数据的写入,我们引入 Flink 作为流式处理框架,并将 Iceberg 作为 Flink sink。
下文主要介绍 Flink Iceberg sink 的实现框架和要点。但在这之前,需要先介绍一些实现中用到的 Flink 基本概念。
Flink 使用 DataFrame API 来统一的处理流和批数据。
一个 Flink 程序由 stream 和 transformation 组成:
当 Flink 程序执行时,其被映射成 Streaming Dataflow,由如下的部分组成:
下图为 Flink 官网的示例,展示了一个以 Kafka 作为输入 Source,经过中间两个 transformation,最终通过 sink 输出到 Flink 之外的过程。
Flink 依靠 checkpoint 和基于 snapshot 的恢复机制,保证程序 state 的一致性,实现容错。
Checkpoint 是对分布式的数据流,以及所有 operator 的 state,打 snapshot 的过程。
■ State
一个 operator 的 state,即它包含的所有用于恢复当前状态的信息,可分为两类:
State 的存储位置,可以分为:
如下图所示:
■ Checkpoint
Flink 做 checkpoint 的过程如下:
如下图所示:
■ Barrier
Barrier 是 Flink 做分布式 snapshot 的重要概念。它作为一个系统标记,被插入到数据流中,随真实数据一起,按照数据流的方向,从上游向下游传递。
由于每个 barrier 唯一对应 checkpoint id,所以数据流中的 record 实际被 barrier 分组,如下图所示,barrier n 和 barrier n-1 之间的 record,属于 checkpoint n。
Barrier 的作用是在分布式的数据流中,将 operator 的多个输入流按照 checkpoint对齐(align),如下图所示:
了解了上述 Flink 的基本概念,这些概念又是如何被应用和映射到 Flink Iceberg sink 当中的呢?
如图,Flink Iceberg sink 有两个主要模块和两个辅助模块组成:
■ Writer
■ Committer
社区上 https://github.com/apache/incubator-iceberg/pull/856 提供了可以试用的原型代码。下载该 patch 放入 master 分支,编译并构建即可。如下的程序展示了如何将该 sink 嵌入到 Flink 数据流中:
- // Configurate catalog
- org.apache.hadoop.conf.Configuration hadoopConf =
- new org.apache.hadoop.conf.Configuration();
- hadoopConf.set(
- org.apache.hadoop.hive.conf.HiveConf.ConfVars.METASTOREURIS.varname,
- META_STORE_URIS);
- hadoopConf.set(
- org.apache.hadoop.hive.conf.HiveConf.ConfVars.METASTOREWAREHOUSE.varname,
- META_STORE_WAREHOUSE);
-
- Catalog icebergCatalog = new HiveCatalog(hadoopConf);
-
- // Create Iceberg table
- Schema schema = new Schema(
- ...
- );
- PartitionSpec partitionSpec = builderFor(schema)...
- TableIdentifier tableIdentifier =
- TableIdentifier.of(DATABASE_NAME, TABLE_NAME);
- // If needed, check the existence of table by loadTable() and drop it
- // before creating it
- icebergCatalog.createTable(tableIdentifier, schema, partitionSpec);
-
- // Obtain an execution environment
- StreamExecutionEnvironment env =
- StreamExecutionEnvironment.getExecutionEnvironment();
-
- // Enable checkpointing
- env.enableCheckpointing(...);
-
- // Add Source
- DataStream<Map<String, Object>> dataStream =
- env.addSource(source, typeInformation);
-
- // Configure Ieberg sink
- Configuration conf = new Configuration();
- conf.setString(
- org.apache.hadoop.hive.conf.HiveConf.ConfVars.METASTOREWAREHOUSE.varname,
- META_STORE_URIS);
- conf.setString(IcebergConnectorConstant.DATABASE, DATABASE_NAME);
- conf.setString(IcebergConnectorConstant.TABLE, TABLE_NAME);
-
- // Append Iceberg sink to data stream
- IcebergSinkAppender<Map<String, Object>> appender =
- new IcebergSinkAppender<Map<String, Object>>(conf, "test")
- .withSerializer(MapAvroSerializer.getInstance())
- .withWriterParallelism(1);
- appender.append(dataStream);
-
- // Trigger the execution
- env.execute("Sink Test");
Flink Iceberg sink 有很多需要完善的地方,例如:上文中提到的去掉 Avro 作为中间格式;以及在各种失败的情况下是否仍能保证端到端的 exactly-once;按固定时长做 checkpoint,在高低峰时生成不同大小的 DataFile,是否对后续读不友好等。这些问题都在我们的后续规划中,也会全数贡献给社区。
[1] Iceberg 官网:
https://iceberg.apache.org/
[2] Flink 1.10文 档:
https://ci.apache.org/projects/flink/flink-docs-release-1.10/
[3] Neflix 提供的 Flink Iceberg connector 原型:
https://github.com/Netflix-Skunkworks/nfflink-connector-iceberg
[4] Flink Iceberg sink 设计文档:
https://docs.google.com/document/d/19M-sP6FlTVm7BV7MM4Om1n_MVo1xCy7GyDl_9ZAjVNQ/edit?usp=sharing
[5] Flink 容错机制(checkpoint) :
https://www.cnblogs.com/starzy/p/11439988.html
# 社区活动推荐 #
普惠全球开发者,这一次,格外与众不同!首个 Apache 顶级项目在线会议 Flink Forward 全球直播中文精华版来啦,聚焦 Alibaba、Google、AWS、Uber、Netflix、新浪微博等海内外一线厂商,经典 Flink 应用场景,最新功能、未来规划一览无余。点击下方链接可了解更多大会详情:https://developer.aliyun.com/live/2594?spm=a2c6h.14242504.J_6074706160.2.3fca361f4cYyQx
原文链接
本文为云栖社区原创内容,未经允许不得转载。
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。