赞
踩
死信,在官网中对应的单词为“Dead Letter”
“死信”是RabbitMQ中的一种消息机制,当你在消费消息时,如果队列里的消息出现以下情况:
那么该消息将成为“死信”。
“死信”消息会被RabbitMQ进行特殊处理,如果配置了死信队列信息,那么该消息将会被丢进死信队列中,如果没有配置,则该消息将会被丢弃。
一般来说,producer将消息投递到broker或者直接到queue里了,consumer从queue取出消息进行消费,但某些时候由于特定的原因导致queue中的某些消息无法被消费,这样的消息如果没有后续的处理,就变成了死信,有死信,自然就有了死信队列。
死信的处理方式大致有以下几种:
工具类:
public class RabbitmqUtil { public static Channel getChannel() throws IOException, TimeoutException { //通过连接工厂创建新的连接和mq建立连接 ConnectionFactory connectionFactory = new ConnectionFactory(); connectionFactory.setHost("127.0.0.1"); connectionFactory.setPort(5672);//端口 connectionFactory.setUsername("guest"); connectionFactory.setPassword("guest"); //设置虚拟机,一个mq服务可以设置多个虚拟机,每个虚拟机就相当于一个独立的mq connectionFactory.setVirtualHost("/"); Connection connection = connectionFactory.newConnection(); Channel channel = connection.createChannel(); return channel; } }
场景:用户下的订单消息,设置过期时间后放入队列,超过时间消息还未被消费,则放入死信队列
代码演示,生产者:
public class Producer { public static void main(String[] args) throws IOException, TimeoutException { Channel channel = RabbitmqUtil.getChannel(); // 正常队列 String orderExchangeName = "order_exchange"; String orderQueueName = "order_queue"; String orderRoutingKey = "order.#"; Map<String, Object> arguments = new HashMap<String, Object>(16); // 死信队列 String dlxExchangeName = "dlx.exchange"; String dlxQueueName = "dlx.queue"; String dlxRoutingKey = "#"; // 为队列设置队列交换器 arguments.put("x-dead-letter-exchange", dlxExchangeName); // 设置队列中的消息 10s 钟后过期 arguments.put("x-message-ttl", 10000); // 创建正常交换器和队列 // 第三个参数表示持久化交换机 // 第四个参数表示交换机不再使用,不自动删除交换机 channel.exchangeDeclare(orderExchangeName, "topic", true, false, null); // 第二个参数表示持久化队列 // 第三个参数表示该消息队列是否只在当前connection有效,默认是false // 第四个参数表示队列没在使用时是否自动删除 // 第五个参数是其他属性,设置死信队列 channel.queueDeclare(orderQueueName, true, false, false, arguments); channel.queueBind(orderQueueName, orderExchangeName, orderRoutingKey); // 创建死信交换器和队列 channel.exchangeDeclare(dlxExchangeName, "topic", true, false, null); channel.queueDeclare(dlxQueueName, true, false, false, null); channel.queueBind(dlxQueueName, dlxExchangeName, dlxRoutingKey); String message = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date()) + " 创建订单."; channel.basicPublish(orderExchangeName, "order.save", MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes()); System.out.println("发送消息:" + message); } }
消费者:
public class Comsumer { private static final String QUEUE_NAME = "dlx.queue"; public static void main(String[] args) throws IOException, TimeoutException, InterruptedException { Channel channel = RabbitmqUtil.getChannel(); com.rabbitmq.client.Consumer consumer = new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { System.out.println("死信队列收到消息:"+new String(body)); System.err.println("deliveryTag:" + envelope.getDeliveryTag()); // 第二个参数:如果为true,确认之前收到的消息,如果为false,确认当前收到的消息 channel.basicAck(envelope.getDeliveryTag(), false); } }; channel.basicConsume(QUEUE_NAME, consumer); TimeUnit.SECONDS.sleep(10000000L); } }
场景,消费之过滤某些消息
代码演示,生产者:
public class Producer2 { public static void main(String[] args) throws IOException, TimeoutException { Channel channel = RabbitmqUtil.getChannel(); String exchangeName = "test_ack_exchange"; String routingKey = "ack.save"; //通过在properties设置来标识消息的相关属性 for(int i=0;i<5;i++){ Map<String, Object> headers = new HashMap<String, Object>(); headers.put("num",i); AMQP.BasicProperties properties = new AMQP.BasicProperties().builder() .deliveryMode(2) // 传送方式 2:持久化投递 .contentEncoding("UTF-8") // 编码方式 //.expiration("10000") // 过期时间 .headers(headers) //自定义属性 .build(); String message = "hello this is ack message ....." + i; System.out.println("发送消息:" + message); channel.basicPublish(exchangeName,routingKey,true,properties,message.getBytes()); } } }
消费者:
public class Comsumer2 { public static void main(String[] args) throws Exception{ final Channel channel = RabbitmqUtil.getChannel(); String exchangeName = "test_ack_exchange"; String exchangeType="topic"; final String queueName = "test_ack_queue"; String routingKey = "ack.#"; //死信队列配置 ---------------- String deadExchangeName = "dead_exchange"; String deadQueueName = "dead_queue"; String deadRoutingKey = "#"; //死信队列配置 ---------------- //如果需要将死信消息路由 Map<String,Object> arguments = new HashMap<String, Object>(); arguments.put("x-dead-letter-exchange",deadExchangeName); channel.exchangeDeclare(exchangeName,exchangeType,true,false,false,null); channel.queueDeclare(queueName,false,false,false,arguments); channel.queueBind(queueName,exchangeName,routingKey); //死信队列绑定配置 ---------------- channel.exchangeDeclare(deadExchangeName,exchangeType,true,false,false,null); channel.queueDeclare(deadQueueName,true,false,false,null); channel.queueBind(deadQueueName,deadExchangeName,deadRoutingKey); //死信队列绑定配置 ---------------- System.out.println("consumer启动 ....."); com.rabbitmq.client.Consumer consumer = new DefaultConsumer(channel){ @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { try{ Thread.sleep(2000); }catch (Exception e){ } Integer num = (Integer)properties.getHeaders().get("num"); if(num==0){ // 第二个参数:true,表示处理消息失败,将消息重新放回队列,如果抛异常或nack(并且第三个参数是true),消息会重写入队列 // 第三个参数:true,表示把消费失败的消息重新放入队列的尾部,false不会回到队列 channel.basicNack(envelope.getDeliveryTag(),false,false); String message = new String(body, "UTF-8"); System.out.println("consumer端的Nack消息是:" + message); }else { channel.basicAck(envelope.getDeliveryTag(),false); String message = new String(body, "UTF-8"); System.out.println("consumer端的ack消息是:" + message); } } }; //消息要能重回队列,需要设置autoAck的属性为false,即在回调函数中进行手动签收 channel.basicConsume(queueName,false, consumer); } }
生产者:
消费者:
死信队列中有一条消息:
代码演示,生产者:
注:这里和方式1类似,并且用的队列名称和方式1相同,由于修改了队列的属性,需要删除旧的队列,因为队列一旦建立好,就不能修改
public class Producer { public static void main(String[] args) throws IOException, TimeoutException { Channel channel = RabbitmqUtil.getChannel(); // 正常队列 String orderExchangeName = "order_exchange"; String orderQueueName = "order_queue"; String orderRoutingKey = "order.#"; Map<String, Object> arguments = new HashMap<String, Object>(16); // 死信队列 String dlxExchangeName = "dlx.exchange"; String dlxQueueName = "dlx.queue"; String dlxRoutingKey = "#"; // 为队列设置队列交换器 arguments.put("x-dead-letter-exchange", dlxExchangeName); // 设置队列的最大长度 arguments.put("x-max-length",3); // 创建正常交换器和队列 // 第三个参数表示持久化交换机 // 第四个参数表示交换机不再使用,不自动删除交换机 channel.exchangeDeclare(orderExchangeName, "topic", true, false, null); // 第二个参数表示持久化队列 // 第三个参数表示该消息队列是否只在当前connection有效,默认是false // 第四个参数表示队列没在使用时是否自动删除 channel.queueDeclare(orderQueueName, true, false, false, arguments); channel.queueBind(orderQueueName, orderExchangeName, orderRoutingKey); // 创建死信交换器和队列 channel.exchangeDeclare(dlxExchangeName, "topic", true, false, null); channel.queueDeclare(dlxQueueName, true, false, false, null); channel.queueBind(dlxQueueName, dlxExchangeName, dlxRoutingKey); String message; for(int i = 0; i < 5; i++) { message = "message-" + i; System.out.println("发送消息:" + message); channel.basicPublish(orderExchangeName, "order.save", null, message.getBytes()); } } }
消费者:
public class Comsumer { private static final String QUEUE_NAME = "order_queue"; public static void main(String[] args) throws IOException, TimeoutException, InterruptedException { Channel channel = RabbitmqUtil.getChannel(); com.rabbitmq.client.Consumer consumer = new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { System.out.println("死信队列收到消息:"+new String(body)); System.err.println("deliveryTag:" + envelope.getDeliveryTag()); // 第二个参数:如果为true,确认之前收到的消息,如果为false,确认当前收到的消息 channel.basicAck(envelope.getDeliveryTag(), false); } }; channel.basicConsume(QUEUE_NAME,false, consumer); } }
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。