RocketMQ 生产消息时,将消息设置为延时消息,并通过延时等级(delayTimeLevel)指定延迟时间。
分布式环境下可能有多台服务器同时生产消息,因此消费消息时,需要通过 Redis 分布式锁来保证同一时刻只有一台服务器在执行消费消息的操作,并通过业务查询判断订单是否已经支付成功,仅在未消费成功的情况下才进行消费,从而保证消息重复投递时的消费幂等性。


  # 是否开启自动配置
  isEnable: true
  # nameserver 地址
  # 设置一次消费消息的条数,默认为 1 条
  consumerMessageBatchMaxSize: 1
  # 消费者线程数量
  consumerThreadMax: 32
  consumerThreadMin: 5
  # 最大消费重试次数
  maxReconsumeTimes: 3
  # 发送同一类消息设置为同一个 group,保证唯一默认不需要设置,rocketmq 会使用 ip@pid(pid代表 jvm 名字)作为唯一标识
  groupName: dipaoGroup
  # 消息最大长度(字节),RocketMQ 默认 1024*1024*4(4M);此处设置为 4096(4K)
  producerMaxMessageSize: 4096
  # 发送消息失败重试次数,默认 2
  producerRetryTimesWhenSendFailed: 2
  # 发送消息超时时间,默认 3000
  producerSendMsgTimeOut: 3000
    topic: autoPay
    # 最大消费重试次数
    maxReconsumeTimes: 3
    groupName: autoPayGroup
RocketMQConfiguation :

@ConditionalOnProperty(prefix = "rocketmq", value = "isEnable", havingValue = "true")
public class RocketMQConfiguation implements InitializingBean {

    private RocketMQProperties properties;

    private ApplicationContext applicationContext;

     * 注入一个默认的生产者
     * @return
     * @throws MQClientException
    public DefaultMQProducer getRocketMQProducer() throws MQClientException {
        if (StringUtils.isEmpty(properties.getGroupName())) {
            throw new MQClientException(-1, "groupName is blank");

        if (StringUtils.isEmpty(properties.getNamesrvAddr())) {
            throw new MQClientException(-1, "nameServerAddr is blank");
        DefaultMQProducer producer;
        producer = new DefaultMQProducer(properties.getGroupName());

        // producer.setCreateTopicKey("AUTO_CREATE_TOPIC_KEY");

        // 如果需要同一个 jvm 中不同的 producer 往不同的 mq 集群发送消息,需要设置不同的 instanceName
        // producer.setInstanceName(instanceName);
        // 如果发送消息失败,设置重试次数,默认为2次

        try {
            log.info("producer is start,groupName:{},namesrvAddr:{}", properties.getGroupName(), properties.getNamesrvAddr());
        } catch (MQClientException e) {
            log.error(String.format("producer is error {}", e.getMessage(), e));
            throw e;
        return producer;

     * SpringBoot 启动时加载所有消费者
    public void afterPropertiesSet() {
        Map<String, AbstractRocketConsumer> consumers = applicationContext.getBeansOfType(AbstractRocketConsumer.class);
        if (consumers == null || consumers.size() == 0) {
            log.info("init rocket consumer 0");
        Iterator<String> beans = consumers.keySet().iterator();
        while (beans.hasNext()) {
            String beanName = (String) beans.next();
            AbstractRocketConsumer consumer = consumers.get(beanName);
            log.info("init success consumer title {} , topics {} , tags {}", consumer.consumerTitle,
                    consumer.topics, consumer.tags);

     * 通过消费者信息创建消费者
    public void createConsumer(AbstractRocketConsumer arc) {
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(arc.groupName);
         * 设置 Consumer 第一次启动是从队列头部开始消费还是队列尾部开始消费 如果非第一次启动,那么按照上次消费的位置继续消费
        // consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
         * 设置消费模型,集群还是广播,默认为集群
        // consumer.setMessageModel(MessageModel.CLUSTERING);

         * 设置一次消费消息的条数,默认为 1 条
        try {
            consumer.subscribe(arc.topics, arc.tags);
            arc.mqPushConsumer = consumer;
        } catch (MQClientException e) {
            log.error("info consumer title {}", arc.getConsumerTitle(), e);

@ConfigurationProperties(prefix = "rocketmq")
public class RocketMQProperties {
    private boolean isEnable = false;
    private String namesrvAddr = "localhost:9876";
    private String groupName = "default";
    private int producerMaxMessageSize = 1024;
    private int producerSendMsgTimeout = 2000;
    private int producerRetryTimesWhenSendFailed = 2;
    private int consumerThreadMin = 5;
    private int consumerThreadMax = 30;
    private int consumerMessageBatchMaxSize = 1;
    private int maxReconsumeTimes = 3;

public class RocketProducer {

    private DefaultMQProducer defaultMQProducer;

    public void sendMessage(String msg, String topics, String tags, String key,int delayTimeLevel) {
        try {
            Message message = new Message(topics, tags, key, msg.getBytes(RemotingHelper.DEFAULT_CHARSET));
            if (delayTimeLevel > 0){
            // 发送消息到一个 Broker
            SendResult sendResult = defaultMQProducer.send(message);
            // 通过 sendResult 返回消息是否成功送达
            log.info("发送MQ消息:" + sendResult.toString());
        } catch (Exception e) {
            log.error("发送MQ消息异常:" + e.getMessage(), e);

    public void sendMessage(String msg, String topics) {
        sendMessage(msg, topics, "*", "", 0);

    public void sendMessage(String msg, String topics, String key) {
        sendMessage(msg, topics, "*", key, 0);

    public void sendMessage(String msg, String topics, String key, int delayTimeLevel) {
        sendMessage(msg, topics, "*", key, delayTimeLevel);
RocketProducer 生产者生产消息

// Queue an automatic payment only when the order's customer name appears in the
// "autoPayPreAmountBase" setting and the prepayment has not already been paid.
// NOTE(review): the two status codes are compared with != — if both sides are boxed
// types this is a reference comparison; confirm the getter return types.
DriverConfigEntity autoPayPreAmountBase = driverConfigDao.findOneByKey("autoPayPreAmountBase");
boolean autoPayPreAmountFlag = Objects.nonNull(autoPayPreAmountBase)
        && autoPayPreAmountBase.getValue().contains(orderEntity.getCustomerFullName())
        && DriverPayStatusEnum.PAY_SUCCESS.getCode() != orderEntity.getPrePayStatus();
if (autoPayPreAmountFlag) {
    // Delay level of the message; 0 (no delay) unless the "delayTimeLevel" setting exists.
    int delayTimeLevel = 0;
    DriverConfigEntity delayTimeLevelSetting = driverConfigDao.findOneByKey("delayTimeLevel");
    if (Objects.nonNull(delayTimeLevelSetting)) {
        // parseInt yields the primitive directly instead of boxing and unboxing.
        delayTimeLevel = Integer.parseInt(delayTimeLevelSetting.getValue());
    }
    log.info("===生产消息===autoPayOrderPreAmount===系统自动支付司机id为[{}]订单号为[{}]的预付款", driverId, orderNo);
    rocketProducer.sendMessage(orderNo, topics, "", delayTimeLevel);
}
public interface RocketConsumer {
     * 初始化消费者
    void init();
     * 注册监听
     * @param messageListener
    void registerMessageListener(MessageListener messageListener);

public abstract class AbstractRocketConsumer implements RocketConsumer {

    protected String topics;
    protected String tags;
    protected MessageListener messageListener;
    protected String consumerTitle;
    protected MQPushConsumer mqPushConsumer;
    protected String groupName;

     * 必要的信息
     * @param topics
     * @param tags
     * @param consumerTitle
    public void necessary(String topics, String tags, String consumerTitle,String groupName) {
        this.topics = topics;
        this.tags = tags;
        this.consumerTitle = consumerTitle;
        this.groupName = groupName;

    public abstract void init();

    public void registerMessageListener(MessageListener messageListener) {
        this.messageListener = messageListener;

public class AutoPayAmountConsumerMQ extends AbstractRocketConsumer {

    public String topic;
    public String groupName;
    private MainOrderDao mainOrderDao;
    private RedisLocker redisLocker;
    private OrderPayV2Service orderPayService;
    private OperationLogService operationLogService;
    private TransactionManager transactionManager;
    private static final String AUTO_PAY_ORDER_PRE_AMOUNT = "autoPayOrderPreAmount:";

    public void init() {
        // 设置主题,标签与消费者标题
        super.necessary(topic, "*", "报警error日志消息", groupName);
        // 消费者具体执行逻辑
        registerMessageListener((MessageListenerConcurrently) (msgs, context) -> {
            // 一次消费一条消息
            MessageExt msg = msgs.get(0);
            String orderNo = new String(msg.getBody());
            // 获取redis分布式锁
            String lockKey = AUTO_PAY_ORDER_PRE_AMOUNT + orderNo;
            String lockValue = redisLocker.lock(lockKey);
            try {
                // 锁判断
                if (StringUtils.isNotBlank(lockValue)) {
                    MainOrderEntity mainOrderEntity = mainOrderDao.findByOrderNo(orderNo, Lists.newArrayList("id", "orderNo", "prePayStatus", "orderStatus"));
                    // 订单未支付预付款并且未取消才进行支付预付款
                    boolean autoPayOrderPreAmountFlag = Objects.nonNull(mainOrderEntity) && mainOrderEntity.getOrderStatus() != OrderStatusEnum.CANCELED.getCode()
                            && DriverPayStatusEnum.PAY_SUCCESS.getCode() != mainOrderEntity.getPrePayStatus();
                    if (autoPayOrderPreAmountFlag) {
                        Resp resp = orderPayService.checkOrderPay(FeeItemTypeEnum.PRE_PAY, mainOrderEntity.getId());
                        if (resp.hasSuccess()) {
                            Pair<MainOrderEntity, DriverInfoEntity> pair = (Pair<MainOrderEntity, DriverInfoEntity>) resp.getData();
                            transactionManager.doInTransaction(() -> {
                                // 订单支付
                                // 订单支付日志
                                // 修改订单支付方式
                                // 设置订单支付方式
                                return null;
                            log.info("===消费消息===autoPayOrderPreAmount===系统自动支付订单号为[{}]的预付款", orderNo);
                            return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
                return ConsumeConcurrentlyStatus.RECONSUME_LATER;
            } catch (Exception e) {
                return ConsumeConcurrentlyStatus.RECONSUME_LATER;
            } finally {
                redisLocker.unlock(lockKey, lockValue);
public class RedisLocker {
     * 锁的key前缀,整个lock key是applicationName:lock:key
    public static final String LOCK_KEY_PREFIX = "lock" + RedisConfig.KEY_DELIMITER;
     * 默认请求锁成功后的有效期,60秒
    public static final long DEFAULT_LOCK_EXPIRE = 60 * 1000;
    private RedisTemplateWarpper redisTemplateWarpper;

     * 加锁,使用默认的锁有效时间
     * @param key - key名称
     * @return return null is lock failed Otherwise return uuid value of lock-key
    public String lock(String key) {
        return this.lock(key, DEFAULT_LOCK_EXPIRE);

     * 加锁
     * @param key               - key名称
     * @param expireMillisecond - 锁成功后的有效期,毫秒
     * @return return null or empty string is lock failed Otherwise return uuid value of lock-key
    public String lock(String key, long expireMillisecond) {
        Preconditions.checkArgument(expireMillisecond > 0L);

        String lockKey = LOCK_KEY_PREFIX + key;
        String lockValue = UUID.randomUUID().toString();
        boolean keySet = redisTemplateWarpper.vSetIfAbsent(lockKey, lockValue, expireMillisecond);
        if (keySet) {   //锁成功
            return lockValue;
        return null;

     * 解锁
     * @param key
    public void unlock(String key, String value) {
        if (StringUtils.isBlank(value)) {

        String lockKey = LOCK_KEY_PREFIX + key;
        String lockValueRedis = redisTemplateWarpper.vGet(lockKey);
        if (StringUtils.equals(lockValueRedis, value)) {
