当前位置:   article > 正文

Doris 整合 Iceberg+Flink CDC 构建实时湖仓一体的联邦查询分析架构_iceberg doris

iceberg doris

一、系统架构

  1. 首先我们从 Mysql 数据中使用 Flink CDC,通过 Binlog 完成数据的实时采集

  2. 然后在 Flink 中创建 Iceberg 表,Iceberg 的数据保存在 Hive

  3. 最后我们在 Doris 中创建 Iceberg 外表

  4. 再通过 Doris 统一查询入口完成对 Iceberg 里的数据查询分析,供前端应用调用,这里 Iceberg 外表的数据可以和 Doris 内部数据或者 Doris 其他外部数据源的数据进行关联查询分析

Doris 湖仓一体的联邦查询架构如下:

  1. Doris 通过 ODBC 方式支持:MySQL,Postgresql,Oracle ,SQLServer
  2. 同时支持 Elasticsearch 外表
  3. 1.0 版本支持 Hive 外表
  4. 1.1 版本支持 Iceberg 外表
  5. 1.2 版本支持 Hudi 外表

二、数据准备

1、Mysql数据准备

创建 MySql 数据库表并初始化数据

  1. CREATE DATABASE demo;
  2. USE demo;
  3. CREATE TABLE userinfo (
  4. id int NOT NULL AUTO_INCREMENT,
  5. name VARCHAR(255) NOT NULL DEFAULT 'flink',
  6. address VARCHAR(1024),
  7. phone_number VARCHAR(512),
  8. email VARCHAR(255),
  9. PRIMARY KEY (`id`)
  10. )ENGINE=InnoDB ;
  11. INSERT INTO userinfo VALUES (10001,'user_110','Shanghai','13347420870', NULL);
  12. INSERT INTO userinfo VALUES (10002,'user_111','xian','13347420870', NULL);
  13. INSERT INTO userinfo VALUES (10003,'user_112','beijing','13347420870', NULL);
  14. INSERT INTO userinfo VALUES (10004,'user_113','shenzheng','13347420870', NULL);
  15. INSERT INTO userinfo VALUES (10005,'user_114','hangzhou','13347420870', NULL);
  16. INSERT INTO userinfo VALUES (10006,'user_115','guizhou','13347420870', NULL);
  17. INSERT INTO userinfo VALUES (10007,'user_116','chengdu','13347420870', NULL);
  18. INSERT INTO userinfo VALUES (10008,'user_117','guangzhou','13347420870', NULL);
  19. INSERT INTO userinfo VALUES (10009,'user_118','xian','13347420870', NULL);

2、Flink准备

启动后的界面如下:

进入 Flink SQL Client

bin/sql-client.sh embedded

开启 Checkpoint,每隔 3 秒做一次 Checkpoint

Checkpoint 默认是不开启的,我们需要开启 Checkpoint 来让 Iceberg 可以提交事务。并且,MySql-CDC 在 Binlog 读取阶段开始前,需要等待一个完整的 Checkpoint 来避免 Binlog 记录乱序的问题。

注意:这里是演示环境,Checkpoint 的间隔设置比较短,线上使用,建议设置为3-5分钟一次 Checkpoint。

Flink SQL> SET execution.checkpointing.interval = 3s;

创建 Iceberg Catalog
  1. CREATE CATALOG hive_catalog WITH (
  2. 'type'='iceberg',
  3. 'catalog-type'='hive',
  4. 'uri'='thrift://localhost:9083',
  5. 'clients'='5',
  6. 'property-version'='1',
  7. 'warehouse'='hdfs://localhost:8020/user/hive/warehouse'
  8. );
  9. Flink SQL> show catalogs;
  10. +-----------------+
  11. | catalog name |
  12. +-----------------+
  13. | default_catalog |
  14. | hive_catalog |
  15. +-----------------+
  16. 2 rows in set
  创建 MySql CDC 表
  1. CREATE TABLE user_source (
  2. database_name STRING METADATA VIRTUAL,
  3. table_name STRING METADATA VIRTUAL,
  4. `id` DECIMAL(20, 0) NOT NULL,
  5. name STRING,
  6. address STRING,
  7. phone_number STRING,
  8. email STRING,
  9. PRIMARY KEY (`id`) NOT ENFORCED
  10. ) WITH (
  11. 'connector' = 'mysql-cdc',
  12. 'hostname' = 'localhost',
  13. 'port' = '3306',
  14. 'username' = 'root',
  15. 'password' = 'MyNewPass4!',
  16. 'database-name' = 'demo',
  17. 'table-name' = 'userinfo'
  18. );
 创建 Iceberg 表
  1. ---查看catalog
  2. show catalogs;
  3. ---使用catalog
  4. use catalog hive_catalog;
  5. --创建数据库
  6. CREATE DATABASE iceberg_hive;
  7. --使用数据库
  8. use iceberg_hive;
  9. CREATE TABLE all_users_info (
  10.   database_name STRING,
  11.   table_name   STRING,
  12.   `id`         DECIMAL(200NOT NULL,
  13.   name         STRING,
  14.   address       STRING,
  15.   phone_number STRING,
  16.   email         STRING,
  17.   PRIMARY KEY (database_name, table_name, `id`) NOT ENFORCED
  18. WITH (
  19.   'catalog-type'='hive'
  20. );

开发逻辑:从 CDC 表里插入数据到 Iceberg 表里

  1. use catalog default_catalog;
  2. insert into hive_catalog.iceberg_hive.all_users_info select * from user_source;

 

去查询 Iceberg 表

select * from hive_catalog.iceberg_hive.all_users_info

 去 HDFS 上可以看到 Hive 目录下的数据及对应的元数据:

也可以通过Hive建好Iceberg表,然后通过Flink将数据插入到表里

在 Hive Shell 下执行:

  1. SET engine.hive.enabled=true;
  2. SET iceberg.engine.hive.enabled=true;
  3. SET iceberg.mr.catalog=hive;
  4. add jar /path/to/iiceberg-hive-runtime-0.13.2.jar;

创建表

  1. CREATE EXTERNAL TABLE iceberg_hive(
  2.  `id` int,
  3.  `name` string)
  4. STORED BY 'org.apache.iceberg.mr.hive.HiveIcebergStorageHandler'
  5. LOCATION 'hdfs://localhost:8020/user/hive/warehouse/iceber_db/iceberg_hive'
  6. TBLPROPERTIES (
  7.  'iceberg.mr.catalog'='hadoop',
  8. 'iceberg.mr.catalog.hadoop.warehouse.location'='hdfs://localhost:8020/user/hive/warehouse/iceber_db/iceberg_hive'
  9.  );

然后在 Flink SQL Client 下执行下面语句将数据插入到 Iceber 表里

  1. INSERT INTO hive_catalog.iceberg_hive.iceberg_hive values(2'c');
  2. INSERT INTO hive_catalog.iceberg_hive.iceberg_hive values(3'zhangfeng');');

查询这个表

select * from hive_catalog.iceberg_hive.iceberg_hive

 

三、Doris 查询 Iceberg

建 Iceberg 外表
  1. CREATE TABLE `all_users_info`
  2. ENGINE = ICEBERG
  3. PROPERTIES (
  4. "iceberg.database" = "iceberg_hive",
  5. "iceberg.table" = "all_users_info",
  6. "iceberg.hive.metastore.uris" = "thrift://localhost:9083",
  7. "iceberg.catalog.type" = "HIVE_CATALOG"
  8. );

参数说明:

ENGINE 需要指定为 ICEBERG

PROPERTIES 属性:

  • iceberg.hive.metastore.uris:Hive Metastore 服务地址
  • iceberg.database:挂载 Iceberg 对应的数据库名
  • iceberg.table:挂载 Iceberg 对应的表名,挂载 Iceberg database 时无需指定。
  • iceberg.catalog.type:Iceberg 中使用的 Catalog 方式,默认为 HIVE_CATALOG,当前仅支持该方式,后续会支持更多的 Iceberg catalog 接入方式。
  1. select * from all_users_info;
  2. +---------------+------------+-------+----------+-----------+--------------+-------+
  3. | database_name | table_name | id | name | address | phone_number | email |
  4. +---------------+------------+-------+----------+-----------+--------------+-------+
  5. | demo | userinfo | 10004 | user_113 | shenzheng | 13347420870 | NULL |
  6. | demo | userinfo | 10005 | user_114 | hangzhou | 13347420870 | NULL |
  7. | demo | userinfo | 10002 | user_111 | xian | 13347420870 | NULL |
  8. | demo | userinfo | 10003 | user_112 | beijing | 13347420870 | NULL |
  9. | demo | userinfo | 10001 | user_110 | Shanghai | 13347420870 | NULL |
  10. | demo | userinfo | 10008 | user_117 | guangzhou | 13347420870 | NULL |
  11. | demo | userinfo | 10009 | user_118 | xian | 13347420870 | NULL |
  12. | demo | userinfo | 10006 | user_115 | guizhou | 13347420870 | NULL |
  13. | demo | userinfo | 10007 | user_116 | chengdu | 13347420870 | NULL |
  14. +---------------+------------+-------+----------+-----------+--------------+-------+
  15. 9 rows in set (0.18 sec)
Doris同步挂在 Iceberg 表 Schema 变更不会自动同步,需要在 Doris 中通过 REFRESH 命令同步 Iceberg 外表或数据库。

当 Iceberg 表 Schema 发生变更时,可以通过 REFRESH 命令手动同步,该命令会将 Doris 中的 Iceberg 外表删除重建。

  1. -- 同步 Iceberg 表
  2. REFRESH TABLE t_iceberg;
  3. -- 同步 Iceberg 数据库
  4. REFRESH DATABASE iceberg_test_db;
Doris 和 Iceberg 数据类型对应关系

支持的 Iceberg 列类型与 Doris 对应关系如下表:

IcebergDoris描述
BOOLEANBOOLEAN
INTEGERINT
LONGBIGINT
FLOATFLOAT
DOUBLEDOUBLE
DATEDATE
TIMESTAMPDATETIMETimestamp 转成 Datetime 会损失精度
STRINGSTRING
UUIDVARCHAR使用 VARCHAR 来代替
DECIMALDECIMAL
TIME-不支持
FIXED-不支持
BINARY-不支持
STRUCT-不支持
LIST-不支持
MAP-不支持

 

四、总结

        这里Doris On Iceberg我们只演示了Iceberg单表的查询,你还可以联合Doris的表,或者其他的ODBC外表,Hive外表,ES外表等进行联合查询分析,通过Doris对外提供统一的查询分析入口。

自此我们完整从搭建Hadoop,hive、flink 、Mysql、Doris 及Doris On Iceberg的使用全部介绍完了,Doris朝着数据仓库和数据融合的架构演进,支持湖仓一体的联邦查询,给我们的开发带来更多的便利,更高效的开发,省去了很多数据同步的繁琐工作,快快来体验吧。

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/我家小花儿/article/detail/504476
推荐阅读
相关标签
  

闽ICP备14008679号