当前位置:   article > 正文

FlinkCDC初体验

FlinkCDC初体验

1、pom依赖

  <?xml version="1.0" encoding="UTF-8"?>
  <!--
    Maven build for the Flink CDC sample jobs shown in this article.
    Produces a shaded "fat" jar whose manifest main class is
    com.tbea.cdc.FlinkCDCStreamExample.
  -->
  <project xmlns="http://maven.apache.org/POM/4.0.0"
           xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
           xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>org.example</groupId>
    <artifactId>Flink1.17.2</artifactId>
    <version>1.0-SNAPSHOT</version>
    <name>Flink1.17.2</name>
    <!-- FIXME change it to the project's website -->
    <url>http://www.example.com</url>

    <repositories>
      <repository>
        <id>oss.sonatype.org-snapshot</id>
        <name>OSS Sonatype Snapshot Repository</name>
        <!-- https instead of http: Maven 3.8.1+ blocks plain-http repositories by default. -->
        <url>https://oss.sonatype.org/content/repositories/snapshots</url>
        <releases>
          <enabled>false</enabled>
        </releases>
        <snapshots>
          <enabled>true</enabled>
        </snapshots>
      </repository>
      <repository>
        <id>apache.snapshots</id>
        <name>Apache Development Snapshot Repository</name>
        <url>https://repository.apache.org/content/repositories/snapshots/</url>
        <releases>
          <enabled>false</enabled>
        </releases>
        <snapshots>
          <enabled>true</enabled>
        </snapshots>
      </repository>
    </repositories>

    <properties>
      <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
      <scala.version>2.12.7</scala.version>
      <scala.binary.version>2.12</scala.binary.version>
      <flink.version>1.17.2</flink.version>
      <java.version>1.8</java.version>
      <maven.compiler.source>1.8</maven.compiler.source>
      <maven.compiler.target>1.8</maven.compiler.target>
    </properties>

    <dependencies>
      <!-- <dependency>-->
      <!-- <groupId>com.alibaba.flink</groupId>-->
      <!-- <artifactId>datahub-connector</artifactId>-->
      <!-- <version>0.1-SNAPSHOT</version>-->
      <!-- <classifier>jar-with-dependencies</classifier>-->
      <!-- </dependency>-->

      <!-- https://mvnrepository.com/artifact/com.alibaba.ververica/ververica-connector-datahub -->
      <dependency>
        <groupId>com.alibaba.ververica</groupId>
        <artifactId>ververica-connector-datahub</artifactId>
        <version>1.17-vvr-8.0.8</version>
        <scope>provided</scope>
      </dependency>
      <!-- https://mvnrepository.com/artifact/com.alibaba.ververica/ververica-connector-common -->
      <dependency>
        <groupId>com.alibaba.ververica</groupId>
        <artifactId>ververica-connector-common</artifactId>
        <version>1.17-vvr-8.0.8</version>
        <scope>provided</scope>
      </dependency>
      <dependency>
        <groupId>com.alibaba.ververica</groupId>
        <artifactId>ververica-connector-continuous-odps</artifactId>
        <version>1.17-vvr-8.0.8</version>
        <scope>provided</scope>
      </dependency>

      <!--
        Flink runtime and API modules. The "provided" scope is left commented out,
        so these are bundled into the shaded jar and are on the classpath when the
        examples are launched directly.
      -->
      <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-connector-base</artifactId>
        <version>${flink.version}</version>
        <!-- <scope>provided</scope>-->
      </dependency>
      <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-core -->
      <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-core</artifactId>
        <version>${flink.version}</version>
        <!-- <scope>provided</scope>-->
      </dependency>
      <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-java -->
      <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-java</artifactId>
        <version>${flink.version}</version>
        <!-- <scope>provided</scope>-->
      </dependency>
      <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-clients -->
      <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-clients</artifactId>
        <version>${flink.version}</version>
        <!-- <scope>provided</scope>-->
      </dependency>
      <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-streaming-java -->
      <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-streaming-java</artifactId>
        <version>${flink.version}</version>
        <!-- <scope>provided</scope>-->
      </dependency>
      <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-runtime -->
      <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-runtime</artifactId>
        <version>${flink.version}</version>
        <!-- <scope>provided</scope>-->
      </dependency>
      <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-table-common -->
      <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-table-common</artifactId>
        <version>${flink.version}</version>
        <!-- <scope>provided</scope>-->
      </dependency>
      <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-table-api-java-bridge -->
      <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-table-api-java-bridge</artifactId>
        <version>${flink.version}</version>
        <!-- <scope>provided</scope>-->
      </dependency>
      <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-table-planner -->
      <dependency>
        <groupId>org.apache.flink</groupId>
        <!-- Scala suffix taken from the scala.binary.version property instead of a literal. -->
        <artifactId>flink-table-planner_${scala.binary.version}</artifactId>
        <version>${flink.version}</version>
        <!-- <scope>test</scope>-->
      </dependency>
      <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-connector-hive -->
      <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-connector-hive_${scala.binary.version}</artifactId>
        <version>${flink.version}</version>
        <scope>provided</scope>
      </dependency>
      <dependency>
        <groupId>org.scala-lang</groupId>
        <artifactId>scala-library</artifactId>
        <!-- Uses the scala.version property so the version is defined in one place. -->
        <version>${scala.version}</version>
        <!-- <scope>provided</scope>-->
      </dependency>
      <!-- https://mvnrepository.com/artifact/org.projectlombok/lombok -->
      <dependency>
        <groupId>org.projectlombok</groupId>
        <artifactId>lombok</artifactId>
        <version>1.18.34</version>
        <!-- <scope>provided</scope>-->
      </dependency>
      <dependency>
        <groupId>mysql</groupId>
        <artifactId>mysql-connector-java</artifactId>
        <version>8.0.23</version>
        <scope>provided</scope>
      </dependency>

      <!--
        Flink CDC connectors.
        NOTE(review): flink-sql-connector-mysql-cdc is the bundled SQL-client jar;
        declaring it next to flink-connector-mysql-cdc presumably puts overlapping
        classes into the shaded jar. Confirm which of the two is actually required.
        NOTE(review): confirm that CDC 2.2.0 is a supported pairing for Flink ${flink.version}.
      -->
      <!-- https://mvnrepository.com/artifact/com.ververica/flink-connector-debezium -->
      <dependency>
        <groupId>com.ververica</groupId>
        <artifactId>flink-connector-debezium</artifactId>
        <version>2.2.0</version>
        <!-- <scope>provided</scope>-->
      </dependency>
      <dependency>
        <groupId>com.ververica</groupId>
        <artifactId>flink-connector-mysql-cdc</artifactId>
        <version>2.2.0</version>
        <!-- <scope>provided</scope>-->
      </dependency>
      <dependency>
        <groupId>com.ververica</groupId>
        <artifactId>flink-sql-connector-mysql-cdc</artifactId>
        <version>2.2.0</version>
        <!-- <scope>provided</scope>-->
      </dependency>
    </dependencies>

    <build>
      <plugins>
        <plugin>
          <groupId>org.apache.maven.plugins</groupId>
          <artifactId>maven-compiler-plugin</artifactId>
          <version>3.1</version>
          <configuration>
            <source>${java.version}</source>
            <target>${java.version}</target>
          </configuration>
        </plugin>
        <plugin>
          <groupId>org.apache.maven.plugins</groupId>
          <artifactId>maven-shade-plugin</artifactId>
          <version>3.2.4</version>
          <executions>
            <execution>
              <phase>package</phase>
              <goals>
                <goal>shade</goal>
              </goals>
              <configuration>
                <finalName>${project.artifactId}-${project.version}-flink-fat-jar</finalName>
                <filters>
                  <filter>
                    <!-- Drop signature files of signed dependencies; stale signatures in a
                         merged jar cause a SecurityException when the jar is loaded. -->
                    <artifact>*:*</artifact>
                    <excludes>
                      <exclude>META-INF/*.SF</exclude>
                      <exclude>META-INF/*.DSA</exclude>
                      <exclude>META-INF/*.RSA</exclude>
                    </excludes>
                  </filter>
                </filters>
                <transformers>
                  <!-- Merge META-INF/services files from all dependencies instead of keeping
                       only one copy, so service-loaded factories (e.g. table connector
                       factories) from every bundled jar stay discoverable. -->
                  <transformer implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"/>
                  <transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
                    <!-- Set this to your own main class, e.g. org.example.MyFlinkJob -->
                    <mainClass>com.tbea.cdc.FlinkCDCStreamExample</mainClass>
                  </transformer>
                </transformers>
              </configuration>
            </execution>
          </executions>
        </plugin>
      </plugins>
    </build>
  </project>

 2、FlinkCDC-SQL

  1. package com.tbea.cdc;
  2. import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
  3. import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
  4. import org.apache.flink.table.api.*;
  5. public class FlinkCDCSqlExample {
  6. public static void main(String[] args) throws Exception {
  7. StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
  8. StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
  9. // 创建HiveCatalog
  10. // String name = "myhive";
  11. // String defaultDatabase = "mydatabase";
  12. // String hiveConfDir = "/path/to/your/hive/conf"; // 替换为你的Hive配置文件夹路径
  13. // HiveCatalog hiveCatalog = new HiveCatalog(name, defaultDatabase, hiveConfDir);
  14. // tableEnv.registerCatalog("myhive", hiveCatalog);
  15. //
  16. // // 设置HiveCatalog为当前catalog
  17. // tableEnv.useCatalog("myhive");
  18. // 注册MySQL表
  19. String sourceDDL = "CREATE TABLE source_table (" +
  20. " id INT," +
  21. " name STRING," +
  22. " age INT," +
  23. " addr STRING ,"+
  24. " create_time timestamp,"+
  25. " PRIMARY KEY (id) NOT ENFORCED" +
  26. ") WITH (" +
  27. " 'connector' = 'mysql-cdc'," +
  28. " 'hostname' = '192.168.140.1'," + // 替换为你的MySQL主机名
  29. " 'port' = '3306'," +
  30. " 'username' = 'flink'," + // 替换为你的MySQL用户名
  31. " 'password' = 'flink'," + // 替换为你的MySQL密码
  32. " 'database-name' = 'flinkcdc'," + // 替换为你的数据库名
  33. " 'table-name' = 'user_info'," + // 替换为你的表名
  34. // " 'scan.startup.timestamp-millis' = '1000',"+
  35. " 'scan.incremental.snapshot.enabled' = 'true',"+
  36. " 'scan.incremental.snapshot.chunk.size' = '8096'," +
  37. " 'scan.startup.mode' = 'latest-offset'," +
  38. " 'debezium.snapshot.mode'= 'latest',"+
  39. " 'scan.newly-added-table.enabled' = 'true'"+
  40. // " 'debezium.skipped.operations'='d'" +
  41. ")";
  42. tableEnv.executeSql(sourceDDL);
  43. // 查询并打印数据
  44. TableResult result = tableEnv.executeSql("SELECT id,name,age,addr,create_time FROM source_table");
  45. result.print();
  46. env.execute();
  47. }
  48. }

运行结果:

3、FlinkCDC-DataStream

  1. package com.tbea.cdc;
  2. import com.ververica.cdc.connectors.mysql.source.MySqlSource;
  3. import com.ververica.cdc.connectors.mysql.table.StartupOptions;
  4. import com.ververica.cdc.debezium.JsonDebeziumDeserializationSchema;
  5. import com.ververica.cdc.debezium.StringDebeziumDeserializationSchema;
  6. import org.apache.flink.api.common.eventtime.WatermarkStrategy;
  7. import org.apache.flink.api.common.restartstrategy.RestartStrategies;
  8. import org.apache.flink.api.common.time.Time;
  9. import org.apache.flink.streaming.api.datastream.DataStreamSource;
  10. import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
  11. public class FlinkCDCStreamExample {
  12. public static void main(String[] args) throws Exception {
  13. StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
  14. MySqlSource<String> mySqlSource = MySqlSource.<String>builder()
  15. .hostname("192.168.140.1")
  16. .port(3306)
  17. // 设置捕获的数据库
  18. .databaseList("flinkcdc")
  19. // 设置捕获的表 [product, user, address]
  20. .tableList("flinkcdc.*")
  21. .username("flink")
  22. .password("flink")
  23. .deserializer(new JsonDebeziumDeserializationSchema()) // 将 SourceRecord 转换为 JSON 字符串
  24. // 启用扫描新添加的表功能
  25. .scanNewlyAddedTableEnabled(true)
  26. .startupOptions(StartupOptions.latest())
  27. .build();
  28. // 你的业务代码
  29. // 设置 3s 的 checkpoint 间隔
  30. env.setParallelism(1);
  31. env.setRestartStrategy(RestartStrategies.failureRateRestart(1, Time.seconds(3L),Time.seconds(5L)));
  32. env.enableCheckpointing(3000);
  33. DataStreamSource<String> dataStreamSource = env.fromSource(mySqlSource, WatermarkStrategy.noWatermarks(), "Mysql-Source");
  34. dataStreamSource.print();
  35. env.execute("Print MySQL Snapshot + Binlog");
  36. }
  37. }

 运行结果(以下三条记录依次为插入 op=c、更新 op=u、删除 op=d 的变更事件):

  1. {"before":null,"after":{"id":22,"name":"杨延昭","age":45,"addr":"辽宁","create_time":1723798965000},"source":{"version":"1.5.4.Final","connector":"mysql","name":"mysql_binlog_source","ts_ms":1723770167000,"snapshot":"false","db":"flinkcdc","sequence":null,"table":"user_info","server_id":1,"gtid":null,"file":"YANGYINGCHUN-bin.000004","pos":8640,"row":0,"thread":null,"query":null},"op":"c","ts_ms":1723770167469,"transaction":null}
  2. {"before":{"id":17,"name":"储熊","age":28,"addr":"张北","create_time":null},"after":{"id":17,"name":"储熊","age":28,"addr":"张北","create_time":1723798410000},"source":{"version":"1.5.4.Final","connector":"mysql","name":"mysql_binlog_source","ts_ms":1723769613000,"snapshot":"false","db":"flinkcdc","sequence":null,"table":"user_info","server_id":1,"gtid":null,"file":"YANGYINGCHUN-bin.000004","pos":6540,"row":0,"thread":null,"query":null},"op":"u","ts_ms":1723769613737,"transaction":null}
  3. {"before":{"id":8,"name":"赵刚","age":33,"addr":"延安","create_time":1724230872000},"after":null,"source":{"version":"1.5.4.Final","connector":"mysql","name":"mysql_binlog_source","ts_ms":1723770354000,"snapshot":"false","db":"flinkcdc","sequence":null,"table":"user_info","server_id":1,"gtid":null,"file":"YANGYINGCHUN-bin.000004","pos":8968,"row":0,"thread":null,"query":null},"op":"d","ts_ms":1723770354642,"transaction":null}
声明:本文内容由网友自发贡献,转载请注明出处:【wpsshop】
推荐阅读
相关标签
  

闽ICP备14008679号