赞
踩
CDC:全称是 Change Data Capture,即数据变更捕获技术,具体的含义是 通过识别和捕获对数据库中的数据所做的更改(包括数据或数据表的插入、更新、删除;数据库结构的变更调整等),然后将这些更改按发生的顺序完整记录下来,并实时通过中间技术桥梁(消息中间件、TCP等等)将变更顺序消息传送到下游流程或系统的过程。
CDC Connectors for Apache Flink ®是一组用于Apache Flink ®的源连接器,使用变更数据捕获 (CDC) 从不同数据库获取变更。用于 Apache Flink ®的 CDC 连接器将 Debezium 集成为捕获数据更改的引擎。所以它可以充分发挥 Debezium 的能力。
白话的意思是,Flink-CDC 一个成型的cdc技术实现(Debezium)的包装,我前面也使用过Debezium,并编写了一个简略的博客,感兴趣的可以戳下方连接去看一下
springboot+debezium捕获数据库变更(mysql、sql-server、mongodb、oracle…)
支持读取数据库快照,即使发生故障也能继续读取binlog,一次处理。
DataStream API 的 CDC 连接器,用户可以在单个作业中使用多个数据库和表的更改,而无需部署 Debezium 和 Kafka。
Table/SQL API 的 CDC 连接器,用户可以使用 SQL DDL 创建 CDC 源来监控单个表的更改。
下表显示了 Flink® CDC 连接器和 Flink® 之间的版本映射:
Flink ® CDC 版本 | Flink®版本_ |
---|---|
1.0.0 | 1.11.* |
1.1.0 | 1.11.* |
1.2.0 | 1.12.* |
1.3.0 | 1.12.* |
1.4.0 | 1.13.* |
2.0.* | 1.13.* |
2.1.* | 1.13.* |
2.2.* | 1.13.* , 1.14.* |
按常理来说,一个正常的flink-job 最终我们并不会集成到springboot项目中,我们会直接编写一个maven项目,在发布时使用flink程序来启动任务
比如官网示例:
本文即要使用flink-cdc进行数据变更捕获 (可以视作为一个flink-job),但又要契合我们的springboot项目,使用spring的特性,因此,我们需要转换一下思路,转换成什么样子呢?就是不要将这个flink-cdc作为一个job 使用flink程序进行发布提交,我们就当它在我们开发时一样,作为一个本地项目,main方法启动
flink客户端版本使用 1.13.6 cdc 版本使用 2.0.0
<properties> <encoding>UTF-8</encoding> <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding> <maven.compiler.source>1.8</maven.compiler.source> <maven.compiler.target>1.8</maven.compiler.target> <java.version>1.8</java.version> <scala.version>2.12</scala.version> <flink.version>1.13.6</flink.version> </properties> <dependencies> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-web</artifactId> </dependency> <dependency> <groupId>com.alibaba</groupId> <artifactId>fastjson</artifactId> <version>1.2.83</version> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-test</artifactId> <scope>test</scope> </dependency> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-clients_2.12</artifactId> <version>${flink.version}</version> </dependency> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-java</artifactId> <version>${flink.version}</version> </dependency> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-streaming-java_2.12</artifactId> <version>${flink.version}</version> </dependency> <!--mysql -cdc--> <dependency> <groupId>com.ververica</groupId> <artifactId>flink-connector-mysql-cdc</artifactId> <version>2.0.0</version> </dependency> <dependency> <groupId>org.projectlombok</groupId> <artifactId>lombok</artifactId> <version>1.18.18</version> </dependency> </dependencies>
无法简单的使用main方法来启动cdc 作业,因为如果这样的话,我们就无法与spring完美的契合
因此我们可以利用springboot的特性, 实现 ApplicationRunner 将flink-cdc 作为一个项目启动时需要运行的分支子任务即可
package com.leilei.mysql; import com.ververica.cdc.connectors.mysql.MySqlSource; import com.ververica.cdc.connectors.mysql.table.StartupOptions; import com.ververica.cdc.debezium.DebeziumSourceFunction; import org.apache.flink.api.common.eventtime.WatermarkStrategy; import org.apache.flink.runtime.state.storage.FileSystemCheckpointStorage; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.datastream.DataStreamSource; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.springframework.boot.ApplicationArguments; import org.springframework.boot.ApplicationRunner; import org.springframework.stereotype.Component; /** * @author lei * @create 2022-08-25 13:42 * @desc mysql变更监听 **/ @Component public class MysqlEventListener implements ApplicationRunner { private final DataChangeSink dataChangeSink; public MysqlEventListener(DataChangeSink dataChangeSink) { this.dataChangeSink = dataChangeSink; } @Override public void run(ApplicationArguments args) throws Exception { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.setParallelism(1); DebeziumSourceFunction<DataChangeInfo> dataChangeInfoMySqlSource = buildDataChangeSource(); DataStream<DataChangeInfo> streamSource = env .addSource(dataChangeInfoMySqlSource, "mysql-source") .setParallelism(1); streamSource.addSink(dataChangeSink); env.execute("mysql-stream-cdc"); } /** * 构造变更数据源 * * @param * @return DebeziumSourceFunction<DataChangeInfo> * @author lei * @date 2022-08-25 15:29:38 */ private DebeziumSourceFunction<DataChangeInfo> buildDataChangeSource() { return MySqlSource.<DataChangeInfo>builder() .hostname("10.50.40.145") .port(3306) .databaseList("paas_common_db") .tableList("paas_common_db.base_business_driver_score_*") .username("root") .password("cdwk-3g-145") /**initial初始化快照,即全量导入后增量导入(检测更新数据写入) * latest:只进行增量导入(不读取历史变化) * timestamp:指定时间戳进行数据导入(大于等于指定时间错读取数据) */ .startupOptions(StartupOptions.latest()) .deserializer(new MysqlDeserialization()) .serverTimeZone("GMT+8") .build(); } }
我这里解析为一个数据变更对象
package com.leilei.mysql; import com.alibaba.fastjson.JSON; import com.ververica.cdc.debezium.DebeziumDeserializationSchema; import io.debezium.data.Envelope; import org.apache.flink.api.common.typeinfo.BasicTypeInfo; import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.util.Collector; import org.apache.kafka.connect.data.Field; import org.apache.kafka.connect.data.Schema; import org.apache.kafka.connect.data.Struct; import org.apache.kafka.connect.source.SourceRecord; import com.alibaba.fastjson.JSONObject; import java.util.List; import java.util.Optional; /** * @author lei * @create 2022-08-25 13:43 * @desc mysql消息读取自定义序列化 **/ public class MysqlDeserialization implements DebeziumDeserializationSchema<DataChangeInfo> { public static final String TS_MS = "ts_ms"; public static final String BIN_FILE = "file"; public static final String POS = "pos"; public static final String CREATE = "CREATE"; public static final String BEFORE = "before"; public static final String AFTER = "after"; public static final String SOURCE = "source"; public static final String UPDATE = "UPDATE"; /** * * 反序列化数据,转为变更JSON对象 * @param sourceRecord * @param collector * @return void * @author lei * @date 2022-08-25 14:44:31 */ @Override public void deserialize(SourceRecord sourceRecord, Collector<DataChangeInfo> collector) { String topic = sourceRecord.topic(); String[] fields = topic.split("\\."); String database = fields[1]; String tableName = fields[2]; Struct struct = (Struct) sourceRecord.value(); final Struct source = struct.getStruct(SOURCE); DataChangeInfo dataChangeInfo = new DataChangeInfo(); dataChangeInfo.setBeforeData( getJsonObject(struct, BEFORE).toJSONString()); dataChangeInfo.setAfterData(getJsonObject(struct, AFTER).toJSONString()); //5.获取操作类型 CREATE UPDATE DELETE Envelope.Operation operation = Envelope.operationFor(sourceRecord); String type = operation.toString().toUpperCase(); int eventType = type.equals(CREATE) ? 1 : UPDATE.equals(type) ? 2 : 3; dataChangeInfo.setEventType(eventType); dataChangeInfo.setFileName(Optional.ofNullable(source.get(BIN_FILE)).map(Object::toString).orElse("")); dataChangeInfo.setFilePos(Optional.ofNullable(source.get(POS)).map(x->Integer.parseInt(x.toString())).orElse(0)); dataChangeInfo.setDatabase(database); dataChangeInfo.setTableName(tableName); dataChangeInfo.setChangeTime(Optional.ofNullable(struct.get(TS_MS)).map(x -> Long.parseLong(x.toString())).orElseGet(System::currentTimeMillis)); //7.输出数据 collector.collect(dataChangeInfo); } /** * * 从袁术数据获取出变更之前或之后的数据 * @param value * @param fieldElement * @return JSONObject * @author lei * @date 2022-08-25 14:48:13 */ private JSONObject getJsonObject(Struct value, String fieldElement) { Struct element = value.getStruct(fieldElement); JSONObject jsonObject = new JSONObject(); if (element != null) { Schema afterSchema = element.schema(); List<Field> fieldList = afterSchema.fields(); for (Field field : fieldList) { Object afterValue = element.get(field); jsonObject.put(field.name(), afterValue); } } return jsonObject; } @Override public TypeInformation<DataChangeInfo> getProducedType() { return TypeInformation.of(DataChangeInfo.class); } }
import lombok.Data; /** * @author lei * @create 2022-08-25 14:33 * @desc 数据变更对象 **/ @Data public class DataChangeInfo { /** * 变更前数据 */ private String beforeData; /** * 变更后数据 */ private String afterData; /** * 变更类型 1新增 2修改 3删除 */ private Integer eventType; /** * binlog文件名 */ private String fileName; /** * binlog当前读取点位 */ private Integer filePos; /** * 数据库名 */ private String database; /** * 表名 */ private String tableName; /** * 变更时间 */ private Long changeTime; }
package com.leilei.mysql; import lombok.extern.log4j.Log4j2; import org.apache.flink.streaming.api.functions.sink.SinkFunction; import org.springframework.stereotype.Component; /** * @author lei * @create 2022-08-25 14:01 * @desc **/ @Component @Log4j2 public class DataChangeSink implements SinkFunction<DataChangeInfo> { @Override public void invoke(DataChangeInfo value, Context context) { log.info("收到变更原始数据:{}", value); // todo 数据处理;因为此sink也是交由了spring管理,您想进行任何操作都非常简单 } }
当然,以上仅仅只是整合思路,如果你想使用flink-cdc 进行数据同步或日志记录等,结合您自身的需求进行调整接口,以上内容,大的架子是没问题的
如果遇到问题,可以先从官网QA寻找:https://github.com/ververica/flink-cdc-connectors/wiki/FAQ(ZH)
项目源码:springboot-flink-cdc
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。