赞
踩
视频地址:
官方文档:
Spring Cloud Stream(SCS) 是一个用于构建消息驱动微服务的框架,它基于 Spring Boot,提供了一种简化的方式来处理消息和事件的传递。它旨在为不同消息代理(如 Kafka、RabbitMQ、Apache Kafka 等)提供统一的编程模型,使开发者能够更轻松地在微服务架构中使用消息通信。
以下是 Spring Cloud Stream 的一些关键概念和特性:
总体而言,Spring Cloud Stream 简化了在微服务架构中使用消息传递的复杂性,提供了一种与消息代理集成的高级抽象,让开发者能够更专注于业务逻辑的实现。它的灵活性使得您能够轻松地在不同的消息代理之间切换,同时提供了强大的工具来处理消息和事件的传递,从而使您的微服务系统更具可扩展性和弹性。
消息中间件的切换只需要 更换依赖 即可。
Kafka 是一个开源的分布式流数据平台,最初由 LinkedIn 开发并捐赠给 Apache 软件基金会。它被设计用于处理高吞吐量、可持久化的实时数据流。Kafka 的主要目标是提供一种高效、可扩展、持久化的消息传递系统,能够处理大规模的实时数据流,同时保证数据的可靠性和可用性。
以下是 Kafka 的一些关键特性和概念:
总体而言,Kafka 是一个强大的分布式消息流平台,适用于许多实时数据处理和消息传递的应用场景。它的可靠性、高性能和可扩展性使得它成为构建大规模实时数据处理系统的重要组件之一。
Spring Cloud Stream 是一个用于构建基于 Spring Boot 的消息驱动微服务的框架,它提供了统一的编程模型和抽象来处理消息流,而 Kafka 是 Spring Cloud Stream 支持的消息中间件之一。下面我们来讲一下它们之间的联系和区别:
联系:
消息驱动架构:Spring Cloud Stream 和 Kafka 都支持消息驱动架构,通过将消息作为信息传递的核心来构建应用程序。它们都支持发布-订阅模式,允许不同的微服务之间通过消息进行通信。
微服务和云原生:Spring Cloud Stream 是 Spring Cloud 生态系统中的一部分,专注于帮助开发人员构建云原生的微服务应用程序。Kafka 作为 Spring Cloud Stream 的一种消息中间件实现,与 Spring Cloud Stream 一起可以支持在微服务架构中使用消息传递来解耦微服务之间的通信。
可插拔的消息中间件:Spring Cloud Stream 提供了一个抽象层,使得在不同的消息中间件之间进行切换变得容易。它支持多种消息中间件,包括 Kafka、RabbitMQ 等,使开发人员可以根据实际需求选择合适的消息中间件。
区别:
定位和用途:Spring Cloud Stream 是一个用于构建消息驱动微服务的框架,它提供了统一的编程模型和抽象来简化消息传递。而 Kafka 是一个分布式流数据平台,专注于处理高吞吐量、可持久化的实时数据流。
功能广度:Kafka 是一个功能丰富的消息中间件,除了消息传递外,还提供了分区、副本、持久化、高吞吐量等特性。而 Spring Cloud Stream 更加专注于在微服务架构中实现消息传递。
编程模型:Spring Cloud Stream 提供了更加抽象的编程模型,通过 Binder 将应用程序与消息中间件解耦。开发人员只需关注业务逻辑,而不必过多关注底层的消息传递细节。Kafka 则需要更多的配置和代码来实现消息的生产和消费。
生态系统:Spring Cloud Stream 作为 Spring Cloud 生态系统的一部分,可以与其他 Spring Cloud 组件无缝集成,如服务发现、负载均衡等。Kafka 作为独立的消息中间件,可以在不同的技术栈中使用。
总的来说,Spring Cloud Stream 和 Kafka 都是用于处理消息的技术,但它们的定位和功能略有不同。Spring Cloud Stream 提供了更加抽象和便捷的方式来构建消息驱动的微服务,而 Kafka 提供了更丰富的特性来处理实时数据流。在使用时,开发人员可以根据项目需求和技术栈的选择来决定是否使用 Spring Cloud Stream 以及选择哪种消息中间件。
<?xml version="1.0" encoding="UTF-8"?> <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> <modelVersion>4.0.0</modelVersion> <parent> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-parent</artifactId> <version>2.3.7.RELEASE</version> <relativePath/> <!-- lookup parent from repository --> </parent> <groupId>org.example</groupId> <artifactId>stream-demo</artifactId> <version>1.0-SNAPSHOT</version> <properties> <maven.compiler.source>8</maven.compiler.source> <maven.compiler.target>8</maven.compiler.target> <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding> </properties> <dependencyManagement> <dependencies> <dependency> <groupId>org.springframework.cloud</groupId> <artifactId>spring-cloud-dependencies</artifactId> <version>Hoxton.SR3</version> <type>pom</type> <scope>import</scope> </dependency> </dependencies> </dependencyManagement> <dependencies> <dependency> <groupId>org.projectlombok</groupId> <artifactId>lombok</artifactId> <version>1.18.26</version> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-web</artifactId> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-test</artifactId> <scope>test</scope> </dependency> <dependency> <groupId>org.springframework.cloud</groupId> <artifactId>spring-cloud-stream-binder-kafka</artifactId> </dependency> </dependencies> <build> <plugins> <plugin> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-maven-plugin</artifactId> </plugin> </plugins> </build> </project>
server: port: 7888 spring: # application: # name: producer cloud: stream: kafka: binder: brokers: ip:9092 #Kafka的消息中间件服务器 zk-nodes: ip:2181 #Zookeeper的节点,如果集群,后面加,号分隔 auto-create-topics: true #如果设置为false,就不会自动创建Topic 有可能你Topic还没创建就直接调用了。 bindings: output: #这里用stream给我们提供的默认output,后面会讲到自定义output destination: topic #消息发往的目的地 content-type: text/plain #消息发送的格式,接收端不用指定格式,但是发送端要 input: destination: topic
source - output
@EnableBinding(Source.class)
public class SendService {
@Resource
private Source source;
public void sendMsg(String msg) {
source.output().send(MessageBuilder.withPayload(msg).build());
}
public void sendBody(Object object) {
source.output().send(MessageBuilder.withPayload(object).build());
}
}
sink - input
@EnableBinding(Sink.class)
public class RecieveService {
@StreamListener(Sink.INPUT)
public void recieve(Object payload){
System.out.println(payload);
}
}
@RestController @RequestMapping("/send") public class StreamController { @Resource private SendService sendService; /** * 发送 * localhost:7888/send/hello * * @param msg 消息 */ @GetMapping("/{msg}") public void send(@PathVariable("msg") String msg){ sendService.sendMsg(msg); } @GetMapping("/body") public void sendFace() { // 创建一个对象 Q_FACE_INPUT qFaceInput = new Q_FACE_INPUT(); // set 写入数据... sendService.sendBody(qFaceInput); System.out.printf("发送 Face Data 成功"); } }
1、创建 MyChannel 接口
/** * 自定义通道 - 模仿source接口造轮子 * */ public interface MyChannel { String FACE_OUTPUT = "face_output"; String HUMAN_OUTPUT = "human_output"; String VEHICLE_INPUT = "vehicle_output"; @Output(FACE_OUTPUT) MessageChannel faceOutput(); @Output(HUMAN_OUTPUT) MessageChannel humanOutput(); @Output(VEHICLE_INPUT) MessageChannel vehicleOutput(); }
注意事项:
@Component
注解。@Component
注解用于将类标识为 Spring 容器管理的组件,而不是接口。实际上,将 @Component
注解放在接口上可能会引发问题。踩坑小记:
有时候在指定类中使用 @Resource
注入的时候会报错 – 创建不了这个接口 bean,但是使用 Spring 自带的 @Autowired
注解就会没问题。(可能是因为是两个注解默认注入 bean 的方式不一样引起的)
Autowired
默认的注入方式为 byType
(根据类型进行匹配),@Resource
默认注入方式为 byName
(根据名称进行匹配)详细可参考:autowired-和-resource-的区别是什么
2、修改 application.yml 文件
server: port: 7888 spring: profiles: active: dev # 表示开发环境 cloud: stream: kafka: binder: brokers: ip:9092 #Kafka的消息中间件服务器 auto-create-topics: true #如果设置为false,就不会自动创建Topic 有可能你Topic还没创建就直接调用了。 bindings: # 自定义output - 生产 face_output: destination: Q_FACE_INPUT_TWO_DX #消息发往的目的地 human_output: destination: Q_HUMAN_INPUT_TWO_DX vehicle_output: destination: Q_VEHICLE_INPUT_TWO_DX
3、编写生产者业务类
@EnableBinding(value = {MyChannel.class}) public class SendService { @Resource private MyChannel myChannel; public void sendFaceMsg(String msg) { myChannel.faceOutput().send(MessageBuilder.withPayload(msg).build()); } public void sendHumanMsg(String msg) { myChannel.humanOutput().send(MessageBuilder.withPayload(msg).build()); } public void sendVehicleMsg(String msg) { myChannel.vehicleOutput().send(MessageBuilder.withPayload(msg).build()); } }
4、编写 Controller 控制器
@RestController @RequestMapping("/send") public class StreamController { @Resource private SendService sendService; /** * 发送人脸数据 * localhost:7888/send/face/hello * * @param msg 消息 */ @GetMapping("/face/{msg}") public void sendFace(@PathVariable("msg") String msg){ sendService.sendFaceMsg(msg); } /** * 发送人体数据 * * @param msg */ @GetMapping("/human/{msg}") public void sendHuman(@PathVariable("msg") String msg){ sendService.sendHumanMsg(msg); } /** * 发送车辆数据标准 * * @param msg */ @GetMapping("/veh/{msg}") public void sendVehicle(@PathVariable("msg") String msg){ sendService.sendVehicleMsg(msg); } }
1、创建一个订阅通道注解
@Component public interface MySubscribableChannel { String INPUT1 = "input1"; String INPUT2 = "input2"; String INPUT3 = "input3"; @Input(INPUT1) SubscribableChannel input1(); @Input(INPUT2) SubscribableChannel input2(); }
2、application.yml 文件配置对应的主题信息
spring: profiles: active: dev # 表示开发环境 cloud: stream: kafka: binder: brokers: ip:9092 #Kafka的消息中间件服务器 auto-create-topics: true #如果设置为false,就不会自动创建Topic 有可能你Topic还没创建就直接调用了。 bindings: # 自定义output - 生产 output1: destination: topic1 #消息发往的目的地 output2: destination: topic2 # input订阅消息 input1: destination: topic1 input2: destination: topic2
3、监听消息
@StreamListener(INPUT1)
public void subChannel (String msg){
System.out.println(msg);
}
@StreamListener(INPUT2)
public void subChannel2 (String msg){
System.out.println(msg);
}
其他写法参考
注意配置文件中 input-in-0
的写法
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。