赞
踩
https://gitee.com/DanShenGuiZu/learnDemo/tree/master/rabbitMq-learn/rabbitMq-03
消费者收到Queue中的消息,但没有处理完成就宕机的情况,这种情况下就可能会导致消息丢失。
为了避免这种情况发生,我们可以要求消费者在消费完消息后发送一个回执给RabbitMQ,RabbitMQ收到消息回执(Message acknowledgment)后才将该消息从Queue中移除。
如果RabbitMQ没有收到回执并检测到消费者的RabbitMQ连接断开,则RabbitMQ会将该消息发送给其他消费者(如果存在多个消费者)进行处理。
这里不存在Timeout概念,一个消费者处理消息时间再长也不会导致该消息被发送给其他消费者,除非它的RabbitMQ连接断开。
如果我们的开发人员在处理完业务逻辑后,忘记发送回执给RabbitMQ,这将会导致严重的问题,Queue中堆积的消息会越来越多,消费者重启后会重复消费这些消息并重复执行业务逻辑。
package com.example.rabbitmq03.business.test7; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; public class Producer { // 简单模式 public static void main(String[] args) { // 1. 获取连接 Connection connection = null; try { connection = RabbitMqUtil.getConnection("生产者"); } catch (Exception e) { System.out.println("获取连接时,出现异常"); } Channel channel = null; try { // 2. 通过连接获取通道 Channel channel = connection.createChannel(); String queueName = "code_simple_queue1"; // 3. 通过通道创建声明队列 channel.queueDeclare(queueName, false, false, false, null); // 4. 准备消息内容 String message = "你好"; // 5. 发送消息给队列 Queue channel.basicPublish("", queueName, null, message.getBytes()); System.out.println("消息发送完成~~~发送的消息为:" + message); } catch (Exception e) { e.printStackTrace(); } finally { try { RabbitMqUtil.close(connection, channel); } catch (Exception e) { System.out.println("关闭时,出现异常"); } } } }
修改的地方
package com.example.rabbitmq03.business.test7; import java.io.IOException; import java.util.concurrent.TimeUnit; import com.rabbitmq.client.*; public class Consumer { public static void main(String[] args) throws Exception{ // 获取连接 Connection connection = RabbitMqUtil.getConnection("消费者"); final Channel channel = connection.createChannel(); String queueName = "code_simple_queue1"; // 定义消费者 com.rabbitmq.client.Consumer consumer = new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { // 消息id,mq 在 channel 中用来标识消息的 id,可用于确认消息已接收 long deliveryTag = envelope.getDeliveryTag(); // body 消息体 String msg = new String(body,"utf-8"); System.out.println("收到消息:" + msg); /** * @param1:deliveryTag:用来标识消息的id * @param2:multiple:是否批量。true:将一次性 ACK 所有小于 deliveryTag 的消息 */ // 手动确认 channel.basicAck(deliveryTag, false); } }; // 监听队列 手动 ACK channel.basicConsume(queueName, false, consumer); System.out.println("开始接收消息~~~"); System.in.read(); // 关闭信道、连接 RabbitMqUtil.close(connection, channel); } }
package com.example.rabbitmq03.business.test7; import java.io.IOException; import java.util.concurrent.TimeUnit; import com.rabbitmq.client.*; public class Consumer { public static void main(String[] args) throws Exception { // 获取连接 Connection connection = RabbitMqUtil.getConnection("消费者"); final Channel channel = connection.createChannel(); String queueName = "code_simple_queue1"; // 定义消费者 com.rabbitmq.client.Consumer consumer = new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { int result = 1 / 0; // body 消息体 String msg = new String(body, "utf-8"); System.out.println("收到消息:" + msg); } }; // 监听队列 自动 ACK channel.basicConsume(queueName, true, consumer); System.out.println("开始接收消息~~~"); System.in.read(); // 关闭信道、连接 RabbitMqUtil.close(connection, channel); } }
消费者代码报错,没有收到消息,但是队列的消息少了,原因就是,MQ将异常内部消化了。
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。