当前位置:   article > 正文

Springboot集成-数据采集工具Flink-CDC_spring 接入flink cdc

spring 接入flink cdc

1. 前言

1.1. 什么是数据采集

大数据采集是指通过各种技术手段,收集和整理大量数据的过程。采集的数据可以来自不同的数据源,包括结构化数据和非结构化数据,如网站数据、社交媒体数据、电子邮件、日志数据等。根据数据源分类,主要有web数据采集、系统日志采集、数据库采集等方向。

1.2. 常用的采集工具

  • Python(爬虫脚本)

利用requests和BeautifulSoup模块爬取网页,解析HTML中非结构化数据,保存到数据库中。

  • Filebeat

监听日志文件变化,输出到Logstash、Kafka等目标,可与Elasticsearch、Kibana等工具集成组成ELK日志监控平台。

  • Canal

模拟Mysql从数据库节点,从主节点读取Binlog并解析,输出到Kafka、RocketMQ,RabbitMQ等消息队列

  • Flink

支持从多数据源获取数据(Mysql、Oracle、MongoDB等),处理数据,利用Sink任务自定义输出数据。

1.3. Flink-CDC

Flink CDC 是一个基于流的数据集成工具,旨在为用户提供一套功能更加全面的编程接口(API)。 该工具使得用户能够以 YAML 配置文件或者编码的形式,优雅地定义其 ETL(Extract, Transform, Load)流程,并协助用户自动化生成定制化的 Flink 算子并且提交 Flink 作业。

官网:项目介绍 | Apache Flink CDC

2. 简单使用

集成springboot,以采集Mysql数据,输出到Kafka为例。

2.1. 参考文档

MySQL | Apache Flink CDC

2.2. 测试环境准备

  • Mysql

创建测试数据库并初始化数据

  1. version: '1'
  2. services:
  3. mysql:
  4. image: mysql
  5. container_name: mysql8
  6. environment:
  7. - MYSQL_ROOT_PASSWORD=123456
  8. - TZ=Asia/Shanghai
  9. volumes:
  10. - D:\Docker\Mysql\log:/var/log/mysql
  11. - D:\Docker\Mysql\data:/var/lib/mysql
  12. - D:\Docker\Mysql\conf.d:/etc/mysql/conf.d
  13. ports:
  14. - 3306:3306
  1. -- 创建数据库
  2. CREATE DATABASE `flink-cdc`;
  3. USE `flink-cdc`;
  4. -- 创建 orders 表
  5. CREATE TABLE `orders` (
  6. `id` INT NOT NULL,
  7. `price` DECIMAL(10,2) NOT NULL,
  8. `amount` DECIMAL(10,2) NOT NULL,
  9. PRIMARY KEY (`id`)
  10. );
  11. -- 插入数据
  12. INSERT INTO `orders` (`id`, `price`, `amount`) VALUES (1, 4.00, 3.00);
  13. INSERT INTO `orders` (`id`, `price`, `amount`) VALUES (2, 100.00, 3.00);
  • Kafka

准备好可视化工具并连接上Kafka。

  1. version: "1"
  2. services:
  3. kafka:
  4. image: 'bitnami/kafka:latest'
  5. hostname: kafka
  6. ports:
  7. - 9092:9092
  8. - 9093:9093
  9. volumes:
  10. - 'D:\Docker\Kafka\data:/bitnami/kafka'
  11. networks:
  12. - kafka_net
  13. environment:
  14. # KRaft settings
  15. - KAFKA_CFG_NODE_ID=0
  16. - KAFKA_CFG_PROCESS_ROLES=controller,broker
  17. - KAFKA_CFG_CONTROLLER_QUORUM_VOTERS=0@kafka:9093
  18. # Listeners
  19. - KAFKA_CFG_LISTENERS=PLAINTEXT://:9092,CONTROLLER://:9093
  20. - KAFKA_CFG_ADVERTISED_LISTENERS=PLAINTEXT://192.168.2.51:9092
  21. - KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP=CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT
  22. - KAFKA_CFG_CONTROLLER_LISTENER_NAMES=CONTROLLER
  23. - KAFKA_CFG_INTER_BROKER_LISTENER_NAME=PLAINTEXT
  24. networks:
  25. kafka_net:
  26. driver: bridge

2.3. 导入相关依赖

  1. <flink.version>1.19.0</flink.version>
  2. <flink-cdc.version>3.1.1</flink-cdc.version>
  3. <dependency>
  4. <groupId>org.apache.flink</groupId>
  5. <artifactId>flink-runtime-web</artifactId>
  6. <version>${flink.version}</version>
  7. </dependency>
  8. <dependency>
  9. <groupId>org.apache.flink</groupId>
  10. <artifactId>flink-connector-base</artifactId>
  11. <version>${flink.version}</version>
  12. </dependency>
  13. <dependency>
  14. <groupId>org.apache.flink</groupId>
  15. <artifactId>flink-table-api-java</artifactId>
  16. <version>${flink.version}</version>
  17. </dependency>
  18. <dependency>
  19. <groupId>org.apache.flink</groupId>
  20. <artifactId>flink-table-common</artifactId>
  21. <version>${flink.version}</version>
  22. </dependency>
  23. <dependency>
  24. <groupId>org.apache.flink</groupId>
  25. <artifactId>flink-clients</artifactId>
  26. <version>${flink.version}</version>
  27. </dependency>
  28. <dependency>
  29. <groupId>org.apache.flink</groupId>
  30. <artifactId>flink-java</artifactId>
  31. <version>${flink.version}</version>
  32. </dependency>
  33. <dependency>
  34. <groupId>org.apache.flink</groupId>
  35. <artifactId>flink-streaming-java</artifactId>
  36. <version>${flink.version}</version>
  37. </dependency>
  38. <dependency>
  39. <groupId>org.apache.flink</groupId>
  40. <artifactId>flink-connector-mysql-cdc</artifactId>
  41. <version>${flink-cdc.version}</version>
  42. </dependency>

2.4. 创建Datasource和Sink

用流式执行环境创建Datasource和Sink,并运行springboot项目

  1. package com.zzj.flinkcdcmysqldemo.listener;
  2. import org.apache.flink.api.common.eventtime.WatermarkStrategy;
  3. import org.apache.flink.cdc.connectors.mysql.source.MySqlSource;
  4. import org.apache.flink.cdc.connectors.mysql.table.StartupOptions;
  5. import org.apache.flink.cdc.debezium.JsonDebeziumDeserializationSchema;
  6. import org.apache.flink.configuration.Configuration;
  7. import org.apache.flink.configuration.RestOptions;
  8. import org.apache.flink.streaming.api.datastream.DataStream;
  9. import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
  10. import org.springframework.boot.ApplicationArguments;
  11. import org.springframework.boot.ApplicationRunner;
  12. import org.springframework.stereotype.Component;
  13. /**
  14. * 参考
  15. * @link <a href="https://nightlies.apache.org/flink/flink-cdc-docs-master/zh/docs/connectors/flink-sources/mysql-cdc/">...</a>
  16. */
  17. @Component
  18. public class MysqlEventListener implements ApplicationRunner {
  19. private final MysqlDataChangeSink dataChangeSink;
  20. public MysqlEventListener(MysqlDataChangeSink dataChangeSink) {
  21. this.dataChangeSink = dataChangeSink;
  22. }
  23. @Override
  24. public void run(ApplicationArguments args) throws Exception {
  25. //Apache Flink Dashboard 配置
  26. Configuration config = new Configuration();
  27. config.set(RestOptions.PORT, 9090);
  28. StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(config);
  29. MySqlSource<String> dataChangeInfoMySqlSource = buildDataChangeSource();
  30. env.fromSource(dataChangeInfoMySqlSource, WatermarkStrategy.noWatermarks(), "mysql-source")
  31. // 设置 source 节点的并行度为 4
  32. .setParallelism(4)
  33. //添加输出任务
  34. .addSink(dataChangeSink)
  35. // 设置 sink 节点并行度为 1
  36. .setParallelism(1);
  37. env.execute("mysql-stream-cdc");
  38. }
  39. private MySqlSource<String> buildDataChangeSource() {
  40. return MySqlSource.<String>builder()
  41. .hostname("127.0.0.1")
  42. .port(3306)
  43. .scanNewlyAddedTableEnabled(true)// 启用扫描新添加的表功能
  44. .username("root")
  45. .password("123456")
  46. .databaseList("pet")//设置捕获的数据库, 如果需要同步整个数据库,请将 tableList 设置为 ".*".
  47. .tableList("pet.foster", "pet.fosterConfig")// 设置捕获的表
  48. /*
  49. * initial初始化快照,即全量导入后增量导入(检测更新数据写入)
  50. * latest:只进行增量导入(不读取历史变化)
  51. * timestamp:指定时间戳进行数据导入(大于等于指定时间错读取数据)
  52. */
  53. .startupOptions(StartupOptions.latest())
  54. .deserializer(new JsonDebeziumDeserializationSchema())// 将 SourceRecord 转换为 JSON 字符串
  55. .serverTimeZone("GMT")//指定mysql的时区
  56. .build();
  57. }
  58. }
  1. package com.zzj.flinkcdcmysqldemo.listener;
  2. import cn.hutool.extra.spring.SpringUtil;
  3. import lombok.extern.slf4j.Slf4j;
  4. import org.apache.flink.configuration.Configuration;
  5. import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
  6. import org.springframework.kafka.core.KafkaTemplate;
  7. import org.springframework.stereotype.Component;
  8. @Component
  9. @Slf4j
  10. public class MysqlDataChangeSink extends RichSinkFunction<String> {
  11. private KafkaTemplate<String, String> kafkaTemplate;
  12. @Override
  13. public void invoke(String value, Context context) {
  14. log.info("收到变更原始数据:{}", value);
  15. //转换后发送到对应的MQ
  16. kafkaTemplate.send("flink-cdc-mysql", value);
  17. }
  18. /**
  19. * 在启动SpringBoot项目是加载了Spring容器,其他地方可以使用@Autowired获取Spring容器中的类;但是Flink启动的项目中,
  20. * 默认启动了多线程执行相关代码,导致在其他线程无法获取Spring容器,只有在Spring所在的线程才能使用@Autowired,
  21. * 故在Flink自定义的Sink的open()方法中初始化Spring容器
  22. */
  23. @Override
  24. public void open(Configuration parameters) throws Exception {
  25. super.open(parameters);
  26. kafkaTemplate = SpringUtil.getBean(KafkaTemplate.class);
  27. }
  28. }

2.5. 测试更变mysql数据

  • 在 MySQL 的 orders 表中插入一条数据
INSERT INTO `flink-cdc`.orders (`id`, `price`, `amount`) VALUES (3, 54.00, 6.00);

可以看到执行完成后,日志打印出刚才flink监听到Binlog并解析后的json数据。

  • 在 MySQL 的 orders 表中更新一条数据
UPDATE `flink-cdc`.orders SET price=100.00, amount=100.00 WHERE id=1;
  • 在 MySQL 的 orders 表中删除一条数据
DELETE FROM `flink-cdc`.orders WHERE id=2;
  • 执行完成后可以看到offset explorer中接受到的记录

2.5.1. 监听数据格式
  1. {
  2. "before": {//sql执行前
  3. "id": 1,
  4. "price": "AZA\u003d",
  5. "amount": "ASw\u003d"
  6. },
  7. "after": {//sql执行后
  8. "id": 1,
  9. "price": "JxA\u003d",
  10. "amount": "JxA\u003d"
  11. },
  12. "source": {
  13. "version": "1.9.8.Final",//debezium版本
  14. "connector": "mysql",
  15. "name": "mysql_binlog_source",
  16. "ts_ms": 1720361222000,
  17. "snapshot": "false",
  18. "db": "flink-cdc",//数据库名
  19. "sequence": null,
  20. "table": "orders",//表名
  21. "server_id": 1,
  22. "gtid": null,
  23. "file": "binlog.000002",
  24. "pos": 7045,
  25. "row": 0,
  26. "thread": 8,
  27. "query": null
  28. },
  29. "op": "u",//更新操作,c插入操作,d删除操作
  30. "ts_ms": 1720361222818,//时间
  31. "transaction": null
  32. }

3. 问题总结

3.1. decimal类型转换成了bytes

后续查阅资料找到设置方式Debezium connector for MySQL :: Debezium Documentation

修改decimal类型处理成string类型

  • 再次修改数据
UPDATE `flink-cdc`.orders SET price=200.00, amount=200.00 WHERE id=1;

查看kafka接收到的数据,已经变成正常数字。

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/Li_阴宅/article/detail/926228
推荐阅读
相关标签
  

闽ICP备14008679号