赞
踩
其实,我们可以把这些死信放到自定义的死信队列里去。
“死信队列”,顾名思义,就是存放死信的队列。其实它和普通的队列并没有太大差别,唯一的区别就是他的routingkey是"#"。也就是说:只要你路由到我这个死信队列,我都接收。
package com.wy.testrabbitmq.dlx;
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
/**
* @author wangyan@163.com
* @version 1.0
* @date 2019-06-12 17:20
*/
public class Producer {
public static void main(String[] args) throws Exception {
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setVirtualHost("/");
connectionFactory.setHost("127.0.0.1");
connectionFactory.setPort(5672);
Connection connection = connectionFactory.newConnection();
Channel channel = connection.createChannel();
String exchangeName = "test_dlx_exchange";
String routingkey = "dlx.dlx";
String msg = "test dlx message";
for (int i = 0; i < 3; i++) {
// deliveryMode=2 持久化,expiration 消息有效时间
AMQP.BasicProperties properties=new AMQP.BasicProperties().builder()
.deliveryMode(2)
.contentEncoding("utf-8")
.expiration("7000")
.build();
channel.basicPublish(exchangeName, routingkey, true, properties, msg.getBytes());
}
}
}
package com.wy.testrabbitmq.dlx;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import java.util.HashMap;
import java.util.Map;
/**
* @author wangyan@163.com
* @version 1.0
* @date 2019-06-12 17:21
*/
public class Consumer {
public static void main(String[] args) throws Exception {
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setVirtualHost("/");
connectionFactory.setHost("127.0.0.1");
connectionFactory.setPort(5672);
Connection connection = connectionFactory.newConnection();
Channel channel = connection.createChannel();
String exchangeName = "test_dlx_exchange";
String routingkey = "dlx.#";
String queueName = "test_dlx_queueName";
Map<String,Object>map =new HashMap<>();
//注意: x-dead-letter-exchange 这个key是固定这样写的,value是你自定义的。
map.put("x-dead-letter-exchange","dlx.exchange");
//声明队列
channel.exchangeDeclare(exchangeName, "topic", true, false, null);
//注意:arguments需要声明在队列上,声明在交换机上是不会起作用的。
channel.queueDeclare(queueName, true, false, false, map);
channel.queueBind(queueName, exchangeName, routingkey);
//死信队列声明
channel.exchangeDeclare("dlx.exchange","topic",true,false,null);
channel.queueDeclare("dlx.queue",true,false,false,null);
//routingkey指定为#就行,表示只要路由到死信队列的都接收
channel.queueBind("dlx.queue","dlx.exchange","#");
channel.basicConsume(queueName, true, new TestConsumer(channel));
}
}
如上代码中,我定义了消息的有效时间为7秒。按照逻辑,推送的消息超过7秒没有被消费,就会变成死信。下面我只启动生产端,看管控台变化。
如下,刚启动时我们可以看到我的test_dlx_queueName中有3条消息。
等待7秒后如下:
我们可以看到,由于超过了TTL的有效时间,这3条消息变成了死信,被自动转移到了我们自定义的dlx.queue当中。
//注意: x-dead-letter-exchange 这个key是固定这样写的,value是你自定义的。
Map<String,Object>map =new HashMap<>();
map.put(“x-dead-letter-exchange”,“dlx.exchange”);
//注意:arguments需要声明在队列上,声明在交换机上是不会起作用的。
channel.queueDeclare(queueName, true, false, false, map);
// routingkey指定为#就行,表示只要路由到死信队列的都接收
channel.queueBind(“dlx.queue”,“dlx.exchange”,“#”);
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。