赞
踩
组件 | 版本 |
---|---|
scala | 2.12 |
netcat | * |
kafka | * |
mysql | * |
flink | 1.13.3 |
监听mysql某个表的动态,实时同步到另一个数据库中。
当然使用maxwell或canal也可以实现同样的效果。这里只是简单演示
创建环境
//创建环境
val settings = EnvironmentSettings.newInstance().inStreamingMode().build()
val tEnv = TableEnvironment.create(settings)
创建表user01,mysql-cdc是第三方连接器
//创建表user01 - 使用 mysql-cdc connector tEnv.executeSql( """ |create table user01 ( |id int , |name string, |PRIMARY KEY (id) NOT ENFORCED |)with( |'connector' = 'mysql-cdc', |'hostname' = 'server120', |'port' = '3306', |'username' = 'flink_test', |'password' = 'flink_test', |'database-name' = 'flink_test', |'table-name' = 'user01', |'scan.incremental.snapshot.enabled' = 'false' |) |""".stripMargin)
创建表user02,jdbc 是flink自带的连接器
//创建表user02 - 使用jdbc connector
tEnv.executeSql(
"""
|create table user02 (
|id int PRIMARY KEY,
|name string
|)with(
'connector' = 'jdbc',
| 'url' = 'jdbc:mysql://server120:3306/flink_test',
| 'table-name' = 'user02',
| 'username' = 'flink_test',
| 'password' = 'flink_test'
|)
|""".stripMargin)
将user01同步到user02
//将user01同步到user02
tEnv.from("user01").executeInsert("user02")
这种方式简单的实现了实时同步mysql某个表的增删改查。
package com.z.tableapi import org.apache.flink.table.api._ /** * @Author wenz.ma * @Date 2021/10/27 17:52 * @Desc cdc 实时同步mysql表数据 */ object Mysql2MysqlWithCDC { def main(args: Array[String]): Unit = { //创建环境 val settings = EnvironmentSettings.newInstance().inStreamingMode().build() val tEnv = TableEnvironment.create(settings) //创建表user01 - 使用 mysql-cdc connector tEnv.executeSql( """ |create table user01 ( |id int , |name string, |PRIMARY KEY (id) NOT ENFORCED |)with( |'connector' = 'mysql-cdc', |'hostname' = 'server120', |'port' = '3306', |'username' = 'flink_test', |'password' = 'flink_test', |'database-name' = 'flink_test', |'table-name' = 'user01', |'scan.incremental.snapshot.enabled' = 'false' |) |""".stripMargin) //创建表user02 - 使用jdbc connector tEnv.executeSql( """ |create table user02 ( |id int PRIMARY KEY, |name string |)with( 'connector' = 'jdbc', | 'url' = 'jdbc:mysql://server120:3306/flink_test', | 'table-name' = 'user02', | 'username' = 'flink_test', | 'password' = 'flink_test' |) |""".stripMargin) //将user01同步到user02 tEnv.from("user01").executeInsert("user02") } }
flink mysql 依赖
<dependency>
<groupId>com.ververica</groupId>
<artifactId>flink-connector-mysql-cdc</artifactId>
<version>${mysql.cdc}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-jdbc_${scala.version}</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<version>5.1.47</version>
</dependency>
flink 依赖
<dependencies> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-clients_${scala.version}</artifactId> <version>${flink.version}</version> </dependency> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-scala_${scala.version}</artifactId> <version>${flink.version}</version> </dependency> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-streaming-scala_${scala.version}</artifactId> <version>${flink.version}</version> </dependency> </dependencies>
https://download.csdn.net/download/sinat_25528181/44038825
hive-catalog-demo
目前有mysql-cdc、postgres-cdc、MongoDB-cdc、oracle-cdc
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。