Flink CDC focuses on capturing data changes from source databases such as MySQL and PostgreSQL and delivering them to Flink jobs in real time. Its core strengths are low latency and consistency: by reading the database's incremental change log, Flink CDC streams changes into a Flink job as they happen, enabling low-latency processing and analysis while keeping the data accurate and complete throughout.
To support this, the Flink community developed the flink-cdc-connectors component, a source connector that can read both the full snapshot and the incremental changes directly from databases such as MySQL and PostgreSQL. By configuring the appropriate connector and its parameters, a Flink job can connect to the source database and capture and process data changes in real time.
Before using CDC, make sure MySQL's binlog is enabled.
Edit my.cnf and add:
server_id=1
log_bin=mysql-bin
binlog_format=ROW
expire_logs_days=15
binlog_do_db=testdb
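After restarting MySQL, it is worth confirming that the binlog is actually on, for example with a quick JDBC check. This is only a minimal sketch; the MySQL JDBC driver (e.g. mysql-connector-j) and the connection details are assumptions you should replace with your own:
// Requires a MySQL JDBC driver (e.g. mysql-connector-j) on the classpath
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.Statement;

public class BinlogCheck {
    public static void main(String[] args) throws Exception {
        try (Connection conn = DriverManager.getConnection(
                "jdbc:mysql://10.168.192.70:3306/testdb", "root", "XXXXX");
             Statement stmt = conn.createStatement();
             ResultSet rs = stmt.executeQuery("SHOW VARIABLES LIKE 'log_bin'")) {
            while (rs.next()) {
                // Expect "log_bin = ON" once the my.cnf changes have taken effect
                System.out.println(rs.getString("Variable_name") + " = " + rs.getString("Value"));
            }
        }
    }
}
Next, add the required dependencies to the project's pom.xml (depending on your Flink version you may also need flink-clients to run the jobs from the IDE, and flink-table-api-java-bridge plus a table planner for the Table API example further below):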
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-java</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-java</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-api-java</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>com.ververica</groupId>
<artifactId>flink-connector-mysql-cdc</artifactId>
<version>2.4.0</version>
</dependency>
Create the table:
CREATE TABLE `userdemo` (
`user_id` VARCHAR(50) NOT NULL COLLATE 'utf8mb4_general_ci',
`user_name` VARCHAR(50) NULL DEFAULT NULL COLLATE 'utf8mb4_general_ci',
`age` INT(11) NULL DEFAULT '0',
PRIMARY KEY (`user_id`) USING BTREE
)
COLLATE='utf8mb4_general_ci'
ENGINE=InnoDB
;
The DataStream API example below uses MySqlSource to take a full snapshot of this table and then stream its changes:
import com.ververica.cdc.connectors.mysql.MySqlSource;
import com.ververica.cdc.connectors.mysql.table.StartupOptions;
import com.ververica.cdc.debezium.DebeziumSourceFunction;
import com.ververica.cdc.debezium.JsonDebeziumDeserializationSchema;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
/**
* @Date: 2024/3/12 10:03
* @Description DataStream API CDC
**/
public class FlinkMysqlCdc {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
DebeziumSourceFunction<String> sourceFunction = MySqlSource.<String>builder()
.hostname("10.168.192.70")
.port(3306)
.username("root")
.password("XXXXX")
.databaseList("testdb")
// must be specified in db.table format
.tableList("testdb.userdemo")
.serverTimeZone("GMT+8")
// .deserializer(new StringDebeziumDeserializationSchema())
.deserializer(new JsonDebeziumDeserializationSchema()) // emit each change record as a Debezium-style JSON string
.startupOptions(StartupOptions.initial()) // take a full snapshot first, then continue from the binlog
.build();
DataStreamSource<String> dataStreamSource = env.addSource(sourceFunction);
dataStreamSource.print();
env.execute("FlinkDSCDC");
}
}
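One practical note before running it: the MySQL CDC source commits its binlog offsets on Flink checkpoints, so for anything beyond a quick local test it is worth enabling checkpointing (the interval below is only illustrative):
// Enable checkpointing so the source can persist binlog offsets and resume after a failure
env.enableCheckpointing(3000);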
Running the program prints the following; the snapshot phase first emits the rows already present in the table as op "r" (read) records:
{"before":null,"after":{"user_id":"001","user_name":"sdaf","age":23},"source":{"version":"1.9.7.Final","connector":"mysql","name":"mysql_binlog_source","ts_ms":1710228835000,"snapshot":"true","db":"testdb","sequence":null,"table":"userdemo","server_id":0,"gtid":null,"file":"xxx.000002","pos":154,"row":0,"thread":null,"query":null},"op":"r","ts_ms":1710228834716,"transaction":null}
{"before":null,"after":{"user_id":"002","user_name":"DSF","age":35},"source":{"version":"1.9.7.Final","connector":"mysql","name":"mysql_binlog_source","ts_ms":1710228835000,"snapshot":"last","db":"testdb","sequence":null,"table":"userdemo","server_id":0,"gtid":null,"file":"xxx.000002","pos":154,"row":0,"thread":null,"query":null},"op":"r","ts_ms":1710228834720,"transaction":null}
Insert a row, which produces an op "c" (create) change record:
INSERT INTO userdemo (user_id,user_name,age) VALUES('004','wangwu',26);
{"before":null,"after":{"user_id":"004","user_name":"wangwu","age":26},"source":{"version":"1.9.7.Final","connector":"mysql","name":"mysql_binlog_source","ts_ms":1710235648000,"snapshot":"false","db":"testdb","sequence":null,"table":"userdemo","server_id":1,"gtid":null,"file":"xxx.000002","pos":649,"row":0,"thread":7,"query":null},"op":"c","ts_ms":1710235647380,"transaction":null}
Update a row in userdemo:
UPDATE userdemo SET user_name='zhangsan' WHERE user_id='001'
This produces the following op "u" (update) record:
{"before":{"user_id":"001","user_name":"sdaf","age":23},"after":{"user_id":"001","user_name":"zhangsan","age":23},"source":{"version":"1.9.7.Final","connector":"mysql","name":"mysql_binlog_source","ts_ms":1710235526000,"snapshot":"false","db":"testdb","sequence":null,"table":"userdemo","server_id":1,"gtid":null,"file":"xxx.000002","pos":352,"row":0,"thread":7,"query":null},"op":"u","ts_ms":1710235525246,"transaction":null}
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.table.api.*;
import org.apache.flink.types.Row;
public class MyFlinkCDCJob {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
// Configure the MySQL CDC source table
String sourceDDL = "CREATE TABLE my_table (" +
" id INT NOT NULL," +
" name STRING," +
" age INT," +
" PRIMARY KEY (id) NOT ENFORCED" +
") WITH (" +
" 'connector' = 'mysql-cdc'," +
" 'hostname' = 'your_mysql_hostname'," +
" 'port' = '3306'," +
" 'username' = 'your_username'," +
" 'password' = 'your_password'," +
" 'database-name' = 'your_database_name'," +
" 'table-name' = 'your_table_name'" +
")";
tableEnv.executeSql(sourceDDL);
// Define the job logic: a simple pass-through query over the CDC table
Table result = tableEnv.sqlQuery("SELECT * FROM my_table");
// Each printed element is a (flag, row) pair: true = insert/add, false = retract
tableEnv.toRetractStream(result, Row.class).print();
// Execute the job
env.execute("My Flink CDC Job");
}
}
In the code above we create a table named my_table that is connected to the MySQL database through the MySQL CDC connector, then run a SQL query that selects everything from it and print the result (as a retract stream) to the console.
Note that you need to replace 'your_mysql_hostname', 'your_username', 'your_password', 'your_database_name', and 'your_table_name' in the example with your actual MySQL connection details.
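To do something more useful than printing, the query result can also be written to a sink table defined in SQL. The sketch below (placed in the same main method, after the source DDL has been executed) uses Flink's built-in 'print' connector; the sink name and schema are illustrative:
// Define a sink backed by Flink's built-in 'print' connector (names here are illustrative)
String sinkDDL = "CREATE TABLE my_sink (" +
        "  id INT NOT NULL," +
        "  name STRING," +
        "  age INT," +
        "  PRIMARY KEY (id) NOT ENFORCED" +
        ") WITH (" +
        "  'connector' = 'print'" +
        ")";
tableEnv.executeSql(sinkDDL);
// executeSql on an INSERT statement submits its own continuous job,
// so no separate env.execute() call is needed for this part
tableEnv.executeSql("INSERT INTO my_sink SELECT id, name, age FROM my_table");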