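I. Flink column
The Flink column covers one topic at a time in a systematic way and illustrates each with concrete examples.
1. Flink deployment series
This part covers the basics of deploying and configuring Flink.
2. Flink fundamentals series
This part covers Flink fundamentals, such as terminology, architecture, the programming model, programming guides, basic DataStream API usage, and the four cornerstones.
3. Flink Table API and SQL fundamentals series
This part covers basic usage of the Flink Table API and SQL, such as creating databases and tables, queries, window functions, and catalogs.
4. Flink Table API and SQL advanced and applied series
This part covers applied use of the Table API and SQL: topics tied more closely to real production use, and topics with some development difficulty.
5. Flink monitoring series
This part relates to day-to-day operations and monitoring work.
II. Flink examples column
The Flink examples column supplements the Flink column. It generally does not explain concepts; instead it provides ready-to-use examples. This column is not divided into sections; the link titles indicate what each article covers.
For the entry point to all articles in both columns, see: Flink series article index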
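This article describes how to use Flink CDC and illustrates it with two examples: observing data synchronization with the Flink SQL client, and capturing and verifying data changes through the DataStream API.
For a more systematic treatment, see the author's Flink column.
Apart from the Maven dependencies, this article requires a Flink cluster environment and MySQL.
This topic is split into the following articles:
60. Flink CDC introduction and Streaming ELT example (syncing MySQL data to Elasticsearch) - CDC Connector introduction and examples (1)
60. Flink CDC introduction and Streaming ELT example (syncing MySQL data to Elasticsearch) - Streaming ELT introduction and example (2)
60. Flink CDC introduction and Streaming ELT example (syncing MySQL data to Elasticsearch) - full version
The CDC version described in this article is 2.4; the latest release at the time of writing is 3.0. This Flink column is based on Flink 1.17, and CDC 2.4 supports Flink up to 1.17.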
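The CDC connectors for Apache Flink® are a set of source connectors for Apache Flink® that use change data capture (CDC) to ingest changes from different databases. They integrate Debezium as the engine that captures data changes, so they can take full advantage of Debezium's capabilities.
Learn more about Debezium.
Or see: 37. Flink CDC format: Debezium deployment and MySQL example
The following table shows the current features of the connectors: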
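1. A Flink cluster environment is required.
For setup details, see: 2. Flink 1.13.5: two deployment modes (Standalone, Standalone HA) and four job submission modes (the first two plus session and per-job), with detailed verification steps
2. Download the Flink CDC jar and place it in the FLINK_HOME/lib/ directory.
Download URL: https://github.com/ververica/flink-cdc-connectors/releases
3. Restart the Flink cluster.
This example assumes that the MySQL binlog is already enabled. For how to set it up, see:
37. Flink CDC format: Debezium deployment and MySQL example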
Flink SQL> CREATE TABLE mysql_binlog_user (
> id INT NOT NULL,
> name STRING,
> age INT,
> PRIMARY KEY(id) NOT ENFORCED
> ) WITH (
> 'connector' = 'mysql-cdc',
> 'hostname' = 'server4',
> 'port' = '3306',
> 'username' = 'root',
> 'password' = '123456',
> 'database-name' = 'cdctest',
> 'table-name' = 'user'
> );
[INFO] Execute statement succeed.
Flink SQL> select * from mysql_binlog_user;
+----+-------------+--------------------------------+-------------+
| op | id | name | age |
+----+-------------+--------------------------------+-------------+
| +I | 4 | test456 | 8888 |
| +I | 2 | alanchan | 20 |
| +I | 3 | alanchanchn | 33 |
| +I | 1 | alan | 18 |
| -U | 4 | test456 | 8888 |
| +U | 4 | test123 | 8888 |
| -U | 4 | test123 | 8888 |
| +U | 4 | test123 | 66666 |
| -D | 4 | test123 | 66666 |
| +I | 4 | alanchanchn2 | 100 |
Flink SQL> select name ,sum(age) from mysql_binlog_user group by name;
+----+--------------------------------+-------------+
| op | name | EXPR$1 |
+----+--------------------------------+-------------+
| +I | alanchanchn2 | 100 |
| +I | alanchan | 20 |
| +I | alanchanchn | 33 |
| +I | alan | 18 |
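The op column in the first query is Flink's changelog marker: +I is an insert, -U and +U are the old and new images of an updated row, and -D is a delete. As a rough illustration only (plain Java, no Flink involved; the class and method names are hypothetical), the sketch below replays the ten changelog rows shown above into a map keyed by id, which yields the four rows that the second query aggregates.

```java
import java.util.Map;
import java.util.TreeMap;

// Hypothetical helper, not part of Flink: replays changelog rows into a keyed state.
class ChangelogReplaySketch {

    /** Applies one changelog row to the materialized state, keyed by primary key. */
    static void apply(Map<Integer, String> state, String op, int id, String name, int age) {
        switch (op) {
            case "+I": // insert
            case "+U": // new image of an updated row
                state.put(id, name + ":" + age);
                break;
            case "-U": // old image of an updated row
            case "-D": // delete
                state.remove(id);
                break;
            default:
                throw new IllegalArgumentException("unknown op: " + op);
        }
    }

    /** Replays the rows printed by the first query, in the order shown. */
    static Map<Integer, String> replay() {
        Map<Integer, String> state = new TreeMap<>();
        apply(state, "+I", 4, "test456", 8888);
        apply(state, "+I", 2, "alanchan", 20);
        apply(state, "+I", 3, "alanchanchn", 33);
        apply(state, "+I", 1, "alan", 18);
        apply(state, "-U", 4, "test456", 8888);
        apply(state, "+U", 4, "test123", 8888);
        apply(state, "-U", 4, "test123", 8888);
        apply(state, "+U", 4, "test123", 66666);
        apply(state, "-D", 4, "test123", 66666);
        apply(state, "+I", 4, "alanchanchn2", 100);
        return state;
    }

    public static void main(String[] args) {
        // prints {1=alan:18, 2=alanchan:20, 3=alanchanchn:33, 4=alanchanchn2:100}
        System.out.println(replay());
    }
}
```

The final state holds one row per id, and its name and age values match the name and sum(age) pairs in the grouped query output.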
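This example captures data changes on the user table of the MySQL cdctest database.
To use Flink CDC, adding the following dependency is sufficient; the dependencies for Flink's own runtime environment must be added as well.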
<!-- https://mvnrepository.com/artifact/com.ververica/flink-sql-connector-mysql-cdc -->
<dependency>
<groupId>com.ververica</groupId>
<artifactId>flink-sql-connector-mysql-cdc</artifactId>
<version>2.4.0</version>
<scope>provided</scope>
</dependency>
import com.ververica.cdc.connectors.mysql.source.MySqlSource;
import com.ververica.cdc.debezium.JsonDebeziumDeserializationSchema;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import lombok.extern.slf4j.Slf4j;
/*
* @Author: alanchan
* @LastEditors: alanchan
* @Description:
*/
@Slf4j
public class TestFlinkCDCFromMysqlDemo {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Checkpoint every 3 seconds
        env.enableCheckpointing(3000);
        MySqlSource<String> mySqlSource = MySqlSource.<String>builder()
                .hostname("server4")
                .port(3306)
                .databaseList("cdctest") // databases to capture; to sync a whole database, set tableList to ".*"
                .tableList("cdctest.user") // tables to capture
                .username("root")
                .password("123456")
                .deserializer(new JsonDebeziumDeserializationSchema()) // converts each SourceRecord to a JSON string
                .build();
        DataStream<String> result = env.fromSource(mySqlSource, WatermarkStrategy.noWatermarks(), "MySQL Source");
        log.info(result.toString());
        // Log every change record as it arrives
        result.map(new MapFunction<String, String>() {
            @Override
            public String map(String value) throws Exception {
                log.info("value ======={}", value);
                return value;
            }
        });
        env.execute();
    }
}
Once the program is running, insert, update, and delete rows in the cdctest.user table, and watch the log output on the program console.
08:50:26.819 [Source: MySQL Source -> Map (4/16)#0] INFO com.win.TestFlinkCDCFromMysqlDemo - value ======={"before":null,"after":{"id":2,"name":"alanchan","age":20},"source":{"version":"1.9.7.Final","connector":"mysql","name":"mysql_binlog_source","ts_ms":0,"snapshot":"false","db":"cdctest","sequence":null,"table":"user","server_id":0,"gtid":null,"file":"","pos":0,"row":0,"thread":null,"query":null},"op":"r","ts_ms":1705884626222,"transaction":null}
08:50:26.821 [Source: MySQL Source -> Map (4/16)#0] INFO com.win.TestFlinkCDCFromMysqlDemo - value ======={"before":null,"after":{"id":3,"name":"alanchanchn","age":33},"source":{"version":"1.9.7.Final","connector":"mysql","name":"mysql_binlog_source","ts_ms":0,"snapshot":"false","db":"cdctest","sequence":null,"table":"user","server_id":0,"gtid":null,"file":"","pos":0,"row":0,"thread":null,"query":null},"op":"r","ts_ms":1705884626223,"transaction":null}
08:50:26.821 [Source: MySQL Source -> Map (4/16)#0] INFO com.win.TestFlinkCDCFromMysqlDemo - value ======={"before":null,"after":{"id":1,"name":"alan","age":18},"source":{"version":"1.9.7.Final","connector":"mysql","name":"mysql_binlog_source","ts_ms":0,"snapshot":"false","db":"cdctest","sequence":null,"table":"user","server_id":0,"gtid":null,"file":"","pos":0,"row":0,"thread":null,"query":null},"op":"r","ts_ms":1705884626221,"transaction":null}
08:50:26.822 [Source: MySQL Source -> Map (4/16)#0] INFO com.win.TestFlinkCDCFromMysqlDemo - value ======={"before":null,"after":{"id":4,"name":"test456","age":999000},"source":{"version":"1.9.7.Final","connector":"mysql","name":"mysql_binlog_source","ts_ms":0,"snapshot":"false","db":"cdctest","sequence":null,"table":"user","server_id":0,"gtid":null,"file":"","pos":0,"row":0,"thread":null,"query":null},"op":"r","ts_ms":1705884626223,"transaction":null}
Jan 22, 2024 8:50:27 AM com.github.shyiko.mysql.binlog.BinaryLogClient connect
INFO: Connected to server4:3306 at alan_master_logbin.000004/10816 (sid:6116, cid:565)
08:50:56.030 [Source: MySQL Source -> Map (1/16)#0] INFO com.win.TestFlinkCDCFromMysqlDemo - value ======={"before":{"id":4,"name":"test456","age":999000},"after":{"id":4,"name":"test456","age":8888},"source":{"version":"1.9.7.Final","connector":"mysql","name":"mysql_binlog_source","ts_ms":1705884032000,"snapshot":"false","db":"cdctest","sequence":null,"table":"user","server_id":1,"gtid":null,"file":"alan_master_logbin.000004","pos":11010,"row":0,"thread":557,"query":null},"op":"u","ts_ms":1705884655747,"transaction":null}
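In the log lines above, the op field distinguishes the kinds of record: the four rows read during the initial snapshot carry "op":"r", and the later update carries "op":"u". Debezium also uses "c" for inserts and "d" for deletes. The following is a minimal sketch, assuming the compact JSON layout shown in these logs, of classifying a record by its op code using only the JDK. The class name, method names, and the abridged sample string are hypothetical, and the regular expression is a shortcut: in a real job a JSON parser is the safer choice, since a table column named op would confuse it.

```java
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical helper for illustration; not part of Flink CDC or Debezium.
class DebeziumOpSketch {

    // Matches an "op" field with a single-letter value, e.g. "op":"u".
    private static final Pattern OP = Pattern.compile("\"op\"\\s*:\\s*\"([a-z])\"");

    /** Returns the op code of a Debezium JSON record, or null if none is found. */
    static String extractOp(String json) {
        Matcher m = OP.matcher(json);
        return m.find() ? m.group(1) : null;
    }

    /** Maps a Debezium op code to a readable description. */
    static String describe(String op) {
        if (op == null) {
            return "unknown";
        }
        switch (op) {
            case "r": return "snapshot read";
            case "c": return "insert";
            case "u": return "update";
            case "d": return "delete";
            default:  return "unknown";
        }
    }

    public static void main(String[] args) {
        // Abridged from the snapshot record for id 1 above (the source block is omitted).
        String sample = "{\"before\":null,\"after\":{\"id\":1,\"name\":\"alan\",\"age\":18},"
                + "\"op\":\"r\",\"ts_ms\":1705884626221,\"transaction\":null}";
        System.out.println(describe(extractOp(sample))); // prints "snapshot read"
    }
}
```

A method like describe could be called from the map function in the example above to log a label per record, or turned into a filter that keeps only certain operations.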
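For more information about Debezium, see: 37. Flink CDC format: Debezium deployment and MySQL example
In this version of Flink CDC, the Debezium data format needs no special handling. By default, records look like the following, that is, without a schema; for how to parse them, see the example above.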
{
"before": {
"name": "alan_test",
"scores": 666.0
},
"after": {
"name": "alan_test",
"scores": 888.0
},
"source": {
"version": "1.7.2.Final",
"connector": "mysql",
"name": "ALAN",
"ts_ms": 1705717298000,
"snapshot": "false",
"db": "cdctest",
"sequence": null,
"table": "userscoressink",
"server_id": 1,
"gtid": null,
"file": "alan_master_logbin.000004",
"pos": 4931,
"row": 0,
"thread": null,
"query": null
},
"op": "u",
"ts_ms": 1705717772785,
"transaction": null
}
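In some cases the schema may be needed, as in the example below.
To get records in this form for parsing, change JsonDebeziumDeserializationSchema() to JsonDebeziumDeserializationSchema(true).
In general, the default format without the schema is recommended.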
{
"schema": {
"type": "struct",
"fields": [{
"type": "struct",
"fields": [{
"type": "string",
"optional": true,
"field": "name"
}, {
"type": "double",
"optional": true,
"field": "scores"
}],
"optional": true,
"name": "ALAN.cdctest.userscoressink.Value",
"field": "before"
}, {
"type": "struct",
"fields": [{
"type": "string",
"optional": true,
"field": "name"
}, {
"type": "double",
"optional": true,
"field": "scores"
}],
"optional": true,
"name": "ALAN.cdctest.userscoressink.Value",
"field": "after"
}, {
"type": "struct",
"fields": [{
"type": "string",
"optional": false,
"field": "version"
}, {
"type": "string",
"optional": false,
"field": "connector"
}, {
"type": "string",
"optional": false,
"field": "name"
}, {
"type": "int64",
"optional": false,
"field": "ts_ms"
}, {
"type": "string",
"optional": true,
"name": "io.debezium.data.Enum",
"version": 1,
"parameters": {
"allowed": "true,last,false"
},
"default": "false",
"field": "snapshot"
}, {
"type": "string",
"optional": false,
"field": "db"
}, {
"type": "string",
"optional": true,
"field": "sequence"
}, {
"type": "string",
"optional": true,
"field": "table"
}, {
"type": "int64",
"optional": false,
"field": "server_id"
}, {
"type": "string",
"optional": true,
"field": "gtid"
}, {
"type": "string",
"optional": false,
"field": "file"
}, {
"type": "int64",
"optional": false,
"field": "pos"
}, {
"type": "int32",
"optional": false,
"field": "row"
}, {
"type": "int64",
"optional": true,
"field": "thread"
}, {
"type": "string",
"optional": true,
"field": "query"
}],
"optional": false,
"name": "io.debezium.connector.mysql.Source",
"field": "source"
}, {
"type": "string",
"optional": false,
"field": "op"
}, {
"type": "int64",
"optional": true,
"field": "ts_ms"
}, {
"type": "struct",
"fields": [{
"type": "string",
"optional": false,
"field": "id"
}, {
"type": "int64",
"optional": false,
"field": "total_order"
}, {
"type": "int64",
"optional": false,
"field": "data_collection_order"
}],
"optional": true,
"field": "transaction"
}],
"optional": false,
"name": "ALAN.cdctest.userscoressink.Envelope"
},
"payload": {
"before": {
"name": "alan_test",
"scores": 666.0
},
"after": {
"name": "alan_test",
"scores": 888.0
},
"source": {
"version": "1.7.2.Final",
"connector": "mysql",
"name": "ALAN",
"ts_ms": 1705717298000,
"snapshot": "false",
"db": "cdctest",
"sequence": null,
"table": "userscoressink",
"server_id": 1,
"gtid": null,
"file": "alan_master_logbin.000004",
"pos": 4931,
"row": 0,
"thread": null,
"query": null
},
"op": "u",
"ts_ms": 1705717772785,
"transaction": null
}
}
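To sum up, this article described how to use Flink CDC and illustrated it with two examples: observing data synchronization with the Flink SQL client, and capturing and verifying data changes through the DataStream API.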