赞
踩
目录
如果你没有Kafka,可以参考这篇文章进行安装【Docker】手把手教你使用Docker搭建kafka【详细教程】_docker 安装kafka-CSDN博客
Kafka是一个开源的流式平台,用于构建实时数据流应用程序和实时数据管道。Kafka旨在处理大规模的数据流,具有高吞吐量、可扩展性、持久性和容错性的特点。
发布-订阅模型: Kafka采用发布-订阅模型,数据生产者将消息发布到一个或多个主题(topics),而数据消费者则订阅这些主题以接收消息。
分布式架构: Kafka是一个分布式系统,允许横向扩展,通过分布式存储和分区机制来实现高吞吐量和可扩展性。
持久性存储: Kafka使用持久性存储来保留消息,可以在消息发送后保留一定的时间,确保消费者可以在需要时检索历史消息。
数据分区: 主题被划分为多个分区,每个分区可以在不同的服务器上,以实现并行处理和提高性能。
流式处理: Kafka提供了流处理功能,允许应用程序实时处理和分析数据流,执行复杂的事件处理操作。
高可用性: Kafka在集群中的多个节点之间复制数据,提高了系统的容错性和可用性。
数据保证: Kafka提供了不同级别的数据传递保证,包括至多一次、至少一次和精确一次语义。
生态系统: Kafka生态系统丰富,包括连接器(Connectors)、Kafka Streams、MirrorMaker等组件,用于与各种外部系统集成和实现各种应用场景。
Producer(生产者): 负责向Kafka主题发布消息。
Broker(代理): Kafka集群中的服务器,负责存储和管理消息。
Consumer(消费者): 订阅并处理Kafka主题中的消息。
Topic(主题): 消息的类别或标签,生产者将消息发布到主题,而消费者从主题订阅消息。
Partition(分区): 主题可以划分为多个分区,每个分区独立存储和处理消息。
消息队列:
场景: 在电子商务平台上,订单服务产生订单消息,并将其发布到Kafka主题。支付服务、物流服务等通过订阅相应主题,异步处理订单信息,实现订单处理的解耦和异步通信。
实时数据流处理:
场景: 在在线广告平台上,使用Kafka Streams处理实时产生的广告点击数据。可以实时计算点击率、过滤无效点击、将数据与用户信息连接,以实现实时广告效果分析。
日志收集与分析:
场景: 在一个大规模的云服务中,使用Kafka收集分布在不同服务器上的应用程序日志。日志分析服务通过消费Kafka主题,实时分析日志以监控系统性能、检测异常和进行故障排除。
事件溯源(Event Sourcing):
场景: 在金融领域的交易系统中,使用Kafka追踪交易事件。每笔交易引发一个事件,将其发布到Kafka主题,以便在需要时进行审计、回溯和重新处理。
数据同步:
场景: 在企业的分布式系统中,使用Kafka同步用户信息。用户服务在用户数据变更时将事件发布到Kafka主题,其他服务通过消费主题以保持用户数据同步。
消息广播:
场景: 在社交媒体应用中,使用Kafka将用户发布的状态更新广播给其关注者。关注者通过订阅用户状态的Kafka主题,实现实时消息广播。
分布式应用解耦:
场景: 在电子商务微服务架构中,购物车服务、订单服务、支付服务等通过Kafka进行异步通信。例如,购物车服务可以通过Kafka发布购物车更新的事件,订单服务通过订阅事件来处理相关订单逻辑。
大数据集成:
场景: 在一个大数据处理流水线中,使用Kafka将产生的数据传输到Spark进行实时分析。生产者将数据发布到Kafka主题,而Spark应用程序通过订阅主题来接收实时数据。
实时推荐系统:
场景: 在在线视频平台上,使用Kafka收集用户观看记录。推荐引擎通过消费Kafka主题,实时更新用户的个性化推荐列表,提高用户体验。
异步通信:
场景: 在电商平台中,使用Kafka实现异步订单处理。当订单支付成功时,订单服务通过Kafka发布订单处理完成的消息,而邮件服务通过订阅该主题来异步发送订单确认邮件。
下面就使用SpringBoot整合kafka的发布订阅机制,实现消息的发布和订阅。
确保在你的pom.xml文件中包含了Spring Boot和Spring Kafka的依赖。
- <dependencies>
- <dependency>
- <groupId>org.springframework.boot</groupId>
- <artifactId>spring-boot-starter-web</artifactId>
- </dependency>
-
- <!-- Spring Kafka Starter -->
- <dependency>
- <groupId>org.springframework.kafka</groupId>
- <artifactId>spring-kafka</artifactId>
- </dependency>
- </dependencies>
在application.properties或application.yml中配置 Kafka 连接信息。
- spring:
- kafka:
- bootstrap-servers: your-kafka-server:9092
- producer:
- key-serializer: org.apache.kafka.common.serialization.StringSerializer
- value-serializer: org.apache.kafka.common.serialization.StringSerializer
- consumer:
- key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
- value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
- spring.kafka.bootstrap-servers=localhost:9092
- spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
- spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer
- spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer
- spring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer
- import org.springframework.beans.factory.annotation.Autowired;
- import org.springframework.kafka.core.KafkaTemplate;
- import org.springframework.stereotype.Service;
-
- @Service
- public class KafkaMessageProducer {
-
- private static final String TOPIC = "admin-messages";
-
- @Autowired
- private KafkaTemplate<String, String> kafkaTemplate;
-
- public void sendAdminMessage(String message) {
- kafkaTemplate.send(TOPIC, message);
- }
- }
- import org.springframework.kafka.annotation.KafkaListener;
- import org.springframework.stereotype.Service;
-
- @Service
- public class KafkaMessageConsumer {
-
- @KafkaListener(topics = "admin-messages")
- public void receiveAdminMessage(String message) {
- System.out.println("Received message: " + message);
- // ...
- }
- }
在管理员需要发布消息的地方调用KafkaMessageProducer的 sendAdminMessage 方法。
- import org.springframework.beans.factory.annotation.Autowired;
- import org.springframework.web.bind.annotation.*;
-
- @RestController
- @RequestMapping("/message")
- public class AdminController {
-
- @Autowired
- private KafkaMessageProducer kafkaMessageProducer;
-
- @GetMapping("/publish")
- public void publishAdminMessage(@RequestParam("message") String message) {
- kafkaMessageProducer.sendAdminMessage(message);
- }
- }
当调用 publishAdminMessage方法时,所有监听 admin-messages 主题的用户将会接收到相应的消息。
控制台输出结果:
这样就使用SpringBoot整合了Kafka并写了一个简单的案例。
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。