赞
踩
本篇简单介绍SpringCloud Stream 整合RabbitMQ基本步骤:
演示SpringCloud Stream 整合RabbitMQ,项目可以在一个工程里完成,本次建立了一个工程mq-service,其中包含三个Module:
注: 完全可以在一个工程里实现,这里为了区分,并为了后续单独启动或停止生产者或消费者做实验,也为了适应实际应用项目,所以创建了不同Module
这里作为公共模块引入SpringCloud、Spring Cloud Stream等,其中也再此引入fastjson、lombok等工具依赖
(完整代码见文章最下面)
其中Spring Cloud Stream如下:
<!-- Spring Cloud Stream, 用于MQ消息发送-->
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-stream-rabbit</artifactId>
</dependency>
定义共用的变量,如CollectionRequest.java
导入base的依赖即可,因为相关共用依赖在base中已经引入
<dependency>
<groupId>com.zrk</groupId>
<artifactId>mq-service-base</artifactId>
<version>0.0.1-SNAPSHOT</version>
</dependency>
OutputMessageBinding.java
public interface OutputMessageBinding {
/** Topic 名称*/
String OUTPUT = "message-center-out";
@Output(OUTPUT)
MessageChannel output();
}
# rabbitmq连接信息
spring.rabbitmq.addresses=192.168.1.125
spring.rabbitmq.port=5672
spring.rabbitmq.username=admin
spring.rabbitmq.password=123456
spring.cloud.stream.bindings.message-center-out.destination=message-center
spring.cloud.stream.rabbit.bindings.message-center-out.consumer.exchangeType=fanout
CollectionServiceImpl.java
@Service
@EnableBinding(OutputMessageBinding.class)
public class CollectionServiceImpl implements CollectionService{
@Resource
private OutputMessageBinding outputMessageBinding;
/**
* @param schoolName
* @param content
*/
@Override
public void getCollection(String schoolName, String content) {
CollectionRequest request = new CollectionRequest();
request.setSchoolName(schoolName);
request.setContent(content);
outputMessageBinding.output().send(MessageBuilder.withPayload(request).build());
}
}
注: 主要是两点
导入base的依赖即可,因为相关共用依赖在base中已经引入
<dependency>
<groupId>com.zrk</groupId>
<artifactId>mq-service-base</artifactId>
<version>0.0.1-SNAPSHOT</version>
</dependency>
InputMessageBinding.java
public interface InputMessageBinding {
String INPUT = "message-center-input";
@Input(INPUT)
SubscribableChannel input();
}
注: 消费者这里与生产者不同,用的是SubscribableChannel ,而生产者用的是MessageChannel
# rabbitmq连接信息
spring.rabbitmq.addresses=192.168.1.125
spring.rabbitmq.port=5672
spring.rabbitmq.username=admin
spring.rabbitmq.password=123456
spring.cloud.stream.bindings.message-center-input.destination=message-center
spring.cloud.stream.bindings.message-center-input.group=${spring.application.name}
CollectionReceiver.java
@Slf4j
@EnableBinding(InputMessageBinding.class)
public class CollectionReceiver {
@StreamListener(InputMessageBinding.INPUT)
public void handle(String value){
log.info("[消息] 接收到发送消息MQ: {}", value);
CollectionRequest request = JSON.parseObject(value, CollectionRequest.class);
log.info("处理收集信息:" + request.toString());
}
}
注: 主要是两点
至此,生产者与消费者都创建完成,分别启动两个项目,并调用生产者接口进行验证:
localhost:30110/collection/getCollectionschoolName=‘zrk’&content=‘send message to rabbitmq’
则证明已经整合成功,接下来将研究一下更多的配置与用法。
如果有需要,可以参考项目完整代码:https://github.com/zrk333/mq-service
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。