当前位置:   article > 正文

RabbitMQ通过死信队列延迟发送消息_通过rabbitmq死信队列延时发布信息

通过rabbitmq死信队列延时发布信息

RabbitMQ通过创建死信队列延迟发送消息

demo:

1、声明一个延时5分钟死信队列(按自己项目提供的进行替换即可)
BAJI_GENERIC_DELAY_5M_QUEUE_DEAD("jihuanji_risk","baji_generic_delay_5m_queue_dead","300000","通用延时5分钟死信队列")
  • 1
	交换机:jihuanji_risk
	routing_key:baji_generic_delay_5m_queue_dead
	延时时间:300000(单位:毫秒)
	描述:通用延时5分钟死信队列
  • 1
  • 2
  • 3
  • 4
2、声明一个通用延时队列
YINNIDAI_GENERIC_DELAY_QUEUE("jihuanji_risk","baji_generic_delay_queue","0","通用延时队列"),
  • 1
	交换机:jihuanji_risk
	routing_key:baji_generic_delay_queue
	延时时间:0(单位:毫秒)
	描述:通用延时队列
  • 1
  • 2
  • 3
  • 4
3、将以上两个队列进行绑定
//交换机
    @Value("${exchange.baji.risk}")
    private String exchangeBajiRisk;

创建普通队列:

		 /*创建普通队列,并绑定交换机*/
			String BAJI_GENERIC_DELAY_QUEUE = QueueSendEnum.BAJI_GENERIC_DELAY_QUEUE.getRoutingKey();
			
			@Bean(name = "queueGenericDelayQueue")
			public Queue queueGenericDelayQueue() {
				return new Queue(BAJI_GENERIC_DELAY_QUEUE);
			}
			@Bean
			Binding bindingQueueGenericDelayQueue(@Qualifier("queueGenericDelayQueue") Queue queue, DirectExchange exchange) {
				return BindingBuilder.bind(queue).to(exchange).with(BAJI_GENERIC_DELAY_QUEUE);
			}
			
将普通交换机绑定到死信队列:
			
		/*创建延时5分钟的死信队列,并绑定消息过期后的交换机*/
			String BAJI_GENERIC_DELAY_5M_QUEUE_DEAD = QueueSendEnum.BAJI_GENERIC_DELAY_5M_QUEUE_DEAD.getRoutingKey();
			@Bean(name = "queueGenericDelay5MQueueDead")
			public Queue queueGenericDelay5MQueueDead() {
				Map<String, Object> map = new HashMap<>(2);
				map.put("x-dead-letter-exchange", exchangeBajiRisk);
				map.put("x-dead-letter-routing-key", BAJI_GENERIC_DELAY_QUEUE);
				return new Queue(BAJI_GENERIC_DELAY_5M_QUEUE_DEAD,true,false,false, map);
			}
			@Bean
			Binding bindingQueueGenericDelay5MQueueDead(@Qualifier("queueGenericDelay5MQueueDead") Queue queue, DirectExchange exchange) {
				return BindingBuilder.bind(queue).to(exchange).with(BAJI_GENERIC_DELAY_5M_QUEUE_DEAD);
			}		
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
注:绑定名字要对应上

通过RabbitMQ发送消息

新增发送消息接口
		public void sendMQDelay5M(String message) {
				sendMQ(QueueSendEnum.BAJI_GENERIC_DELAY_5M_QUEUE_DEAD, message, true);
			}	
  • 1
  • 2
  • 3
发送消息接口业务
/**
		 * 推送RabbitMQ消息队列
		 *
		 * @param queueSendEnum 推送消息类型枚举
		 * @param message       推送消息
		 * @param delayFlag     若是延迟队列,添加消息过期时间(也可在死信队列里设置,专属死信队列需自己创建)
		 */
		public synchronized void sendMQ(QueueSendEnum queueSendEnum, String message, boolean delayFlag) {
			String traceId = UUIDUtil.getUUID();
			String exchange = queueSendEnum.getExchange();
			String routingKey = queueSendEnum.getRoutingKey();
			logger.info("RabbitMqProducer.sendMQ-begin-traceId:{}, exchange:{}, routingKey:{}, message:{}", traceId, exchange, routingKey, message);
			if (StringUtils.isBlank(exchange) || StringUtils.isBlank(routingKey) || StringUtils.isBlank(message)) {
				logger.error("RabbitMqProducer.sendMQ-end-parameter is required-traceId:{}, exchange:{}, routingKey:{}, message:{}", traceId, exchange, routingKey, message);
				return;
			}
			try {
				if (delayFlag) {
					logger.info("RabbitMqProducer.sendMQ-setExpiration:{} millis, traceId:{}", queueSendEnum.getExpiration(), traceId);
					rabbitTemplate.convertAndSend(exchange, routingKey, message, new MessagePostProcessor() {
						@Override
						public Message postProcessMessage(Message message) throws AmqpException {
							// 设置消息过期时间
							message.getMessageProperties().setExpiration(queueSendEnum.getExpiration());
							return message;
						}
					});
				} else {
					rabbitTemplate.convertAndSend(exchange, routingKey, message);
				}
				logger.info("RabbitMqProducer.sendMQ-end-success-traceId:{}, exchange:{}, routingKey:{}", traceId, exchange, routingKey);
			} catch (AmqpException e) {
				logger.error("RabbitMqProducer.sendMQ-end-error-traceId:{}, exchange:{}, routingKey:{}, message:{}", traceId, exchange, routingKey, message, e);
			}
		}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35

监听消费RabbitMQ发送的消息

	@RabbitHandler
    @RabbitListener(queues = "baji_generic_delay_queue")
    public void leakageRecallListener(String message, Channel channel, @Header(AmqpHeaders.DELIVERY_TAG) long deliveryTag) {
    //message为接收的消息,对消息进行处理即可
    }
  • 1
  • 2
  • 3
  • 4
  • 5

原理:

先将消息放入死信队列中,消息过期后会把消息放入通用延时队列中,监听消费延时队列的消息即可。场景可用于定时发送push。

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/Gausst松鼠会/article/detail/509798
推荐阅读
相关标签
  

闽ICP备14008679号