赞
踩
这个很难查看消息堆积的情况,因为他把要发送的延时消息存在本地的分布式mnesia
数据库中,其次过期时间为最大int
值,超过这个值(大概49
天)得代码判定重复过期设置。
要注意和自己的rabbitmq的版本对应起来
https://github.com/rabbitmq/rabbitmq-delayed-message-exchange/releases
我的mq是docker安装的3.9.7的
下载完之后把插件copy
到mq
的plugin
目录下,然后启用插件。之后重启容器,我这里是docker-compose
安装的
rabbitmq-plugins enable rabbitmq_delayed_message_exchange
docker-compose restart
在Type
里面查看是否有x-delayed-message
选项,如果存在就代表插件安装成功。
yaml配置mq,然后在mq管理页面创建虚拟host:fchan
spring:
rabbitmq:
host: 110.40.181.73
port: 35672
username: root
password: 10086
virtual-host: /fchan
配置延时队列和延时交换机的绑定
package com.fchan.mq.mqDelay; import org.springframework.amqp.core.Binding; import org.springframework.amqp.core.BindingBuilder; import org.springframework.amqp.core.CustomExchange; import org.springframework.amqp.core.Queue; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import java.util.HashMap; import java.util.Map; @Configuration public class MqDelayConfig { //最后经过死信队列转发后实际消费的交换机 private static final String EXCHANGE_NAME = "delayed_exchange"; //最后经过死信队列转发后实际消费的队列 private static final String QUEUE_NAME = "delayed_queue"; //最后经过死信队列转发后实际消费的路由key private static final String ROUTE_KEY = "delayed_key"; /** * 交换机 */ @Bean CustomExchange exchange() { //通过x-delayed-type参数设置fanout /direct / topic / header 类型 Map<String, Object> args = new HashMap<>(); args.put("x-delayed-type", "direct"); return new CustomExchange(EXCHANGE_NAME, "x-delayed-message",true, false,args); } /** * 队列 */ @Bean public Queue queue() { return new Queue(QUEUE_NAME,true,false,false); } /** * 将队列绑定到交换机 */ @Bean public Binding binding(CustomExchange exchange, Queue queue) { return BindingBuilder .bind(queue) .to(exchange) .with(ROUTE_KEY) .noargs(); } }
消息生产者
package com.fchan.mq.mqDelay; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.amqp.core.MessageDeliveryMode; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; @Component public class MyRabbitSender { Logger log = LoggerFactory.getLogger(MyRabbitSender.class); private static final String ROUTE_KEY = "delayed_key"; private static final String EXCHANGE_NAME = "delayed_exchange"; @Autowired private RabbitTemplate rabbitTemplate; /** * @param msg 消息 * @param delay 延时时间,秒 */ public void send2(String msg, int delay) { log.info("RabbitSender.send() msg = {}", msg); rabbitTemplate.convertAndSend(EXCHANGE_NAME, ROUTE_KEY, msg, message -> { message.getMessageProperties().setDeliveryMode(MessageDeliveryMode.PERSISTENT); //消息持久化 message.getMessageProperties().setDelay(delay * 1000); // 单位为毫秒 return message; }); } }
消息消费者
package com.fchan.mq.mqDelay; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.stereotype.Component; @Component public class MyRabbitConsume { Logger log = LoggerFactory.getLogger(MyRabbitConsume.class); @RabbitListener(queues = "delayed_queue") public void infoConsumption(String data) throws Exception { log.info("收到信息:{}",data); log.info("然后进行一系列逻辑处理 Thanks♪(・ω・)ノ"); } }
参考了大佬的博文
https://juejin.cn/post/6977516798828609567#heading-13
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。