
Hands-on: Using Flink CDC in a Java Spring Boot Application to Capture Incremental Change Data from SQL Server


Table of Contents

Preface

1. Add the Spring Boot dependencies

2. YAML configuration file

3. Create the SQL Server CDC change data listener

4. Deserialize the data into change JSON objects

5. CDC data entity class

6. Custom ApplicationContextUtil

7. Custom sink managed by Spring to process the change data

Preface

        My use case was capturing incremental data for a set of tables in a SQL Server database. After researching several approaches, I settled on Flink's flink-connector-sqlserver-cdc connector. It relies on SQL Server's CDC (Change Data Capture) feature to obtain incremental changes, so the database must be configured before any data can be processed. If you are unsure how to set that up, see my article "SQL Server数据库开启CDC变更数据捕获操作指引" (a guide to enabling CDC on SQL Server); a minimal JDBC sketch of that setup also follows below.

Without further ado, here is the working setup. Corrections are welcome if anything is lacking.
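
For quick reference, here is a minimal one-off JDBC sketch of that server-side setup. sys.sp_cdc_enable_db and sys.sp_cdc_enable_table are standard SQL Server stored procedures; the connection details and the table dbo.t1 are placeholders matching the configuration shown later:

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.Statement;

/**
 * One-off helper to enable CDC on the database and on one source table.
 * Connection details and the table name are placeholders; adapt them to
 * your environment. Note that the SQL Server Agent must be running for
 * the capture job to actually collect changes.
 */
public class EnableCdc {
    public static void main(String[] args) throws Exception {
        String url = "jdbc:sqlserver://127.0.0.1:1433;DatabaseName=HM_5001";
        try (Connection conn = DriverManager.getConnection(url, "sa", "root");
             Statement st = conn.createStatement()) {
            // Enable CDC at the database level (requires sysadmin).
            st.execute("EXEC sys.sp_cdc_enable_db");
            // Enable CDC for one table; repeat for every monitored table.
            st.execute("EXEC sys.sp_cdc_enable_table "
                    + "@source_schema = N'dbo', "
                    + "@source_name = N't1', "
                    + "@role_name = NULL");
        }
    }
}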

1. Add the Spring Boot dependencies

<properties>
    <flink.version>1.16.0</flink.version>
</properties>

<dependencies>
    <dependency>
        <groupId>com.microsoft.sqlserver</groupId>
        <artifactId>mssql-jdbc</artifactId>
        <version>9.4.0.jre8</version>
    </dependency>
    <dependency>
        <groupId>org.projectlombok</groupId>
        <artifactId>lombok</artifactId>
        <version>1.18.26</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-java</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-streaming-java</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-clients</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>com.ververica</groupId>
        <artifactId>flink-connector-sqlserver-cdc</artifactId>
        <version>2.3.0</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-connector-kafka</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <!-- Legacy blink planner artifact: renamed to flink-table-planner in Flink 1.14+.
         Kept at 1.13.6 here, as in the original working setup. -->
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-table-planner-blink_2.11</artifactId>
        <version>1.13.6</version>
    </dependency>
</dependencies>

2. YAML configuration file

spring:
  datasource:
    url: jdbc:sqlserver://127.0.0.1:1433;DatabaseName=HM_5001
    username: sa
    password: root
    driver-class-name: com.microsoft.sqlserver.jdbc.SQLServerDriver

# SQL Server real-time sync (CDC) data source configuration
CDC:
  DataSource:
    host: 127.0.0.1
    port: 1433
    database: HM_5001
    tableList: dbo.t1,dbo.Tt2,dbo.t3,dbo.t4
    username: sa
    password: sa
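
As an aside, instead of injecting every key with @Value the way the listener in the next section does, the CDC.DataSource block could be bound to one properties class. This is only an optional sketch relying on Spring Boot's relaxed binding; it is not part of the original code:

import lombok.Data;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.stereotype.Component;

/**
 * Optional alternative to the individual @Value fields below.
 * Relaxed binding maps the "CDC.DataSource.*" keys above onto this prefix.
 */
@Data
@Component
@ConfigurationProperties(prefix = "cdc.datasource")
public class CdcDataSourceProperties {
    private String host;
    private int port;
    private String database;
    private String tableList;
    private String username;
    private String password;
}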

3. Create the SQL Server CDC change data listener

import com.ververica.cdc.connectors.sqlserver.SqlServerSource;
import com.ververica.cdc.connectors.sqlserver.table.StartupOptions;
import com.ververica.cdc.debezium.DebeziumSourceFunction;
import lombok.extern.slf4j.Slf4j;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.ApplicationArguments;
import org.springframework.boot.ApplicationRunner;
import org.springframework.stereotype.Component;

import java.io.Serializable;

/**
 * SQL Server CDC change data listener.
 **/
@Component
@Slf4j
public class SQLServerCDCListener implements ApplicationRunner, Serializable {

    /**
     * CDC data source configuration.
     */
    @Value("${CDC.DataSource.host}")
    private String host;
    @Value("${CDC.DataSource.port}")
    private String port;
    @Value("${CDC.DataSource.database}")
    private String database;
    @Value("${CDC.DataSource.tableList}")
    private String tableList;
    @Value("${CDC.DataSource.username}")
    private String username;
    @Value("${CDC.DataSource.password}")
    private String password;

    private final DataChangeSink dataChangeSink;

    public SQLServerCDCListener(DataChangeSink dataChangeSink) {
        this.dataChangeSink = dataChangeSink;
    }

    @Override
    public void run(ApplicationArguments args) throws Exception {
        log.info("Starting Flink CDC to capture change data...");
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        DebeziumSourceFunction<DataChangeInfo> sqlServerSource = buildDataChangeSource();
        DataStream<DataChangeInfo> streamSource = env
                .addSource(sqlServerSource, "SQLServer-source")
                .setParallelism(1);
        streamSource.addSink(dataChangeSink);
        env.execute("SQLServer-stream-cdc");
    }

    /**
     * Build the CDC data source.
     */
    private DebeziumSourceFunction<DataChangeInfo> buildDataChangeSource() {
        String[] tables = tableList.replace(" ", "").split(",");
        return SqlServerSource.<DataChangeInfo>builder()
                .hostname(host)
                .port(Integer.parseInt(port))
                .database(database)   // SQL Server database to monitor
                .tableList(tables)    // tables to monitor, e.g. dbo.t1
                .username(username)
                .password(password)
                /*
                 * initial: take a full snapshot first, then switch to incremental reads
                 * latest:  read only new changes (skip historical data)
                 */
                .startupOptions(StartupOptions.latest())
                .deserializer(new JsonDebeziumDeserializationSchema()) // converts SourceRecord to DataChangeInfo
                .build();
    }
}
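
One note on fault tolerance: this Debezium-based source persists its read offsets through Flink checkpoints, so without checkpointing the stream starts over according to the startup options after a failure. If you need resumption, checkpointing can be enabled right after the environment is created; a minimal sketch (the 10-second interval is an arbitrary example):

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

// Inside run(), right after creating the environment:
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
env.enableCheckpointing(10_000L); // checkpoint every 10 s so CDC offsets are persisted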

4. Deserialize the data into change JSON objects

import com.alibaba.fastjson.JSONObject;
import com.ververica.cdc.debezium.DebeziumDeserializationSchema;
import io.debezium.data.Envelope;
import lombok.extern.slf4j.Slf4j;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.util.Collector;
import org.apache.kafka.connect.data.Field;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.source.SourceRecord;

import java.time.Instant;
import java.time.LocalDateTime;
import java.time.ZoneId;
import java.util.List;
import java.util.Optional;

/**
 * Custom deserialization schema for SQL Server change records.
 **/
@Slf4j
public class JsonDebeziumDeserializationSchema implements DebeziumDeserializationSchema<DataChangeInfo> {

    public static final String TS_MS = "ts_ms";
    public static final String BEFORE = "before";
    public static final String AFTER = "after";
    public static final String SOURCE = "source";
    public static final String CREATE = "CREATE";
    public static final String UPDATE = "UPDATE";

    /**
     * Deserialize the raw record into a change JSON object.
     */
    @Override
    public void deserialize(SourceRecord sourceRecord, Collector<DataChangeInfo> collector) {
        try {
            // For the Debezium SQL Server connector the topic is <server>.<schema>.<table>,
            // so fields[1] is in fact the schema name (e.g. dbo).
            String topic = sourceRecord.topic();
            String[] fields = topic.split("\\.");
            String database = fields[1];
            String tableName = fields[2];
            Struct struct = (Struct) sourceRecord.value();
            final Struct source = struct.getStruct(SOURCE); // source metadata (not used further here)
            DataChangeInfo dataChangeInfo = new DataChangeInfo();
            dataChangeInfo.setBeforeData(getJsonObject(struct, BEFORE).toJSONString());
            dataChangeInfo.setAfterData(getJsonObject(struct, AFTER).toJSONString());
            // Operation type: CREATE/UPDATE/DELETE -> 1 insert, 2 update, 3 delete
            Envelope.Operation operation = Envelope.operationFor(sourceRecord);
            String type = operation.toString().toUpperCase();
            int eventType = CREATE.equals(type) ? 1 : UPDATE.equals(type) ? 2 : 3;
            dataChangeInfo.setEventType(eventType);
            dataChangeInfo.setDatabase(database);
            dataChangeInfo.setTableName(tableName);
            ZoneId zone = ZoneId.systemDefault();
            Long timestamp = Optional.ofNullable(struct.get(TS_MS))
                    .map(x -> Long.parseLong(x.toString()))
                    .orElseGet(System::currentTimeMillis);
            dataChangeInfo.setChangeTime(LocalDateTime.ofInstant(Instant.ofEpochMilli(timestamp), zone));
            // Emit the parsed record
            collector.collect(dataChangeInfo);
        } catch (Exception e) {
            log.error("Custom SQL Server deserialization failed: {}", e.getMessage(), e);
        }
    }

    /**
     * Extract the before- or after-image of the row from the record value.
     */
    private JSONObject getJsonObject(Struct value, String fieldElement) {
        Struct element = value.getStruct(fieldElement);
        JSONObject jsonObject = new JSONObject();
        if (element != null) {
            Schema schema = element.schema();
            List<Field> fieldList = schema.fields();
            for (Field field : fieldList) {
                jsonObject.put(field.name(), element.get(field));
            }
        }
        return jsonObject;
    }

    @Override
    public TypeInformation<DataChangeInfo> getProducedType() {
        return TypeInformation.of(DataChangeInfo.class);
    }
}
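
To make the Struct handling above concrete, here is a tiny self-contained sketch of the shape getJsonObject() works with; the id/name columns are invented for illustration:

import com.alibaba.fastjson.JSONObject;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.SchemaBuilder;
import org.apache.kafka.connect.data.Struct;

/**
 * Tiny demo of the envelope shape the deserializer reads: the record value
 * is a Struct whose "before"/"after" fields hold row images as nested Structs.
 */
public class EnvelopeDemo {
    public static void main(String[] args) {
        Schema rowSchema = SchemaBuilder.struct()
                .field("id", Schema.INT32_SCHEMA)
                .field("name", Schema.STRING_SCHEMA)
                .build();
        Struct after = new Struct(rowSchema).put("id", 1).put("name", "demo");
        // Same loop as getJsonObject(): copy every field into a JSONObject.
        JSONObject json = new JSONObject();
        rowSchema.fields().forEach(f -> json.put(f.name(), after.get(f)));
        System.out.println(json.toJSONString()); // e.g. {"id":1,"name":"demo"} (field order may vary)
    }
}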

5. CDC data entity class

import lombok.Data;

import java.io.Serializable;
import java.time.LocalDateTime;

/**
 * CDC data entity class.
 */
@Data
public class DataChangeInfo implements Serializable {
    /**
     * Database name
     */
    private String database;
    /**
     * Table name
     */
    private String tableName;
    /**
     * Change time
     */
    private LocalDateTime changeTime;
    /**
     * Change type: 1 insert, 2 update, 3 delete
     */
    private Integer eventType;
    /**
     * Row data before the change
     */
    private String beforeData;
    /**
     * Row data after the change
     */
    private String afterData;
}

6. Custom ApplicationContextUtil

import org.springframework.beans.BeansException;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationContextAware;
import org.springframework.stereotype.Component;

import java.io.Serializable;

/**
 * Holds the Spring ApplicationContext so that code outside Spring's control
 * (such as Flink operator threads) can look up beans statically.
 */
@Component
public class ApplicationContextUtil implements ApplicationContextAware, Serializable {
    /**
     * The application context
     */
    private static ApplicationContext context;

    @Override
    public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
        context = applicationContext; // stored in the static field
    }

    public static ApplicationContext getApplicationContext() {
        return context;
    }

    public static <T> T getBean(Class<T> beanClass) {
        return context.getBean(beanClass);
    }
}

7. Custom sink managed by Spring to process the change data

import lombok.extern.slf4j.Slf4j;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import org.springframework.stereotype.Component;

/**
 * Custom sink, managed by Spring, that processes the change data.
 **/
@Component
@Slf4j
public class DataChangeSink extends RichSinkFunction<DataChangeInfo> {

    private static final long serialVersionUID = -74375380912179188L;

    private UserMapper userMapper; // application mapper/DAO (not shown in this article)

    /**
     * Fetch Spring-managed beans in open().
     * When the Spring Boot application starts, the Spring context is loaded and
     * beans can normally be injected with @Autowired. Flink, however, runs its
     * operators on its own threads, where field injection is not available, so
     * the beans are looked up from the context in the sink's open() method instead.
     */
    @Override
    public void open(Configuration parameters) throws Exception {
        super.open(parameters);
        userMapper = ApplicationContextUtil.getBean(UserMapper.class);
    }

    @Override
    public void invoke(DataChangeInfo dataChangeInfo, Context context) {
        log.info("Received raw change data: {}", dataChangeInfo);
        // TODO process your data here
    }
}
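
As a purely illustrative example of what invoke() might do, the sketch below routes events by type and parses the row images with fastjson. User, the UserMapper methods, and the routing are assumptions, since the article does not show UserMapper:

// Hypothetical invoke() body; User and the UserMapper methods are assumptions,
// not part of the original article. Requires com.alibaba.fastjson.JSONObject.
@Override
public void invoke(DataChangeInfo info, Context context) {
    log.info("Received raw change data: {}", info);
    switch (info.getEventType()) {
        case 1: // insert: the new row is in afterData
            userMapper.insert(JSONObject.parseObject(info.getAfterData(), User.class));
            break;
        case 2: // update: the new row image is in afterData
            userMapper.update(JSONObject.parseObject(info.getAfterData(), User.class));
            break;
        case 3: // delete: only beforeData carries the old row
            userMapper.deleteById(JSONObject.parseObject(info.getBeforeData(), User.class).getId());
            break;
        default:
            log.warn("Unknown event type: {}", info.getEventType());
    }
}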

Everything above has been verified by me and is running in production. Corrections are welcome if anything is lacking.
