当前位置:   article > 正文

Flink CDC 系列 - 同步 MySQL 分库分表,构建 Iceberg 实时数据湖

flink-sql-connector-mysql-cdc-2.1.0

▼ 关注「Apache Flink」,获取更多技术干货 ▼

摘要:本篇教程将展示如何使用 Flink CDC 构建实时数据湖,并处理分库分表合并同步的场景。

Flink-CDC 项目地址:

https://github.com/ververica/flink-cdc-connectors

Tips:点击「阅读原文」查看更多技术干货~

在 OLTP 系统中,为了解决单表数据量大的问题,通常采用分库分表的方式将单个大表进行拆分以提高系统的吞吐量。

但是为了方便数据分析,通常需要将分库分表拆分出的表在同步到数据仓库、数据湖时,再合并成一个大表。

这篇教程将展示如何使用 Flink CDC 构建实时数据湖来应对这种场景,本教程的演示基于 Docker,只涉及 SQL,无需一行 Java/Scala 代码,也无需安装 IDE,你可以很方便地在自己的电脑上完成本教程的全部内容。

接下来将以数据从 MySQL 同步到 Iceberg[1] 为例展示整个流程,架构图如下所示:

cef015a613bca61e7682b20c4aee572a.png

一、准备阶段


准备一台已经安装了 Docker 的 Linux 或者 MacOS 电脑。

1.1 准备教程所需要的组件

接下来的教程将以 docker-compose 的方式准备所需要的组件。

使用下面的内容创建一个 docker-compose.yml 文件:

 
 
  1. version: '2.1'
  2. services:
  3. sql-client:
  4. user: flink:flink
  5. image: yuxialuo/flink-sql-client:1.13.2.v1
  6. depends_on:
  7. - jobmanager
  8. - mysql
  9. environment:
  10. FLINK_JOBMANAGER_HOST: jobmanager
  11. MYSQL_HOST: mysql
  12. volumes:
  13. - shared-tmpfs:/tmp/iceberg
  14. jobmanager:
  15. user: flink:flink
  16. image: flink:1.13.2-scala_2.11
  17. ports:
  18. - "8081:8081"
  19. command: jobmanager
  20. environment:
  21. - |
  22. FLINK_PROPERTIES=
  23. jobmanager.rpc.address: jobmanager
  24. volumes:
  25. - shared-tmpfs:/tmp/iceberg
  26. taskmanager:
  27. user: flink:flink
  28. image: flink:1.13.2-scala_2.11
  29. depends_on:
  30. - jobmanager
  31. command: taskmanager
  32. environment:
  33. - |
  34. FLINK_PROPERTIES=
  35. jobmanager.rpc.address: jobmanager
  36. taskmanager.numberOfTaskSlots: 2
  37. volumes:
  38. - shared-tmpfs:/tmp/iceberg
  39. mysql:
  40. image: debezium/example-mysql:1.1
  41. ports:
  42. - "3306:3306"
  43. environment:
  44. - MYSQL_ROOT_PASSWORD=123456
  45. - MYSQL_USER=mysqluser
  46. - MYSQL_PASSWORD=mysqlpw
  47. volumes:
  48. shared-tmpfs:
  49. driver: local
  50. driver_opts:
  51. type: "tmpfs"
  52. device: "tmpfs"

该 Docker Compose 中包含的容器有:

  • SQL-Client:Flink SQL Client, 用来提交 SQL 查询和查看 SQL 的执行结果;

  • Flink Cluster:包含 Flink JobManager 和 Flink TaskManager,用来执行 Flink SQL;

  • MySQL:作为分库分表的数据源,存储本教程的 user 表。

在 docker-compose.yml 所在目录下执行下面的命令来启动本教程需要的组件:

 
 
docker-compose up -d

该命令将以 detached 模式自动启动 Docker Compose 配置中定义的所有容器。你可以通过docker ps 来观察上述的容器是否正常启动了,也可以通过访问 http://localhost:8081/ 来查看 Flink 是否运行正常。

7f34c5ca0726d6bdd2c5575e659be943.png

注意:

1. 本教程接下来用到的容器相关的命令都需要在 docker-compose.yml 所在目录下执行。

2. 为了简化整个教程,本教程需要的 jar 包都已经被打包进 SQL-Client 容器中了,镜像的构建脚本可以在 GitHub[2] 上找到。

如果你想要在自己的 Flink 环境运行本教程,需要下载下面列出的包并且把它们放在 Flink 所在目录的 lib 目录下,即 FLINK_HOME/lib/

  • flink-sql-connector-mysql-cdc-2.1.0.jar

    https://repo1.maven.org/maven2/com/ververica/flink-sql-connector-mysql-cdc/2.1.0/flink-sql-connector-mysql-cdc-2.1.0.jar

  • flink-shaded-hadoop-2-uber-2.7.5-10.0.jar

    https://repo.maven.apache.org/maven2/org/apache/flink/flink-shaded-hadoop-2-uber/2.7.5-10.0/flink-shaded-hadoop-2-uber-2.7.5-10.0.jar

  • iceberg-flink-1.13-runtime-0.13.0-SNAPSHOT.jar

    https://raw.githubusercontent.com/luoyuxia/flink-cdc-tutorial/main/flink-cdc-iceberg-demo/sql-client/lib/iceberg-flink-1.13-runtime-0.13.0-SNAPSHOT.jar

截止目前支持 Flink 1.13 的 iceberg-flink-runtime jar 包还没有发布,所以我们在这里提供了一个支持 Flink 1.13 的 iceberg-flink-runtime jar 包,这个 jar 包是基于 Iceberg 的 master 分支打包的。

当 Iceberg 0.13.0 版本发布后,你也可以在 apache official repository[3] 下载到支持 Flink 1.13 的 iceberg-flink-runtime jar 包。


1.2 准备数据

进入 MySQL 容器中:

docker-compose exec mysql mysql -uroot -p123456

创建数据和表,并填充数据。

创建两个不同的数据库,并在每个数据库中创建两个表,作为 user 表分库分表下拆分出的表。

 
 
  1. CREATE DATABASE db_1;
  2. USE db_1;
  3. CREATE TABLE user_1 (
  4. id INTEGER NOT NULL PRIMARY KEY,
  5. name VARCHAR(255) NOT NULL DEFAULT 'flink',
  6. address VARCHAR(1024),
  7. phone_number VARCHAR(512),
  8. email VARCHAR(255)
  9. );
  10. INSERT INTO user_1 VALUES (110,"user_110","Shanghai","123567891234","user_110@foo.com");
  11. CREATE TABLE user_2 (
  12. id INTEGER NOT NULL PRIMARY KEY,
  13. name VARCHAR(255) NOT NULL DEFAULT 'flink',
  14. address VARCHAR(1024),
  15. phone_number VARCHAR(512),
  16. email VARCHAR(255)
  17. );
  18. INSERT INTO user_2 VALUES (120,"user_120","Shanghai","123567891234","user_120@foo.com");
  1. CREATE DATABASE db_2;
  2. USE db_2;
  3. CREATE TABLE user_1 (
  4. id INTEGER NOT NULL PRIMARY KEY,
  5. name VARCHAR(255) NOT NULL DEFAULT 'flink',
  6. address VARCHAR(1024),
  7. phone_number VARCHAR(512),
  8. email VARCHAR(255)
  9. );
  10. INSERT INTO user_1 VALUES (110,"user_110","Shanghai","123567891234", NULL);
  11. CREATE TABLE user_2 (
  12. id INTEGER NOT NULL PRIMARY KEY,
  13. name VARCHAR(255) NOT NULL DEFAULT 'flink',
  14. address VARCHAR(1024),
  15. phone_number VARCHAR(512),
  16. email VARCHAR(255)
  17. );
  18. INSERT INTO user_2 VALUES (220,"user_220","Shanghai","123567891234","user_220@foo.com");


二、在 Flink SQL CLI 中

使用 Flink DDL 创建表


首先,使用如下的命令进入 Flink SQL CLI 容器中:

 
 
docker-compose exec sql-client ./sql-client

我们可以看到如下界面:

0176c91763a3174fc3e8af75e1dd0721.png

然后,进行如下步骤:

1. 开启 checkpoint

Checkpoint 默认是不开启的,我们需要开启 Checkpoint 来让 Iceberg 可以提交事务。并且,mysql-cdc 在 binlog 读取阶段开始前,需要等待一个完整的 checkpoint 来避免 binlog 记录乱序的情况。

 
 
  1. -- Flink SQL
  2. -- 每隔 3 秒做一次 checkpoint
  3. Flink SQL> SET execution.checkpointing.interval = 3s;

2. 创建 MySQL 分库分表 source 表

创建 source 表 user_source 来捕获MySQL中所有 user 表的数据,在表的配置项 database-nametable-name 使用正则表达式来匹配这些表。并且,user_source 表也定义了 metadata 列来区分数据是来自哪个数据库和表。

 
 
  1. -- Flink SQL
  2. Flink SQL> CREATE TABLE user_source (
  3. database_name STRING METADATA VIRTUAL,
  4. table_name STRING METADATA VIRTUAL,
  5. `id` DECIMAL(20, 0) NOT NULL,
  6. name STRING,
  7. address STRING,
  8. phone_number STRING,
  9. email STRING,
  10. PRIMARY KEY (`id`) NOT ENFORCED
  11. ) WITH (
  12. 'connector' = 'mysql-cdc',
  13. 'hostname' = 'mysql',
  14. 'port' = '3306',
  15. 'username' = 'root',
  16. 'password' = '123456',
  17. 'database-name' = 'db_[0-9]+',
  18. 'table-name' = 'user_[0-9]+'
  19. );

3. 创建 Iceberg sink 表

创建 sink 表 all_users_sink,用来将数据加载至 Iceberg 中。在这个 sink 表,考虑到不同的 MySQL 数据库表的 id 字段的值可能相同,我们定义了复合主键 (database_name, table_name, id)。

 
 
  1. -- Flink SQL
  2. Flink SQL> CREATE TABLE all_users_sink (
  3. database_name STRING,
  4. table_name STRING,
  5. `id` DECIMAL(20, 0) NOT NULL,
  6. name STRING,
  7. address STRING,
  8. phone_number STRING,
  9. email STRING,
  10. PRIMARY KEY (database_name, table_name, `id`) NOT ENFORCED
  11. ) WITH (
  12. 'connector'='iceberg',
  13. 'catalog-name'='iceberg_catalog',
  14. 'catalog-type'='hadoop',
  15. 'warehouse'='file:///tmp/iceberg/warehouse',
  16. 'format-version'='2'
  17. );

三、流式写入 Iceberg  


1. 使用下面的 Flink SQL 语句将数据从 MySQL 写入 Iceberg 中:

 
 
  1. -- Flink SQL
  2. Flink SQL> INSERT INTO all_users_sink select * from user_source;

上述命令将会启动一个流式作业,源源不断将 MySQL 数据库中的全量和增量数据同步到 Iceberg 中。在 Flink UI (http://localhost:8081/#/job/running)上可以看到这个运行的作业:

19b98acd2ebb1eb07e0b1016984e8948.png

然后我们就可以使用如下的命令看到 Iceberg 中的写入的文件:

 
 
docker-compose exec sql-client tree /tmp/iceberg/warehouse/default_database/

如下所示:

3efc6bff0a069bece915880d29bcbd82.png

在你的运行环境中,实际的文件可能与上面的截图不相同,但是整体的目录结构应该相似。

2. 使用下面的 Flink SQL 语句查询表 all_users_sink 中的数据:

 
 
  1. -- Flink SQL
  2. Flink SQL> SELECT * FROM all_users_sink;

在 Flink SQL CLI 中我们可以看到如下查询结果:

7d7a50010b911f5651ee3262eb79da31.png

修改 MySQL 中表的数据,Iceberg 中的表 all_users_sink 中的数据也将实时更新:

(3.1) 在 db_1.user_1 表中插入新的一行

 
 
  1. --- db_1
  2. INSERT INTO db_1.user_1 VALUES
  3. (111,"user_111","Shanghai","123567891234","user_111@foo.com");

(3.2) 更新 db_1.user_2 表的数据

 
 
  1. --- db_1
  2. UPDATE db_1.user_2 SET address='Beijing' WHERE id=120;

(3.3) 在 db_2.user_2 表中删除一行

 
 
  1. --- db_2
  2. DELETE FROM db_2.user_2 WHERE id=220;

每执行一步,我们就可以在 Flink Client CLI 中使用 SELECT * FROM all_users_sink 查询表 all_users_sink 来看到数据的变化。

最后的查询结果如下所示:

dfa9613ed79448a8f5c310a2dfc55829.png

从 Iceberg 的最新结果中可以看到新增了 (db_1, user_1, 111) 的记录,(db_1, user_2, 120) 的地址更新成了Beijing,且 (db_2, user_2, 220) 的记录被删除了,与我们在 MySQL 做的数据更新完全一致。

四、环境清理


本教程结束后,在 docker-compose.yml 文件所在的目录下执行如下命令停止所有容器:

docker-compose down

五、总结


在本文中,我们展示了如何使用 Flink CDC 同步 MySQL 分库分表的数据,快速构建 Icberg 实时数据湖。用户也可以同步其他数据库(Postgres/Oracle)的数据到 Hudi 等数据湖中。最后希望通过本文,能够帮助读者快速上手 Flink CDC 。

更多 Flink CDC 相关技术问题,可扫码加入社区钉钉交流群~

048e6175bdb57c83f467224c6a12bdaf.png

注释:

[1] https://iceberg.apache.org/

[2] https://github.com/luoyuxia/flink-cdc-tutorial/tree/main/flink-cdc-iceberg-demo/sql-client

[3] https://repo.maven.apache.org/maven2/org/apache/iceberg/iceberg-flink-runtime/


相关文章

Flink Forward Asia 2021 

2022 年 1 月 8-9 日,FFA 2021 重磅开启,全球 40+ 多行业一线厂商,80+ 干货议题,带来专属于开发者的技术盛宴。

大会官网:

https://flink-forward.org.cn

大会线上观看地址 (记得预约哦):

https://developer.aliyun.com/special/ffa2021/live


2225ad6a7cd20ba47b8c2d33eabd56ab.png

更多 Flink 相关技术问题,可扫码加入社区钉钉交流群~

987b0ef054b404abebecc0840edcbe49.png

 7607b78caa0f9f93b934605bd2af672e.gif  戳我,查看更多技术干货~

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/花生_TL007/article/detail/395961
推荐阅读
相关标签
  

闽ICP备14008679号