My scenario was pulling incremental data for specific tables out of a SQL Server database. After evaluating a number of approaches, I settled on Flink's flink-connector-sqlserver-cdc, which relies on SQL Server's CDC (Change Data Capture) feature to capture the changes. The database must be configured for CDC before any data can be processed; if you are unsure how to do that, see my article 《SQL Server数据库开启CDC变更数据捕获操作指引》. Without further ado, here is the working setup; corrections are welcome.
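If you want a quick way to confirm CDC is actually enabled before the Flink job starts, a minimal JDBC check like the one below works. This is an optional sketch of my own, not required by the connector; the `CdcPrecheck` class name is just for illustration, and the connection details mirror the application.yml shown later.

```java
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.ResultSet;

// Sanity check: query sys.databases to confirm CDC is enabled before launching the Flink job.
public class CdcPrecheck {
    public static void main(String[] args) throws Exception {
        String url = "jdbc:sqlserver://127.0.0.1:1433;DatabaseName=HM_5001";
        try (Connection conn = DriverManager.getConnection(url, "sa", "root");
             PreparedStatement ps = conn.prepareStatement(
                     "SELECT is_cdc_enabled FROM sys.databases WHERE name = ?")) {
            ps.setString(1, "HM_5001");
            try (ResultSet rs = ps.executeQuery()) {
                if (rs.next() && rs.getBoolean(1)) {
                    System.out.println("CDC is enabled on HM_5001");
                } else {
                    System.out.println("CDC is NOT enabled -- enable it first (see the article above)");
                }
            }
        }
    }
}
```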
The Maven dependencies (pom.xml):

```xml
<properties>
    <flink.version>1.16.0</flink.version>
</properties>

<dependencies>
    <dependency>
        <groupId>com.microsoft.sqlserver</groupId>
        <artifactId>mssql-jdbc</artifactId>
        <version>9.4.0.jre8</version>
    </dependency>
    <dependency>
        <groupId>org.projectlombok</groupId>
        <artifactId>lombok</artifactId>
        <version>1.18.26</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-java</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-streaming-java</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-clients</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>com.ververica</groupId>
        <artifactId>flink-connector-sqlserver-cdc</artifactId>
        <version>2.3.0</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-connector-kafka</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <!-- The old Blink planner artifact (flink-table-planner-blink_2.11) was removed in Flink 1.14;
         for Flink 1.16 use flink-table-planner at the same version as the other Flink modules. -->
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-table-planner_2.12</artifactId>
        <version>${flink.version}</version>
    </dependency>
</dependencies>
```
The application.yml configuration:

```yaml
spring:
  datasource:
    url: jdbc:sqlserver://127.0.0.1:1433;DatabaseName=HM_5001
    username: sa
    password: root
    driver-class-name: com.microsoft.sqlserver.jdbc.SQLServerDriver

# CDC source configuration for real-time SQL Server synchronization
CDC:
  DataSource:
    host: 127.0.0.1
    port: 1433
    database: HM_5001
    tableList: dbo.t1,dbo.Tt2,dbo.t3,dbo.t4   # comma-separated schema.table names
    username: sa
    password: sa
```
The listener that starts the Flink CDC job (SQLServerCDCListener.java):

```java
import com.ververica.cdc.connectors.sqlserver.SqlServerSource;
import com.ververica.cdc.connectors.sqlserver.table.StartupOptions;
import com.ververica.cdc.debezium.DebeziumSourceFunction;
import lombok.extern.slf4j.Slf4j;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.ApplicationArguments;
import org.springframework.boot.ApplicationRunner;
import org.springframework.stereotype.Component;

import java.io.Serializable;

/**
 * SQL Server CDC change listener
 **/
@Component
@Slf4j
public class SQLServerCDCListener implements ApplicationRunner, Serializable {

    /**
     * CDC data source configuration
     */
    @Value("${CDC.DataSource.host}")
    private String host;
    @Value("${CDC.DataSource.port}")
    private String port;
    @Value("${CDC.DataSource.database}")
    private String database;
    @Value("${CDC.DataSource.tableList}")
    private String tableList;
    @Value("${CDC.DataSource.username}")
    private String username;
    @Value("${CDC.DataSource.password}")
    private String password;

    private final DataChangeSink dataChangeSink;

    public SQLServerCDCListener(DataChangeSink dataChangeSink) {
        this.dataChangeSink = dataChangeSink;
    }

    @Override
    public void run(ApplicationArguments args) throws Exception {
        log.info("Starting Flink CDC to capture ERP change data...");
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        DebeziumSourceFunction<DataChangeInfo> dataChangeInfoSource = buildDataChangeSource();
        DataStream<DataChangeInfo> streamSource = env
                .addSource(dataChangeInfoSource, "SQLServer-source")
                .setParallelism(1);
        streamSource.addSink(dataChangeSink);
        env.execute("SQLServer-stream-cdc");
    }

    /**
     * Build the CDC source
     */
    private DebeziumSourceFunction<DataChangeInfo> buildDataChangeSource() {
        String[] tables = tableList.replace(" ", "").split(",");
        return SqlServerSource.<DataChangeInfo>builder()
                .hostname(host)
                .port(Integer.parseInt(port))
                .database(database) // the SQL Server database to monitor
                .tableList(tables) // the tables to monitor, as schema.table
                .username(username)
                .password(password)
                /*
                 * initial: take a full snapshot first, then read incremental changes
                 * latest: read incremental changes only (skip historical data)
                 */
                .startupOptions(StartupOptions.latest())
                .deserializer(new JsonDebeziumDeserializationSchema()) // converts SourceRecord to DataChangeInfo
                .build();
    }
}
```
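Note that with StartupOptions.latest() and no checkpointing, the job simply starts from the current position again after a restart. If you need the source to resume from its last committed offset instead, Flink checkpointing can be enabled in run() before env.execute(). A minimal sketch, with a placeholder interval and storage path:

```java
// Hypothetical additions to run(), before env.execute().
// Requires: import org.apache.flink.streaming.api.CheckpointingMode;
env.enableCheckpointing(10_000L); // checkpoint every 10 seconds (placeholder interval)
env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
env.getCheckpointConfig().setCheckpointStorage("file:///tmp/flink-checkpoints"); // placeholder path
```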
The custom deserialization schema (JsonDebeziumDeserializationSchema.java):

```java
import com.alibaba.fastjson.JSONObject;
import com.ververica.cdc.debezium.DebeziumDeserializationSchema;
import io.debezium.data.Envelope;
import lombok.extern.slf4j.Slf4j;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.util.Collector;
import org.apache.kafka.connect.data.Field;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.source.SourceRecord;

import java.time.Instant;
import java.time.LocalDateTime;
import java.time.ZoneId;
import java.util.List;
import java.util.Optional;

/**
 * Custom deserialization schema for SQL Server change messages
 **/
@Slf4j
public class JsonDebeziumDeserializationSchema implements DebeziumDeserializationSchema<DataChangeInfo> {

    public static final String TS_MS = "ts_ms";
    public static final String BEFORE = "before";
    public static final String AFTER = "after";
    public static final String SOURCE = "source";
    public static final String CREATE = "CREATE";
    public static final String UPDATE = "UPDATE";

    /**
     * Deserialize a Debezium record into a change object with JSON payloads
     */
    @Override
    public void deserialize(SourceRecord sourceRecord, Collector<DataChangeInfo> collector) {
        try {
            Struct struct = (Struct) sourceRecord.value();
            Struct source = struct.getStruct(SOURCE);
            // Read database/table from the Debezium source metadata. For SQL Server the topic
            // is <server>.<schema>.<table>, so parsing the topic would yield the schema name,
            // not the database name.
            String database = source.getString("db");
            String tableName = source.getString("table");
            DataChangeInfo dataChangeInfo = new DataChangeInfo();
            dataChangeInfo.setBeforeData(getJsonObject(struct, BEFORE).toJSONString());
            dataChangeInfo.setAfterData(getJsonObject(struct, AFTER).toJSONString());
            // Map the operation (CREATE/UPDATE/DELETE) to 1 insert, 2 update, 3 delete.
            // READ records (snapshot) would also map to 3, but StartupOptions.latest() emits none.
            Envelope.Operation operation = Envelope.operationFor(sourceRecord);
            String type = operation.toString().toUpperCase();
            int eventType = CREATE.equals(type) ? 1 : UPDATE.equals(type) ? 2 : 3;
            dataChangeInfo.setEventType(eventType);
            dataChangeInfo.setDatabase(database);
            dataChangeInfo.setTableName(tableName);
            ZoneId zone = ZoneId.systemDefault();
            Long timestamp = Optional.ofNullable(struct.get(TS_MS)).map(x -> Long.parseLong(x.toString())).orElseGet(System::currentTimeMillis);
            dataChangeInfo.setChangeTime(LocalDateTime.ofInstant(Instant.ofEpochMilli(timestamp), zone));
            // Emit the change event
            collector.collect(dataChangeInfo);
        } catch (Exception e) {
            log.error("Failed to deserialize SQL Server change message", e);
        }
    }

    /**
     * Extract the before- or after-image from the record as a JSON object
     */
    private JSONObject getJsonObject(Struct value, String fieldElement) {
        Struct element = value.getStruct(fieldElement);
        JSONObject jsonObject = new JSONObject();
        if (element != null) {
            Schema schema = element.schema();
            List<Field> fieldList = schema.fields();
            for (Field field : fieldList) {
                jsonObject.put(field.name(), element.get(field));
            }
        }
        return jsonObject;
    }

    @Override
    public TypeInformation<DataChangeInfo> getProducedType() {
        return TypeInformation.of(DataChangeInfo.class);
    }
}
```
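To make the before/after extraction concrete, here is a tiny standalone illustration of the same field-iteration logic getJsonObject uses, run against a hand-built Kafka Connect Struct. The two-column row schema and the values are invented for the example:

```java
import com.alibaba.fastjson.JSONObject;
import org.apache.kafka.connect.data.Field;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.SchemaBuilder;
import org.apache.kafka.connect.data.Struct;

// Standalone demo of the Struct-to-JSON field iteration, using an invented row schema.
public class StructToJsonDemo {
    public static void main(String[] args) {
        Schema rowSchema = SchemaBuilder.struct()
                .field("id", Schema.INT32_SCHEMA)
                .field("name", Schema.STRING_SCHEMA)
                .build();
        Struct after = new Struct(rowSchema).put("id", 1).put("name", "Alice");

        JSONObject json = new JSONObject();
        for (Field field : after.schema().fields()) {
            json.put(field.name(), after.get(field));
        }
        System.out.println(json.toJSONString()); // e.g. {"id":1,"name":"Alice"} (key order may vary)
    }
}
```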
The change event entity (DataChangeInfo.java):

```java
import lombok.Data;

import java.io.Serializable;
import java.time.LocalDateTime;

/**
 * CDC change event entity
 */
@Data
public class DataChangeInfo implements Serializable {

    /**
     * Database name
     */
    private String database;
    /**
     * Table name
     */
    private String tableName;
    /**
     * Change time
     */
    private LocalDateTime changeTime;
    /**
     * Change type: 1 insert, 2 update, 3 delete
     */
    private Integer eventType;
    /**
     * Data before the change
     */
    private String beforeData;
    /**
     * Data after the change
     */
    private String afterData;
}
```
A helper for looking up Spring beans from non-Spring threads (ApplicationContextUtil.java):

```java
import org.springframework.beans.BeansException;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationContextAware;
import org.springframework.stereotype.Component;

import java.io.Serializable;

@Component
public class ApplicationContextUtil implements ApplicationContextAware, Serializable {
    /**
     * Spring application context
     */
    private static ApplicationContext context;

    @Override
    public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
        context = applicationContext;
    }

    public static ApplicationContext getApplicationContext() {
        return context;
    }

    public static <T> T getBean(Class<T> beanClass) {
        return context.getBean(beanClass);
    }
}
```
The custom sink (DataChangeSink.java):

```java
import lombok.extern.slf4j.Slf4j;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import org.springframework.stereotype.Component;

/**
 * Custom sink, managed by Spring, that processes the change data
 **/
@Component
@Slf4j
public class DataChangeSink extends RichSinkFunction<DataChangeInfo> {

    private static final long serialVersionUID = -74375380912179188L;

    private UserMapper userMapper;

    /**
     * Look up Spring-managed beans in open().
     * When the Spring Boot application starts, the Spring context is loaded and @Autowired
     * works on the startup thread. Flink, however, executes operators on its own threads,
     * where @Autowired injection is not available, so the beans are fetched from the Spring
     * context in the sink's open() method instead.
     */
    @Override
    public void open(Configuration parameters) throws Exception {
        super.open(parameters);
        userMapper = ApplicationContextUtil.getBean(UserMapper.class);
    }

    @Override
    public void invoke(DataChangeInfo dataChangeInfo, Context context) {
        log.info("Received raw change data: {}", dataChangeInfo);
        // TODO: process your data here
    }
}
```
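To give an idea of where to take the TODO, here is one possible invoke() body. It is only a sketch: the column names (id, name) and the UserMapper methods (insertOrUpdateUser, deleteUserById) are assumptions for illustration, not part of the original code.

```java
// Hypothetical invoke() body: parse the JSON payload and route by event type.
// Requires: import com.alibaba.fastjson.JSON; import com.alibaba.fastjson.JSONObject;
// insertOrUpdateUser/deleteUserById and the id/name columns are assumed, not real mapper methods.
@Override
public void invoke(DataChangeInfo dataChangeInfo, Context context) {
    log.info("Received raw change data: {}", dataChangeInfo);
    if (!"t1".equals(dataChangeInfo.getTableName())) {
        return; // this sink only handles dbo.t1
    }
    switch (dataChangeInfo.getEventType()) {
        case 1: // insert
        case 2: // update
            JSONObject after = JSON.parseObject(dataChangeInfo.getAfterData());
            userMapper.insertOrUpdateUser(after.getString("id"), after.getString("name"));
            break;
        case 3: // delete
            JSONObject before = JSON.parseObject(dataChangeInfo.getBeforeData());
            userMapper.deleteUserById(before.getString("id"));
            break;
        default:
            log.warn("Unexpected event type: {}", dataChangeInfo.getEventType());
    }
}
```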
Everything above has been verified end to end and is running in production; corrections are welcome.