当前位置:   article > 正文

SpringBoot整合RocketMQ、发布订阅、顺序消息、事物消息、消息重试和死信队列_springboot整合rocketmq实现死信队列

springboot整合rocketmq实现死信队列

SpringBoot整合RocketMQ、发布订阅、顺序消息、事物消息、消息重试和死信队列

一、RocketMQ

RocketMQ是阿里巴巴旗下一款开源的MQ框架,经历过双十一考验、Java编程语言实现,有非常好完整生态系统

Rocketmq相比于Rabbitmq、kafka具有主要优势特性有:

  1. 支持事务消息(能够解决分布式事务的问题)
  2. 支持顺序消息(底层已经使用内存队列实现)
  3. 支持consumer端tag过滤,减少不必要的网络传输

二、Kafka与RocketMQ对比

  1. kafka中Broker是物理概念
  2. RocketMQ是逻辑概念 相等于一个Broker多个不同的 Master组合
  3. Kafka是采用zk实现对生产者、消费者 topic信息存储
  4. RocketMQ是自己的nameServer实现去中心化注册

三、RocketMQ部署

RocketMQ集群搭建,参考下面我的一篇博客:

RocketMQ双主双备集群搭建: https://blog.csdn.net/qq_43692950/article/details/111823611

四、SpringBoot集成实现发布订阅

  1. pom
 <dependency>
   	<groupId>org.apache.rocketmq</groupId>
     <artifactId>rocketmq-spring-boot-starter</artifactId>
     <version>2.0.3</version>
 </dependency>
  • 1
  • 2
  • 3
  • 4
  • 5
  1. application.yml
server:
  port: 80

rocketmq:
#  name-server: 192.168.40.128:9876;192.168.40.130:9876;  #集群方式
  name-server: 192.168.40.128:9876;   #单机方式
  producer:
    group: bxc_producer
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  1. 定义消息实体类
@Data
public class JmsMsg {
    private int code;
    private String msg;

    public JmsMsg() { }

    public JmsMsg(int code, String msg) {
        this.code = code;
        this.msg = msg;
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  1. 定义消费者
@Slf4j
@Component
@RocketMQMessageListener(topic = "simpletopic", consumerGroup = "simpletopic")
public class JmsConsumer implements RocketMQListener<JmsMsg> {
    @SneakyThrows
    @Override
    public void onMessage(JmsMsg jmsMsg) {
            log.info("消费者-->"+jmsMsg.toString());
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10

consumerGroup 为消费者组,同一组的消费者会均摊消息,每个组都会受到消息。

  1. 发送消息
@Component
public class JmsProducer {
    @Autowired
    private RocketMQTemplate rocketMQTemplate;

    public boolean send(JmsMsg msg){
        try{
            rocketMQTemplate.convertAndSend("simpletopic", msg);
            return true;
        }catch (Exception e){
            e.printStackTrace();
        }
       return false;
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  1. 同步方式发送消息
JmsMsg jmsMsg = new JmsMsg(200,"test");
Message message = new Message("simpletopic","tag", JSONObject.toJSONString(msg).getBytes());
SendResult send = rocketMQTemplate.getProducer().send(message);
if (send.getSendStatus() == SendStatus.SEND_OK){
    System.out.println("发送成功!");
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  1. 异步发送消息
rocketMQTemplate.getProducer().send(message, new SendCallback() {
     @Override
     public void onSuccess(SendResult sendResult) {
         if (sendResult.getSendStatus() == SendStatus.SEND_OK){
             System.out.println("发送成功!");
         }
     }
     @Override
     public void onException(Throwable throwable) {
         System.out.println("发送失败!+"+throwable.toString());
     }
 });
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  1. 只发送消息,不等待服务器响应
    此方式发送消息的过程耗时非常短,一般在微秒级别。
rocketMQTemplate.sendOneWay("simpletopic",jmsMsg);
//或者
rocketMQTemplate.getProducer().sendOneway(message);
  • 1
  • 2
  • 3
  1. 延时消息
Message message = new Message("simpletopic","tag",JSONObject.toJSONString(jmsMsg).getBytes());
//messageDelayLevel=1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h
//4 指的是30s
message.setDelayTimeLevel(2);
//延时发送
rocketMQTemplate.getProducer().send(message);
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6

五、顺序消息

RocketMQ 默认消息同时的,但是在有些场景需要队列中的消息有顺序性,比如:发送了一条添加数据的消息,然后接着发送了一条更改数据的消息,此时肯定需要添加数据的消息先被消费,在rocketmq中可以指定消费者的consumeMode,其有两个属性CONCURRENTLY、ORDERLY,ORDERLY便是让一个队列中的数据有顺序的消费,先进先出。

RocketMQ 一个Topic其实是分成了4个队列,所以在消息投递时,需要顺序的消息,要放在同一个队列中。

  1. 消费者
    指定 consumeMode = ConsumeMode.ORDERLY
@Slf4j
@Component
@RocketMQMessageListener(topic = "bxctopic", consumerGroup = "bxctopic1",consumeMode = ConsumeMode.ORDERLY)
public class JmsOrderConsumer implements RocketMQListener<MessageExt> {
    @Override
    public void onMessage(MessageExt msg) {
        try {
            int time = (int)(Math.random()*5+1)*100;
            Thread.sleep(time);
            log.info("消费者2-->"+ Thread.currentThread().getName() + "," +
                    "队列" + msg.getQueueId() + msg.getQueueId()+",内容"+ new String(msg.getBody())+"   "+time);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  1. 投递消息,需要顺序的放在一个队列中
@RestController
public class OrderMsg {

    @Autowired
    private RocketMQTemplate rocketMQTemplate;

    /*
    根据code选择队列存放
     */
    class TopicMessageQueueSelector implements MessageQueueSelector{
        @Override
        public MessageQueue select(List<MessageQueue> mqs, Message message, Object o) {
            int orderId = (int) o;
            long index = orderId % mqs.size();
            return mqs.get((int) index);
        }
    }

    @GetMapping("/OrderQueue")
    public String sendMsg() throws InterruptedException, RemotingException, MQClientException, MQBrokerException {
        JmsMsg msg1 = new JmsMsg(200, "AAAAAAAA");
        Message message1 = new Message("bxctopic", JSONObject.toJSONString(msg1).getBytes());
        rocketMQTemplate.getProducer().send(message1,new TopicMessageQueueSelector(),msg1.getCode());

        JmsMsg msg2 = new JmsMsg(200, "BBBBB");
        Message message2 = new Message("bxctopic", JSONObject.toJSONString(msg2).getBytes());
        rocketMQTemplate.getProducer().send(message2,new TopicMessageQueueSelector(),msg2.getCode());

        JmsMsg msg3 = new JmsMsg(200, "CCCCC");
        Message message3 = new Message("bxctopic", JSONObject.toJSONString(msg3).getBytes());
        rocketMQTemplate.getProducer().send(message3,new TopicMessageQueueSelector(),msg3.getCode());


        JmsMsg msg4 = new JmsMsg(201, "AAAAAAAA");
        Message message4 = new Message("bxctopic", JSONObject.toJSONString(msg4).getBytes());
        rocketMQTemplate.getProducer().send(message4,new TopicMessageQueueSelector(),msg4.getCode());

        JmsMsg msg5 = new JmsMsg(201, "BBBBB");
        Message message5 = new Message("bxctopic", JSONObject.toJSONString(msg5).getBytes());
        rocketMQTemplate.getProducer().send(message5,new TopicMessageQueueSelector(),msg5.getCode());

        JmsMsg msg6 = new JmsMsg(201, "CCCCC");
        Message message6 = new Message("bxctopic", JSONObject.toJSONString(msg6).getBytes());
        rocketMQTemplate.getProducer().send(message6,new TopicMessageQueueSelector(),msg6.getCode());


        return "success";
    }
}

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  1. 访问接口效果
    在这里插入图片描述

如果消费者的模式为:consumeMode = ConsumeMode.CONCURRENTLY
效果如下,缺失了顺序性。
在这里插入图片描述

五、事物消息

RocketMQ的事物消息,在一定情况下可以解决分布式事物问题,在发起方发出消息后,由于突发状况发生错误引起数据回滚,但消费方不知道发起方状况无法回滚,导致数据不一致情况,正好RocketMQ的事物消息可以解决这一问题,在发起方发送消息时,消息不能立即被消费者消费,等待本地事物提交后,再交给消费者。

  1. pom 连接数据库的依赖
<dependency>
   <groupId>com.baomidou</groupId>
   <artifactId>mybatis-plus-boot-starter</artifactId>
   <version>3.3.2</version>
</dependency>

<dependency>
   <groupId>mysql</groupId>
   <artifactId>mysql-connector-java</artifactId>
</dependency>

<dependency>
   <groupId>com.alibaba</groupId>
   <artifactId>druid</artifactId>
   <version>1.1.6</version>
</dependency>
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  1. application.yml 添加连接数据库信息
spring:
  datasource:
    url: jdbc:mysql://192.168.40.130:3306/db1?useUnicode=true&characterEncoding=utf8
    username: root
    password: root
    type: com.alibaba.druid.pool.DruidDataSource
mybatis-plus:
#  mapper-locations: classpath:mapper/xml/*.xml
#  type-aliases-package: com.bxc.eurekaprovide.Entity
  configuration:
    log-impl: org.apache.ibatis.logging.stdout.StdOutImpl
    map-underscore-to-camel-case: true
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  1. user实体类
@Data
@TableName("user")
public class UserEntity {
    @TableId(value = "id",type = IdType.AUTO)
    private Integer id;
    private String name;
    private int age;

    public UserEntity(String name, int age) {
        this.name = name;
        this.age = age;
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  1. Mapper
@Mapper
@Component
public interface UserMapper extends BaseMapper<UserEntity> {
}
  • 1
  • 2
  • 3
  • 4
  1. 事物消息消费者
@Component
@RocketMQMessageListener(topic = "TBxcTopic", consumerGroup = "TBxcTopic")
public class TxConsumer implements RocketMQListener<String> {

    @Override
    public void onMessage(String msg) {
        UserEntity userEntity = JSONObject.parseObject(msg, UserEntity.class);
        System.out.println("事物队列-->  消息-->"+userEntity.toString());
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  1. 手动事物封装
@Service
public class TransationalUtils {
    @Autowired
    public DataSourceTransactionManager transactionManager;

    public TransactionStatus begin() {
        TransactionStatus transaction = transactionManager.getTransaction(new DefaultTransactionAttribute());
        return transaction;
    }

    public void commit(TransactionStatus transaction) {
        transactionManager.commit(transaction);

    }

    public void rollback(TransactionStatus transaction) {
        transactionManager.rollback(transaction);
    }

}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  1. 事物消息,监听
@Slf4j
@Component
@RocketMQTransactionListener(txProducerGroup = "bxcProducer")
public class SyncProducerListener implements RocketMQLocalTransactionListener {

    @Autowired
    private TransationalUtils transationalUtils;

    @Autowired
    private UserMapper userMapper;

    @Override
    public RocketMQLocalTransactionState executeLocalTransaction(Message message, Object o) {
        TransactionStatus beginStatus = null;
        try {
            beginStatus = transationalUtils.begin();
            MessageHeaders headers = message.getHeaders();
            String msg = (String) headers.get("msg");
            if (StringUtils.isEmpty(msg)) {
                return RocketMQLocalTransactionState.ROLLBACK;
            }
            UserEntity user = JSONObject.parseObject(msg, UserEntity.class);
            int result = userMapper.insert(user);
            if (result > 0) {
                transationalUtils.commit(beginStatus);
                return RocketMQLocalTransactionState.COMMIT;
            }
        } catch (Exception e) {
            e.printStackTrace();
            if (beginStatus != null) {
                transationalUtils.rollback(beginStatus);
            }
        }
        return RocketMQLocalTransactionState.ROLLBACK;
    }

    @Override
    public RocketMQLocalTransactionState checkLocalTransaction(Message message) {
        log.info("检查触发");
        MessageHeaders headers = message.getHeaders();
        String msg = (String) headers.get("msg");
        if (StringUtils.isEmpty(msg)) {
            return RocketMQLocalTransactionState.ROLLBACK;
        }
        UserEntity userEntity = JSONObject.parseObject(msg, UserEntity.class);
        Integer id = userEntity.getId();
        UserEntity user = userMapper.selectById(id);
        if (user != null && user.getId() != null && user.getId() > 0) {
            return RocketMQLocalTransactionState.COMMIT;
        }
        return RocketMQLocalTransactionState.UNKNOWN;
    }
}

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54

其中executeLocalTransaction方法,会在消息发出时触发,如果返回RocketMQLocalTransactionState.COMMIT 则该消息会被消费者消费,如果返回RocketMQLocalTransactionState.ROLLBACK 则消息不会被消费。

checkLocalTransaction方法,是executeLocalTransaction方法返回的既不提交也不回滚时,或者在executeLocalTransaction方法执行中服务突然停掉了RocketMQ没收到返回,这种情况下RocketMq会定期触发checkLocalTransaction方法询问数据是否已保存,如果返回RocketMQLocalTransactionState.UNKNOWN则表明不清楚,会继续询问。

  1. controller
@RestController
public class TController {
    @Autowired
    RocketMQTemplate template;

    @GetMapping("TGetTest")
    public String TGetTest() throws MQClientException {
        UserEntity userEntity = new UserEntity("testname",18);
        String user = JSONObject.toJSONString(userEntity);

        Message message = MessageBuilder.withPayload(user)
                .setHeader("msg",user)
                .build();

        template.sendMessageInTransaction("bxcProducer","TBxcTopic",message,null);
        return "success";
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18

五、消息重试和死信队列

在消费者,消费过程中,如果消息处理不当,需求再次消费该消息时,RocketMQ的消息重试机制便可解决,当消费者消费时,如果抛出了异常,则会重新再次投递给该消费者。

@Slf4j
@Component
@RocketMQMessageListener(topic = "simpletopic", consumerGroup = "simpletopic")
public class JmsConsumer implements RocketMQListener<JmsMsg> {
    @SneakyThrows
    @Override
    public void onMessage(JmsMsg jmsMsg) {
        log.info("消费者1-->" + jmsMsg.toString());
        #抛出异常、重新获取消息
        throw new Exception("abc");
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12

但消息不是一直就可以重新投递的,当重试次数达到默认的16次后(可以通过配置文件修改)如果对应的消息还没被成功消费的话,该消息就会投递到DLQ死信队列。
可以在控制台Topic列表中看到“DLQ”相关的Topic,默认命名是:%RETRY%消费组名称(重试Topic)%DLQ%消费组名称(死信Topic)。
死信队列也可以被订阅和消费,并且也会过期。有效期与正常消息相同,均为 3 天,3 天后会被自动删除。因此,请在死信消息产生后的 3天内及时处理。
在这里插入图片描述

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/凡人多烦事01/article/detail/485468
推荐阅读
相关标签
  

闽ICP备14008679号