当前位置:   article > 正文

Spring Boot集成RabbitMQ实现消息队列生产者与消费者_java springboot 配置rabbitmq消费者

java springboot 配置rabbitmq消费者

目录

一、步骤概览

二、步骤说明

1.引入依赖

2.设置配置参数

①.RabbitMQ 服务器配置

②.消息模板(template)配置

③.消费端监听器(listener)配置

3.定义业务队列

①.定义交换机

②.定义队列

③.绑定队列和交换机

④.设置死信队列

4.生产端发送消息

①.处理流程

②.创建订单代码示例

③.异步通知发送结果

5.消费端消费消息

三、代码测试

1.测试代码

2.测试结果

①.生产端生成的100条消息

②.消费端消费日志


一、步骤概览

二、步骤说明

1.引入依赖

在 pom.xml 文件中引入 spring-boot-starter-amqp 依赖包

  1. <dependency>
  2. <groupId>org.springframework.boot</groupId>
  3. <artifactId>spring-boot-starter-amqp</artifactId>
  4. </dependency>

2.设置配置参数

在 application.yml 配置文件中设置 rabbitmq 配置参数。

  1. spring:
  2. #配置rabbitMq 服务器
  3. rabbitmq:
  4. host: 127.0.0.1 //你的RabbitMQ服务器地址
  5. port: 5672
  6. username: guest //用户名
  7. password: guest //密码
  8. virtual-host: /
  9. template:
  10. retry:
  11. enabled: true
  12. max-attempts: 5
  13. publisher-returns: true
  14. publisher-confirm-type: correlated
  15. ## 消费端配置
  16. listener:
  17. simple:
  18. concurrency: 5
  19. ## manual:手动 ack(确认)
  20. acknowledge-mode: manual
  21. max-concurrency: 10
  22. prefetch: 1

这段 RabbitMQ 配置信息主要包含以下三部分:

①.RabbitMQ 服务器配置

  • host:RabbitMQ 服务器的地址,这里设置为 127.0.0.1。

  • port:RabbitMQ 服务器的端口,这里设置为 5672。

  • username 和 password:连接 RabbitMQ 服务器的用户名和密码,这里设置为默认值 guest。

  • virtual-host:虚拟主机的名称,用于逻辑隔离不同应用之间的消息队列,这里设置为根目录 /。

②.消息模板(template)配置

  • retry.enabled:是否启用消息发送重试功能,这里设置为 true。

  • retry.max-attempts:最大重试次数,这里设置为 5。

  • publisher-returns:是否启用生产者消息发送失败返回功能,这里设置为 true。

  • publisher-confirm-type:生产者消息确认方式,这里设置为 correlated。

③.消费端监听器(listener)配置

  • simple.concurrency:消费者的并发消费数量,这里设置为 5。

  • acknowledge-mode:消息确认模式,这里设置为手动 ack(确认)模式。

  • max-concurrency:最大的并发消费数量,这里设置为 10。

  • prefetch:每个消费者预取的消息数量,这里设置为 1。

3.定义业务队列

我们就以平时常见的商品购买为例,订单下完了,需要通知发货,我们就可以使用消息队列对其进行解耦。这边我们就定义订单队列。代码概览如下图所示

①.定义交换机

  • RabbitQueueConfig#orderTopicExchange

  1. @Bean
  2. public TopicExchange orderTopicExchange() {
  3. return new TopicExchange(RabbitConstants.ORDER_EXCHANGE,true,false);
  4. }

②.定义队列

  • RabbitQueueConfig#orderQueue

  1. @Bean
  2. public Queue orderQueue() {
  3. //创建队列构造器并指定队列名称
  4. QueueBuilder queueBuilder = QueueBuilder.durable(RabbitConstants.ORDER_QUEUE);
  5. //如果队列持久化,这边不用设置队列过期时间
  6. //queueBuilder.ttl(orderQueueTTL)
  7. //设置死信队列的RouteKey
  8. queueBuilder.deadLetterRoutingKey(RabbitConstants.DEAD_ROUTE_KEY);
  9. //设置死信队列的Exchange
  10. queueBuilder.deadLetterExchange(RabbitConstants.DEAD_EXCHANGE);
  11. //创建队列
  12. return queueBuilder.build();
  13. }

③.绑定队列和交换机

  • RabbitQueueConfig#orderBinding

  1. @Bean
  2. public Binding orderBinding() {
  3. return BindingBuilder.bind(orderQueue())
  4. .to(orderTopicExchange())
  5. .with(RabbitConstants.ORDER_ROUTE_KEY);
  6. }

④.设置死信队列

设置死信队列的目的是当消费者在消费消息时发生错误或者消息被拒绝,并且不重新入队(requeue=false)时,这些消息会被认为是消费失败的消息,并进入死信队列。 具体来说,消费失败的情况包括:

  • 消费者抛出异常或者处理消息时发生错误。

  • 消费者手动拒绝消息(basic.reject)并将 requeue 参数设置为 false。

  • 消费者手动确认消息(basic.ack)之前,连接关闭导致消息未确认。

  1. // 死信队列
  2. @Bean
  3. public Queue deadQueue() {
  4. return new Queue(RabbitConstants.DEAD_QUEUE, true);
  5. }
  6. // 死信队列使用的交换机
  7. @Bean
  8. public TopicExchange deadExchange() {
  9. return new TopicExchange(RabbitConstants.DEAD_EXCHANGE);
  10. }
  11. // 绑定交换机和死信队列
  12. @Bean
  13. public Binding deadBinding() {
  14. return BindingBuilder.bind(deadQueue())
  15. .to(deadExchange())
  16. .with(RabbitConstants.DEAD_ROUTE_KEY);
  17. }

4.生产端发送消息

①.处理流程

②.创建订单代码示例

  • OrderServiceImpl#createOrder

  1. @Transactional(rollbackFor = Exception.class, isolation = Isolation.DEFAULT)
  2. public void createOrder(Order order) {
  3. Date orderTime = new Date();
  4. int addrow = orderMapper.insert(order);
  5. if (addrow <= 0) {
  6. throw new RuntimeException("订单入库失败");
  7. }
  8. log.info("订单入库成功");
  9. // 插入消息记录表数据
  10. BrokerMessageLog brokerMessageLog = new BrokerMessageLog();
  11. // 消息唯一ID
  12. brokerMessageLog.setMessageId(order.getMessageId());
  13. // 保存消息整体 转为JSON格式存储入库
  14. brokerMessageLog.setMessage(JSONUtil.toJsonStr(order));
  15. // 设置消息状态为0 表示发送中
  16. brokerMessageLog.setStatus(BrokerMsgStatusEnum.SENDING.getCode());
  17. // 设置消息未确认超时时间窗口为 一分钟
  18. brokerMessageLog.setNextRetry(DateUtil.offset(orderTime, DateField.MINUTE,RabbitConstants.ORDER_ACK_TIMEOUT));
  19. brokerMessageLog.setCreateTime(new Date());
  20. brokerMessageLog.setUpdateTime(new Date());
  21. addrow = messageLogMapper.insert(brokerMessageLog);
  22. if (addrow <= 0) {
  23. throw new RuntimeException("订单生成失败");
  24. }
  25. log.info("rabbitMq消息日志入库成功");
  26. // 发送消息
  27. CorrelationData correlationData = new CorrelationData(order.getMessageId());
  28. rabbitTemplate.convertAndSend(RabbitConstants.ORDER_EXCHANGE,
  29. RabbitConstants.ORDER_ROUTE_KEY, JSONUtil.toJsonStr(order), correlationData);
  30. }

③.异步通知发送结果

想要获取异步通知发送结果,订单接口需要实现 RabbitTemplate.ConfirmCallback 和 RabbitTemplate.ReturnsCallback 接口,分别重写 confirm 和 returnedMessage 方法,他们分别的作用是:

  • RabbitTemplate.ConfirmCallback主要用于处理生产者发送消息后的确认结果。当消息成功发送到 RabbitMQ 服务器时,会调用 ConfirmCallback 接口中的 confirm 方法,并传入该消息的相关信息(包括消息 ID、交换机、路由键等)。

  1. /**
  2. * confirm机制只保证消息到达exchange,不保证消息路由到正确quene
  3. * 如果exchange错误,就会触发confirm机制
  4. *
  5. * @param correlationData 消息关联的数据
  6. * @param ack 消息是否正确到达exchange,true-到达;false-未到达
  7. * @param s 失败原因
  8. */
  9. @Override
  10. public void confirm(CorrelationData correlationData, boolean ack, String s) {
  11. log.info("correlationData:[{}],ack:[{}],cause:[{}]", correlationData, ack, s);
  12. String messageId = correlationData.getId();
  13. if (ack) {
  14. //如果confirm返回成功 则进行更新消息投递日志状态
  15. BrokerMessageLog brokerMessageLog = new BrokerMessageLog();
  16. brokerMessageLog.setMessageId(messageId);
  17. brokerMessageLog.setUpdateTime(new Date());
  18. brokerMessageLog.setStatus(BrokerMsgStatusEnum.SUCCESS.getCode());
  19. messageLogMapper.updateById(brokerMessageLog);
  20. log.info("消息confirm成功,更新DB消息投递记录状态");
  21. } else {
  22. log.error("消息confirm失败,可尝试重试 或者补偿等手段");
  23. }
  24. }
  • RabbitTemplate.ReturnsCallback:用于处理生产者发送的消息无法被正确路由到队列时的返回结果。当消息无法被正确路由到队列时,RabbitMQ 会将该消息返回给生产者,并调用 ReturnsCallback 接口中的 returnedMessage 方法,并传入该消息的相关信息(包括消息 ID、交换机、路由键等)。通过实现 ReturnsCallback 接口,我们可以对无法路由的消息进行处理,例如记录日志、重新发送到其他队列等。

  1. /**
  2. * Return 消息机制用于处理一个不可路由的消息,触发条件有以下两种情况:
  3. * 1. exchange不存在导致消息不可达
  4. * 2. routingKey路由不到导致消息不可达
  5. *
  6. * @param returnedMessage 返回信息
  7. */
  8. @Override
  9. public void returnedMessage(ReturnedMessage returnedMessage) {
  10. log.error("路由信息不可达,message:[{}]", returnedMessage);
  11. }

5.消费端消费消息

消费端只需要在接口上添加 @RabbitListener(queues = "order_queue_test") 就可以接收到队列消息了,其中 order_queue_test 表示消费的队列名称

  • OrderReceiverService

  1. @Slf4j
  2. @Service
  3. public class OrderReceiverService {
  4. @RabbitListener(queues = "order_queue_test")
  5. public void onOrderMessage(@Payload String body,
  6. @Headers Map<String,Object> headers,
  7. Channel channel) throws IOException {
  8. log.info("接受到订单信息:[{}]",body);
  9. /**
  10. * Delivery Tag 用来标识信道中投递的消息。
  11. * RabbitMQ 推送消息给 Consumer 时,会附带一个 Delivery Tag,
  12. * 以便 Consumer 可以在消息确认时告诉 RabbitMQ 到底是哪条消息被确认了。
  13. * RabbitMQ 保证在每个信道中,每条消息的 Delivery Tag 从 1 开始递增。
  14. */
  15. Long deliveryTag = (Long) headers.get(AmqpHeaders.DELIVERY_TAG);
  16. /**
  17. * multiple 取值为 false 时,表示通知 RabbitMQ 当前消息被确认
  18. * 如果为 true,则额外将比第一个参数指定的 delivery tag 小的消息一并确认
  19. */
  20. boolean multiple = false;
  21. //手动ACK,确认一条消息已经被消费
  22. channel.basicAck(deliveryTag,multiple);
  23. }
  24. }

三、代码测试

1.测试代码

  • 模拟生成100条订单

  1. @SpringBootTest
  2. public class ApiTest {
  3. @Autowired
  4. private IOrderService orderService;
  5. @Test
  6. public void test_createOrder() throws Exception {
  7. for(int i= 1;i<=100;i++) {
  8. Order order = new Order();
  9. order.setMessageId(UUID.randomUUID().toString());
  10. order.setName("第" + i + "条订单");
  11. orderService.createOrder(order);
  12. TimeUnit.SECONDS.sleep(1);
  13. }
  14. }
  15. }

2.测试结果

①.生产端生成的100条消息

②.消费端消费日志

从消费日志中我们可以看出,最多出现了6个消费线程进行消费,那是由于我们配置了最大消费线程数是10,rabbitmq 会动态调整消费者的数量进行消费,当消息消费完,它会恢复初始的消费者数量,我们定义的是5,mq 情况如图所示:

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/盐析白兔/article/detail/618646
推荐阅读
相关标签
  

闽ICP备14008679号