赞
踩
前言
rabbitmq本身是没有实现延时队列的,但是我们可以通过死信队列或延时插件来实现延时队列。
如果在队列中的消息没有消费者消费,那么该消息就会成为一个死信。如果这个消息被发送到另外一个exchange的话,那么后面的exchange就是死信队列
死信队列也是一个正常的exchange,也可以通过routingkey绑定到具体的队列上。
在上面我们知道一个消息过期后会成为死信,而死信可以被转发到另外一个exchange也就是死信队列。死信队列也是一个正常的exchange,它可以将消息发送到绑定的队列上,该队列被消费者消费监听。
举个生活中的栗子,我们买东西下单后,一般会要求我们半小时内支付。半小时后,如果没有支付将取消订单,换句话说,就是我们需要在半小时后去检查订单的状态。那么如何用死信队列实现该功能?
流程图如下:
上图中,生产者下单后,发送消息到orderQueue,30分钟后,消息后期,将被发送到dlxQueue中,此时消费者监听到消息,去查看订单的状态,如果订单没有支付,取消订单,回滚库存。
知道了理论后,接下来就是代码实践了,这里采用SpringBoot实现:
1、rabbitmq配置类,声明交换机和队列
@Configuration public class DlxRabbitmqConfig { //====================1.声明正常队列==================== @Bean public DirectExchange orderExchange(){ return new DirectExchange("springboot.orderExchange", true, false); } @Bean public Queue orderQueue(){ Map<String, Object> queueArgs = new HashMap<>(); //10秒中后消息过期 queueArgs.put("x-message-ttl",10000); //过期后,将消息转发给"springboot.OrderDlxExchange"交换机 queueArgs.put("x-dead-letter-exchange","springboot.OrderDlxExchange"); return new Queue("springboot.orderQueue",true,false,false,queueArgs); } @Bean public Binding orderBinding(){ return BindingBuilder.bind(orderQueue()).to(orderExchange()).with("order.key"); } //====================2.声明死信队列==================== @Bean public DirectExchange OrderDlxExchange(){ return new DirectExchange("springboot.OrderDlxExchange",true,false); } @Bean public Queue orderDlxQueue(){ return new Queue("springboot.orderDlxQueue",true,false,false); } @Bean public Binding dlxBinding(){ return BindingBuilder.bind(orderDlxQueue()).to(OrderDlxExchange()).with("order.key"); } }
2、订单实体类
@Data
@ToString
public class Order implements Serializable {
private String orderNo;
private Date createDt;
private double payMoney;
private String userName;
}
3、发送消息组件
@Component public class MsgSender { @Autowired private RabbitTemplate rabbitTemplate; //利用死信队列实现延时消息的发送 public void sendMsgDlx(Object order , Map<String,Object> msgProp) throws JsonProcessingException { //1. 构建消息头 MessageHeaders messageHeaders = new MessageHeaders(msgProp); //2. 构建消息体 ObjectMapper objectMapper = new ObjectMapper(); String json = objectMapper.writeValueAsString(order); Message message = MessageBuilder.createMessage(json.getBytes(), messageHeaders); //3. 构建correlationData 用于做可靠性投递得,ID:必须为全局唯一的 根据业务规则 CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString()); //4. 开启确认模式 rabbitTemplate.setConfirmCallback(new MyConfirmCallBack()); //5. 开启消息可达监听 rabbitTemplate.setReturnCallback(new MyReturnCallback()); //6. 发送消息 rabbitTemplate.convertAndSend("springboot.orderExchange","order.key",message,correlationData); } }
4、测试
@RunWith(SpringRunner.class) @SpringBootTest public class RabbitmqProductTest { @Autowired private MsgSender msgSender; @Test public void testSendMsgDlx() throws JsonProcessingException { Order order = new Order(); order.setOrderNo(UUID.randomUUID().toString()); order.setUserName("fcp"); order.setPayMoney(10000.00); order.setCreateDt(new Date()); msgSender.sendMsgDlx(order,null); } }
代码已经上传到码云上https://gitee.com/F_promise/rabbitmq
,生产者的代码在springboot-rabbitmq-producer
模块上。另外代码中设计了消息确认机制,这我会在可靠性投递中讲到。
@Component @Slf4j public class MsgReceiver { //监听订单的死信队列 @RabbitListener(queues = {"springboot.orderDlxQueue"}) @RabbitHandler public void consumerOrderMsg(Message message, Channel channel) throws IOException { ObjectMapper objectMapper = new ObjectMapper(); Order order = objectMapper.readValue((byte[]) message.getPayload(), Order.class); log.info("消费消息:{}",order); //手工签收 Long deliveryTag = (Long) message.getHeaders().get(AmqpHeaders.DELIVERY_TAG); log.info("接受deliveryTag:{}",deliveryTag); channel.basicAck(deliveryTag, false); } }
代码已上传到码云,在springboot-rabbitmq-consumer
模块上
插件地址:https://www.rabbitmq.com/community-plugins.html
下载与自己版本一致的插件后,解压到指定的目录下:
cp rabbitmq_delayed_message_exchange-20171215-3.6.x.ez /usr/lib/rabbitmq/lib/rabbitmq_server-3.6.5/plugins
启动延时插件:rabbitmq-plugins enable rabbitmq_delayed_message_exchange
1、配置类(声明延迟队列)
@Configuration public class DelayRabbitmqConfig { @Bean public CustomExchange OrderDelayExchange(){ HashMap<String, Object> args = new HashMap<>(); args.put("x-delayed-type", "direct"); return new CustomExchange("orderDelayExchange", "x-delayed-message",true, false,args); } @Bean public Queue OrderDelayQueue(){ return new Queue("orderDelayQueue",true,false,false); } @Bean public Binding orderDelayBinding(){ return BindingBuilder.bind(OrderDelayQueue()).to(OrderDelayExchange()).with("order.key").noargs(); } }
2、发送消息组件
@Component public class MsgSender { @Autowired private RabbitTemplate rabbitTemplate; //利用插件发送延迟消息 public void sendDelayMsg(Object order , Map<String,Object> msgProp) throws JsonProcessingException { //1. 构建消息头 MessageHeaders messageHeaders = new MessageHeaders(msgProp); //2. 设置消息转换器 rabbitTemplate.setMessageConverter(new Jackson2JsonMessageConverter()); //3. 构建correlationData 用于做可靠性投递得,ID:必须为全局唯一的 根据业务规则 CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString()); //4. 开启确认模式 rabbitTemplate.setConfirmCallback(new MyConfirmCallBack()); //5. 开启消息可达监听 rabbitTemplate.setReturnCallback(new MyReturnCallback()); //6. 发送消息(这里是关键) rabbitTemplate.convertAndSend("orderDelayExchange", "order.key",order, new MessagePostProcessor() { @Override public org.springframework.amqp.core.Message postProcessMessage(org.springframework.amqp.core.Message message) throws AmqpException { message.getMessageProperties().setDelay(5000); //设置延迟时间 return message; } },correlationData); } }
3、测试
@RunWith(SpringRunner.class)
@SpringBootTest
public class RabbitmqProductTest {
@Test
public void testSendDelayMsg() throws JsonProcessingException {
Order order = new Order();
order.setOrderNo(UUID.randomUUID().toString());
order.setUserName("fcp");
order.setPayMoney(10000.00);
order.setCreateDt(new Date());
msgSender.sendDelayMsg(order,null);
}
}
消费者代码不变
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。