赞
踩
CDC 的全称是 Change Data Capture(变更数据捕获)。广义上,凡是能捕获数据变更的技术都可以称为 CDC;目前通常所说的 CDC 技术主要面向数据库,是一种用于捕获数据库中数据变更的技术。
在 MySQL 配置文件(Linux 下通常为 my.cnf,Windows 下为 my.ini)的 [mysqld] 段中加入如下信息,开启 ROW 格式的 binlog:
[mysqld]
server-id=1
log-bin=mysql-bin
binlog-format=row
重启数据库,使上述配置生效。
<properties>
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    <java.version>1.8</java.version>
    <flink.version>1.14.2</flink.version>
    <!-- Every Scala-suffixed Flink artifact below must use this same suffix;
         mixing _2.11 and _2.12 jars on one classpath causes class conflicts. -->
    <scala.binary.version>2.11</scala.binary.version>
    <slf4j.version>1.7.30</slf4j.version>
</properties>
<dependencies>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-table-api-java</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-table-planner_${scala.binary.version}</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-table-api-java-bridge_${scala.binary.version}</artifactId>
        <version>${flink.version}</version>
        <!--<scope>provided</scope>-->
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-java</artifactId>
        <version>${flink.version}</version>
        <scope>provided</scope>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
        <version>${flink.version}</version>
        <scope>provided</scope>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-connector-jdbc_${scala.binary.version}</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>com.ververica</groupId>
        <artifactId>flink-sql-connector-mysql-cdc</artifactId>
        <version>2.2.0</version>
    </dependency>
    <dependency>
        <groupId>mysql</groupId>
        <artifactId>mysql-connector-java</artifactId>
        <version>8.0.17</version>
    </dependency>
    <!-- flink-clients: used for local debugging -->
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-clients_${scala.binary.version}</artifactId>
        <version>${flink.version}</version>
        <scope>test</scope>
    </dependency>
    <!-- For starting the job locally: download the Flink distribution and copy
         lib/flink-dist_<scala>-<version>.jar into src/main/lib. The copied jar must
         match flink.version and scala.binary.version above. -->
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-dist</artifactId>
        <version>${flink.version}</version>
        <scope>system</scope>
        <systemPath>${basedir}/src/main/lib/flink-dist_${scala.binary.version}-${flink.version}.jar</systemPath>
    </dependency>
</dependencies>
<build>
    <plugins>
        <plugin>
            <groupId>org.apache.maven.plugins</groupId>
            <artifactId>maven-jar-plugin</artifactId>
            <version>3.2.0</version>
            <configuration>
                <!-- Set the main class -->
                <archive>
                    <manifestEntries>
                        <Main-Class>org.example.FlinkMysqlToMysql</Main-Class>
                    </manifestEntries>
                </archive>
            </configuration>
        </plugin>
        <plugin>
            <groupId>org.apache.maven.plugins</groupId>
            <artifactId>maven-compiler-plugin</artifactId>
            <!-- Pin the plugin version so builds are reproducible (Maven warns otherwise). -->
            <version>3.8.1</version>
            <configuration>
                <source>8</source>
                <target>8</target>
            </configuration>
        </plugin>
    </plugins>
</build>
使用 Flink CDC 实现 MySQL 到 MySQL 数据同步的代码
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

import java.util.concurrent.ExecutionException;
/**
 * Flink SQL job that copies {@code quarant_db.organization_info} into
 * {@code testdb.organization_info}.
 *
 * <p>The source table uses the {@code mysql-cdc} connector, so data changes in the source database
 * are captured. The target table is written through the {@code jdbc} connector. Both tables share
 * one column list, because the sync statement relies on {@code SELECT *} matching the target
 * column by column.
 *
 * <p>NOTE(review): hosts, database names and credentials are hard-coded for a local demo;
 * externalize them before deploying anywhere real.
 */
public class FlinkMysqlToMysql {

    /** Column list shared by source and target; must stay identical for {@code SELECT *}. */
    private static final String COLUMNS =
            "  id BIGINT,\n"
                    + "  organization_code VARCHAR,\n"
                    + "  organization_name VARCHAR,\n"
                    + "  parent_code VARCHAR,\n"
                    + "  parent_name VARCHAR,\n"
                    + "  PRIMARY KEY (id) NOT ENFORCED\n";

    /** Source table. The connector must be mysql-cdc so that row changes are captured. */
    private static final String SOURCE_DDL =
            "CREATE TABLE sourceTable (\n"
                    + COLUMNS
                    + ") WITH (\n"
                    + "  'connector' = 'mysql-cdc',\n"
                    + "  'hostname' = 'localhost',\n"
                    + "  'port' = '3306',\n"
                    + "  'database-name' = 'quarant_db',\n"
                    + "  'table-name' = 'organization_info',\n"
                    + "  'username' = 'root',\n"
                    + "  'password' = 'admin'\n"
                    + ")";

    /** Target table, written through the JDBC connector. */
    private static final String SINK_DDL =
            "CREATE TABLE targetTable (\n"
                    + COLUMNS
                    + ") WITH (\n"
                    + "  'connector' = 'jdbc',\n"
                    + "  'url' = 'jdbc:mysql://localhost:3306/testdb"
                    + "?serverTimezone=UTC&useUnicode=true&characterEncoding=utf-8&useSSL=false',\n"
                    + "  'table-name' = 'organization_info',\n"
                    + "  'username' = 'root',\n"
                    + "  'driver' = 'com.mysql.cj.jdbc.Driver',\n"
                    + "  'password' = 'admin'\n"
                    + ")";

    /** Continuous query that forwards every row of the source table to the target table. */
    private static final String SYNC_DML = "INSERT INTO targetTable SELECT * FROM sourceTable";

    /**
     * Registers both tables, submits the sync job and blocks until that job ends.
     *
     * @param args unused
     * @throws IllegalStateException if the job fails, or if the waiting thread is interrupted
     */
    public static void main(String[] args) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // On failure, restart up to 3 times with a 5000 ms delay between attempts.
        env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, 5000));
        // Take an exactly-once checkpoint every 5000 ms.
        env.enableCheckpointing(5000);
        env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);

        // Blink is the only planner in Flink 1.14; useBlinkPlanner() is deprecated there and not needed.
        EnvironmentSettings settings = EnvironmentSettings.newInstance().inStreamingMode().build();
        StreamTableEnvironment tEnv = StreamTableEnvironment.create(env, settings);

        tEnv.executeSql(SOURCE_DDL);
        tEnv.executeSql(SINK_DDL);

        // executeSql submits the INSERT job asynchronously. await() keeps main alive while the job
        // runs, and it rethrows a job failure instead of printing a placeholder result row.
        try {
            tEnv.executeSql(SYNC_DML).await();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("Interrupted while waiting for the CDC sync job", e);
        } catch (ExecutionException e) {
            throw new IllegalStateException("CDC sync job failed", e);
        }
    }
}
运行 main 方法(本地调试)。
<build>
    <plugins>
        <!-- Compiler plugin -->
        <plugin>
            <groupId>org.apache.maven.plugins</groupId>
            <artifactId>maven-compiler-plugin</artifactId>
            <!-- Pin the plugin version so builds are reproducible (Maven warns otherwise). -->
            <version>3.8.1</version>
            <configuration>
                <source>1.8</source>
                <target>1.8</target>
                <encoding>UTF-8</encoding>
            </configuration>
        </plugin>
        <!-- Flink packaging option 1: build a single jar that also contains the dependencies -->
        <plugin>
            <groupId>org.apache.maven.plugins</groupId>
            <artifactId>maven-assembly-plugin</artifactId>
            <version>3.2.0</version>
            <configuration>
                <archive>
                    <manifest>
                        <!-- Inside <manifest> the element is <mainClass>; <Main-Class> is only
                             valid under <manifestEntries> and fails the build here. -->
                        <mainClass>org.example.FlinkMysqlToMysql</mainClass>
                    </manifest>
                </archive>
                <!-- Bundle dependencies into the jar -->
                <descriptorRefs>
                    <descriptorRef>jar-with-dependencies</descriptorRef>
                </descriptorRefs>
            </configuration>
            <executions>
                <execution>
                    <id>make-assembly</id>
                    <phase>package</phase>
                    <goals>
                        <goal>single</goal>
                    </goals>
                </execution>
            </executions>
        </plugin>
    </plugins>
</build>

在 IDEA 的 Maven 面板中执行 package 进行打包。
选择包含依赖的 jar 包(文件名以 -jar-with-dependencies.jar 结尾)提交到 Flink 上运行。
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。