Big data collection refers to the process of gathering and organizing large volumes of data by various technical means. The collected data can come from many different sources, both structured and unstructured, such as website data, social media data, email, and log data. Classified by data source, the main directions are web data collection, system log collection, and database collection.
Use the requests and BeautifulSoup modules to crawl web pages, parse the unstructured data out of the HTML, and save it to a database.
Watch log files for changes and ship them to targets such as Logstash or Kafka; combined with tools like Elasticsearch and Kibana, this forms an ELK log monitoring platform.
Act as a simulated MySQL replica node that reads the binlog from the primary node, parses it, and publishes the changes to message queues such as Kafka, RocketMQ, or RabbitMQ.
Ingest data from multiple sources (MySQL, Oracle, MongoDB, etc.), process it, and write it out through custom Sink tasks.
Flink CDC is a stream-based data integration tool that aims to give users a more comprehensive set of programming interfaces (APIs). It lets users define their ETL (Extract, Transform, Load) pipelines elegantly, either as YAML configuration files or in code, and it automatically generates the corresponding Flink operators and submits the Flink job.
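For reference, a YAML-defined pipeline looks roughly like the sketch below, adapted from the Flink CDC quickstart; the exact keys depend on the source and sink connectors, so treat the values as illustrative rather than definitive.

source:
  type: mysql
  hostname: localhost
  port: 3306
  username: root
  password: 123456
  tables: app_db.\.*

sink:
  type: doris
  fenodes: 127.0.0.1:8030
  username: root
  password: ""

pipeline:
  name: Sync MySQL Database to Doris
  parallelism: 2

This article instead takes the programmatic route, driving the DataStream API from a Spring Boot application.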
The rest of this article integrates Flink CDC with Spring Boot, taking "capture MySQL changes and publish them to Kafka" as the example.
Create a test database and initialize some data. First bring up MySQL with docker-compose:
version: '3'
services:
  mysql:
    image: mysql
    container_name: mysql8
    environment:
      - MYSQL_ROOT_PASSWORD=123456
      - TZ=Asia/Shanghai
    volumes:
      - D:\Docker\Mysql\log:/var/log/mysql
      - D:\Docker\Mysql\data:/var/lib/mysql
      - D:\Docker\Mysql\conf.d:/etc/mysql/conf.d
    ports:
      - 3306:3306
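CDC reads the binlog, which must be enabled and in ROW format. MySQL 8 (the image above) ships with both by default, so this setup works as-is; on older versions you would need something like the following sketch in a file under the mounted conf.d directory (an assumption, not part of the original setup):

[mysqld]
server-id        = 1
log_bin          = mysql-bin
binlog_format    = ROW
binlog_row_image = FULL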
-- Create the database
CREATE DATABASE `flink-cdc`;

USE `flink-cdc`;

-- Create the orders table
CREATE TABLE `orders` (
  `id` INT NOT NULL,
  `price` DECIMAL(10,2) NOT NULL,
  `amount` DECIMAL(10,2) NOT NULL,
  PRIMARY KEY (`id`)
);

-- Insert seed data
INSERT INTO `orders` (`id`, `price`, `amount`) VALUES (1, 4.00, 3.00);
INSERT INTO `orders` (`id`, `price`, `amount`) VALUES (2, 100.00, 3.00);
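Optionally, verify that the binlog is enabled, and, when not connecting as root, grant the privileges listed in the MySQL CDC connector documentation (the dedicated user below is hypothetical; this walkthrough simply connects as root):

SHOW VARIABLES LIKE 'log_bin';  -- should return ON

-- Only needed for a dedicated, non-root CDC user:
CREATE USER 'flink_cdc'@'%' IDENTIFIED BY '123456';
GRANT SELECT, SHOW DATABASES, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'flink_cdc'@'%';
FLUSH PRIVILEGES;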
Prepare a visualization tool and connect it to Kafka. Bring up Kafka (in KRaft mode) with docker-compose:
- version: "1"
- services:
- kafka:
- image: 'bitnami/kafka:latest'
- hostname: kafka
- ports:
- - 9092:9092
- - 9093:9093
- volumes:
- - 'D:\Docker\Kafka\data:/bitnami/kafka'
- networks:
- - kafka_net
- environment:
- # KRaft settings
- - KAFKA_CFG_NODE_ID=0
- - KAFKA_CFG_PROCESS_ROLES=controller,broker
- - KAFKA_CFG_CONTROLLER_QUORUM_VOTERS=0@kafka:9093
- # Listeners
- - KAFKA_CFG_LISTENERS=PLAINTEXT://:9092,CONTROLLER://:9093
- - KAFKA_CFG_ADVERTISED_LISTENERS=PLAINTEXT://192.168.2.51:9092
- - KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP=CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT
- - KAFKA_CFG_CONTROLLER_LISTENER_NAMES=CONTROLLER
- - KAFKA_CFG_INTER_BROKER_LISTENER_NAME=PLAINTEXT
- networks:
- kafka_net:
- driver: bridge
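If no GUI client is at hand, the console consumer bundled in the Bitnami image can also watch the topic; a usage sketch (substitute the actual container name, and note the topic name matches the one used by the sink later in this article):

docker exec -it <kafka-container> kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic flink-cdc-mysql --from-beginning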
Add the Flink and Flink CDC dependencies to the project's pom.xml:

<properties>
    <flink.version>1.19.0</flink.version>
    <flink-cdc.version>3.1.1</flink-cdc.version>
</properties>

<dependencies>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-runtime-web</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-connector-base</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-table-api-java</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-table-common</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-clients</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-java</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-streaming-java</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-connector-mysql-cdc</artifactId>
        <version>${flink-cdc.version}</version>
    </dependency>
</dependencies>
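The listener and sink classes below additionally use spring-kafka, Hutool's SpringUtil, and Lombok. Assuming a standard Spring Boot parent, dependencies along these lines are also needed (the Hutool version is illustrative):

<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
</dependency>
<dependency>
    <groupId>cn.hutool</groupId>
    <artifactId>hutool-all</artifactId>
    <version>5.8.25</version>
</dependency>
<dependency>
    <groupId>org.projectlombok</groupId>
    <artifactId>lombok</artifactId>
    <optional>true</optional>
</dependency>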
Create the data source and sink on a streaming execution environment, then run the Spring Boot project.
package com.zzj.flinkcdcmysqldemo.listener;

import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.cdc.connectors.mysql.source.MySqlSource;
import org.apache.flink.cdc.connectors.mysql.table.StartupOptions;
import org.apache.flink.cdc.debezium.JsonDebeziumDeserializationSchema;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.RestOptions;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.springframework.boot.ApplicationArguments;
import org.springframework.boot.ApplicationRunner;
import org.springframework.stereotype.Component;

/**
 * Reference:
 * @link <a href="https://nightlies.apache.org/flink/flink-cdc-docs-master/zh/docs/connectors/flink-sources/mysql-cdc/">...</a>
 */
@Component
public class MysqlEventListener implements ApplicationRunner {

    private final MysqlDataChangeSink dataChangeSink;

    public MysqlEventListener(MysqlDataChangeSink dataChangeSink) {
        this.dataChangeSink = dataChangeSink;
    }

    @Override
    public void run(ApplicationArguments args) throws Exception {
        // Apache Flink Dashboard configuration
        Configuration config = new Configuration();
        config.set(RestOptions.PORT, 9090);
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(config);
        MySqlSource<String> dataChangeInfoMySqlSource = buildDataChangeSource();
        env.fromSource(dataChangeInfoMySqlSource, WatermarkStrategy.noWatermarks(), "mysql-source")
                // Set the parallelism of the source node to 4
                .setParallelism(4)
                // Add the output (sink) task
                .addSink(dataChangeSink)
                // Set the parallelism of the sink node to 1
                .setParallelism(1);
        env.execute("mysql-stream-cdc");
    }

    private MySqlSource<String> buildDataChangeSource() {
        return MySqlSource.<String>builder()
                .hostname("127.0.0.1")
                .port(3306)
                .scanNewlyAddedTableEnabled(true) // enable discovery of newly added tables
                .username("root")
                .password("123456")
                .databaseList("flink-cdc") // databases to capture; to sync every table in a database, set tableList to ".*"
                .tableList("flink-cdc.orders") // tables to capture
                /*
                 * initial: take a full snapshot first, then read incremental changes (binlog)
                 * latest: read incremental changes only (skip history)
                 * timestamp: start from the given timestamp (reads data with timestamps >= the given one)
                 */
                .startupOptions(StartupOptions.latest())
                .deserializer(new JsonDebeziumDeserializationSchema()) // convert SourceRecord to a JSON string
                .serverTimeZone("GMT") // time zone of the MySQL server
                .build();
    }
}
package com.zzj.flinkcdcmysqldemo.listener;

import cn.hutool.extra.spring.SpringUtil;
import lombok.extern.slf4j.Slf4j;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Component;

@Component
@Slf4j
public class MysqlDataChangeSink extends RichSinkFunction<String> {

    // Looked up in open(); transient because Flink serializes the sink when submitting the job
    private transient KafkaTemplate<String, String> kafkaTemplate;

    @Override
    public void invoke(String value, Context context) {
        log.info("Received raw change event: {}", value);
        // Transform as needed, then forward to the corresponding MQ topic
        kafkaTemplate.send("flink-cdc-mysql", value);
    }

    /**
     * When the Spring Boot application starts, the Spring container is loaded and
     * beans can normally be obtained elsewhere with @Autowired. Flink, however,
     * runs its operators on its own threads, where the Spring container is not
     * available for injection; @Autowired only works on the Spring-managed thread.
     * The bean is therefore fetched from the Spring container in the sink's
     * open() method instead.
     */
    @Override
    public void open(Configuration parameters) throws Exception {
        super.open(parameters);
        kafkaTemplate = SpringUtil.getBean(KafkaTemplate.class);
    }
}
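For the KafkaTemplate to work, Spring Boot also needs the broker address; a minimal application.yml sketch (the address matches the advertised listener configured above, and Spring Boot's default String serializers suit the JSON payload):

spring:
  kafka:
    bootstrap-servers: 192.168.2.51:9092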
INSERT INTO `flink-cdc`.orders (`id`, `price`, `amount`) VALUES (3, 54.00, 6.00);
UPDATE `flink-cdc`.orders SET price=100.00, amount=100.00 WHERE id=1;
DELETE FROM `flink-cdc`.orders WHERE id=2;

After these statements run, the log prints the JSON that Flink produced from the captured and parsed binlog events; the sample below is the event for the UPDATE.
{
  "before": { // row image before the statement
    "id": 1,
    "price": "AZA\u003d",
    "amount": "ASw\u003d"
  },
  "after": { // row image after the statement
    "id": 1,
    "price": "JxA\u003d",
    "amount": "JxA\u003d"
  },
  "source": {
    "version": "1.9.8.Final", // Debezium version
    "connector": "mysql",
    "name": "mysql_binlog_source",
    "ts_ms": 1720361222000,
    "snapshot": "false",
    "db": "flink-cdc", // database name
    "sequence": null,
    "table": "orders", // table name
    "server_id": 1,
    "gtid": null,
    "file": "binlog.000002",
    "pos": 7045,
    "row": 0,
    "thread": 8,
    "query": null
  },
  "op": "u", // operation: u = update, c = create (insert), d = delete
  "ts_ms": 1720361222818, // event timestamp
  "transaction": null
}
Note that the DECIMAL columns price and amount arrive as Base64 strings rather than numbers: Debezium's default decimal handling mode ("precise") serializes the unscaled bytes of the value. The relevant setting was later found in the Debezium connector for MySQL documentation (Debezium connector for MySQL :: Debezium Documentation):
Change the decimal type handling to string.
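The option can be passed through the source builder's Debezium properties; a minimal sketch of the changed buildDataChangeSource() (everything except the Properties lines stays as above):

import java.util.Properties;

Properties debeziumProps = new Properties();
// Emit DECIMAL columns as plain strings such as "200.00" instead of Base64-encoded bytes
debeziumProps.setProperty("decimal.handling.mode", "string");

MySqlSource<String> source = MySqlSource.<String>builder()
        .hostname("127.0.0.1")
        .port(3306)
        .username("root")
        .password("123456")
        .databaseList("flink-cdc")
        .tableList("flink-cdc.orders")
        .startupOptions(StartupOptions.latest())
        .deserializer(new JsonDebeziumDeserializationSchema())
        .debeziumProperties(debeziumProps) // forward the Debezium option
        .serverTimeZone("GMT")
        .build();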
UPDATE `flink-cdc`.orders SET price=200.00, amount=200.00 WHERE id=1;
Checking the data received by Kafka, the decimal fields now come through as normal numbers.
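With that setting, the same update event's row images should look roughly like this (an expected shape, not captured output; note the values are rendered as strings):

"before": { "id": 1, "price": "100.00", "amount": "100.00" },
"after": { "id": 1, "price": "200.00", "amount": "200.00" }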