当前位置:   article > 正文

Spring Boot+Flink CDC —— MySQL 同步 Elasticsearch (Table API方式)_flink cdc table api

flink cdc table api

上一篇《Spring Boot+Flink CDC —— MySQL 同步 Elasticsearch (DataStream方式)》采用 DataStream 方式实现,本篇改用 Table API 方式。

一、创建项目

基于 JDK 17 + Spring Boot 3.0.2 + Elasticsearch 7.17.9 + Flink 1.16.0 + Flink CDC 2.3.0

1、pom 主要依赖

<!-- Build properties: Java level, source/report encodings and the shared Flink version. -->
<properties>
    <java.version>17</java.version>
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
    <!-- NOTE(review): spring-boot.version is not referenced anywhere in this excerpt;
         presumably it is consumed by a parent/BOM section outside the excerpt - confirm. -->
    <spring-boot.version>3.0.2</spring-boot.version>
    <!-- Single source of truth for every ${flink.version} reference below. -->
    <flink.version>1.16.0</flink.version>
</properties>
<dependencies>
    <!-- No explicit version here; presumably managed by the Spring Boot parent/BOM - confirm. -->
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-web</artifactId>
    </dependency>
    <!-- MySQL JDBC driver, runtime scope only; no explicit version (presumably BOM-managed). -->
    <dependency>
        <groupId>com.mysql</groupId>
        <artifactId>mysql-connector-j</artifactId>
        <scope>runtime</scope>
    </dependency>
    <!-- Flink CDC connector for MySQL -->
    <dependency>
        <groupId>com.ververica</groupId>
        <artifactId>flink-connector-mysql-cdc</artifactId>
        <version>2.3.0</version>
    </dependency>
    <!-- Elasticsearch 7 connector. Its version is pinned separately and does not follow
         ${flink.version}; the "-1.16" suffix presumably names the targeted Flink line. -->
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-connector-elasticsearch7</artifactId>
        <version>3.0.0-1.16</version>
    </dependency>
    <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-json -->
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-json</artifactId>
        <version>${flink.version}</version>
    </dependency>

    <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-table-api-java-bridge -->
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-table-api-java-bridge</artifactId>
        <version>${flink.version}</version>
    </dependency>

    <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-table-planner -->
    <!-- The artifact id carries a "_2.12" suffix (presumably the Scala binary version). -->
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-table-planner_2.12</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-clients -->
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-clients</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-java</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-streaming-java -->
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-streaming-java</artifactId>
        <version>${flink.version}</version>
    </dependency>
</dependencies>
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60
  • 61
  • 62
  • 63
  • 64
  • 65
  • 66

2、实现MySQL变更监听

import lombok.extern.slf4j.Slf4j;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.TableResult;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.springframework.boot.ApplicationArguments;
import org.springframework.boot.ApplicationRunner;
import org.springframework.stereotype.Component;

/**
 * Submits a Flink SQL job that mirrors the MySQL table {@code system_dept} into the
 * Elasticsearch index {@code system_dept_search}.
 *
 * <p>The job is submitted from {@link #run(ApplicationArguments)}, i.e. from Spring Boot's
 * {@link ApplicationRunner} callback. Submission is asynchronous, so the callback returns
 * as soon as the job has been handed to Flink.
 */
@Slf4j
@Component
public class MysqlEventListener implements ApplicationRunner {

    /** Name assigned to the submitted Flink job. */
    private static final String JOB_NAME = "mysql-cdc-es";

    /**
     * Source table backed by the {@code mysql-cdc} connector.
     *
     * <p>NOTE(review): Flink documents the precision of {@code TIMESTAMP(p)} as 0..9;
     * {@code TIMESTAMP(19)} is kept unchanged here - confirm the intended precision.
     */
    private static final String SOURCE_DDL =
            """
            CREATE TABLE IF NOT EXISTS system_dept (
                id BIGINT,
                name VARCHAR(30),
                sort INT,
                leader_user_id BIGINT,
                phone VARCHAR(11),
                email VARCHAR(50),
                status TINYINT,
                creator VARCHAR(64),
                create_time TIMESTAMP(19),
                PRIMARY KEY(id) NOT ENFORCED
            ) WITH (
                'connector' = 'mysql-cdc',
                'hostname' = 'localhost',
                'port' = '3306',
                'username' = 'flinkcdc',
                'password' = 'flinkcdc',
                'database-name' = 'db01',
                'table-name' = 'system_dept'
            )
            """;

    /**
     * Sink table backed by the {@code elasticsearch-7} connector. Its columns mirror the
     * source table; {@code sink.bulk-flush.max-actions} is set to 1.
     */
    private static final String SINK_DDL =
            """
            CREATE TABLE IF NOT EXISTS system_dept_es (
                id BIGINT,
                name VARCHAR(30),
                sort INT,
                leader_user_id BIGINT,
                phone VARCHAR(11),
                email VARCHAR(50),
                status TINYINT,
                creator VARCHAR(64),
                create_time TIMESTAMP(19),
                PRIMARY KEY(id) NOT ENFORCED
            ) WITH (
                'connector' = 'elasticsearch-7',
                'hosts' = 'http://localhost:9200',
                'index' = 'system_dept_search',
                'sink.bulk-flush.max-actions' = '1'
            )
            """;

    /** Plain one-to-one copy from source to sink; no filtering or aggregation. */
    private static final String TRANSFORM_SQL =
            "INSERT INTO system_dept_es SELECT * FROM system_dept";

    /**
     * Spring Boot startup hook; starts the MySQL to Elasticsearch synchronization.
     *
     * @param args application arguments (unused)
     */
    @Override
    public void run(ApplicationArguments args) {
        mysql2es();
    }

    /**
     * Registers the source and sink tables and submits the INSERT job.
     *
     * <p>Any failure is logged and swallowed so that it does not propagate out of the
     * {@link ApplicationRunner} callback.
     */
    private void mysql2es() {
        try {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            // Run the whole pipeline with a parallelism of 1.
            env.setParallelism(1);
            StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
            // The job is submitted by executeSql(INSERT ...) below, so the name has to be
            // configured up front instead of being passed to env.executeAsync(...).
            tableEnv.getConfig().set("pipeline.name", JOB_NAME);

            tableEnv.executeSql(SOURCE_DDL);
            tableEnv.executeSql(SINK_DDL);

            // executeSql on an INSERT statement submits the job asynchronously. Calling
            // env.execute()/executeAsync() afterwards would fail with "No operators defined
            // in streaming topology" because no DataStream operators were added, and
            // printing the result would wait for a streaming job that is not expected to end.
            TableResult result = tableEnv.executeSql(TRANSFORM_SQL);
            result.getJobClient().ifPresent(
                    client -> log.info("mysql --> es, job submitted, name={}, jobId={}",
                            JOB_NAME, client.getJobID()));
        } catch (Exception e) {
            log.error("mysql --> es, Exception=", e);
        }
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60
  • 61
  • 62
  • 63
  • 64
  • 65
  • 66
  • 67
  • 68
  • 69
  • 70
  • 71
  • 72
  • 73
  • 74
  • 75
  • 76
  • 77
  • 78
  • 79
  • 80
  • 81
  • 82
  • 83

到此就大功告成啦!
代码地址:https://gitee.com/qianxkun/lakudouzi-components/tree/master/flink-cdc-mysql2es-2

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/小丑西瓜9/article/detail/563627
推荐阅读
相关标签
  

闽ICP备14008679号