赞
踩
在这讲正式之前,咱们先把之前的生产者、消费者代码,简单优化下吧
看之前的代码:生产者、消费者的连接和关闭都是一样的代码,那么,我们就可以将相同的代码提取出来,放在一个工具类里面,然后在相应的地方用工具类进行替换。
提取一个工具类 RabbitMqUtil,用来处理连接、关闭功能:
public class RabbitMqUtil { // 私有构造 private RabbitMqUtil() {} // 获取连接 public static Connection getConnection(String name) throws Exception { ConnectionFactory connectionFactory = new ConnectionFactory(); connectionFactory.setHost("localhost"); connectionFactory.setPort(5672); connectionFactory.setUsername("guest"); connectionFactory.setPassword("guest"); connectionFactory.setVirtualHost("/"); Connection conn = connectionFactory.newConnection(name); return conn; } // 关闭信道、连接 public static void close(Connection conn, Channel channel) throws Exception{ if (null != channel && channel.isOpen()) { channel.close(); } if (null != conn && conn.isOpen()) { conn.close(); } } }
生产者代码修改为:
public class Producer { public static void main(String[] args) { // 1. 获取连接 Connection connection = null; try { connection = RabbitMqUtil.getConnection("生产者"); } catch (Exception e) { System.out.println("获取连接时,出现异常"); } Channel channel = null; try { // 2. 通过连接获取通道 Channel channel = connection.createChannel(); String queueName = "code_simple_queue1"; // 3. 通过通道创建声明队列 channel.queueDeclare(queueName, false, false, false, null); // 4. 准备消息内容 String message = "Hello RabbitMQ"; // 5. 发送消息给队列 Queue channel.basicPublish("", queueName, null, message.getBytes()); System.out.println("消息发送完成~~~发送的消息为:" + message); } catch (Exception e) { e.printStackTrace(); } finally { try { RabbitMqUtil.close(connection, channel); } catch (Exception e) { System.out.println("关闭时,出现异常"); } } } }
消费者代码为:
public class Consumer { public static void main(String[] args) { Connection connection = null; try { connection = RabbitMqUtil.getConnection("消费者"); } catch (Exception e) { System.out.println("获取连接异常"); } Channel channel = null; try { channel = connection.createChannel(); String queueName = "code_simple_queue1"; // 定义消费者 com.rabbitmq.client.Consumer consumer = new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { // 交换机 String exchange = envelope.getExchange(); // 消息id,mq 在 channel 中用来标识消息的 id,可用于确认消息已接收 long deliveryTag = envelope.getDeliveryTag(); // body 消息体 String msg = new String(body,"utf-8"); System.out.println("收到消息:" + msg); } }; // 监听队列。自动确认 channel.basicConsume(queueName, true, consumer); System.out.println("开始接收消息~~~"); System.in.read(); } catch (Exception e) { e.printStackTrace(); } finally { try { RabbitMqUtil.close(connection, channel); } catch (Exception e) { System.out.println("关闭时,出现异常"); } } } }
咱们再来回顾下 消费者中的 channel.basicConsume()
方法,它有三个参数,分别为:
以上的示例中,咱们是将自动确认参数设置为 true
,那么,这就会导致一个现象:消息一旦被消费者接收,队列中的消息就会被删除。
问题:RabbitMQ 怎么知道消息被消费者接收了呢?如果消费者领取消息后,还没执行操作就挂掉了呢?或者抛出了异常?总之:如果消费消息失败了,RabbitMQ 无从得知,这样消息就丢失了!
为了保证消息从队列可靠地达到消费者,RabbitMQ 提供了消息确认机制 ACK:当消费者获取消息后,会向 RabbitMQ 发送回执 ACK,告知消息已经被接收。
不过这种回执 ACK 分两种情况:
采用消息确认机制后,只要设置 autoAck 参数为 false,消费者就有足够的时间处理消息,不用担心处理消息过程中,消费者进程挂掉后消息丢失的问题。因为,RabbitMQ 会一直等待持有消息,直到消费者显示调用 Basic.Ack 命令为止。
并且,当 autoAck 参数设置为 false 后,对于 RabbitMQ 而言,队列中的消息分成了两部分:一是等待投递给消费者的消息;二是已经投递给消费者,但还没有收到消费者确认信号的消息。如果 RabbitMQ 一直没有收到消费者的确认信号,并且消费此消息的消费者已经断开连接,则 RabbitMQ 会安排该消息重新进入队列,等待投递给下一个消费者。
对于以上的两种回执 ACK,如何选择哪一种,这得看消息的重要性了(建议选择手动确认):
将自动 ACK 修改为手动 ACK,只需要修改消费者的代码:
public class Consumer { public static void main(String[] args) throws Exception{ // 获取连接 Connection connection = RabbitMqUtil.getConnection("消费者"); final Channel channel = connection.createChannel(); String queueName = "code_simple_queue1"; // 定义消费者 com.rabbitmq.client.Consumer consumer = new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { // 消息id,mq 在 channel 中用来标识消息的 id,可用于确认消息已接收 long deliveryTag = envelope.getDeliveryTag(); // body 消息体 String msg = new String(body,"utf-8"); System.out.println("收到消息:" + msg); /** * @param1:deliveryTag:用来标识消息的id * @param2:multiple:是否批量。true:将一次性 ACK 所有小于 deliveryTag 的消息 */ // 手动确认 channel.basicAck(deliveryTag, false); } }; // 监听队列 手动 ACK channel.basicConsume(queueName, false, consumer); System.out.println("开始接收消息~~~"); System.in.read(); // 关闭信道、连接 RabbitMqUtil.close(connection, channel); } }
主要作出一下两点变化:
channel.basicConsume()
方法的第二个参数修改为 falsechannel.basicAck()
方法,进行手动确认,修改后,发现上述的消费者依旧可以消费消息
问题:那么,自动 ACK 会带来什么问题呢?接下来我们通过实例进行演示一下
我们修改下自动 ACK 的消费者代码:在 handleDelivery()
方法中手动添加异常代码,让其抛出异常,这样,就中断了正在执行的 handleDelivery()
方法:
channel = connection.createChannel(); String queueName = "code_simple_queue1"; // 定义消费者 com.rabbitmq.client.Consumer consumer = new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { // 抛出异常,后续代码都不会执行 int result = 1 / 0; // 交换机 String exchange = envelope.getExchange(); // 消息id,mq 在 channel 中用来标识消息的 id,可用于确认消息已接收 long deliveryTag = envelope.getDeliveryTag(); // body 消息体 String msg = new String(body,"utf-8"); System.out.println("收到消息:" + msg); } }; // 监听队列 channel.basicConsume(queueName, true, consumer);
上述的 handleDelivery()
方法确实会抛出异常,但它又被 ConsumerDispatcher#handleDelivery()
方法捕获并进行处理了:
public void handleDelivery(final Consumer delegate, final String consumerTag, final Envelope envelope, final BasicProperties properties, final byte[] body) throws IOException {
this.executeUnlessShuttingDown(new Runnable() {
public void run() {
try {
delegate.handleDelivery(consumerTag, envelope, properties, body);
} catch (Throwable var2) {
ConsumerDispatcher.this.connection.getExceptionHandler().handleConsumerException(ConsumerDispatcher.this.channel, var2, delegate, consumerTag, "handleDelivery");
}
}
});
}
然后,运行生产者,去生产消息,这时,得注意消息的总数。再运行消费者程序,发现它会抛出异常并被处理了,然而,异常后面的代码都没有被执行,但刚才生产的消息却被消费了。实际上消费者还没有去真正地消费消息。
但将这行异常代码添加到手动 ACK 中的代码中去,发现这条消息并没有被消费!!
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。