赞
踩
常用的延迟消息实现方式有:
利用 队列TTL + 死信队列 方式实现
利用消息延迟插件实现
消息变成死信的原因有:
消息过期。消息TTL或队列TTL
消息被拒绝。消费者调用了 channel.basicNack
或 channel.basicReject
,并且设置 requeue=false
队列满。
当设置了最大队列长度或大小并达到最大值时,RabbitMQ 的默认行为是从队列前面丢弃或 dead-letter 消息(即队列中最早的消息)。要修改这种行为,请使用下面描述的
overflow
设置
这里直接贴出 rabbitConfig 代码,其他的代码参考该文章:RabbitMQ (三)消息重试
主要操作:
创建死信队列和交换器,并绑定
创建队列,同时设置队列的TTL、绑定死信队列;创建交换器,并绑定,
- package com.fmi110.rabbitmq.config;
-
- import lombok.extern.slf4j.Slf4j;
- import org.springframework.amqp.core.*;
- import org.springframework.amqp.rabbit.retry.MessageRecoverer;
- import org.springframework.context.annotation.Bean;
- import org.springframework.context.annotation.Configuration;
-
- import java.util.HashMap;
-
-
- /**
- * @author fmi110
- * @description rabbitMQ 配置类
- * @date 2021/7/1 15:08
- */
- @Configuration
- @Slf4j
- public class RabbitConfig {
-
- String dlQueueName = "my-queue-dl"; // 普通队列名称
- String dlExchangeName = "my-exchange-dl"; // 死信交换器名称
- String dlRoutingKey = "rabbit.test";
-
- String queueName = "retry-queue";
- String exchangeName = "my-exchange"; // 普通交换器名称
-
- /**
- * 创建死信队列
- *
- * @return
- */
- @Bean
- public Queue queueDL() {
-
- return QueueBuilder
- .durable(dlQueueName) // 持久化队列
- .build();
- }
-
- /**
- * 创建死信交换机
- *
- * @return
- */
- @Bean
- public TopicExchange exchangeDL() {
- return new TopicExchange(dlExchangeName, true, false);
- }
-
- /**
- * 绑定操作
- */
- @Bean
- public Binding bindQueueDL2ExchangeDL(Queue queueDL, TopicExchange exchangeDL) {
- log.info(">>>> 队列与交换器绑定");
- return BindingBuilder.bind(queueDL).to(exchangeDL).with(dlRoutingKey);
- }
-
- /**
- * 创建持久化队列,同时绑定死信交换器
- *
- * @return
- */
- @Bean
- public Queue queue() {
- log.info(">>>> 创建队列 retry-queue");
- HashMap<String, Object> params = new HashMap<>();
- params.put("x-dead-letter-exchange", dlExchangeName);
- params.put("x-dead-letter-routing-key", dlRoutingKey);
-
- params.put("x-message-ttl", 10 * 1000); // 队列过期时间 10s
-
- return QueueBuilder
- .durable(queueName) // 持久化队列
- .withArguments(params) // 关联死信交换器
- .build();
- }
-
-
- /**
- * 创建交换机
- *
- * @return
- */
- @Bean
- public TopicExchange exchange() {
- log.info(">>>> 创建交换器 my-exchange");
- boolean durable = true; // 持久化
- boolean autoDelete = false; // 消费者全部解绑时不自动删除
- return new TopicExchange(exchangeName, durable, autoDelete);
- }
-
- /**
- * 绑定队列到交换机
- *
- * @param queue
- * @param exchange
- * @return
- */
- @Bean
- public Binding bindQueue2Exchange(Queue queue, TopicExchange exchange) {
- log.info(">>>> 队列与交换器绑定");
- return BindingBuilder.bind(queue).to(exchange).with("rabbit.test");
- }
-
- }
延迟消息通过队列的TTL产生,所以这里不应该设置普通队列的消费者,让消息过期然后自动转入死信队列,此时再进行消费以此实现延迟消息
- package com.fmi110.rabbitmq;
-
- import com.rabbitmq.client.Channel;
- import lombok.extern.slf4j.Slf4j;
- import org.springframework.amqp.rabbit.annotation.RabbitListener;
- import org.springframework.amqp.support.AmqpHeaders;
- import org.springframework.messaging.handler.annotation.Header;
- import org.springframework.stereotype.Component;
-
- import java.util.concurrent.atomic.AtomicInteger;
-
-
- /**
- * @author fmi110
- * @description 消息消费者
- * @date 2021/7/1 16:08
- */
- @Component
- @Slf4j
- public class RabbitConsumer {
- /**
- * 死信队列消费者
- * @param data
- * @param channel
- * @param tag
- * @throws Exception
- */
- @RabbitListener(queues="my-queue-dl")
- public void consumeDL(String data, Channel channel, @Header(AmqpHeaders.DELIVERY_TAG) long tag) throws Exception{
- SimpleDateFormat dateFormat = new SimpleDateFormat("yyyy/MM/dd HH:mm:ss");
- log.info(">>>> {} 死信队列消费 tag = {},消息内容 : {}", dateFormat.format(new Date()), tag, data);
- }
- }
如上图所示实现了延迟10s的消息,但是如果需要实现延迟5s的消息,则需要新建一个TTL为5s的队列,所以如果延迟时间需要很多的话,就需要创建很多队列,实现起来比较麻烦。
再贴一段对消息设置TTL的代码:
- AtomicInteger aint = new AtomicInteger();
- public void send(String msg) {
- String exchangeName = "my-exchange";
- String routingKey = "rabbit.test";
- // rabbitTemplate.convertAndSend(exchangeName, routingKey, msg);
- SimpleDateFormat dateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
-
- MessageProperties messageProperties = new MessageProperties();
- messageProperties.setCorrelationId(UUID.randomUUID().toString().replace("-", ""));
- // TTL 为5s
- int i = 9 * 1000;
-
- if (aint.incrementAndGet() % 2 == 0) {
- i = 5 * 1000;
- }
- msg = "message send at " + dateFormat.format(new Date()) +", expired at "+dateFormat.format(new Date().getTime()+i);
- messageProperties.setExpiration(String.valueOf(i)); // 设置过期时间
- Message message = new Message(msg.getBytes(StandardCharsets.UTF_8), messageProperties);
- rabbitTemplate.send(exchangeName, routingKey, message);
- }
可以看到消息的过期时间与期望的不一致。因为只有在头部的消息,系统才对其进行过期检测。所以如果消息不再队列头部,即使时间已经过期,也不会导致消息进入死信队列!!!
当同时设置了消息的TTL和队列的TTL时,过期时间谁小谁生效(队列头部的消息才进行TTL检测)。
插件的安装参考 docker安装rabbitMQ
使用延迟插件实现,需要创建延迟交换器,使用 CustomExchange 类创建,同时指定交换器类型为 x-delayed-message ,此外还需要设置属性 x-delayed-type ,创建的交换器如下图所示
- package com.fmi110.rabbitmq.config;
-
- import lombok.extern.slf4j.Slf4j;
- import org.springframework.amqp.core.Binding;
- import org.springframework.amqp.core.BindingBuilder;
- import org.springframework.amqp.core.CustomExchange;
- import org.springframework.amqp.core.Queue;
- import org.springframework.context.annotation.Bean;
- import org.springframework.context.annotation.Configuration;
-
- import java.util.HashMap;
-
- /**
- * @author fmi110
- * @description 配置交换器、队列
- * @date 2021/7/3 9:58
- */
- @Slf4j
- @Configuration
- public class RabbitConfig2 {
-
- String exchangeName = "delay-exchange";
- String queueName = "delay-queue";
- String exchangeType = "x-delayed-message";
-
- @Bean
- public CustomExchange exchange() {
-
- HashMap<String, Object> args = new HashMap<>();
- args.put("x-delayed-type", "topic");
- return new CustomExchange(exchangeName, exchangeType, true, false, args);
- }
-
- @Bean
- public Queue queue() {
- return new Queue(queueName, true, false, false);
- }
-
- @Bean
- public Binding binding(CustomExchange exchange, Queue queue) {
- return BindingBuilder.bind(queue)
- .to(exchange)
- .with("rabbit.delay")
- .noargs();
-
- }
- }
这里开启了消息投递失败回调。测试中发现,使用延迟插件,虽然消息正常投递了,但是始终会报 “NO_ROUTER” 提示路由失败。虽然不影响功能。运行截图见后文。目前不确定是我设置问题还是框架的问题...
- package com.fmi110.rabbitmq;
-
- import com.rabbitmq.client.AMQP;
- import lombok.extern.slf4j.Slf4j;
- import org.springframework.amqp.core.Message;
- import org.springframework.amqp.core.MessageDeliveryMode;
- import org.springframework.amqp.core.MessageProperties;
- import org.springframework.amqp.rabbit.core.RabbitTemplate;
- import org.springframework.beans.factory.annotation.Autowired;
- import org.springframework.stereotype.Component;
- import org.springframework.util.StringUtils;
-
- import javax.annotation.PostConstruct;
- import java.nio.charset.StandardCharsets;
- import java.text.SimpleDateFormat;
- import java.util.Date;
- import java.util.UUID;
- import java.util.concurrent.atomic.AtomicInteger;
-
- /**
- * @author fmi110
- * @description 消息生产者
- * @date 2021/7/1 15:08
- */
- @Component
- @Slf4j
- public class RabbitProducer {
- @Autowired
- RabbitTemplate rabbitTemplate;
-
- /**
- * 1 设置 confirm 回调,消息发送到 exchange 时回调
- * 2 设置 return callback ,当路由规则无法匹配到消息队列时,回调
- * <p>
- * correlationData:消息发送时,传递的参数,里边只有一个id属性,标识消息用
- */
- @PostConstruct
- public void enableConfirmCallback() {
- // #1
- /**
- * 连接不上 exchange或exchange不存在时回调
- */
- rabbitTemplate.setConfirmCallback((correlationData, ack, cause) -> {
- if (!ack) {
- log.error("消息发送失败");
- // TODO 记录日志,发送通知等逻辑
- }
- });
-
- // #2
- /**
- * 消息投递到队列失败时,才会回调该方法
- * message:发送的消息
- * exchange:消息发往的交换器的名称
- * routingKey:消息携带的路由关键字信息
- */
- rabbitTemplate.setReturnCallback((message, replyCode, replyText, exchange, routingKey) -> {
- log.error("{},exchange={},routingKey={}",replyText,exchange,routingKey);
- // TODO 路由失败后续处理逻辑
- });
- }
-
- public void sendDelayMsg(String delay) {
- int delayInt = StringUtils.isEmpty(delay) ? 0 : Integer.valueOf(delay);
- String exchangeName = "delay-exchange";
- String routingKey = "rabbit.delay";
- SimpleDateFormat dateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
- String msg = "message send at " + dateFormat.format(new Date()) + ", expired at " + dateFormat.format(new Date().getTime() + delayInt * 1000);
- // MessageProperties messageProperties = new MessageProperties();
- // messageProperties.setDeliveryMode(MessageDeliveryMode.PERSISTENT);// 持久化消息
- // messageProperties.setDelay(delayInt * 1000);
- // Message message = new Message(msg.getBytes(StandardCharsets.UTF_8), messageProperties);
- // rabbitTemplate.send(exchangeName,routingKey,message);
-
- rabbitTemplate.convertAndSend(exchangeName, routingKey, msg, message ->{
- message.getMessageProperties().setDeliveryMode(MessageDeliveryMode.PERSISTENT); //消息持久化
- message.getMessageProperties().setDelay(delayInt * 1000); // 单位为毫秒
- return message;
- });
-
- }
- }
消费者,指定监听对应的消息队列即可。
- package com.fmi110.rabbitmq;
-
- import com.rabbitmq.client.Channel;
- import lombok.extern.slf4j.Slf4j;
- import org.springframework.amqp.rabbit.annotation.RabbitListener;
- import org.springframework.amqp.support.AmqpHeaders;
- import org.springframework.messaging.handler.annotation.Header;
- import org.springframework.stereotype.Component;
-
- import java.text.SimpleDateFormat;
- import java.util.Date;
- import java.util.concurrent.atomic.AtomicInteger;
-
-
- /**
- * @author fmi110
- * @description 消息消费者
- * @date 2021/7/1 16:08
- */
- @Component
- @Slf4j
- public class RabbitConsumer {
-
- @RabbitListener(queues="delay-queue")
- public void consumeDelay(String data, Channel channel, @Header(AmqpHeaders.DELIVERY_TAG) long tag) throws Exception{
- SimpleDateFormat dateFormat = new SimpleDateFormat("yyyy/MM/dd HH:mm:ss");
- log.info(">>>> {} 延迟队列消费 tag = {},消息内容 : {}", dateFormat.format(new Date()), tag, data);
- }
- }
- package com.fmi110.rabbitmq.controller;
-
-
- import com.fmi110.rabbitmq.RabbitProducer;
- import lombok.extern.slf4j.Slf4j;
- import org.springframework.beans.factory.annotation.Autowired;
- import org.springframework.stereotype.Controller;
- import org.springframework.web.bind.annotation.GetMapping;
- import org.springframework.web.bind.annotation.RequestMapping;
- import org.springframework.web.bind.annotation.ResponseBody;
- import org.springframework.web.bind.annotation.RestController;
- import sun.rmi.runtime.Log;
-
- import java.text.SimpleDateFormat;
- import java.util.Date;
- import java.util.HashMap;
- @Slf4j
- @RestController
- public class TestController {
- @Autowired
- RabbitProducer rabbitProducer;
-
- @GetMapping("/delay")
- public Object delay(String delay) {
- rabbitProducer.sendDelayMsg(delay); // 发送消息
- HashMap<String, Object> result = new HashMap<>();
- result.put("code", 0);
- result.put("msg", "success");
- return result;
- }
- }
- <dependency>
- <groupId>org.springframework.boot</groupId>
- <artifactId>spring-boot-starter-web</artifactId>
- </dependency>
-
- <dependency>
- <groupId>org.springframework.boot</groupId>
- <artifactId>spring-boot-starter-amqp</artifactId>
- </dependency>
-
- <dependency>
- <groupId>org.projectlombok</groupId>
- <artifactId>lombok</artifactId>
- <scope>compile</scope>
- </dependency>
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。