赞
踩
当生产端的生产效率大于消费端的消费效率,就会造成消息处理不完的情况。
排查方式:RocketMQ 监控告警:生产环境如何快速通过监控预警发现堆积、收发失败等问题?_rocketmq监控指标_阿里云云原生的博客-CSDN博客
处理方案:
消费端水平扩容,
1、如果Topic下的分配的队列足够多,集群中新增Consummer节点来消费。最极限的情况是把Consumer的节点个数设置成跟队列的个数相同。超过的情况没有意义,一个consumer同步消费一条队列。
2、如果Topic下的分配的队列不够的情况。
2-1、新增一个topic,并且分配更多的队列,并且把旧的topic的消息转存到新的Topic中,新增更多的Consumer进行消费队列(Consumer数量==队列的数量)。
2-2、旧的topic,可以再分配一个新的consumer去消费。
原因:
1、发送时消息重复: 当一条消息已被成功发送到服务端并完成持久化,此时出现了网络闪断或者客户端宕机,导致服务端对客户端应答失败。 如果此时生产者意识到消息发送失败并尝试再次发送消息,消费者后续会收到两条内容相同并且 Message ID 也相同的消息。
2、投递时消息重复: 消息消费的场景下,消息已投递到消费者并完成业务处理,当客户端给服务端反馈应答的时候网络闪断。 为了保证消息至少被消费一次,消息队列 RocketMQ 的服务端将在网络恢复后再次尝试投递之前已被处理过的消息,消费者后续会收到两条内容相同并且 Message ID 也相同的消息。
3、负载均衡时消息重复(包括但不限于网络抖动、Broker 重启以及订阅方应用重启): 当消息队列 RocketMQ 的 Broker 或客户端重启、扩容或缩容时,会触发 Rebalance,此时消费者可能会收到重复消息。
解决方案:
1、使用数据库的唯一约束实现幂等,比如对于数据插入类的场景,比如创建订单,因为订单号肯定是唯一的,所以如果是多次调用就会触发数据库的唯一约束异常,从而避免一个请求创建多个订单的问题。
2、使用redis里面提供的setNX指令,比如对于MQ消费的场景,为了避免MQ重复消费导致数据多次被修改的问题,可以在接受到MQ的消息时,把这个消息通过setNx写入到redis里面,一旦这个消息被消费过,就不会再次消费。
producer端(发送端):
1、producer端由默认的异步机制改为实时的同步机制,producer端就可以实时知道消息的发送结果。
2、可以实现异步回调来监听消息发送的结果,如果发送失败,可以在回调中重试。
3、 使用producer的重试机制,没发送成功就再发送一次。
broker端(存储端):
1、 异步批量刷盘机制,按照一定消息量和时间间隔去刷盘。这个是由操作系统本身去决定,如果刷盘之前系统崩溃了,才会导致消息丢失。针对这种崩溃场景:需要通过partition的副本机制和ack机制来解决。
Broker 一般集群部署,有主节点和从节点。消息到 Broker 存储端,只有主节点和从节点都写入成功,才反馈成功ack 给生产者。这就同步复制,它保证了消息不丢失,但降低了系统吞吐量。与之对应即异步复制,只要消息写入主节点成功,就返回成功ack,它速度快,但会有性能问题。
cusumer端(消费端):
消费者执行完业务逻辑,再反馈会 Broker 说消费成功,这样才可以保证消费阶段不丢消息,调整offset即可。
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。