
Flink jar development: real-time MySQL-to-MySQL sync with Flink CDC


CDC stands for Change Data Capture. In the broadest sense, any technique that can capture data changes can be called CDC. In practice the term usually refers to capturing changes made to a database, i.e. a technique for detecting and propagating data changes in a database.

1. MySQL CDC requires the binlog to be enabled. Open my.cnf and add the following under the [mysqld] section:

[mysqld]

server-id=1

log-bin=mysql-bin

binlog-format=row

Then restart the database.
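Before touching Flink, it is worth confirming that the binlog settings actually took effect. Below is a minimal JDBC sketch for that check; the host, database, and credentials are simply the placeholder values used later in this article, so adjust them to your own instance.

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.Statement;

public class BinlogCheck {
    public static void main(String[] args) throws Exception {
        // Placeholder connection settings; replace with your own host, user, and password.
        try (Connection conn = DriverManager.getConnection(
                "jdbc:mysql://localhost:3306/quarant_db?serverTimezone=UTC", "root", "admin");
             Statement stmt = conn.createStatement();
             ResultSet rs = stmt.executeQuery(
                     "SHOW VARIABLES WHERE Variable_name IN ('log_bin', 'binlog_format', 'server_id')")) {
            // Expected output: log_bin = ON, binlog_format = ROW, server_id = 1
            while (rs.next()) {
                System.out.println(rs.getString(1) + " = " + rs.getString(2));
            }
        }
    }
}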

2. Create a Spring Boot project and add the following dependencies to its pom.xml:

<properties>
  <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
  <java.version>1.8</java.version>
  <flink.version>1.14.2</flink.version>
  <scala.binary.version>2.11</scala.binary.version>
  <slf4j.version>1.7.30</slf4j.version>
</properties>

<dependencies>
  <dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-table-api-java</artifactId>
    <version>${flink.version}</version>
  </dependency>

  <dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-table-planner_${scala.binary.version}</artifactId>
    <version>${flink.version}</version>
  </dependency>
  <dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-table-api-java-bridge_${scala.binary.version}</artifactId>
    <version>${flink.version}</version>
    <!--<scope>provided</scope>-->
  </dependency>

  <dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-java</artifactId>
    <version>${flink.version}</version>
    <scope>provided</scope>
  </dependency>
  <dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
    <version>${flink.version}</version>
    <scope>provided</scope>
  </dependency>

  
  <dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-jdbc_${scala.binary.version}</artifactId>
    <version>${flink.version}</version>
  </dependency>
  <dependency>
    <groupId>com.ververica</groupId>
    <artifactId>flink-sql-connector-mysql-cdc</artifactId>
    <version>2.2.0</version>
  </dependency>

  <dependency>
    <groupId>mysql</groupId>
    <artifactId>mysql-connector-java</artifactId>
    <version>8.0.17</version>
  </dependency>
 
  <!-- flink-clients is used for local debugging -->
  <dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-clients_${scala.binary.version}</artifactId>
    <version>${flink.version}</version>
    <scope>test</scope>
  </dependency>

<!-- For local runs: download Flink, copy the flink-dist jar (e.g. flink-dist_2.11-1.14.4.jar) from its lib directory into the project, and reference it here -->

<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-dist</artifactId>
  <version>1.14.4</version>
  <scope>system</scope>
  <systemPath>${basedir}/src/main/lib/flink-dist_2.11-1.14.4.jar</systemPath>
</dependency>
</dependencies>
<build>
  <plugins>
    <plugin>
      <groupId>org.apache.maven.plugins</groupId>
      <artifactId>maven-jar-plugin</artifactId>
      <version>3.2.0</version>
      <configuration>
        <!-- Set the main class -->
        <archive>
          <manifestEntries>
            <Main-Class>org.example.FlinkMysqlToMysql</Main-Class>
          </manifestEntries>
        </archive>
      </configuration>
    </plugin>
      <plugin>
          <groupId>org.apache.maven.plugins</groupId>
          <artifactId>maven-compiler-plugin</artifactId>
          <configuration>
              <source>8</source>
              <target>8</target>
          </configuration>
      </plugin>
  </plugins>
</build>

Flink CDC code for MySQL-to-MySQL sync:

import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

public class FlinkMysqlToMysql {

    public static void main(String[] args) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, 5000));
        env.enableCheckpointing(5000);
        env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);

        // Create the Table environment
        EnvironmentSettings settings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
        StreamTableEnvironment tEnv = StreamTableEnvironment.create(env, settings);

        // Register the source table; the source connector must be mysql-cdc
        tEnv.executeSql("create table sourceTable(id bigint, organization_code VARCHAR, organization_name VARCHAR, parent_code VARCHAR, parent_name VARCHAR, PRIMARY KEY (id) NOT ENFORCED) WITH (\n" +
                "'connector' = 'mysql-cdc',\n" +
                "'hostname' = 'localhost',\n" +
                "'port' = '3306',\n" +
                "'database-name' = 'quarant_db',\n" +
                "'table-name' = 'organization_info',\n" +
                "'username' = 'root',\n" +
                "'password' = 'admin'\n" +
                ")");
        // Table result = tEnv.sqlQuery("SELECT id, name,card_num,phone,address FROM sourceTable");
        // tEnv.registerTable("sourceTable", result);

        // Register the target table; the target connector is jdbc
        tEnv.executeSql("create table targetTable(id bigint, organization_code VARCHAR, organization_name VARCHAR, parent_code VARCHAR, parent_name VARCHAR, PRIMARY KEY (id) NOT ENFORCED) WITH (\n" +
                "'connector' = 'jdbc',\n" +
                "'url' = 'jdbc:mysql://localhost:3306/testdb?serverTimezone=UTC&useUnicode=true&characterEncoding=utf-8&useSSL=false',\n" +
                "'table-name' = 'organization_info',\n" +
                "'username' = 'root',\n" +
                "'driver' = 'com.mysql.cj.jdbc.Driver',\n" +
                "'password' = 'admin'\n" +
                ")");

        // Run the CDC pipeline: continuously copy sourceTable into targetTable
        String query = "INSERT INTO targetTable SELECT * FROM sourceTable";
        tEnv.executeSql(query).print();
    }
}
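As an optional debugging step, you can print the raw changelog that the mysql-cdc source emits before wiring it into the target table. The lines below are a small sketch meant to be dropped into the main method above in place of the INSERT INTO statement; they reuse the same tEnv and sourceTable.

// Optional: inspect the changelog emitted by the CDC source.
// The op column shows +I (insert), -U/+U (update before/after image) and -D (delete).
// Note that print() blocks, so run it instead of the INSERT INTO statement, not alongside it.
tEnv.executeSql("SELECT * FROM sourceTable").print();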

Run the main method.

Flink first syncs the existing data from the source table to the target table; after that, inserts, updates, and deletes on the source table are synced to the target table in real time.
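To check the pipeline end to end, change a row in the source table while the job is running and watch it show up in testdb. Below is a minimal JDBC sketch with made-up test values; the column names come from the table definitions above, while the id and strings are purely illustrative.

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;

public class SourceSmokeTest {
    public static void main(String[] args) throws Exception {
        // Same placeholder connection settings as the sourceTable definition above.
        try (Connection conn = DriverManager.getConnection(
                "jdbc:mysql://localhost:3306/quarant_db?serverTimezone=UTC", "root", "admin");
             PreparedStatement ps = conn.prepareStatement(
                     "INSERT INTO organization_info (id, organization_code, organization_name, parent_code, parent_name) "
                             + "VALUES (?, ?, ?, ?, ?)")) {
            ps.setLong(1, 1001L);            // hypothetical test id
            ps.setString(2, "ORG-TEST");     // hypothetical test values
            ps.setString(3, "test org");
            ps.setString(4, "ROOT");
            ps.setString(5, "root org");
            ps.executeUpdate();              // the row should appear in testdb.organization_info within seconds
        }
    }
}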

3. Package the program as a Flink jar by updating the build section of the pom:

<build>
  <plugins>
    <!-- Compiler plugin -->
    <plugin>
      <artifactId>maven-compiler-plugin</artifactId>
      <configuration>
        <source>1.8</source>
        <target>1.8</target>
        <encoding>UTF-8</encoding>
      </configuration>
    </plugin>
    <!-- Spring Boot packaging
    <plugin>
      <groupId>org.springframework.boot</groupId>
      <artifactId>spring-boot-maven-plugin</artifactId>
    </plugin>-->
    <!-- Flink packaging, option 1 -->
    <plugin>
      <groupId>org.apache.maven.plugins</groupId>
      <artifactId>maven-assembly-plugin</artifactId>
      <version>3.2.0</version>
      <configuration>
        <archive>
          <manifest>
            <mainClass>org.example.FlinkMysqlToMysql</mainClass>
          </manifest>
        </archive>
        <!-- Bundle dependencies into the jar -->
        <descriptorRefs>
          <descriptorRef>jar-with-dependencies</descriptorRef>
        </descriptorRefs>
      </configuration>
      <executions>
        <execution>
          <id>make-assembly</id>
          <phase>package</phase>
          <goals>
            <goal>single</goal>
          </goals>
        </execution>
      </executions>
    </plugin>
  </plugins>
</build>

Click package in IDEA (or run mvn clean package from the command line) to build the project.

Pick the jar that bundles all dependencies (the jar-with-dependencies artifact) and submit it to the Flink cluster to run, for example through the web UI or with the flink run command.
