赞
踩
延时的含义为 等待一段时间,应用到RabbitMQ 消息 发布/订阅 模型中的概念就是,拿到消息后不想立即消费,等待一段时间再执行。
ex:
延迟队列存储的对象是对应的延迟消息,所谓“延迟消息”是指当消息被发送以后,并不想让消费者立刻拿到消息,而是等待特定时间后,消费者才能拿到这个消息进行消费。
RabbitMQ 中并没有延时队列的概念,是通过 延时交换机与 死信队列实现。
TTL,全称Time To Live,消息过期时间设置。若没有队列进行消息消费,此消息过期后将被丢弃。
但RabbitMq只会检查第一个消息是否过期,如果过期则丢到死信队列。
ex:若有两条消息,第一个消息延迟20秒执行,第二个消息延迟10秒执行,但RabbitMq只会检测队首第一条消息的过期时间。这样就会造成第二条消息延迟30秒执行。
DLX 。Dead Letter Exchange(死信交换机),当消息成为Dead message后,可以被重新发送到另一个交换机,这个交换机就是DLX。
为了解决 “队列阻塞”问题,新的插件发布了,再消息粒度上实现 消息过期控制。
<dependencies> <!--RabbitMQ 依赖--> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-amqp</artifactId> </dependency> <!-- web相关依赖--> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-web</artifactId> </dependency> <!-- json相关依赖--> <dependency> <groupId>com.alibaba</groupId> <artifactId>fastjson</artifactId> <version>1.2.47</version> </dependency>
插件下载官方链接: rabbitmq_delayed_message_exchange
export PATH=$PATH:/opt/middle/rabbitmq/erlang/bin
cd /opt/middle/rabbitmq/sbin
./rabbitmq-pluginsenablerabbitmq_delayed_message_exchange
./rabbitmq-plugins list
import lombok.RequiredArgsConstructor; import org.springframework.amqp.core.Binding; import org.springframework.amqp.core.BindingBuilder; import org.springframework.amqp.core.CustomExchange; import org.springframework.amqp.core.Queue; import org.springframework.amqp.rabbit.connection.CachingConnectionFactory; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Qualifier; import org.springframework.beans.factory.config.ConfigurableBeanFactory; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.context.annotation.Scope; import java.util.HashMap; import java.util.Map; /** * @author liuzz * @date 2023/5/18 0018下午 4:09 */ @Configuration("relevancyRabbitMqConfig") @RequiredArgsConstructor(onConstructor = @__(@Autowired)) public class RelevancyRabbitMqConfig { private final CachingConnectionFactory factory; /** * rabbitmq 下发计划延时队列 **/ public static final String RELEVANCY_DELAYED_EXCHANGE = "saas.cbs.relevancy.delayed.exchange"; /** * rabbitmq 下发延时队列订阅路由key **/ public static final String RELEVANCY_DELAYED_ROUTINGKEY = "saas.cbs.relevancy.delayed.routingkey"; /** * rabbitmq 下发延时队列 **/ public static final String RELEVANCY_DELAYED_QUEUE = "saas.cbs.relevancy.delayed.queue"; @Bean("relevancyRabbitTemplate") @Scope(value = ConfigurableBeanFactory.SCOPE_PROTOTYPE) public RabbitTemplate rabbitTemplate(){ factory.setPublisherConfirmType(CachingConnectionFactory.ConfirmType.CORRELATED); RabbitTemplate rabbitTemplate = new RabbitTemplate(factory); //开启发送失败退回 rabbitTemplate.setMandatory(true); //消息转换器 rabbitTemplate.setMessageConverter(new Jackson2JsonMessageConverter()); return rabbitTemplate; } //插件版本 -- 实现延迟队列 @Bean("relevancyDelayedQueue") public Queue relevancyDelayedQueue() { return new Queue(RelevancyDelayedConstant.RELEVANCY_DELAYED_QUEUE); } //定义延时交换机 -- 插件版本 //指定交换器类型为 x-delayed-message //设置属性 x-delayed-type @Bean("relevancyDelayedExchange") public CustomExchange relevancyDelayedExchange() { Map<String, Object> args = new HashMap<>(); args.put("x-delayed-type", "direct"); return new CustomExchange(RelevancyDelayedConstant.RELEVANCY_DELAYED_EXCHANGE, "x-delayed-message", true, false, args); } /** * 绑定延时队列与交换机信息 */ @Bean public Binding bindingNotify(@Qualifier("relevancyDelayedQueue") Queue relevancyDelayedQueue, @Qualifier("relevancyDelayedExchange") CustomExchange relevancyDelayedExchange) { return BindingBuilder .bind(relevancyDelayedQueue) .to(relevancyDelayedExchange) .with(RelevancyDelayedConstant.RELEVANCY_DELAYED_ROUTINGKEY).noargs(); } }
@Slf4j @Component public class RelevancyExecuteMqConsume { @Autowired @Qualifier("relevancyRabbitTemplate") RabbitTemplate rabbitTemplate; /** * @Desc: 发送下发计划过期MQ * @param relevancyFrsMqSendMsgBo * @param finalExpirationTime **/ public void sendSnapshotPlanMsg(RelevancyFrsMqSendMsgBo relevancyFrsMqSendMsgBo, Integer finalExpirationTime) { MessagePostProcessor messagePostProcessor = new MessagePostProcessor() { @Override public Message postProcessMessage(Message message) throws AmqpException { //1.设置message的信息 message.getMessageProperties().setDelay(finalExpirationTime);//消息的过期时间 //2.返回该消息 return message; } }; rabbitTemplate.convertAndSend(RelevancyDelayedConstant.RELEVANCY_DELAYED_EXCHANGE,RelevancyDelayedConstant.RELEVANCY_DELAYED_ROUTINGKEY,relevancyFrsMqSendMsgBo,messagePostProcessor); } }
@Slf4j @Component public class RelevancyExecuteMqConsume { @Autowired @Qualifier("relevancyRabbitTemplate") RabbitTemplate rabbitTemplate; @RabbitListener(bindings = { @QueueBinding(value = @Queue(RelevancyDelayedConstant.RELEVANCY_DELAYED_QUEUE), exchange = @Exchange(name = RelevancyDelayedConstant.RELEVANCY_DELAYED_EXCHANGE,type ="x-delayed-message" ), key= RelevancyDelayedConstant.RELEVANCY_DELAYED_ROUTINGKEY) }) public void process(Message message, Channel channel) { //消费数据 } }
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。