赞
踩
一种带有延迟功能的消息队列,Producer 将消息发送到消息队列 服务端,但并不期望这条消息立马投递,而是推迟到在当前时间点之后的某一个时间投递到 Consumer 进行消费,该消息即定时消息
需求场景:
一 第三方支付平台的支付连接都是有时效性,创建订单后,需要再一定的时间内支付完成
二 消息提醒
没有被及时消费的消息存放的队列
Dead Letter Exchange(死信交换机,缩写:DLX)当消息成为死信后,会被重新发送到另一个交换机,这个交换机就是DLX死信交换机
** * @ClassName RabbitMQConfig * <p> * 发送 关单消息-》延迟exchange-》order.close.delay.queue-》死信exchange-》order.close.queue * @Description 自定义消息队列配置 * @Author CabbageDevil * @Version 1.0 **/ @Configuration @Data public class RabbitMQConfig { /** * 交换机 */ private String orderEventExchange = "order.event.exchange"; /** * 延迟队列,不能被消费者监听 */ private String orderCloseDelayQueue = "order.close.delay.queue"; /** * 关单队列,延迟队列消息过期后转发的队列,用于被消费者监听 */ private String orderCloseQueue = "order.close.queue"; /** * 进入到延迟队列的routingKey */ private String orderCloseDelayRoutingKey = "order.close.delay.routing.key"; /** * 进入到延迟队列的routingKey */ private String orderCloseRoutingKey = "order.close.delay.key"; /** * 过期时间,毫秒为单位,临时1分钟 */ private Integer ttl = 1000 * 60; /** * 消息转换器 * * @return */ @Bean public MessageConverter messageConverter() { return new Jackson2JsonMessageConverter(); } /** * 创建交换机topic类型,一般一个业务一个交换机 * * @return */ @Bean public Exchange orderEventExchange() { return new TopicExchange(orderEventExchange, true, false); } /** * 延迟队列 * * @return */ @Bean public Queue orderCloseDelayQueue() { Map<String, Object> args = new HashMap<>(3); args.put("x-dead-letter-exchange", orderEventExchange); args.put("x-dead-letter-routing-key", orderCloseRoutingKey); args.put("x-message-ttl", ttl); return new Queue(orderCloseDelayQueue, true, false, false, args); } /** * 死信队列,是一个普通队列,用于被监听 * * @return */ @Bean public Queue orderCloseQueue() { return new Queue(orderCloseQueue, true, false, false); } /** * 第一个队列,延迟队列和交换机建立绑定关系 * @return */ @Bean public Binding orderCloseDelayBinding(){ return new Binding(orderCloseDelayQueue,Binding.DestinationType.QUEUE,orderEventExchange,orderCloseDelayRoutingKey,null); } /** * 私信队列和死信交换机建立绑定关系 * @return */ @Bean public Binding orderCloseBinding(){ return new Binding(orderCloseQueue,Binding.DestinationType.QUEUE,orderEventExchange,orderCloseRoutingKey,null); } }
/** * @ClassName ProductOrderMQListener * @Description 订单关闭监听器 * @Author CabbageDevil * @Version 1.0 **/ @Component @Slf4j @RabbitListener(queuesToDeclare = {@Queue("order.close.queue")}) public class ProductOrderMQListener { @Autowired private ProductService productService; @RabbitHandler public void productOrderHandler(EventMessage eventMessage, Message message, Channel channel){ log.info("监听到消息ProductOrderMQListener message消息内容:{}",message); try{ // 业务逻辑 todo }catch (Exception e){ log.error("消费失败:{}",eventMessage); throw new BizException(BizCodeEnum.MQ_CONSUME_EXCEPTION); } log.info("消费成功:{}",eventMessage); } }
@Configuration @Data public class RabbitMQErrorConfig { private String orderErrorExchange = "order.error.exchange"; private String orderErrorQueue = "order.error.queue"; private String orderErrorRoutingKey = "order.error.routing.key"; @Autowired private RabbitTemplate rabbitTemplate; /** * 异常交换机 * @return */ @Bean public TopicExchange errorTopicExchange(){ return new TopicExchange(orderErrorExchange,true,false); } /** * 异常队列 * @return */ @Bean public Queue errorQueue(){ return new Queue(orderErrorQueue,true); } /** * 队列与交换机进行绑定 * @return */ @Bean public Binding BindingErrorQueueAndExchange(Queue errorQueue, TopicExchange errorTopicExchange){ return BindingBuilder.bind(errorQueue).to(errorTopicExchange).with(orderErrorRoutingKey); } /** * 配置 RepublishMessageRecoverer * 用途:消息重试一定次数后,用特定的routingKey转发到指定的交换机中,方便后续排查和告警 * * 顶层是 MessageRecoverer接口,多个实现类 * * @return */ @Bean public MessageRecoverer messageRecoverer(){ return new RepublishMessageRecoverer(rabbitTemplate,orderErrorExchange,orderErrorRoutingKey); } }
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。