
Flink: Real-Time MySQL-to-MySQL Data Synchronization (MySQL CDC Source, JDBC Sink)

Environment

Component   Version
Scala       2.12
netcat      *
Kafka       *
MySQL       *
Flink       1.13.3

Requirement

Listen for changes on a MySQL table and sync them to another database in real time.
Tools such as Maxwell or Canal can achieve the same effect; this is just a simple demonstration.
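The post does not show the MySQL side of the setup. As an assumption (not stated in the original), the mysql-cdc source requires the MySQL binlog to be enabled in ROW format, and both tables must already exist in the flink_test database. A minimal sketch:

```sql
-- Check the binlog prerequisites for the mysql-cdc connector
SHOW VARIABLES LIKE 'log_bin';        -- should be ON
SHOW VARIABLES LIKE 'binlog_format';  -- should be ROW

-- Source and sink tables; only id/name appear in the post, column types are assumed
CREATE TABLE user01 (
  id   INT PRIMARY KEY,
  name VARCHAR(255)
);
CREATE TABLE user02 (
  id   INT PRIMARY KEY,
  name VARCHAR(255)
);
```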

Explanation

Create the environment

//create the table environment
    val settings = EnvironmentSettings.newInstance().inStreamingMode().build()
    val tEnv = TableEnvironment.create(settings)
Create table user01; mysql-cdc is a third-party connector

//create table user01 - using the mysql-cdc connector
    tEnv.executeSql(
      """
        |create table user01 (
        |id int,
        |name string,
        |PRIMARY KEY (id) NOT ENFORCED
        |) with (
        |'connector' = 'mysql-cdc',
        |'hostname' = 'server120',
        |'port' = '3306',
        |'username' = 'flink_test',
        |'password' = 'flink_test',
        |'database-name' = 'flink_test',
        |'table-name' = 'user01',
        |'scan.incremental.snapshot.enabled' = 'false'
        |)
        |""".stripMargin)
Create table user02; jdbc is a connector that ships with Flink

//create table user02 - using the jdbc connector
    tEnv.executeSql(
      """
        |create table user02 (
        |id int PRIMARY KEY NOT ENFORCED,
        |name string
        |) with (
        |'connector' = 'jdbc',
        |'url' = 'jdbc:mysql://server120:3306/flink_test',
        |'table-name' = 'user02',
        |'username' = 'flink_test',
        |'password' = 'flink_test'
        |)
        |""".stripMargin)
Sync user01 into user02

    //sync user01 into user02
    tEnv.from("user01").executeInsert("user02")

This simple approach replicates inserts, updates, and deletes on a MySQL table in real time.
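To check the pipeline end to end, mutate the source table while the Flink job is running and watch the sink follow. The table names come from the post; the sample rows are made up for illustration:

```sql
-- Run against flink_test on server120 while the job is running
INSERT INTO user01 (id, name) VALUES (1, 'tom');
UPDATE user01 SET name = 'jerry' WHERE id = 1;
DELETE FROM user01 WHERE id = 1;
-- After each statement, SELECT * FROM user02 should reflect the change
```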

Complete code

package com.z.tableapi

import org.apache.flink.table.api._

/**
 * @Author wenz.ma
 * @Date 2021/10/27 17:52
 * @Desc CDC real-time sync of a MySQL table
 */
object Mysql2MysqlWithCDC {
  def main(args: Array[String]): Unit = {
    //create the table environment
    val settings = EnvironmentSettings.newInstance().inStreamingMode().build()
    val tEnv = TableEnvironment.create(settings)
    //create table user01 - using the mysql-cdc connector
    tEnv.executeSql(
      """
        |create table user01 (
        |id int,
        |name string,
        |PRIMARY KEY (id) NOT ENFORCED
        |) with (
        |'connector' = 'mysql-cdc',
        |'hostname' = 'server120',
        |'port' = '3306',
        |'username' = 'flink_test',
        |'password' = 'flink_test',
        |'database-name' = 'flink_test',
        |'table-name' = 'user01',
        |'scan.incremental.snapshot.enabled' = 'false'
        |)
        |""".stripMargin)
    //create table user02 - using the jdbc connector
    tEnv.executeSql(
      """
        |create table user02 (
        |id int PRIMARY KEY NOT ENFORCED,
        |name string
        |) with (
        |'connector' = 'jdbc',
        | 'url' = 'jdbc:mysql://server120:3306/flink_test',
        | 'table-name' = 'user02',
        | 'username' = 'flink_test',
        | 'password' = 'flink_test'
        |)
        |""".stripMargin)

    //sync user01 into user02
    tEnv.from("user01").executeInsert("user02")
  }
}


Flink MySQL connector dependencies

<dependency>
    <groupId>com.ververica</groupId>
    <artifactId>flink-connector-mysql-cdc</artifactId>
    <version>${mysql.cdc}</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-jdbc_${scala.version}</artifactId>
    <version>${flink.version}</version>
</dependency>
<dependency>
    <groupId>mysql</groupId>
    <artifactId>mysql-connector-java</artifactId>
    <version>5.1.47</version>
</dependency>
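The snippet above references the Maven properties ${mysql.cdc}, ${scala.version}, and ${flink.version} without defining them. A plausible <properties> block matching the environment table (Flink 1.13.3, Scala 2.12) might look as follows; the CDC connector version in particular is an assumption, chosen from the 2.x line that was built against Flink 1.13:

```xml
<properties>
    <scala.version>2.12</scala.version>
    <flink.version>1.13.3</flink.version>
    <!-- assumed version; pick a 2.x release compatible with Flink 1.13 -->
    <mysql.cdc>2.0.2</mysql.cdc>
</properties>
```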

Flink core dependencies

<dependencies>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-clients_${scala.version}</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-scala_${scala.version}</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-streaming-scala_${scala.version}</artifactId>
        <version>${flink.version}</version>
    </dependency>
</dependencies>

Source code download

https://download.csdn.net/download/sinat_25528181/44038825
hive-catalog-demo

Appendix: flink-cdc

Currently available connectors: mysql-cdc, postgres-cdc, mongodb-cdc, oracle-cdc.

GitHub: https://github.com/ververica/flink-cdc-connectors
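For comparison, a source table using postgres-cdc looks almost the same as the mysql-cdc one; mainly the connector name and connection options change. The sketch below is illustrative only, with placeholder host, credentials, and schema:

```sql
create table user01_pg (
  id int,
  name string,
  PRIMARY KEY (id) NOT ENFORCED
) with (
  'connector' = 'postgres-cdc',
  'hostname' = 'server120',
  'port' = '5432',
  'username' = 'flink_test',
  'password' = 'flink_test',
  'database-name' = 'flink_test',
  'schema-name' = 'public',
  'table-name' = 'user01'
);
```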
