Flink CDC: Real-Time Incremental Sync of MySQL Tables Based on the MySQL Binlog

Environment:

flink 1.15.2

MySQL 5.7 (note: the binlog must be enabled, because incremental sync captures change data from the binlog)

Windows 11, running locally in IDEA

The official usage guide and examples first: MySQL CDC Connector — Flink CDC documentation

1. Enable the MySQL binlog. (Note: the storage engine must be InnoDB. With ndbcluster, in my tests no binlog events were captured at all, so incremental sync was effectively useless. I am not sure whether the binlog configuration under ndbcluster was wrong, but InnoDB tables in the same cluster were captured fine. A friend suggested that since ndbcluster is an in-memory engine, it may not write its log to disk in real time, which would explain why nothing is captured.)
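If in doubt, check the engine of the source table up front with a standard information_schema query (the schema and table names here match this article's demo; adjust them to your setup):

SELECT TABLE_NAME, ENGINE
FROM information_schema.TABLES
WHERE TABLE_SCHEMA = 'test_cdc' AND TABLE_NAME = 'user';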

# Check whether the binlog is enabled; ON means it is
SHOW VARIABLES LIKE 'log_bin';

# Check the MySQL binlog mode
show global variables like "binlog%";

# Check the logging status
show variables like 'log_%';

# Flush the logs: immediately starts a new binlog file with a new sequence number, same effect as a restart
flush logs;

# Purge all binlog files
reset master;
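If log_bin comes back OFF, the binlog has to be enabled in the server configuration and MySQL restarted. A minimal sketch of the relevant my.cnf section for MySQL 5.7 (the file location and the server-id value are installation-specific assumptions; binlog_format must be ROW for CDC to work):

[mysqld]
server-id        = 1           # any id unique within the replication topology
log-bin          = mysql-bin   # turns the binlog on; the value is the file name prefix
binlog_format    = ROW         # row-based logging is required by Flink CDC / Debezium
expire_logs_days = 7           # optional: how long to keep binlog files

After restarting, re-run SHOW VARIABLES LIKE 'log_bin'; to confirm.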

2. Create a user and grant privileges:

CREATE USER 'flink_cdc_user'@'%' IDENTIFIED BY 'flink@cdc';
GRANT ALL PRIVILEGES ON *.* TO 'flink_cdc_user'@'%';
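GRANT ALL is fine for a quick local test but broader than necessary. According to the Flink CDC documentation, the connector only needs the following privileges:

GRANT SELECT, SHOW DATABASES, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'flink_cdc_user'@'%';
FLUSH PRIVILEGES;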

3. Maven dependencies:

<properties>
    <maven.compiler.source>8</maven.compiler.source>
    <maven.compiler.target>8</maven.compiler.target>
    <flink.version>1.15.2</flink.version>
</properties>

<dependencies>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-clients</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-streaming-java</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-runtime-web</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-table-planner_2.12</artifactId>
        <version>${flink.version}</version>
        <!-- <scope>provided</scope> -->
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-connector-jdbc</artifactId>
        <version>${flink.version}</version>
        <!-- <scope>provided</scope> -->
        <!-- provided keeps this jar out of the fat jar; enable it when packaging for a cluster -->
    </dependency>
    <dependency>
        <groupId>mysql</groupId>
        <artifactId>mysql-connector-java</artifactId>
        <version>8.0.29</version>
        <!-- <scope>provided</scope> -->
    </dependency>
    <dependency>
        <groupId>org.projectlombok</groupId>
        <artifactId>lombok</artifactId>
        <version>1.18.22</version>
    </dependency>
    <!-- https://mvnrepository.com/artifact/com.ververica/flink-sql-connector-mysql-cdc -->
    <dependency>
        <groupId>com.ververica</groupId>
        <artifactId>flink-sql-connector-mysql-cdc</artifactId>
        <version>2.3.0</version>
        <!-- <scope>provided</scope> -->
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-connector-base</artifactId>
        <version>${flink.version}</version>
        <!-- <scope>provided</scope> -->
    </dependency>
</dependencies>

4. When packaging for a cluster, uncomment the provided scopes on the dependencies above. The jars then stay out of the fat jar and will not conflict with the jars already in Flink's lib directory.
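For reference, a sketch of the build section that produces the fat jar with the maven-shade-plugin (the plugin version is an assumption; the main class matches the demo below):

<build>
    <plugins>
        <plugin>
            <groupId>org.apache.maven.plugins</groupId>
            <artifactId>maven-shade-plugin</artifactId>
            <version>3.4.1</version>
            <executions>
                <execution>
                    <phase>package</phase>
                    <goals>
                        <goal>shade</goal>
                    </goals>
                    <configuration>
                        <transformers>
                            <!-- entry point of the fat jar -->
                            <transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
                                <mainClass>com.xgg.flink.stream.sql.MysqlToMysqlHavePrimaryKey</mainClass>
                            </transformer>
                        </transformers>
                    </configuration>
                </execution>
            </executions>
        </plugin>
    </plugins>
</build>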

Jars that need to go into Flink's lib directory (download them from the official sites and drop them in):

flink-connector-jdbc-1.15.4.jar

flink-shaded-hadoop-3-uber-3.1.1.7.2.9.0-173-9.0.jar

flink-sql-connector-mysql-cdc-2.3.0.jar

mysql-connector-java-8.0.29.jar

commons-cli-1.5.0.jar

5. MySQL DDL, a source table and an identical sink table:

CREATE TABLE `user` (
  `id` int(11) NOT NULL,
  `username` varchar(255) DEFAULT NULL,
  `password` varchar(255) DEFAULT NULL,
  PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;

CREATE TABLE `user_sink` (
  `id` int(11) NOT NULL,
  `username` varchar(255) DEFAULT NULL,
  `password` varchar(255) DEFAULT NULL,
  PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;
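To have something to watch during the test, seed the source table with a couple of rows first (the values here are made up):

INSERT INTO `user` (`id`, `username`, `password`) VALUES
  (1, 'zhangsan', 'pwd123'),
  (2, 'lisi', 'pwd456');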

6. Test demo:

package com.xgg.flink.stream.sql;

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

public class MysqlToMysqlHavePrimaryKey {
    public static void main(String[] args) {
        // 1. Get the stream execution environment
        StreamExecutionEnvironment senv = StreamExecutionEnvironment.getExecutionEnvironment();
        senv.setParallelism(1);
        // 2. Create the table execution environment
        StreamTableEnvironment tEnv = StreamTableEnvironment.create(senv);
        String sourceTable = "CREATE TABLE mysql_cdc_source (\n" +
                "  id INT,\n" +
                "  username STRING,\n" +
                "  password STRING,\n" +
                "  PRIMARY KEY(id) NOT ENFORCED\n" +
                ") WITH (\n" +
                "  'connector' = 'mysql-cdc',\n" +
                "  'hostname' = 'localhost',\n" +
                "  'port' = '3306',\n" +
                "  'username' = 'root',\n" +
                "  'password' = 'root',\n" +
                "  'database-name' = 'test_cdc',\n" +
                "  'debezium.snapshot.mode' = 'initial',\n" +
                "  'table-name' = 'user'\n" +
                ")";
        tEnv.executeSql(sourceTable);
        String sinkTable = "CREATE TABLE mysql_cdc_sink (\n" +
                "  id INT,\n" +
                "  username STRING,\n" +
                "  password STRING,\n" +
                "  PRIMARY KEY(id) NOT ENFORCED\n" +
                ") WITH (\n" +
                "  'connector' = 'jdbc',\n" +
                "  'driver' = 'com.mysql.cj.jdbc.Driver',\n" +
                "  'url' = 'jdbc:mysql://localhost:3306/test_cdc?rewriteBatchedStatements=true',\n" +
                "  'username' = 'root',\n" +
                "  'password' = 'root',\n" +
                "  'table-name' = 'user_sink'\n" +
                ")";
        tEnv.executeSql(sinkTable);
        // Submit the continuous sink job (returns immediately) ...
        tEnv.executeSql("insert into mysql_cdc_sink select id, username, password from mysql_cdc_source");
        // ... and print the change stream to stdout (blocks)
        tEnv.executeSql("select * from mysql_cdc_source").print();
    }
}

Operate on the source table and Flink CDC captures the change records, prints them, and inserts them into the sink table. (With the MySQL CDC source, printing and writing to the table at the same time works fine. With the Oracle CDC source, if the job has multiple statements only one of them actually runs: print first and write second, and only the print happens, the write never fires; drop the print and the write alone works fine. It does not appear to be related to senv.setParallelism(1); it looks like an issue in the underlying implementation.)
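For a single-job alternative to the two independent executeSql() calls, here is a minimal sketch using Flink's StatementSet API; a table on the 'print' connector stands in for the .print() call. (Whether this changes the Oracle CDC behaviour described above is something I have not verified.) It replaces the last two lines of the demo:

// Requires: import org.apache.flink.table.api.StatementSet;
tEnv.executeSql("CREATE TABLE print_sink (\n" +
        "  id INT,\n" +
        "  username STRING,\n" +
        "  password STRING\n" +
        ") WITH ('connector' = 'print')");

StatementSet stmtSet = tEnv.createStatementSet();
stmtSet.addInsertSql("insert into mysql_cdc_sink select id, username, password from mysql_cdc_source");
stmtSet.addInsertSql("insert into print_sink select id, username, password from mysql_cdc_source");
stmtSet.execute(); // submits one job that feeds both sinks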

After the job runs, the source table user and the target table user_sink contain the same data.

Both the source and target tables have primary keys in MySQL. So although 'debezium.snapshot.mode' = 'initial' makes the job re-read a full snapshot on every start, and the sink statement is a plain insert into, you can run the job any number of times without getting duplicate rows: because the sink table declares a primary key, the Flink JDBC connector writes in upsert mode rather than append mode.

The ?rewriteBatchedStatements=true parameter in the JDBC URL turns on batched statement rewriting in the MySQL driver, which increases write throughput.
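On top of that, the Flink JDBC connector has its own sink-side buffering options that interact with batching. These are real connector options; the values below are just examples:

-- additional options for the WITH (...) clause of mysql_cdc_sink
'sink.buffer-flush.max-rows' = '500',  -- flush the buffer after this many rows (connector default: 100)
'sink.buffer-flush.interval' = '1s',   -- or after this interval (connector default: 1s)
'sink.max-retries' = '3'               -- retries per failed write attempt (connector default: 3)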