赞
踩
CDC 即 Change Data Capture
变更数据捕获,为Flink 1.11中一个新增功能。我们可以通过 CDC 得知数据源表的更新内容(包含Insert Update 和 Delete),并将这些更新内容作为数据流发送到下游系统。捕获到的数据操作具有一个标识符,分别对应数据的增加,修改和删除。
> +I:新增数据。
> -U:一条数据的修改会产生两个U 标识符数据。其中-U 含义为修改前数据。
> +U:修改之后的数据。
> -D:删除的数据。
在我的文章 Flink CDC 中可以更详细的理解 CDC
接下来以 MySQL CDC 为例,熟悉配置 Flink MySQL CDC。
在使用CDC之前务必要开启 MySQL 的 binlog。下面以 MySQL 5.7 版本为例说明。
修改my.cnf
文件,增加:
server_id=1
log_bin=mysql-bin
binlog_format=ROW
expire_logs_days=30
到这里MySQL环境已经配置完毕。接下来开始准备测试表和数据。
createdatabase demo characterset utf8mb4;
use demo;
createtable student(`id`int primary key, `name`varchar(128), `age`int);
这里创建了演示数据库 demo 和一张 student 表。
到这一步我们开始使用 Flink 程序来获取 CDC 数据流。
首先需要引入依赖。注意:本文参考 ververica 官网 的CDC 教程https://ververica.github.io/flink-cdc-connectors/master/content/about.html
, 但是ververica 被阿里收购后, groupId 变成了 com.alibaba.ververica
, 这个地方需要注意下。
<dependency>
<groupId>com.alibaba.ververica</groupId>
<artifactId>flink-connector-mysql-cdc</artifactId>
<version>1.3.0</version>
</dependency>
然后使用Table API编写程序。这里我们仅仅将CDC数据流配置为数据源,然后将CDC数据流的内容打印出来。
val env = StreamExecutionEnvironment.getExecutionEnvironment
// 使用MySQLSource创建数据源
// 同时指定StringDebeziumDeserializationSchema,将CDC转换为String类型输出
val sourceFunction = MySQLSource.builder()
.hostname("your-ip")
.port(3306)
.databaseList("demo")
.username("root")
.password("123456")
.deserializer(new StringDebeziumDeserializationSchema).build();
// 单并行度打印,避免输出乱序
env.addSource(sourceFunction).print.setParallelism(1)
env.execute()
此时我们插入一条数据:
insert into student values(2, 'kate', 28);
可以看到程序有如下输出:
SourceRecord{sourcePartition={server=mysql_binlog_source}, sourceOffset={ts_sec=1618390979, file=mysql-bin.000003, pos=885, row=1, server_id=1, event=2}} ConnectRecord{topic='mysql_binlog_source.demo.student', kafkaPartition=null, key=Struct{id=2}, keySchema=Schema{mysql_binlog_source.demo.student.Key:STRUCT}, value=Struct{after=Struct{id=2,name=kate,age=28},source=Struct{version=1.4.1.Final,connector=mysql,name=mysql_binlog_source,ts_ms=1618390979000,db=demo,table=student,server_id=1,file=mysql-bin.000003,pos=1011,row=0,thread=2},op=c,ts_ms=1618391175254}, valueSchema=Schema{mysql_binlog_source.demo.student.Envelope:STRUCT}, timestamp=null, headers=ConnectHeaders(headers=)}
接下来我们使用更为简洁的SQL方式。
首先引入Flink SQL必须的依赖。需要注意的是,这里使用blink planner。本例子中使用Scala语言编写,所以引入了Scala相关依赖。
<dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-table-api-scala-bridge_${scala.binary.version}</artifactId> <version>${flink.version}</version> </dependency> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-table-planner-blink_${scala.binary.version}</artifactId> <version>${flink.version}</version> </dependency> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-streaming-scala_${scala.binary.version}</artifactId> <version>${flink.version}</version> </dependency> <dependency> <groupId>com.alibaba.ververica</groupId> <artifactId>flink-connector-mysql-cdc</artifactId> <version>1.3.0</version> </dependency>
编写如下所示的程序代码:
// 创建Blink Streaming的TableEnvironment
val bsSettings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build()
val tableEnvironment = TableEnvironment.create(bsSettings)
// 创建表,connector使用mysql-cdc
tableEnvironment.executeSql("CREATE TABLE mysql_binlog (id INT NOT NULL, name STRING, age INT) WITH ('connector' = 'mysql-cdc', 'hostname' = '10.180.210.135', 'port' = '3306', 'username' = 'root', 'password' = '123456', 'database-name' = 'demo', 'table-name' = 'student')")
// 创建下游数据表,这里使用print类型的connector,将数据直接打印出来
tableEnvironment.executeSql("CREATE TABLE sink_table (id INT NOT NULL, name STRING, age INT) WITH ('connector' = 'print')")
// 将CDC数据源和下游数据表对接起来
tableEnvironment.executeSql("INSERT INTO sink_table SELECT id, name, age FROM mysql_binlog")
接下来可以执行insert
语句插入数据,控制台会打印出数据的变化。
例如我们依次执行:
insertinto student values(1,'paul',20);
update student set age=30whereid=1;
deletefrom student whereid=1;
在控制台可以得到如下输出:
+I(1,paul,20)
-U(1,paul,20)
+U(1,paul,30)
-D(1,paul,30)
相比较创建一个 Java 项目以 jar 包的方式创建作业,Fllink提供了一个更为简单的方式:使用 SQL Client。接下来我们开始配置SQL Client环境。
在 Flink SQL Client 使用 CDC 功能之前,我们需要将相关依赖放入 Flink lib
目录。
访问https://mvnrepository.com/artifact/com.alibaba.ververica/flink-connector-mysql-cdc/
,下载flink-connector-mysql-cdc
jar包,复制到flink安装位置的lib
目录中。
这里SQL Client在standalone集群上运行。
官网配置方式链接:https://ci.apache.org/projects/flink/flink-docs-master/docs/dev/table/sqlclient/#getting-started,简单来说是执行Flink安装目录如下两个命令:
./bin/start-cluster.sh
./bin/sql-client.sh embedded
如果没有问题,此时可以进入SQL Client。
执行如下SQL(和上一章"使用SQL"使用的语句相同):
CREATE TABLE mysql_binlog ( id INT NOT NULL, name STRING, age INT ) WITH ( 'connector' = 'mysql-cdc', 'hostname' = 'localhost', 'port' = '3306', 'username' = 'root', 'password' = '123456', 'database-name' = 'demo', 'table-name' = 'student' ); CREATE TABLE sink_table ( id INT NOT NULL, name STRING, age INT ) WITH ( 'connector' = 'print' ); INSERT INTO sink_table SELECT id, name, age FROM mysql_binlog;
然后在MySQL命令行执行些insert语句插入数据。需要注意的是sink_table
的输出是无法在SQL client上面查看的。需要打开Flink Web UI的Task Managers页面的stdout标签。可以找到类似如下输出:
+I(1,paul,20)
-U(1,paul,20)
+U(1,paul,30)
-D(1,paul,30)
Flink 已经成功捕获到MySQL的数据变更。
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。