赞
踩
死信队列(Dead Letter Queue,DLQ)是 RabbitMQ 中的一种重要特性,用于处理无法被消费的消息,防止消息丢失。
死信的来源
在消息队列中,当消息满足一定条件而无法被正常消费时,这些消息会被发送到死信队列。满足条件的情况包括但不限于:
basic.reject
或 basic.nack
)且不重新入队(requeue
参数为 false
)。生产者
package com.weipch.rabbitmq.dlq; import com.rabbitmq.client.AMQP; import com.rabbitmq.client.Channel; import com.rabbitmq.client.GetResponse; import com.weipch.util.RabbitMqUtils; /** * @Author 方唐镜 * @Create 2024-03-03 14:08 * @Description */ public class Produce { private static final String NORMAL_EXCHANGE = "normal_exchange"; public static void main(String[] args) throws Exception { Channel channel = RabbitMqUtils.getChannel(); //模拟消息过期 10s //AMQP.BasicProperties properties = new AMQP.BasicProperties().builder().expiration("10000").build(); for (int i = 0; i < 10; i++) { String message = "hello world" + i; channel.basicPublish(NORMAL_EXCHANGE, "normal-routing-key", null, message.getBytes()); } } }
消费者
正常队列:
package com.weipch.rabbitmq.dlq; import com.rabbitmq.client.BuiltinExchangeType; import com.rabbitmq.client.Channel; import com.rabbitmq.client.DeliverCallback; import com.weipch.util.RabbitMqUtils; import java.nio.charset.StandardCharsets; import java.util.HashMap; import java.util.Map; /** * @Author 方唐镜 * @Create 2024-03-03 13:50 * @Description */ public class Consumer01 { private static final String NORMAL_EXCHANGE = "normal_exchange"; private static final String DEAD_EXCHANGE = "dead_exchange"; private static final String NORMAL_QUEUE = "normal_queue"; private static final String DEAD_QUEUE = "dead_queue"; public static void main(String[] args) throws Exception { Channel channel = RabbitMqUtils.getChannel(); //声明死信交换机和队列 channel.exchangeDeclare(DEAD_EXCHANGE, BuiltinExchangeType.DIRECT); channel.queueDeclare(DEAD_QUEUE, false, false, false, null); //绑定 channel.queueBind(DEAD_QUEUE, DEAD_EXCHANGE, "dead-routing-key"); //声明普通交换机和队列 channel.exchangeDeclare(NORMAL_EXCHANGE, BuiltinExchangeType.DIRECT); //死信配制 指定死信交换机和死信路由键 Map<String, Object> map = new HashMap<>(); map.put("x-dead-letter-exchange", DEAD_EXCHANGE); map.put("x-dead-letter-routing-key", "dead-routing-key"); //最大长度 //map.put("x-max-length", 6); channel.queueDeclare(NORMAL_QUEUE, false, false, false, map); channel.queueBind(NORMAL_QUEUE, NORMAL_EXCHANGE, "normal-routing-key"); DeliverCallback deliverCallback = (consumerTag, delivery) -> { String message = new String(delivery.getBody(), StandardCharsets.UTF_8); if (message.contains("5")){ System.out.println("Consumer01接收消息:" + message + ",此消息被拒绝"); //拒绝消息并把消息丢入死信队列 channel.basicReject(delivery.getEnvelope().getDeliveryTag(), false); }else { System.out.println("Consumer01接收消息:" + message); channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false); } }; channel.basicConsume(NORMAL_QUEUE, false, deliverCallback, (consumerTag, e) -> {}); } }
死信队列:
package com.weipch.rabbitmq.dlq; import com.rabbitmq.client.BuiltinExchangeType; import com.rabbitmq.client.Channel; import com.weipch.util.RabbitMqUtils; import java.nio.charset.StandardCharsets; import java.util.HashMap; import java.util.Map; /** * @Author 方唐镜 * @Create 2024-03-03 13:50 * @Description */ public class Consumer02 { private static final String DEAD_QUEUE = "dead_queue"; public static void main(String[] args) throws Exception { Channel channel = RabbitMqUtils.getChannel(); channel.basicConsume(DEAD_QUEUE, true, (consumerTag, delivery) -> System.out.println("Consumer02:" + new String(delivery.getBody(), StandardCharsets.UTF_8)), (consumerTag, e) -> {}); } }
生产者发送消息到正常队列,而消费者负责消费正常队列的消息。当消息被消费者拒绝并不再重新投递时,消息会被发送到死信队列。
延迟队列是一种消息队列中的一种特殊类型,它允许消息在一定的延迟时间后再被消费。延迟队列的元素是希望在指定时间到了以后或之前取出处理。在实际应用中,延迟队列通常用于处理需要延时执行的任务或事件。
使用场景
下载地址:https://github.com/rabbitmq/rabbitmq-delayed-message-exchange/releases
以docker方式安装
1、把下载好的插件从服务器拷贝到 RabbitMQ 容器内plugins目录
docker cp rabbitmq_delayed_message_exchange-3.13.0.ez 7c8726620871:/plugins
插件版本和rabbitmq版本一致
2、进入容器查看插件
3、启动插件
root@my-rabbit:/plugins# rabbitmq-plugins enable rabbitmq_delayed_message_exchange
4、重启容器
docker restart 7c8726620871
5、安装成功
配置类
package springbootrabbitmq.config; import org.springframework.amqp.core.Binding; import org.springframework.amqp.core.BindingBuilder; import org.springframework.amqp.core.CustomExchange; import org.springframework.amqp.core.Queue; import org.springframework.beans.factory.annotation.Qualifier; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import java.util.HashMap; @Configuration public class DelayedQueueConfig { // 队列 public static final String DELAYED_QUEUE_NAME = "delayed.queue"; // 交换机 public static final String DELAYED_EXCHANGE_NAME = "delayed.exchange"; // routingKey public static final String DELAYED_ROUTING_KEY = "delayed.routingKey"; // 声明队列 @Bean public Queue delayedQueue() { return new Queue(DELAYED_QUEUE_NAME); } // 声明交换机 基于插件的交换机 @Bean public CustomExchange delayedExchange() { HashMap<String, Object> arguments = new HashMap<>(); arguments.put("x-delayed-type", "direct"); /* * 1.交换机名称 * 2.交换机类型 * 3.是否需要持久化 * 4.是否需要自动删除 * 5.其他参数 * */ return new CustomExchange(DELAYED_EXCHANGE_NAME, "x-delayed-message", true, false, arguments); } // 绑定 @Bean public Binding delayedQueueBindingDelayedExchange(@Qualifier("delayedQueue") Queue delayedQueue, @Qualifier("delayedExchange") CustomExchange delayedExchange) { return BindingBuilder.bind(delayedQueue).to(delayedExchange).with(DELAYED_ROUTING_KEY).noargs(); } }
生产者
@GetMapping("/sendDelayMsg/{message}/{delayTime}")
public void sendMsg(@PathVariable String message, @PathVariable Integer delayTime) {
log.info("当前时间:{},发送一条时长{}毫秒消息给延迟队列delayed.queue:{}", new Date(), delayTime, message);
rabbitTemplate.convertAndSend(DelayedQueueConfig.DELAYED_EXCHANGE_NAME, DelayedQueueConfig.DELAYED_ROUTING_KEY, message, msg -> {
// 发送消息的时候 延迟时长
msg.getMessageProperties().setDelay(delayTime);
return msg;
});
}
消费者
@Slf4j
@Component
public class DelayedQueueConsumer {
//监听消息
@RabbitListener(queues = DelayedQueueConfig.DELAYED_QUEUE_NAME)
public void receiveDelayedQueue(Message message) {
String msg = new String(message.getBody());
log.info("当前时间:{},收到延迟队列的消息:{}", new Date(), msg);
}
}
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。