赞
踩
前面的几个章节我们学习了rabbitmq的消息确认和消息回退机制, 还有备份exchange功能
今天来聊聊幂等, 优先级队列和惰性队列
类似于mysql
的一致性, 支付了一个商品的价格, 如果商品购买失败, 再次购买该商品, 购买成功, 不会出现扣了两次钱买了一件商品的事情
在回答这个问题之前, 我们需要了解, 什么是消息送达?
对于rabbitmq来说消息送达, 是将消息送到消费者queue中
对于程序员来说, 消息送达是消息投递到消费者手中
消费者和生产者会出现重复消息的问题, 但是问题只需要在消费者这边处理
消费者在消费 MQ 中的消息时,MQ 已把消息发送给消费者,消费者在给MQ 返回 ack 时网络中断,故 MQ 未收到确认信息,该条消息会重新发给其他的消费者,或者在网络重连后再次发送给该消费者,但实际上该消费者已成功消费了该条消息,造成消费者消费了重复的消息
rabbitmq
怎么做到幂等?很简单, 对每个消息添加唯一标识, 比如uuid
等
消费者消费消息前, 将其添加到ConcurrentHashMap<String, Integer>()
其中key
是uuid
, value
是数量, 数量一般是 1
在消息进入消费者前, 我们执行ConcurrentHashMap#putIfAbsent
代码, 消息put
成功一次之后, 我们才去执行, 如果put
发现已经有元素, 那么直接丢弃消息, 说明消费者已经消费了我们的消息
问: 进程崩溃, 保存在内存的
ConcurrentHashMap
也将消失, 到时候照样会出现消息重复消费答: 我们还可以考虑使用
redis
的setnx
命令
跟上面的步骤差不多, 但是使用到redis
的setnx
命令, 保证原子性, 这样即便消费端重复发送消息, 生产者也只会根据messageId
消费一次
需要注意锁的范围, 是针对整个分布式系统呢, 还是针对单个消费者个体
使用redis做幂等引入了新的问题, 那就是redis标记了某个消息已处理, 但是消费者准备执行业务时, 挂了, 我们应该怎么办?
redis
说你消息A的相关业务已经处理过了
但实际上消费者只不过是标记了消息A已处理, 跟消息A相关的业务还没执行, 消费者挂了
在消费者重启后, 会自动加载queue
中的消息A, 然后被redis
拦截并拒绝掉
被拒绝掉的消息最后会走死信队列
咋办? 凉拌, 也许可以选前面的ConcurrentHashMap
方案, 干脆直接在内存中保存着吧
问: 使用
ConcurrenthashMap
不也会导致消息被重新消费吗?答: 确实, 内存中的记录丢失之后, 同样的消息还是会继续投递过来, 即便业务已经执行过了, 但是在
ack
函数没有被执行前, 都将在queue
中保存着消息, 消费者重启后, 消息还是会被继续投递
队列繁忙的时候, 消息堆积, 一些重要的消息需要优先投递, 此时消息优先级
说到消息的优先级, 我们需要知道队列才是对不同优先级的message
进行排序的场所. 因为队列可以存储很多message
, 也就能够很方便对不同优先级的message
进行排序
RabbitMQ
队列默认情况下不支持优先级。创建优先级队列时,开发人员可以选择认为合适的最大优先级。一般优先级最好为 1~10, 使用更多优先级将消耗更多的 CPU 资源
没有priority
属性的消息其优先级被视为 0。优先级高于队列最大值的消息将被视为以最大优先级发布。
优先级队列必须和优先级消息一起使用,才能发挥出效果,但是会消耗性能
非优先级队列发送优先级消息是不会排序的,所以向非优先级队列发送优先级是没有任何作用的
在普通队列中消息过期就立即被删除, 在优先级队列中, 消息过期时, 如果被fetch
, 那么就不会被删除
如果配置了queue
的最大值, 如果queue
满了, 不论高优先级的消息还是低优先级的消息, 都会被丢失, 但正常情况应该是低优先级的消息会被丢失, 而不是高优先级
需要配置Qos
为1, 也就是prefetch
为1, 如果你不配置, 那么队列中的消息会一下子都给消费者读取出来, 预期值默认好像是150还是250, 也就是说他会一下子读那么多个消息, 而消息队列可能来不及排序
@Bean
public Queue queue() {
return QueueBuilder.durable(QUEUE)
.maxPriority(10)
.build();
}
消费端:
public class Consumer { public static final String EXCHANGE_NAME = "exchange_name"; public static final String QUEUE = "queue"; public static void main(String[] args) throws Exception { ConnectionFactory factory = RabbitUtils.INSTANCE.connectionFactory(); Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); channel.basicQos(1); channel.confirmSelect(); channel.addConfirmListener(new ConfirmListener() { @Override public void handleAck(long deliveryTag, boolean multiple) { System.err.println("消息已响应, deliveryTag: " + deliveryTag); } @Override public void handleNack(long deliveryTag, boolean multiple) { System.err.println("消息未响应, deliveryTag: " + deliveryTag); } }); channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT, true, false, null); // HashMap<String, Object> arguments = new HashMap<>(); // arguments.put("x-max-priority", 10); Queue queue = QueueBuilder.durable(QUEUE).maxPriority(10).build(); channel.queueDeclare(queue.getName(), true, false, false, queue.getArguments()); channel.queueBind(QUEUE, EXCHANGE_NAME, ""); channel.basicConsume(QUEUE, false, (consumerTag, message) -> { try { System.err.println("messageContent: " + new String(message.getBody(), Charset.defaultCharset()) + ", priority: " + message.getProperties().getPriority()); } catch(Exception e) { // 省略日志记录: 时间, 消息内容和错误信息等 channel.basicReject(message.getEnvelope().getDeliveryTag(), false); } channel.basicAck(message.getEnvelope().getDeliveryTag(), false); }, consumerTag -> System.err.println("consumerTag: " + consumerTag)); } }
核心代码:
channel.basicQos(1);
// HashMap<String, Object> arguments = new HashMap<>();
// arguments.put("x-max-priority", 10);
Queue queue = QueueBuilder.durable(QUEUE).maxPriority(10).build();
生产端:
public class Producer { public static final String EXCHANGE_NAME = "exchange_name"; public static void main(String[] args) throws Exception { ConnectionFactory factory = RabbitUtils.INSTANCE.connectionFactory(); Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT, true, false, null); for (int i = 100; i > 0; i--) { var priority = i % 11; var props = new AMQP.BasicProperties().builder().priority(priority); channel.basicPublish(EXCHANGE_NAME, "", props.build(), ("message: " + i).getBytes(Charset.defaultCharset())); } } }
效果:
// 省略部分打印
messageContent: message: 100, priority: 1
messageContent: message: 98, priority: 10
messageContent: message: 10, priority: 10
messageContent: message: 97, priority: 9
messageContent: message: 86, priority: 9
messageContent: message: 46, priority: 2
messageContent: message: 2, priority: 2
messageContent: message: 89, priority: 1
messageContent: message: 1, priority: 1
messageContent: message: 33, priority: 0
messageContent: message: 22, priority: 0
如果你使用的是spring boot 项目那么qos应该这么配置:
server:
port: 8081
spring:
rabbitmq:
host: 127.0.0.1
virtual-host: /
username: zhazha
password: 123456
listener:
simple:
acknowledge-mode: manual
prefetch: 1
惰性队列会尽可能的将消息存入磁盘中,而在消费者消费到相应的消息时才会被加载到内存中
能够支持更长的队列,即支持更多的消息存储。
当消费者由于各种各样的原因(比如消费者下线、宕机亦或者是由于维护而关闭等)而致使长时间内不能消费消息造成堆积时,惰性队列就很有必要了。
// HashMap<String, Object> arguments = new HashMap<>();
// arguments.put("x-queue-mode", "lazy");
Queue queue = QueueBuilder.durable(QUEUE)
.lazy().build();
channel.queueDeclare(queue.getName(), true, false, false, queue.getArguments());
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。