当前位置:   article > 正文

RabbitMQ消息队列实战(3)—— RabbitMQ中消息的可靠性传输和确认机制_rabbitmq传递租户id

rabbitmq传递租户id

有过网络编程的经历,或者了解TCP协议的童鞋都知道:TCP协议通过它的ack机制(应答机制)保证了TCP包传输的可靠性,在面试中必问的TCP连接的三次握手和四次挥手就是对该知识点的考验。我们也知道,RabbitMQ是建立在AMQP协议之上的,这个应用层的协议为了保证消息传递的可靠性,也借鉴了TCP的这种思想,构成了自己一套消息ACK机制。

AMQP的ACK实际上包含了两个方面:第一个是在生产者端,当消息到达了指定的交换机,进行一次确认;另外一个是在消费端,消息从队列到了消费者,消费者进行自动或者手动ack。

但是从应用层面上来讲,在确保消息的可靠性这方面,单单靠RabbitMQ的应答机制,还远远不能满足需求。让我们来看下,一个消息从产生到消费的过程中要经过什么环节:

(1)生产者连接RabbitMQ代理

(2)生产者发布消息到交换机

(3)交换机路由消息到队列

(4)消费者连接RabbitMQ代理

(5)消费者从队列获取到消息

上面5个环节中,无论哪个环节出错,导致的最终结果就是生产者发送的消息无法发送到消费者,也就无法完成业务。遇到这种情况,最简单的解决方案就是把出错的消息持久化到数据库中,然后使用定时任务定时轮询去发送,如果超过一定的次数,人工进行介入。这也是在下文中笔者的处理思路。不过本文旨在说明处理方法,不会细化到实现保存出错消息的代码。下面,我们就来分别学习下,针对上述5中不同的场景下出错时,在代码中怎样感知,并进行处理。

一、生产者或者消费者连接不到RabbitMQ代理

首先,我们将上述5个环节中的第1个和第4个环节进行了合并,因为本质上都是由于RabbitMQ的代理发生了宕机或者通信问题,导致生产者或者消费者连接不到服务器所致。

此时,会在我们会在代码中捕获到两种异常:IOException和TimeoutException,IOException是在RabbitMQ的服务器宕机时抛出,而TimeoutException是由于通信原因或者其它原因,连接RabbitMQ服务器超时时抛出,不管是哪种,我们都能在代码中捕获到,并进行下一步的处理:

代码片段1-1:

  1. ConnectionFactory factory = new ConnectionFactory();
  2. factory.setHost(this.rabbitMqHost);
  3. factory.setPort(this.rabbitMqPort);
  4. factory.setConnectionTimeout(this.rabbitMqTimeOut);
  5. factory.setUsername(this.rabbitMqUsername);
  6. factory.setPassword(this.rabbitMqPassword);
  7. factory.setVirtualHost("/");
  8. Connection connection = null;
  9. Channel channel = null;
  10. try {
  11. try {
  12. connection = factory.newConnection();
  13. channel = connection.createChannel();
  14. channel.exchangeDeclare("direct-exchange", "direct", true, false, null);
  15. channel.queueDeclare("test-queue", true, false, false, null);
  16. com.rabbitmq.client.AMQP.Queue.BindOk ok = channel.queueBind("test-queue", "direct-exchange", "test-queue");
  17. channel.basicPublish("1direct-exchange", "test-queue", null, msg.getBytes("UTF-8"));
  18. } catch (IOException e) {
  19. System.out.println("服务器拒绝连接。");
  20. e.printStackTrace();
  21. saveToDB(msg);
  22. } catch (TimeoutException e) {
  23. System.out.println("连接服务器超时。");
  24. e.printStackTrace();
  25. saveToDB(msg);
  26. }
  27. } finally {
  28. closeChanelAndConnection(connection, channel);
  29. }

saveToDB就是我们用来将消息保存到数据库中的方法,这里只是为了演示,并未真正实现;

closeChanelAndConnection()用来关闭Connnect和Channel资源,其代码如下:

代码片段1-2:

  1. private void closeChanelAndConnection(Connection connection, Channel channel) {
  2. if (channel != null && channel.isOpen()) {
  3. try {
  4. channel.close();
  5. } catch (IOException e) {
  6. e.printStackTrace();
  7. } catch (TimeoutException e) {
  8. e.printStackTrace();
  9. }
  10. }
  11. if (connection != null && connection.isOpen()) {
  12. try {
  13. connection.close();
  14. } catch (IOException e) {
  15. e.printStackTrace();
  16. }
  17. }
  18. }

二、生产者消息发送到交换机

在上文发送消息的代码片段中,我们先创建了交换机、队列并进行了绑定,然后在第17行进行消息的发送。但是如果在第17行代码执行之前,目标交换机被删除或者干脆第17代码写的交换机的名称错误,要发送到的交换机根本不存在。此时RabbitMQ的客户端会抛出一个

ShutdownSignalException类型的异常,我们可以在这个异常处理里面进行同样错误消息的持久化:

代码片段2-1:

  1. catch (ShutdownSignalException e) {
  2. System.out.println("交换机故障或者不存在。");
  3. e.printStackTrace();
  4. saveToDB(msg);
  5. }

这是最简单的处理交换机不存在的方法。

正常情况,下生产者客户端向交换机发送消息之后,采用的是发后即忘的方式。也就是说,消息发送后,不管消息有没有到达,生产者都不会管,也不会重试。但是现在我们也需要实现:确保消息一定到达了交换机,没有到达则持久化消息。要实现这一功能,则需要开启通道的手动confirm功能,并且有三种方案:

  • 方案一,消息逐条阻塞confirm

请看下面的代码片段:

第7行:开启channel的确认模式

第11行:调用channel.waitForConfirms()方法,阻塞等待通道确认的消息。这个方法会阻塞当前的线程,它的返回结果是一个boolean值,true代表成功将消息发布到交换机,false代表消息没有发布到交换机,在这里我们还是把它存储到数据库中。

代码片段2-2:

  1. try {
  2. connection = factory.newConnection();
  3. channel = connection.createChannel();
  4. channel.exchangeDeclare("direct-exchange", "direct", true, false, null);
  5. channel.queueDeclare("test-queue", true, false, false, null);
  6. com.rabbitmq.client.AMQP.Queue.BindOk ok = channel.queueBind("test-queue", "direct-exchange", "test-queue");
  7. channel.confirmSelect();
  8. for (int i = 0; i < 10; i++) {
  9. String newMsg = msg + i;
  10. channel.basicPublish("direct-exchange", "test-queue", null, newMsg.getBytes("UTF-8"));
  11. if (channel.waitForConfirms()) {
  12. System.out.println("发送成功");
  13. } else {
  14. System.out.println("消息发送失败");
  15. saveToDB(newMsg);
  16. }
  17. }
  18. }
  • 方案二,消息批量阻塞confirm

上述进行消息确认的过程,由于是逐条阻塞等待,势必影响消息发送的效率,RabbitMQ还提供了另外一种批量确认的方式,以加快通道confirm模式下消息的处理效率:

代码片段2-3:

  1. try {
  2. connection = factory.newConnection();
  3. channel = connection.createChannel();
  4. channel.exchangeDeclare("direct-exchange", "direct", true, false, null);
  5. channel.queueDeclare("test-queue", true, false, false, null);
  6. com.rabbitmq.client.AMQP.Queue.BindOk ok = channel.queueBind("test-queue", "direct-exchange", "test-queue");
  7. channel.confirmSelect();
  8. for (int i = 0; i < 10; i++) {
  9. String newMsg = msg + i;
  10. channel.basicPublish("direct-exchange", "test-queue", null, newMsg.getBytes("UTF-8"));
  11. }
  12. channel.waitForConfirmsOrDie();
  13. }

第10行就是我们调用waitForConfirmsOrDie方法,等待批量的消息确认,在此模式下发送消息的效率的确提高了,但是结果是我们没有办法处理失败的消息,因为无从知道哪些消息发送失败了。

  • 方案三,异步方式confirm

仅仅批量方式阻塞confirm对于RabbitMQ的吞吐量要求来说,还是远远不够的,为了更进一步提升效率,RabbitMQ还支持异步Confirm的方式。这种异步方式,要求先创建一个确认信息的监听器:

代码片段2-4:

  1. public class MyConfirmListener implements ConfirmListener {
  2. private Map<Long, String> msgMap;
  3. public MyConfirmListener(Map<Long, String> msgMap) {
  4. this.msgMap = msgMap;
  5. }
  6. @Override
  7. public void handleAck(long deliveryTag, boolean multiple) throws IOException {
  8. System.out.println("发送的消息得到了回应");
  9. System.out.println("ack: deliveryTag = " + deliveryTag + " multiple: " + multiple);
  10. }
  11. @Override
  12. public void handleNack(long deliveryTag, boolean multiple) throws IOException {
  13. System.out.println("发送的消息没有得到回应");
  14. System.out.println("nack: deliveryTag = " + deliveryTag + " multiple: " + multiple);
  15. if (msgMap != null && msgMap.containsKey(deliveryTag)) {
  16. saveToDB(msgMap.get(deliveryTag));
  17. }
  18. }
  19. private void saveToDB(String msg) {
  20. System.out.println("消息保存到数据库成功:" + msg);
  21. }
  22. }

可以看到,监听器主要实现了两个方法handleAck和handleNack,这里要先解释下ack和nack的区别:在正常情况下,当RabbitMQ接收到消息后会返回一个ack消息。但是当由于RabbitMQ内存出现错误,它没有接受到消息或是接受到不完整消息时,它会返回一个nack消息。handleAck和handleNack就是用来处理这两种消息的。

handleAck和handleNack具有两个参数,一个参数是deliveryTag,信道在设置成confirm模式后,每个信道会对其发送的消息进行计数,deliveryTag就是计数的数值。deliveryTag也可以理解为消息ID,针对每个通道来说,它是唯一的。第3行的msgMap就是用来存储deliveryTag和消息的对应关系,而如果一旦接受到了nack消息,就可以通过这个编号获取到消息,并保存到数据库中(第19~21行)。multiple是指是否是批量确认的消息,比如deliveryTag是10,而multiple是true的话,表明deliveryTag10之前的消息全部批量confirm完成。

完成了监听器的定义,接下来就需要在发送时使用,使用的代码如下:

代码片段2-5:

  1. try {
  2. connection = factory.newConnection();
  3. channel = connection.createChannel();
  4. channel.exchangeDeclare("direct-exchange", "direct", true, false, null);
  5. channel.queueDeclare("test-queue", true, false, false, null);
  6. com.rabbitmq.client.AMQP.Queue.BindOk ok = channel.queueBind("test-queue", "direct-exchange", "test-queue");
  7. channel.confirmSelect();
  8. HashMap<Long, String> map = new HashMap(32);
  9. for (int i = 0; i < 10; i++) {
  10. String newMsg = msg + i;
  11. channel.basicPublish("direct-exchange", "test-queue", null, newMsg.getBytes("UTF-8"));
  12. map.put((long) (i + 1), newMsg);
  13. }
  14. channel.addConfirmListener(new MyConfirmListener(map));
  15. }

与批量阻塞的方式确认消息相比,异步确认的方式在性能上具有大的提升,而且还支持错误消息的处理。其实 channel.addConfirmListener方法(第14行),除了接受一个ConfirmListener类型的参数之外,还有另外一个方法,可以接受两个ConfirmCallback参数:

ConfirmListener addConfirmListener(ConfirmCallback var1, ConfirmCallback var2);

其本质只不过是使用时定义两个ConfirmCallback类,分别处理ack和nack消息,其内部实现其实也是调用了addConfirmListener(ConfirmListener var1)方法,这可以通过查看其源码看到:

代码片段2-6:

  1. public ConfirmListener addConfirmListener(final ConfirmCallback ackCallback, final ConfirmCallback nackCallback) {
  2. ConfirmListener confirmListener = new ConfirmListener() {
  3. public void handleAck(long deliveryTag, boolean multiple) throws IOException {
  4. ackCallback.handle(deliveryTag, multiple);
  5. }
  6. public void handleNack(long deliveryTag, boolean multiple) throws IOException {
  7. nackCallback.handle(deliveryTag, multiple);
  8. }
  9. };
  10. this.addConfirmListener(confirmListener);
  11. return confirmListener;
  12. }

三、交换机消息路由到队列

想要弄清楚消息由交换机到队列的的可靠性传输的机制,我们先要看下basicPublish方法完整的定义:

代码片段3-1:

public void basicPublish(String exchange, String routingKey, boolean mandatory, boolean immediate, BasicProperties props, byte[] body) throws IOException;

在上文的代码中,我们只用了basicPublish包含四个参数的重载的版本,在代码片段3-1中的版本中多了两个参数:mandatory和immediate。

  • mandatory为true时,如果exchange根据自身类型和消息routeKey无法找到一个符合条件的queue时,那么会调用basic.return方法将消息返还给生产者;为false时,出现上述情形broker会直接将消息扔掉。

  • immediate为true时,如果exchange在将消息route到queue(s)时发现对应的queue上没有消费者,那么这条消息不会放入队列中。为false时,出现上述情形,broker会直接将消息扔掉。

显然我们之前一直用的四个参数的basicPublish,mandatory和immediate默认值都是false,也就是说一旦交换机找不到目标队列,那么它会把消息给抛弃。而即使所有的队列都没有消费者订阅,消息也会存放到队列中。所以在使用默认的basicPublish和mandatory下,我们是无法确认消息已经准确路由到队列的。如果要达到这个目的,首先要使用basicPublish的全参数版本,而且mandatory设置为true,immediate设置为默认值false即可。

在采用了上述全参数版本的basicPublish之后,我们想要接收到交换机返回来的消息,还需要自定义一个ReturnListener,用来监听消息返回的事件:

  1. public class MyReturnListener implements ReturnListener {
  2. @Override
  3. public void handleReturn(int replyCode, String replyText,
  4. String exchange, String routingKey,
  5. AMQP.BasicProperties properties, byte[] body) throws IOException {
  6. System.out.println("以下消息不可达");
  7. System.out.println("replyCode: " + replyCode);
  8. System.out.println("replyText: " + replyText);
  9. System.out.println("exchange: " + exchange);
  10. System.out.println("routingKey: " + routingKey);
  11. System.out.println("properties: " + properties);
  12. System.out.println("body: " + new String(body));
  13. }
  14. }

ReturnListener中的handleReturn方法就是用来处理返回消息的回调方法,我们先看下,它的几个参数:

  • replyCode —— 在消息被返回时,为返回原因定义的错误码

  • replyText —— 返回原因的描述

  • exchange —— 交换机名称

  • routingKey —— 路由键

  • properties —— 消息附带的属性

  • body —— 消息内容

自定义完成返回消息的监听器之后,就可以使用它来监听交换机返回的消息了,完整的发送代码如下:

  1. try {
  2. try {
  3. connection = factory.newConnection();
  4. channel = connection.createChannel();
  5. channel.exchangeDeclare("direct-exchange", "direct", true, false, null);
  6. channel.queueDeclare("test-queue", true, false, false, null);
  7. com.rabbitmq.client.AMQP.Queue.BindOk ok = channel.queueBind("test-queue", "direct-exchange", "test-queue");
  8. channel.basicPublish("direct-exchange", "test-queue1", true, false, MessageProperties.PERSISTENT_BASIC, msg.getBytes("UTF-8"));
  9. channel.addReturnListener(new MyReturnListener());
  10. } catch (IOException e) {
  11. System.out.println("服务器拒绝连接。");
  12. e.printStackTrace();
  13. saveToDB(msg);
  14. } catch (TimeoutException e) {
  15. System.out.println("连接服务器超时。");
  16. e.printStackTrace();
  17. saveToDB(msg);
  18. } catch (ShutdownSignalException e) {
  19. System.out.println("交换机故障或者不存在。");
  20. e.printStackTrace();
  21. saveToDB(msg);
  22. }
  23. }

第8行:在使用basicPublish时,第3个参数mandatory设置为了true

第9行:将自定义的监听器添加到了channel

完成了上述的设置之后,如果不出意外,当我们发送消息,如果消息不能到达目标的队列,消息就会被返回,程序就会打印类似下面的提示:

同样的,和addConfirmListener方法类似,addReturnListener也有一个接受ReturnCallback类型参数的版本,它实际上在内部也是使用了自定义的ReturnListener,如下所示:

  1. public ReturnListener addReturnListener(ReturnCallback returnCallback) {
  2. ReturnListener returnListener = (replyCode, replyText, exchange, routingKey, properties, body) -> {
  3. returnCallback.handle(new Return(replyCode, replyText, exchange, routingKey, properties, body));
  4. };
  5. this.addReturnListener(returnListener);
  6. return returnListener;
  7. }

四、消息发送到消费者

上文中,我们介绍了生产者端几个关键环节的消息的确认或者返还机制。接下来,再学习下消费者端消息的确认机制。我们先来看下,消费者处理消息的方式有哪些:

  • 订阅并消费消息

这是最常用的一个api,主要用来订阅队列上的消息并进行消费,如果有多个消费者订阅了同一个队列,默认采用轮询的方式分别发送给某一个消费者。它还有其它几个重载版本,但是大同小异,感兴趣的童鞋可以去专门研究下。

String basicConsume(String queue, boolean autoAck, Consumer callback) throws IOException ;

queue —— 队列名称。

autoAck —— 是否自动确认,默认是true,消息者在接受到消息后马上就会进行自动的ack;如果是false,则需要我们调用channel.basicAck手动进行ack

callback —— 装配的Consumer对象用来处理消息。

返回值 —— 返回值是consumerTag,为每个消费者都定义了一个唯一的consumerTag,如果需要取消消息订阅需要用到consumerTag。

  • 取消订阅消息

取消对队列的订阅,取消之后消费者不再获取该队列中的消息。

void basicCancel(String consumerTag) throws IOException;

consumerTag —— basicConsume返回的basicConsume

  • 主动获取消息

并不建议使用这种主动获取消息的方式来读取消息,实际上在basicGet内部所调用的是rabbitmq获取消息的get命令,而这个get命令内部又包含了订阅消息和取消订阅两个过程,也就是说每一次get,都伴随着订阅和取消订阅两个过程,造成了极大的资源浪费。笔者在工作过程中就看到过有开发人员在一个while循环里面用的get方法来获取消息,造成的结果就是占用了大量的CPU,队列一直处于running状态。

GetResponse basicGet(String queue, boolean autoAck) throws IOException;

queue —— 队列名称。

autoAck —— 是否自动确认,默认是true,消息者在接受到消息后马上就会进行自动的ack;如果是false,则需要我们调用channel.basicAck手动进行ack

  • 手动确认消息

 void basicAck(long deliveryTag, boolean multiple) throws IOException;

deliveryTag—— 每个通道投递消息的计数,从1开始递增

multiple——是否是批量处理,如果是true,消费者在确认时会等待若干消息一起确认。

  • 手动拒绝单个消息

使用basicReject可以手动拒绝消息,拒绝后的消息可以选择销毁,也可以选择放回到原来的队列中。但是需要注意,如果消息返回到了原来的队列,在只有这一个消费者的情况下,可能会造成无限循环发送和拒绝。

void basicReject(long deliveryTag, boolean requeue) throws IOException

deliveryTag—— 每个通道投递消息的计数,从1开始递增

requeue —— 是否将消息返回到原队列,true是是,false是否。

  • 手动拒绝批量消息

使用basicNack可以进行批量消息的拒绝。

void basicNack(long deliveryTag, boolean multiple, boolean requeue) throws IOException;

deliveryTag—— 每个通道投递消息的计数,从1开始递增

multiple——是否是批量处理,如果是true,消费者在确认时会等待若干消息一起拒绝。

requeue —— 是否将消息返回到原队列,true是是,false是否。

要使用手动的ack,第1步,我们需要在消费者中手动调用basicAck进行确认,如下代码中第26行:

  1. public class TestConsumer implements Consumer {
  2. Channel channel = null;
  3. public TestConsumer(Channel channel) {
  4. this.channel = channel;
  5. }
  6. /**
  7. * create by: Hyman
  8. * description: 成功接受到消息后调用该方法
  9. * create time: 2021/7/6
  10. */
  11. @Override
  12. public void handleConsumeOk(String consumerTag) {
  13. System.out.println(consumerTag);
  14. }
  15. /* 其它方法 */
  16. /**
  17. * create by: Hyman
  18. * description: 处理消息的方法
  19. * create time: 2021/7/6
  20. */
  21. @Override
  22. public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties basicProperties, byte[] bytes) throws IOException {
  23. String str = new String(bytes);
  24. Long deliveryTag = envelope.getDeliveryTag();
  25. System.out.println("TestConsumer —— 接受到的字符串是:" + str + ",deliveryTag是:" + deliveryTag);
  26. channel.basicAck(deliveryTag, true);
  27. }
  28. }

第2步,是在使用basicConsume时设置第2个参数autoAck为false

  1. Consumer consumer = new TestConsumer(channel);
  2. String consumerTag = channel.basicConsume("test-queue", false, consumer);

有的朋友可能会有疑问?RabbitMQ中消费者手动ack有什么用,用自动ack不是挺好的吗?试想一下这种场景:假设消费者在处理消息时由于业务非常复杂,处理的效率很低。如果我们进行手动ack,则完全可以做到先处理好业务,最后再去ack,这样在业务处理时,由于上一个消息没有ack,即使队列中有新消息,broker也不会再发送给当前的消费者。这就避免了自动ack时,大量消息堆积到消费者的内存中造成内存的急剧膨胀,甚至最后内存溢出。而且在多个消费者订阅同一个队列时,手动ack还可以做到只把消息发送给闲置的消费者,增加整体消息的处理效率。

五、关于RabbitMQ的事务

因为在介绍RabbitMQ的可靠性传输,这里有必要提一下RabbitMQ的事务。在RabbitMQ中有自己的事务机制,可以通过下面的方法使用事务:

  1. channel.txSelect();//开启事务
  2. //发送多个消息
  3. channel.txCommit();//提交事务

RabbitMQ事务保证的是在开启事务后发送的多条消息一定能够到达broker,一旦其中有消息没有到达,那么该条消息之后的所有消息就不会再发送了,而且前面的消息也会被回滚。但是,这里要划重点,RabbitMQ事务性能很差,开启事务之后会降低2~10倍的吞吐量。

所以,基本上不会在实际中使用RabbitMQ事务。

六、总结:

(1)生产者或者消费者可以捕获IOException 和TimeoutException 来处理连接不到broker的问题,暂时将消息持久化到数据库,然后利用定时任务重试发送。

(2)生产者可以捕获ShutdownSignalException异常来处理找不到交换机的问题。

(3)消息从生产者到交换机可以进行手动的confirm,confirm的方式有三种:单个消息阻塞confirm、批量消息阻塞confirm和异步confirm(ConfirmListener),异步confirm消息的处理效率最高

(4)可以设置消息找不到队列时,交换机将消息返回给生产者,生产者通过添加ReturnListener实例的方式监听返回消息的事件

(5)消费者在获取消息时可以自动ack,也可以手动ack,在消费者处理消息耗时很长的业务场景中,建议使用手动ack。

(6)RabbitMQ有自己的事务,可以保证多条消息一起到达broker或者回滚,但是使用其的性能很低,在实际中基本不会使用。

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/小舞很执着/article/detail/939050
推荐阅读
相关标签
  

闽ICP备14008679号