当前位置:   article > 正文

SpringBoot集成Flink CDC实现binlog监听_flink mysql cdc 解析binlog

flink mysql cdc 解析binlog

Flink CDC

CDC相关介绍

CDC是什么?

CDC是Change Data Capture(变更数据获取)的简称。核心思想是,监测并捕获数据库的变动(包括数据或数据表的插入、更新以及删除等),将这些变更按发生的顺序完整记录下来,写入到MQ以供其他服务进行订阅及消费

CDC分类

CDC主要分为基于查询基于Binlog

基于查询基于Binlog
开源产品Sqoop、DataXCanal、Maxwell、Debezium
执行模式BatchStreaming
是否可以捕获所有数据变化
延迟性高延迟低延迟
是否增加数据库压力

基于查询的都是Batch模式(即数据到达一定量后/一定时间才行会执行), 同时也因为这种模式, 那么延迟是必然高的, 而基于Streaming则是可以做到按条的粒度, 每条数据发生变化, 那么就会监听到

Flink CDC

Flink社区开发了flink-cdc-connectors组件,这是一个可以直接从MySQL、PostgreSQL 等数据库直接读取全量数据增量变更数据的source组件。

目前也已开源,开源地址:https://github.com/ververica/flink-cdc-connectors

Java中集成Flink CDC

MySQL相关设置

执行初始化SQL数据
# 创建test数据库
create database test;
# 在test库中创建studnet, t1, t2, t3表, 插入数据
use test;
CREATE TABLE `student`  (
  `id` int(11) NOT NULL AUTO_INCREMENT,
  `name` varchar(255) CHARACTER SET utf8 COLLATE utf8_general_ci NULL DEFAULT NULL,
  `age` int(11) NULL DEFAULT NULL,
  PRIMARY KEY (`id`) USING BTREE
) ENGINE = InnoDB AUTO_INCREMENT = 4 CHARACTER SET = utf8 COLLATE = utf8_general_ci ROW_FORMAT = Dynamic;
INSERT INTO `student` VALUES (1, 'joy', 18);
INSERT INTO `student` VALUES (2, 'tom', 123123);

CREATE TABLE t1(
    `id` VARCHAR(255) PRIMARY KEY,
    `name` VARCHAR(255)
);
CREATE TABLE t2(
    `id` VARCHAR(255) PRIMARY KEY,
    `name` VARCHAR(255)
);
CREATE TABLE t3(
    `id` VARCHAR(255) PRIMARY KEY,
    `name` VARCHAR(255)
);

# 创建test_route数据库
create database test_route;
# 在test_route库中创建t1, t2, t3表
use test_route;
CREATE TABLE t1(
    `id` VARCHAR(255) PRIMARY KEY,
    `name` VARCHAR(255)
);
CREATE TABLE t2(
    `id` VARCHAR(255) PRIMARY KEY,
    `name` VARCHAR(255)
);
CREATE TABLE t3(
    `id` VARCHAR(255) PRIMARY KEY,
    `name` VARCHAR(255)
);


# 在test_route数据库中的t1, t2, t3表插入数据
use test_route;
INSERT INTO t1 VALUES('1001','zhangsan');
INSERT INTO t1 VALUES('1002','lisi');
INSERT INTO t1 VALUES('1003','wangwu');

INSERT INTO t2 VALUES('1004','zhangsan');
INSERT INTO t2 VALUES('1005','lisi');
INSERT INTO t2 VALUES('1006','wangwu');

INSERT INTO t3 VALUES('1001','F');
INSERT INTO t3 VALUES('1002','F');
INSERT INTO t3 VALUES('1003','M');
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
开启Binlog

通常来说默认安装MySQL的cnf都是存在/etc下的

sudo vim /etc/my.cnf
  • 1
# 添加如下配置信息,开启`test`以及`test_route`数据库的Binlog
# 数据库id
server-id = 1
# 时区, 如果不修改数据库时区, 那么Flink MySQL CDC无法启动
default-time-zone = '+8:00'
# 启动binlog,该参数的值会作为binlog的文件名
log-bin=mysql-bin
# binlog类型,maxwell要求为row类型
binlog_format=row
# 启用binlog的数据库,需根据实际情况作出修改
binlog-do-db=test
binlog-do-db=test_route
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
修改数据库时区

永久修改, 那么就修改my.cnf配置(刚刚配置已经修改了, 记得重启即可)

default-time-zone = '+8:00'
  • 1

临时修改(重启会丢失)

# MySQL 8 执行这个
set persist time_zone='+8:00';

# MySQL 5.x版本执行这个
set time_zone='+8:00';
  • 1
  • 2
  • 3
  • 4
  • 5
重启MySQL

注意了, 设置后需要重启MySQL!

service mysqld restart
  • 1

代码

相关依赖
<dependencies>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-web</artifactId>
    </dependency>

    <!-- Flink CDC依赖 start-->
    <!-- Flink核心依赖, 提供了Flink的核心API -->
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-java</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <!--  Flink流处理Java API依赖 -->
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-streaming-java</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <!-- Flink客户端工具依赖, 包含命令行界面和实用函数 -->
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-clients</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <!-- Flink连接器基础包, 包含连接器公共功能 -->
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-connector-base</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <!-- Flink Kafka连接器, 用于和Apache Kafka集成, 这里不需要集成, 所以注释掉, 代码可以使用其它的MQ代替 -->
    <!--<dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-kafka</artifactId>
            <version>3.2.0-1.19</version>
        </dependency>-->
    <!-- Flink Table Planner, 用于Table API和SQL的执行计划生成 -->
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-table-planner_2.12</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <!-- Flink Table API桥接器, 连接DataStream API和Table API -->
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-table-api-java-bridge</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <!-- Flink JSON格式化数据依赖 -->
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-json</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <!-- 开启Web UI支持, 端口为8081, 默认为不开启-->
    <!--<dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-runtime-web</artifactId>
            <version>1.19.1</version>
        </dependency>-->

    <!-- MySQL CDC依赖
        org.apache.flink的适用MySQL 8.0

         具体参照这篇博客 https://blog.csdn.net/kakaweb/article/details/129441408
         -->
    <dependency>
        <!--MySQL 8.0适用-->
        <!--<groupId>org.apache.flink</groupId>
            <artifactId>flink-sql-connector-mysql-cdc</artifactId>
            <version>3.1.0</version>-->

        <!-- MySQL 5.7适用 , 2.3.0, 3.0.1均可用-->
        <groupId>com.ververica</groupId>
        <artifactId>flink-sql-connector-mysql-cdc</artifactId>
        <!--<version>2.3.0</version>-->
        <version>3.0.1</version>
    </dependency>

    <!-- lombok -->
    <dependency>
        <groupId>org.projectlombok</groupId>
        <artifactId>lombok</artifactId>
    </dependency>
    <!-- gson工具类 -->
    <dependency>
        <groupId>com.google.code.gson</groupId>
        <artifactId>gson</artifactId>
        <version>2.11.0</version>
    </dependency>

    <!-- ognl表达式 -->
    <dependency>
        <groupId>ognl</groupId>
        <artifactId>ognl</artifactId>
        <version>3.1.1</version>
    </dependency>

    <!-- hutool工具类 -->
    <dependency>
        <groupId>cn.hutool</groupId>
        <artifactId>hutool-all</artifactId>
        <version>5.8.26</version>
    </dependency>

    <dependency>
        <groupId>com.alibaba</groupId>
        <artifactId>fastjson</artifactId>
        <version>2.0.31</version>
    </dependency>
</dependencies>
<name>cdc-test</name>
<description>cdc-test</description>
<properties>
    <java.version>11</java.version>
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
    <spring-boot.version>2.6.13</spring-boot.version>
    <flink.version>1.19.0</flink.version>
</properties>
<dependencyManagement>
    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-dependencies</artifactId>
            <version>${spring-boot.version}</version>
            <type>pom</type>
            <scope>import</scope>
        </dependency>
    </dependencies>
</dependencyManagement>
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60
  • 61
  • 62
  • 63
  • 64
  • 65
  • 66
  • 67
  • 68
  • 69
  • 70
  • 71
  • 72
  • 73
  • 74
  • 75
  • 76
  • 77
  • 78
  • 79
  • 80
  • 81
  • 82
  • 83
  • 84
  • 85
  • 86
  • 87
  • 88
  • 89
  • 90
  • 91
  • 92
  • 93
  • 94
  • 95
  • 96
  • 97
  • 98
  • 99
  • 100
  • 101
  • 102
  • 103
  • 104
  • 105
  • 106
  • 107
  • 108
  • 109
  • 110
  • 111
  • 112
  • 113
  • 114
  • 115
  • 116
  • 117
  • 118
  • 119
  • 120
  • 121
  • 122
  • 123
  • 124
  • 125
  • 126
  • 127
  • 128
  • 129
  • 130
  • 131
  • 132
yaml
# Flink CDC相关配置
flink-cdc:
  mysql:
    hostname: 192.168.132.101
    port: 3306
    username: root
    password: 12345678
    databaseList: test
    tableList: test.student, test.t1
    includeSchemaChanges: false
    parallelism: 1
    enableCheckpointing: 5000
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
FlinkCDCConfig
import lombok.Data;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.context.annotation.Configuration;

/**
 * @author whiteBrocade
 * @version 1.0
 * @description: Flink CDC配置
 */
@Data
@Configuration
@ConfigurationProperties("flink-cdc.mysql")
public class FlinkCDCConfig {
    /**
     * 数据库地址
     */
    private String hostname;

    /**
     * 数据库端口
     */
    private Integer port;

    /**
     * 数据库用户名
     */
    private String username;

    /**
     * 数据库密码
     */
    private String password;

    /**
     * 数据库名
     */
    private String[] databaseList;

    /**
     * 表名
     */
    private String[] tableList;

    /**
     * 是否包含schema变更
     */
    private Boolean includeSchemaChanges;

    /**
     * 并行度
     */
    private Integer parallelism;

    /**
     * 检查点间隔, 单位毫秒
     */
    private Integer enableCheckpointing;
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
相关枚举
OperatorTypeEnum
import lombok.AllArgsConstructor;
import lombok.Getter;

/**
 * @author whiteBrocade
 * @version 1.0
 * @description 操作类型枚举
 */
@Getter
@AllArgsConstructor
public enum OperatorTypeEnum {
    /**
     * 新增
     */
    INSERT(1),

    /**
     * 修改
     */
    UPDATE(2),

    /**
     * 删除
     */
    DELETE(3),
    ;

    private final int type;
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
StrategyEnum
import cn.hutool.core.bean.BeanUtil;
import cn.hutool.core.util.ObjUtil;
import cn.hutool.json.JSONObject;
import cn.hutool.json.JSONUtil;
import lombok.AllArgsConstructor;
import lombok.Getter;

import java.beans.Introspector;

/**
 * @author whiteBrocade
 * @version 1.0
 * @description 表处理策略
 * todo 后续在这里新增相关枚举即可
 */
@Getter
@AllArgsConstructor
public enum StrategyEnum {
    /**
     * Student处理策略
     */
    STUDENT("student", Student.class, Introspector.decapitalize(StudentLogHandler.class.getSimpleName())),
    ;

    /**
     * 表名
     */
    private final String tableName;

    /**
     * class对象
     */
    private final Class<?> varClass;

    /**
     * 处理器名
     */
    private final String handlerName;

    /**
     * 策略选择器, 根据传入的 DataChangeInfo 对象中的 tableName 属性, 从一系列预定义的策略 (StrategyEnum) 中选择一个合适的处理策略, 并封装进 StrategyHandleSelector 对象中返回
     *
     * @param dataChangeInfo 数据变更对象
     * @return StrategyHandlerSelector
     */
    public static StrategyHandleSelector getSelector(DataChangeInfo dataChangeInfo) {
        if (ObjUtil.isNull(dataChangeInfo)) {
            return null;
        }
        String tableName = dataChangeInfo.getTableName();
        StrategyHandleSelector selector = new StrategyHandleSelector();
        // 遍历所有的策略枚举(StrategyEnum), 寻找与当前表名相匹配的策略
        for (StrategyEnum strategyEnum : values()) {
            // 如果找到匹配的策略, 创建并配置 StrategyHandleSelector
            if (strategyEnum.getTableName().equals(tableName)) {
                selector.setHandlerName(strategyEnum.handlerName);
                selector.setOperatorTime(dataChangeInfo.getOperatorTime());
                selector.setOperatorType(dataChangeInfo.getOperatorType());
                JSONObject jsonObject = JSONUtil.parseObj(dataChangeInfo.getData());
                selector.setData(BeanUtil.copyProperties(jsonObject, strategyEnum.varClass));
                return selector;
            }
        }
        return null;
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60
  • 61
  • 62
  • 63
  • 64
  • 65
  • 66
pojo
Student
import lombok.Data;

/**
 * @author whiteBrocade
 * @version 1.0
 * @description 学生类, 用于演示
 */
@Data
public class Student {
    /**
     * id
     */
    private Integer id;

    /**
     * 姓名
     */
    private String name;

    /**
     * 年龄
     */
    private Integer age;
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
DataChangeInfo
import lombok.Data;

/**
 * @author whiteBrocade
 * @version 1.0
 * @description 数据变更对象
 */
@Data
public class DataChangeInfo {
    /**
     * 变更前数据
     */
    private String beforeData;

    /**
     * 变更后数据
     */
    private String afterData;

    /**
     * 操作的数据
     */
    private String data;

    /**
     * 变更类型 1->新增 2->修改 3->删除
     */
    private Integer operatorType;

    /**
     * binlog文件名
     */
    private String fileName;

    /**
     * binlog当前读取点位
     */
    private Integer filePos;
    /**
     * 数据库名
     */
    private String database;

    /**
     * 表名
     */
    private String tableName;

    /**
     * 变更时间
     */
    private Long operatorTime;
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
自定义Sink
DataChangeSink
import cn.hutool.core.util.ObjUtil;
import lombok.AllArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.apache.flink.api.common.functions.OpenContext;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import org.springframework.stereotype.Component;

import java.io.Serializable;
import java.util.Map;

/**
 * @author whiteBrocade
 * @version 1.0
 * @description
 */
@Slf4j
@Component
@AllArgsConstructor
public class DataChangeSink extends RichSinkFunction<DataChangeInfo> implements Serializable {
    /**
     * BaseLogHandler相关的缓存
     * Spring自动将相关BaseLogHandler的Bean注入注入到本地缓存Map中
     */
    private final Map<String, BaseLogHandler> strategyHandlerMap;

    @Override
    public void invoke(DataChangeInfo value, Context context) {
        log.info("收到变更原始数据:{}", value);
        // 选择策略
        StrategyHandleSelector selector = StrategyEnum.getSelector(value);
        if (ObjUtil.isNull(selector)) {
            return;
        }
        BaseLogHandler<Object> handler = strategyHandlerMap.get(selector.getHandlerName());

        // insert操作
        if (selector.getOperatorType().equals(OperatorTypeEnum.INSERT.getType())) {
            handler.handleInsertLog(selector.getData(), selector.getOperatorTime());
            return;
        }
        // update操作
        if (selector.getOperatorType().equals(OperatorTypeEnum.UPDATE.getType())) {
            handler.handleUpdateLog(selector.getData(), selector.getOperatorTime());
            return;
        }
        // delete操作
        if (selector.getOperatorType().equals(OperatorTypeEnum.DELETE.getType())) {
            handler.handleDeleteLog(selector.getData(), selector.getOperatorTime());
        }
    }

    /**
     * 前置操作
     */
    @Override
    public void open(OpenContext openContext) throws Exception {
        super.open(openContext);
    }

    /**
     * 后置操作
     */
    @Override
    public void close() throws Exception {
        super.close();
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60
  • 61
  • 62
  • 63
  • 64
  • 65
  • 66
  • 67
CustomSink
import lombok.extern.slf4j.Slf4j;
import org.apache.flink.api.common.functions.OpenContext;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import org.springframework.stereotype.Component;

/**
 * @author whiteBrocade
 * @version 1.0
 * @description 自定义Sink处理器, 这个是根据ognl表达式区分ddl语句类型, 搭配
 */
@Slf4j
@Component
public class CustomSink extends RichSinkFunction<String> {
    @Override
    public void invoke(String json, Context context) throws Exception {
        // op字段:  该字段也有4种取值,分别是C(create)、U(Update)、D(Delete)、Read
        // 对于U操作,其数据部分同时包含了Before和After。
        log.info("监听到数据: {}", json);
        String op = JSONUtil.getValue(json, "op", String.class);
        // 语句的id
        Integer id = null;
        // 如果是update语句
        if ("u".equals(op)) {
            id = JSONUtil.getValue(json, "after.id", Integer.class);
            log.info("执行update语句");
            // 执行update语句
        }

        // 如果是delete语句
        if ("d".equals(op)) {
            id = JSONUtil.getValue(json, "before.id", Integer.class);
            log.info("执行delete语句");
            // 执行删除语句
        }
        // 如果是新增
        if ("c".equals(op)) {
            log.info("执行insert语句");
        }
    }

    // 前置操作
    @Override
    public void open(OpenContext openContext) throws Exception {
        super.open(openContext);
    }

    // 后置操作
    @Override
    public void close() throws Exception {
        super.close();
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
自定义反序列化器 MySQLDeserialization
import com.alibaba.fastjson.JSONObject;
import com.ververica.cdc.connectors.shaded.org.apache.kafka.connect.data.Field;
import com.ververica.cdc.connectors.shaded.org.apache.kafka.connect.data.Schema;
import com.ververica.cdc.connectors.shaded.org.apache.kafka.connect.data.Struct;
import com.ververica.cdc.connectors.shaded.org.apache.kafka.connect.source.SourceRecord;
import com.ververica.cdc.debezium.DebeziumDeserializationSchema;
import io.debezium.data.Envelope;
import lombok.extern.slf4j.Slf4j;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.util.Collector;
import org.springframework.stereotype.Service;

import java.util.List;
import java.util.Optional;

/**
 * @author whiteBrocade
 * @version 1.0
 * @description MySQL消息读取 自定义反序列化器
 */
@Slf4j
@Service
public class MySQLDeserialization implements DebeziumDeserializationSchema<DataChangeInfo> {

    public static final String TS_MS = "ts_ms";
    public static final String BIN_FILE = "file";
    public static final String POS = "pos";
    public static final String CREATE = "CREATE";
    public static final String BEFORE = "before";
    public static final String AFTER = "after";
    public static final String SOURCE = "source";
    public static final String UPDATE = "UPDATE";

    /**
     * 反序列化数据, 转为变更JSON对象
     *
     * @param sourceRecord SourceRecord
     * @param collector Collector<DataChangeInfo>
     */
    @Override
    public void deserialize(SourceRecord sourceRecord, Collector<DataChangeInfo> collector) {
        try {
            // 根据主题的格式,获取数据库名(database)和表名(tableName)
            String topic = sourceRecord.topic();
            String[] fields = topic.split("\\.");
            String database = fields[1];
            String tableName = fields[2];

            Struct struct = (Struct) sourceRecord.value();
            final Struct source = struct.getStruct(SOURCE);
            DataChangeInfo dataChangeInfo = new DataChangeInfo();
            // 获取操作类型  CREATE UPDATE DELETE
            Envelope.Operation operation = Envelope.operationFor(sourceRecord);
            String type = operation.toString().toUpperCase();
            int eventType = type.equals(CREATE) ? OperatorTypeEnum.INSERT.getType() : UPDATE.equals(type) ?
                    OperatorTypeEnum.UPDATE.getType() : OperatorTypeEnum.DELETE.getType();

            // 一般情况是无需关心其之前之后数据的, 直接获取最新的日志数据即可, 但这里为了演示, 都进行输出
            // 获取变更前和变更后的数据,并将其设置到DataChangeInfo对象中
            dataChangeInfo.setBeforeData(this.getJsonObject(struct, BEFORE).toJSONString());
            dataChangeInfo.setAfterData(this.getJsonObject(struct, AFTER).toJSONString());
            if (eventType == OperatorTypeEnum.DELETE.getType()) {
                dataChangeInfo.setData(this.getJsonObject(struct, BEFORE).toJSONString());
            } else {
                dataChangeInfo.setData(this.getJsonObject(struct, AFTER).toJSONString());
            }
            dataChangeInfo.setOperatorType(eventType);
            dataChangeInfo.setFileName(Optional.ofNullable(source.get(BIN_FILE))
                    .map(Object::toString)
                    .orElse(""));
            dataChangeInfo.setFilePos(Optional.ofNullable(source.get(POS))
                    .map(x -> Integer.parseInt(x.toString()))
                    .orElse(0));
            dataChangeInfo.setDatabase(database);
            dataChangeInfo.setTableName(tableName);
            dataChangeInfo.setOperatorTime(Optional.ofNullable(struct.get(TS_MS))
                    .map(x -> Long.parseLong(x.toString()))
                    .orElseGet(System::currentTimeMillis));
            // 输出数据
            collector.collect(dataChangeInfo);
        } catch (Exception e) {
            log.error("反序列binlog失败", e);
        }
    }

    /**
     * 从源数据获取出变更之前或之后的数据
     *
     * @param value Struct
     * @param fieldElement 字段
     * @return JSONObject
     */
    private JSONObject getJsonObject(Struct value, String fieldElement) {
        Struct element = value.getStruct(fieldElement);
        JSONObject jsonObject = new JSONObject();
        if (element != null) {
            Schema afterSchema = element.schema();
            List<Field> fieldList = afterSchema.fields();
            for (Field field : fieldList) {
                Object afterValue = element.get(field);
                jsonObject.put(field.name(), afterValue);
            }
        }
        return jsonObject;
    }


    @Override
    public TypeInformation<DataChangeInfo> getProducedType() {
        return TypeInformation.of(DataChangeInfo.class);
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60
  • 61
  • 62
  • 63
  • 64
  • 65
  • 66
  • 67
  • 68
  • 69
  • 70
  • 71
  • 72
  • 73
  • 74
  • 75
  • 76
  • 77
  • 78
  • 79
  • 80
  • 81
  • 82
  • 83
  • 84
  • 85
  • 86
  • 87
  • 88
  • 89
  • 90
  • 91
  • 92
  • 93
  • 94
  • 95
  • 96
  • 97
  • 98
  • 99
  • 100
  • 101
  • 102
  • 103
  • 104
  • 105
  • 106
  • 107
  • 108
  • 109
  • 110
  • 111
  • 112
LogHandler
BaseLogHandler
import java.io.Serializable;

/**
 * @author whiteBrocade
 * @version 1.0
 * @description 日志处理器
 * todo 新建一个类实现该BaseLogHandler类, 添加相应的处理逻辑即可, 可参考StudentLogHandler实现
 */
public interface BaseLogHandler<T> extends Serializable {
    /**
     * 日志处理
     *
     * @param data 数据转换后模型
     * @param operatorTime 操作时间
     */
    void handleInsertLog(T data, Long operatorTime);

    /**
     * 日志处理
     *
     * @param data 数据转换后模型
     * @param operatorTime 操作时间
     */
    void handleUpdateLog(T data, Long operatorTime);

    /**
     * 日志处理
     *
     * @param data 数据转换后模型
     * @param operatorTime 操作时间
     */
    void handleDeleteLog(T data, Long operatorTime);
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
StrategyHandleSelector
import lombok.Data;

/**
 * @author whiteBrocade
 * @version 1.0
 * @description 策略处理选择器
 */
@Data
public class StrategyHandleSelector {
    /**
     * 策略处理器名称
     */
    private String handlerName;

    /**
     * 数据源
     */
    private Object data;

    /**
     * 操作时间
     */
    private Long operatorTime;

    /**
     * 操作类型
     */
    private Integer operatorType;
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
StudentLogHandler
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Service;

/**
 * @author whiteBrocade
 * @version 1.0
 * @description Student对应处理器
 */
@Slf4j
@Service
public class StudentLogHandler implements BaseLogHandler<Student> {
    @Override
    public void handleInsertLog(Student data, Long operatorTime) {
        log.info("处理Student表的新增日志: {}", data);
    }

    @Override
    public void handleUpdateLog(Student data, Long operatorTime) {
        log.info("处理Student表的修改日志: {}", data);
    }

    @Override
    public void handleDeleteLog(Student data, Long operatorTime) {
        log.info("处理Student表的删除日志: {}", data);
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
JSONUtil
import com.google.gson.Gson;
import com.google.gson.reflect.TypeToken;
import ognl.Ognl;
import ognl.OgnlContext;

import java.util.Map;

/**
 * @author whiteBrocade
 * @version 1.0
 * @description: JSON工具类
 */
public class JSONUtil {
    /**
     * 将指定JSON转为Map对象, Key类型为String,对应JSON的key
     * Value分情况:
     * 1. Value是字符串, 自动转为字符串, 例如:{"a","b"}
     * 2. Value是其他JSON对象, 自动转为Map,例如::{"a":{"b":"2"}}
     * 3. Value是数组, 自动转为list<Map>,例如::{"a":[:{"b":"2"},"c":"3"]}
     *
     * @param json 输入的的JSON对象
     * @return 动态Map集合
     */
    public static Map<String, Object> transferToMap(String json) {
        Gson gson = new Gson();
        Map<String, Object> map = gson.fromJson(json, new TypeToken<Map<String, Object>>() {}.getType());
        return map;
    }

    /**
     * 获取指定JSON的指定路径的值
     *
     * @param json  原始JSON数据
     * @param path  OGNL原则表达式
     * @param clazz Value对应的目标类
     * @return clazz对应的数据
     */
    public static <T> T getValue(String json, String path, Class<T> clazz) {
        try {
            Map<String, Object> map = JSONUtil.transferToMap(json);
            OgnlContext ognlContext = new OgnlContext();
            ognlContext.setRoot(map);
            T value = (T) Ognl.getValue(path, ognlContext, ognlContext.getRoot(), clazz);
            return value;
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
MysqlEventListener
import com.ververica.cdc.connectors.mysql.source.MySqlSource;
import com.ververica.cdc.connectors.mysql.source.MySqlSourceBuilder;
import com.ververica.cdc.connectors.mysql.table.StartupOptions;
import com.ververica.cdc.debezium.JsonDebeziumDeserializationSchema;
import lombok.AllArgsConstructor;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.springframework.boot.ApplicationArguments;
import org.springframework.boot.ApplicationRunner;
import org.springframework.stereotype.Component;

/**
 * @author whiteBrocade
 * @version 1.0
 * @description MySQL变更监听
 */
@Component
@AllArgsConstructor
public class MysqlEventListener implements ApplicationRunner {

    /**
     * Flink CDC相关配置
     */
    private final FlinkCDCConfig flinkCDCConfig;

    /**
     * 自定义Sink
     * customSink: 通过ognl解析ddl语句类型
     * dataChangeSink: 通过struct解析ddl语句类型
     * 通常两个选择一个就行
     */
    private final CustomSink customSink;
    private final DataChangeSink dataChangeSink;

    /**
     * 自定义反序列化处理器
     */
    private final MySQLDeserialization mySQLDeserialization;

    @Override
    public void run(ApplicationArguments args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // 设置整个Flink程序的默认并行度
        env.setParallelism(flinkCDCConfig.getParallelism());
        // 设置checkpoint 间隔
        env.enableCheckpointing(flinkCDCConfig.getEnableCheckpointing());
        // 设置任务关闭的时候保留最后一次 CK 数据
        env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);

        // todo 下列的两个MySqlSource选择一个
        // 自定义的反序列化器
        // MySqlSource<DataChangeInfo> mySqlSource = this.buildBaseMySqlSource(DataChangeInfo.class)
        //         .deserializer(mySQLDeserialization)
        //         .build();

        // Flink CDC自带的反序列化器
        MySqlSource<String> mySqlSource = this.buildBaseMySqlSource(String.class)
            .deserializer(new JsonDebeziumDeserializationSchema())
            .build();


        env.fromSource(mySqlSource,
                       WatermarkStrategy.noWatermarks(),
                       "mysql-source")
            // 设置该数据源的并行度
            .setParallelism(flinkCDCConfig.getParallelism())
            // todo 根据上述的选择,选择对应的Sink
            // .addSink(dataChangeSink); // 添加Sink, 这里配合mySQLDeserialization+dataChangeSink
            .addSink(customSink);

        env.execute("mysql-stream-cdc");
    }

    /**
     * 构建基本的MySqlSourceBuilder
     *
     * @param clazz 返回的数据类型Class对象
     * @param <T>   源数据中存储的类型
     * @return MySqlSourceBuilder
     */
    private <T> MySqlSourceBuilder<T> buildBaseMySqlSource(Class<T> clazz) {
        return MySqlSource.<T>builder()
            .hostname(flinkCDCConfig.getHostname())
            .port(flinkCDCConfig.getPort())
            .username(flinkCDCConfig.getUsername())
            .password(flinkCDCConfig.getPassword())
            .databaseList(flinkCDCConfig.getDatabaseList())
            .tableList(flinkCDCConfig.getTableList())
            /* initial: 初始化快照,即全量导入后增量导入(检测更新数据写入)
                 * latest: 只进行增量导入(不读取历史变化)
                 * timestamp: 指定时间戳进行数据导入(大于等于指定时间错读取数据)
                 */
            .startupOptions(StartupOptions.latest())
            .includeSchemaChanges(flinkCDCConfig.getIncludeSchemaChanges()) // 包括schema的改变
            .serverTimeZone("GMT+8"); // 时区
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60
  • 61
  • 62
  • 63
  • 64
  • 65
  • 66
  • 67
  • 68
  • 69
  • 70
  • 71
  • 72
  • 73
  • 74
  • 75
  • 76
  • 77
  • 78
  • 79
  • 80
  • 81
  • 82
  • 83
  • 84
  • 85
  • 86
  • 87
  • 88
  • 89
  • 90
  • 91
  • 92
  • 93
  • 94
  • 95
  • 96
  • 97
声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/黑客灵魂/article/detail/1017607
推荐阅读
相关标签
  

闽ICP备14008679号