赞
踩
什么是数据湖?
数据湖是一个集中式存储库,允许以任意规模存储所有结构化和非结构化数据。您可以按原样存储数据(无需先对数据进行结构化处理),并运行不同类型的分析 – 从控制面板和可视化到大数据处理、实时分析和机器学习,以指导做出更好的决策。
关于数据湖的定义确实是一个业界有较多争议的地方。狭义的数据湖指的是数据湖存储,即可以存放海量数据(各种格式)的地方,包括 Hadoop 的文件系统 HDFS 或者云上的对象存储系统 S3 都属于这个范畴。广义的数据湖除了数据湖存储,还包括数据湖的管理和分析,即提供一整套工具,提供数据目录(Data Catalog)服务以及统一的数据访问。
业界经常会将数据湖与数据仓库做对比,它们可满足不同需求。
数据仓库是一个优化的数据库,用于分析来自事务系统和业务线应用程序的关系数据。事先定义数据结构和 Schema 以优化快速 SQL 查询,其中结果通常用于操作报告和分析。数据经过了清理、丰富和转换,因此可以充当用户可信任的“单一信息源”。
数据湖有所不同,因为它存储来自业务线应用程序的关系数据,以及来自移动应用程序、IoT 设备和社交媒体的非关系数据。捕获数据时,未定义数据结构或 Schema。这意味着您可以存储所有数据,而不需要精心设计也无需知道将来您可能需要哪些问题的答案。您可以对数据使用不同类型的分析(如 SQL 查询、大数据分析、全文搜索、实时分析和机器学习)来获得见解。
特性 | 数据仓库 | 数据湖 |
数据 | 来自事务系统、运营数据库和业务线应用程序的关系数据 | 来自 IoT 设备、网站、移动应用程序、社交媒体和企业应用程序的非关系和关系数据 |
Schema | 设计在数据仓库实施之前(写入型 Schema) | 写入在分析时(读取型 Schema) |
性价比 | 更快查询结果会带来较高存储成本 | 更快查询结果只需较低存储成本 |
数据质量 | 可作为重要事实依据的高度监管数据 | 任何可以或无法进行监管的数据(例如原始数据) |
用户 | 业务分析师 | 数据科学家、数据开发人员和业务分析师(使用监管数据) |
分析 | 批处理报告、BI 和可视化 | 机器学习、预测分析、数据发现和分析 |
数据湖优先的设计拥有更高的灵活性。数据湖的数据存储形式和结构可以不预先定义,可以是结构化的,也可以是半结构化的。计算引擎可以根据不同的场景读写数据湖中存储的数据,这意味着我们在对数据进行分析和处理时能获取到数据全部的初始信息,使用也更灵活,高效。
而数据仓库优先的设计,能够做到更加规范化的数据管理。数据进入数据仓库前,通常预先定义 schema,数据开发需要预先根据业务进行建模,构建数据模型,用户通过数据服务接口或者计算引擎访问数据模型来获取干净和规范的数据。
Apache Iceberg is an open table format for huge analytic datasets.
Iceberg是大型分析数据集的一个开源的表格式,基于MVCC的设计,在HDFS或S3等存储上支持使用Trino、Spark、Flink等引擎进行操作,且提供了ACID语义支持。
下面是基于Iceberg构建的数据湖架构图示例:
Iceberg 基于MVCC设计,每次commit都会生成一个快照(Snapshot),该快照包含唯一的snapshotId、时间戳timestamp及对应的manifest文件。
如下图所示,最新 snapshot 拥有该表的全局视图,每个snapshot包含多个manifest文件,每个manifest文件中记录本次事务中记录写入文件与分区的对应关系,且包含一些文件记录的统计信息(如lower_bound、upper_bound、added_rows_count、deleted_rows_count)用来快速筛选文件。一定程度上可以把 manifest 文件理解为索引文件。
基于Snapshot的设计,用户需要通过snapshot来访问iceberg中的数据,如果数据写入但没有commit成功,就不会生成新的snapshot,也因此不会访问到这部分不完整数据。
文件在HDFS上布局如下图所示,顶层为database,然后是table。每张表包含两个目录,第一个目录metadata保存snapshot、manifest文件;第二个目录data保存数据文件,按照分区进行划分。
所有应用在iceberg上的删除与更新操作并不会真正的物理删除,而是append一条记录到文件。Iceberg中数据文件分为两类,一类是插入操作对应Data File, 另一类是删除操作对应Delete File,更新操作则会被拆为Insert + delete操作。
实现细节可以参考Flink 如何实时分析 Iceberg 数据湖的 CDC 数据。
Iceberg表是基于MOR(Merge on Read)来实现的,在查询时根据指定的snapshot查找对应的manifest文件,然后根据查询条件过滤出需要查询的Data File及其对应的Delete File(可能有多个),然后做join操作得出最终的查询结果。
接入iceberg、创建db、配置环境请参考Iceberg 接入文档(最新版) 。
在读写Iceberg示例 中我们演示建表、flink sql写iceberg、spark sql读iceberg等相关内容。
Iceberg表分为V1表和V2表,即format V1和format v2。
Format V1:支持日志数据(即只有append的数据),默认版本。
Format V2:支持Change Log , 即支持行级别删除(Row level delete)、更新(Update)等操作,例如binlog等数据需要创建format v2表来支持存储。
目前两种表在创建时需要在表的options中显示指定,如果不指定则默认为V1版本。
参考文档:Table Spec
目前建表操作只能使用spark sql来,可以在Miquery或者beeline来进行执行DDL等建表操作。
接入流程如下:
输入DDL进行建表操作,如下所示在iceberg_catalog,flink的database下创建表名为sample表,通过'format.version'='2' 来指定该表为V2表,按照 id 进行分桶分区。其他参数介绍参见表参数及注意事项及Spark DDL语法。
- CREATE TABLE iceberg_tjwqstaging_hdd.flink.sample(
-
- `id` bigint COMMENT NOT NULL 'unique id',
-
- `data` string
-
- )
-
- USING iceberg
-
- PARTITIONED BY(bucket(16, id))
-
- TBLPROPERTIES(
- 'format.version'='2',
- 'equality.field.columns'='id',
- 'write.distribution-mode'='hash',
-
- 'read.split.target-size'='1073741824',
-
- 'read.parquet.vectorization.enabled'='true'
-
- );
建表中常用参数如下,参数可以通过Spark DDL进行修改。
更多参数参考Table properties。
参数 | 默认值 | 可选值 | 说明 |
equality.field.columns | null | 无 | 主键,多个采用逗号分隔,注意主键列不可为NULL |
format.version | 1 | 1、2 | 表 format版本(1、2),参考Format V1、V2表 |
write.distribution-mode | none | none、hash、range | 是否开起分桶,建议使用可以减少小文件数目、避免数据倾斜 |
read.split.target-size | 134217728 | 文件split大小,该参数会影响查询性能,建议调大 | |
read.parquet.vectorization.enabled | false | 是否开启向量化查询 | |
read.parquet.vectorization.batch-size | 5000 |
功能项 | 是否支持 |
Flink批式读\写日志数据 | 支持 |
Flink流式读\写日志数据 | 支持 |
Flink流写CDC数据 | 支持 |
增量读CDC数据 | 支持 |
Spark读\写日志数据 | 支持 |
Spark读CDC数据 | 支持 |
Spark MERGE INTO\DELETE FROM | 支持 |
计算引擎 | 版本支持 | 备注 |
Spark | 3.1.1 | 2.3、3.0版本不支持 |
Flink | 1.11 | 1.11以下版本不支持 |
Trino | 352 |
目前iceberg已经在武清和中经云集群上线,支持使用flink、spark、trino进行读写操作。
对应版本及环境地址如下:
集群 | tjwqstaging-hdd | zjyprc-hadoop |
Miquery链接、可用队列 | https://cloud.d.xiaomi.net/#/olap/query?_k=97qq4h root.production.cloud_group.computing.iceberg | https://cloud.d.xiaomi.net/#/olap/query?_k=lexeab root.production.cloud_group.computing.iceberg |
metastore 地址(uri) | thrift://tj1-hadoop-staging-hive03.kscn:58202,thrift://tj1-hadoop-staging-hive04.kscn:58202 | thrift://zjy-hadoop-metastore01.bj:58202,thrift://zjy-hadoop-metastore02.bj:58202 |
仓库地址(warehouse ) | hdfs://tjwqstaging-hdd/user/h_data_platform/datalake | hdfs://zjyprc-hadoop/user/h_data_platform/datalake |
hive metastore配置(hive-conf-dir) | hdfs://tjwqstaging-hdd/user/s_flink/flink-sql/iceberg-meta-store-conf | hdfs://zjyprc-hadoop/user/s_flink/flink-sql/iceberg-meta-store-conf |
目前支持使用spark、trino查询iceberg表,flink、spark进行ETL等操作。
当前只支持在Miquery平台执行DDL,支持建表、修改表、修改分区、修改参数等操作。
语法参见官方文档
Spark查询元数据
由于目前Miquery存在字段转换问题,需要使用将made_current_at进行转换才可以展示(beeline环境不存在该问题,可直接select * from xxx)。
select cast(made_current_at as String),snapshot_id,parent_id,is_current_ancestor from iceberg_catalog.flink.iceberg_v1_tests.history;
+-------------------------+---------------------+---------------------+---------------------+
| made_current_at | snapshot_id | parent_id | is_current_ancestor |
+-------------------------+---------------------+---------------------+---------------------+
| 2019-02-08 03:29:51.215 | 5781947118336215154 | NULL | true |
| 2019-02-08 03:47:55.948 | 5179299526185056830 | 5781947118336215154 | true |
| 2019-02-09 16:24:30.13 | 296410040247533544 | 5179299526185056830 | false |
| 2019-02-09 16:32:47.336 | 2999875608062437330 | 5179299526185056830 | true |
| 2019-02-09 19:42:03.919 | 8924558786060583479 | 2999875608062437330 | true |
| 2019-02-09 19:49:16.343 | 6536733823181975045 | 8924558786060583479 | true |
+-------------------------+---------------------+---------------------+---------------------+
select * from iceberg_tjwqstaging_hdd.flink.ads_sales_org_sku_day.snapshots
其他table 元数据查询请参考Inspecting tables.
Spark查询
可以在Miquery使用spark sql或DataFrames进行查询:
select * from iceberg.sample;
val df = spark.table("iceberg.sample")
Time travel
指定时间戳或者snapshotId,查询时间戳和snapshotId请。
目前Spark sql不支持time travel,可使用DataFrames API:
- // time travel to October 26, 1986 at 01:21:00
-
- spark.read
-
- .option("as-of-timestamp", "499162860000")
-
- .format("iceberg")
-
- .load("path/to/table")
-
- // time travel to snapshot with ID 10963874102873L
-
- spark.read
-
- .option("snapshot-id", 10963874102873L)
-
- .format("iceberg")
-
- .load("path/to/table")
Spark写入
当前支持INSERT INTO、MERGE INTO 、INSERT OVERWRITE、DELETE FROM 等操作,详见官方文档。
目前可以在作业管理平台提交Flink作业,iceberg可以作为source表或sink表。
创建catalog
metastore及warehouse地址见集群环境章节。
flink 1.11 版本创建catalog:
- CREATE CATALOG hive_catalog WITH (
-
- 'type'='iceberg',
-
- 'catalog-type'='hive',
-
- 'clients'='5',
-
- 'warehouse'='hdfs://tjwqstaging-hdd/user/h_data_platform/datalake',
-
- 'uri'='thrift://tj1-hadoop-staging-hive03.kscn:58202,thrift://tj1-hadoop-staging-hive04.kscn:58202'
-
- );
flink1.12 版本创建catalog:
- CREATE CATALOG hive_catalog WITH (
-
- 'type'='iceberg',
-
- 'catalog-type'='hive',
-
- 'clients'='5',
-
- 'hive-conf-dir'='hdfs://zjyprc-hadoop/user/s_flink/flink-sql/iceberg-meta-store-conf'
-
- );
其他配置参考文档。
建表
使用Spark SQL建表。创建表成功后不需要在flink sql中进行声明,可直接使用,使用方式为hive_catalog.db.table 。
Flink读
flink支持批式消费和流式消费,示例如下:
-- Execute the flink job in batch mode for current session context
SET execution.type = batch ;
SELECT * FROM hive_catalog.db.table;
开启动态配置来传参
-- Submit the flink job in streaming mode for current session.
SET execution.type = streaming ;
-- Enable this switch because streaming read SQL will provide few job options in flink SQL hint options.
SET table.dynamic-table-options.enabled=true;
-- Read all the records from the iceberg current snapshot, and then read incremental data starting from that snapshot.
SELECT * FROM hive_catalog.db.table /*+ OPTIONS('streaming'='true', 'monitor-interval'='1s')*/ ;
-- Read all incremental data starting from the snapshot-id '3821550127947089987' (records from this snapshot will be excluded).
SELECT * FROM hive_catalog.db.table /*+ OPTIONS('streaming'='true', 'monitor-interval'='1s', 'start-snapshot-id'='3821550127947089987')*/ ;
参数 | 说明 | 默认值 |
streaming | 是否开启流式消费 | false |
monitor-interval | 流式消费监控commit数据的周期,仅流模式 | 10s |
start-snapshot-id | 消费起始snapshotId | null |
end-snapshot-id | 消费终止snapshotId, 仅批模式 | null |
case-sensitive | schema是否大小写敏感 | false |
as-of-timestamp | 消费起始timestamp,单位ms | null |
TableLoader配置:
-
- // Hadoop配置
-
- Configuration hadoopConf = new Configuration();
-
- Map<String, String> properties = new HashMap<>();
-
- properties.put("clients", "5");
-
- // metastore地址, 当配置了hive-conf-dir后可不配uri和warehouse
-
- //properties.put("uri", "thrift://tj1-hadoop-staging-hive03.kscn:58202,thrift://tj1-hadoop-staging-hive04.kscn:58202");
-
- //properties.put("warehouse", "hdfs://tjwqstaging-hdd/user/h_data_platform/datalake");
-
- properties.put("hive-conf-dir", "hdfs://tjwqstaging-hdd/user/s_flink/flink-sql/iceberg-meta-store-conf");
-
- // Catalog配置
-
- CatalogLoader hiveCatalogLoader = CatalogLoader.hive("iceberg_tjwqstaging_hdd", hadoopConf, properties);
-
- // 库名 表名
-
- TableLoader tableLoader = TableLoader.fromCatalog(hiveCatalogLoader, TableIdentifier.of("flink", "iceberg_test"));
-
-
- StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment();
-
- TableLoader tableLoader = //见上面TableLoader配置
-
- DataStream<RowData> batch = FlinkSource.forRowData()
-
- .env(env)
-
- .tableLoader(loader)
-
- .streaming(false)
-
- .build();
-
- // Print all records to stdout.
-
- batch.print();
-
- // Submit and execute this batch read job.
-
- env.execute("Test Iceberg Batch Read");
- StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment();
-
- TableLoader tableLoader = //见上面TableLoader配置
-
- DataStream<RowData> stream = FlinkSource.forRowData()
-
- .env(env)
-
- .tableLoader(loader)
-
- .streaming(true)
-
- .startSnapshotId(3821550127947089987)
-
- .build();
-
- // Print all records to stdout.
-
- stream.print();
-
- // Submit and execute this streaming read job.
-
- env.execute("Test Iceberg Batch Read");
-
- StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment();
-
- TableLoader tableLoader = //见上面TableLoader配置
-
- TableSchema projectedSchema = TableSchema.builder()
-
- .field("id", DataTypes.INT())
-
- .field("data", DataTypes.STRING())
-
- .build();
-
- DataStream<PojoData> result = FlinkSource.forRowData()
-
- .env(env)
-
- .tableLoader(tableLoader())
-
- .project(projectedSchema)
-
- .build(PojoData.class); //PojoData是需要你提供的Pojo类,他的字段与构建的projectedSchema一致,
-
- // Print all records to stdout.
-
- stream.print();
-
- // Submit and execute this streaming read job.
-
- env.execute("Test Iceberg Batch Read");
Flink写
支持流式和批式写入
INSERT INTO hive_catalog.default.sample VALUES (1, 'a');
INSERT INTO hive_catalog.default.sample SELECT id, data from other_kafka_table;
仅批模式.
覆盖表:
INSERT OVERWRITE sample VALUES (1, 'a');
覆盖指定分区:
INSERT OVERWRITE hive_catalog.default.sample PARTITION(data='a') SELECT 6;
- public static FlinkSink.Builder forRowData(DataStream<RowData> input)
-
-
- FlinkSink.forRowData(dataStream)
-
- .table(table)
-
- .tableLoader(tableLoader)
-
- .writeParallelism(parallelism)
-
- .build();
示例详见unit test
-
- public static FlinkSink.Builder forRow(DataStream<Row> input, TableSchema tableSchema)
-
-
- //schema构建示例
-
- public static final TableSchema FLINK_SCHEMA = TableSchema.builder()
-
- .field("id", DataTypes.BIGINT())
-
- .field("data", DataTypes.STRING())
-
- .build();
示例详见unit test
public static <T> FlinkSink.Builder builderFor(DataStream<T> input, MapFunction<T, RowData> mapper, TypeInformation<RowData> outputType)
-
- //Iceberg从Hive MetaStore加载catalog:
-
- StreamExecutionEnvironment env = ...;
-
- TableLoader tableLoader = //见上面TableLoader配置
-
- List<PojoData> pojos = Lists.newArrayList(
-
- new PojoData(1, "Hogwarts"),
-
- new PojoData(2, "Tom Marvolo Riddle"),
-
- new PojoData(3, "Harry Potter")
-
- );
-
- public static final TableSchema FLINK_SCHEMA = TableSchema.builder()
-
- .field("id", DataTypes.INT())
-
- .field("data", DataTypes.STRING())
-
- .build();
-
- DataStream<PojoData> pojoDataStream = env.fromCollection(pojos);
-
- FlinkSink.forData(pojoDataStream, PojoData.class, FLINK_SCHEMA)
-
- .table(table)
-
- .tableLoader(tableLoader)
-
- .writeParallelism(parallelism)
-
- .build();
-
- env.execute("Test Iceberg DataStream of POJO.");
- //Iceberg从Hive MetaStore加载catalog:
-
- StreamExecutionEnvironment env = ...;
-
- DataStream<RowData> input = ... ;
-
- TableLoader tableLoader = //见上面TableLoader配置
-
- FlinkSink.forRowData(input)
-
- .tableLoader(tableLoader)
-
- .hadoopConf(hadoopConf)
-
- .build();
-
- env.execute("Test Iceberg DataStream");
-
-
- //Iceberg从hive metastore加载catalog,:
-
- StreamExecutionEnvironment env = ...;
-
- DataStream<RowData> input = ... ;
-
- Configuration hadoopConf = new Configuration();
-
- hadoopConf.set("hive.metastore.sasl.enabled", "true");
-
- Map<String, String> properties = new HashMap<>();
-
- properties.put("clients", "5");
-
- // metastore地址, 当配置了hive-conf-dir后可不配uri和warehouse
-
- //properties.put("uri", "thrift://tj1-hadoop-staging-hive03.kscn:58202,thrift://tj1-hadoop-staging-hive04.kscn:58202");
-
- //properties.put("warehouse", "hdfs://tjwqstaging-hdd/user/h_data_platform/datalake");
-
- properties.put("hive-conf-dir", "hdfs://tjwqstaging-hdd/user/s_flink/flink-sql/iceberg-meta-store-conf");
-
- // Catalog集群:
-
- CatalogLoader hiveCatalogLoader = CatalogLoader.hive("iceberg_tjwqstaging_hdd", hadoopConf, properties);
-
- // 库名 表名
-
- TableLoader tableLoader = TableLoader.fromCatalog(hiveCatalogLoader, TableIdentifier.of("flink", "iceberg_test"));
-
- FlinkSink.forRowData(input)
-
- .tableLoader(tableLoader)
-
- .build();
-
- env.execute("Test Iceberg DataStream");
- //Iceberg从hive metastore加载catalog:
-
- StreamExecutionEnvironment env = ...;
-
- DataStream<RowData> input = ... ;
-
- Configuration hadoopConf = new Configuration();
-
- TableLoader tableLoader = //见上面TableLoader配置
-
- FlinkSink.forRowData(input)
-
- .tableLoader(tableLoader)
-
- .overwrite(true)
-
- .hadoopConf(hadoopConf)
-
- .build();
-
- env.execute("Test Iceberg DataStream");
待补充
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。