赞
踩
答案:将去重操作直接放在了消费端,消费端处理消息的业务逻辑保持幂等性。消费者收到消息后,从消息中获取消息标识写入到Redis(分布式锁)或数据库(标识作为表唯一索引插入一条记录),当再次收到该消息时就不作处理。在broker端对Queue加锁(synchronized),Consumer监听的Queue存在已投递但未收到ack且未超时的消息,不允许获取锁,直到该Queue投递的消息全部ack或者消费超时,才允许新的Consumer获取锁,拉取消息。
思考问题:1、为什么不在生产者去重?
2、为什么在消费者做去重?
一、网络波动导致系统A消息发送到RocketMQ后没有收到消息发送超时,系统A重试导致消息重复
1、RocketMQ支持消息查询的功能,消息发送前去RocketMQ查询一下是否已经发送过该条消息,存在则不发送,不存在发送到RocketMQ。在高并发的场景下,每条消息在发送到RocketMQ时都去查询一下,会影响接口的性能。
2、redis分布式锁,在发送消息到RocketMQ成功之后,向redis中插入一条数据,如果发生重试,则先去redis中查询是否存在,存在的话不再发送消息。redis集群此时宕机,再次查询redis判断消息是否已经发送过,无法得到正确结果的。
以上两种方式只是保证只发送了一次,不能保证消费只一次的情况。
二、MQ要保证消息投递的可靠性,对未ack的消息,会重复投递。
场景一:broker发送Consumer超时后重新发送
消费者端要保证消费的幂等性,从消息中获取消息标识写入到Redis或数据库,当再次收到该消息时不作处理。
场景二:负载均衡阶段,前一个监听Queue的消费实例拉取的消息未全部ack,新的消费实例监听到这个Queue重新拉取消息。
在负载均衡结果变化过程增加了一个过渡态,在过渡态的时候,Consumer会继续保留上一次负载均衡的结果,直到原消费者拉取的消息全部ack,才释放老的结果。在broker端对Queue加锁(synchronized),Consumer监听的Queue存在已投递但未收到ack且未超时的消息,不允许获取锁,直到该Queue投递的消息全部ack或者消费超时,才允许新的Consumer获取锁,拉取消息。
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。