当前位置:   article > 正文

【RabbitMQ教程】消息堆积与消息可靠性保证_rabbitmq队列如何处理消息堆积

rabbitmq队列如何处理消息堆积

目录

避免消息堆积?

消息可靠性保证

生产端-消息持久化

解决场景

未解决场景

生产端-消息可靠性投递

解决场景

confirm模式

return模式

两种模式全局配置代码示例

消费端-消费者ACK机制

消费端三种确认方式

代码示例

消息可靠性总结


避免消息堆积?

1) 采用workqueue,多个消费者监听同一队列。

2)接收到消息以后,而是通过线程池,异步消费。

消息可靠性保证

如何保证消息的可靠性呢?也就是说如何保证消息不丢失呢?

生产端-消息持久化

解决场景

如果在消费者消费之前,MQ就宕机了,消息就没了?可以将消息进行持久化。要将消息持久化,前提是:队列、Exchange都持久化。

交换机持久化:

队列持久化:

消息持久化:

未解决场景

注意,虽然我们标记了消息是需要持久化的,但RabbitMQ接收到消息-持久化到磁盘仍然需要一定时间,这就意味着消息可能在缓存,依然有丢失的可能。不过对于简单的任务队列这也够用了,如果还需要更强的保证消息不丢失,则需要使用"发布者确认”pubLisher、confirms。

消息持久化保证了消息正常投递后,消息不丢失。但是如果消息没有正常投递呢

生产端-消息可靠性投递

解决场景

生产者消息确认机制来监视消息有没有安全的投递。

在使用 RabbitMQ 的时候,作为消息发送方希望杜绝任何消息丢失或者投递失败场景。RabbitMQ 为我们提供了两种方式用来控制消息的投递可靠性模式。

  • confirm 确认模式
  • return退回模式

rabbitmq整个消息投递的路径为:

producer--->rabbitmq broker--->exchange--->queue--->consumer

  • 消息从 producer 到 exchange 则会返回一个 confirmCallback 。
  • 消息从 exchange-->queue 投递失败则会返回一个 returnCallback 。

我们将利用这两个 callback 控制消息的可靠性投递。

confirm模式

(1)application.properties:

  1. # 开启生产者确认模式:(confirm),投递到交换机,不论失败或者成功都回调
  2. spring.rabbitmq.publisher-confirms=true

(2)生产者:

  1. package com.baiqi.test;
  2. import org.junit.Test;
  3. import org.junit.runner.RunWith;
  4. import org.springframework.amqp.core.Message;
  5. import org.springframework.amqp.rabbit.connection.CorrelationData;
  6. import org.springframework.amqp.rabbit.core.RabbitTemplate;
  7. import org.springframework.beans.factory.annotation.Autowired;
  8. import org.springframework.test.context.ContextConfiguration;
  9. import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;
  10. /**
  11. * @author CMS
  12. */
  13. @RunWith(SpringJUnit4ClassRunner.class)
  14. public class ProducerTest {
  15. @Autowired
  16. private RabbitTemplate rabbitTemplate;
  17. //测试 Confirm 模式
  18. @Test
  19. public void testConfirm() {
  20. //定义回调
  21. rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {
  22. /**
  23. *
  24. * @param correlationData 相关配置信息
  25. * @param ack exchange交换机 是否成功收到了消息。true 成功,false代表失败
  26. * @param cause 失败原因
  27. */
  28. @Override
  29. public void confirm(CorrelationData correlationData, boolean ack, String cause) {
  30. //System.out.println("confirm方法被执行了...."+correlationData.getId());
  31. //ack 为 true表示 消息已经到达交换机
  32. if (ack) {
  33. //接收成功
  34. System.out.println("接收成功消息" + cause);
  35. } else {
  36. //接收失败
  37. System.out.println("接收失败消息" + cause);
  38. //做一些处理,让消息再次发送。
  39. }
  40. }
  41. });
  42. //进行消息发送
  43. for (int i = 0; i < 5; i++) {
  44. rabbitTemplate.convertAndSend("test_exchange_confirm","confirm","message Confirm...");
  45. }
  46. //进行睡眠操作(原因:发送消息之后有个回调,回调需要时间,所以此处阻塞1s)
  47. try {
  48. Thread.sleep(1000);
  49. } catch (Exception e) {
  50. e.printStackTrace();
  51. }
  52. }
  53. }

(3)运行程序及结果

1、正常运行

2、模拟confirm

return模式

(1)application.properties:

  1. # 开启生产者回退模式:(returns),交换机将消息路由到队列,出现异常则回调
  2. spring.rabbitmq.publisher-returns=true

(2)生产者

  1. //测试 return模式
  2. @Test
  3. public void testReturn() {
  4. //设置交换机处理失败消息的模式 为true的时候,消息达到不了 队列时,会将消息重新返回给生产者
  5. rabbitTemplate.setMandatory(true);
  6. //定义回调
  7. rabbitTemplate.setReturnCallback(new RabbitTemplate.ReturnCallback() {
  8. /**
  9. *
  10. * @param message 消息对象
  11. * @param replyCode 错误码
  12. * @param replyText 错误信息
  13. * @param exchange 交换机
  14. * @param routingKey 路由键
  15. */
  16. @Override
  17. public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
  18. System.out.println("return 执行了....");
  19. System.out.println("message:"+message);
  20. System.out.println("replyCode:"+replyCode);
  21. System.out.println("replyText:"+replyText);
  22. System.out.println("exchange:"+exchange);
  23. System.out.println("routingKey:"+routingKey);
  24. //处理
  25. }
  26. });
  27. //进行消息发送
  28. rabbitTemplate.convertAndSend("test_exchange_confirm","confirm","message return...");
  29. //进行睡眠操作
  30. try {
  31. Thread.sleep(5000);
  32. } catch (Exception e) {
  33. e.printStackTrace();
  34. }
  35. }

 (3)运行程序及结果

1、程序正常运行

2、模拟return

3、注释掉rabbitTemplate.setMandatory(true)

两种模式全局配置代码示例

application.properties:

  1. # 开启生产者确认模式:(confirm),投递到交换机,不论失败或者成功都回调
  2. spring.rabbitmq.publisher-confirms=true
  3. # 开启生产者回退模式:(returns),交换机将消息路由到队列,出现异常则回调
  4. spring.rabbitmq.publisher-returns=true

confirm(config配置):

这种写法的好处:rabbitTemplate作为整个项目的一个‘单例对象’,只能绑定一个confirm和return回调。如果项目中多处用到了rabbitTemplate,那么为了保证消息可靠性,我们只需要在全局声明一次rabbitTemplate和confirm、return的绑定就好。

  1. import org.springframework.amqp.core.Message;
  2. import org.springframework.amqp.rabbit.connection.CorrelationData;
  3. import org.springframework.amqp.rabbit.core.RabbitTemplate;
  4. import org.springframework.beans.factory.annotation.Autowired;
  5. import org.springframework.stereotype.Component;
  6. import javax.annotation.PostConstruct;
  7. @Component
  8. public class MessageConfirmCallback implements RabbitTemplate.ConfirmCallback, RabbitTemplate.ReturnCallback {
  9. @Autowired
  10. private RabbitTemplate rabbitTemplate;
  11. /**
  12. * * 创建RabbitTemplate对象之后执行当前方法,为模板对象设置回调确认方法
  13. * * 设置消息确认回调方法
  14. * * 设置消息回退回调方法
  15. *
  16. */
  17. @PostConstruct
  18. public void initRabbitTemplate() {
  19. //设置消息确认回调方法
  20. rabbitTemplate.setConfirmCallback(this::confirm);
  21. rabbitTemplate.setReturnCallback(this::returnedMessage);
  22. }
  23. /**
  24. * 投递到交换机,不论投递成功还是失败都回调次方法
  25. * @param correlationData 投递相关数据
  26. * @param ack 是否投递到交换机
  27. * @param cause 投递失败原因
  28. *
  29. */
  30. @Override
  31. public void confirm(CorrelationData correlationData, boolean ack, String cause) {
  32. if (ack) {
  33. System.out.println("消息进入交换机成功");
  34. } else {
  35. System.out.println("消息进入交换机失败, 失败原因:" + cause);
  36. }
  37. }
  38. /**
  39. * 当消息投递到交换机,交换机路由到消息队列中出现异常,执行returnedMessaged方法
  40. * @param message 投递消息内容
  41. * @param replyCode 返回错误状态码
  42. * @param replyText 返回错误内容
  43. * @param exchange 交换机名称
  44. * @param routingKey 路由键
  45. *
  46. */
  47. @Override
  48. public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
  49. System.out.println("交换机路由至消息队列出错:>>>>>>>");
  50. System.out.println("交换机:" + exchange);
  51. System.out.println("路由键:" + routingKey);
  52. System.out.println("错误状态码:" + replyCode);
  53. System.out.println("错误原因:" + replyText);
  54. System.out.println("发送消息内容:" + message.toString());
  55. System.out.println("<<<<<<<<");
  56. }
  57. }

消费端-消费者ACK机制

消费者的手动ACK机制。可以防止消费者丢失消息。

如果是自动ACK,那么当消费者接收到消息就会立马自动签收,如果这时消费者还没有来得及消费就宕机或者消费出现异常都会导致消息丢失;

消费端三种确认方式

ack指Acknowledge,确认。 表示消费端收到消息后的确认方式。

有三种确认方式:

  • 自动确认:acknowledge="none"
  • 手动确认:acknowledge="manual" -- 推荐
  • 根据异常情况确认:acknowledge="auto" -- 不怎么使用

其中自动确认是指,当消息一旦被Consumer接收到,则自动确认收到,并将相应 message 从 RabbitMQ 的消息缓存中移除。但是在实际业务处理中,很可能消息接收到,业务处理出现异常,那么该消息就会丢失。如果设置了手动确认方式,则需要在业务处理成功后,调用channel.basicAck(),手动签收,如果出现异常,则调用channel.basicNack()或者basicReject()方法,让其自动重新发送消息

代码示例

application.properties:

  1. # 配置开启手动签收
  2. # 简单模式的开启手动签收
  3. spring.rabbitmq.listener.simple.acknowledge-mode=manual
  4. # 路由模式开启手动签收
  5. spring.rabbitmq.listener.direct.acknowledge-mode=manual
  6. # 是否支持重试
  7. spring.rabbitmq.listener.direct.retry.enabled=true

消费者:

  1. import com.rabbitmq.client.Channel;
  2. import org.springframework.amqp.core.Message;
  3. import org.springframework.amqp.core.MessageProperties;
  4. import org.springframework.amqp.rabbit.annotation.RabbitHandler;
  5. import org.springframework.amqp.rabbit.annotation.RabbitListener;
  6. import org.springframework.stereotype.Component;
  7. @Component
  8. @RabbitListener(queues = "routing_queue1")
  9. public class RoutingListener01 {
  10. @RabbitHandler
  11. public void simpleHandler(String msg, Message message, Channel channel) throws IOException {
  12. System.out.println("routing_queue1: " + msg);
  13. //获取投递标签
  14. MessageProperties messageProperties = message.getMessageProperties();
  15. long deliveryTag = messageProperties.getDeliveryTag();
  16. try {
  17. // 模拟异常
  18. // if (msg.contains("苹果")) {
  19. // throw new RuntimeException("不允许卖苹果手机!!!");
  20. //}
  21. /**
  22. * 手动签收消息
  23. * 参数1:消息投递标签
  24. * 参数2:是否批量签收:true一次性签收所有,false,只签收当前消息
  25. */
  26. channel.basicAck(deliveryTag, false);
  27. System.out.println("手动签收完成:{}");
  28. } catch (Exception ex) {
  29. /**
  30. * 手动拒绝签收
  31. * 参数1:当前消息的投递标签
  32. * 参数2:是否批量签收:true一次性签收所有,false,只签收当前消息
  33. * 参数3:是否重回队列,true为重回队列,false为不重回
  34. */
  35. channel.basicNack(deliveryTag, false, true);
  36. System.out.println("拒绝签收,重回队列:{}" + ex);
  37. }
  38. }
  39. }

常见错误-Channel shutdown: channel error

CachingConnectionFactory.java:1517) o.s.a.r.c.CachingConnectionFactory - Channel shutdown: channel error; protocol method: #method<channel.close>(reply-code=406, reply-text=PRECONDITION_FAILED - unknown delivery tag 1, class-id=60, method-id=80) 

原因:

当采用‘springboot’方式开发rabbitmq的时候,计划使用‘收到ack’;然而,代码中的确进行了ack(调用了channel.basicAck),但是yaml配置中没有声明:spring.rabbitmq.listener.simple.acknowledge-mode=manual。

分析:

根本原因是我们调用了两次channel.basicAck:一次是我,一次是springboot默认的自动提交;

如何把springboot的默认自动提交忽略掉:yaml配置中声明:spring.rabbitmq.listener.simple.acknowledge-mode=manual。(springboot底层有个判断,如果发现配置了manual,则不会再次basicAck)。

消息可靠性总结

1、持久化

  • exchange要持久化
  • queue要持久化
  • message要持久化

2、生产方确认Confirm

3、消费方确认Ack

4、Broker高可用

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/AllinToyou/article/detail/546322
推荐阅读
相关标签
  

闽ICP备14008679号