赞
踩
1,基于死信队列
2,集成延迟插件
使用RabbitMQ来实现延迟消息必须先了解RabbitMQ的两个概念:消息的TTL和死信Exchange,通过这两者的组合来实现延迟队列
消息的TTL就是消息的存活时间。RabbitMQ可以对队列和消息分别设置TTL。对队列设置就是队列没有消费者连着的保留时间,也可以对每一个单独的消息做单独的设置。超过了这个时间,我们认为这个消息就死了,称之为死信。
如何设置TTL:
我们创建一个队列queue.temp,在Arguments 中添加x-message-ttl 为5000 (单位是毫秒),那所在压在这个队列的消息在5秒后会消失。
一个消息在满足如下条件下,会进死信路由,记住这里是路由而不是队列,一个路由可以对应很多队列。
(1)一个消息被Consumer拒收了,并且reject方法的参数里requeue是false。也就是说不会被再次放在队列里,被其他消费者使用。
(2)上面的消息的TTL到了,消息过期了。
(3)队列的长度限制满了。排在前面的消息会被丢弃或者扔到死信路由上
Dead Letter Exchange其实就是一种普通的exchange,和创建其他exchange没有两样。只是在某一个设置Dead Letter Exchange的队列中有消息过期了,会自动触发消息的转发,发送到Dead Letter Exchange中去。
我们现在可以测试一下延迟队列。
(1)创建死信队列
(2)创建交换机
(3)建立交换器与队列之间的绑定
(4)创建队列
import org.springframework.amqp.core.DirectExchange; import org.springframework.amqp.core.Queue; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; @Configuration public class DeadLetterMqConfig { // 声明一些变量 public static final String exchange_dead = "exchange.dead"; public static final String routing_dead_1 = "routing.dead.1"; public static final String routing_dead_2 = "routing.dead.2"; public static final String queue_dead_1 = "queue.dead.1"; public static final String queue_dead_2 = "queue.dead.2"; // 定义交换机 @Bean public DirectExchange exchange(){ return new DirectExchange(exchange_dead,true,false,null); } @Bean public Queue queue1(){ // 设置如果队列一 出现问题,则通过参数转到exchange_dead,routing_dead_2 上! HashMap<String, Object> map = new HashMap<>(); // 参数绑定 此处的key 固定值,不能随意写 map.put("x-dead-letter-exchange",exchange_dead); map.put("x-dead-letter-routing-key",routing_dead_2); // 设置延迟时间 map.put("x-message-ttl", 10 * 1000); // 队列名称,是否持久化,是否独享、排外的【true:只可以在本次连接中访问】,是否自动删除,队列的其他属性参数 return new Queue(queue_dead_1,tr8 `ue,false,false,map); } @Bean public Binding binding(){ // 将队列一 通过routing_dead_1 key 绑定到exchange_dead 交换机上 return BindingBuilder.bind(queue1()).to(exchange()).with(routing_dead_1); } // 这个队列二就是一个普通队列 @Bean public Queue queue2(){ return new Queue(queue_dead_2,true,false,false,null); } // 设置队列二的绑定规则 @Bean public Binding binding2(){ // 将队列二通过routing_dead_2 key 绑定到exchange_dead交换机上! return BindingBuilder.bind(queue2()).to(exchange()).with(routing_dead_2); } }
@RestController @RequestMapping("/mq") @Slf4j public class MqController { @Autowired private RabbitTemplate rabbitTemplate; @Autowired private RabbitService rabbitService; @GetMapping("sendDeadLettle") public Result sendDeadLettle() { SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"); this.rabbitTemplate.convertAndSend(DeadLetterMqConfig.exchange_dead, DeadLetterMqConfig.routing_dead_1, "ok"); System.out.println(sdf.format(new Date()) + " Delay sent."); return Result.ok(); } }
@Component
@Configuration
public class DeadLetterReceiver {
@RabbitListener(queues = DeadLetterMqConfig.queue_dead_2)
public void get(String msg) {
System.out.println("Receive:" + msg);
SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
System.out.println("Receive queue_dead_2: " + sdf.format(new Date()) + " Delay rece." + msg);
}
}
Rabbitmq实现了一个插件x-delay-message来实现延时队列
@Configuration public class DelayedMqConfig { public static final String exchange_delay = "exchange.delay"; public static final String routing_delay = "routing.delay"; public static final String queue_delay_1 = "queue.delay.1"; /** * 队列不要在RabbitListener上面做绑定,否则不会成功,如队列2,必须在此绑定 * * @return */ @Bean public Queue delayQeue1() { // 第一个参数是创建的queue的名字,第二个参数是是否支持持久化 return new Queue(queue_delay_1, true); } @Bean public CustomExchange delayExchange() { Map<String, Object> args = new HashMap<String, Object>(); args.put("x-delayed-type", "direct"); return new CustomExchange(exchange_delay, "x-delayed-message", true, false, args); } @Bean public Binding delayBbinding1() { return BindingBuilder.bind(delayQeue1()).to(delayExchange()).with(routing_delay).noargs(); } }
@GetMapping("sendDelay")
public Result sendDelay() {
SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
this.rabbitTemplate.convertAndSend(DelayedMqConfig.exchange_delay, DelayedMqConfig.routing_delay, sdf.format(new Date()), new MessagePostProcessor() {
@Override
public Message postProcessMessage(Message message) throws AmqpException {
message.getMessageProperties().setDelay(10 * 1000);
System.out.println(sdf.format(new Date()) + " Delay sent.");
return message;
}
});
return Result.ok();
}
@Component
public class DelayReceiver {
@RabbitListener(queues = DelayedMqConfig.queue_delay_1)
public void get(String msg) {
SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
System.out.println("Receive queue_delay_1: " + sdf.format(new Date()) + " Delay rece." + msg);
}
}
启动测试即可
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。