当前位置:   article > 正文

RabbitMQ 学习教程(三)消息确认机制 ACK_rabbitmq autoack

rabbitmq autoack

1. 代码优化

在这讲正式之前,咱们先把之前的生产者、消费者代码,简单优化下吧

看之前的代码:生产者、消费者的连接和关闭都是一样的代码,那么,我们就可以将相同的代码提取出来,放在一个工具类里面,然后在相应的地方用工具类进行替换。

提取一个工具类 RabbitMqUtil,用来处理连接、关闭功能:

public class RabbitMqUtil {

    // 私有构造
    private RabbitMqUtil() {}

    // 获取连接
    public static Connection getConnection(String name) throws Exception {
        ConnectionFactory connectionFactory = new ConnectionFactory();

        connectionFactory.setHost("localhost");
        connectionFactory.setPort(5672);
        connectionFactory.setUsername("guest");
        connectionFactory.setPassword("guest");
        connectionFactory.setVirtualHost("/");
        Connection conn = connectionFactory.newConnection(name);
        return conn;
    }

    // 关闭信道、连接
    public static void close(Connection conn, Channel channel) throws Exception{
        if (null != channel && channel.isOpen()) {
            channel.close();
        }
        if (null != conn && conn.isOpen()) {
            conn.close();
        }
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28

生产者代码修改为:

public class Producer {

    public static void main(String[] args) {
        // 1. 获取连接
        Connection connection = null;
        try {
            connection = RabbitMqUtil.getConnection("生产者");
        } catch (Exception e) {
            System.out.println("获取连接时,出现异常");
        }

        Channel channel = null;
        try {
            // 2. 通过连接获取通道 Channel
            channel = connection.createChannel();
            String queueName = "code_simple_queue1";
            // 3. 通过通道创建声明队列
            channel.queueDeclare(queueName, false, false, false, null);
            // 4. 准备消息内容
            String message = "Hello RabbitMQ";
            // 5. 发送消息给队列 Queue
            channel.basicPublish("", queueName, null, message.getBytes());
            System.out.println("消息发送完成~~~发送的消息为:" + message);
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            try {
                RabbitMqUtil.close(connection, channel);
            } catch (Exception e) {
                System.out.println("关闭时,出现异常");
            }
        }
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34

消费者代码为:

public class Consumer {

    public static void main(String[] args) {

        Connection connection = null;
        try {
            connection = RabbitMqUtil.getConnection("消费者");
        } catch (Exception e) {
            System.out.println("获取连接异常");
        }

        Channel channel = null;
        try {
            channel = connection.createChannel();
            String queueName = "code_simple_queue1";

            // 定义消费者
            com.rabbitmq.client.Consumer consumer = new DefaultConsumer(channel) {
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                    // 交换机
                    String exchange = envelope.getExchange();
                    // 消息id,mq 在 channel 中用来标识消息的 id,可用于确认消息已接收
                    long deliveryTag = envelope.getDeliveryTag();
                    // body 消息体
                    String msg = new String(body,"utf-8");
                    System.out.println("收到消息:" + msg);
                }
            };
            // 监听队列。自动确认
            channel.basicConsume(queueName, true, consumer);

            System.out.println("开始接收消息~~~");
            System.in.read();
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            try {
                RabbitMqUtil.close(connection, channel);
            } catch (Exception e) {
                System.out.println("关闭时,出现异常");
            }
        }
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45

咱们再来回顾下 消费者中的 channel.basicConsume() 方法,它有三个参数,分别为:

  1. queueName:队列名称
  2. autoAck:设置是否自动确认
  3. callback:设置消费者的回调函数,用来处理 RabbitMQ 推送过来的消息

以上的示例中,咱们是将自动确认参数设置true,那么,这就会导致一个现象:消息一旦被消费者接收,队列中的消息就会被删除。

问题:RabbitMQ 怎么知道消息被消费者接收了呢?如果消费者领取消息后,还没执行操作就挂掉了呢?或者抛出了异常?总之:如果消费消息失败了,RabbitMQ 无从得知,这样消息就丢失了!

为了保证消息从队列可靠地达到消费者,RabbitMQ 提供了消息确认机制 ACK:当消费者获取消息后,会向 RabbitMQ 发送回执 ACK,告知消息已经被接收。

不过这种回执 ACK 分两种情况:

  • 自动 ACK:autoAck 设置为 true。RabbitMQ 会自动把发送出去的消息置为确认,然后从内存或磁盘中删除,而不管消费者是否真正地消费了这些消息
  • 手动 ACK:autoAck 设置为 false。RabbitMQ 会等待消费者显示地回复确认信号后才从内存或磁盘中移去消息

采用消息确认机制后,只要设置 autoAck 参数为 false,消费者就有足够的时间处理消息,不用担心处理消息过程中,消费者进程挂掉后消息丢失的问题。因为,RabbitMQ 会一直等待持有消息,直到消费者显示调用 Basic.Ack 命令为止。

并且,当 autoAck 参数设置为 false 后,对于 RabbitMQ 而言,队列中的消息分成了两部分:一是等待投递给消费者的消息;二是已经投递给消费者,但还没有收到消费者确认信号的消息。如果 RabbitMQ 一直没有收到消费者的确认信号,并且消费此消息的消费者已经断开连接,则 RabbitMQ 会安排该消息重新进入队列,等待投递给下一个消费者。

对于以上的两种回执 ACK,如何选择哪一种,这得看消息的重要性了(建议选择手动确认):

  1. 如果消息不太重要,丢失也没有影响,那么自动 ACK 会比较方便
  2. 如果消息非常重要,不容丢失。那么,最好在消费完成后手动 ACK,否则接收消息后就自动 ACK,RabbitMQ 就会把消息从队列中删除。如果此时消费者宕机,那么消息就丢失了。

2. 手动 ACK

将自动 ACK 修改为手动 ACK,只需要修改消费者的代码:

public class Consumer {

    public static void main(String[] args) throws Exception{
        // 获取连接
        Connection connection = RabbitMqUtil.getConnection("消费者");

        final Channel channel = connection.createChannel();
        String queueName = "code_simple_queue1";

        // 定义消费者
        com.rabbitmq.client.Consumer consumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                // 消息id,mq 在 channel 中用来标识消息的 id,可用于确认消息已接收
                long deliveryTag = envelope.getDeliveryTag();
                // body 消息体
                String msg = new String(body,"utf-8");
                System.out.println("收到消息:" + msg);
                /**
                 * @param1:deliveryTag:用来标识消息的id
                 * @param2:multiple:是否批量。true:将一次性 ACK 所有小于 deliveryTag 的消息
                 */
                // 手动确认
                channel.basicAck(deliveryTag, false);
            }
        };

        // 监听队列  手动 ACK
        channel.basicConsume(queueName, false, consumer);

        System.out.println("开始接收消息~~~");
        System.in.read();

        // 关闭信道、连接
        RabbitMqUtil.close(connection, channel);
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37

主要作出一下两点变化:

  1. channel.basicConsume() 方法的第二个参数修改为 false
  2. 在消费者中,必须得调用 channel.basicAck() 方法,进行手动确认,

修改后,发现上述的消费者依旧可以消费消息

3. 自动 ACK 带来的问题

问题:那么,自动 ACK 会带来什么问题呢?接下来我们通过实例进行演示一下

我们修改下自动 ACK 的消费者代码:在 handleDelivery() 方法中手动添加异常代码,让其抛出异常,这样,就中断了正在执行的 handleDelivery() 方法:

channel = connection.createChannel();
String queueName = "code_simple_queue1";

// 定义消费者
com.rabbitmq.client.Consumer consumer = new DefaultConsumer(channel) {
	@Override
	public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
		// 抛出异常,后续代码都不会执行
	    int result = 1 / 0;
	    
	    // 交换机
	    String exchange = envelope.getExchange();
	    // 消息id,mq 在 channel 中用来标识消息的 id,可用于确认消息已接收
	    long deliveryTag = envelope.getDeliveryTag();
	    // body 消息体
	    String msg = new String(body,"utf-8");
	    System.out.println("收到消息:" + msg);
	}
};
// 监听队列
channel.basicConsume(queueName, true, consumer);
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21

上述的 handleDelivery() 方法确实会抛出异常,但它又被 ConsumerDispatcher#handleDelivery() 方法捕获并进行处理了:

public void handleDelivery(final Consumer delegate, final String consumerTag, final Envelope envelope, final BasicProperties properties, final byte[] body) throws IOException {
    this.executeUnlessShuttingDown(new Runnable() {
        public void run() {
            try {
                delegate.handleDelivery(consumerTag, envelope, properties, body);
            } catch (Throwable var2) {
                ConsumerDispatcher.this.connection.getExceptionHandler().handleConsumerException(ConsumerDispatcher.this.channel, var2, delegate, consumerTag, "handleDelivery");
            }

        }
    });
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12

然后,运行生产者,去生产消息,这时,得注意消息的总数。再运行消费者程序,发现它会抛出异常并被处理了,然而,异常后面的代码都没有被执行,但刚才生产的消息却被消费了。实际上消费者还没有去真正地消费消息。

但将这行异常代码添加到手动 ACK 中的代码中去,发现这条消息并没有被消费!!

本文内容由网友自发贡献,转载请注明出处:【wpsshop博客】
推荐阅读
相关标签
  

闽ICP备14008679号