当前位置:   article > 正文

Flink CDC 2.2.0同步Mysql数据到Hudi数据湖表实践

scan.incremental.snapshot.enabled

6d12ca9dcecafe5d0abc3fe5ce7b13e0.png全网最全大数据面试提升手册!

目录

  1. 介绍

  2. Deserialization序列化和反序列化

  3. 添加Flink CDC依赖
    3.1 sql-client
    3.2 Java/Scala API

  4. 使用SQL方式同步Mysql数据到Hudi数据湖
    4.1 Mysql表结构和数据
    4.2 Flink开启checkpoint
    4.3 在Flink中创建Mysql的映射表
    4.4 在Flink中创建Hudi Sink的映射表
    4.5 流式写入Hudi

1. 介绍

Flink CDC底层是使用Debezium来进行data changes的capture

特色:

  1. snapshot能并行读取。根据表定义的primary key中的第一列划分成chunk。如果表没有primary key,需要通过参数scan.incremental.snapshot.enabled关闭snapshot增量读取

  2. snapshot读取时的checkpoint粒度为chunk

  3. snapshot读取不需要global read lock(FLUSH TABLES WITH READ LOCK)

  4. reader读取snapshot和binlog的一致性过程:

  • 标记当前的binlog position为low

  • 多个reader读取各自的chunk

  • 标记当前的binlog position为high

  • 一个reader读取low ~ high之间的binlog

  • 一个reader读取high之后的binlog

2. Deserialization序列化和反序列化

下面用json格式,展示了change event

  1. {
  2.   "before": {
  3.     "id"111,
  4.     "name""scooter",
  5.     "description""Big 2-wheel scooter",
  6.     "weight"5.18
  7.   },
  8.   "after": {
  9.     "id"111,
  10.     "name""scooter",
  11.     "description""Big 2-wheel scooter",
  12.     "weight"5.15
  13.   },
  14.   "source": {...},
  15.   "op""u",  // operation type, u表示这是一个update event 
  16.   "ts_ms"1589362330904,  // connector处理event的时间
  17.   "transaction": null
  18. }

在DataStrea API中,用户可以使用Constructor:JsonDebeziumDeserializationSchema(true),在message中包含schema。但是不推荐使用

JsonDebeziumDeserializationSchema也可以接收JsonConverter的自定义配置。如下示例在output中包含小数的数据

  1. Map<String, Object> customConverterConfigs = new HashMap<>();
  2.  customConverterConfigs.put(JsonConverterConfig.DECIMAL_FORMAT_CONFIG, "numeric");
  3.  JsonDebeziumDeserializationSchema schema = 
  4.       new JsonDebeziumDeserializationSchema(true, customConverterConfigs);

3. 添加Flink CDC依赖

3.1 sql-client

集成步骤如下:

  1. 从github flink cdc下载flink-sql-connector-mysql-cdc-2.2.0.jar包

  2. 将jar包放到Flink集群所有服务器的lib目录下

  3. 重启Flink集群

  4. 启动sql-client.sh

3.2 Java/Scala API

添加如下依赖到pom.xml中

  1. <dependency>
  2.     <groupId>com.ververica</groupId>
  3.     <artifactId>flink-connector-mysql-cdc</artifactId>
  4.     <version>2.2.0</version>
  5. </dependency>

4. 使用SQL方式同步Mysql数据到Hudi数据湖

4.1 Mysql表结构和数据

建表语句如下:

  1. CREATE TABLE `info_message` (
  2.   `id` bigint(20) NOT NULL AUTO_INCREMENT COMMENT '主键',
  3.   `msg_title` varchar(100) DEFAULT NULL COMMENT '消息名称',
  4.   `msg_ctx` varchar(2048) DEFAULT NULL COMMENT '消息内容',
  5.   `msg_time` datetime DEFAULT NULL COMMENT '消息发送时间',
  6.   PRIMARY KEY (`id`)
  7. )

部分数据内容如下:

  1. mysql> 
  2. mysql> select * from d_general.info_message limit 3;
  3. +--------------------+-----------+-------------------------------------------------------+---------------------+
  4. | id                 | msg_title | msg_ctx                                               | msg_time            |
  5. +--------------------+-----------+-------------------------------------------------------+---------------------+
  6. |         1          |   title1  |                         content1                      | 2019-03-29 15:27:21 |
  7. |         2          |   title2  |                         content2                      | 2019-03-29 15:38:36 |
  8. |         3          |   title3  |                         content3                      | 2019-03-29 15:38:36 |
  9. +--------------------+-----------+-------------------------------------------------------+---------------------+
  10. 3 rows in set (0.00 sec)
  11. mysql>
4.2 Flink开启checkpoint
  • Checkpoint默认是不开启的,开启Checkpoint让Hudi可以提交事务

  • 并且mysql-cdc在binlog读取阶段开始前,需要等待一个完整的checkpoint来避免binlog记录乱序的情况

  • binlog读取的并行度为1,checkpoint的粒度为数据行级别

  • 可以在任务失败的情况下,达到Exactly-once语义

  1. Flink SQL> set 'execution.checkpointing.interval' = '10s';
  2. [INFO] Session property has been set.
  3. Flink SQL>
4.3 在Flink中创建Mysql的映射表
  1. Flink SQL> create table mysql_source(
  2. > database_name string metadata from 'database_name' virtual,
  3. > table_name string metadata from 'table_name' virtual,
  4. > id decimal(20,0) not null,
  5. > msg_title string,
  6. > msg_ctx string,
  7. > msg_time timestamp(9),
  8. > primary key (id) not enforced
  9. > ) with (
  10. >     'connector' = 'mysql-cdc',
  11. >     'hostname' = '192.168.8.124',
  12. >     'port' = '3306',
  13. >     'username' = 'hnmqet',
  14. >     'password' = 'hnmq123456',
  15. 'server-time-zone' = 'Asia/Shanghai',
  16. 'scan.startup.mode' = 'initial',
  17. >     'database-name' = 'd_general',
  18. >     'table-name' = 'info_message'
  19. > );
  20. [INFO] Execute statement succeed.
  21. Flink SQL>

说明如下:

  • Flink的table中添加了两个metadata列。还可以定义op_ts列,类型为TIMESTAMP_LTZ(3),表示binlog在数据库创建的时间,如果是snapshot,则值为0

  • 如果Mysql中有很多个列,这里只获取Flink Table中定义的列

  • Mysql的用户需要的权限:SELECT、SHOW DATABASES、REPLICATION SLAVE、REPLICATION CLIENT

  • server-time-zone: Mysql数据库的session time zone,用来控制如何将Mysql的timestamp类型转换成string类型

  • scan.startup.mode:mysql-cdc启动时消费的模式,initial表示同步snapshot和binlog,latest-offset表示同步最新的binlog

  • database-name和table-name可以使用正则表达式匹配多个数据库和多个表,例如"d_general[0-9]+"可以匹配d_general0、d_general999等

4.4 在Flink中创建Hudi Sink的映射表
  1. Flink SQL> create table hudi_sink(
  2. > database_name string,
  3. > table_name string,
  4. > id decimal(20,0) not null,
  5. > msg_title string,
  6. > msg_ctx string,
  7. > msg_time timestamp(6),
  8. > primary key (database_name, table_name, id) not enforced
  9. > ) with (
  10. >     'connector' = 'hudi',
  11. 'path' = 'hdfs://nnha/user/hudi/warehouse/hudi_db/info_message',
  12. 'table.type' = 'MERGE_ON_READ',
  13. 'hoodie.datasource.write.recordkey.field' = 'database_name.table_name.id',
  14. 'write.precombine.field' = 'msg_time',
  15. 'write.rate.limit' = '2000',
  16. 'write.tasks' = '2',
  17. 'write.operation' = 'upsert',
  18. 'compaction.tasks' = '2',
  19. 'compaction.async.enabled' = 'true',
  20. 'compaction.trigger.strategy' = 'num_commits',
  21. 'compaction.delta_commits' = '5',
  22. 'read.tasks' = '2',
  23. 'changelog.enabled' = 'true'
  24. > );
  25. [INFO] Execute statement succeed.
  26. Flink SQL>

说明如下:

  • 不同数据库和表的id字段可能会相同,定义复合主键

  • hoodie.datasource.write.recordkey.field:默指定表的主键,多个字段用.分隔。认为uuid字段

  • 如果upstream不能保证数据的order,则需要显式指定write.precombine.field,且选取的字段不能包含null。默认为ts字段。作用是如果在一个批次中,有两条key相同的数据,取较大的precombine数据,插入到Hudi中

  • write.rate.limit:每秒写入数据的条数,默认为0表示不限制

  • 默认write的并行度为4

  • write.operation:默认是upsert

  • 默认compaction的并行度为4

  • compaction.async.enabled:是否开启online compaction,默认为true

  • compaction.trigger.strategy:compaction触发的策略,可选值:num_commits、time_elapsed、num_and_time、num_or_time,默认值为num_commits

  • compaction.delta_commits:每多少次commit进行一次compaction,默认值为5

  • MOR类型的表,还不能处理delete,所以会导致数据不一致。可以通过changelog.enabled转换到change log模式

4.5 流式写入Hudi

先同步snapshot,再同步事务日志

  1. Flink SQL> insert into hudi_sink select database_name, table_name, id, msg_title, msg_ctx, msg_time from mysql_source /*+ OPTIONS('server-id'='5401') */ where msg_time is not null;
  2. [INFO] Submitting SQL update statement to the cluster...
  3. [INFO] SQL update statement has been successfully submitted to the cluster:
  4. Job ID: afa575f5451af65d1ee7d225d77888ac
  5. Flink SQL>
  • 注意:这里如果where条件如果添加了"msg_time > timestamp ‘2021-04-14 09:49:00’",任务会一直卡在write_stream这一步,write_stream的状态一直是bush(max): 100%,并且checkpoint也会一直卡住,查看HDFS上的表是没有数据

  • 默认查询的并行度是1。如果并行度大于1,需要为每个slot设置server-id,4个slot的设置方法为:'server-id'='5401-5404'。这样Mysql server就能正确维护network connection和binlog position

如果这个文章对你有帮助,不要忘记 「在看」 「点赞」 「收藏」 三连啊喂!

e695864a442955c4ae39fc8ea9137683.png

cef7011800f44c3b8d47c967f9f532fd.jpeg

2022年全网首发|大数据专家级技能模型与学习指南(胜天半子篇)

互联网最坏的时代可能真的来了

我在B站读大学,大数据专业

我们在学习Flink的时候,到底在学习什么?

193篇文章暴揍Flink,这个合集你需要关注一下

Flink生产环境TOP难题与优化,阿里巴巴藏经阁YYDS

Flink CDC我吃定了耶稣也留不住他!| Flink CDC线上问题小盘点

我们在学习Spark的时候,到底在学习什么?

在所有Spark模块中,我愿称SparkSQL为最强!

硬刚Hive | 4万字基础调优面试小总结

数据治理方法论和实践小百科全书

标签体系下的用户画像建设小指南

4万字长文 | ClickHouse基础&实践&调优全视角解析

【面试&个人成长】2021年过半,社招和校招的经验之谈

大数据方向另一个十年开启 |《硬刚系列》第一版完结

我写过的关于成长/面试/职场进阶的文章

当我们在学习Hive的时候在学习什么?「硬刚Hive续集」

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/你好赵伟/article/detail/598274
推荐阅读
相关标签
  

闽ICP备14008679号