赞
踩
在添加链接描述下载rabbitmq_delayed_message_exchange 插件,本文以v3.10.0为例
scp /Users/hong/资料/rabbitmq_delayed_message_exchange-3.10.0.ez root@10.211.55.4:/usr/local/software
mv rabbitmq_delayed_message_exchange-3.10.0.ez /usr/local/software/rabbitmq_server-3.10.0/plugins
rabbitmq-plugins enable rabbitmq_delayed_message_exchange
rabbitmq-server start
package com.hong.springboot.rabbitmq.config; import org.springframework.amqp.core.*; import org.springframework.beans.factory.annotation.Qualifier; import org.springframework.context.annotation.Bean; import java.util.HashMap; import java.util.Map; /** * @Description: 延迟队列配置类 * @Author: hong * @Date: 2024-02-25 20:19 * @Version: 1.0 **/ @Configuration public class DelayedQueueConfig { public static final String DELAYED_QUEUE_NAME = "delayed.queue"; public static final String DELAYED_EXCHANGE_NAME = "delayed.exchange"; public static final String DELAYED_ROUTING_KEY = "delayed.routingKey"; /** * 基于延迟插件声明自定义交换机 * @return */ @Bean public CustomExchange delayedExchange(){ Map<String,Object> map = new HashMap<>(); map.put("x-delayed-type","direct"); /** * 声明自定义交换机 * 第1个参数:交换机名称 * 第2个参数:交换机类型 * 第3个参数:是否需要持久化 * 第4个参数:是否需要自动删除 * 第5个参数:其他参数 */ return new CustomExchange(DELAYED_EXCHANGE_NAME,"x-delayed-message",true,false,map); } @Bean public Queue delayedQueue(){ return new Queue(DELAYED_QUEUE_NAME); } @Bean public Binding delayedQueueBindingDelayedExchange(@Qualifier("delayedQueue") Queue delayedQueue, @Qualifier("delayedExchange") CustomExchange delayedExchange) { return BindingBuilder.bind(delayedQueue).to(delayedExchange).with(DELAYED_ROUTING_KEY).noargs(); } }
/**
* 基于延迟插件的发送消息
* @param message
* @param delayTime 延迟时间
*/
@GetMapping("sendDelayMsg/{message}/{delayTime}")
public void sendMsg(@PathVariable String message, @PathVariable Integer delayTime) {
rabbitTemplate.convertAndSend(DelayedQueueConfig.DELAYED_EXCHANGE_NAME, DelayedQueueConfig.DELAYED_ROUTING_KEY, message, correlationData -> {
correlationData.getMessageProperties().setDelay(delayTime);
return correlationData;
});
log.info("当前时间:{},发送一条时长{}毫秒TTL信息给延迟队列delayed.queue:{}", new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date()) , delayTime, message);
}
package com.hong.springboot.rabbitmq.consumer; import com.hong.springboot.rabbitmq.config.DelayedQueueConfig; import lombok.extern.slf4j.Slf4j; import org.springframework.amqp.core.Message; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.stereotype.Component; import java.text.SimpleDateFormat; import java.util.Date; /** * @Description: 基于延迟插件的延迟消费者 * @Author: hong * @Date: 2024-02-25 21:27 * @Version: 1.0 **/ @Slf4j @Component public class DelayQueueConsumer { @RabbitListener(queues = DelayedQueueConfig.DELAYED_QUEUE_NAME) public void receiveDelayMessage(Message message){ String msg = new String(message.getBody()); log.info("当前时间:{},收到延迟队列信息{}",new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date()) , msg); } }
http://localhost:8080/ttl/sendDelayMsg/hello rabbitmq 1/20000
http://localhost:8080/ttl/sendDelayMsg/hello rabbitmq 2/2000
基于插件的延迟与基于死信队列的结果恰好相反更符合预期,因此在实际项目中通常采用延迟插件方式来实现rabbitMQ的延迟队列
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。