赞
踩
前言
RocketMQTemplate
是 RocketMQ 在 Spring 中的集成组件,它提供了 Spring 风格的消息发送接口,方便与 Spring 应用程序集成,并且支持 Spring Boot 的自动配置特性。
DefaultMQProducer
是 RocketMQ 的原生生产者,直接使用 RocketMQ 提供的 API 来发送消息,不依赖于 Spring 框架。
下面是SpringBoot整合RocketMq的流程~~
引入依赖
<!--rocketmq相关-->
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-spring-boot-starter</artifactId>
<version>2.0.1</version>
</dependency>
yml配置
# RocketMq相关配置 rocketmq: producer: # 发送同一类消息的设置为同一个group,保证唯一 group: springboot_producer_group # 发送消息超时时间,默认3000 sendMessageTimeout: 10000 # 发送消息失败重试次数,默认2 retryTimesWhenSendFailed: 2 # 异步消息重试此处,默认2 retryTimesWhenSendAsyncFailed: 2 # 消息最大长度,默认1024 * 1024 * 4(默认4M) maxMessageSize: 4096 # 压缩消息阈值,默认4k(1024 * 4) compressMessageBodyThreshold: 4096 # 是否在内部发送失败时重试另一个broker,默认false retryNextServer: false name-server: 127.0.0.1:9876 consumer: group: springboot_consumer_group # 一次拉取消息最大值,注意是拉取消息的最大值而非消费最大值 pull-batch-size: 10 consume-thread-min: 1 consume-thread-max: 1
定义生产者和消费者
生产者发送消息
@RequestMapping("/mq")
@RestController
public class TestRocketMqController {
@Autowired
private RocketMQTemplate rocketMQTemplate;
@GetMapping("/sendMsg")
public String testSendMsg(@RequestParam("msg") String msg){
Message<String> message = MessageBuilder.withPayload(msg).build();
rocketMQTemplate.send("idempotent_topic",message);
return msg;
}
}
消费者消费消息
@Slf4j
@Component
@RocketMQMessageListener(consumerGroup = "springboot_consumer_group",topic = "idempotent_topic",consumeThreadMax = 1)
public class TestRocketMqConsumer implements RocketMQListener<String> {
@Override
public void onMessage(String message) {
log.info("消费者组:springboot_consumer_group开始消费消息...");
System.out.println(message);
log.info("消费结束...");
}
}
测试整合是否成功
订单表根据identity唯一标识字段进行数据回查,简单来说:
- 步骤一:保存订单之前根据identity到订单表查询是从已经存在
- 步骤二:存在,直接返回;不存在,保存订单
订单回查
//校验当前订单是否存在
List<MtfOrder> orderList = this.baseMapper.selectOrderByIdentity(mtfOrder.getIdentity());
if (CollUtil.isNotEmpty(orderList)){
throw new MqRepeatConsumeException(MQ_REPEAT_CONSUME.getDescription()+":创建订单根据订单标识判断订单创建之前是否存在...");
}
保存订单
//创建订单
this.save(mtfOrder);
对业务表进行数据回查是最常规的方法,但是往往最常规的方法也最容易出现问题——挡板穿透。思考一种场景,判断订单是否已经存在这个校验可靠吗?
生产者发送一个创建订单的消息,消费者接收消息并执行创建订单的逻辑,那么就存在一种情况(生产者快速重发,Broker重启),就极有可能触发重复消费,第一次请求创建的订单还没有执行完成,数据还没有落库,重复请求打进来之后创建订单的校验挡板就不会拦截这种重复的请求,就会存在这种非幂等的不安全环境中。
为订单表identity唯一标识字段建立唯一索引:
订单回查
//校验库存
if (!stockService.checkStockByProductIdentity(mtfOrder.getProductIdentity(),mtfOrder.getNum())){
return Result.error("库存不足,下单失败!");
}
保存订单
try {
this.save(mtfOrder);
}
catch (DuplicateKeyException e) {
log.info("订单表唯一索引异常:{}",e.getMessage());
throw new MqRepeatConsumeException(MQ_REPEAT_CONSUME.getDescription()+":创建订单根据订单表唯一索引解决重复消费问题...");
}
为订单表标识字段添加唯一索引,将重复消费的问题抛给数据库层面进行解决。虽然这样能够解决幂等性问题,但是该方案与业务逻辑耦合,违背了java开发准则,并且这种实现方案依赖于关系型数据库适用范围有限。
引入消息表解耦合
@Transactional(rollbackFor = Exception.class) public Result<?> saveOrderByMessageTableIdempotent(MtfOrder mtfOrder){ //根据消息表标识判断该订单是否已经创建,避免重复消费 MtfRocketmqLog mtfRocketmqLog = new MtfRocketmqLog(); mtfRocketmqLog.setMsgId(mtfOrder.getIdentity()); mtfRocketmqLog.setSendTime(new Date()); mtfRocketmqLog.setMessageBody(JSON.toJSONString(mtfOrder)); mtfRocketmqLog.setBusinessType("创建订单"); try { rocketmqLogService.save(mtfRocketmqLog); } catch (Exception e) { log.info("消息表唯一索引异常:{}",e.getMessage()); throw new MqRepeatConsumeException(MQ_REPEAT_CONSUME.getDescription()+":创建订单根据消息表唯一索引解决重复消费问题..."); } //校验库存 if (!stockService.checkStockByProductIdentity(mtfOrder.getProductIdentity(),mtfOrder.getNum())){ return Result.error("库存不足,下单失败!"); } //扣减库存 stockService.substractStockByProductIdentity(mtfOrder.getProductIdentity(),mtfOrder.getNum()); //创建订单 this.save(mtfOrder); return Result.ok("订单创建成功!"); }
以消息表的方式来解决消费幂等问题降低了和业务逻辑代码的耦合度,但是上述案例仍然依赖关系型数据库的唯一索引,于是引出下面的解决方案——根据消息状态实现消费幂等。
如果消息的消费状态是“消费中”,说明这个订单正在创建,直到消息的消费状态是“消费完成”,才表示这个订单已经创建完成
@Transactional(rollbackFor = Exception.class) public Result<?> saveOrderByMessageStatusIdempotent(MtfOrder mtfOrder) { //判断消息表中是否存在该订单对应的消息及消息状态 MtfRocketmqLog rocketmqLog = rocketmqLogService.selectRocketMqLogByMsgId(mtfOrder.getIdentity()); if (ObjectUtil.isNotEmpty(rocketmqLog)&&rocketmqLog.getMsgStatus().equals(CONSUME_SUCCESS.getCode())){ log.info("订单创建成功..."); return Result.ok("订单创建成功..."); } if (ObjectUtil.isNotEmpty(rocketmqLog)&&rocketmqLog.getMsgStatus().equals(CONSUMING.getCode())){ //发送延迟消息 Message message = new Message(); message.setTopic("idempotent_topic"); message.setBody(JSON.toJSONBytes(mtfOrder)); message.setDelayTimeLevel(4); rocketMQTemplate.syncSend("idempotent_topic",message); log.info("订单正在创建,请稍后30s后重试..."); return Result.ok("订单正在创建,请稍后30s后重试..."); } try { MtfRocketmqLog mqLog = new MtfRocketmqLog(); mqLog.setMsgId(mtfOrder.getIdentity()); mqLog.setSendTime(new Date()); mqLog.setMessageBody(JSON.toJSONString(mtfOrder)); mqLog.setBusinessType("创建订单"); //设置消息的初始状态 mqLog.setMsgStatus(CONSUMING.getCode()); rocketmqLogService.save(mqLog); //校验库存 if (!stockService.checkStockByProductIdentity(mtfOrder.getProductIdentity(),mtfOrder.getNum())){ throw new Exception("库存不足,下单失败!"); //return Result.error("库存不足,下单失败!"); } //扣减库存 stockService.substractStockByProductIdentity(mtfOrder.getProductIdentity(),mtfOrder.getNum()); this.save(mtfOrder); //订单创建完成,更新消息状态 rocketmqLogService.updateRocketMqLogMsgStatusByMsgId(mtfOrder.getIdentity()); } catch (Exception e) { //订单创建失败,删除对应消息,进行重新消费 rocketmqLogService.deleteRocketMqLogByMsgId(mtfOrder.getIdentity()); } return Result.ok("订单创建成功!"); }
上述的大致流程是这样的:
思考一下上述流程还会存在消息丢失的风险呢?
假如消费者第一次消费这条消息,成功插入消息并维护了消费状态“消费中”,但是之后因为某些特定情况(机器故障类似)第一次请求创建订单并没有成功,但是消息已经存在并且一致处于“消费中”的状态,那么这条消息就算一直重试也不会下单成功,直到进入死信队列,人为的造成了消息丢失。
为了避免上述场景消息丢失的风险,可以在插入每条消息记录时设置过期时间,于是便使用了Redis
public Result<?> saveOrderByRedisIdempotent(MtfOrder mtfOrder){ //判断消息的状态 Integer status = (Integer) redisTemplate.opsForHash().get(mtfOrder.getIdentity(), "status"); if (ObjectUtil.isNotEmpty(status)){ assert status != null; if (status.equals(CONSUMING.getCode())){ //发送延迟消息到队列 Message message = new Message(); message.setTopic("idempotent_topic"); message.setBody(JSON.toJSONBytes(mtfOrder)); message.setDelayTimeLevel(4); rocketMQTemplate.syncSend("idempotent_topic",message); log.info("订单正在创建,请稍后30s后重试..."); return Result.ok("订单正在创建,请稍后30s后重试..."); } if (status.equals(CONSUME_SUCCESS.getCode())){ return Result.ok("订单创建成功!"); } } MtfRocketmqLog mqLog = new MtfRocketmqLog(); mqLog.setMsgId(mtfOrder.getIdentity()); mqLog.setSendTime(new Date()); mqLog.setMessageBody(JSON.toJSONString(mtfOrder)); mqLog.setBusinessType("创建订单"); //设置消息的初始状态 mqLog.setMsgStatus(CONSUMING.getCode()); //判断redis缓存中是否存在对应的消息记录以及消息的状态(处于“消费中”的消息过期时间设置为6分钟) Map<String, Object> map = new HashMap<>(); BeanUtils.copyProperties(mqLog,map); redisTemplate.opsForHash().putAll(mqLog.getMsgId(),map); redisTemplate.expire(mqLog.getMsgId(),360, TimeUnit.SECONDS); //校验库存 if (!stockService.checkStockByProductIdentity(mtfOrder.getProductIdentity(),mtfOrder.getNum())){ return Result.error("库存不足,下单失败!"); } //扣减库存 stockService.substractStockByProductIdentity(mtfOrder.getProductIdentity(),mtfOrder.getNum()); //创建订单 this.save(mtfOrder); //将redis中对应的消息的有效时间设置永不过期 redisTemplate.persist(mtfOrder.getIdentity()); return Result.ok("订单创建成功!"); }
以上都是我最近工作上的学习总结和思考,如果有什么更好的方案欢迎大家评论区讨论~~
最后附上git仓库地址:https://gitee.com/himatengfei/mq-resolve-idempotent
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。