当前位置:   article > 正文

java使用rocketMQ实现幂等性_rocketmq如何保证消息幂等性

rocketmq如何保证消息幂等性

今天记录下java使用rocketMQ实现幂等性
1、标识请求唯一性:orderFlag 作为请求的唯一标识符,确保该标识符在每次请求中都是唯一的,这样可以避免重复请求。

2、提前检查幂等性:在发送消息之前,可以先检查该请求是否已经被处理,如果已经处理,就不再发送消息。这样可以减轻消息队列和数据库的压力。

3、消费端的幂等性处理:在消息队列的消费端,也需要考虑幂等性。即使请求在生产者端保持幂等,消费端也可能多次处理相同的消息。你可以在消费端根据请求标识符来检查消息是否已经处理,如果已经处理,就不再执行订单生成和保存操作。

4、消息队列的事务性:你已经在发送消息时使用了事务消息,这是一种确保消息不会被重复消费的方式。确保消息队列的消费者端也可以正确处理这些事务消息。

发送消息

 		/**
         * 生成一个标识,当消息队列的消息没有处理完成后
         * 前端根据这个查询订单是否生成
         */
        //当前时间戳
        String orderFlag = UUID.randomUUID().toString();
        /**
         * 发送信息到消息队列
         */
        JSONObject jsonObject = new JSONObject();
        //解决幂等性问题
        jsonObject.put("txNo", orderFlag);
        jsonObject.put("orderFlag", orderFlag);
        String str = jsonObject.toJSONString();
        //生成消息
        Message<String> message = MessageBuilder.withPayload(str).build();
        try {
            //发送一条事务消息,消息是不可消费的
            log.info("提交发送MQ开始参数{}", message);
            rocketMQTemplate.sendMessageInTransaction("commit_virtual_order_producer_group", RocketmqConstants.COMMIT_VIRTUAL_ORDER_TOPIC_T, message, null);
        } catch (Exception e) {
            LogUtil.error("发送事务消息失败", e);
            redisUtil.del(cacheName);
            throw new ServiceException(BizExceptionEnum.SERVER_ERROR);
        }
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25

消费者监听

@Component
@RocketMQMessageListener(consumerGroup = "commit_virtual_order_consumer_group", topic = RocketmqConstants.COMMIT_VIRTUAL_ORDER_TOPIC_T)
public class CommitVirtualOrderConsumerListener implements RocketMQListener<String> {
    @Autowired
    private RedisUtil redisUtil;
    @Autowired
    private OrderVirtualFormService orderVirtualFormService;

    @Override
    public void onMessage(String s) {
        //解析消息
        JSONObject jsonObject = JSONObject.parseObject(s);
        //事务号
        String txNo = jsonObject.getString("txNo");

        //幂等性判断,防止重复消费
        String cacheKey = CacheConstants.CACHE_KEY_COMMIT_VIRTUAL_ORDER_NO + txNo;
        if (redisUtil.hasKey(cacheKey)) {
            //调用下单逻辑
            orderVirtualFormService.commitVirtualOrder(jsonObject);
        }
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23

生产者监听

@RocketMQTransactionListener(txProducerGroup = "commit_virtual_order_producer_group")
public class CommitVirtualOrderProducerListener implements RocketMQLocalTransactionListener {
    @Autowired
    private RedisUtil redisUtil;

    /**
     * 事务消息发送后的回调方法,当消息发送给mq成功,此方法被回调
     * @param message
     * @param o
     * @return
     */
    @Override
    public RocketMQLocalTransactionState executeLocalTransaction(Message message, Object o) {
        try {
            JSONObject jsonObject = messageToObj(message);
            //事务号
            String txNo = jsonObject.getString("txNo");
            //幂等判断
            String cacheKey = CacheConstants.CACHE_KEY_COMMIT_VIRTUAL_ORDER_NO + txNo;
            if (redisUtil.hasKey(cacheKey)) {
                //自动向mq发送commit请求,将mq中的该消息改为可消费
                return RocketMQLocalTransactionState.COMMIT;
            }
            //redis中添加幂等操作标识
            redisUtil.set(cacheKey, cacheKey, 86400L);
            //自动向mq发送commit请求,将mq中的该消息改为可消费
            return RocketMQLocalTransactionState.COMMIT;
        } catch (Exception e) {
            e.printStackTrace();
            LogUtil.info("提交虚拟订单消息回滚");
            return RocketMQLocalTransactionState.ROLLBACK;
        }
    }

    /**
     * 事务状态回查
     * @param message
     * @return
     */
    @Override
    public RocketMQLocalTransactionState checkLocalTransaction(Message message) {
        JSONObject jsonObject = messageToObj(message);
        //事务号
        String txNo = jsonObject.getString("txNo");
        //幂等判断
        String cacheKey = CacheConstants.CACHE_KEY_COMMIT_VIRTUAL_ORDER_NO + txNo;
        if (redisUtil.hasKey(cacheKey)) {
            //自动向mq发送commit请求,将mq中的该消息改为可消费
            return RocketMQLocalTransactionState.COMMIT;
        }else {
            return RocketMQLocalTransactionState.UNKNOWN;
        }
    }

    /**
     * message转成对象
     * @param message
     * @return
     */
    private JSONObject messageToObj(Message message){
        String str = new String((byte[]) message.getPayload());
        JSONObject jsonObject = JSONObject.parseObject(str);
        return jsonObject;
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60
  • 61
  • 62
  • 63
  • 64
  • 65

消费者逻辑主要代码

 //事务号
        String txNoStr = jsonObjectParam.getString("txNo");
        //消息队列的处理参数
        String orderFlag = jsonObjectParam.getString("orderFlag");
        //提交订单传参
        VirtualOrderCommitParam virtualOrderCommitParam = JSON.parseObject(jsonObjectParam.getString("virtualOrderCommitParam"), VirtualOrderCommitParam.class);

        //幂等性判断
        TxNo txNo = txNoMapper.selectById(txNoStr);
        if (txNo != null) {
            return ErrorResponseData.error("已经处理过了");
        }
        //添加事务记录,用于幂等(在这里保存是防止库存多减)
        TxNo result = new TxNo();
        result.setTxNo(txNoStr);
        txNoMapper.insert(result);
        //删除掉幂等性标识
        String cacheKey = CacheConstants.CACHE_KEY_COMMIT_VIRTUAL_ORDER_NO + txNoStr;
        redisUtil.del(cacheKey);
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19

消费者消费后删除幂等性标识,

最后,这只是通过mq消息处理,后续可优化在发消息之前判断改标识是否存在,存在可不要再发,减轻服务器压力,
还有更多方案选择,例如使用分布式锁,前端传入唯一标识,redis的setNx,如果前端传入的数据有唯一标识字段,可以通过插入数据库唯一处理

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/一键难忘520/article/detail/930078
推荐阅读
相关标签
  

闽ICP备14008679号