
FlinkCDC demo: syncing MySQL to MySQL

Contents

I: What is CDC? What are its use cases?

II: What CDC technologies exist today?

Query-based CDC

Log-based CDC

III: A FlinkCDC demo syncing MySQL to MySQL

1 - MySQL must have binlog enabled

2 - Create a user with the SELECT, RELOAD, SHOW DATABASES, REPLICATION SLAVE, REPLICATION CLIENT privileges (RELOAD is mandatory)

3 - Put the flink-cdc-connectors jar into <FLINK_HOME>/lib

4 - Add the Maven dependency (do not bundle it into the application jar, or the job will fail)

5.1 - SQL implementation


I: What is CDC? What are its use cases?

CDC stands for Change Data Capture. In data processing it usually refers to capturing changes made to a database: a technique for detecting and delivering every insert, update, and delete that happens in the source database.

Its main use cases are:

        1 - Data synchronization, for backup and disaster recovery

        2 - Data distribution, fanning one source out to multiple downstream systems

        3 - Data collection (the E in ETL), feeding data integration pipelines for data warehouses and data lakes

II: What CDC technologies exist today?

By implementation mechanism, CDC splits into two camps: query-based and log-based.

Query-based CDC runs SELECT statements that scan the table and filter out the rows that have changed.

Log-based CDC continuously reads the database's operation log in real time, for example MySQL's binlog. A small SQL sketch of both approaches follows the comparison below.

  • Query-based CDC:

    • Offline, scheduled query jobs, i.e. batch processing. To sync a table to another system, each run queries the table for its latest data;

    • Cannot guarantee consistency: while the query runs, the data may already have changed several times;

    • Cannot guarantee freshness: offline scheduling introduces an inherent delay;

    • Puts extra load on the database.

  • Log-based CDC:

    • Consumes the log in real time, i.e. stream processing. For example, MySQL's binlog records every change made to the database, so the binlog file can be treated as a streaming source;
    • Guarantees consistency, because the binlog contains the full history of changes;
    • Guarantees freshness, because a log like the binlog can be consumed as a stream and delivers data in real time.
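As a rough SQL sketch of the difference (the orders table, the update_time column, and the binlog file name below are made-up illustrations, not part of this demo): query-based CDC polls with a filtered SELECT, while log-based CDC reads change events straight from the binlog.

  -- Query-based CDC: scheduled polling; relies on an update_time column and misses intermediate versions
  SELECT *
  FROM orders
  WHERE update_time > '2021-08-01 00:00:00';   -- watermark recorded by the previous run

  -- Log-based CDC: the changes are already in the binlog; tools like FlinkCDC or canal read it as a stream.
  -- You can peek at the raw events yourself (use a file name returned by SHOW BINARY LOGS):
  SHOW BINARY LOGS;
  SHOW BINLOG EVENTS IN 'mysql-bin.000001' LIMIT 10;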


    Our business requirement is near real time (minute level), so we had to go with a binlog-based approach (a canal demo can be found in another of my articles). On top of that, the initial load has to import the full existing data (MySQL to kudu), canal needs extra components for that, and we need the data to stay complete and consistent (no loss, no duplicates) with minimal impact on the database (no locking the table to take a full dump before switching to incremental). All of that is cumbersome with canal, which is where FlinkCDC shines (how it handles the full snapshot, the incremental phase, and exactly-once is covered in the references).

III: A FlinkCDC demo syncing MySQL to MySQL

Prerequisite: MySQL must be 5.7 or 8.0.x.

1 - MySQL must have binlog enabled

  server-id = <server-id>          # pick your own value, but it must be unique in the replication topology
  log_bin = <mysql-bin>            # pick your own value; this is the binlog file name prefix
  binlog_format = ROW              # must be ROW
  binlog_row_image = FULL          # must be FULL
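After changing my.cnf and restarting MySQL, it is worth verifying the settings from a client session (a minimal check using standard MySQL statements, nothing demo-specific):

  SELECT VERSION();                          -- should be 5.7.x or 8.0.x
  SHOW VARIABLES LIKE 'log_bin';             -- expect ON
  SHOW VARIABLES LIKE 'binlog_format';       -- expect ROW
  SHOW VARIABLES LIKE 'binlog_row_image';    -- expect FULL
  SHOW VARIABLES LIKE 'server_id';           -- expect the unique id you configured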

2 - Create a user with the SELECT, RELOAD, SHOW DATABASES, REPLICATION SLAVE, REPLICATION CLIENT privileges (RELOAD is mandatory)

GRANT SELECT, RELOAD, SHOW DATABASES, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO '<flinkuser>'@'<mysqlADD>' identified by '<flinkuserPWD>';
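Note that GRANT ... IDENTIFIED BY only works on MySQL 5.7; MySQL 8.0 removed that form, so there you create the user first and then grant (same placeholders as above):

  CREATE USER '<flinkuser>'@'<mysqlADD>' IDENTIFIED BY '<flinkuserPWD>';
  GRANT SELECT, RELOAD, SHOW DATABASES, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO '<flinkuser>'@'<mysqlADD>';
  FLUSH PRIVILEGES;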

3 - Put the flink-cdc-connectors jar into <FLINK_HOME>/lib

Flink must be 1.12 or later; if you use FlinkCDC 2.0 with Flink SQL, it must be 1.13, on Java 8.

Download page: flink-cdc-connectors (covers MySQL, Postgres, and MongoDB).

If the jar is not in the lib directory, you may get a "factory not found" error:

org.apache.flink.table.api.ValidationException: Could not find any factory for identifier 'mysql-cdc' that implements 'org.apache.flink.table.factories.DynamicTableFactory' in the classpath.

4 - Add the Maven dependency (do not bundle it into the application jar, or the job will fail)

  <dependency>
      <groupId>com.ververica</groupId>
      <artifactId>flink-connector-mysql-cdc</artifactId>
      <version>2.0.0</version>
      <scope>provided</scope> <!-- do not package it into the fat jar, or the job fails at runtime -->
  </dependency>
If the connector does get bundled into the application jar, the job typically fails like this:

  org.apache.flink.client.program.ProgramInvocationException: The main method caused an error: Unable to instantiate java compiler
      at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:372)
      at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:222)
      at org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:114)
      at org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:812)
      at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:246)
      at org.apache.flink.client.cli.CliFrontend.parseAndRun(CliFrontend.java:1054)
      at org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:1132)
      at org.apache.flink.runtime.security.contexts.NoOpSecurityContext.runSecured(NoOpSecurityContext.java:28)
      at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1132)
  Caused by: java.lang.IllegalStateException: Unable to instantiate java compiler
      at org.apache.calcite.rel.metadata.JaninoRelMetadataProvider.compile(JaninoRelMetadataProvider.java:428)
      at org.apache.calcite.rel.metadata.JaninoRelMetadataProvider.load3(JaninoRelMetadataProvider.java:374)
      at org.apache.calcite.rel.metadata.JaninoRelMetadataProvider.lambda$static$0(JaninoRelMetadataProvider.java:109)
      at org.apache.flink.calcite.shaded.com.google.common.cache.CacheLoader$FunctionToCacheLoader.load(CacheLoader.java:165)
      at org.apache.flink.calcite.shaded.com.google.common.cache.LocalCache$LoadingValueReference.loadFuture(LocalCache.java:3529)
      at org.apache.flink.calcite.shaded.com.google.common.cache.LocalCache$Segment.loadSync(LocalCache.java:2278)
      at org.apache.flink.calcite.shaded.com.google.common.cache.LocalCache$Segment.lockedGetOrLoad(LocalCache.java:2155)
      at org.apache.flink.calcite.shaded.com.google.common.cache.LocalCache$Segment.get(LocalCache.java:2045)
      at org.apache.flink.calcite.shaded.com.google.common.cache.LocalCache.get(LocalCache.java:3951)
      at org.apache.flink.calcite.shaded.com.google.common.cache.LocalCache.getOrLoad(LocalCache.java:3974)
      at org.apache.flink.calcite.shaded.com.google.common.cache.LocalCache$LocalLoadingCache.get(LocalCache.java:4958)
      at org.apache.calcite.rel.metadata.JaninoRelMetadataProvider.create(JaninoRelMetadataProvider.java:469)
      at org.apache.calcite.rel.metadata.JaninoRelMetadataProvider.revise(JaninoRelMetadataProvider.java:481)
      at org.apache.calcite.rel.metadata.RelMetadataQueryBase.revise(RelMetadataQueryBase.java:95)
      at org.apache.calcite.rel.metadata.RelMetadataQuery.getPulledUpPredicates(RelMetadataQuery.java:784)
      at org.apache.calcite.rel.rules.ReduceExpressionsRule$ProjectReduceExpressionsRule.onMatch(ReduceExpressionsRule.java:303)
      at org.apache.calcite.plan.AbstractRelOptPlanner.fireRule(AbstractRelOptPlanner.java:333)
      at org.apache.calcite.plan.hep.HepPlanner.applyRule(HepPlanner.java:542)
      ...
  Caused by: java.lang.ClassCastException: org.codehaus.janino.CompilerFactory cannot be cast to org.codehaus.commons.compiler.ICompilerFactory
      at org.codehaus.commons.compiler.CompilerFactoryFactory.getCompilerFactory(CompilerFactoryFactory.java:129)
      at org.codehaus.commons.compiler.CompilerFactoryFactory.getDefaultCompilerFactory(CompilerFactoryFactory.java:79)
      at org.apache.calcite.rel.metadata.JaninoRelMetadataProvider.compile(JaninoRelMetadataProvider.java:426)
The complete pom.xml (with the dependencies for both the DataStream and the SQL implementation):

  <?xml version="1.0" encoding="UTF-8"?>
  <project xmlns="http://maven.apache.org/POM/4.0.0"
           xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
           xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
      <modelVersion>4.0.0</modelVersion>
      <groupId>org.example</groupId>
      <artifactId>Flink</artifactId>
      <version>1.0-SNAPSHOT</version>

      <properties>
          <maven.compiler.source>8</maven.compiler.source>
          <maven.compiler.target>8</maven.compiler.target>
          <flink.version>1.13.0</flink.version>
          <scala.binary.version>2.12</scala.binary.version>
          <mysql.version>5.1.49</mysql.version>
          <flinkcdc.version>2.0.0</flinkcdc.version>
          <fastjson.version>1.2.75</fastjson.version>
      </properties>

      <dependencies>
          <dependency>
              <groupId>org.apache.flink</groupId>
              <artifactId>flink-java</artifactId>
              <version>${flink.version}</version>
              <scope>provided</scope>
          </dependency>
          <dependency>
              <groupId>org.apache.flink</groupId>
              <artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
              <version>${flink.version}</version>
              <scope>provided</scope>
          </dependency>
          <dependency>
              <groupId>org.apache.flink</groupId>
              <artifactId>flink-clients_${scala.binary.version}</artifactId>
              <version>${flink.version}</version>
              <scope>provided</scope>
          </dependency>
          <dependency>
              <groupId>org.apache.flink</groupId>
              <artifactId>flink-table-planner-blink_${scala.binary.version}</artifactId>
              <version>${flink.version}</version>
              <scope>provided</scope>
          </dependency>
          <dependency>
              <groupId>com.ververica</groupId>
              <artifactId>flink-connector-mysql-cdc</artifactId>
              <version>${flinkcdc.version}</version>
          </dependency>
          <dependency>
              <groupId>com.alibaba</groupId>
              <artifactId>fastjson</artifactId>
              <version>${fastjson.version}</version>
          </dependency>
          <dependency>
              <groupId>org.apache.flink</groupId>
              <artifactId>flink-connector-jdbc_${scala.binary.version}</artifactId>
              <version>${flink.version}</version>
          </dependency>
          <dependency>
              <groupId>mysql</groupId>
              <artifactId>mysql-connector-java</artifactId>
              <version>${mysql.version}</version>
          </dependency>
      </dependencies>

      <build>
          <sourceDirectory>src/main/java/</sourceDirectory>
          <plugins>
              <plugin>
                  <groupId>org.apache.maven.plugins</groupId>
                  <artifactId>maven-compiler-plugin</artifactId>
                  <version>2.3.2</version>
                  <configuration>
                      <source>1.8</source>
                      <target>1.8</target>
                      <encoding>UTF-8</encoding>
                      <verbose>true</verbose>
                  </configuration>
              </plugin>
              <plugin>
                  <groupId>org.apache.maven.plugins</groupId>
                  <artifactId>maven-surefire-plugin</artifactId>
                  <version>2.8.1</version>
                  <configuration>
                      <includes>
                          <include>**/*.java</include>
                      </includes>
                      <skipTests>true</skipTests>
                  </configuration>
              </plugin>
              <plugin>
                  <groupId>org.apache.maven.plugins</groupId>
                  <artifactId>maven-shade-plugin</artifactId>
                  <version>2.4.3</version>
                  <executions>
                      <execution>
                          <phase>package</phase>
                          <goals>
                              <goal>shade</goal>
                          </goals>
                      </execution>
                  </executions>
              </plugin>
          </plugins>
      </build>
  </project>
Marking the Flink jars as provided is also what the official documentation recommends.

5.1 - SQL implementation

Local test: read from bigdata.person and write the changes into bigdata.person1.

  Table DDL

  CREATE DATABASE bigdata;
  USE bigdata;

  CREATE TABLE person (
      id   INT UNSIGNED PRIMARY KEY AUTO_INCREMENT,
      name VARCHAR(20) NOT NULL DEFAULT ""
  ) ENGINE=INNODB DEFAULT CHARSET=UTF8;

  CREATE TABLE person1 (
      id   INT UNSIGNED PRIMARY KEY AUTO_INCREMENT,
      name VARCHAR(20) NOT NULL DEFAULT ""
  ) ENGINE=INNODB DEFAULT CHARSET=UTF8;

 Write test data into the database

  # function that generates a random string of length n
  DELIMITER $$
  CREATE FUNCTION rand_string(n INT) RETURNS VARCHAR(255)
  BEGIN
      DECLARE chars_str VARCHAR(100) DEFAULT 'abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ';
      DECLARE return_str VARCHAR(255) DEFAULT '';
      DECLARE i INT DEFAULT 0;
      WHILE i < n DO
          SET return_str = CONCAT(return_str, SUBSTRING(chars_str, FLOOR(1 + RAND() * 52), 1));
          SET i = i + 1;
      END WHILE;
      RETURN return_str;
  END $$

  # stored procedure that inserts rows into the person table
  DELIMITER $$
  CREATE PROCEDURE insert_person(IN START INT(10), IN max_num INT(10))
  BEGIN
      DECLARE i INT DEFAULT 0;
      # turn autocommit off so the whole batch is committed at once
      SET autocommit = 0;
      REPEAT
          SET i = i + 1;
          INSERT INTO person (name) VALUES (rand_string(6));
      UNTIL i = max_num
      END REPEAT;
      COMMIT;
  END $$

  # call the procedure to load data
  DELIMITER ;
  CALL insert_person(100001, 5000);
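A quick sanity check before loading the bigger batches used in the tests below (plain MySQL, using only the objects created above):

  SELECT rand_string(6);                            -- should return a 6-character random string
  CALL insert_person(1, 10);                        -- the first argument is not used by the procedure body
  SELECT COUNT(*), MIN(id), MAX(id) FROM person;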

Flink SQL implementation

  import org.apache.flink.table.api.EnvironmentSettings;
  import org.apache.flink.table.api.TableEnvironment;
  import org.apache.flink.table.api.TableResult;

  public class Mysql2MysqlLocal {

      public static void main(String[] args) throws Exception {
          // Blink planner, streaming mode (Flink 1.13)
          EnvironmentSettings envSettings = EnvironmentSettings.newInstance()
                  .useBlinkPlanner()
                  .inStreamingMode()
                  .build();
          TableEnvironment tableEnv = TableEnvironment.create(envSettings);

          // source: read bigdata.person through the mysql-cdc connector
          String sourceDDL =
                  "CREATE TABLE mysql_binlog (\n" +
                  "  id INT,\n" +
                  "  name STRING,\n" +
                  "  primary key (id) not enforced\n" +
                  ") WITH (\n" +
                  "  'connector' = 'mysql-cdc',\n" +
                  "  'hostname' = '127.0.0.1',\n" +
                  "  'port' = '3306',\n" +
                  "  'username' = 'root',\n" +
                  "  'password' = '123456',\n" +
                  "  'database-name' = 'bigdata',\n" +
                  "  'table-name' = 'person',\n" +
                  "  'scan.startup.mode' = 'earliest-offset'\n" +
                  ")";

          // sink: write into bigdata.person1 through the JDBC connector
          String sinkDDL =
                  "CREATE TABLE test_cdc (" +
                  "  id INT," +
                  "  name STRING," +
                  "  primary key (id) not enforced" +
                  ") WITH (" +
                  "  'connector' = 'jdbc'," +
                  "  'driver' = 'com.mysql.cj.jdbc.Driver'," +
                  "  'url' = 'jdbc:mysql://127.0.0.1:3306/bigdata?serverTimezone=UTC&useSSL=false'," +
                  "  'username' = 'root'," +
                  "  'password' = '123456'," +
                  "  'table-name' = 'person1'" +
                  ")";

          // no transformation here, just a straight insert-select from source to sink
          String transformDmlSQL = "insert into test_cdc select * from mysql_binlog";

          tableEnv.executeSql(sourceDDL);
          tableEnv.executeSql(sinkDDL);
          TableResult insertResult = tableEnv.executeSql(transformDmlSQL);

          // executeSql submits the INSERT job asynchronously; block here so the demo keeps running until cancelled
          insertResult.await();
      }
  }

Test steps:

Case 1: incremental-only test

1 - For a local test in IDEA, just run the program directly

2 - Call the stored procedure to write rows into the person table (single manual inserts work too)

3 - Check whether person1 ends up with the same number of rows (see the check queries after the test cases)

Case 2: full snapshot + incremental test

1 - Call the stored procedure to write 1,000,000 rows into person

2 - Start the program from IDEA, then keep calling the stored procedure to write another 1,000,000 rows into person

3 - Check the row count of person1

Case 3: run the jar locally for an incremental test

1 - Build the jar from IDEA (or with Maven)

2 - Start the Flink cluster: <FLINK_HOME>/bin/start-cluster.sh

3 - Submit the jar: bin/flink run -m 127.0.0.1 -c Mysql2MysqlLocal <flink_jar_name>.jar

4 - Write rows into the person table

5 - Check the data in person1
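For the "check the row count" steps above, a couple of queries are enough to compare source and sink (the checksum lines are just one possible way to spot mismatched rows, not something from the original setup):

  SELECT COUNT(*) FROM bigdata.person;
  SELECT COUNT(*) FROM bigdata.person1;

  -- optional: compare content, not just counts
  SELECT BIT_XOR(CRC32(CONCAT_WS('|', id, name))) FROM bigdata.person;
  SELECT BIT_XOR(CRC32(CONCAT_WS('|', id, name))) FROM bigdata.person1;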

That's all for now.

References: the FlinkCDC GitHub repository

Flink Chinese Community | Flink CDC 2.0 officially released: a deep dive into the core improvements
