赞
踩
首先我们从 Mysql 数据中使用 Flink CDC,通过 Binlog 完成数据的实时采集
然后在 Flink 中创建 Iceberg 表,Iceberg 的数据保存在 Hive 里
最后我们在 Doris 中创建 Iceberg 外表
再通过 Doris 统一查询入口完成对 Iceberg 里的数据查询分析,供前端应用调用,这里 Iceberg 外表的数据可以和 Doris 内部数据或者 Doris 其他外部数据源的数据进行关联查询分析
Doris 湖仓一体的联邦查询架构如下:
创建 MySql 数据库表并初始化数据
- CREATE DATABASE demo;
- USE demo;
- CREATE TABLE userinfo (
- id int NOT NULL AUTO_INCREMENT,
- name VARCHAR(255) NOT NULL DEFAULT 'flink',
- address VARCHAR(1024),
- phone_number VARCHAR(512),
- email VARCHAR(255),
- PRIMARY KEY (`id`)
- )ENGINE=InnoDB ;
- INSERT INTO userinfo VALUES (10001,'user_110','Shanghai','13347420870', NULL);
- INSERT INTO userinfo VALUES (10002,'user_111','xian','13347420870', NULL);
- INSERT INTO userinfo VALUES (10003,'user_112','beijing','13347420870', NULL);
- INSERT INTO userinfo VALUES (10004,'user_113','shenzheng','13347420870', NULL);
- INSERT INTO userinfo VALUES (10005,'user_114','hangzhou','13347420870', NULL);
- INSERT INTO userinfo VALUES (10006,'user_115','guizhou','13347420870', NULL);
- INSERT INTO userinfo VALUES (10007,'user_116','chengdu','13347420870', NULL);
- INSERT INTO userinfo VALUES (10008,'user_117','guangzhou','13347420870', NULL);
- INSERT INTO userinfo VALUES (10009,'user_118','xian','13347420870', NULL);
启动后的界面如下:
bin/sql-client.sh embedded
开启 Checkpoint,每隔 3 秒做一次 Checkpoint
Checkpoint 默认是不开启的,我们需要开启 Checkpoint 来让 Iceberg 可以提交事务。并且,MySql-CDC 在 Binlog 读取阶段开始前,需要等待一个完整的 Checkpoint 来避免 Binlog 记录乱序的问题。
注意:这里是演示环境,Checkpoint 的间隔设置比较短,线上使用,建议设置为3-5分钟一次 Checkpoint。
Flink SQL> SET execution.checkpointing.interval = 3s;
- CREATE CATALOG hive_catalog WITH (
- 'type'='iceberg',
- 'catalog-type'='hive',
- 'uri'='thrift://localhost:9083',
- 'clients'='5',
- 'property-version'='1',
- 'warehouse'='hdfs://localhost:8020/user/hive/warehouse'
- );
-
- Flink SQL> show catalogs;
- +-----------------+
- | catalog name |
- +-----------------+
- | default_catalog |
- | hive_catalog |
- +-----------------+
- 2 rows in set
- CREATE TABLE user_source (
- database_name STRING METADATA VIRTUAL,
- table_name STRING METADATA VIRTUAL,
- `id` DECIMAL(20, 0) NOT NULL,
- name STRING,
- address STRING,
- phone_number STRING,
- email STRING,
- PRIMARY KEY (`id`) NOT ENFORCED
- ) WITH (
- 'connector' = 'mysql-cdc',
- 'hostname' = 'localhost',
- 'port' = '3306',
- 'username' = 'root',
- 'password' = 'MyNewPass4!',
- 'database-name' = 'demo',
- 'table-name' = 'userinfo'
- );
- ---查看catalog
- show catalogs;
- ---使用catalog
- use catalog hive_catalog;
- --创建数据库
- CREATE DATABASE iceberg_hive;
- --使用数据库
- use iceberg_hive;
-
- CREATE TABLE all_users_info (
- database_name STRING,
- table_name STRING,
- `id` DECIMAL(20, 0) NOT NULL,
- name STRING,
- address STRING,
- phone_number STRING,
- email STRING,
- PRIMARY KEY (database_name, table_name, `id`) NOT ENFORCED
- ) WITH (
- 'catalog-type'='hive'
- );
开发逻辑:从 CDC 表里插入数据到 Iceberg 表里
- use catalog default_catalog;
-
- insert into hive_catalog.iceberg_hive.all_users_info select * from user_source;
去查询 Iceberg 表
select * from hive_catalog.iceberg_hive.all_users_info
去 HDFS 上可以看到 Hive 目录下的数据及对应的元数据:
在 Hive Shell 下执行:
- SET engine.hive.enabled=true;
- SET iceberg.engine.hive.enabled=true;
- SET iceberg.mr.catalog=hive;
- add jar /path/to/iiceberg-hive-runtime-0.13.2.jar;
创建表
- CREATE EXTERNAL TABLE iceberg_hive(
- `id` int,
- `name` string)
- STORED BY 'org.apache.iceberg.mr.hive.HiveIcebergStorageHandler'
- LOCATION 'hdfs://localhost:8020/user/hive/warehouse/iceber_db/iceberg_hive'
- TBLPROPERTIES (
- 'iceberg.mr.catalog'='hadoop',
- 'iceberg.mr.catalog.hadoop.warehouse.location'='hdfs://localhost:8020/user/hive/warehouse/iceber_db/iceberg_hive'
- );
然后在 Flink SQL Client 下执行下面语句将数据插入到 Iceber 表里
- INSERT INTO hive_catalog.iceberg_hive.iceberg_hive values(2, 'c');
- INSERT INTO hive_catalog.iceberg_hive.iceberg_hive values(3, 'zhangfeng');');
查询这个表
select * from hive_catalog.iceberg_hive.iceberg_hive
- CREATE TABLE `all_users_info`
- ENGINE = ICEBERG
- PROPERTIES (
- "iceberg.database" = "iceberg_hive",
- "iceberg.table" = "all_users_info",
- "iceberg.hive.metastore.uris" = "thrift://localhost:9083",
- "iceberg.catalog.type" = "HIVE_CATALOG"
- );
参数说明:
ENGINE 需要指定为 ICEBERG
PROPERTIES 属性:
- select * from all_users_info;
- +---------------+------------+-------+----------+-----------+--------------+-------+
- | database_name | table_name | id | name | address | phone_number | email |
- +---------------+------------+-------+----------+-----------+--------------+-------+
- | demo | userinfo | 10004 | user_113 | shenzheng | 13347420870 | NULL |
- | demo | userinfo | 10005 | user_114 | hangzhou | 13347420870 | NULL |
- | demo | userinfo | 10002 | user_111 | xian | 13347420870 | NULL |
- | demo | userinfo | 10003 | user_112 | beijing | 13347420870 | NULL |
- | demo | userinfo | 10001 | user_110 | Shanghai | 13347420870 | NULL |
- | demo | userinfo | 10008 | user_117 | guangzhou | 13347420870 | NULL |
- | demo | userinfo | 10009 | user_118 | xian | 13347420870 | NULL |
- | demo | userinfo | 10006 | user_115 | guizhou | 13347420870 | NULL |
- | demo | userinfo | 10007 | user_116 | chengdu | 13347420870 | NULL |
- +---------------+------------+-------+----------+-----------+--------------+-------+
- 9 rows in set (0.18 sec)
REFRESH
命令同步 Iceberg 外表或数据库。当 Iceberg 表 Schema 发生变更时,可以通过 REFRESH
命令手动同步,该命令会将 Doris 中的 Iceberg 外表删除重建。
- -- 同步 Iceberg 表
- REFRESH TABLE t_iceberg;
-
- -- 同步 Iceberg 数据库
- REFRESH DATABASE iceberg_test_db;
支持的 Iceberg 列类型与 Doris 对应关系如下表:
Iceberg | Doris | 描述 |
---|---|---|
BOOLEAN | BOOLEAN | |
INTEGER | INT | |
LONG | BIGINT | |
FLOAT | FLOAT | |
DOUBLE | DOUBLE | |
DATE | DATE | |
TIMESTAMP | DATETIME | Timestamp 转成 Datetime 会损失精度 |
STRING | STRING | |
UUID | VARCHAR | 使用 VARCHAR 来代替 |
DECIMAL | DECIMAL | |
TIME | - | 不支持 |
FIXED | - | 不支持 |
BINARY | - | 不支持 |
STRUCT | - | 不支持 |
LIST | - | 不支持 |
MAP | - | 不支持 |
这里Doris On Iceberg我们只演示了Iceberg单表的查询,你还可以联合Doris的表,或者其他的ODBC外表,Hive外表,ES外表等进行联合查询分析,通过Doris对外提供统一的查询分析入口。
自此我们完整从搭建Hadoop,hive、flink 、Mysql、Doris 及Doris On Iceberg的使用全部介绍完了,Doris朝着数据仓库和数据融合的架构演进,支持湖仓一体的联邦查询,给我们的开发带来更多的便利,更高效的开发,省去了很多数据同步的繁琐工作,快快来体验吧。
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。