赞
踩
发布确认是解决消息不丢失的重要环节,在设置队列持久化、消息持久化的基础上,设置发布确认,一旦生产者投递消息之后,如果Broker接收到消息,会给生产者一个应答。
生产者进行接收应答,用来确认这条消息是否正常发送到Broker。生产者也可以根据收没有收到这条消息的应答进行相应的处理。
发布者确认在通道级别使用confirmSelect方法启用
Channel channel = connection.createChannel();
channel.confirmSelect();
- 单独发布消息,同步等待确认:简单,但吞吐量非常有限。
- 批量发布消息,等待批量的同步确认:简单、合理的吞吐量,但是很难判断出什么时候出了问题。
- 异步处理:最佳的性能和资源利用,良好的控制情况下的错误,但涉及到正确的实现,相对复杂
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.MessageProperties;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.TimeoutException;
/**
* @author : [WangWei]
* @version : [v1.0]
* @className : Producer
* @description : [生产者]
* @createTime : [2023/2/1 9:38]
* @updateUser : [WangWei]
* @updateTime : [2023/2/1 9:38]
* @updateRemark : [描述说明本次修改内容]
*/
public class Producer {
private static final String QUEUE_NAME = "Confirm";
public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {
//建立连接
RabbitMQUtils.getConnection();
//声明通道
Channel channel = RabbitMQUtils.getChannel();
//开启确认模式
channel.confirmSelect();
channel.queueDeclare(QUEUE_NAME,true,false,false,null);
long start = System.currentTimeMillis();
//循环发送2条消息
for (int i = 0; i <200 ; i++) {
String msg="消息确认模式消息:"+i;
/*推送消息
*交换机命名,不填写使用默认的交换机
* routingKey -路由键-
* props:消息的其他属性-路由头等正文
* msg消息正文
*/
channel.basicPublish("",QUEUE_NAME, MessageProperties.PERSISTENT_TEXT_PLAIN,msg.getBytes(StandardCharsets.UTF_8));
// uses a 5 second timeout
//如果超时过期,则抛出TimeoutException。如果任何消息被nack(丢失), waitForConfirmsOrDie将抛出IOException。
channel.waitForConfirmsOrDie(5_000);
}
long end = System.currentTimeMillis();
System.out.println("发送200条消息使用时间:"+(end-start));
}
}
waitForConfirmsOrDie(long)方法等待其确认。该方法在确认消息后立即返回。如果消息在超时内没有得到确认,或者消息被nack-ed(意味着代理由于某种原因无法处理它),该方法将抛出异常。异常的处理通常包括记录错误消息和/或重新尝试发送该消息。
这种技术非常简单,但也有一个主要缺点:它大大减慢了发布速度,因为对消息的确认会阻塞所有后续消息的发布。这种方法交付的吞吐量不会超过每秒发布的几百条消息。
发布一批消息并等待整个批消息被确认。以100个为例
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.MessageProperties;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.TimeoutException;
/**
* @author : [WangWei]
* @version : [v1.0]
* @className : Producer
* @description : [生产者]
* @createTime : [2023/2/1 9:38]
* @updateUser : [WangWei]
* @updateTime : [2023/2/1 9:38]
* @updateRemark : [描述说明本次修改内容]
*/
public class Producer2 {
private static final String QUEUE_NAME = "Confirm";
public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {
//用于记录每发布100条消息进行一次确认
int batchSize = 100;
int outstandingMessageCount = 0;
//建立连接
RabbitMQUtils.getConnection();
//声明通道
Channel channel = RabbitMQUtils.getChannel();
//开启确认模式
channel.confirmSelect();
//声明持久化队列
channel.queueDeclare(QUEUE_NAME,true,false,false,null);
long start = System.currentTimeMillis();
//循环发送200条消息
for (int i = 0; i <200 ; i++) {
String msg="消息确认模式消息:"+i;
/*推送消息
*交换机命名,不填写使用默认的交换机
* routingKey -路由键-
* props:消息的其他属性-路由头等正文
* msg消息正文
*/
channel.basicPublish("",QUEUE_NAME, MessageProperties.PERSISTENT_TEXT_PLAIN,msg.getBytes(StandardCharsets.UTF_8));
outstandingMessageCount++;
//每100次确认一次
if (outstandingMessageCount == batchSize) {
// uses a 5 second timeout
//如果超时过期,则抛出TimeoutException。如果任何消息被nack(丢失), waitForConfirmsOrDie将抛出IOException。
channel.waitForConfirmsOrDie(5_000);
outstandingMessageCount = 0;
}
}
if (outstandingMessageCount > 0) {
channel.waitForConfirmsOrDie(5_000);
}
long end = System.currentTimeMillis();
System.out.println("发送200条消息使用时间:"+(end-start));
}
}
与等待单个消息的确认相比,等待一批消息被确认大大提高了吞吐量(对于远程RabbitMQ节点,最多可提高20-30倍)。一个缺点是,在失败的情况下,我们无法确切地知道哪里出了问题,因此我们可能不得不在内存中保留整个批处理,以记录有意义的内容或重新发布消息。而且这种解决方案仍然是同步的,因此它会阻止消息的发布。
异步处理发行商确认通常需要以下步骤:
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.ConfirmCallback;
import com.rabbitmq.client.MessageProperties;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.time.Duration;
import java.util.concurrent.ConcurrentNavigableMap;
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.TimeoutException;
/**
* @author : [WangWei]
* @version : [v1.0]
* @className : Producer
* @description : [生产者]
* @createTime : [2023/2/1 9:38]
* @updateUser : [WangWei]
* @updateTime : [2023/2/1 9:38]
* @updateRemark : [描述说明本次修改内容]
*/
public class Producer3 {
private static final String QUEUE_NAME = "Confirm";
public static void main(String[] args) throws IOException, TimeoutException {
//建立连接
RabbitMQUtils.getConnection();
//声明通道
Channel channel = RabbitMQUtils.getChannel();
//开启确认模式
channel.confirmSelect();
//用于保存序列号与消息之前映射的容器
ConcurrentNavigableMap<Long, String> outstandingConfirms = new ConcurrentSkipListMap<>();
//清理映射关系的回调
//multiple 一个布尔值,如果为false则说明只有一条消息被确认或者丢失,如果为true这表示有小于或等于sequenceNumber的消息被确认或者丢失。
ConfirmCallback cleanOutstandingConfirms = (sequenceNumber, multiple) -> {
if (multiple) {
//如果包含值为true则返回该映射的部分视图,其键值小于或等于,sequenceNumber
ConcurrentNavigableMap<Long, String> confirmed = outstandingConfirms.headMap(
sequenceNumber, true
);
//移除所有key和value
confirmed.clear();
} else {
//移除序列号对象的消息
outstandingConfirms.remove(sequenceNumber);
}
};
//两个回调一个用于确认消息,另一个用于nack-ed消息(可以被代理视为丢失的消息)。
//sequenceNumber用于标识确认的消息或者丢失的消息
//
// channel.addConfirmListener((sequenceNumber,multiple)->{
// String body = outstandingConfirms.get(sequenceNumber);
// System.out.println("ack message:"+body);
// },(sequenceNumber,multiple)->{
// //当消息被丢失时。。。。
// String body = outstandingConfirms.get(sequenceNumber);
// System.out.println("no ack message:"+body);
// });
//当消息缺失的回调
channel.addConfirmListener(cleanOutstandingConfirms, (sequenceNumber, multiple) -> {
String body = outstandingConfirms.get(sequenceNumber);
System.out.println("没有确认的消息:"+body);
//重新使用回调来清理映射
cleanOutstandingConfirms.handle(sequenceNumber, multiple);
});
//声明持久化队列
channel.queueDeclare(QUEUE_NAME,true,false,false,null);
long start = System.nanoTime();
//循环发送2条消息
for (int i = 0; i <200 ; i++) {
String msg="消息确认模式消息:"+i;
//使用map将发布序列号与消息的字符串体关联起来
//channel.getNextPublishSeqNo()获取下一个消息的序列号
outstandingConfirms.put(channel.getNextPublishSeqNo(), msg);
/*推送消息
*交换机命名,不填写使用默认的交换机
* routingKey -路由键-
* props:消息的其他属性-路由头等正文
* msg消息正文
*/
channel.basicPublish("",QUEUE_NAME, MessageProperties.PERSISTENT_TEXT_PLAIN,msg.getBytes(StandardCharsets.UTF_8));
System.out.println(msg+"发布成功");
}
long end = System.nanoTime();
System.out.println("发送200条消息使用时间:"+ Duration.ofNanos(end - start).toMillis());
}
}
示例使用ConcurrentNavigableMap来跟踪未完成的确认。这种数据结构的方便有以下几个原因。
它允许轻松地将序列号与消息(无论消息数据是什么)关联起来,并轻松地将条目清理到给定的序列id(以处理多个确认/nack)。
最后,它支持并发访问,因为确认回调是在客户端库拥有的线程中调用的,应该与发布线程保持不同。
除了使用复杂的映射实现之外,还有其他方法可以跟踪未完成的确认,比如使用简单的并发散列映射和变量来跟踪发布序列的下界,但它们通常更复杂,
从相应的回调中重新发布nack-ed消息可能很诱人,但应该避免这种情况,因为确认回调是在I/O线程中分派的,其中通道不应该执行操作。更好的解决方案是将消息放入由发布线程轮询的内存队列中。像ConcurrentLinkedQueue这样的类可以很好地在确认回调和发布线程之间传输消息。
在某些应用程序中,确保发布的消息到达代理非常重要。发布者确认RabbitMQ的特性可以帮助满足这一要求。发布者确认本质上是异步的,但也可以同步地处理它们。没有确定的方法来实现发布者确认,这通常归结于应用程序和整个系统中的约束。典型的技术有:
单独发布消息,同步等待确认:简单,但吞吐量非常有限。
批量发布消息,等待批量的同步确认:简单、合理的吞吐量,但是很难判断出什么时候出了问题。
异步处理:最佳的性能和资源利用,良好的控制情况下的错误,但是实现较为复杂
1.从官网学习rabbitmq的使用,严格落实先宏观,后微观,结合思维导图和三遍读书法.
2.学习的过程中,锤炼英文阅读,动手查字典和利用工具.
3. 理论和实践结合,理论学习完成之后,及时进行实践包括先代码实现,阶段总结等等.
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。