赞
踩
接下来使用 SpringBoot 来集成 RabbitMQ
环境配置:
JDK 1.8
IDEA 2019.1
SpringBoot 2.5.1
RabbitMQ 3.7.8
Erlang 21.1
工程结构图:
第一步》、新建一个 SpringBoot 工程,添加 POM 依赖:
<dependency> <groupId>junit</groupId> <artifactId>junit</artifactId> <version>4.12</version> <scope>test</scope> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-web</artifactId> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-amqp</artifactId> </dependency>
第二步》、添加 application.yml 文件配置
server:
port: 8080
spring:
rabbitmq:
host: 127.0.0.1
port: 5672
username: guest
password: guest
virtual-host: /
上面两步执行完后,就得创建生产者和消费者了。
那么是先创建生产者,还是消费者呢?
如果先创建好生产者,然后向队列发送消息,如果该队列提前不存在的话,消息会发送失败;如果先创建消费者,让消费者提前创建好队列后,监听队列,再创建生产者,向队列中发送消息。
第三步》、创建消费者
在消费端声明队列
@Component
@RabbitListener(queuesToDeclare = @Queue("simple_queue")) //如果simple_queue队列不存在,则创建simple_queue队列。默认队列是持久化,非独占式的
public class SimpleConsumer {
// //消费者如果监听到消息队列有消息传入,则会自动消费
@RabbitHandler
public void receive(String message) {
System.out.println("消费者接收到的消息是:" + message);
}
}
@RabbitListener 注解的属性的作用:
@RabbitListener(queues = {"simple_queue2"})
,如果队列 simple_queue2 不存在,那么启动消费者就会报错第四步》、创建生产者
这里使用单元测试的方式创建,简单、方便
@SpringBootTest
@RunWith(SpringRunner.class)
public class AppTest {
@Autowired
private RabbitTemplate rabbitTemplate;
@Test
public void sendMsg() {
String queueName = "simple_queue";
String message = "Hello RabbitMQ SimpleQueue";
rabbitTemplate.convertAndSend(queueName, message);
}
}
第五步》、启动单元测试方法
执行 sendMsg()
方法,它会启动整个 SpringBoot 项目,将消费者 SimpleConsumer 注入 Spring 容器中,监听队列 simple_queue
(如果没有此队列,则会创建它)。然后,生产者就会发送消息,被消费者消费。
入门案例是将生产者放入了一个单元测试方法中,一启动单元测试方法,就会发送消息,然后立马被消费者消息了。这样,在 RabbitMQ 中看不到消息。
接下来将生产者放入一个单独的工程中,将它和消费者进行分离。
生产者工程图:
第一步》、新建一个 SpringBoot 工程,添加 POM 依赖
同上
第二步》、添加 application.yml 配置
同上
【注意】:如果两个工程同时启动,则需要修改 server.port
号,这个值不能重复
第三步》、创建生产者
@Component
public class SimpleProducer {
@Autowired
private RabbitTemplate rabbitTemplate;
public void sendMsg() {
for (int i = 0; i < 5; i++) {
String msg = "简单模式下的消息 " + i;
rabbitTemplate.convertAndSend(RabbitMqConstant.SIMPLE_QUEUE_NAME, msg);
}
}
}
此生产者发送了 5 条消息
常量类:
public interface RabbitMqConstant {
// 队列名称
String SIMPLE_QUEUE_NAME = "simple_queue";
}
接口调用:
@RestController
@RequestMapping("/simple")
public class SimpleController {
@Autowired
private SimpleProducer simpleProducer;
@RequestMapping("/sendMsg")
public String sendMsg() {
simpleProducer.sendMsg();
return "SIMPLE-QUEUE";
}
}
调用此接口后,会向 MQ 发送 5 条消息,在 MQ 中查看如下:
启动消费者后,会消费消息
注意:消费者和生产者一定要消费相同的队列
当我们在 MQ 中删除这个 simple_queue 队列后,再次通过生产者发送消息,发现:虽然队列已不存在,但往这个队列中发送消息时也没有报错。
这就有一个问题:当你发送一条消息时,不管这个队列是否存在,这条消息都不会报错,那我们就会默认它发送成功,如果,这个队列不存在,那么就会存在误判的问题了。
所以,为了避免这种情况,我们在启动生产者时,就去声明一个队列:如果它不存在就创建;存在,就不进行处理。
那么,又该如何做呢?
很简单,在生产者端中添加一个配置类即可:
@Configuration
public class SimpleQueueConfig {
// 创建队列
@Bean
public Queue simpleQueue() {
return new Queue(RabbitMqConstant.SIMPLE_QUEUE_NAME);
}
}
启动生产者端,调用接口,如果 simple_queue 队列不存在,那么就会创建了。
注解 @RabbitListener 既可以标记在类上,也可以标记在方法上:
看看生产者发送消息的代码:
public void sendMsg() {
for (int i = 0; i < 5; i++) {
String msg = "简单模式下的消息 " + i;
rabbitTemplate.convertAndSend(RabbitMqConstant.SIMPLE_QUEUE_NAME, msg);
}
}
往此队列中发送的是 String 类型的数据
看看消费者接收消息的代码:
@RabbitHandler
public void receive(String message) {
System.out.println("消费者接收到的消息是:" + message);
}
方法的入参也是 String 类型的。它们类型相匹配,所以,可以消费消息。
现在,我们修改下生产者发送消息的数据类型:
public void sendMsg() {
Map<String, Object> map = new HashMap<>();
for (int i = 0; i < 5; i++) {
String msg = "简单模式下的消息 " + i;
map.put(String.valueOf(i), msg);
rabbitTemplate.convertAndSend(RabbitMqConstant.SIMPLE_QUEUE_NAME, map);
}
}
从上面可以看出,我们往队列中发送消息的数据类型为 HashMap。
然后,我们使用消费者接收消息,发现会报错:
数据类型不匹配。
然后,我们再往消费者中添加如下代码:
@RabbitHandler
public void receive(Map<String, Object> message) {
System.out.println("消费者接收到的消息是:" + message);
}
这样,就消费消息成功了。
生产者代码不变,消费者代码修改如下:
@Component
public class SimpleConsumer2 {
@RabbitListener(queues = {"simple_queue"})
public void receive(String message) {
System.out.println("消费者接收到的消息是:" + message);
}
}
只添加 @RabbitListener
注解。注意:simple_queue
队列必须存在。
修改 application.yml 配置:
server: port: 8080 spring: rabbitmq: host: 127.0.0.1 port: 5672 username: guest password: guest virtual-host: / listener: simple: acknowledge-mode: manual #手动应答 concurrency: 1 # 最小消费者数量 max-concurrency: 1 # 最多消费者数量 retry: enabled: true # 是否支持重试
消费者代码修改如下:
@Component
public class SimpleConsumer2 {
@RabbitListener(queues = {"simple_queue"})
public void receive(Message message, Channel channel) throws Exception{
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
System.out.println("我是消费信息:"+new String(message.getBody()));
}
}
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。