赞
踩
业务需求:默认情况下符合条件的订单都需要进行人工支付,耗费大量时间成本,产生需求:在指定时间内对符合支付条件的订单进行系统自动支付
实现思路:
RocketMQ 生产消息时,将消息设置为延时消息,并通过延时等级(delayTimeLevel)指定延迟投递的时间。
分布式情况下可能有多台服务器同时生产消息,所以消费消息时,需要通过 redis 分布式锁来保证同一时刻只有一台服务器在执行消费消息的操作,并通过业务查询判断订单是否已经支付成功,仅在尚未支付成功的情况下才执行支付,从而保证消息被重复消费时的幂等性。
yml
rocketmq:
  # 是否开启自动配置
  isEnable: true
  # nameserver 地址
  namesrvAddr: 172.16.170.70:9876
  # 设置一次消费消息的条数,默认为 1 条
  consumerMessageBatchMaxSize: 1
  # 消费者线程数量
  consumerThreadMax: 32
  consumerThreadMin: 5
  # 最大消费重试次数
  maxReconsumeTimes: 3
  # 发送同一类消息设置为同一个 group,保证唯一;默认不需要设置,rocketmq 会使用 ip@pid(pid代表 jvm 名字)作为唯一标识
  groupName: dipaoGroup
  # 消息最大长度,单位为字节;RocketMQ 默认 1024*1024*4(4M),此处设置为 4096(4K)
  producerMaxMessageSize: 4096
  # 发送消息失败重试次数,默认 2
  producerRetryTimesWhenSendFailed: 2
  # 发送消息超时时间,默认 3000
  producerSendMsgTimeOut: 3000
  autoPay:
    topic: autoPay
    # 最大消费重试次数
    maxReconsumeTimes: 3
    groupName: autoPayGroup
RocketMQ-延时消息Demo及实现原理分析:
https://blog.csdn.net/hosaos/article/details/90577732
RocketMQConfiguation :
@Configuration @EnableConfigurationProperties({RocketMQProperties.class}) @ConditionalOnProperty(prefix = "rocketmq", value = "isEnable", havingValue = "true") @Slf4j public class RocketMQConfiguation implements InitializingBean { @Autowired private RocketMQProperties properties; @Autowired private ApplicationContext applicationContext; /** * 注入一个默认的生产者 * * @return * @throws MQClientException */ @Bean public DefaultMQProducer getRocketMQProducer() throws MQClientException { if (StringUtils.isEmpty(properties.getGroupName())) { throw new MQClientException(-1, "groupName is blank"); } if (StringUtils.isEmpty(properties.getNamesrvAddr())) { throw new MQClientException(-1, "nameServerAddr is blank"); } DefaultMQProducer producer; producer = new DefaultMQProducer(properties.getGroupName()); producer.setNamesrvAddr(properties.getNamesrvAddr()); // producer.setCreateTopicKey("AUTO_CREATE_TOPIC_KEY"); // 如果需要同一个 jvm 中不同的 producer 往不同的 mq 集群发送消息,需要设置不同的 instanceName // producer.setInstanceName(instanceName); producer.setMaxMessageSize(properties.getProducerMaxMessageSize()); producer.setSendMsgTimeout(properties.getProducerSendMsgTimeout()); // 如果发送消息失败,设置重试次数,默认为2次 producer.setRetryTimesWhenSendFailed(properties.getProducerRetryTimesWhenSendFailed()); try { producer.start(); log.info("producer is start,groupName:{},namesrvAddr:{}", properties.getGroupName(), properties.getNamesrvAddr()); } catch (MQClientException e) { log.error(String.format("producer is error {}", e.getMessage(), e)); throw e; } return producer; } /** * SpringBoot 启动时加载所有消费者 */ @Override public void afterPropertiesSet() { Map<String, AbstractRocketConsumer> consumers = applicationContext.getBeansOfType(AbstractRocketConsumer.class); if (consumers == null || consumers.size() == 0) { log.info("init rocket consumer 0"); } Iterator<String> beans = consumers.keySet().iterator(); while (beans.hasNext()) { String beanName = (String) beans.next(); AbstractRocketConsumer consumer = consumers.get(beanName); 
consumer.init(); createConsumer(consumer); log.info("init success consumer title {} , topics {} , tags {}", consumer.consumerTitle, consumer.topics, consumer.tags); } } /** * 通过消费者信息创建消费者 */ public void createConsumer(AbstractRocketConsumer arc) { DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(arc.groupName); consumer.setNamesrvAddr(this.properties.getNamesrvAddr()); consumer.setConsumeThreadMin(this.properties.getConsumerThreadMin()); consumer.setConsumeThreadMax(this.properties.getConsumerThreadMax()); consumer.registerMessageListener(arc.getMessageListener()); consumer.setMaxReconsumeTimes(this.properties.getMaxReconsumeTimes()); /** * 设置 Consumer 第一次启动是从队列头部开始消费还是队列尾部开始消费 如果非第一次启动,那么按照上次消费的位置继续消费 */ // consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET); /** * 设置消费模型,集群还是广播,默认为集群 */ // consumer.setMessageModel(MessageModel.CLUSTERING); /** * 设置一次消费消息的条数,默认为 1 条 */ consumer.setConsumeMessageBatchMaxSize(this.properties.getConsumerMessageBatchMaxSize()); try { consumer.subscribe(arc.topics, arc.tags); consumer.start(); arc.mqPushConsumer = consumer; } catch (MQClientException e) { log.error("info consumer title {}", arc.getConsumerTitle(), e); } } }
ConfigurationProperties:
@Data @ConfigurationProperties(prefix = "rocketmq") public class RocketMQProperties { private boolean isEnable = false; private String namesrvAddr = "localhost:9876"; private String groupName = "default"; private int producerMaxMessageSize = 1024; private int producerSendMsgTimeout = 2000; private int producerRetryTimesWhenSendFailed = 2; private int consumerThreadMin = 5; private int consumerThreadMax = 30; private int consumerMessageBatchMaxSize = 1; private int maxReconsumeTimes = 3; } 生产者: @Service @Slf4j public class RocketProducer { @Autowired private DefaultMQProducer defaultMQProducer; public void sendMessage(String msg, String topics, String tags, String key,int delayTimeLevel) { try { Message message = new Message(topics, tags, key, msg.getBytes(RemotingHelper.DEFAULT_CHARSET)); if (delayTimeLevel > 0){ message.setDelayTimeLevel(delayTimeLevel); } // 发送消息到一个 Broker SendResult sendResult = defaultMQProducer.send(message); // 通过 sendResult 返回消息是否成功送达 log.info("发送MQ消息:" + sendResult.toString()); } catch (Exception e) { log.error("发送MQ消息异常:" + e.getMessage(), e); } } public void sendMessage(String msg, String topics) { sendMessage(msg, topics, "*", "", 0); } public void sendMessage(String msg, String topics, String key) { sendMessage(msg, topics, "*", key, 0); } public void sendMessage(String msg, String topics, String key, int delayTimeLevel) { sendMessage(msg, topics, "*", key, delayTimeLevel); } }
RocketProducer 生产者生产消息
// Queue an automatic pre-payment only when the order's customer is listed in the
// "autoPayPreAmountBase" setting and the pre-payment has not already been paid.
DriverConfigEntity autoPayPreAmountBase = driverConfigDao.findOneByKey("autoPayPreAmountBase");
// A missing setting, an empty setting value or a missing customer name all mean "do not auto-pay"
// rather than a NullPointerException.
String autoPayBases = Objects.nonNull(autoPayPreAmountBase) ? autoPayPreAmountBase.getValue() : null;
// NOTE(review): contains() is a substring match, so a customer whose name is part of another
// configured name also matches - confirm this is acceptable.
boolean autoPayPreAmountFlag = Objects.nonNull(autoPayBases)
        && Objects.nonNull(orderEntity.getCustomerFullName())
        && autoPayBases.contains(orderEntity.getCustomerFullName())
        && DriverPayStatusEnum.PAY_SUCCESS.getCode() != orderEntity.getPrePayStatus();
if (autoPayPreAmountFlag) {
    // Delay level 0 means the message is sent without a delay.
    int delayTimeLevel = 0;
    DriverConfigEntity delayTimeLevelSetting = driverConfigDao.findOneByKey("delayTimeLevel");
    if (Objects.nonNull(delayTimeLevelSetting) && Objects.nonNull(delayTimeLevelSetting.getValue())) {
        // parseInt avoids the boxing of Integer.valueOf; a non-numeric value still fails with
        // NumberFormatException as before.
        delayTimeLevel = Integer.parseInt(delayTimeLevelSetting.getValue().trim());
    }
    log.info("===生产消息===autoPayOrderPreAmount===系统自动支付司机id为[{}]订单号为[{}]的预付款", driverId, orderNo);
    rocketProducer.sendMessage(orderNo, topics, "", delayTimeLevel);
}
消费者父接口:
/**
 * Parent interface of all consumers.
 */
public interface RocketConsumer {

    /**
     * Initializes the consumer.
     */
    void init();

    /**
     * Registers the listener that handles incoming messages.
     *
     * @param messageListener listener to register
     */
    void registerMessageListener(MessageListener messageListener);
}

加载消费者基本信息

/**
 * Base class that holds the basic information describing a consumer: its topics, tags, title,
 * group name, message listener and the push consumer assigned to it.
 */
@Data
public abstract class AbstractRocketConsumer implements RocketConsumer {

    protected String topics;

    protected String tags;

    protected MessageListener messageListener;

    protected String consumerTitle;

    protected MQPushConsumer mqPushConsumer;

    protected String groupName;

    /**
     * Stores the mandatory consumer information.
     *
     * @param topics        topics to subscribe to
     * @param tags          tag expression to subscribe with
     * @param consumerTitle title of this consumer
     * @param groupName     consumer group name
     */
    public void necessary(String topics, String tags, String consumerTitle,String groupName) {
        this.topics = topics;
        this.tags = tags;
        this.consumerTitle = consumerTitle;
        this.groupName = groupName;
    }

    @Override
    public abstract void init();

    /**
     * Stores the given listener in {@code messageListener}.
     *
     * @param messageListener listener to register
     */
    @Override
    public void registerMessageListener(MessageListener messageListener) {
        this.messageListener = messageListener;
    }
}
消费者消费消息
@Component @Slf4j @Getter public class AutoPayAmountConsumerMQ extends AbstractRocketConsumer { @Value("${rocketmq.autoPayAmount.topic}") public String topic; @Value("${rocketmq.autoPayAmount.groupName}") public String groupName; @Autowired private MainOrderDao mainOrderDao; @Autowired private RedisLocker redisLocker; @Autowired private OrderPayV2Service orderPayService; @Autowired private OperationLogService operationLogService; @Autowired private TransactionManager transactionManager; private static final String AUTO_PAY_ORDER_PRE_AMOUNT = "autoPayOrderPreAmount:"; @Override public void init() { // 设置主题,标签与消费者标题 super.necessary(topic, "*", "报警error日志消息", groupName); // 消费者具体执行逻辑 registerMessageListener((MessageListenerConcurrently) (msgs, context) -> { // 一次消费一条消息 MessageExt msg = msgs.get(0); String orderNo = new String(msg.getBody()); // 获取redis分布式锁 String lockKey = AUTO_PAY_ORDER_PRE_AMOUNT + orderNo; String lockValue = redisLocker.lock(lockKey); try { // 锁判断 if (StringUtils.isNotBlank(lockValue)) { MainOrderEntity mainOrderEntity = mainOrderDao.findByOrderNo(orderNo, Lists.newArrayList("id", "orderNo", "prePayStatus", "orderStatus")); // 订单未支付预付款并且未取消才进行支付预付款 boolean autoPayOrderPreAmountFlag = Objects.nonNull(mainOrderEntity) && mainOrderEntity.getOrderStatus() != OrderStatusEnum.CANCELED.getCode() && DriverPayStatusEnum.PAY_SUCCESS.getCode() != mainOrderEntity.getPrePayStatus(); if (autoPayOrderPreAmountFlag) { Resp resp = orderPayService.checkOrderPay(FeeItemTypeEnum.PRE_PAY, mainOrderEntity.getId()); if (resp.hasSuccess()) { Pair<MainOrderEntity, DriverInfoEntity> pair = (Pair<MainOrderEntity, DriverInfoEntity>) resp.getData(); transactionManager.doInTransaction(() -> { // 订单支付 // 订单支付日志 // 修改订单支付方式 // 设置订单支付方式 return null; }); log.info("===消费消息===autoPayOrderPreAmount===系统自动支付订单号为[{}]的预付款", orderNo); return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; } } } return ConsumeConcurrentlyStatus.RECONSUME_LATER; } catch (Exception e) { return 
ConsumeConcurrentlyStatus.RECONSUME_LATER; } finally { redisLocker.unlock(lockKey, lockValue); } }); } }
redis分布式锁
@Slf4j @Component public class RedisLocker { /** * 锁的key前缀,整个lock key是applicationName:lock:key */ public static final String LOCK_KEY_PREFIX = "lock" + RedisConfig.KEY_DELIMITER; /** * 默认请求锁成功后的有效期,60秒 */ public static final long DEFAULT_LOCK_EXPIRE = 60 * 1000; @Autowired private RedisTemplateWarpper redisTemplateWarpper; /** * 加锁,使用默认的锁有效时间 * * @param key - key名称 * @return return null is lock failed Otherwise return uuid value of lock-key */ public String lock(String key) { return this.lock(key, DEFAULT_LOCK_EXPIRE); } /** * 加锁 * * @param key - key名称 * @param expireMillisecond - 锁成功后的有效期,毫秒 * @return return null or empty string is lock failed Otherwise return uuid value of lock-key */ public String lock(String key, long expireMillisecond) { Preconditions.checkArgument(StringUtils.isNotBlank(key)); Preconditions.checkArgument(expireMillisecond > 0L); String lockKey = LOCK_KEY_PREFIX + key; String lockValue = UUID.randomUUID().toString(); boolean keySet = redisTemplateWarpper.vSetIfAbsent(lockKey, lockValue, expireMillisecond); if (keySet) { //锁成功 return lockValue; } return null; } /** * 解锁 * * @param key */ public void unlock(String key, String value) { if (StringUtils.isBlank(value)) { return; } String lockKey = LOCK_KEY_PREFIX + key; String lockValueRedis = redisTemplateWarpper.vGet(lockKey); if (StringUtils.equals(lockValueRedis, value)) { redisTemplateWarpper.kDelete(lockKey); } } }
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。