赞
踩
这篇文章,主要接收消息队列RabbitMQ七种模式之Publisher Confirms发布确认模式。
目录
RabbitMQ消息队列中,生产者发送消息给RabbitMQ的时候,可能会出现发送失败的情况,如果不进行处理,此时这一条消息就将丢失。如何确保生产者一定能够将消息发送到RabbitMQ里面呢???
RabbitMQ提出了一种发布确认模式,这种模式大致思想是:生产者发送消息给RabbitMQ时候,如果RabbitMQ正确接收到消息后,需要发给一个ACK标识给生产者,生产者接收到ACK标记后,就可以确认这一条消息发送成功啦。如果生产者没有接收到ACK标识,则可以重复发送这一条消息给RabbitMQ,这就可以确保消息不丢失。
发布确认模式有三种实现,分别是:逐条确认机制、批量确认机制、异步确认机制。
- <!-- 引入 RabbitMQ 依赖 -->
- <dependency>
- <groupId>com.rabbitmq</groupId>
- <artifactId>amqp-client</artifactId>
- <version>5.16.0</version>
- </dependency>
- package com.rabbitmq.demo.confirm;
-
- import com.rabbitmq.client.Channel;
- import com.rabbitmq.client.Connection;
- import com.rabbitmq.client.ConnectionFactory;
-
- /**
- * @version 1.0.0
- * @Date: 2023/2/25 16:23
- * @Copyright (C) ZhuYouBin
- * @Description: 消息生产者
- */
- public class Producer {
- public static void main(String[] args) {
- // 1、创建连接工厂
- ConnectionFactory factory = new ConnectionFactory();
- // 2、设置连接的 RabbitMQ 服务地址
- factory.setHost("127.0.0.1"); // 默认就是本机
- factory.setPort(5672); // 默认就是 5672 端口
- // 3、获取连接
- Connection connection = null; // 连接
- Channel channel = null; // 通道
- try {
- connection = factory.newConnection();
- // 4、获取通道
- channel = connection.createChannel();
- // TODO 开启消息确认机制
- channel.confirmSelect();
- // 5、声明 Exchange,如果不存在,则会创建
- String exchangeName = "exchange_direct_2023";
- channel.exchangeDeclare(exchangeName, "direct");
- // 6、发送消息
- for (int i = 0; i < 10; i++) {
- // 路由键唯一标识
- String routingKey = "error";
- if (i % 3 == 0) {
- routingKey = "info";
- } else if (i % 3 == 1) {
- routingKey = "warn";
- }
- String message = "这是发布确认模式,发送的第【" + (i+1) + "】条【" + routingKey + "】消息数据";
- channel.basicPublish(exchangeName, routingKey, null, message.getBytes());
- // 等待RabbitMQ返回ACK标识
- boolean wait = channel.waitForConfirms();
- System.out.println("RabbitMQ是否接收成功: " + wait);
- if (!wait) {
- // 消息发送失败,则可以重新发送
- channel.basicPublish(exchangeName, routingKey, null, message.getBytes());
- }
- }
- } catch (Exception e) {
- e.printStackTrace();
- } finally {
- if (null != channel) {
- try {
- channel.close();
- } catch (Exception e) {}
- }
- if (null != connection) {
- try {
- connection.close();
- } catch (Exception e) {}
- }
- }
- }
- }
- package com.rabbitmq.demo.confirm;
-
- import com.rabbitmq.client.Channel;
- import com.rabbitmq.client.Connection;
- import com.rabbitmq.client.ConnectionFactory;
-
- import java.io.IOException;
-
- /**
- * @version 1.0.0
- * @Date: 2023/2/25 16:23
- * @Copyright (C) ZhuYouBin
- * @Description: 消息生产者
- */
- public class ProducerBatch {
- public static void main(String[] args) {
- // 1、创建连接工厂
- ConnectionFactory factory = new ConnectionFactory();
- // 2、设置连接的 RabbitMQ 服务地址
- factory.setHost("127.0.0.1"); // 默认就是本机
- factory.setPort(5672); // 默认就是 5672 端口
- // 3、获取连接
- Connection connection = null; // 连接
- Channel channel = null; // 通道
- try {
- connection = factory.newConnection();
- // 4、获取通道
- channel = connection.createChannel();
- // TODO 开启消息确认机制
- channel.confirmSelect();
- // 5、声明 Exchange,如果不存在,则会创建
- String exchangeName = "exchange_direct_2023";
- channel.exchangeDeclare(exchangeName, "direct");
- // 6、发送消息
- int batchSize = 3;
- int count = 0;
- for (int i = 0; i < 10; i++) {
- // 路由键唯一标识
- String routingKey = "error";
- if (i % 3 == 0) {
- routingKey = "info";
- } else if (i % 3 == 1) {
- routingKey = "warn";
- }
- String message = "这是发布确认模式,发送的第【" + (i+1) + "】条【" + routingKey + "】消息数据";
- channel.basicPublish(exchangeName, routingKey, null, message.getBytes());
- // 批量确认
- if (count == batchSize) {
- // 等待RabbitMQ返回ACK标识
- channel.waitForConfirmsOrDie();
- count = 0;
- }
- count++;
- }
- } catch (IOException e) {
- System.out.println("消息发送失败啦");
- } catch (Exception e) {
- e.printStackTrace();
- } finally {
- if (null != channel) {
- try {
- channel.close();
- } catch (Exception e) {}
- }
- if (null != connection) {
- try {
- connection.close();
- } catch (Exception e) {}
- }
- }
- }
- }
- package com.rabbitmq.demo.confirm;
-
- import com.rabbitmq.client.Channel;
- import com.rabbitmq.client.ConfirmCallback;
- import com.rabbitmq.client.Connection;
- import com.rabbitmq.client.ConnectionFactory;
-
- import java.io.IOException;
-
- /**
- * @version 1.0.0
- * @Date: 2023/2/25 16:23
- * @Copyright (C) ZhuYouBin
- * @Description: 消息生产者
- */
- public class ProducerAsync {
- public static void main(String[] args) {
- // 1、创建连接工厂
- ConnectionFactory factory = new ConnectionFactory();
- // 2、设置连接的 RabbitMQ 服务地址
- factory.setHost("127.0.0.1"); // 默认就是本机
- factory.setPort(5672); // 默认就是 5672 端口
- // 3、获取连接
- Connection connection = null; // 连接
- Channel channel = null; // 通道
- try {
- connection = factory.newConnection();
- // 4、获取通道
- channel = connection.createChannel();
- // TODO 开启消息确认机制
- channel.confirmSelect();
- // 5、声明 Exchange,如果不存在,则会创建
- String exchangeName = "exchange_confirm_2023";
- channel.exchangeDeclare(exchangeName, "direct");
- // TODO 一定要先调用监听接口,在发送消息
- channel.addConfirmListener(new ConfirmCallback() {
- @Override
- public void handle(long deliveryTag, boolean multiple) throws IOException {
- System.out.println("RabbitMQ接收成功啦.....消息的标识deliveryTag=" + deliveryTag
- + ",批量发送多条消息multiple=" + multiple);
- }
- }, new ConfirmCallback() {
- @Override
- public void handle(long deliveryTag, boolean multiple) throws IOException {
- System.out.println("RabbitMQ接收失败啦.....");
- }
- });
- for (int i = 0; i < 10; i++) {
- // 6、发送消息
- String message = "这是发布确认模式,发送的消息数据";
- channel.basicPublish(exchangeName, "queue_confirm_2023", null, message.getBytes());
- }
- } catch (IOException e) {
- System.out.println("消息发送失败啦");
- } catch (Exception e) {
- e.printStackTrace();
- } finally {
- if (null != channel) {
- try {
- channel.close();
- } catch (Exception e) {}
- }
- if (null != connection) {
- try {
- connection.close();
- } catch (Exception e) {}
- }
- }
- }
- }
到此,RabbitMQ消息队列中的发布确认模式就介绍完啦。
综上,这篇文章结束了,主要接收消息队列RabbitMQ七种模式之Publisher Confirms发布确认模式。
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。