赞
踩
发送端确认机制
,确认消息是否发送消息返回机制
,确认消息被正确路由消费端限流机制
,限制消息推送速度,保障接收端服务稳定消费端确认机制
,确认消息被正确处理消息过期时间
,防止消息大量积压死信队列
,收集过期消息吗,以供分析1.1 问题分析:消息真的发出去了吗?
消息发送后,发送端不知道RabbitMQ是否真的收到了消息
若RabbitMQ异常,消息丢失后,订单处理流程停止,业务异常
需要使用RabbitMQ发送端确认机制
,确认消息发送
1.2 什么是发送端确认机制
1.3 三种确认机制
1.4 单条同步确认机制的实现方法
配置channel,开启确认模式:channel.confirmSelect()
每发送一条消息,调用channel.waitForConfirms()
方法,返回布尔值,等待确认
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost("localhost");
try (Connection connection = connectionFactory.newConnection();
Channel channel = connection.createChannel()) {
String messageToSend = objectMapper.writeValueAsString(orderMessageDTO);
//开启确认模式
channel.confirmSelect();
channel.basicPublish("exchange.order.restaurant", "key.restaurant", null, messageToSend.getBytes());
if (channel.waitForConfirms()) {
log.info("RabbitMQ confirm OK");
} else {
log.info("RabbitMQ confirm Failed");
}
}
1.5 多条同步确认机制的实现方法:不太推荐
channel.confirmSelect()
channel.waitForConfirms()
方法,返回布尔值,等待确认 ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost("localhost");
try (Connection connection = connectionFactory.newConnection();
Channel channel = connection.createChannel()) {
String messageToSend = objectMapper.writeValueAsString(orderMessageDTO);
channel.confirmSelect();
for (int i = 0; i < 10; i++) {
channel.basicPublish("exchange.order.restaurant", "key.restaurant", null, messageToSend.getBytes());
log.info("message sent");
}
}
注意:调用channel.waitForConfirms()
方法,返回布尔值,如果为true时,则前面发送的多条消息发送成功,如果为false时,则前面发送的多条消息部分成功,部分不成功,而不是所有的消息都不成功。
1.6 异步确认机制的实现方法:不太推荐,并发时存在线程安全问题
channel.confirmSelect()
addConfirmListener
,发送消息后,会回调此方法,通知是否发送成功ConnectionFactory connectionFactory = new ConnectionFactory(); connectionFactory.setHost("localhost"); try (Connection connection = connectionFactory.newConnection(); Channel channel = connection.createChannel()) { //3.异步同步实现机制 channel.confirmSelect(); channel.addConfirmListener(new ConfirmListener() { //成功后调用handleAck //deliveryTag 发送端的消息序号 就是发送了消息的条数 //multiple是否多条 @Override public void handleAck(long deliveryTag, boolean multiple) throws IOException { log.info("Ack, deliveryTag: {}, multiple: {}", deliveryTag, multiple); } //失败后调用handleNack @Override public void handleNack(long deliveryTag, boolean multiple) throws IOException { log.info("Nack, deliveryTag: {}, multiple: {}", deliveryTag, multiple); } }); String messageToSend = objectMapper.writeValueAsString(orderMessageDTO); channel.basicPublish("exchange.order.restaurant", "key.restaurant", null, messageToSend.getBytes()); }
确认机制的流程图:
2.1 问题分析:消息发送后,消息真的被路由了吗?
消息返回机制
,确认消息被正确路由2.2 消息返回机制的原理是什么?
2.3 消息返回机制的开启方法
在RabbitMQ基础配置中有一个关键配置项:Mandatory
若Mandatory
为false,RabbitMQ讲直接丢弃无法路由的消息
若Mandatory
为true,RabbitMQ才会处理无法路由的消息,handleReturn
方法会被调用
void basicPublish(String exchange, String routingKey, boolean mandatory, BasicProperties props, byte[] body)
throws IOException;
//开启消息返回机制
channel.basicPublish("exchange.order.restaurant", "key.order", true,null, messageToSend.getBytes());
ConnectionFactory connectionFactory = new ConnectionFactory(); connectionFactory.setHost("localhost"); try (Connection connection = connectionFactory.newConnection(); Channel channel = connection.createChannel()) { channel.addReturnListener(new ReturnListener() { //replyCode 状态信息 //replyText 回复的信息 //exchange 交换机 //routingKey 路由键 //AMQP.BasicProperties properties 消息的原数据 //body 消息的内容 @Override public void handleReturn(int replyCode, String replyText, String exchange, String routingKey, AMQP.BasicProperties properties, byte[] body) throws IOException { log.info("Message Return: " + "replyCode:{}, replyText:{}, exchange:{}, routingKey:{}, properties:{}, body:{}", replyCode, replyText, exchange, routingKey, properties, new String(body)); //除了打印log,可以加别的业务操作 } }); String messageToSend = objectMapper.writeValueAsString(orderMessageDTO); channel.basicPublish("exchange.order.restaurant", "key.order", true,null, messageToSend.getBytes()); }
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost("localhost");
try (Connection connection = connectionFactory.newConnection();
Channel channel = connection.createChannel()) {
channel.addReturnListener(new ReturnCallback() {
@Override
public void handle(Return returnMessage) {
log.info("Message Return: returnMessage{}", returnMessage);
//除了打印log,可以加别的业务操作
}
});
String messageToSend = objectMapper.writeValueAsString(orderMessageDTO);
channel.basicPublish("exchange.order.restaurant", "key.order", true,null, messageToSend.getBytes());
注意:channel.addReturnListener(new ReturnListener() )
和channel.addReturnListener(new ReturnCallback())
的方法其实本质一致,但是前一种的入参较多,而后一种可以通过returnMessage.getXXX获取前一种方法的参数
3.1 问题分析:消息发送后,消费端处理异常怎么办?
消费端确认机制
,确认消息被正确处理3.2 消费端ACK类型
自动ACK
:消费端收到消息后,会自动签收消息手动ACK
:消费端收到消息后,不回自动签收消息,需要我们在业务代码中显式签收消息3.3 手动ACK类型
单条手动ACK
:multiple=false
多条手动ACK
:multiple=true
推荐使用单条ACK
3.4 重回队列
1.修改channel.basicConsume
中的autoAck
为false
channel.basicConsume("queue.restaurant", false, deliverCallback, consumerTag -> {
});
2.签收消息
//1.签收条数 是否多条
void basicAck(long deliveryTag, boolean multiple) throws IOException;
DeliverCallback deliverCallback = (consumerTag, message) -> {
//对消息进行手动签收
if (message.getEnvelope().getDeliveryTag()%10 == 0){
channel.basicAck(message.getEnvelope().getDeliveryTag(),false);
}
}
3.手动拒收,重回队列:不太推荐
@Bean
Channel rabbitChannel() throws IOException, TimeoutException {
ConnectionFactory factory=new ConnectionFactory();
factory.setHost("localhost");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
return channel;
}
//单条
channel.basicNack(message.getEnvelope().getDeliveryTag(),false,true);
//多条条
if (message.getEnvelope().getDeliveryTag()%10 == 0){
channel.basicNack(message.getEnvelope().getDeliveryTag(),true,true);
}
4.1 问题分析:消息发送后,消息量过大,消费端处理的过来吗?
消费端限流机制
,限制消息推送速度,保障接收端服务稳定4.2 RabbitMQ-Qos服务质量保证
AutoACK=false
4.3 Qos原理
4.4 消费端限流机制参数设置
prefetchCount
:针对一个消费端最多推送多少未确认消息
gloal
:
prefetchSize:0
:单个消息大小限制,一般为0
prefetchSize与gloal两项,RabbitMQ暂时未实现
channel.basicQos(5);
@Async public void handleMessage() throws IOException, TimeoutException, InterruptedException { log.info("start linstening message"); channel.exchangeDeclare( "exchange.order.restaurant", BuiltinExchangeType.DIRECT, true, false, null); channel.queueDeclare( "queue.restaurant", true, false, false, null); channel.queueBind( "queue.restaurant", "exchange.order.restaurant", "key.restaurant"); channel.basicQos(5); channel.basicConsume("queue.restaurant", true, deliverCallback, consumerTag -> { }); while (true) { Thread.sleep(100000); } }
不单独使用,与死信机制联合使用
5.1问题分析:队列爆满怎么办?
默认情况下,消息进入队列,会永远存在,直到被消费
大量堆积的消息会给RabbitMQ产生很大的压力
需要使用RabbitMQ消息过期时间
,防止消息大量积压
5.2RabbitMQ的过期时间(TTL)
RabbitMQ的过期时间称为TTL(Time to Live),生存时间
RabbitMQ的过期时间分为消息TTL
和队列TTL
消息TTL
设置了单条消息的过期时间
队列TTL
设置了队列中所有的消息过期时间
5.3如何找到适合自己的TTL
TTL设置主要考虑技术架构与业务
TTL应该明显长于服务的平均重启时间
建议TTL长于业务高峰期时间
注意:TTL这里不推荐单独使用,会导致误删消息,后面会与死信一起使用
1.设置单条消息的过期时间
//1.单条同步确认机制 实现
channel.confirmSelect();
//设置过期时间
AMQP.BasicProperties properties=new AMQP.BasicProperties().builder().expiration("15000").build();
channel.basicPublish("exchange.order.restaurant", "key.restaurant", properties, messageToSend.getBytes());
log.info("message sent");
if (channel.waitForConfirms()) {
log.info("RabbitMQ confirm OK");
} else {
log.info("RabbitMQ confirm Failed");
}
2.设置队列的过期时间:
如果队列已经存在,添加了队列的过期时间,需要删除之前的队列,重启服务
@Async public void handleMessage() throws IOException, TimeoutException, InterruptedException { log.info("start linstening message"); channel.exchangeDeclare( "exchange.order.restaurant", BuiltinExchangeType.DIRECT, true, false, null); //设置队列的过期时间 Map<String, Object> args=new HashMap<>(16); args.put("x-message-ttl",15000); channel.queueDeclare( "queue.restaurant", true, false, false, args); channel.queueBind( "queue.restaurant", "exchange.order.restaurant", "key.restaurant"); channel.basicQos(5); channel.basicConsume("queue.restaurant", true, deliverCallback, consumerTag -> { }); while (true) { Thread.sleep(100000); } }
//设置队列的过期时间
Map<String, Object> args=new HashMap<>(16);
//设置队列中消息的过期时间
args.put("x-message-ttl",15000);
//设置队列的过期时间 不太推荐使用
args.put("x-expire",15000);
6.1 问题分析:如何转移过期消息
死信队列
,收集过期消息吗,以供分析6.2 什么是死信队列
死信队列:队列被配置了DLX属性(Dead-Letter- Exchange)
当一个消息变成死信(dead message)后,能重新北发不到另一个Exchange,这个Exchange也是一个普通的交换机
死信被死信交换机路由后,一般进入一个固定的队列
描述:当生产者发送信息,被Exchange
交换机路由到Queue
队列中,但是这个消息在队列中放置的时间过长,超过了设置的TTL
之后,就会变成死信,死信会被重新转发到一个DL Exchange
中,DL Exchange
也是一个普通的Exchange
,只是我们给他起了一个DL标识
而已,DL Exchange
会把消息路由到DL Queue
,这个Queue
就会被人工的监听或者专门处理异常的程序所监听。这个过程就叫做死信的转移。
6.3 怎么变成死信呢?
消息被拒绝(reject/nack)并且requeue=false
//手动拒收
channel.basicNack(message.getEnvelope().getDeliveryTag(),false,true);
消息过期(TTL到期)
//设置队列中消息的过期时间
args.put("x-message-ttl",15000);
队列达到最大长度
//设置队列的最大长度
args.put("x-max-length",5);
6.4 死信队列设置方法
设置转发、接收死信的交换机和队列
在需要设置死信的队列加入参数:
x-dead-letter-exchange=dlx.exchange
@Async public void handleMessage() throws IOException, TimeoutException, InterruptedException { log.info("start linstening message"); //声明死信交换机 channel.exchangeDeclare("exchange.dlx", BuiltinExchangeType.TOPIC, true, false, null); //声明死信队列 channel.queueDeclare("queue.dlx", true, false, false, null); //绑定exchange和queue channel.queueBind("queue.dlx","exchange.dlx","#"); channel.exchangeDeclare( "exchange.order.restaurant", BuiltinExchangeType.DIRECT, true, false, null); //设置队列的过期时间 Map<String, Object> args=new HashMap<>(16); //设置队列中消息的过期时间 args.put("x-message-ttl",15000); //设置队列的过期时间 // args.put("x-expire",15000); //设置死信队列 args.put("x-dead-letter-exchange","exchange.dlx"); channel.queueDeclare( "queue.restaurant", true, false, false, args); channel.queueBind( "queue.restaurant", "exchange.order.restaurant", "key.restaurant"); //设置消费端限流机制 channel.basicQos(5); channel.basicConsume("queue.restaurant", true, deliverCallback, consumerTag -> { }); while (true) { Thread.sleep(100000); } }
声明死信交换机、队列、绑定exchange和queue
//声明死信队列交换机
channel.exchangeDeclare("exchange.dlx",
BuiltinExchangeType.TOPIC,
true,
false,
null);
//声明死信队列
channel.queueDeclare("queue.dlx",
true,
false,
false,
null);
//绑定exchange和queue
channel.queueBind("queue.dlx","exchange.dlx","#");
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。