赞
踩
如果消息的重复消费对业务有影响,那么就需要对消息进行幂等处理,下面介绍消息幂等的概念、场景和处理方法。
在数学和计算机科学中,幂等运算可以多次应用而不改变初始应用后的结果。在消息队列服务中,幂等性用于处理相同消息的重复消费。消费者重复消费一条消息,最终消费结果与初次消费结果相同,重复消费不会对业务系统造成负面影响。
例如:消费者根据扣款信息扣减订单货款,付款金额为100元,但由于网络问题,消息重复发送给消费者。结果就是消息被重复消费,但是只扣了一次货款,订单只有一次100元的扣款记录。该例子在消息消费过程中实现了消息幂等性,扣费满足业务需求。
在互联网应用中,尤其网络较差的情况下,RabbitMQ消息可能会被重复消费,如果消息的重复消费对业务有影响,可以对消息进行幂等处理,以下场景可能会重复消费消息:
可靠性是指消息在MQ中传输会发生消息丢失问题,若是涉及金钱相关的业务可能会造成巨大损失,一般发生消息丢失会存在以下三种情况
spring:
rabbitmq:
addresses: 127.0.0.1
port: 5672
username: guest
password: guest
# 发送者开启 confirm 确认机制
publisher-confirm-type: correlated
@Slf4j
public class ConfirmCallbackService implements RabbitTemplate.ConfirmCallback {
@Override
public void confirm(CorrelationData correlationData, boolean ack, String cause) {
if (!ack) {
log.error("消息发送异常!");
//可以进行重发等操作
} else {
log.info("发送者已经收到确认,correlationData={} ,ack={}, cause={}", correlationData, ack, cause);
}
}
}
@Slf4j @Configuration public class RabbitMqConfig { @Bean public ConfirmCallbackService confirmCallbackService() { return new ConfirmCallbackService(); } @Bean public RabbitTemplate rabbitTemplate(@Autowired CachingConnectionFactory factory) { RabbitTemplate rabbitTemplate = new RabbitTemplate(factory); /** * 消费者确认收到消息后,手动ack回执回调处理 */ rabbitTemplate.setConfirmCallback(confirmCallbackService()); return rabbitTemplate; } //其他配置代码 ...... }
@Bean(QUEUE_IOT_TOIN)
public Queue createIotQueue() {
return new Queue(QUEUE_IOT_TOIN, true);
}
public void sendToUploadMsg(Object obj, String routingKey) {
try {
String jsonString = JSON.toJSONString(obj);
rabbitTemplate.convertAndSend(EXCHANGE_IOT, routingKey, jsonString, message -> {
//设置该条消息持久化
message.getMessageProperties().setDeliveryMode(MessageDeliveryMode.PERSISTENT);
return message;
}, new CorrelationData(UUIDUtil.generate()));
} catch (Exception e) {
log.info(routingKey + "发送消息异常!");
}
}
spring: rabbitmq: addresses: 127.0.0.1 port: 5672 username: guest password: guest # 发送者开启 confirm 确认机制 publisher-confirm-type: correlated # 发送者开启 return 确认机制 publisher-returns: true listener: simple: concurrency: 10 max-concurrency: 10 prefetch: 1 auto-startup: true default-requeue-rejected: true # 设置消费端手动 ack acknowledge-mode: manual # 是否支持重试 retry: enabled: true
@RabbitHandler public void handlerMq(String msg, Channel channel, Message message) throws IOException { try { //业务处理代码 ...... //手动ACK channel.basicAck(message.getMessageProperties().getDeliveryTag(), false); } catch (Exception e) { if (message.getMessageProperties().getRedelivered()) { log.error("消息已重复处理失败,拒绝再次接收...", e); channel.basicReject(message.getMessageProperties().getDeliveryTag(), false); // 拒绝消息 } else { log.error("消息即将再次返回队列处理...", e); channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, true); } } }
在生产中经常会有一些类似报表系统这样的系统,需要做 MySQL 的 binlog 同步。比如订单系统要同步订单表的数据到大数据部门的 MySQL 库中用于报表统计分析,通常的做法是基于 Canal 这样的中间件去监听订单数据库的 binlog,然后把这些 binlog 发送到 MQ 中,再由消费者从 MQ 中获取 binlog 落地到大数据部门的 MySQL 中。
在这个过程中,可能会有对某个订单的增删改操作,比如有三条 binlog 执行顺序是增加、修改、删除;消费者愣是换了顺序给执行成删除、修改、增加,这样能行吗?肯定是不行的。
对于 RabbitMQ 来说,导致上面顺序错乱的原因通常是消费者是集群部署,不同的消费者消费到了同一订单的不同的消息,如消费者 A 执行了增加,消费者 B 执行了修改,消费者 C 执行了删除,但是消费者 C 执行比消费者 B 快,消费者 B 又比消费者 A 快,就会导致消费 binlog 执行到数据库的时候顺序错乱,本该顺序是增加、修改、删除,变成了删除、修改、增加。
如下图是 RabbitMQ 可能出现顺序错乱的问题示意图:
RabbitMQ 的问题是由于不同的消息都发送到了同一个 queue 中,多个消费者都消费同一个 queue 的消息。解决这个问题,我们可以给 RabbitMQ 创建多个 queue,每个消费者固定消费一个 queue 的消息,生产者发送消息的时候,同一个订单号的消息发送到同一个 queue 中,由于同一个 queue 的消息是一定会保证有序的,那么同一个订单号的消息就只会被一个消费者顺序消费,从而保证了消息的顺序性。
如下图是 RabbitMQ 保证消息顺序性的方案:
https://blog.csdn.net/zw791029369/article/details/109561457
https://xie.infoq.cn/article/c84491a814f99c7b9965732b1
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。