当前位置:   article > 正文

Flink 内容分享(八):Flink CDC的概览和使用_flinkcdc获取更新和删除标记

flinkcdc获取更新和删除标记

目录

什么是CDC

什么是Flink CDC

Flink CDC 前生今世

Flink CDC 1.x

Flink CDC 2.x

Flink CDC 3.x

Flink CDC使用

Debezium标准CDC Event格式详解


什么是CDC

CDC(Change Data Capture)是一种用于跟踪数据库中数据更改的技术。它用于监视数据库中的变化,并捕获这些变化,以便实时或定期将变化的数据同步到其他系统、数据仓库或分析平台。CDC 技术通常用于数据复制、数据仓库更新、实时报告和数据同步等场景。

CDC 可以捕获数据库中的以下类型的数据变化:

  1. 插入(Insert):当新数据被插入到数据库表中时。

  2. 更新(Update):当数据库表中的现有数据被修改时。

  3. 删除(Delete):当数据从数据库表中被删除时

什么是Flink CDC

Flink CDC(Change Data Capture,即数据变更抓取)是一个开源的数据库变更日志捕获和处理框架,它可以实时地从各种数据库(如MySQL、PostgreSQL、Oracle、MongoDB等)中捕获数据变更并将其转换为流式数据。Flink CDC 可以帮助实时应用程序实时地处理和分析这些流数据,从而实现数据同步、数据管道、实时分析和实时应用等功能。

本质上是一系列的Flink Source Connector集合,用于来获取数据库的实时变更,底层基于Debezium实现。

https://github.com/ververica/flink-cdc-connectors

Flink CDC 前生今世

Flink CDC 1.x

Flink CDC 1.x开启了Flink在CDC上的实践之路,Flink CDC 1.x第一次引入了Debezium框架,利用Debezium已有的能力将数据库实时变更接入到Flink流计算框架中,利用Flink丰富的生态对数据进行加工处理,满足不同的业务需求,在功能层面上而言,Flink CDC 1.x只能说是可以用,但不能生产上用,为什么:

  1. 1.x版本全增量切换时会对表加锁,在同步过程中有段时间业务会处于暂停状态

  2. 各方面功能还不够完善,比如自动加表、DDL事件传递等

总体而言Flink CDC 1.x只能说是一个比较有趣的小玩具,还不具备大规模商业盈利的价值。

Flink CDC 2.x

在2.x版本中,Flink CDC引入了Netfix DBLog中的无锁算法,彻底解决了全增量切换上业务停滞的问题,同时得益于FLIP-27对Flink Source API的重构,Flink CDC也基于FLIP-27升级到了新的框架设计,至此,Flink CDC被大规模公司使用并投入到生产中。

Flink CDC 2.0 正式发布,详解核心改进-阿里云开发者社区

 

Flink CDC 3.x

近期,Flink CDC发布了全新的3.0版本,并宣布捐赠回Flink主项目,在新的3.0版本中,Flink CDC对于接口和架构上做了很大的升级和调整,对于整体项目的定位也从之前的Flink Source Connector转变为了Data Integration Engine,未来将与SeaTunnel、DataX、Chunjun等一系列老牌数据集成项目同台竞技,让我们拭目以待。

 

流计算迎来代际变革:流式湖仓 Flink + Paimon 加速落地、Flink CDC 重磅升级_大数据_Apache Flink_InfoQ写作社区

Flink CDC使用

  1. 在本地启动一个MySQL的Docker环境

docker run -it --rm --name mysql -p 3306:3306 -e MYSQL_ROOT_PASSWORD=debezium -e MYSQL_USER=mysqluser -e MYSQL_PASSWORD=mysqlpw -e TZ=Asia/Shanghai quay.io/debezium/example-mysql:2.4
  1. 创建表

  1. create database cdc_test;
  2. use cdc_test;
  3. create table cdc_table (
  4.     id int primary key auto_increment,
  5.     name varchar(1000),
  6.     age int
  7. );
  • 在idea中新建一个Java项目

  • 导入依赖:

  1. <flink-cdc.version>2.4.2</flink-cdc.version>
  2. <flink.version>1.16.3</flink.version>
  3. <logback.version>1.2.7</logback.version>
  4. <dependency>
  5.     <groupId>com.ververica</groupId>
  6.     <artifactId>flink-connector-mysql-cdc</artifactId>
  7.     <version>${flink-cdc.version}</version>
  8. </dependency>
  9. <dependency>
  10.     <groupId>org.apache.flink</groupId>
  11.     <artifactId>flink-connector-base</artifactId>
  12.     <version>${flink.version}</version>
  13. </dependency>
  14. <dependency>
  15.     <groupId>org.apache.flink</groupId>
  16.     <artifactId>flink-clients</artifactId>
  17.     <version>${flink.version}</version>
  18. </dependency>
  19. <dependency>
  20.     <groupId>org.apache.flink</groupId>
  21.     <artifactId>flink-table-runtime</artifactId>
  22.     <version>${flink.version}</version>
  23. </dependency>
  24. <dependency>
  25.     <groupId>org.apache.flink</groupId>
  26.     <artifactId>flink-runtime-web</artifactId>
  27.     <version>${flink.version}</version>
  28. </dependency>
  29. <dependency>
  30.     <groupId>ch.qos.logback</groupId>
  31.     <artifactId>logback-classic</artifactId>
  32.     <version>${logback.version}</version>
  33. </dependency>
  • 编写代码

  1. public class FlinkCDCApplication {
  2.     public static void main(String[] args) throws Exception {
  3.         StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
  4.         env.setParallelism(1);
  5.         env.enableCheckpointing(60000L);
  6.         MySqlSource<String> mySqlSource = MySqlSource.<String>builder()
  7.             .hostname("localhost")
  8.             .port(3306)
  9.             .databaseList("cdc_test"// set captured database, If you need to synchronize the whole database, Please set tableList to ".*".
  10.             .tableList("cdc_test.cdc_table"// set captured table
  11.             .username("root")
  12.             .password("debezium")
  13.             .includeSchemaChanges(true)
  14.             .startupOptions(StartupOptions.latest())
  15.             .deserializer(new JsonDebeziumDeserializationSchema()) // converts SourceRecord to JSON String
  16.             .build();
  17.         env.fromSource(mySqlSource, WatermarkStrategy.noWatermarks(), "MySQL-CDC")
  18.             .print();
  19.         env.execute();
  20.     }
  21. }
  • 添加日志配置

  1. <!--
  2.   ~ Licensed to the Apache Software Foundation (ASF) under one or more
  3.   ~ contributor license agreements.  See the NOTICE file distributed with
  4.   ~ this work for additional information regarding copyright ownership.
  5.   ~ The ASF licenses this file to You under the Apache License, Version 2.0
  6.   ~ (the "License"); you may not use this file except in compliance with
  7.   ~ the License.  You may obtain a copy of the License at
  8.   ~
  9.   ~    http://www.apache.org/licenses/LICENSE-2.0
  10.   ~
  11.   ~ Unless required by applicable law or agreed to in writing, software
  12.   ~ distributed under the License is distributed on an "AS IS" BASIS,
  13.   ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  14.   ~ See the License for the specific language governing permissions and
  15.   ~ limitations under the License.
  16.   -->
  17. <configuration>
  18.        <appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">
  19.           <encoder>
  20.              <pattern>%d{yyyy-MM-dd HH:mm:ss} %p %c - %msg %n</pattern>
  21.           </encoder>
  22.        </appender>
  23.        <root level="INFO">
  24.           <appender-ref ref="STDOUT" />
  25.        </root>
  26. </configuration>

Debezium标准CDC Event格式详解

  1. {
  2.     "before"null,
  3.     "after": {
  4.         "id"1,
  5.         "name""xing.yu",
  6.         "age"26,
  7.         "new_column""dewu"
  8.     },
  9.     "source": {
  10.         "version""1.9.7.Final",
  11.         "connector""mysql",
  12.         "name""mysql_binlog_source",
  13.         "ts_ms"1702723640000,
  14.         "snapshot""false",
  15.         "db""cdc_test",
  16.         "sequence"null,
  17.         "table""cdc_table",
  18.         "server_id"223344,
  19.         "gtid"null,
  20.         "file""mysql-bin.000003",
  21.         "pos"2394,
  22.         "row"0,
  23.         "thread"39,
  24.         "query"null
  25.     },
  26.     "op""c",
  27.     "ts_ms"1702723640483,
  28.     "transaction"null
  29. }
  30. {
  31.     // 表数据更新前的值,update/delete
  32.     "before": {},
  33.     // 表数据更新后的值,create/update
  34.     "after": {},
  35.     // 元数据信息
  36.     "source": {},
  37.     // 操作类型 c/d/u
  38.     "op""",
  39.     // 记录解析时间
  40.     "ts_ms""",
  41.     "transaction"""
  42. }
声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/知新_RL/article/detail/723276
推荐阅读
相关标签
  

闽ICP备14008679号