赞
踩
在使用 RabbitMQ 的时候,作为消息发送方希望杜绝任何消息丢失或者投递失败场景。RabbitMQ 为我们提供了两种方式用来控制消息的投递可靠性模式。
rabbitmq 整个消息投递的路径为:
producer—>rabbitmq broker—>exchange—>queue—>consumer
消息从 producer 到 exchange 则会返回一个 confirmCallback 。
消息从 exchange–>queue 投递失败则会返回一个 returnCallback 。
将利用这两个 callback 控制消息的可靠性投递
因SpringBoot 整合RabbitMQ 当队列或交换机不存在是,自动创建,所以可靠性检测的一般是服务是否宕机。与消费者是否接收/确认消息无无关
生产端
yaml
spring: rabbitmq: host: 192.168.0.134 port: 5672 username: admin password: admin virtual-host: /admin # 开启publisher-confirm 有以下可选值 # simple:同步等待confirm结果,直到超时 # correlated:异步回调,定义ConfirmCallback。mq返回结果时会回调这个ConfirmCallback # NONE:默认不开启 publisher-confirm-type: correlated # 开启publish-return功能。可以定义ReturnCallback # true:调用ReturnCallback # false:直接丢弃消息 publisher-returns: true
自定义Callback类
/** * 消息推送确认机制配置文件 * @author codinganhour */ @Component public class PublisherConfirmAndReturnConfig implements RabbitTemplate.ConfirmCallback, RabbitTemplate.ReturnsCallback { @Autowired RabbitTemplate rabbitTemplate; /** * 初始化方法 */ @PostConstruct public void initMethod() { rabbitTemplate.setConfirmCallback(this); rabbitTemplate.setReturnsCallback(this); } @Override public void confirm(CorrelationData correlationData, boolean ack, String s) { Integer receivedDelay = null; if(null != correlationData){ correlationData.getReturned().getMessage().getMessageProperties().getReceivedDelay(); } if (receivedDelay != null && receivedDelay > 0) { // 是一个延迟消息,忽略这个错误提示 return; } if (ack) { System.out.println("消息已经送达Exchange,ack已发"); } else { System.out.println("消息没有送达Exchange"); } } @Override public void returnedMessage(ReturnedMessage returnedMessage) { System.out.println("消息没有送到队列中"); } }
在RabbitMQ中指代的是消费者收到消息后确认的一种行为,关注点在于消费者能否实际接收到MQ发送的消息。
RabbitMQ默认的消息确认机制是:自动确认的
队列分配消息给监听消费者时,该消息处于未确认状态,不会被删除;当接收到消费者的确认回复才会将消息移除。
其提供了三种确认方式:
自动确认acknowledge=“none”:当消费者接收到消息的时候,就会自动给到RabbitMQ一个回执,告诉MQ我已经收到消息了,不在乎消费者接收到消息之后业务处理的成功与否。
手动确认acknowledge=“manual”:当消费者收到消息后,不会立刻告诉RabbitMQ已经收到消息了,而是等待业务处理成功后,通过调用代码的方式手动向MQ确认消息已经收到。当业务处理失败,就可以做一些重试机制,甚至让MQ重新向消费者发送消息都是可以的。
根据异常情况确认acknowledge=“auto”:该方式是通过抛出异常的类型,来做响应的处理(如重发、确认等)。这种方式比较麻烦。
消费端
spring: rabbitmq: host: 192.168.0.134 port: 5672 username: admin password: admin virtual-host: /admin listener: # 容器类型simple或direct 简单理解为一对一;direct理解为一对多个消费者 simple: # ACK模式(none,auto,manual,默认为auto) acknowledge-mode: manual # 开启重试 retry: # 是否开启重试机制 enabled: true
/** * @author */ @Slf4j @Component public class DirectManualListener { /** * 消息最大重试次数 */ private static final int MAX_RETRIES = 3; /** * 重试间隔(秒) */ private static final long RETRY_INTERVAL = 5; /** * 手动进入死信队列 * RabbitListener中的参数用于表示监听的是哪一个队列 */ @RabbitListener(queues = DirectManualConfig.DIRECT_QUEUE) public void manualListenerQueue(String message, Channel channel, @Header(AmqpHeaders.DELIVERY_TAG) long deliveryTag) throws Exception { // 重试次数 int retryCount = 0; boolean success = false; // 消费失败并且重试次数<=重试上限次数 while (!success && retryCount < MAX_RETRIES) { retryCount++; // 具体业务逻辑 System.out.println("处理业务逻辑"); // 如果失败则重试 if (!success) { String errorTip = "第" + retryCount + "次消费失败" + ((retryCount < 3) ? "," + RETRY_INTERVAL + "s后重试" : ",进入死信队列"); log.error(errorTip); Thread.sleep(RETRY_INTERVAL * 1000); } } if (success) { // 消费成功,确认 channel.basicAck(deliveryTag, false); log.info("创建订单数据消费成功"); } else { // requeue:false 手动拒绝,进入抛弃或进入死信队列 channel.basicNack(deliveryTag, false, false); log.info("创建订单数据消费失败"); } } }
spring: rabbitmq: host: 192.168.0.134 port: 5672 username: admin password: admin virtual-host: /admin listener: simple: # ACK模式(none,auto,manual,默认为auto) acknowledge-mode: auto # 开启重试 retry: # 是否开启重试机制 enabled: true # 最大重试次数,默认3 max-attempts: 5 # 重试间隔(ms) 默认1秒 initial-interval: 500 # 重试因子,默认是1。本次推送时间间隔 = 上一次间隔时间 * multiplier multiplier: 2 # 最大间隔时间(ms),默认10秒 maxInterval: 20000
@Slf4j @Component public class DirectAutoListener { /** * auto手动抛出异常方式进入死信队列,yaml中max-attempts,initial-interval生效 * RabbitListener中的参数用于表示监听的是哪一个队列 */ @RabbitListener(queues = DirectManualConfig.DIRECT_QUEUE) public void autoListenerQueue(String message, Channel channel, @Header(AmqpHeaders.DELIVERY_TAG) long deliveryTag) throws Exception { log.info("消息信息:"+message+";消息deliveryTag="+deliveryTag); Thread.sleep(1000); if(deliveryTag != 8){ throw new RuntimeException("操作异常"); }else{ log.info("消息Ack deliveryTag="+deliveryTag); channel.basicAck(deliveryTag, false); } } }
延迟队列,即消息进入队列后不会立即被消费,只有到达指定时间后,才会被消费。
场景:
下单后,30分钟未支付,取消订单,回滚库存。
新用户注册成功7天后,发送短信问候。
实现方式:
定时器
缺点:触发时,会扫描数据库,难以精确定位触发时间,数据量大时数据库承受压力过大;
延迟队列(TTL+死信队列组合实现延迟队列的效果)
精确触发,触发时只查询单一数据即可
延迟队列
/** * 延迟队列 * @author */ @Slf4j @Configuration public class DirectTtlConfig { /** * direct路由模式-交换机 */ public static final String DIRECT_EXCHANGE = "direct_ttl_exchange"; /** * direct路由模式-队列 */ public static final String DIRECT_QUEUE = "direct_ttl_queue"; /** * direct路由模式-路由键 */ public static final String DIRECT_ROUTING = "direct.ttl.routing"; /** * direct路由模式-死信交换机 */ public static final String DIRECT_DLX_EXCHANGE = "direct_ttl_dlx_exchange"; /** * direct路由模式-死信队列 */ public static final String DIRECT_DLX_QUEUE = "direct_ttl_dlx_queue"; /** * direct路由模式-路由键 */ public static final String DIRECT_DLX_ROUTING = "direct.ttl.dlx.routing"; /** * 1、声明交换机 * direct路由模式,默认持久化,非自动删除 * @return */ @Bean(DIRECT_EXCHANGE) public Exchange directTtlExchange(){ return ExchangeBuilder.directExchange(DIRECT_EXCHANGE).build(); } /** * 2、声明队列 * direct路由模式 * @return */ @Bean(DIRECT_QUEUE) public Queue directTtlQueue(){ // ttl:延迟队列时间,超时为消费则进入死信队列中 // deadLetterExchange:绑定死信交换机 // deadLetterRoutingKey:绑定死信路由 return QueueBuilder.durable(DIRECT_QUEUE).ttl(1000).deadLetterExchange(DIRECT_DLX_EXCHANGE).deadLetterRoutingKey(DIRECT_DLX_ROUTING).build(); } /** * 3、队列与交换机进行绑定 * direct路由模式 * @param queue @Qualifier 将 value 对应的bean 注入到参数中 * @param exchange @Qualifier 将 value 对应的bean 注入到参数中 * @return */ @Bean public Binding directTtlQueueExchange(@Qualifier(DIRECT_QUEUE) Queue queue, @Qualifier(DIRECT_EXCHANGE) Exchange exchange){ return BindingBuilder.bind(queue).to(exchange).with(DIRECT_ROUTING).noargs(); } /** * 1、声明死信交换机 * direct路由模式,默认持久化,非自动删除 * @return */ @Bean(DIRECT_DLX_EXCHANGE) public Exchange directDlxExchange(){ return ExchangeBuilder.directExchange(DIRECT_DLX_EXCHANGE).build(); } /** * 2、声明死信队列 * direct路由模式 * @return */ @Bean(DIRECT_DLX_QUEUE) public Queue directDlxQueue(){ return QueueBuilder.durable(DIRECT_DLX_QUEUE).build(); } /** * 3、死信队列与死信交换机进行绑定 * direct路由模式 * @param queue @Qualifier 将 value 对应的bean 注入到参数中 * @param exchange @Qualifier 将 value 对应的bean 注入到参数中 * @return */ @Bean public Binding directDlxQueueExchange(@Qualifier(DIRECT_DLX_QUEUE) Queue queue, @Qualifier(DIRECT_DLX_EXCHANGE) Exchange exchange){ return BindingBuilder.bind(queue).to(exchange).with(DIRECT_DLX_ROUTING).noargs(); } }
消费者只需要监听死信队列中消息即可
死信队列,英文缩写:DLX 。Dead Letter Exchange(死信交换机),当消息成为Dead message后,可以被重新发送到另一个交换机,这个交换机就是DLX。
死信的三种情况:
队列消息长度到达限制;
消费者拒接消费消息,basicNack/basicReject,并且不把消息重新放入原目标队列,requeue=false;(手动ack(auto,manual)都可以触发)
原队列存在消息过期设置,消息到达超时时间未被消费;
死信队列与延期队列实现方式一致,只是会监听2个消费者,正常队列采用ack(auto,manual)触发是否进入死信队列
QueueBuilder.durable(DIRECT_QUEUE).maxLength():队列中等待消费的数量大于maxLength的数量就会进入死信队列
幂等性指一次和多次请求某一个资源,对于资源本身应该具有同样的结果。也就是说,其任意多次执行对资源本身所产生的影响均与一次执行的影响相同。
在MQ中指,消费多条相同的消息,得到与消费该消息一次相同的结果。
处理方式
传递消息唯一值记录数据库中或者redis中,消费时判断,防止重复消费
更新数据库时可以采用乐观锁方式,关键字段值发生变化则不消费
解决方案:上线更多的消费者,进行正常消费上线专门的队列消费服务,将消息先批量取出来,记录数据库,再慢慢处理
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。