赞
踩
什么是TTL
什么是RabbitMQ的死信队列
什么是RabbitMQ的死信交换机
消息有哪几种情况成为死信
消费者拒收消息(basic.reject/basic.nack),并且没有重新入队requeue=false
消息在队列中未被消费,且超过队列或者消息本身的过期时间TTL(time to live)
队列的消息长度达到极限
结果:消息成为死信后,如果该队列绑定了死信交换机,则消息会被死信交换机重新路由到死信队列
RabbitMQ管控台消息TTL测试
队列过期时间使用参数,对整个队列消息统一过期
消息过期时间使用参数(如果队列头部消息未过期,队列中间消息已经过期,该消息还在队列里面)
两者都配置的话,时间短的先触发
RabbitMQ Web控制台测试
新建死信交换机(和普通交换机没区别)
新建死信队列(和普通队列没区别)
死信交换机和死信队列绑定
新建普通队列,设置过期时间、指定死信交换机
测试:直接在Web控制台往普通队列发送消息即可
什么是延迟队列
业界的一些实现延迟方式
交换机和队列注册代码
package com.gen.config; import org.springframework.amqp.core.*; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import java.util.HashMap; import java.util.Map; @Configuration public class RabbitMQConfig { /** * 死信交换机 */ public static final String DEAD_EXCHANGE = "dead_exchange"; /** * 死信队列 */ public static final String DEAD_QUEUE = "dead_queue"; /** * 死信路由键 */ public static final String DEAD_ROUTING_KEY = "dead_routing_key"; /** * 死信交换机 * * @return */ @Bean public Exchange deadExchange() { return ExchangeBuilder.topicExchange(DEAD_EXCHANGE).durable(true).build(); } /** * 死信队列 * * @return */ @Bean public Queue deadQueue() { return QueueBuilder.durable(DEAD_QUEUE).build(); } /** * 死信交换机与死信队列进行绑定 * * @param deadQueue * @param deadExchange * @return */ @Bean public Binding deadBinding(Queue deadQueue, Exchange deadExchange) { return BindingBuilder.bind(deadQueue).to(deadExchange).with(DEAD_ROUTING_KEY).noargs(); } /** * 普通交换机 */ public static final String ORDER_EXCHANGE = "order_exchange"; /** * 普通队列 */ public static final String ORDER_QUEUE = "order_queue"; /** * 普通路由键 */ public static final String ORDER_ROUTING_KEY = "order_routing_key"; /** * 普通交换机 * * @return */ @Bean public Exchange orderExchange() { return ExchangeBuilder.topicExchange(ORDER_EXCHANGE).durable(true).build(); } /** * 普通队列 * * @return */ @Bean public Queue orderQueue() { Map<String, Object> args = new HashMap<>(3); // 过期时间,单位毫秒 args.put("x-message-ttl", 10000); // 消息过期后,进入到死信交换机 args.put("x-dead-letter-exchange", DEAD_EXCHANGE); // 消息过期后,进入到死信交换机的路由键 args.put("x-dead-letter-routing-key", DEAD_ROUTING_KEY); return QueueBuilder.durable(ORDER_QUEUE).withArguments(args).build(); } /** * 普通交换机与普通队列进行绑定 * * @param orderQueue * @param orderExchange * @return */ @Bean public Binding orderBinding(Queue orderQueue, Exchange orderExchange) { return BindingBuilder.bind(orderQueue).to(orderExchange).with(ORDER_ROUTING_KEY).noargs(); } }
消息生产者
package com.gen; import com.gen.config.RabbitMQConfig; import org.junit.jupiter.api.Test; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.test.context.SpringBootTest; @SpringBootTest class GenRabbitmqApplicationTests { @Autowired private RabbitTemplate rabbitTemplate; @Test void send() { this.rabbitTemplate.convertAndSend(RabbitMQConfig.ORDER_EXCHANGE, RabbitMQConfig.ORDER_ROUTING_KEY, "测试延迟队列,设置10s"); } }
消息消费者(只监听消费死信队列,不监听消费普通队列)
package com.gen.listener; import com.gen.config.RabbitMQConfig; import com.rabbitmq.client.Channel; import org.springframework.amqp.core.Message; import org.springframework.amqp.rabbit.annotation.RabbitHandler; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.stereotype.Component; import java.io.IOException; @Component @RabbitListener(queues = RabbitMQConfig.DEAD_QUEUE) public class DeadMQListener { @RabbitHandler public void deadConsumer(String msg, Message message, Channel channel) throws IOException { System.out.println(msg); // 成功确认,消费成功 channel.basicAck(message.getMessageProperties().getDeliveryTag(), false); } }
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。