赞
踩
CDC是Change Data Capture(变更数据获取)的简称。核心思想是,监测并捕获数据库的变动(包括数据或数据表的插入、更新以及删除等),将这些变更按发生的顺序完整记录下来,写入到MQ以供其他服务进行订阅及消费
CDC主要分为基于查询和基于Binlog
 | 基于查询 | 基于Binlog |
---|---|---|
开源产品 | Sqoop、DataX | Canal、Maxwell、Debezium |
执行模式 | Batch | Streaming |
是否可以捕获所有数据变化 | 否 | 是 |
延迟性 | 高延迟 | 低延迟 |
是否增加数据库压力 | 是 | 否 |
基于查询的都是Batch模式(即数据到达一定量后/一定时间后才会执行), 同时也因为这种模式, 那么延迟是必然高的, 而基于Streaming则是可以做到按条的粒度, 每条数据发生变化, 那么就会监听到
Flink社区开发了flink-cdc-connectors组件,这是一个可以直接从MySQL、PostgreSQL 等数据库直接读取全量数据和增量变更数据的source组件。
目前也已开源,开源地址:https://github.com/ververica/flink-cdc-connectors
# 创建test数据库 create database test; # 在test库中创建studnet, t1, t2, t3表, 插入数据 use test; CREATE TABLE `student` ( `id` int(11) NOT NULL AUTO_INCREMENT, `name` varchar(255) CHARACTER SET utf8 COLLATE utf8_general_ci NULL DEFAULT NULL, `age` int(11) NULL DEFAULT NULL, PRIMARY KEY (`id`) USING BTREE ) ENGINE = InnoDB AUTO_INCREMENT = 4 CHARACTER SET = utf8 COLLATE = utf8_general_ci ROW_FORMAT = Dynamic; INSERT INTO `student` VALUES (1, 'joy', 18); INSERT INTO `student` VALUES (2, 'tom', 123123); CREATE TABLE t1( `id` VARCHAR(255) PRIMARY KEY, `name` VARCHAR(255) ); CREATE TABLE t2( `id` VARCHAR(255) PRIMARY KEY, `name` VARCHAR(255) ); CREATE TABLE t3( `id` VARCHAR(255) PRIMARY KEY, `name` VARCHAR(255) ); # 创建test_route数据库 create database test_route; # 在test_route库中创建t1, t2, t3表 use test_route; CREATE TABLE t1( `id` VARCHAR(255) PRIMARY KEY, `name` VARCHAR(255) ); CREATE TABLE t2( `id` VARCHAR(255) PRIMARY KEY, `name` VARCHAR(255) ); CREATE TABLE t3( `id` VARCHAR(255) PRIMARY KEY, `name` VARCHAR(255) ); # 在test_route数据库中的t1, t2, t3表插入数据 use test_route; INSERT INTO t1 VALUES('1001','zhangsan'); INSERT INTO t1 VALUES('1002','lisi'); INSERT INTO t1 VALUES('1003','wangwu'); INSERT INTO t2 VALUES('1004','zhangsan'); INSERT INTO t2 VALUES('1005','lisi'); INSERT INTO t2 VALUES('1006','wangwu'); INSERT INTO t3 VALUES('1001','F'); INSERT INTO t3 VALUES('1002','F'); INSERT INTO t3 VALUES('1003','M');
通常来说默认安装MySQL的cnf都是存在/etc下的
sudo vim /etc/my.cnf
# 添加如下配置信息,开启`test`以及`test_route`数据库的Binlog
# 数据库id
server-id = 1
# 时区, 如果不修改数据库时区, 那么Flink MySQL CDC无法启动
default-time-zone = '+8:00'
# 启动binlog,该参数的值会作为binlog的文件名
log-bin=mysql-bin
# binlog类型,maxwell要求为row类型
binlog_format=row
# 启用binlog的数据库,需根据实际情况作出修改
binlog-do-db=test
binlog-do-db=test_route
永久修改, 那么就修改my.cnf配置(刚刚配置已经修改了, 记得重启即可)
default-time-zone = '+8:00'
临时修改(重启会丢失)
# MySQL 8 执行这个
set persist time_zone='+8:00';
# MySQL 5.x版本执行这个
set time_zone='+8:00';
注意了, 设置后需要重启MySQL!
service mysqld restart
- 1
<dependencies> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-web</artifactId> </dependency> <!-- Flink CDC依赖 start--> <!-- Flink核心依赖, 提供了Flink的核心API --> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-java</artifactId> <version>${flink.version}</version> </dependency> <!-- Flink流处理Java API依赖 --> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-streaming-java</artifactId> <version>${flink.version}</version> </dependency> <!-- Flink客户端工具依赖, 包含命令行界面和实用函数 --> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-clients</artifactId> <version>${flink.version}</version> </dependency> <!-- Flink连接器基础包, 包含连接器公共功能 --> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-connector-base</artifactId> <version>${flink.version}</version> </dependency> <!-- Flink Kafka连接器, 用于和Apache Kafka集成, 这里不需要集成, 所以注释掉, 代码可以使用其它的MQ代替 --> <!--<dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-connector-kafka</artifactId> <version>3.2.0-1.19</version> </dependency>--> <!-- Flink Table Planner, 用于Table API和SQL的执行计划生成 --> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-table-planner_2.12</artifactId> <version>${flink.version}</version> </dependency> <!-- Flink Table API桥接器, 连接DataStream API和Table API --> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-table-api-java-bridge</artifactId> <version>${flink.version}</version> </dependency> <!-- Flink JSON格式化数据依赖 --> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-json</artifactId> <version>${flink.version}</version> </dependency> <!-- 开启Web UI支持, 端口为8081, 默认为不开启--> <!--<dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-runtime-web</artifactId> <version>1.19.1</version> </dependency>--> <!-- MySQL CDC依赖 org.apache.flink的适用MySQL 8.0 具体参照这篇博客 https://blog.csdn.net/kakaweb/article/details/129441408 --> <dependency> <!--MySQL 8.0适用--> 
<!--<groupId>org.apache.flink</groupId> <artifactId>flink-sql-connector-mysql-cdc</artifactId> <version>3.1.0</version>--> <!-- MySQL 5.7适用 , 2.3.0, 3.0.1均可用--> <groupId>com.ververica</groupId> <artifactId>flink-sql-connector-mysql-cdc</artifactId> <!--<version>2.3.0</version>--> <version>3.0.1</version> </dependency> <!-- lombok --> <dependency> <groupId>org.projectlombok</groupId> <artifactId>lombok</artifactId> </dependency> <!-- gson工具类 --> <dependency> <groupId>com.google.code.gson</groupId> <artifactId>gson</artifactId> <version>2.11.0</version> </dependency> <!-- ognl表达式 --> <dependency> <groupId>ognl</groupId> <artifactId>ognl</artifactId> <version>3.1.1</version> </dependency> <!-- hutool工具类 --> <dependency> <groupId>cn.hutool</groupId> <artifactId>hutool-all</artifactId> <version>5.8.26</version> </dependency> <dependency> <groupId>com.alibaba</groupId> <artifactId>fastjson</artifactId> <version>2.0.31</version> </dependency> </dependencies> <name>cdc-test</name> <description>cdc-test</description> <properties> <java.version>11</java.version> <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding> <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding> <spring-boot.version>2.6.13</spring-boot.version> <flink.version>1.19.0</flink.version> </properties> <dependencyManagement> <dependencies> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-dependencies</artifactId> <version>${spring-boot.version}</version> <type>pom</type> <scope>import</scope> </dependency> </dependencies> </dependencyManagement>
# Flink CDC相关配置
flink-cdc:
mysql:
hostname: 192.168.132.101
port: 3306
username: root
password: 12345678
databaseList: test
tableList: test.student, test.t1
includeSchemaChanges: false
parallelism: 1
enableCheckpointing: 5000
import lombok.Data; import org.springframework.boot.context.properties.ConfigurationProperties; import org.springframework.context.annotation.Configuration; /** * @author whiteBrocade * @version 1.0 * @description: Flink CDC配置 */ @Data @Configuration @ConfigurationProperties("flink-cdc.mysql") public class FlinkCDCConfig { /** * 数据库地址 */ private String hostname; /** * 数据库端口 */ private Integer port; /** * 数据库用户名 */ private String username; /** * 数据库密码 */ private String password; /** * 数据库名 */ private String[] databaseList; /** * 表名 */ private String[] tableList; /** * 是否包含schema变更 */ private Boolean includeSchemaChanges; /** * 并行度 */ private Integer parallelism; /** * 检查点间隔, 单位毫秒 */ private Integer enableCheckpointing; }
import lombok.AllArgsConstructor; import lombok.Getter; /** * @author whiteBrocade * @version 1.0 * @description 操作类型枚举 */ @Getter @AllArgsConstructor public enum OperatorTypeEnum { /** * 新增 */ INSERT(1), /** * 修改 */ UPDATE(2), /** * 删除 */ DELETE(3), ; private final int type; }
import cn.hutool.core.bean.BeanUtil; import cn.hutool.core.util.ObjUtil; import cn.hutool.json.JSONObject; import cn.hutool.json.JSONUtil; import lombok.AllArgsConstructor; import lombok.Getter; import java.beans.Introspector; /** * @author whiteBrocade * @version 1.0 * @description 表处理策略 * todo 后续在这里新增相关枚举即可 */ @Getter @AllArgsConstructor public enum StrategyEnum { /** * Student处理策略 */ STUDENT("student", Student.class, Introspector.decapitalize(StudentLogHandler.class.getSimpleName())), ; /** * 表名 */ private final String tableName; /** * class对象 */ private final Class<?> varClass; /** * 处理器名 */ private final String handlerName; /** * 策略选择器, 根据传入的 DataChangeInfo 对象中的 tableName 属性, 从一系列预定义的策略 (StrategyEnum) 中选择一个合适的处理策略, 并封装进 StrategyHandleSelector 对象中返回 * * @param dataChangeInfo 数据变更对象 * @return StrategyHandlerSelector */ public static StrategyHandleSelector getSelector(DataChangeInfo dataChangeInfo) { if (ObjUtil.isNull(dataChangeInfo)) { return null; } String tableName = dataChangeInfo.getTableName(); StrategyHandleSelector selector = new StrategyHandleSelector(); // 遍历所有的策略枚举(StrategyEnum), 寻找与当前表名相匹配的策略 for (StrategyEnum strategyEnum : values()) { // 如果找到匹配的策略, 创建并配置 StrategyHandleSelector if (strategyEnum.getTableName().equals(tableName)) { selector.setHandlerName(strategyEnum.handlerName); selector.setOperatorTime(dataChangeInfo.getOperatorTime()); selector.setOperatorType(dataChangeInfo.getOperatorType()); JSONObject jsonObject = JSONUtil.parseObj(dataChangeInfo.getData()); selector.setData(BeanUtil.copyProperties(jsonObject, strategyEnum.varClass)); return selector; } } return null; } }
import lombok.Data; /** * @author whiteBrocade * @version 1.0 * @description 学生类, 用于演示 */ @Data public class Student { /** * id */ private Integer id; /** * 姓名 */ private String name; /** * 年龄 */ private Integer age; }
import lombok.Data; /** * @author whiteBrocade * @version 1.0 * @description 数据变更对象 */ @Data public class DataChangeInfo { /** * 变更前数据 */ private String beforeData; /** * 变更后数据 */ private String afterData; /** * 操作的数据 */ private String data; /** * 变更类型 1->新增 2->修改 3->删除 */ private Integer operatorType; /** * binlog文件名 */ private String fileName; /** * binlog当前读取点位 */ private Integer filePos; /** * 数据库名 */ private String database; /** * 表名 */ private String tableName; /** * 变更时间 */ private Long operatorTime; }
import cn.hutool.core.util.ObjUtil; import lombok.AllArgsConstructor; import lombok.extern.slf4j.Slf4j; import org.apache.flink.api.common.functions.OpenContext; import org.apache.flink.streaming.api.functions.sink.RichSinkFunction; import org.springframework.stereotype.Component; import java.io.Serializable; import java.util.Map; /** * @author whiteBrocade * @version 1.0 * @description */ @Slf4j @Component @AllArgsConstructor public class DataChangeSink extends RichSinkFunction<DataChangeInfo> implements Serializable { /** * BaseLogHandler相关的缓存 * Spring自动将相关BaseLogHandler的Bean注入注入到本地缓存Map中 */ private final Map<String, BaseLogHandler> strategyHandlerMap; @Override public void invoke(DataChangeInfo value, Context context) { log.info("收到变更原始数据:{}", value); // 选择策略 StrategyHandleSelector selector = StrategyEnum.getSelector(value); if (ObjUtil.isNull(selector)) { return; } BaseLogHandler<Object> handler = strategyHandlerMap.get(selector.getHandlerName()); // insert操作 if (selector.getOperatorType().equals(OperatorTypeEnum.INSERT.getType())) { handler.handleInsertLog(selector.getData(), selector.getOperatorTime()); return; } // update操作 if (selector.getOperatorType().equals(OperatorTypeEnum.UPDATE.getType())) { handler.handleUpdateLog(selector.getData(), selector.getOperatorTime()); return; } // delete操作 if (selector.getOperatorType().equals(OperatorTypeEnum.DELETE.getType())) { handler.handleDeleteLog(selector.getData(), selector.getOperatorTime()); } } /** * 前置操作 */ @Override public void open(OpenContext openContext) throws Exception { super.open(openContext); } /** * 后置操作 */ @Override public void close() throws Exception { super.close(); } }
import lombok.extern.slf4j.Slf4j; import org.apache.flink.api.common.functions.OpenContext; import org.apache.flink.streaming.api.functions.sink.RichSinkFunction; import org.springframework.stereotype.Component; /** * @author whiteBrocade * @version 1.0 * @description 自定义Sink处理器, 这个是根据ognl表达式区分ddl语句类型, 搭配 */ @Slf4j @Component public class CustomSink extends RichSinkFunction<String> { @Override public void invoke(String json, Context context) throws Exception { // op字段: 该字段也有4种取值,分别是C(create)、U(Update)、D(Delete)、Read // 对于U操作,其数据部分同时包含了Before和After。 log.info("监听到数据: {}", json); String op = JSONUtil.getValue(json, "op", String.class); // 语句的id Integer id = null; // 如果是update语句 if ("u".equals(op)) { id = JSONUtil.getValue(json, "after.id", Integer.class); log.info("执行update语句"); // 执行update语句 } // 如果是delete语句 if ("d".equals(op)) { id = JSONUtil.getValue(json, "before.id", Integer.class); log.info("执行delete语句"); // 执行删除语句 } // 如果是新增 if ("c".equals(op)) { log.info("执行insert语句"); } } // 前置操作 @Override public void open(OpenContext openContext) throws Exception { super.open(openContext); } // 后置操作 @Override public void close() throws Exception { super.close(); } }
import com.alibaba.fastjson.JSONObject; import com.ververica.cdc.connectors.shaded.org.apache.kafka.connect.data.Field; import com.ververica.cdc.connectors.shaded.org.apache.kafka.connect.data.Schema; import com.ververica.cdc.connectors.shaded.org.apache.kafka.connect.data.Struct; import com.ververica.cdc.connectors.shaded.org.apache.kafka.connect.source.SourceRecord; import com.ververica.cdc.debezium.DebeziumDeserializationSchema; import io.debezium.data.Envelope; import lombok.extern.slf4j.Slf4j; import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.util.Collector; import org.springframework.stereotype.Service; import java.util.List; import java.util.Optional; /** * @author whiteBrocade * @version 1.0 * @description MySQL消息读取 自定义反序列化器 */ @Slf4j @Service public class MySQLDeserialization implements DebeziumDeserializationSchema<DataChangeInfo> { public static final String TS_MS = "ts_ms"; public static final String BIN_FILE = "file"; public static final String POS = "pos"; public static final String CREATE = "CREATE"; public static final String BEFORE = "before"; public static final String AFTER = "after"; public static final String SOURCE = "source"; public static final String UPDATE = "UPDATE"; /** * 反序列化数据, 转为变更JSON对象 * * @param sourceRecord SourceRecord * @param collector Collector<DataChangeInfo> */ @Override public void deserialize(SourceRecord sourceRecord, Collector<DataChangeInfo> collector) { try { // 根据主题的格式,获取数据库名(database)和表名(tableName) String topic = sourceRecord.topic(); String[] fields = topic.split("\\."); String database = fields[1]; String tableName = fields[2]; Struct struct = (Struct) sourceRecord.value(); final Struct source = struct.getStruct(SOURCE); DataChangeInfo dataChangeInfo = new DataChangeInfo(); // 获取操作类型 CREATE UPDATE DELETE Envelope.Operation operation = Envelope.operationFor(sourceRecord); String type = operation.toString().toUpperCase(); int eventType = type.equals(CREATE) ? 
OperatorTypeEnum.INSERT.getType() : UPDATE.equals(type) ? OperatorTypeEnum.UPDATE.getType() : OperatorTypeEnum.DELETE.getType(); // 一般情况是无需关心其之前之后数据的, 直接获取最新的日志数据即可, 但这里为了演示, 都进行输出 // 获取变更前和变更后的数据,并将其设置到DataChangeInfo对象中 dataChangeInfo.setBeforeData(this.getJsonObject(struct, BEFORE).toJSONString()); dataChangeInfo.setAfterData(this.getJsonObject(struct, AFTER).toJSONString()); if (eventType == OperatorTypeEnum.DELETE.getType()) { dataChangeInfo.setData(this.getJsonObject(struct, BEFORE).toJSONString()); } else { dataChangeInfo.setData(this.getJsonObject(struct, AFTER).toJSONString()); } dataChangeInfo.setOperatorType(eventType); dataChangeInfo.setFileName(Optional.ofNullable(source.get(BIN_FILE)) .map(Object::toString) .orElse("")); dataChangeInfo.setFilePos(Optional.ofNullable(source.get(POS)) .map(x -> Integer.parseInt(x.toString())) .orElse(0)); dataChangeInfo.setDatabase(database); dataChangeInfo.setTableName(tableName); dataChangeInfo.setOperatorTime(Optional.ofNullable(struct.get(TS_MS)) .map(x -> Long.parseLong(x.toString())) .orElseGet(System::currentTimeMillis)); // 输出数据 collector.collect(dataChangeInfo); } catch (Exception e) { log.error("反序列binlog失败", e); } } /** * 从源数据获取出变更之前或之后的数据 * * @param value Struct * @param fieldElement 字段 * @return JSONObject */ private JSONObject getJsonObject(Struct value, String fieldElement) { Struct element = value.getStruct(fieldElement); JSONObject jsonObject = new JSONObject(); if (element != null) { Schema afterSchema = element.schema(); List<Field> fieldList = afterSchema.fields(); for (Field field : fieldList) { Object afterValue = element.get(field); jsonObject.put(field.name(), afterValue); } } return jsonObject; } @Override public TypeInformation<DataChangeInfo> getProducedType() { return TypeInformation.of(DataChangeInfo.class); } }
import java.io.Serializable; /** * @author whiteBrocade * @version 1.0 * @description 日志处理器 * todo 新建一个类实现该BaseLogHandler类, 添加相应的处理逻辑即可, 可参考StudentLogHandler实现 */ public interface BaseLogHandler<T> extends Serializable { /** * 日志处理 * * @param data 数据转换后模型 * @param operatorTime 操作时间 */ void handleInsertLog(T data, Long operatorTime); /** * 日志处理 * * @param data 数据转换后模型 * @param operatorTime 操作时间 */ void handleUpdateLog(T data, Long operatorTime); /** * 日志处理 * * @param data 数据转换后模型 * @param operatorTime 操作时间 */ void handleDeleteLog(T data, Long operatorTime); }
import lombok.Data; /** * @author whiteBrocade * @version 1.0 * @description 策略处理选择器 */ @Data public class StrategyHandleSelector { /** * 策略处理器名称 */ private String handlerName; /** * 数据源 */ private Object data; /** * 操作时间 */ private Long operatorTime; /** * 操作类型 */ private Integer operatorType; }
import lombok.extern.slf4j.Slf4j; import org.springframework.stereotype.Service; /** * @author whiteBrocade * @version 1.0 * @description Student对应处理器 */ @Slf4j @Service public class StudentLogHandler implements BaseLogHandler<Student> { @Override public void handleInsertLog(Student data, Long operatorTime) { log.info("处理Student表的新增日志: {}", data); } @Override public void handleUpdateLog(Student data, Long operatorTime) { log.info("处理Student表的修改日志: {}", data); } @Override public void handleDeleteLog(Student data, Long operatorTime) { log.info("处理Student表的删除日志: {}", data); } }
import com.google.gson.Gson; import com.google.gson.reflect.TypeToken; import ognl.Ognl; import ognl.OgnlContext; import java.util.Map; /** * @author whiteBrocade * @version 1.0 * @description: JSON工具类 */ public class JSONUtil { /** * 将指定JSON转为Map对象, Key类型为String,对应JSON的key * Value分情况: * 1. Value是字符串, 自动转为字符串, 例如:{"a","b"} * 2. Value是其他JSON对象, 自动转为Map,例如::{"a":{"b":"2"}} * 3. Value是数组, 自动转为list<Map>,例如::{"a":[:{"b":"2"},"c":"3"]} * * @param json 输入的的JSON对象 * @return 动态Map集合 */ public static Map<String, Object> transferToMap(String json) { Gson gson = new Gson(); Map<String, Object> map = gson.fromJson(json, new TypeToken<Map<String, Object>>() {}.getType()); return map; } /** * 获取指定JSON的指定路径的值 * * @param json 原始JSON数据 * @param path OGNL原则表达式 * @param clazz Value对应的目标类 * @return clazz对应的数据 */ public static <T> T getValue(String json, String path, Class<T> clazz) { try { Map<String, Object> map = JSONUtil.transferToMap(json); OgnlContext ognlContext = new OgnlContext(); ognlContext.setRoot(map); T value = (T) Ognl.getValue(path, ognlContext, ognlContext.getRoot(), clazz); return value; } catch (Exception e) { throw new RuntimeException(e); } } }
import com.ververica.cdc.connectors.mysql.source.MySqlSource; import com.ververica.cdc.connectors.mysql.source.MySqlSourceBuilder; import com.ververica.cdc.connectors.mysql.table.StartupOptions; import com.ververica.cdc.debezium.JsonDebeziumDeserializationSchema; import lombok.AllArgsConstructor; import org.apache.flink.api.common.eventtime.WatermarkStrategy; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.springframework.boot.ApplicationArguments; import org.springframework.boot.ApplicationRunner; import org.springframework.stereotype.Component; /** * @author whiteBrocade * @version 1.0 * @description MySQL变更监听 */ @Component @AllArgsConstructor public class MysqlEventListener implements ApplicationRunner { /** * Flink CDC相关配置 */ private final FlinkCDCConfig flinkCDCConfig; /** * 自定义Sink * customSink: 通过ognl解析ddl语句类型 * dataChangeSink: 通过struct解析ddl语句类型 * 通常两个选择一个就行 */ private final CustomSink customSink; private final DataChangeSink dataChangeSink; /** * 自定义反序列化处理器 */ private final MySQLDeserialization mySQLDeserialization; @Override public void run(ApplicationArguments args) throws Exception { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); // 设置整个Flink程序的默认并行度 env.setParallelism(flinkCDCConfig.getParallelism()); // 设置checkpoint 间隔 env.enableCheckpointing(flinkCDCConfig.getEnableCheckpointing()); // 设置任务关闭的时候保留最后一次 CK 数据 env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION); // todo 下列的两个MySqlSource选择一个 // 自定义的反序列化器 // MySqlSource<DataChangeInfo> mySqlSource = this.buildBaseMySqlSource(DataChangeInfo.class) // .deserializer(mySQLDeserialization) // .build(); // Flink CDC自带的反序列化器 MySqlSource<String> mySqlSource = this.buildBaseMySqlSource(String.class) .deserializer(new JsonDebeziumDeserializationSchema()) .build(); env.fromSource(mySqlSource, WatermarkStrategy.noWatermarks(), "mysql-source") // 设置该数据源的并行度 
.setParallelism(flinkCDCConfig.getParallelism()) // todo 根据上述的选择,选择对应的Sink // .addSink(dataChangeSink); // 添加Sink, 这里配合mySQLDeserialization+dataChangeSink .addSink(customSink); env.execute("mysql-stream-cdc"); } /** * 构建基本的MySqlSourceBuilder * * @param clazz 返回的数据类型Class对象 * @param <T> 源数据中存储的类型 * @return MySqlSourceBuilder */ private <T> MySqlSourceBuilder<T> buildBaseMySqlSource(Class<T> clazz) { return MySqlSource.<T>builder() .hostname(flinkCDCConfig.getHostname()) .port(flinkCDCConfig.getPort()) .username(flinkCDCConfig.getUsername()) .password(flinkCDCConfig.getPassword()) .databaseList(flinkCDCConfig.getDatabaseList()) .tableList(flinkCDCConfig.getTableList()) /* initial: 初始化快照,即全量导入后增量导入(检测更新数据写入) * latest: 只进行增量导入(不读取历史变化) * timestamp: 指定时间戳进行数据导入(大于等于指定时间错读取数据) */ .startupOptions(StartupOptions.latest()) .includeSchemaChanges(flinkCDCConfig.getIncludeSchemaChanges()) // 包括schema的改变 .serverTimeZone("GMT+8"); // 时区 } }
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。