赞
踩
1、pom依赖
- <?xml version="1.0" encoding="UTF-8"?>
-
- <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
- xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
- <modelVersion>4.0.0</modelVersion>
-
- <groupId>org.example</groupId>
- <artifactId>Flink1.17.2</artifactId>
- <version>1.0-SNAPSHOT</version>
-
- <name>Flink1.17.2</name>
- <!-- FIXME change it to the project's website -->
- <url>http://www.example.com</url>
-
- <!-- Snapshot-only repositories; release artifacts still come from Maven Central. -->
- <repositories>
-     <repository>
-         <id>oss.sonatype.org-snapshot</id>
-         <name>OSS Sonatype Snapshot Repository</name>
-         <!-- HTTPS is required: Maven 3.8.1+ blocks plain-HTTP external repositories by default. -->
-         <url>https://oss.sonatype.org/content/repositories/snapshots</url>
-         <releases>
-             <enabled>false</enabled>
-         </releases>
-         <snapshots>
-             <enabled>true</enabled>
-         </snapshots>
-     </repository>
-     <repository>
-         <id>apache.snapshots</id>
-         <name>Apache Development Snapshot Repository</name>
-         <url>https://repository.apache.org/content/repositories/snapshots/</url>
-         <releases>
-             <enabled>false</enabled>
-         </releases>
-         <snapshots>
-             <enabled>true</enabled>
-         </snapshots>
-     </repository>
- </repositories>
- <!-- Build-wide settings; the values below are referenced as ${...} placeholders elsewhere in this POM. -->
- <properties>
-     <!-- Character encoding used when reading sources and resources. -->
-     <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
-
-     <!-- Java language level (java.version is consumed by the maven-compiler-plugin configuration). -->
-     <java.version>1.8</java.version>
-     <maven.compiler.source>1.8</maven.compiler.source>
-     <maven.compiler.target>1.8</maven.compiler.target>
-
-     <!-- Flink release and the Scala line its Scala-suffixed artifacts are built for. -->
-     <flink.version>1.17.2</flink.version>
-     <scala.version>2.12.7</scala.version>
-     <scala.binary.version>2.12</scala.binary.version>
- </properties>
-
- <!--
-     Project dependencies.
-     Most Flink artifacts are left at compile scope (the "provided" scope is commented out),
-     which keeps them on the classpath for IDE runs and inside the shaded fat jar.
- -->
- <dependencies>
-     <!-- <dependency>-->
-     <!-- <groupId>com.alibaba.flink</groupId>-->
-     <!-- <artifactId>datahub-connector</artifactId>-->
-     <!-- <version>0.1-SNAPSHOT</version>-->
-     <!-- <classifier>jar-with-dependencies</classifier>-->
-     <!-- </dependency>-->
-
-     <!-- ===== Alibaba / Ververica connectors ===== -->
-     <!-- https://mvnrepository.com/artifact/com.alibaba.ververica/ververica-connector-datahub -->
-     <dependency>
-         <groupId>com.alibaba.ververica</groupId>
-         <artifactId>ververica-connector-datahub</artifactId>
-         <version>1.17-vvr-8.0.8</version>
-         <scope>provided</scope>
-     </dependency>
-     <!-- https://mvnrepository.com/artifact/com.alibaba.ververica/ververica-connector-common -->
-     <dependency>
-         <groupId>com.alibaba.ververica</groupId>
-         <artifactId>ververica-connector-common</artifactId>
-         <version>1.17-vvr-8.0.8</version>
-         <scope>provided</scope>
-     </dependency>
-     <dependency>
-         <groupId>com.alibaba.ververica</groupId>
-         <artifactId>ververica-connector-continuous-odps</artifactId>
-         <version>1.17-vvr-8.0.8</version>
-         <scope>provided</scope>
-     </dependency>
-
-     <!-- ===== Apache Flink core / streaming / table ===== -->
-     <dependency>
-         <groupId>org.apache.flink</groupId>
-         <artifactId>flink-connector-base</artifactId>
-         <version>${flink.version}</version>
-         <!-- <scope>provided</scope>-->
-     </dependency>
-
-     <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-core -->
-     <dependency>
-         <groupId>org.apache.flink</groupId>
-         <artifactId>flink-core</artifactId>
-         <version>${flink.version}</version>
-         <!-- <scope>provided</scope>-->
-     </dependency>
-     <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-java -->
-     <dependency>
-         <groupId>org.apache.flink</groupId>
-         <artifactId>flink-java</artifactId>
-         <version>${flink.version}</version>
-         <!-- <scope>provided</scope>-->
-     </dependency>
-     <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-clients -->
-     <dependency>
-         <groupId>org.apache.flink</groupId>
-         <artifactId>flink-clients</artifactId>
-         <version>${flink.version}</version>
-         <!-- <scope>provided</scope>-->
-     </dependency>
-
-     <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-streaming-java -->
-     <dependency>
-         <groupId>org.apache.flink</groupId>
-         <artifactId>flink-streaming-java</artifactId>
-         <version>${flink.version}</version>
-         <!-- <scope>provided</scope>-->
-     </dependency>
-     <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-runtime -->
-     <dependency>
-         <groupId>org.apache.flink</groupId>
-         <artifactId>flink-runtime</artifactId>
-         <version>${flink.version}</version>
-         <!-- <scope>provided</scope>-->
-     </dependency>
-     <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-table-common -->
-     <dependency>
-         <groupId>org.apache.flink</groupId>
-         <artifactId>flink-table-common</artifactId>
-         <version>${flink.version}</version>
-         <!-- <scope>provided</scope>-->
-     </dependency>
-     <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-table-api-java-bridge -->
-     <dependency>
-         <groupId>org.apache.flink</groupId>
-         <artifactId>flink-table-api-java-bridge</artifactId>
-         <version>${flink.version}</version>
-         <!-- <scope>provided</scope>-->
-     </dependency>
-     <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-table-planner -->
-     <dependency>
-         <groupId>org.apache.flink</groupId>
-         <!-- Scala suffix comes from the scala.binary.version property instead of being hard-coded. -->
-         <artifactId>flink-table-planner_${scala.binary.version}</artifactId>
-         <version>${flink.version}</version>
-         <!-- <scope>test</scope>-->
-     </dependency>
-
-     <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-connector-hive -->
-     <dependency>
-         <groupId>org.apache.flink</groupId>
-         <artifactId>flink-connector-hive_${scala.binary.version}</artifactId>
-         <version>${flink.version}</version>
-         <scope>provided</scope>
-     </dependency>
-
-     <!-- ===== Language / tooling ===== -->
-     <dependency>
-         <groupId>org.scala-lang</groupId>
-         <artifactId>scala-library</artifactId>
-         <!-- Kept in sync with the scala.version property rather than a duplicated literal. -->
-         <version>${scala.version}</version>
-         <!-- <scope>provided</scope>-->
-     </dependency>
-     <!-- https://mvnrepository.com/artifact/org.projectlombok/lombok -->
-     <dependency>
-         <groupId>org.projectlombok</groupId>
-         <artifactId>lombok</artifactId>
-         <version>1.18.34</version>
-         <!-- Lombok is a compile-time annotation processor; "provided" keeps it out of the fat jar. -->
-         <scope>provided</scope>
-     </dependency>
-
-     <!-- ===== MySQL / CDC ===== -->
-     <dependency>
-         <groupId>mysql</groupId>
-         <artifactId>mysql-connector-java</artifactId>
-         <version>8.0.23</version>
-         <scope>provided</scope>
-     </dependency>
-     <!--
-         NOTE(review): flink-sql-connector-mysql-cdc is published as an uber jar and presumably
-         already bundles flink-connector-mysql-cdc and flink-connector-debezium; confirm whether
-         all three artifacts are really needed, since overlapping classes can clash on the classpath.
-     -->
-     <!-- https://mvnrepository.com/artifact/com.ververica/flink-connector-debezium -->
-     <dependency>
-         <groupId>com.ververica</groupId>
-         <artifactId>flink-connector-debezium</artifactId>
-         <version>2.2.0</version>
-         <!-- <scope>provided</scope>-->
-     </dependency>
-
-     <dependency>
-         <groupId>com.ververica</groupId>
-         <artifactId>flink-connector-mysql-cdc</artifactId>
-         <version>2.2.0</version>
-         <!-- <scope>provided</scope>-->
-     </dependency>
-     <dependency>
-         <groupId>com.ververica</groupId>
-         <artifactId>flink-sql-connector-mysql-cdc</artifactId>
-         <version>2.2.0</version>
-         <!-- <scope>provided</scope>-->
-     </dependency>
- </dependencies>
-
- <!-- Compiles with the configured Java level and packages a runnable fat jar at the "package" phase. -->
- <build>
-     <plugins>
-         <plugin>
-             <groupId>org.apache.maven.plugins</groupId>
-             <artifactId>maven-compiler-plugin</artifactId>
-             <version>3.1</version>
-             <configuration>
-                 <source>${java.version}</source>
-                 <target>${java.version}</target>
-             </configuration>
-         </plugin>
-         <plugin>
-             <groupId>org.apache.maven.plugins</groupId>
-             <artifactId>maven-shade-plugin</artifactId>
-             <version>3.2.4</version>
-             <executions>
-                 <execution>
-                     <phase>package</phase>
-                     <goals>
-                         <goal>shade</goal>
-                     </goals>
-                     <configuration>
-                         <finalName>${project.artifactId}-${project.version}-flink-fat-jar</finalName>
-                         <filters>
-                             <filter>
-                                 <!-- Drop signature files copied from signed dependencies; left in place they
-                                      no longer match the repackaged jar and cause
-                                      "Invalid signature file digest" errors at start-up. -->
-                                 <artifact>*:*</artifact>
-                                 <excludes>
-                                     <exclude>META-INF/*.SF</exclude>
-                                     <exclude>META-INF/*.DSA</exclude>
-                                     <exclude>META-INF/*.RSA</exclude>
-                                 </excludes>
-                             </filter>
-                         </filters>
-                         <transformers>
-                             <!-- Merge META-INF/services entries from all jars so that service-loaded
-                                  factories (e.g. table connector factories) from every dependency
-                                  remain discoverable instead of one file overwriting the others. -->
-                             <transformer implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"/>
-                             <transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
-                                 <mainClass>com.tbea.cdc.FlinkCDCStreamExample</mainClass> <!-- Set this to your own main class, e.g. org.example.MyFlinkJob -->
-                             </transformer>
-                         </transformers>
-                     </configuration>
-                 </execution>
-             </executions>
-         </plugin>
-     </plugins>
- </build>
- </project>
2、FlinkCDC-SQL
- package com.tbea.cdc;
-
- import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
- import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
- import org.apache.flink.table.api.*;
-
- /**
-  * Flink SQL example: registers a MySQL table through the {@code mysql-cdc} connector
-  * and prints the rows selected from it to standard output.
-  *
-  * <p>Connection settings (host, credentials, database, table) are hard-coded sample values
-  * and must be replaced before running against a real database.
-  */
- public class FlinkCDCSqlExample {
-     /**
-      * Creates the table environment, registers {@code source_table} and prints the query result.
-      *
-      * @param args command-line arguments (unused)
-      * @throws Exception if the job cannot be set up or fails while running
-      */
-     public static void main(String[] args) throws Exception {
-         StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
-         StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
-
-         // Optional: create a HiveCatalog
-         // String name = "myhive";
-         // String defaultDatabase = "mydatabase";
-         // String hiveConfDir = "/path/to/your/hive/conf"; // replace with your Hive configuration directory
-         // HiveCatalog hiveCatalog = new HiveCatalog(name, defaultDatabase, hiveConfDir);
-         // tableEnv.registerCatalog("myhive", hiveCatalog);
-         //
-         // // make the HiveCatalog the current catalog
-         // tableEnv.useCatalog("myhive");
-
-         // Register the MySQL CDC source table.
-         // NOTE(review): 'latest' does not appear among Debezium's documented MySQL snapshot.mode
-         // values; the start position is presumably governed by 'scan.startup.mode' - confirm.
-         String sourceDDL = "CREATE TABLE source_table (" +
-                 " id INT," +
-                 " name STRING," +
-                 " age INT," +
-                 " addr STRING ,"+
-                 " create_time timestamp,"+
-                 " PRIMARY KEY (id) NOT ENFORCED" +
-                 ") WITH (" +
-                 " 'connector' = 'mysql-cdc'," +
-                 " 'hostname' = '192.168.140.1'," + // replace with your MySQL host
-                 " 'port' = '3306'," +
-                 " 'username' = 'flink'," + // replace with your MySQL user
-                 " 'password' = 'flink'," + // replace with your MySQL password
-                 " 'database-name' = 'flinkcdc'," + // replace with your database name
-                 " 'table-name' = 'user_info'," + // replace with your table name
-                 // " 'scan.startup.timestamp-millis' = '1000',"+
-                 " 'scan.incremental.snapshot.enabled' = 'true',"+
-                 " 'scan.incremental.snapshot.chunk.size' = '8096'," +
-                 " 'scan.startup.mode' = 'latest-offset'," +
-                 " 'debezium.snapshot.mode'= 'latest',"+
-                 " 'scan.newly-added-table.enabled' = 'true'"+
-                 // " 'debezium.skipped.operations'='d'" +
-                 ")";
-
-         tableEnv.executeSql(sourceDDL);
-
-         // executeSql() submits the SELECT as its own job; print() then streams the rows to stdout.
-         TableResult result = tableEnv.executeSql("SELECT id,name,age,addr,create_time FROM source_table");
-         result.print();
-
-         // No env.execute() here: nothing was added through the DataStream API, so calling it
-         // would fail with "No operators defined in streaming topology. Cannot execute."
-     }
- }
运行结果:
3、FlinkCDC-DataStream
- package com.tbea.cdc;
-
- import com.ververica.cdc.connectors.mysql.source.MySqlSource;
- import com.ververica.cdc.connectors.mysql.table.StartupOptions;
- import com.ververica.cdc.debezium.JsonDebeziumDeserializationSchema;
- import com.ververica.cdc.debezium.StringDebeziumDeserializationSchema;
- import org.apache.flink.api.common.eventtime.WatermarkStrategy;
- import org.apache.flink.api.common.restartstrategy.RestartStrategies;
- import org.apache.flink.api.common.time.Time;
- import org.apache.flink.streaming.api.datastream.DataStreamSource;
- import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-
- /**
-  * DataStream example: reads change events from MySQL through {@link MySqlSource},
-  * deserialized as JSON strings, and prints each event to standard output.
-  *
-  * <p>Host, credentials and database name are hard-coded sample values.
-  */
- public class FlinkCDCStreamExample {
-     /**
-      * Configures the execution environment, attaches the MySQL CDC source and runs the job.
-      *
-      * @param args command-line arguments (unused)
-      * @throws Exception if the job fails to start or terminates with an error
-      */
-     public static void main(String[] args) throws Exception {
-         StreamExecutionEnvironment execEnv = StreamExecutionEnvironment.getExecutionEnvironment();
-
-         // Environment settings: single parallel instance, failure-rate restart strategy,
-         // and a checkpoint every 3000 ms (3 s).
-         execEnv.setParallelism(1);
-         execEnv.setRestartStrategy(
-                 RestartStrategies.failureRateRestart(1, Time.seconds(3L), Time.seconds(5L)));
-         execEnv.enableCheckpointing(3000);
-
-         MySqlSource<String> cdcSource = MySqlSource.<String>builder()
-                 // connection
-                 .hostname("192.168.140.1")
-                 .port(3306)
-                 .username("flink")
-                 .password("flink")
-                 // databases to capture
-                 .databaseList("flinkcdc")
-                 // tables to capture (pattern in database.table form)
-                 .tableList("flinkcdc.*")
-                 // convert each SourceRecord into a JSON string
-                 .deserializer(new JsonDebeziumDeserializationSchema())
-                 // enable scanning of newly added tables
-                 .scanNewlyAddedTableEnabled(true)
-                 .startupOptions(StartupOptions.latest())
-                 .build();
-
-         // Business logic goes here; this example only prints the raw change events.
-         DataStreamSource<String> changeEvents =
-                 execEnv.fromSource(cdcSource, WatermarkStrategy.noWatermarks(), "Mysql-Source");
-         changeEvents.print();
-
-         execEnv.execute("Print MySQL Snapshot + Binlog");
-     }
- }
运行结果:
- {"before":null,"after":{"id":22,"name":"杨延昭","age":45,"addr":"辽宁","create_time":1723798965000},"source":{"version":"1.5.4.Final","connector":"mysql","name":"mysql_binlog_source","ts_ms":1723770167000,"snapshot":"false","db":"flinkcdc","sequence":null,"table":"user_info","server_id":1,"gtid":null,"file":"YANGYINGCHUN-bin.000004","pos":8640,"row":0,"thread":null,"query":null},"op":"c","ts_ms":1723770167469,"transaction":null}
-
- {"before":{"id":17,"name":"储熊","age":28,"addr":"张北","create_time":null},"after":{"id":17,"name":"储熊","age":28,"addr":"张北","create_time":1723798410000},"source":{"version":"1.5.4.Final","connector":"mysql","name":"mysql_binlog_source","ts_ms":1723769613000,"snapshot":"false","db":"flinkcdc","sequence":null,"table":"user_info","server_id":1,"gtid":null,"file":"YANGYINGCHUN-bin.000004","pos":6540,"row":0,"thread":null,"query":null},"op":"u","ts_ms":1723769613737,"transaction":null}
-
- {"before":{"id":8,"name":"赵刚","age":33,"addr":"延安","create_time":1724230872000},"after":null,"source":{"version":"1.5.4.Final","connector":"mysql","name":"mysql_binlog_source","ts_ms":1723770354000,"snapshot":"false","db":"flinkcdc","sequence":null,"table":"user_info","server_id":1,"gtid":null,"file":"YANGYINGCHUN-bin.000004","pos":8968,"row":0,"thread":null,"query":null},"op":"d","ts_ms":1723770354642,"transaction":null}
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。