当前位置:   article > 正文

2024年最新Flink实时数仓同步:快照表实战详解(2),2024年最新面向大数据开发开发者的复习指南_com.ververica.cdc.connectors.mysql.source.mysqlsou

com.ververica.cdc.connectors.mysql.source.mysqlsource

img
img
img

既有适合小白学习的零基础资料,也有适合3年以上经验的小伙伴深入学习提升的进阶课程,涵盖了95%以上大数据知识点,真正体系化!

由于文件比较多,这里只是将部分目录截图出来,全套包含大厂面经、学习笔记、源码讲义、实战项目、大纲路线、讲解视频,并且后续会持续更新

需要这份系统化资料的朋友,可以戳这里获取

import com.ververica.cdc.connectors.mysql.source.MySqlSource;

public class MySqlSourceExample {
public static void main(String[] args) throws Exception {
MySqlSource mySqlSource = MySqlSource.builder()
.hostname(“yourHostname”)
.port(yourPort)
.databaseList(“yourDatabaseName”) // 设置捕获的数据库, 如果需要同步整个数据库,请将 tableList 设置为 “.*”.
.tableList(“yourDatabaseName.yourTableName”) // 设置捕获的表
.username(“yourUsername”)
.password(“yourPassword”)
.startupOptions(StartupOptions.timestamp(1685548800000L)) // 从2023-06-01零点处读取binlog
.deserializer(new JsonDebeziumDeserializationSchema()) // 将 SourceRecord 转换为 JSON 字符串
.build();

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

// 设置 3s 的 checkpoint 间隔
env.enableCheckpointing(3000);

env
  .fromSource(mySqlSource, WatermarkStrategy.noWatermarks(), "MySQL Source")
  // 设置 source 节点的并行度为 4
  .setParallelism(4)
  .print().setParallelism(1); // 设置 sink 节点并行度为 1 

env.execute("Print MySQL Snapshot + Binlog");
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12

}
}



> 
> 代码摘自[mysql-cdc-connector官网示例]( )
> 
> 
> 


#### 5.2.2、全量同步阶段


1. 接下来我们将从全量同步开始逐步演示同步过程,这里我们以2023-06-0日的[Mysql]业务数据为例,此时表数据如下:




| id | name | phone | gender | create\_time | update\_time |
| --- | --- | --- | --- | --- | --- |
| 1 | jack | 111 | 男 | 2023-06-01 13:00:00 | 2023-06-01 13:00:00 |
| 2 | jason | 222 | 男 | 2023-06-01 13:00:00 | 2023-06-01 13:00:00 |
| 3 | tom | 333 | 男 | 2023-06-01 13:00:00 | 2023-06-01 13:00:00 |


2. 此时Flink应用启动获取到的数据如下:仅展示一条



  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28

{
“before”: null,
“after”: { # 实际数据
“id”: 1,
“name”: “jack”,
“phone”: “111”,
“gender”: “男”,
“create_time”: “2023-06-01T05:00:00Z”, # 该日期是UTC时间,只需增加8小时即可转化为北京时间
“update_time”: “2023-06-01T05:00:00Z” # 该日期是UTC时间,只需增加8小时即可转化为北京时间
},
“source”: { # 元数据
“version”: “1.6.4.Final”,
“connector”: “mysql”,
“name”: “mysql_binlog_source”,
“ts_ms”: 0,
“snapshot”: “false”,
“db”: “yushu_dds”,
“sequence”: null,
“table”: “user”,
“server_id”: 0,
“gtid”: null,
“file”: “”,
“pos”: 0,
“row”: 0,
“thread”: null,
“query”: null
},
“op”: “r”, # 记录每条数据的操作类型[重要]
“ts_ms”: 1705471382867,
“transaction”: null
}


3. 在我们使用 Flink CDC MySQL 同步数据时,默认采用 `initial` 模式,这意味着首先进行全量同步,然后再进行增量同步。因此,在区分全量和增量同步时,关键在于观察获取到的数据中的 `op` 字段。`op` 字段是用来记录每条数据的操作类型的标志。具体的操作类型如下:


	* `op=d` 代表删除操作
	* `op=u` 代表更新操作
	* `op=c` 代表新增操作
	* `op=r` 代表全量读取,而不是来自 binlog 的增量读取
4. 在 Flink 程序中,只需要通过 `op=r` 即可筛选出全量数据。在全量数据同步阶段只需将`op=r`的业务数据直接同步至快照表(之所以全量数据同步至快照表是为了次日凌晨与流水表变更数据合并成完整数据),流水表在全量阶段无需同步,导入语句如下:



  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12

INSERT INTO example_user_snapshot (id, dt, name, phone, gender, create_time, update_time)
VALUES
(1, ‘2023-06-01’, ‘jack’, 111, ‘男’, ‘2023-06-01 13:00:00’, ‘2023-06-01 13:00:00’),
(2, ‘2023-06-01’, ‘jason’, 222, ‘男’, ‘2023-06-01 13:00:00’, ‘2023-06-01 13:00:00’),
(3, ‘2023-06-01’, ‘tom’, 333, ‘男’, ‘2023-06-01 13:00:00’, ‘2023-06-01 13:00:00’);


5. 此时doris快照表数据如下所示:




| id | dt | name | phone | gender | create\_time | update\_time |
| --- | --- | --- | --- | --- | --- | --- |
| 1 | 2023-06-01 | jack | 111 | 男 | 2023-06-01 13:00:00 | 2023-06-01 13:00:00 |
| 2 | 2023-06-01 | jason | 222 | 男 | 2023-06-01 13:00:00 | 2023-06-01 13:00:00 |
| 3 | 2023-06-01 | tom | 333 | 男 | 2023-06-01 13:00:00 | 2023-06-01 13:00:00 |


6. 此时doris流水表数据如下所示:全量阶段流水表无需同步




| id | update\_time | dt | create\_time | name | phone | gender | op | before | binlog |
| --- | --- | --- | --- | --- | --- | --- | --- | --- | --- |
| NULL | NULL | NULL | NULL | NULL | NULL | NULL | NULL | NULL | NULL |


#### 5.2.3、增量同步阶段


1. 这里我们以2023-06-02日的[Mysql]业务数据为例,新增了一名tony用户,且更改了tom的手机号,此时表数据如下:




| id | name | phone | gender | create\_time | update\_time |
| --- | --- | --- | --- | --- | --- |
| 1 | jack | 111 | 男 | 2023-06-01 13:00:00 | 2023-06-01 13:00:00 |
| 2 | jason | 222 | 男 | 2023-06-01 13:00:00 | 2023-06-01 13:00:00 |
| 3 | tom | **444** | 男 | 2023-06-01 13:00:00 | **2023-06-02 09:00:00** |
| **4** | **tony** | **555** | **男** | **2023-06-02 10:00:00** | **2023-06-02 10:00:00** |


2. 此时Flink应用获取到的数据如下:



  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43

新增tony变更数据如下

{
“before”: null,
“after”: {
“id”: 4,
“name”: “tony”,
“phone”: “666”,
“gender”: “男”,
“create_time”: “2023-06-02T02:00:00Z”,
“update_time”: “2023-06-02T02:00:00Z”
},
“source”: {
# 元数据信息忽略
},
“op”: “c”, # 操作类型
“ts_ms”: 1706768344113,
“transaction”: null
}

tom手机号333->444变更数据如下

{
“before”: {
“id”: 3,
“name”: “tom”,
“phone”: “333”,
“gender”: “男”,
“create_time”: “2023-06-01T05:00:00Z”,
“update_time”: “2023-06-01T05:00:00Z”
},
“after”: {
“id”: 3,
“name”: “tom”,
“phone”: “444”,
“gender”: “男”,
“create_time”: “2023-06-01T05:00:00Z”,
“update_time”: “2023-06-01T23:00:00Z”
},
“source”: {
# 元数据信息忽略
},
“op”: “u”, # 操作类型
“ts_ms”: 1706768454904,
“transaction”: null
}


3. 当 Flink 同步程序接收到 `op=c/u/d` 表示增量更新数据时,提取其中的 `op`、`before` 和 `after` 数据。接着将这些信息拼装成 Doris 的 `INSERT` 语句后插入到流水表中,此时流水表数据如下所示:




| id | update\_time | dt | create\_time | name | phone | gender | op | before | binlog |
| --- | --- | --- | --- | --- | --- | --- | --- | --- | --- |
| 4 | 2023-06-02 10:00:00 | 2023-06-02 | 2023-06-02 10:00:00 | tony | 555 | 男 | c | NULL | {“before”:null,“after”:{“id”:4,“name”:“tony”,“phone”:“666”,“gender”:“男”,“create\_time”:“2023-06-02T02:00:00Z”,“update\_time”:“2023-06-02T02:00:00Z”},“source”:{“version”:“1.6.4.Final”,“connector”:“mysql”,“name”:“mysql\_binlog\_source”,“ts\_ms”:1706768344000,“snapshot”:“false”,“db”:“yushu\_dds”,“sequence”:null,“table”:“user”,“server\_id”:2307031958,“gtid”:“71221bfd-56e8-11ee-8275-fa163e4ecceb:33719321”,“file”:“3509-binlog.000191”,“pos”:643757739,“row”:0,“thread”:null,“query”:null},“op”:“c”,“ts\_ms”:1706768344113,“transaction”:null} |
| 3 | 2023-06-02 08:00:00 | 2023-06-02 | 2023-06-02 13:00:00 | tom | 444 | 男 | u | {“id”:3,“name”:“tom”,“phone”:“333”,“gender”:“男”,“create\_time”:“2023-06-01T05:00:00Z”,“update\_time”:“2023-06-01T05:00:00Z”} | {“before”:{“id”:3,“name”:“tom”,“phone”:“333”,“gender”:“男”,“create\_time”:“2023-06-01T05:00:00Z”,“update\_time”:“2023-06-01T05:00:00Z”},“after”:{“id”:3,“name”:“tom”,“phone”:“444”,“gender”:“男”,“create\_time”:“2023-06-01T05:00:00Z”,“update\_time”:“2023-06-01T23:00:00Z”},“source”:{“version”:“1.6.4.Final”,“connector”:“mysql”,“name”:“mysql\_binlog\_source”,“ts\_ms”:1706768454000,“snapshot”:“false”,“db”:“yushu\_dds”,“sequence”:null,“table”:“user”,“server\_id”:2307031958,“gtid”:“71221bfd-56e8-11ee-8275-fa163e4ecceb:33719761”,“file”:“3509-binlog.000191”,“pos”:692873739,“row”:0,“thread”:null,“query”:null},“op”:“u”,“ts\_ms”:1706768454904,“transaction”:null} |


4. 因增量数据无需同步至快照表,故此时快照表与之前06-01号一样保持不变,快照表数据如下:




| id | dt | name | phone | gender | create\_time | update\_time |
| --- | --- | --- | --- | --- | --- | --- |
| 1 | 2023-06-01 | jack | 111 | 男 | 2023-06-01 13:00:00 | 2023-06-01 13:00:00 |
| 2 | 2023-06-01 | jason | 222 | 男 | 2023-06-01 13:00:00 | 2023-06-01 13:00:00 |
| 3 | 2023-06-01 | tom | 333 | 男 | 2023-06-01 13:00:00 | 2023-06-01 13:00:00 |


#### 5.2.4、合并阶段


在合并阶段,我们将流水表前一天的数据与快照表中前两天的数据进行整合,最终得到前一天的全量数据,并将其写入至快照表的前一天分区。


合并任务会在满足以下任意一个条件时触发:


1. 当binlog数据中的日期为第二天。
2. 当凌晨过了5分钟(这是一个自定义的时间阈值)。


第二个条件的存在是因为业务数据很可能在凌晨00:00 ~ 00:05 分之间没有增量数据。因此,即使在没有业务数据同步的情况下,我们仍然可以通过第二个条件触发合并阶段,确保数据的完整性和准确性。




---


1. 这里我们假设2023-06-03 00:05:00 触发合并阶段为例,此时业务数据如下所示:




| id | name | phone | gender | create\_time | update\_time |
| --- | --- | --- | --- | --- | --- |
| 1 | jack | 111 | 男 | 2023-06-01 13:00:00 | 2023-06-01 13:00:00 |
| 2 | jason | 222 | 男 | 2023-06-01 13:00:00 | 2023-06-01 13:00:00 |
| 3 | tom | **444** | 男 | 2023-06-01 13:00:00 | **2023-06-02 09:00:00** |
| **4** | **tony** | **555** | **男** | **2023-06-02 10:00:00** | **2023-06-02 10:00:00** |


2. flink程序中无新增数据,但由于满足第二个触发条件,在flink程序中将会触发合并任务[可用单独线程实现],此时执行的doris合并语句如下:



  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60
  • 61
  • 62

INSERT INTO example_user_snapshot (id, dt, name, phone, gender, create_time, update_time)
SELECT
id,
‘2023-06-02’ as dt, – 通过固定dt字段值从而写入快照表p20230602分区中
name,
phone,
gender,
create_time,
update_time
FROM (
SELECT
snap.id,
snap.name,
snap.phone,
snap.gender,
snap.create_time,
snap.update_time
FROM example_user_snapshot PARTITION p20230601 snap
LEFT JOIN example_user_stream PARTITION p20230602 stream ON snap.id = stream.id
WHERE stream.id IS NULL
UNION
SELECT
id,
name,
phone,
gender,
create_time,
update_time
FROM (
SELECT
id,
name,
phone,
gender,
create_time,
update_time,
– 使用窗口函数的目的是处理流水表中可能存在多条相同id的记录,例如tom在06-02日更改多次手机号则会有多条相同id的数据,故此窗口函数用于确保选择每个id对应的update_time最大的记录;如果流水表设计的unique key = (id) 则不会出现重复情况无需此处的窗口函数。
ROW_NUMBER() OVER (PARTITION BY id ORDER BY update_time DESC) AS row_num
FROM example_user_stream PARTITION p20230602
) ranked
WHERE row_num = 1
) AS temp;



> 
> 该 SQL 查询是先获取两表联接中未更新的数据,与已更新的数据合并,最后写入到快照表中,确保了 `2023-06-02` 分区的数据是完整的全量数据。
> 
> 
> 若想详细剖析此sql的运算逻辑可参考笔者另一篇文章:[数仓日常维护:剖析每日增量同步的内部机制]( )
> 
> 
> 


3. 此时快照表的数据如下:




| id | dt | name | phone | gender | create\_time | update\_time |
| --- | --- | --- | --- | --- | --- | --- |
| 1 | 2024-02-02 | jack | 111 | 男 | 2023-06-01 13:00:00 | 2023-06-01 13:00:00 |
| 2 | 2024-02-02 | jason | 222 | 男 | 2023-06-01 13:00:00 | 2023-06-01 13:00:00 |
| 3 | 2024-02-02 | tom | 333 | 男 | 2023-06-01 13:00:00 | 2023-06-01 13:00:00 |
| 1 | 2024-02-03 | jack | 111 | 男 | 2023-06-01 13:00:00 | 2023-06-01 13:00:00 |
| 2 | 2024-02-03 | jason | 222 | 男 | 2023-06-01 13:00:00 | 2023-06-01 13:00:00 |
| 3 | 2024-02-03 | tom | 555 | 男 | 2023-06-02 13:00:00 | 2023-06-02 09:00:00 |
| 4 | 2024-02-03 | tony | 555 | 男 | 2023-06-02 10:00:00 | 2023-06-02 10:00:00 |


4. 用户可以通过如下语句查询2023-06-02全量数据:



  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32

SELECT * FROM example_user_snapshot PARTITION p20230602;




| 1 | 2024-02-03 | jack | 111 | 男 | 2023-06-01 13:00:00 | 2023-06-01 13:00:00 |
| --- | --- | --- | --- | --- | --- | --- |
| 2 | 2024-02-03 | jason | 222 | 男 | 2023-06-01 13:00:00 | 2023-06-01 13:00:00 |
| 3 | 2024-02-03 | tom | 555 | 男 | 2023-06-02 13:00:00 | 2023-06-02 09:00:00 |
| 4 | 2024-02-03 | tony | 555 | 男 | 2023-06-02 10:00:00 | 2023-06-02 10:00:00 |



> 
> 合并阶段的主要压力是Doris,Flink程序只是传递sql执行后获取结果即可;至此实时快照表同步逻辑结束。
> 
> 
> 


### 5.3、数据一致性设计


在上述快照表同步过程中,如果Flink程序挂掉或者重启,是否会影响数据一致性?由于Flink程序是通过定时执行checkpoint且binlog可重读溯源,因此在数据获取阶段不会出现数据一致性问题。


需要考虑的地方在于合并阶段,如果触发了合并任务,而此时Flink程序还在不断消费业务变更数据,这里是异步还是阻塞?笔者建议使用异步:即Flink程序仍实时同步业务变更数据至流水表,而快照表的合并阶段主要是下沉到Doris库中执行。


需要注意的是如果在合并阶段时Flink程序挂掉,重启后该如何处理?笔者建议在Flink程序中采用有状态的计算,即`Rich functions` 富函数中的`ValueState`,用于记录当前合并阶段是否成功,如下:


![img](https://img-blog.csdnimg.cn/img_convert/01ed9df58d6a75894885cb9ada83604e.png)
![img](https://img-blog.csdnimg.cn/img_convert/7903168f8c894570000962d8f7d5f44c.png)
![img](https://img-blog.csdnimg.cn/img_convert/5a8a8a3b5f2503582147fe858cf063f2.png)

**既有适合小白学习的零基础资料,也有适合3年以上经验的小伙伴深入学习提升的进阶课程,涵盖了95%以上大数据知识点,真正体系化!**

**由于文件比较多,这里只是将部分目录截图出来,全套包含大厂面经、学习笔记、源码讲义、实战项目、大纲路线、讲解视频,并且后续会持续更新**

**[需要这份系统化资料的朋友,可以戳这里获取](https://bbs.csdn.net/topics/618545628)**

Rich functions` 富函数中的`ValueState`,用于记录当前合并阶段是否成功,如下:


[外链图片转存中...(img-AvfUXShi-1714908489888)]
[外链图片转存中...(img-jymjEzRx-1714908489888)]
[外链图片转存中...(img-S3QVqlek-1714908489888)]

**既有适合小白学习的零基础资料,也有适合3年以上经验的小伙伴深入学习提升的进阶课程,涵盖了95%以上大数据知识点,真正体系化!**

**由于文件比较多,这里只是将部分目录截图出来,全套包含大厂面经、学习笔记、源码讲义、实战项目、大纲路线、讲解视频,并且后续会持续更新**

**[需要这份系统化资料的朋友,可以戳这里获取](https://bbs.csdn.net/topics/618545628)**

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
声明:本文内容由网友自发贡献,转载请注明出处:【wpsshop】
推荐阅读
相关标签
  

闽ICP备14008679号