当前位置:   article > 正文

Flink CDC 系列 - 实现 MySQL 数据实时写入 Apache Doris

mysql 写入 apache druid

▼ 关注「ApacheFlink」视频号,遇见更多大咖 ▼

摘要:本文通过实例来演示怎么通过 Flink CDC 结合 Doris 的 Flink Connector 实现从 Mysql 数据库中监听数据并实时入库到 Doris 数仓对应的表中。主要内容包括:

  1. 什么是 CDC

  2. Flink CDC

  3. 什么是 Flink Doris Connector

  4. 用法示例

Tips:FFA 大会以及 Hackathon 比赛重磅开启,点击「阅读原文」了解详情~

一、什么是 CDC


CDC 是变更数据捕获 (Change Data Capture) 技术的缩写,它可以将源数据库 (Source) 的增量变动记录,同步到一个或多个数据目的 (Sink)。在同步过程中,还可以对数据进行一定的处理,例如分组 (GROUP BY)、多表的关联 (JOIN) 等。

例如对于电商平台,用户的订单会实时写入到某个源数据库;A 部门需要将每分钟的实时数据简单聚合处理后保存到 Redis 中以供查询,B 部门需要将当天的数据暂存到 Elasticsearch 一份来做报表展示,C 部门也需要一份数据到 ClickHouse 做实时数仓。随着时间的推移,后续 D 部门、E 部门也会有数据分析的需求,这种场景下,传统的拷贝分发多个副本方法很不灵活,而 CDC 可以实现一份变动记录,实时处理并投递到多个目的地。

CDC 的应用场景

  • 数据同步:用于备份,容灾;

  • 数据分发:一个数据源分发给多个下游系统;

  • 数据采集:面向数据仓库 / 数据湖的 ETL 数据集成,是非常重要的数据源。

CDC 的技术方案非常多,目前业界主流的实现机制可以分为两种:

  • 基于查询的 CDC:

    • 离线调度查询作业,批处理。把一张表同步到其他系统,每次通过查询去获取表中最新的数据;

    • 无法保障数据一致性,查的过程中有可能数据已经发生了多次变更;

    • 不保障实时性,基于离线调度存在天然的延迟。

  • 基于日志的 CDC:

    • 实时消费日志,流处理,例如 MySQL 的 binlog 日志完整记录了数据库中的变更,可以把 binlog 文件当作流的数据源;

    • 保障数据一致性,因为 binlog 文件包含了所有历史变更明细;

    • 保障实时性,因为类似 binlog 的日志文件是可以流式消费的,提供的是实时数据。

二、Flink CDC


Flink 在 1.11 版本中新增了 CDC 的特性,简称改变数据捕获。名称来看有点乱,我们先从之前的数据架构来看 CDC 的内容。

ea47919c977aefd4c447e5960adc414f.png

以上是之前的 mysq binlog 日志处理流程,例如 canal 监听 binlog 把日志写入到 kafka 中。而 Apache Flink 实时消费 Kakfa 的数据实现 mysql 数据的同步或其他内容等。拆分来说整体上可以分为以下几个阶段:

  1. Mysql 开启 binlog;

  2. Canal 同步 binlog 数据写入到 Kafka;

  3. Flink 读取 Kakfa 中的 binlog 数据进行相关的业务处理。

整体的处理链路较长,需要用到的组件也比较多。Apache Flink CDC 可以直接从数据库获取到 binlog 供下游进行业务计算分析

Flink Connector Mysql CDC 2.0 特性

提供 MySQL CDC 2.0,核心 feature 包括:

  • 并发读取,全量数据的读取性能可以水平扩展;

  • 全程无锁,不对线上业务产生锁的风险;

  • 断点续传,支持全量阶段的 checkpoint。

网上有测试文档显示用 TPC-DS 数据集中的 customer 表进行了测试,Flink 版本是 1.13.1,customer 表的数据量是 6500 万条,Source 并发为 8,全量读取阶段:

  • MySQL CDC 2.0 用时 13 分钟;

  • MySQL CDC 1.4 用时 89 分钟;

  • 读取性能提升 6.8 倍。

三、什么是 Flink Doris Connector


Flink Doris Connector 是 Doris 社区为了方便用户使用 Flink 读写 Doris 数据表的一个扩展,目前 Doris 支持 Flink 1.11.x ,1.12.x,1.13.x;Scala 版本:2.12.x。

目前 Flink Doris connector 目前控制入库通过两个参数:

  1. sink.batch.size:每多少条写入一次,默认 100 条;

  2. sink.batch.interval :每个多少秒写入一下,默认 1 秒。

这两参数同时起作用,哪个条件先到就触发写 Doris 表操作,

注意:

这里注意的是要启用 http v2 版本,具体在 fe.conf 中配置 enable_http_server_v2=true,同时因为是通过 fe http rest api 获取 be 列表,这俩需要配置的用户有 admin 权限。

四、用法示例


4.1 Flink Doris Connector 编译

首先我们要编译 Doris 的 Flink connector,也可以通过下面的地址进行下载:

https://github.com/hf200012/hf200012.github.io/raw/main/lib/doris-flink-1.0-SNAPSHOT.jar

注意:

这里因为 Doris 的 Flink Connector 是基于 Scala 2.12.x 版本进行开发的,所以你在使用 Flink 的时候请选择对应 Scala 2.12 的版本,如果你使用上面地址下载了相应的 jar,请忽略下面的编译内容部分。

在 Doris 的 docker 编译环境 apache/incubator-doris:build-env-1.2 下进行编译,因为 1.3 下面的 JDK 版本是 11,会存在编译问题。

在 extension/flink-doris-connector/ 源码目录下执行:

sh build.sh

编译成功后,会在 output/ 目录下生成文件 doris-flink-1.0.0-SNAPSHOT.jar。将此文件复制到 Flink 的 ClassPath 中即可使用 Flink-Doris-Connector。例如,Local 模式运行的 Flink,将此文件放入 jars/ 文件夹下。Yarn 集群模式运行的 Flink,则将此文件放入预部署包中。

针对 Flink 1.13.x 版本适配问题

  1. <properties>
  2. <scala.version>2.12</scala.version>
  3. <flink.version>1.11.2</flink.version>
  4. <libthrift.version>0.9.3</libthrift.version>
  5. <arrow.version>0.15.1</arrow.version>
  6. <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
  7. <doris.home>${basedir}/../../</doris.home>
  8. <doris.thirdparty>${basedir}/../../thirdparty</doris.thirdparty>
  9. </properties>

只需要将这里的 flink.version 改成和你 Flink 集群版本一致,重新编辑即可。

4.2 配置 Flink

这里我们是通过 Flink Sql Client 方式来进行操作。

这里我们演示使用的软件版本:

  1. Mysql 8.x

  2. Apache Flink :1.13.3

  3. Apache Doris :0.14.13.1

■ 4.2.1 安装 Flink

首先下载和安装 Flink :

https://dlcdn.apache.org/flink/flink-1.13.3/flink-1.13.3-bin-scala_2.12.tgz

这里演示使用的是本地单机模式,

 
 
  1. # wget https://dlcdn.apache.org/flink/flink-1.12.5/flink-1.12.5-bin-scala_2.12.tgz
  2. # tar zxvf flink-1.12.5-bin-scala_2.12.tgz

下载 Flink CDC 相关 Jar 包:

https://repo1.maven.org/maven2/com/ververica/flink-connector-mysql-cdc/2.0.2/flink-connector-mysql-cdc-2.0.2.jar

这里注意 Flink CDC 和 Flink 的版本对应关系。

75caaa2e35d98c0400ce202310356d01.png

  • 将上面下载或者编译好的 Flink  Doris Connector jar 包复制到 Flink 根目录下的 lib 目录下;

  • Flink CDC 的 jar 包也复制到 Flink 根目录下的 lib 目录下。

16137bd70e001ea728b5d4901f292fe9.png

■ 4.2.2 启动 Flink

这里我们使用的是本地单机模式。

 
 
  1. # bin/start-cluster.sh
  2. Starting cluster.
  3. Starting standalonesession daemon on host doris01.
  4. Starting taskexecutor daemon on host doris01.

我们通过 web 访问 (默认端口是 8081) 启动起来 Flink 集群,可以看到集群正常启动。

bea70589c31295563707f2b7966d9763.png

4.3 安装 Apache Doris

具体安装部署 Doris 的方法,参照下面的连接:

https://hf200012.github.io/2021/09/Apache-Doris- 环境安装部署。

4.4 安装配置 Mysql

1. 安装 Mysql,快速使用 Docker 安装配置 Mysql,具体参照下面的连接:

https://segmentfault.com/a/1190000021523570

2. 开启 Mysql binlog,进入 Docker 容器修改 /etc/my.cnf 文件,在 [mysqld] 下面添加以下内容,

  1. log_bin=mysql_bin
  2. binlog-format=Row
  3. server-id=1

然后重启 Mysql。

systemctl restart mysqld

3. 创建 Mysql 数据库表。

  1. CREATE TABLE `test_cdc` (
  2. `id` int NOT NULL AUTO_INCREMENT,
  3. `name` varchar(255) DEFAULT NULL,
  4. PRIMARY KEY (`id`)
  5. ) ENGINE=InnoDB

4.5 创建 Doris 表

 
 
  1. CREATE TABLE `doris_test` (
  2. `id` int NULL COMMENT "",
  3. `name` varchar(100) NULL COMMENT ""
  4. ) ENGINE=OLAP
  5. UNIQUE KEY(`id`)
  6. COMMENT "OLAP"
  7. DISTRIBUTED BY HASH(`id`) BUCKETS 1
  8. PROPERTIES (
  9. "replication_num" = "3",
  10. "in_memory" = "false",
  11. "storage_format" = "V2"
  12. );

4.6 启动 Flink Sql Client

 
 
  1. ./bin/sql-client.sh embedded
  2. > set execution.result-mode=tableau;

9b35eefa82e364d424c838b886044ade.png

■ 4.6.1 创建 Flink CDC Mysql 映射表
 
 
  1. CREATE TABLE test_flink_cdc (
  2. id INT,
  3. name STRING,
  4. primary key(id) NOT ENFORCED
  5. ) WITH (
  6. 'connector' = 'mysql-cdc',
  7. 'hostname' = 'localhost',
  8. 'port' = '3306',
  9. 'username' = 'root',
  10. 'password' = 'password',
  11. 'database-name' = 'demo',
  12. 'table-name' = 'test_cdc'
  13. );

执行查询创建的 Mysql 映射表,显示正常。

 
 
select * from test_flink_cdc;

baa442d3163204f4c64f95bfd579ee36.png

■ 4.6.2 创建 Flink Doris Table 映射表

使用 Doris Flink Connector 创建 Doris 映射表。

 
 
  1. CREATE TABLE doris_test_sink (
  2. id INT,
  3. name STRING
  4. )
  5. WITH (
  6. 'connector' = 'doris',
  7. 'fenodes' = 'localhost:8030',
  8. 'table.identifier' = 'db_audit.doris_test',
  9. 'sink.batch.size' = '2',
  10. 'sink.batch.interval'='1',
  11. 'username' = 'root',
  12. 'password' = ''
  13. )

在命令行下执行上面的语句,可以看到创建表成功,然后执行查询语句,验证是否正常。

 
 
select * from doris_test_sink;

ccdb0c81bfe250ef0d2b24ae418fb8ee.png

执行插入操作,将 Mysql 里的数据通过 Flink CDC 结合 Doris Flink Connector 方式插入到 Doris 中。

 
 
INSERT INTO doris_test_sink select id,name from test_flink_cdc

d98dc669a96a5ef1e7662072c8a22852.png

提交成功之后我们在 Flink 的 Web 界面可以看到相关的 Job 任务信息。

59d5b8c02e60da89e69d3c280816e2ab.png

■ 4.6.3 向 Mysql 表中插入数据

 
 
  1. INSERT INTO test_cdc VALUES (123, 'this is a update');
  2. INSERT INTO test_cdc VALUES (1212, '测试flink CDC');
  3. INSERT INTO test_cdc VALUES (1234, '这是测试');
  4. INSERT INTO test_cdc VALUES (11233, 'zhangfeng_1');
  5. INSERT INTO test_cdc VALUES (21233, 'zhangfeng_2');
  6. INSERT INTO test_cdc VALUES (31233, 'zhangfeng_3');
  7. INSERT INTO test_cdc VALUES (41233, 'zhangfeng_4');
  8. INSERT INTO test_cdc VALUES (51233, 'zhangfeng_5');
  9. INSERT INTO test_cdc VALUES (61233, 'zhangfeng_6');
  10. INSERT INTO test_cdc VALUES (71233, 'zhangfeng_7');
  11. INSERT INTO test_cdc VALUES (81233, 'zhangfeng_8');
  12. INSERT INTO test_cdc VALUES (91233, 'zhangfeng_9');
■ 4.6.4 观察 Doris 表的数据

首先停掉 Insert into 这个任务,因为我是在本地单机模式,只有一个 task 任务,所以要停掉,然后在命令行执行查询语句才能看到数据。

d11967a54cefa19e1d397a5a2e554998.png

■ 4.6.5 修改 Mysql 的数据

重新启动 Insert into 任务:

9399576219e1aa587da34a95325b3a81.png

修改 Mysql 表里的数据:

 
 
update test_cdc set name='这个是验证修改的操作' where id =123

再去观察 Doris 表中的数据,你会发现已经修改。

注意这里如果要想 Mysql 表里的数据修改,Doris 里的数据也同样修改,Doris 数据表的模型要是 Unique key 模型,其他数据模型 (Aggregate Key 和 Duplicate Key) 不能进行数据的更新操作。

924f5e425c2d53eb102593e2e5317a79.png

■ 4.6.6 删除数据操作

目前 Doris Flink Connector 还不支持删除操作,后面计划会加上这个操作。

更多 Flink CDC 相关技术问题,可扫码加入社区钉钉交流群~

e0beaaf423a0e43335838a9993a9f1ba.png


相关文章

近期热点


08846e80fd4bc806d8002c66faec4d48.png

▼ 关注「Apache Flink」,获取更多技术干货 ▼

更多 Flink 相关技术问题,可扫码加入社区钉钉交流群~

975f3e7c51b524d8a24c28f19277c9d5.png

 e98a182ed9e97cfe42950b045d61beb8.gif  戳我,查看更多技术干货~

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/羊村懒王/article/detail/395967
推荐阅读
相关标签
  

闽ICP备14008679号