赞
踩
有过网络编程的经历,或者了解TCP协议的童鞋都知道:TCP协议通过它的ack机制(应答机制)保证了TCP包传输的可靠性,在面试中必问的TCP连接的三次握手和四次挥手就是对该知识点的考验。我们也知道,RabbitMQ是建立在AMQP协议之上的,这个应用层的协议为了保证消息传递的可靠性,也借鉴了TCP的这种思想,构成了自己一套消息ACK机制。
AMQP的ACK实际上包含了两个方面:第一个是在生产者端,当消息到达了指定的交换机,进行一次确认;另外一个是在消费端,消息从队列到了消费者,消费者进行自动或者手动ack。
但是从应用层面上来讲,在确保消息的可靠性这方面,单单靠RabbitMQ的应答机制,还远远不能满足需求。让我们来看下,一个消息从产生到消费的过程中要经过什么环节:
(1)生产者连接RabbitMQ代理
(2)生产者发布消息到交换机
(3)交换机路由消息到队列
(4)消费者连接RabbitMQ代理
(5)消费者从队列获取到消息
上面5个环节中,无论哪个环节出错,导致的最终结果就是生产者发送的消息无法发送到消费者,也就无法完成业务。遇到这种情况,最简单的解决方案就是把出错的消息持久化到数据库中,然后使用定时任务定时轮询去发送,如果超过一定的次数,人工进行介入。这也是在下文中笔者的处理思路。不过本文旨在说明处理方法,不会细化到实现保存出错消息的代码。下面,我们就来分别学习下,针对上述5中不同的场景下出错时,在代码中怎样感知,并进行处理。
一、生产者或者消费者连接不到RabbitMQ代理
首先,我们将上述5个环节中的第1个和第4个环节进行了合并,因为本质上都是由于RabbitMQ的代理发生了宕机或者通信问题,导致生产者或者消费者连接不到服务器所致。
此时,会在我们会在代码中捕获到两种异常:IOException和TimeoutException,IOException是在RabbitMQ的服务器宕机时抛出,而TimeoutException是由于通信原因或者其它原因,连接RabbitMQ服务器超时时抛出,不管是哪种,我们都能在代码中捕获到,并进行下一步的处理:
代码片段1-1:
- ConnectionFactory factory = new ConnectionFactory();
- factory.setHost(this.rabbitMqHost);
- factory.setPort(this.rabbitMqPort);
- factory.setConnectionTimeout(this.rabbitMqTimeOut);
- factory.setUsername(this.rabbitMqUsername);
- factory.setPassword(this.rabbitMqPassword);
- factory.setVirtualHost("/");
- Connection connection = null;
- Channel channel = null;
- try {
- try {
- connection = factory.newConnection();
- channel = connection.createChannel();
- channel.exchangeDeclare("direct-exchange", "direct", true, false, null);
- channel.queueDeclare("test-queue", true, false, false, null);
- com.rabbitmq.client.AMQP.Queue.BindOk ok = channel.queueBind("test-queue", "direct-exchange", "test-queue");
- channel.basicPublish("1direct-exchange", "test-queue", null, msg.getBytes("UTF-8"));
- } catch (IOException e) {
- System.out.println("服务器拒绝连接。");
- e.printStackTrace();
- saveToDB(msg);
- } catch (TimeoutException e) {
- System.out.println("连接服务器超时。");
- e.printStackTrace();
- saveToDB(msg);
- }
- } finally {
- closeChanelAndConnection(connection, channel);
- }

saveToDB就是我们用来将消息保存到数据库中的方法,这里只是为了演示,并未真正实现;
closeChanelAndConnection()用来关闭Connnect和Channel资源,其代码如下:
代码片段1-2:
- private void closeChanelAndConnection(Connection connection, Channel channel) {
- if (channel != null && channel.isOpen()) {
- try {
- channel.close();
- } catch (IOException e) {
- e.printStackTrace();
- } catch (TimeoutException e) {
- e.printStackTrace();
- }
- }
- if (connection != null && connection.isOpen()) {
- try {
- connection.close();
- } catch (IOException e) {
- e.printStackTrace();
- }
- }
- }

二、生产者消息发送到交换机
在上文发送消息的代码片段中,我们先创建了交换机、队列并进行了绑定,然后在第17行进行消息的发送。但是如果在第17行代码执行之前,目标交换机被删除或者干脆第17代码写的交换机的名称错误,要发送到的交换机根本不存在。此时RabbitMQ的客户端会抛出一个
ShutdownSignalException类型的异常,我们可以在这个异常处理里面进行同样错误消息的持久化:
代码片段2-1:
- catch (ShutdownSignalException e) {
- System.out.println("交换机故障或者不存在。");
- e.printStackTrace();
- saveToDB(msg);
- }
这是最简单的处理交换机不存在的方法。
正常情况,下生产者客户端向交换机发送消息之后,采用的是发后即忘的方式。也就是说,消息发送后,不管消息有没有到达,生产者都不会管,也不会重试。但是现在我们也需要实现:确保消息一定到达了交换机,没有到达则持久化消息。要实现这一功能,则需要开启通道的手动confirm功能,并且有三种方案:
方案一,消息逐条阻塞confirm
请看下面的代码片段:
第7行:开启channel的确认模式
第11行:调用channel.waitForConfirms()方法,阻塞等待通道确认的消息。这个方法会阻塞当前的线程,它的返回结果是一个boolean值,true代表成功将消息发布到交换机,false代表消息没有发布到交换机,在这里我们还是把它存储到数据库中。
代码片段2-2:
- try {
- connection = factory.newConnection();
- channel = connection.createChannel();
- channel.exchangeDeclare("direct-exchange", "direct", true, false, null);
- channel.queueDeclare("test-queue", true, false, false, null);
- com.rabbitmq.client.AMQP.Queue.BindOk ok = channel.queueBind("test-queue", "direct-exchange", "test-queue");
- channel.confirmSelect();
- for (int i = 0; i < 10; i++) {
- String newMsg = msg + i;
- channel.basicPublish("direct-exchange", "test-queue", null, newMsg.getBytes("UTF-8"));
- if (channel.waitForConfirms()) {
- System.out.println("发送成功");
- } else {
- System.out.println("消息发送失败");
- saveToDB(newMsg);
- }
- }
- }

方案二,消息批量阻塞confirm
上述进行消息确认的过程,由于是逐条阻塞等待,势必影响消息发送的效率,RabbitMQ还提供了另外一种批量确认的方式,以加快通道confirm模式下消息的处理效率:
代码片段2-3:
- try {
- connection = factory.newConnection();
- channel = connection.createChannel();
- channel.exchangeDeclare("direct-exchange", "direct", true, false, null);
- channel.queueDeclare("test-queue", true, false, false, null);
- com.rabbitmq.client.AMQP.Queue.BindOk ok = channel.queueBind("test-queue", "direct-exchange", "test-queue");
- channel.confirmSelect();
- for (int i = 0; i < 10; i++) {
- String newMsg = msg + i;
- channel.basicPublish("direct-exchange", "test-queue", null, newMsg.getBytes("UTF-8"));
- }
- channel.waitForConfirmsOrDie();
- }
第10行就是我们调用waitForConfirmsOrDie方法,等待批量的消息确认,在此模式下发送消息的效率的确提高了,但是结果是我们没有办法处理失败的消息,因为无从知道哪些消息发送失败了。
方案三,异步方式confirm
仅仅批量方式阻塞confirm对于RabbitMQ的吞吐量要求来说,还是远远不够的,为了更进一步提升效率,RabbitMQ还支持异步Confirm的方式。这种异步方式,要求先创建一个确认信息的监听器:
代码片段2-4:
- public class MyConfirmListener implements ConfirmListener {
-
- private Map<Long, String> msgMap;
-
- public MyConfirmListener(Map<Long, String> msgMap) {
- this.msgMap = msgMap;
- }
-
- @Override
- public void handleAck(long deliveryTag, boolean multiple) throws IOException {
- System.out.println("发送的消息得到了回应");
- System.out.println("ack: deliveryTag = " + deliveryTag + " multiple: " + multiple);
- }
-
- @Override
- public void handleNack(long deliveryTag, boolean multiple) throws IOException {
- System.out.println("发送的消息没有得到回应");
- System.out.println("nack: deliveryTag = " + deliveryTag + " multiple: " + multiple);
- if (msgMap != null && msgMap.containsKey(deliveryTag)) {
- saveToDB(msgMap.get(deliveryTag));
- }
- }
- private void saveToDB(String msg) {
- System.out.println("消息保存到数据库成功:" + msg);
- }
- }

可以看到,监听器主要实现了两个方法handleAck和handleNack,这里要先解释下ack和nack的区别:在正常情况下,当RabbitMQ接收到消息后会返回一个ack消息。但是当由于RabbitMQ内存出现错误,它没有接受到消息或是接受到不完整消息时,它会返回一个nack消息。handleAck和handleNack就是用来处理这两种消息的。
handleAck和handleNack具有两个参数,一个参数是deliveryTag,信道在设置成confirm模式后,每个信道会对其发送的消息进行计数,deliveryTag就是计数的数值。deliveryTag也可以理解为消息ID,针对每个通道来说,它是唯一的。第3行的msgMap就是用来存储deliveryTag和消息的对应关系,而如果一旦接受到了nack消息,就可以通过这个编号获取到消息,并保存到数据库中(第19~21行)。multiple是指是否是批量确认的消息,比如deliveryTag是10,而multiple是true的话,表明deliveryTag10之前的消息全部批量confirm完成。
完成了监听器的定义,接下来就需要在发送时使用,使用的代码如下:
代码片段2-5:
- try {
- connection = factory.newConnection();
- channel = connection.createChannel();
- channel.exchangeDeclare("direct-exchange", "direct", true, false, null);
- channel.queueDeclare("test-queue", true, false, false, null);
- com.rabbitmq.client.AMQP.Queue.BindOk ok = channel.queueBind("test-queue", "direct-exchange", "test-queue");
- channel.confirmSelect();
- HashMap<Long, String> map = new HashMap(32);
- for (int i = 0; i < 10; i++) {
- String newMsg = msg + i;
- channel.basicPublish("direct-exchange", "test-queue", null, newMsg.getBytes("UTF-8"));
- map.put((long) (i + 1), newMsg);
- }
- channel.addConfirmListener(new MyConfirmListener(map));
- }
与批量阻塞的方式确认消息相比,异步确认的方式在性能上具有大的提升,而且还支持错误消息的处理。其实 channel.addConfirmListener方法(第14行),除了接受一个ConfirmListener类型的参数之外,还有另外一个方法,可以接受两个ConfirmCallback参数:
ConfirmListener addConfirmListener(ConfirmCallback var1, ConfirmCallback var2);
其本质只不过是使用时定义两个ConfirmCallback类,分别处理ack和nack消息,其内部实现其实也是调用了addConfirmListener(ConfirmListener var1)方法,这可以通过查看其源码看到:
代码片段2-6:
- public ConfirmListener addConfirmListener(final ConfirmCallback ackCallback, final ConfirmCallback nackCallback) {
- ConfirmListener confirmListener = new ConfirmListener() {
- public void handleAck(long deliveryTag, boolean multiple) throws IOException {
- ackCallback.handle(deliveryTag, multiple);
- }
-
- public void handleNack(long deliveryTag, boolean multiple) throws IOException {
- nackCallback.handle(deliveryTag, multiple);
- }
- };
- this.addConfirmListener(confirmListener);
- return confirmListener;
- }
三、交换机消息路由到队列
想要弄清楚消息由交换机到队列的的可靠性传输的机制,我们先要看下basicPublish方法完整的定义:
代码片段3-1:
public void basicPublish(String exchange, String routingKey, boolean mandatory, boolean immediate, BasicProperties props, byte[] body) throws IOException;
在上文的代码中,我们只用了basicPublish包含四个参数的重载的版本,在代码片段3-1中的版本中多了两个参数:mandatory和immediate。
mandatory为true时,如果exchange根据自身类型和消息routeKey无法找到一个符合条件的queue时,那么会调用basic.return方法将消息返还给生产者;为false时,出现上述情形broker会直接将消息扔掉。
immediate为true时,如果exchange在将消息route到queue(s)时发现对应的queue上没有消费者,那么这条消息不会放入队列中。为false时,出现上述情形,broker会直接将消息扔掉。
显然我们之前一直用的四个参数的basicPublish,mandatory和immediate默认值都是false,也就是说一旦交换机找不到目标队列,那么它会把消息给抛弃。而即使所有的队列都没有消费者订阅,消息也会存放到队列中。所以在使用默认的basicPublish和mandatory下,我们是无法确认消息已经准确路由到队列的。如果要达到这个目的,首先要使用basicPublish的全参数版本,而且mandatory设置为true,immediate设置为默认值false即可。
在采用了上述全参数版本的basicPublish之后,我们想要接收到交换机返回来的消息,还需要自定义一个ReturnListener,用来监听消息返回的事件:
- public class MyReturnListener implements ReturnListener {
- @Override
- public void handleReturn(int replyCode, String replyText,
- String exchange, String routingKey,
- AMQP.BasicProperties properties, byte[] body) throws IOException {
- System.out.println("以下消息不可达");
- System.out.println("replyCode: " + replyCode);
- System.out.println("replyText: " + replyText);
- System.out.println("exchange: " + exchange);
- System.out.println("routingKey: " + routingKey);
- System.out.println("properties: " + properties);
- System.out.println("body: " + new String(body));
- }
- }
ReturnListener中的handleReturn方法就是用来处理返回消息的回调方法,我们先看下,它的几个参数:
replyCode —— 在消息被返回时,为返回原因定义的错误码
replyText —— 返回原因的描述
exchange —— 交换机名称
routingKey —— 路由键
properties —— 消息附带的属性
body —— 消息内容
自定义完成返回消息的监听器之后,就可以使用它来监听交换机返回的消息了,完整的发送代码如下:
- try {
- try {
- connection = factory.newConnection();
- channel = connection.createChannel();
- channel.exchangeDeclare("direct-exchange", "direct", true, false, null);
- channel.queueDeclare("test-queue", true, false, false, null);
- com.rabbitmq.client.AMQP.Queue.BindOk ok = channel.queueBind("test-queue", "direct-exchange", "test-queue");
- channel.basicPublish("direct-exchange", "test-queue1", true, false, MessageProperties.PERSISTENT_BASIC, msg.getBytes("UTF-8"));
- channel.addReturnListener(new MyReturnListener());
- } catch (IOException e) {
- System.out.println("服务器拒绝连接。");
- e.printStackTrace();
- saveToDB(msg);
- } catch (TimeoutException e) {
- System.out.println("连接服务器超时。");
- e.printStackTrace();
- saveToDB(msg);
- } catch (ShutdownSignalException e) {
- System.out.println("交换机故障或者不存在。");
- e.printStackTrace();
- saveToDB(msg);
- }
- }

第8行:在使用basicPublish时,第3个参数mandatory设置为了true
第9行:将自定义的监听器添加到了channel
完成了上述的设置之后,如果不出意外,当我们发送消息,如果消息不能到达目标的队列,消息就会被返回,程序就会打印类似下面的提示:
同样的,和addConfirmListener方法类似,addReturnListener也有一个接受ReturnCallback类型参数的版本,它实际上在内部也是使用了自定义的ReturnListener,如下所示:
- public ReturnListener addReturnListener(ReturnCallback returnCallback) {
- ReturnListener returnListener = (replyCode, replyText, exchange, routingKey, properties, body) -> {
- returnCallback.handle(new Return(replyCode, replyText, exchange, routingKey, properties, body));
- };
- this.addReturnListener(returnListener);
- return returnListener;
- }
四、消息发送到消费者
上文中,我们介绍了生产者端几个关键环节的消息的确认或者返还机制。接下来,再学习下消费者端消息的确认机制。我们先来看下,消费者处理消息的方式有哪些:
订阅并消费消息
这是最常用的一个api,主要用来订阅队列上的消息并进行消费,如果有多个消费者订阅了同一个队列,默认采用轮询的方式分别发送给某一个消费者。它还有其它几个重载版本,但是大同小异,感兴趣的童鞋可以去专门研究下。
String basicConsume(String queue, boolean autoAck, Consumer callback) throws IOException ;
queue —— 队列名称。
autoAck —— 是否自动确认,默认是true,消息者在接受到消息后马上就会进行自动的ack;如果是false,则需要我们调用channel.basicAck手动进行ack
callback —— 装配的Consumer对象用来处理消息。
返回值 —— 返回值是consumerTag,为每个消费者都定义了一个唯一的consumerTag,如果需要取消消息订阅需要用到consumerTag。
取消订阅消息
取消对队列的订阅,取消之后消费者不再获取该队列中的消息。
void basicCancel(String consumerTag) throws IOException;
consumerTag —— basicConsume返回的basicConsume
主动获取消息
并不建议使用这种主动获取消息的方式来读取消息,实际上在basicGet内部所调用的是rabbitmq获取消息的get命令,而这个get命令内部又包含了订阅消息和取消订阅两个过程,也就是说每一次get,都伴随着订阅和取消订阅两个过程,造成了极大的资源浪费。笔者在工作过程中就看到过有开发人员在一个while循环里面用的get方法来获取消息,造成的结果就是占用了大量的CPU,队列一直处于running状态。
GetResponse basicGet(String queue, boolean autoAck) throws IOException;
queue —— 队列名称。
autoAck —— 是否自动确认,默认是true,消息者在接受到消息后马上就会进行自动的ack;如果是false,则需要我们调用channel.basicAck手动进行ack
手动确认消息
void basicAck(long deliveryTag, boolean multiple) throws IOException;
deliveryTag—— 每个通道投递消息的计数,从1开始递增
multiple——是否是批量处理,如果是true,消费者在确认时会等待若干消息一起确认。
手动拒绝单个消息
使用basicReject可以手动拒绝消息,拒绝后的消息可以选择销毁,也可以选择放回到原来的队列中。但是需要注意,如果消息返回到了原来的队列,在只有这一个消费者的情况下,可能会造成无限循环发送和拒绝。
void basicReject(long deliveryTag, boolean requeue) throws IOException
deliveryTag—— 每个通道投递消息的计数,从1开始递增
requeue —— 是否将消息返回到原队列,true是是,false是否。
手动拒绝批量消息
使用basicNack可以进行批量消息的拒绝。
void basicNack(long deliveryTag, boolean multiple, boolean requeue) throws IOException;
deliveryTag—— 每个通道投递消息的计数,从1开始递增
multiple——是否是批量处理,如果是true,消费者在确认时会等待若干消息一起拒绝。
requeue —— 是否将消息返回到原队列,true是是,false是否。
要使用手动的ack,第1步,我们需要在消费者中手动调用basicAck进行确认,如下代码中第26行:
- public class TestConsumer implements Consumer {
- Channel channel = null;
- public TestConsumer(Channel channel) {
- this.channel = channel;
- }
- /**
- * create by: Hyman
- * description: 成功接受到消息后调用该方法
- * create time: 2021/7/6
- */
- @Override
- public void handleConsumeOk(String consumerTag) {
- System.out.println(consumerTag);
- }
- /* 其它方法 */
- /**
- * create by: Hyman
- * description: 处理消息的方法
- * create time: 2021/7/6
- */
- @Override
- public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties basicProperties, byte[] bytes) throws IOException {
- String str = new String(bytes);
- Long deliveryTag = envelope.getDeliveryTag();
- System.out.println("TestConsumer —— 接受到的字符串是:" + str + ",deliveryTag是:" + deliveryTag);
- channel.basicAck(deliveryTag, true);
- }
- }

第2步,是在使用basicConsume时设置第2个参数autoAck为false
- Consumer consumer = new TestConsumer(channel);
- String consumerTag = channel.basicConsume("test-queue", false, consumer);
有的朋友可能会有疑问?RabbitMQ中消费者手动ack有什么用,用自动ack不是挺好的吗?试想一下这种场景:假设消费者在处理消息时由于业务非常复杂,处理的效率很低。如果我们进行手动ack,则完全可以做到先处理好业务,最后再去ack,这样在业务处理时,由于上一个消息没有ack,即使队列中有新消息,broker也不会再发送给当前的消费者。这就避免了自动ack时,大量消息堆积到消费者的内存中造成内存的急剧膨胀,甚至最后内存溢出。而且在多个消费者订阅同一个队列时,手动ack还可以做到只把消息发送给闲置的消费者,增加整体消息的处理效率。
五、关于RabbitMQ的事务
因为在介绍RabbitMQ的可靠性传输,这里有必要提一下RabbitMQ的事务。在RabbitMQ中有自己的事务机制,可以通过下面的方法使用事务:
- channel.txSelect();//开启事务
- //发送多个消息
- channel.txCommit();//提交事务
RabbitMQ事务保证的是在开启事务后发送的多条消息一定能够到达broker,一旦其中有消息没有到达,那么该条消息之后的所有消息就不会再发送了,而且前面的消息也会被回滚。但是,这里要划重点,RabbitMQ事务性能很差,开启事务之后会降低2~10倍的吞吐量。
所以,基本上不会在实际中使用RabbitMQ事务。
六、总结:
(1)生产者或者消费者可以捕获IOException 和TimeoutException 来处理连接不到broker的问题,暂时将消息持久化到数据库,然后利用定时任务重试发送。
(2)生产者可以捕获ShutdownSignalException异常来处理找不到交换机的问题。
(3)消息从生产者到交换机可以进行手动的confirm,confirm的方式有三种:单个消息阻塞confirm、批量消息阻塞confirm和异步confirm(ConfirmListener),异步confirm消息的处理效率最高
(4)可以设置消息找不到队列时,交换机将消息返回给生产者,生产者通过添加ReturnListener实例的方式监听返回消息的事件
(5)消费者在获取消息时可以自动ack,也可以手动ack,在消费者处理消息耗时很长的业务场景中,建议使用手动ack。
(6)RabbitMQ有自己的事务,可以保证多条消息一起到达broker或者回滚,但是使用其的性能很低,在实际中基本不会使用。
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。