赞
踩
今天记录下java使用rocketMQ实现幂等性
1、标识请求唯一性:orderFlag 作为请求的唯一标识符,确保该标识符在每次请求中都是唯一的,这样可以避免重复请求。
2、提前检查幂等性:在发送消息之前,可以先检查该请求是否已经被处理,如果已经处理,就不再发送消息。这样可以减轻消息队列和数据库的压力。
3、消费端的幂等性处理:在消息队列的消费端,也需要考虑幂等性。即使请求在生产者端保持幂等,消费端也可能多次处理相同的消息。你可以在消费端根据请求标识符来检查消息是否已经处理,如果已经处理,就不再执行订单生成和保存操作。
4、消息队列的事务性:你已经在发送消息时使用了事务消息,这是一种确保消息不会被重复消费的方式。确保消息队列的消费者端也可以正确处理这些事务消息。
发送消息
/** * 生成一个标识,当消息队列的消息没有处理完成后 * 前端根据这个查询订单是否生成 */ //当前时间戳 String orderFlag = UUID.randomUUID().toString(); /** * 发送信息到消息队列 */ JSONObject jsonObject = new JSONObject(); //解决幂等性问题 jsonObject.put("txNo", orderFlag); jsonObject.put("orderFlag", orderFlag); String str = jsonObject.toJSONString(); //生成消息 Message<String> message = MessageBuilder.withPayload(str).build(); try { //发送一条事务消息,消息是不可消费的 log.info("提交发送MQ开始参数{}", message); rocketMQTemplate.sendMessageInTransaction("commit_virtual_order_producer_group", RocketmqConstants.COMMIT_VIRTUAL_ORDER_TOPIC_T, message, null); } catch (Exception e) { LogUtil.error("发送事务消息失败", e); redisUtil.del(cacheName); throw new ServiceException(BizExceptionEnum.SERVER_ERROR); }
消费者监听
@Component @RocketMQMessageListener(consumerGroup = "commit_virtual_order_consumer_group", topic = RocketmqConstants.COMMIT_VIRTUAL_ORDER_TOPIC_T) public class CommitVirtualOrderConsumerListener implements RocketMQListener<String> { @Autowired private RedisUtil redisUtil; @Autowired private OrderVirtualFormService orderVirtualFormService; @Override public void onMessage(String s) { //解析消息 JSONObject jsonObject = JSONObject.parseObject(s); //事务号 String txNo = jsonObject.getString("txNo"); //幂等性判断,防止重复消费 String cacheKey = CacheConstants.CACHE_KEY_COMMIT_VIRTUAL_ORDER_NO + txNo; if (redisUtil.hasKey(cacheKey)) { //调用下单逻辑 orderVirtualFormService.commitVirtualOrder(jsonObject); } } }
生产者监听
@RocketMQTransactionListener(txProducerGroup = "commit_virtual_order_producer_group") public class CommitVirtualOrderProducerListener implements RocketMQLocalTransactionListener { @Autowired private RedisUtil redisUtil; /** * 事务消息发送后的回调方法,当消息发送给mq成功,此方法被回调 * @param message * @param o * @return */ @Override public RocketMQLocalTransactionState executeLocalTransaction(Message message, Object o) { try { JSONObject jsonObject = messageToObj(message); //事务号 String txNo = jsonObject.getString("txNo"); //幂等判断 String cacheKey = CacheConstants.CACHE_KEY_COMMIT_VIRTUAL_ORDER_NO + txNo; if (redisUtil.hasKey(cacheKey)) { //自动向mq发送commit请求,将mq中的该消息改为可消费 return RocketMQLocalTransactionState.COMMIT; } //redis中添加幂等操作标识 redisUtil.set(cacheKey, cacheKey, 86400L); //自动向mq发送commit请求,将mq中的该消息改为可消费 return RocketMQLocalTransactionState.COMMIT; } catch (Exception e) { e.printStackTrace(); LogUtil.info("提交虚拟订单消息回滚"); return RocketMQLocalTransactionState.ROLLBACK; } } /** * 事务状态回查 * @param message * @return */ @Override public RocketMQLocalTransactionState checkLocalTransaction(Message message) { JSONObject jsonObject = messageToObj(message); //事务号 String txNo = jsonObject.getString("txNo"); //幂等判断 String cacheKey = CacheConstants.CACHE_KEY_COMMIT_VIRTUAL_ORDER_NO + txNo; if (redisUtil.hasKey(cacheKey)) { //自动向mq发送commit请求,将mq中的该消息改为可消费 return RocketMQLocalTransactionState.COMMIT; }else { return RocketMQLocalTransactionState.UNKNOWN; } } /** * message转成对象 * @param message * @return */ private JSONObject messageToObj(Message message){ String str = new String((byte[]) message.getPayload()); JSONObject jsonObject = JSONObject.parseObject(str); return jsonObject; } }
消费者逻辑主要代码
//事务号 String txNoStr = jsonObjectParam.getString("txNo"); //消息队列的处理参数 String orderFlag = jsonObjectParam.getString("orderFlag"); //提交订单传参 VirtualOrderCommitParam virtualOrderCommitParam = JSON.parseObject(jsonObjectParam.getString("virtualOrderCommitParam"), VirtualOrderCommitParam.class); //幂等性判断 TxNo txNo = txNoMapper.selectById(txNoStr); if (txNo != null) { return ErrorResponseData.error("已经处理过了"); } //添加事务记录,用于幂等(在这里保存是防止库存多减) TxNo result = new TxNo(); result.setTxNo(txNoStr); txNoMapper.insert(result); //删除掉幂等性标识 String cacheKey = CacheConstants.CACHE_KEY_COMMIT_VIRTUAL_ORDER_NO + txNoStr; redisUtil.del(cacheKey);
消费者消费后删除幂等性标识,
最后,这只是通过mq消息处理,后续可优化在发消息之前判断改标识是否存在,存在可不要再发,减轻服务器压力,
还有更多方案选择,例如使用分布式锁,前端传入唯一标识,redis的setNx,如果前端传入的数据有唯一标识字段,可以通过插入数据库唯一处理
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。