赞
踩
Community Plugins — RabbitMQhttps://www.rabbitmq.com/community-plugins.html下载 rabbitmg_delayed_message_exchange 插件
具体安装参详其它博主博客
1.死信也可以实现延迟队列,但是有很大的缺点,就是不管延迟时长,都得按顺序排队,切死信的方式实现延迟流程结构复杂
2.基于延迟插件实现,主要就是延迟交换机的类型,需要特殊去声明好
- package com.rabbitmqtest.config;
-
- import org.springframework.amqp.core.Binding;
- import org.springframework.amqp.core.BindingBuilder;
- import org.springframework.amqp.core.CustomExchange;
- import org.springframework.amqp.core.Queue;
- import org.springframework.beans.factory.annotation.Qualifier;
- import org.springframework.context.annotation.Bean;
- import org.springframework.context.annotation.Configuration;
-
- import java.util.HashMap;
-
- @Configuration
- public class DelayedQueueConfig {
-
- //队列
- public static final String DELAYED_QUEUE_NAME = "delayed.queue";
- //交换机
- public static final String DELAYED_EXCHANGE_NAME = "delayed.exchange";
- //routingKey
- public static final String DELAYED_ROUTING_KEY = "delayed.routingkey";
-
- //声明队列
- @Bean
- public Queue delayedQueue() {
- return new Queue(DELAYED_QUEUE_NAME);
- }
-
- //声明交换机 基于延迟插件的交换机
- @Bean
- public CustomExchange delayedExchange() {
- HashMap<String, Object> arguments = new HashMap<>();
- arguments.put("x-delayed-type", "direct");
- /**
- * 1.交换机名称
- * 2.交换机的类型
- * 3.是否需要持久化
- * 4.是否需要自动删除
- * 5.其它参数
- */
- return new CustomExchange(DELAYED_EXCHANGE_NAME, "x-delayed-message",
- true, false, arguments);
- }
-
- //绑定 队列和延迟交换机
- @Bean
- public Binding delayedQueueBindingDelayedExchange(@Qualifier("delayedQueue") Queue delayedQueue, @Qualifier("delayedExchange") CustomExchange delayedExchange) {
- return BindingBuilder.bind(delayedQueue).to(delayedExchange).with(DELAYED_ROUTING_KEY).noargs();
- }
- }
- package com.rabbitmqtest.consumer;
-
-
- import com.rabbitmq.client.Channel;
- import lombok.extern.slf4j.Slf4j;
- import org.springframework.amqp.core.Message;
- import org.springframework.amqp.rabbit.annotation.RabbitListener;
- import org.springframework.stereotype.Component;
-
- import java.util.Date;
-
- /**
- * 监听死信队列
- * 过期时间TTL
- */
- @Component
- @Slf4j
- public class DeadLetterQueueConsumer {
- //接收消息,监听的队列名称对应config里声明的队列
- @RabbitListener(queues = "QD")
- public void receiveD(Message message, Channel channel){
- String s = new String(message.getBody());
- log.info("当前时间: [{}] ,收到死信队列的消息 : [{}] ",new Date().toString(),s);
- }
- }
- package com.rabbitmqtest.controller;
-
- import com.rabbitmqtest.config.DelayedQueueConfig;
- import lombok.extern.slf4j.Slf4j;
- import org.springframework.amqp.rabbit.core.RabbitTemplate;
- import org.springframework.web.bind.annotation.GetMapping;
- import org.springframework.web.bind.annotation.PathVariable;
- import org.springframework.web.bind.annotation.RestController;
-
- import javax.annotation.Resource;
- import java.util.Date;
-
- @Slf4j
- @RestController
- public class RabbitTestController {
- @Resource
- private RabbitTemplate rabbitTemplate;
- /**
- * 基于延迟插件发送消息
- * @param message
- * @param delayTime
- */
- @GetMapping("/sendDelayMsg/{message}/{delayTime}")
- public void rabbitSimple1(@PathVariable("message") String message,@PathVariable("delayTime") Integer delayTime){
- log.info("当前时间: [{}] ,发送一条时长:[{}] ms 发送信息给延迟队列delayed.queue : [{}] ",new Date().toString(),delayTime,message);
- //发送消息对应config声明的交换机关系
- rabbitTemplate.convertAndSend(DelayedQueueConfig.DELAYED_EXCHANGE_NAME,DelayedQueueConfig.DELAYED_ROUTING_KEY,message, msg ->{
- //发送消息的时候 延迟时长 单位:ms
- msg.getMessageProperties().setDelay(delayTime);
- return msg;
- });
- }
- }
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。