当前位置:   article > 正文

【RabbitMQ笔记07】消息队列RabbitMQ七种模式之Publisher Confirms发布确认模式_publisher confirm机制

publisher confirm机制

这篇文章,主要接收消息队列RabbitMQ七种模式之Publisher Confirms发布确认模式。

目录

一、消息队列

1.1、发布确认模式

1.2、案例代码

(1)引入依赖

(2)编写生产者【消息确认--单条确认】

(3)编写生产者【消息确认--批量确认】

(4)编写生产者【消息确认--异步确认】


一、消息队列

1.1、发布确认模式

RabbitMQ消息队列中,生产者发送消息给RabbitMQ的时候,可能会出现发送失败的情况,如果不进行处理,此时这一条消息就将丢失。如何确保生产者一定能够将消息发送到RabbitMQ里面呢???

RabbitMQ提出了一种发布确认模式,这种模式大致思想是:生产者发送消息给RabbitMQ时候,如果RabbitMQ正确接收到消息后,需要发给一个ACK标识给生产者,生产者接收到ACK标记后,就可以确认这一条消息发送成功啦。如果生产者没有接收到ACK标识,则可以重复发送这一条消息给RabbitMQ,这就可以确保消息不丢失。

发布确认模式有三种实现,分别是:逐条确认机制、批量确认机制、异步确认机制。

1.2、案例代码

(1)引入依赖

  1. <!-- 引入 RabbitMQ 依赖 -->
  2. <dependency>
  3. <groupId>com.rabbitmq</groupId>
  4. <artifactId>amqp-client</artifactId>
  5. <version>5.16.0</version>
  6. </dependency>

(2)编写生产者【消息确认--单条确认】

  • 生产者发送消息的时候,需要调用【confirmSelect()】方法开启消息确认机制。
  • 生产者将消息发送完成之后,需要调用【waitForConfirms()】方法,阻塞等待RabbitMQ消息队列返回ACK标识。这个方法返回一个boolean类型,true表示RabbitMQ接收消息成功,false表示接收失败。
  • 【waitForConfirms()】方法还可以指定一个超时时间,如果在这个超时时间里面RabbitMQ还没有返回ACK标识,那么该方法将抛出一个InterruptedException中断异常。
  1. package com.rabbitmq.demo.confirm;
  2. import com.rabbitmq.client.Channel;
  3. import com.rabbitmq.client.Connection;
  4. import com.rabbitmq.client.ConnectionFactory;
  5. /**
  6. * @version 1.0.0
  7. * @Date: 2023/2/25 16:23
  8. * @Copyright (C) ZhuYouBin
  9. * @Description: 消息生产者
  10. */
  11. public class Producer {
  12. public static void main(String[] args) {
  13. // 1、创建连接工厂
  14. ConnectionFactory factory = new ConnectionFactory();
  15. // 2、设置连接的 RabbitMQ 服务地址
  16. factory.setHost("127.0.0.1"); // 默认就是本机
  17. factory.setPort(5672); // 默认就是 5672 端口
  18. // 3、获取连接
  19. Connection connection = null; // 连接
  20. Channel channel = null; // 通道
  21. try {
  22. connection = factory.newConnection();
  23. // 4、获取通道
  24. channel = connection.createChannel();
  25. // TODO 开启消息确认机制
  26. channel.confirmSelect();
  27. // 5、声明 Exchange,如果不存在,则会创建
  28. String exchangeName = "exchange_direct_2023";
  29. channel.exchangeDeclare(exchangeName, "direct");
  30. // 6、发送消息
  31. for (int i = 0; i < 10; i++) {
  32. // 路由键唯一标识
  33. String routingKey = "error";
  34. if (i % 3 == 0) {
  35. routingKey = "info";
  36. } else if (i % 3 == 1) {
  37. routingKey = "warn";
  38. }
  39. String message = "这是发布确认模式,发送的第【" + (i+1) + "】条【" + routingKey + "】消息数据";
  40. channel.basicPublish(exchangeName, routingKey, null, message.getBytes());
  41. // 等待RabbitMQ返回ACK标识
  42. boolean wait = channel.waitForConfirms();
  43. System.out.println("RabbitMQ是否接收成功: " + wait);
  44. if (!wait) {
  45. // 消息发送失败,则可以重新发送
  46. channel.basicPublish(exchangeName, routingKey, null, message.getBytes());
  47. }
  48. }
  49. } catch (Exception e) {
  50. e.printStackTrace();
  51. } finally {
  52. if (null != channel) {
  53. try {
  54. channel.close();
  55. } catch (Exception e) {}
  56. }
  57. if (null != connection) {
  58. try {
  59. connection.close();
  60. } catch (Exception e) {}
  61. }
  62. }
  63. }
  64. }

(3)编写生产者【消息确认--批量确认】

  • 前一种方式,是一条消息就调用一次【waitForConfirms()】方法,阻塞等待RabbitMQ的ACK确认标识。
  • 但是这种方式是非常耗时的,当需要发送的消息非常多的时候,会严重影响系统性能,所以为了解决这个问题,提出了批量确认的方法。
  • 批量确认调用【waitForConfirmsOrDie()】方法,此时会等待一批消息的ACK确认标识,如果这一批消息中存在一个消息没有被RabbitMQ成功接收,此时该方法将抛出一个【IOException】异常。
  • 所以,可以通过捕获IOException异常来判断消息是否发送成功。
  • 这种方式的缺点:当一批消息出现失败的情况时候,我们没办法知道是哪一条消息失败了,只能够重新将这一批消息重新发送。
  1. package com.rabbitmq.demo.confirm;
  2. import com.rabbitmq.client.Channel;
  3. import com.rabbitmq.client.Connection;
  4. import com.rabbitmq.client.ConnectionFactory;
  5. import java.io.IOException;
  6. /**
  7. * @version 1.0.0
  8. * @Date: 2023/2/25 16:23
  9. * @Copyright (C) ZhuYouBin
  10. * @Description: 消息生产者
  11. */
  12. public class ProducerBatch {
  13. public static void main(String[] args) {
  14. // 1、创建连接工厂
  15. ConnectionFactory factory = new ConnectionFactory();
  16. // 2、设置连接的 RabbitMQ 服务地址
  17. factory.setHost("127.0.0.1"); // 默认就是本机
  18. factory.setPort(5672); // 默认就是 5672 端口
  19. // 3、获取连接
  20. Connection connection = null; // 连接
  21. Channel channel = null; // 通道
  22. try {
  23. connection = factory.newConnection();
  24. // 4、获取通道
  25. channel = connection.createChannel();
  26. // TODO 开启消息确认机制
  27. channel.confirmSelect();
  28. // 5、声明 Exchange,如果不存在,则会创建
  29. String exchangeName = "exchange_direct_2023";
  30. channel.exchangeDeclare(exchangeName, "direct");
  31. // 6、发送消息
  32. int batchSize = 3;
  33. int count = 0;
  34. for (int i = 0; i < 10; i++) {
  35. // 路由键唯一标识
  36. String routingKey = "error";
  37. if (i % 3 == 0) {
  38. routingKey = "info";
  39. } else if (i % 3 == 1) {
  40. routingKey = "warn";
  41. }
  42. String message = "这是发布确认模式,发送的第【" + (i+1) + "】条【" + routingKey + "】消息数据";
  43. channel.basicPublish(exchangeName, routingKey, null, message.getBytes());
  44. // 批量确认
  45. if (count == batchSize) {
  46. // 等待RabbitMQ返回ACK标识
  47. channel.waitForConfirmsOrDie();
  48. count = 0;
  49. }
  50. count++;
  51. }
  52. } catch (IOException e) {
  53. System.out.println("消息发送失败啦");
  54. } catch (Exception e) {
  55. e.printStackTrace();
  56. } finally {
  57. if (null != channel) {
  58. try {
  59. channel.close();
  60. } catch (Exception e) {}
  61. }
  62. if (null != connection) {
  63. try {
  64. connection.close();
  65. } catch (Exception e) {}
  66. }
  67. }
  68. }
  69. }

(4)编写生产者【消息确认--异步确认】

  • 异步确认在消息发送之后,调用【addConfirmListener()】方法,该方法介绍两个参数,第一个参数是成功接收到ACK标识的回调方法,第二个参数是失败接收到NACK标识的回调方法。
  • 注意:一定要先调用【addConfirmListener()】监听方法,然后再发送消息,如果两者顺序反了,则监听方法不生效。
  1. package com.rabbitmq.demo.confirm;
  2. import com.rabbitmq.client.Channel;
  3. import com.rabbitmq.client.ConfirmCallback;
  4. import com.rabbitmq.client.Connection;
  5. import com.rabbitmq.client.ConnectionFactory;
  6. import java.io.IOException;
  7. /**
  8. * @version 1.0.0
  9. * @Date: 2023/2/25 16:23
  10. * @Copyright (C) ZhuYouBin
  11. * @Description: 消息生产者
  12. */
  13. public class ProducerAsync {
  14. public static void main(String[] args) {
  15. // 1、创建连接工厂
  16. ConnectionFactory factory = new ConnectionFactory();
  17. // 2、设置连接的 RabbitMQ 服务地址
  18. factory.setHost("127.0.0.1"); // 默认就是本机
  19. factory.setPort(5672); // 默认就是 5672 端口
  20. // 3、获取连接
  21. Connection connection = null; // 连接
  22. Channel channel = null; // 通道
  23. try {
  24. connection = factory.newConnection();
  25. // 4、获取通道
  26. channel = connection.createChannel();
  27. // TODO 开启消息确认机制
  28. channel.confirmSelect();
  29. // 5、声明 Exchange,如果不存在,则会创建
  30. String exchangeName = "exchange_confirm_2023";
  31. channel.exchangeDeclare(exchangeName, "direct");
  32. // TODO 一定要先调用监听接口,在发送消息
  33. channel.addConfirmListener(new ConfirmCallback() {
  34. @Override
  35. public void handle(long deliveryTag, boolean multiple) throws IOException {
  36. System.out.println("RabbitMQ接收成功啦.....消息的标识deliveryTag=" + deliveryTag
  37. + ",批量发送多条消息multiple=" + multiple);
  38. }
  39. }, new ConfirmCallback() {
  40. @Override
  41. public void handle(long deliveryTag, boolean multiple) throws IOException {
  42. System.out.println("RabbitMQ接收失败啦.....");
  43. }
  44. });
  45. for (int i = 0; i < 10; i++) {
  46. // 6、发送消息
  47. String message = "这是发布确认模式,发送的消息数据";
  48. channel.basicPublish(exchangeName, "queue_confirm_2023", null, message.getBytes());
  49. }
  50. } catch (IOException e) {
  51. System.out.println("消息发送失败啦");
  52. } catch (Exception e) {
  53. e.printStackTrace();
  54. } finally {
  55. if (null != channel) {
  56. try {
  57. channel.close();
  58. } catch (Exception e) {}
  59. }
  60. if (null != connection) {
  61. try {
  62. connection.close();
  63. } catch (Exception e) {}
  64. }
  65. }
  66. }
  67. }

到此,RabbitMQ消息队列中的发布确认模式就介绍完啦。

综上,这篇文章结束了,主要接收消息队列RabbitMQ七种模式之Publisher Confirms发布确认模式。

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/Guff_9hys/article/detail/798554
推荐阅读
相关标签
  

闽ICP备14008679号