当前位置:   article > 正文

【无标题】_springboot 默认的消费者无法设置setmaxreconsumetimes

springboot 默认的消费者无法设置setmaxreconsumetimes

1. java客户端

在写代码之前,先了解几个概念:

  • Topic:主题,是一类消息的集合,每条消息只能属于一个主题

  • 生产者向Topic发送消息,消费者订阅指定的Topic,MQ会主动把消息推送给消费者

  • Tag:标签,为消息设置的标签,用于区分同一 Topic 下不同类型的消息

  • 简单来说就是给 Topic 下的消息再次分组

  • GroupName:用于给生产者和消费者分组

1. 快速入门

建立一个普通的java项目

修改 pom.xml,增加

  1. <!-- 客户端依赖,需要跟 rocketmq 版本一致 -->
  2. <dependency>
  3. <groupId>org.apache.rocketmq</groupId>
  4. <artifactId>rocketmq-client</artifactId>
  5. <version>4.9.3</version>
  6. </dependency>

新建 Producer.java

  1. public static void main(String[] args) throws Exception {
  2. DefaultMQProducer producer = new DefaultMQProducer("wangzhe");
  3. // 指定 nameserver 地址,把自己注册到nameserver上,同时获取 broker 信息
  4. producer.setNamesrvAddr("192.168.56.106:9876");
  5. //启动生产者
  6. producer.start();
  7. for (int i = 0; i < 10; i++) {
  8. Message msg = new Message("wangzhe" ,// topic
  9. "yase" /* Tag */,
  10. ("亚瑟的大宝剑").getBytes("utf-8"));//把字符串转换为字节数组
  11. //发送消息
  12. SendResult sendResult = producer.send(msg);
  13. System.out.println(sendResult);
  14. }
  15. //关闭生产者,关闭后,在 rocketmq 的界面中就找不到了
  16. producer.shutdown();
  17. }

注意:发消息之前,设置虚拟机的系统时间,跟宿主机保持一致

结果:

界面上也能找到自己的 Topic 和 消息

然后新建一个Consumer类

  1. public static void main(String[] args) throws Exception {
  2. // 建立消费者,自定义一个分组名
  3. DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("wangzhe");
  4. // 指定 nameserver 地址,把自己注册到nameserver上,同时获取 broker 信息
  5. consumer.setNamesrvAddr("192.168.56.106:9876");
  6. // 订阅Topic,第二个参数为*,表示这个Topic的消息都要
  7. consumer.subscribe("wangzhe", "*");
  8. // 注册监听器,收到消息时执行
  9. consumer.registerMessageListener(new MessageListenerConcurrently() {
  10. /**
  11. * 消费消息
  12. * @param msgs 这时一个列表,可能有多个消息
  13. * @param context
  14. * @return
  15. */
  16. @Override
  17. public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
  18. ConsumeConcurrentlyContext context) {
  19. //取出消息体,这是我们最关注的东西
  20. System.out.println(new String(msgs.get(0).getBody()));
  21. return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;//表示消费成功
  22. }
  23. });
  24. System.out.println("消费者已经就绪。。。。。。");
  25. // 启动消费者
  26. consumer.start();
  27. }

Consumer 启动后会一直等待 Broker 推送消息

这时候启动 Producer 发送消息,消费者就能收到

ConsumeConcurrentlyStatus.CONSUME_SUCCESS:表示消费成功

1. 失败重试

返回 ConsumeConcurrentlyStatus.RECONSUME_LATER ,之后消息会重发

修改 Consumer,增加 consumerLater 方法:

  1. private static void consumerLater() throws Exception {
  2. DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("wangzhe");
  3. consumer.setNamesrvAddr("192.168.56.106:9876");
  4. consumer.subscribe("wangzhe", "*");
  5. consumer.registerMessageListener(new MessageListenerConcurrently() {
  6. @Override
  7. public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
  8. ConsumeConcurrentlyContext context) {
  9. System.out.println(LocalDateTime.now() +"-------"+ new String(msgs.get(0).getBody()));
  10. return ConsumeConcurrentlyStatus.RECONSUME_LATER;//表示消费失败,稍后消息会重发,默认重发16次
  11. }
  12. });
  13. System.out.println("消费者已经就绪。。。。。。");
  14. consumer.start();
  15. }

上图中:把之前的代码封装到 consumerSuccess 方法中,新增了 consumerLater 方法

先启动Consumer,再启动Producer发送消息

结果:消息会重发

可以通过 setMaxReconsumeTimes 方法设置重发次数,为了避免干扰,让上一次的消息成功消费掉

修改 Consumer,增加 consumerTimes 方法:

  1. private static void consumerTimes() throws Exception {
  2. DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("wangzhe");
  3. consumer.setNamesrvAddr("192.168.56.106:9876");
  4. consumer.subscribe("wangzhe", "*");
  5. //设置最多重发 1 次
  6. consumer.setMaxReconsumeTimes(1);
  7. consumer.registerMessageListener(new MessageListenerConcurrently() {
  8. @Override
  9. public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
  10. ConsumeConcurrentlyContext context) {
  11. System.out.println(LocalDateTime.now() +"-------"+ new String(msgs.get(0).getBody()));
  12. return ConsumeConcurrentlyStatus.RECONSUME_LATER;//表示消费失败,稍后消息会重发,默认重发16次
  13. }
  14. });
  15. System.out.println("消费者已经就绪。。。。。。");
  16. consumer.start();
  17. }

上图中:一共消费两次,只重发了一次

2. 发送消息的3种方式

1. 同步发送

Producer 发送消息后会等待 Broker 的响应,然后再往下执行,上面的代码就是这种方式

修改 Producer,增加 syncSend 方法:

  1. private static void syncSend() throws Exception{
  2. DefaultMQProducer producer = new DefaultMQProducer("wangzhe");
  3. producer.setNamesrvAddr("192.168.56.106:9876");
  4. producer.start();
  5. for (int i = 0; i < 5; i++) {
  6. String message = "亚瑟的大宝剑" + i;//加上序号
  7. Message msg = new Message("wangzhe" ,
  8. "yase",
  9. message.getBytes("utf-8"));//把字符串转换为字节数组
  10. SendResult send = producer.send(msg);
  11. System.out.println("发送成功:" + message);
  12. }
  13. producer.shutdown();
  14. }

结果,有序输出:

这时,如果发送失败,默认进行两次重试,可以通过 setRetryTimesWhenSendFailed() 指定重试次数

2. 异步发送

发送消息后,不等待 Broker的响应,而是提供一个回调方法,比如:

  1. public static void main(String[] args) throws Exception {
  2. //1. 同步发送
  3. //syncSend();
  4. //2. 异步发送
  5. asyncSend();
  6. }
  7. private static void asyncSend() throws Exception {
  8. DefaultMQProducer producer = new DefaultMQProducer("wangzhe");
  9. producer.setNamesrvAddr("192.168.56.106:9876");
  10. producer.start();
  11. for(int i = 1; i<=5; i++){//数字搞的大一点,不然效果不明显
  12. String message = "亚瑟的大宝剑" + i;
  13. Message msg = new Message("wangzhe" ,
  14. "yase" ,
  15. message.getBytes("utf-8"));
  16. //异步发送消息,提供一个回调方法
  17. producer.send(msg, new SendCallback() {
  18. @Override
  19. public void onSuccess(SendResult sendResult) {
  20. System.out.println("发送成功:" + message);
  21. }
  22. @Override
  23. public void onException(Throwable e) {
  24. System.out.println("发送失败:" + e.getMessage());
  25. }
  26. });
  27. }
  28. Thread.sleep(10000);//休眠10秒再关闭客户端
  29. producer.shutdown();
  30. }

结果,乱序输出:

这时,如果发送失败,默认进行2次重试,可以通过 setRetryTimesWhenSendAsyncFailed() 指定重试次数。

3. ONEWAY

这种方式不会等 Broker 的响应,也不管成败,可以应用于一些消息不是那么重要,可丢失的场景

通过 sendOneway() 方法实现,该方法没有返回值

  1. private static void oneWay() throws Exception {
  2. DefaultMQProducer producer = new DefaultMQProducer("wangzhe");
  3. producer.setNamesrvAddr("192.168.56.106:9876");
  4. producer.start();
  5. for(int i = 1; i<=5; i++){
  6. String message = "亚瑟的大宝剑" + i;
  7. Message msg = new Message("wangzhe" ,
  8. "yase" ,
  9. message.getBytes("utf-8"));
  10. //发送消息 oneway,该方法没有返回值
  11. producer.sendOneway(msg);
  12. System.out.println("发送成功:" + message);
  13. }
  14. producer.shutdown();
  15. }

2. 消息的有序性

消费 message 时,按照发送的顺序来消费,先进先出

使用同步发送保证了发送时消息的顺序

1. 现象

Consumer 使用 consumerSuccess 方法,Producer 使用同步发送

先启动Producer,在启动Consumer,结果:

原因:消息是保存在多个Queue中的

2. Queue

队列是存储消息的物理实体,可以认为是数据库的表。一个Topic中有多个Queue,默认是4个,我们发的消息就是存储在这些个Queue中

在 RocketMQ Dashboard 中可以看到队列的数量

3. 为什么乱序

  • 默认情况下,消息发送会采取 _轮询 _的方式把消息发送到不同的queue

  • 消费时,从多个queue上获取消息,导致乱序

4. 为什么需要有序

假设我们的 Producer 为订单先后生成了 4 条消息:

  • 订单T0000001:未支付

  • 订单T0000001:已支付

  • 订单T0000001:发货中

  • 订单T0000001:发货失败

轮询 的方式放到了 4 队列中,每个队列一条消息

这时候消费者 收到的消息可能有很多种,但是只有一种是正确的

5. 顺序消费

解决:**发送数据,通过设置,把这些消息按照顺序放到一个Queue中,然后消费者再去消费

**有 2 种方式:

  • 设置Topic下只有一个队列,也叫全局有序(只有一个队列)

  • 设置相关的消息都发送到相同的一个队列,也叫分区有序

1. 全局有序

需要在 RocketMQ Dashborad 中先把对应主题删掉

在 Producer 中,增加:

  1. private static void allOrder() throws Exception{
  2. DefaultMQProducer producer = new DefaultMQProducer("wangzhe");
  3. producer.setNamesrvAddr("192.168.56.106:9876");
  4. //只设置一个队列
  5. producer.setDefaultTopicQueueNums(1);
  6. producer.start();
  7. for (int i = 0; i < 5; i++) {
  8. String message = "亚瑟的大宝剑" + i;
  9. Message msg = new Message("wangzhe" ,
  10. "yase" ,
  11. message.getBytes("utf-8"));
  12. SendResult send = producer.send(msg);
  13. System.out.println("发送成功:" + message);
  14. }
  15. producer.shutdown();
  16. }

启动 Producer 后, RocketMQ Dashborad 查看

Consumer 增加 consumerOrder 方法,使用 MessageListenerOrderly 监听器

  1. private static void consumerOrder() throws Exception {
  2. DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("wangzhe");
  3. consumer.setNamesrvAddr("192.168.56.106:9876");
  4. consumer.subscribe("wangzhe", "*");
  5. //使用 MessageListenerOrderly 监听器
  6. consumer.registerMessageListener(new MessageListenerOrderly() {
  7. @Override
  8. public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs, ConsumeOrderlyContext context) {
  9. System.out.println(LocalDateTime.now().toString() + "---" + new String(msgs.get(0).getBody()));
  10. return ConsumeOrderlyStatus.SUCCESS;
  11. }
  12. });
  13. System.out.println("消费者已经就绪。。。。。。");
  14. consumer.start();
  15. }

结果:

2. 分区有序

先删除 RocketMQ Dashborad 中对应主题

另外,pom.xml中增加:

  1. <dependency>
  2. <groupId>com.alibaba</groupId>
  3. <artifactId>fastjson</artifactId>
  4. <version>1.2.60</version>
  5. </dependency>

使用 Java客户端 实现,在 Producer 中增加一个新的 order 方法,内容:

  1. private static void order() throws Exception{
  2. DefaultMQProducer producer = new DefaultMQProducer("test");
  3. producer.setNamesrvAddr("192.168.228.106:9876");
  4. producer.start();
  5. for(int i = 1; i<=6; i++){
  6. Map<String,String> message = new HashMap<>();
  7. if(i % 2 ==0){
  8. message.put("type","法师");
  9. message.put("name","妲己"+i);
  10. }else{
  11. message.put("type","战士");
  12. message.put("name","亚瑟"+i);
  13. }
  14. Message msg = new Message("wangzhe" ,
  15. "yase",
  16. JSONObject.toJSONString(message).getBytes("utf-8"));
  17. SendResult send = producer.send(msg, new MessageQueueSelector() {
  18. /**
  19. * @param mqs topic下的4个队列
  20. * @param msg 上面的msg对象
  21. * @param arg 自定义的参数,用来选择消息发到哪个队列
  22. * @return
  23. */
  24. @Override
  25. public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
  26. //这样同一类型的消息会发达一个队列中
  27. String type = String.valueOf(arg);
  28. if(type.equals("法师")){
  29. return mqs.get(0);
  30. }else{
  31. return mqs.get(1);
  32. }
  33. }
  34. }, message.get("type"));
  35. System.out.println(send);
  36. System.out.println("发送成功:" + message);
  37. }
  38. producer.shutdown();
  39. }

Consumer 不做修改,先启动Producer发送消息,然后再启动Consumer

结果:

3. 延时消息

1. 介绍

消息写入到 Broker 后,内部的调度线程会在等待指定的时间后,把消息投递到队列中

比如:12306上卖火车票,车票预订成功后就会发一条消息,在45分钟后发送给业务系统,如果这时订单没有付款,就取消订单,已经付款则忽略

目前 RocketMQ 不支持随意时长的延迟,而是通过特定的等级来指定。

默认支持18个等级,等级定义在RocketMQ Dashboard 项目中的 MessageStoreConfig类:

指定的延时等级为 3,则表示延迟时长为 10s,即延迟等级是从 1 开始计数的

如果想自定义,需要修改 broker 启动时指定的配置文件,我的是 /home/soft/rocketmq/conf/broker.conf

messageDelayLevel = 1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h 1d

2. java 客户端

修改:Producer ,增加:

  1. private static void delay() throws Exception {
  2. DefaultMQProducer producer = new DefaultMQProducer("wangzhe");
  3. producer.setNamesrvAddr("192.168.56.106:9876");
  4. producer.start();
  5. String message = "亚瑟的大宝剑---延时消息";
  6. Message msg = new Message("wangzhe" ,
  7. "yase" ,
  8. message.getBytes("utf-8"));
  9. msg.setDelayTimeLevel(3);//设置延时级别
  10. SendResult send = producer.send(msg);
  11. System.out.println(LocalDateTime.now().toString() + "发送成功:" + message);
  12. producer.shutdown();
  13. }

运行 Producer,结果:

4. 批量发送消息

如果消息过多,每次发送消息都和MQ建立连接,无疑是一种性能开销,可以把消息打包批量发送,提高性能

批量发送消息有些限制:

  • 应该有相同的topic

  • 不能是延时消息

  • 不能超过4M

  • 可以通过 Producer 的 setMaxMessageSize 方法设置

  • spring boot 下可通过 rocketmq.producer.maxMessageSize 设置,单位是字节

1. java 客户端

Producer 中,增加:

  1. private static void batch() throws Exception{
  2. DefaultMQProducer producer = new DefaultMQProducer("wangzhe");
  3. producer.setNamesrvAddr("192.168.56.106:9876");
  4. //设置最大消息大小,默认4M
  5. //同时也要修改启动broker时,指定配置文件:broker.conf, 加上 maxMessageSize
  6. producer.setMaxMessageSize(1024 * 1024 * 4);
  7. producer.start();
  8. List<Message> list = new ArrayList<>();
  9. for(int i = 1; i<=100; i++){
  10. String message = "亚瑟的大宝剑" + i;
  11. Message msg = new Message("wangzhe" ,
  12. "yase" ,
  13. message.getBytes("utf-8"));
  14. list.add(msg);
  15. }
  16. SendResult send = producer.send(list);
  17. System.out.println(send);
  18. producer.shutdown();
  19. }

Consumser,批量消费:

  1. private static void consumerBatch() throws Exception{
  2. DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("wangzhe");
  3. consumer.setNamesrvAddr("192.168.56.106:9876");
  4. //每次从 broker 批量拉取消息的数量,默认32
  5. consumer.setPullBatchSize(32);
  6. //从拉取的消息中,每次最多消费的数量,默认1,就是下面 consumeMessage 方法的 msgs 参数的长度
  7. consumer.setConsumeMessageBatchMaxSize(10);
  8. //设置最多只有 1 个线程处理消息,默认是多线程,只设置 1 个线程,更容易看到效果
  9. consumer.setConsumeThreadMax(1);
  10. //设置最少有 1 个线程处理消息
  11. consumer.setConsumeThreadMin(1);
  12. consumer.subscribe("wangzhe", "*");
  13. consumer.registerMessageListener(new MessageListenerConcurrently() {//使用并发消费监听器
  14. @Override
  15. public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
  16. System.out.println("拿到的消息数量......" + msgs.size());
  17. for(MessageExt msg : msgs){
  18. System.out.println(Thread.currentThread().getName() + "---" + LocalDateTime.now().toString() + "---" + new String(msg.getBody()));
  19. }
  20. return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
  21. }
  22. });
  23. consumer.start();
  24. System.out.println("消费者已经就绪。。。。。。");
  25. }

注意:先启动 Producer ,在启动 Consumer,结果:

5. 消息过滤

Producer 发送了很多消息,但是 Consumer 只想消费其中的某些,而不是全部,有两种方式

  • Tag过滤

  • SQL过滤(做了解)

1. Tag 过滤

1. java 客户端

修改Producer,增加

  1. private static void tagFilter() throws Exception{
  2. DefaultMQProducer producer = new DefaultMQProducer("wangzhe");
  3. producer.setNamesrvAddr("192.168.56.106:9876");
  4. producer.start();
  5. Message msg = new Message("wangzhe" ,"yase" , "我是亚瑟".getBytes("utf-8"));
  6. Message msg1 = new Message("wangzhe" ,"diaochan" , "我是貂蝉".getBytes("utf-8"));
  7. Message msg2 = new Message("wangzhe" ,"lvbu" , "我是吕布".getBytes("utf-8"));
  8. SendResult send = producer.send(msg);
  9. System.out.println(send);
  10. SendResult send1 = producer.send(msg1);
  11. System.out.println(send1);
  12. SendResult send2 = producer.send(msg2);
  13. System.out.println(send2);
  14. producer.shutdown();
  15. }

修改Consumer

  1. private static void consumerTag() throws Exception{
  2. DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("wangzhe");
  3. consumer.setNamesrvAddr("192.168.56.106:9876");
  4. // 只消费 lvbu 和 ciaochan 这两个tag
  5. consumer.subscribe("wangzhe", "lvbu || diaochan");
  6. consumer.registerMessageListener(new MessageListenerConcurrently() {
  7. @Override
  8. public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
  9. //取出消息体,这是我们最关注的东西
  10. System.out.println(new String(msgs.get(0).getBody()));
  11. return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;//表示消费成功
  12. }
  13. });
  14. consumer.start();
  15. System.out.println("消费者已经就绪。。。。。。");
  16. }

结果,只收到 lvbu和 diaochan的消息

注意:实际工作中,一个服务一般只有一个消费者,通过 tag,调用不同的service处理消息

2. sql 过滤(了解

默认不支持 sql,需要修改 broker 的配置文件,我们的是 /home/soft/rocketmq/conf/broker.conf

enablePropertyFilter=true

之后重启rocketMq

表达式介绍:

  1. 关键字:
  2. AND, OR, NOT, BETWEEN...AND, IN, TRUE, FALSE, IS, NULL
  3. 数据类型:
  4. Boolean, 比如: TRUE, FALSE
  5. String, 比如: 'abc',必须用单引号包起来
  6. 数字, 比如: 1233.1415
  7. 语法:
  8. AND, OR
  9. >, >=, <, <=, =
  10. BETWEEN A AND B, equals to >=A AND <=B
  11. NOT BETWEEN A AND B, equals to >B OR <A
  12. IN ('a', 'b'), equals to ='a' OR ='b',
  13. IS NULL, IS NOT NULL,
  14. =TRUE, =FALSE,
  15. 样例:
  16. (a > 10 AND a < 100) OR (b IS NOT NULL AND b=TRUE)

1. java 客户端

修改 Producer,增加

  1. private static void sqlFilter() throws Exception {
  2. DefaultMQProducer producer = new DefaultMQProducer("test");
  3. producer.setNamesrvAddr("192.168.56.106:9876");
  4. producer.start();
  5. Message msg1 = new Message("wangzhe", "貂蝉芳龄18".getBytes("utf-8"));
  6. msg1.putUserProperty("age","18");
  7. SendResult send1 = producer.send(msg1);
  8. System.out.println(send1);
  9. Message msg2 = new Message("wangzhe", "西施芳龄19".getBytes("utf-8"));
  10. msg2.putUserProperty("age","19");
  11. SendResult send2 = producer.send(msg2);
  12. System.out.println(send2);
  13. Message msg3 = new Message("wangzhe", "甄姬芳龄20".getBytes("utf-8"));
  14. msg3.putUserProperty("age","20");
  15. SendResult send3 = producer.send(msg3);
  16. System.out.println(send3);
  17. producer.shutdown();
  18. }

修改Consumer,增加

  1. private static void consumerSql() throws Exception{
  2. DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("wangzhe");
  3. consumer.setNamesrvAddr("192.168.56.106:9876");
  4. //Sql 过滤
  5. consumer.subscribe("wangzhe", MessageSelector.bySql("age = 18"));
  6. consumer.registerMessageListener(new MessageListenerConcurrently() {
  7. @Override
  8. public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
  9. //取出消息体,这是我们最关注的东西
  10. System.out.println(new String(msgs.get(0).getBody()));
  11. return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;//表示消费成功
  12. }
  13. });
  14. consumer.start();
  15. System.out.println("消费者已经就绪。。。。。。");
  16. }

结果:

6. 发送重试机制

1.消息发送重试

Producer 会对发送失败的消息,进行重发,目的:保证消息不丢失

需要注意:

  • 采用 同步或异步 发送时才会重试,oneway方式不会

  • 普通消息有重试,顺序消息没有

  • 可能造成消息重复,需要在消费端处理

1. 同步发送重试

  • 发送失败重投次数,默认为2

  • 可以通过 provider.setRetryTimesWhenSendFailed** **方法设置重投次数

  • 重投时,尝试向其他broker发送(就是Rocketmq集群下的其他节点),目的是为了保证消息不丢

  • 需要设置 provider.setRetryAnotherBrokerWhenNotStoreOK(true);

  • 如果消息刷盘超时,不会将消息尝试发送到其他 Broker

  • 刷盘:Producer 发送消息到 Broker,Broker会先把消息放到内存中,然后定时刷到磁盘

  • 超过重投次数,抛出异常

  • 如果消息发送超时,不重试

  • 可以通过 setSendMsgTimeout 设置超时时间,默认 3 秒

2. 异步发送重试

  • 发送失败重试次数,默认为2

  • 可以通过 provider.**retryTimesWhenSendAsyncFailed **方法设置重投次数

  • 仅在同一个broker上做重试

  • 如果消息发送超时,不重试

  • 可以通过 setSendMsgTimeout 设置超时时间,默认 3 秒

2. 消息的可靠性

其实就是保证消息不丢失

RocketMQ消息丢失可能发生在以下三个阶段:

  • 生产者发送消息到Broker时;

  • Broker内部存储消息到磁盘以及主从复制同步时;

  • Broker把消息推送给消费者或者消费者主动拉取消息时;

1. 发送端

推荐使用同步发送,如果是异步发送,可以根据发送的结果信息来判断是否需要重试来保证消息的可靠性

如果要求比较高,在发送失败时,尝试将消息存储到 db,由后台线程定时重试,确保消息到达 Broker

响应状态:

  • SEND_OK:消息发送成功,但并不可靠,要保证不丢失,需要启用同步Master服务器或同步刷盘

  • FLUSH_DISK_TIMEOUT:消息发送成功,但刷盘超时,此时消息在队列中(内存),服务器宕机,消息丢失

  • FLUSH_SLAVE_TIMEOUT:消息发送成功,同步到slave节点超时,此时消息在队列中,服务器宕机,消息丢失

  • SLAVE_NOT_AVAILABLE:消息发送成功,但slave不可用。

2. Broker端

可能丢失的原因:

  • 刷盘阶段可能会出现消息丢失

  • 主从复制时,要将Master的消息同步到Slave,这个过程也会出现消息丢失

刷盘有2种方式:同步刷盘、异步刷盘。

  • 同步刷盘:消息写入内存的 PageCache后,立刻刷盘,刷盘线程执行完成后唤醒等待的线程,返回消息写成功的状态。这种方式可以保证数据绝对安全,但是吞吐量不大

  • 异步刷盘:默认方式,消息写入到内存的 PageCache中,就立刻给客户端返回写操作成功

  • 当 PageCache中的消息积累到一定的量时,触发一次刷盘或者定时将消息写入磁盘中

  • 这种方式吞吐量大,性能高,但是 PageCache中的数据可能丢失,不能保证数据绝对的安全

主从同步也有2种方式:同步复制、异步复制。

  • 同步复制:等Master和Slave都写成功后才反馈客户端

  • 在同步复制方式下,如果Master出故障,Slave上有全部的备份数据,容易恢复

  • 但是同步复制会增大数据写入延迟,降低系统吞吐量

  • 异步复制:只要Master写成功,即可反馈给客户端写成功状态

  • 在异步复制方式下,系统拥有较低的延迟和较高的吞吐量,

  • 但是如果Master出了故障,有些数据因为没有被写 入Slave,有可能会丢失

3. 消费端

  • 消费重试

  • 死信队列:未成功消费的消息,不会立刻将消息丢弃,而是发送到死信队列,其名称是在原队列名称前加%DLQ%,如果消息最终进入了死信队列,则可以通过RocketMQ提供的相关接口从死信队列获取到相应的消息,保证了消息消费的可靠性

7. 重复消费

消息的重复消费分为两种

  • 同一条消息消费两次

  • Consumer 已经处理消息,但是由于网络原因,Broker 没有收到通知,重新消费

  • 两条消息内容一样

  • Producer 发送消息时可能触发重试机制,导致多条消息内容一致

解决:

  • 保证消费端业务逻辑的幂等性,即:无论执行多少个相同消息,结果都一致

  • 比如:处理消息的目的是,更新数据库值为100,这样的消息就算重复消费也无所谓

  • 利用日志表去记录每次消费成功消息,消费前先判断下日志表中是否已经消费过

  • 在发送消息时,可以对消息内容进行hash,然后把得hahs值放到Message的key中,推荐使用

  • 把消息的业务id,放到key中,比如:订单id

  • java 客户端方式

  • consusmer获取key,然后存起来,每次消费时先判断 key 存不存在

  • 在界面上通过key查询也很方便

8. 消费的2种方式

1. 集群消费

每个消息只会被消费一次,同一个Group下的Consumer,合作消费

Producer

  1. private static void send() throws Exception{
  2. DefaultMQProducer producer = new DefaultMQProducer("wangzhe");
  3. producer.setNamesrvAddr("192.168.56.106:9876");
  4. producer.start();
  5. //发送 6 条消息
  6. for(int i = 1; i<=6; i++){
  7. String message = "亚瑟的大宝剑" + i;
  8. Message msg = new Message("wangzhe" ,// topic
  9. "yase" /* Tag */,
  10. message.getBytes("utf-8"));//把字符串转换为字节数组
  11. msg.setKeys("1111111111");//存放业务id
  12. SendResult send = producer.send(msg);
  13. System.out.println("发送成功:" + message);
  14. }
  15. producer.shutdown();
  16. }

Consumer

  1. public static void main(String[] args) throws Exception {
  2. DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("wangzhe");
  3. consumer.setNamesrvAddr("192.168.56.106:9876");
  4. // 设置消费模型,集群还是广播,默认为集群
  5. consumer.setMessageModel(MessageModel.CLUSTERING);
  6. consumer.subscribe("wangzhe", "*");
  7. consumer.setMaxReconsumeTimes(2);
  8. consumer.registerMessageListener(new MessageListenerConcurrently() {
  9. @Override
  10. public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
  11. System.out.println(LocalDateTime.now().toString() + "---" + new String(msgs.get(0).getBody()));
  12. return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
  13. }
  14. });
  15. consumer.start();
  16. System.out.println("消费者已经就绪。。。。。。");
  17. }

Consumer2

  1. public static void main(String[] args) throws Exception {
  2. DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("test");
  3. consumer.setNamesrvAddr("192.168.56.106:9876");
  4. // 设置消费模型,集群还是广播,默认为集群
  5. consumer.setMessageModel(MessageModel.CLUSTERING);
  6. consumer.subscribe("wangzhe", "*");
  7. consumer.setMaxReconsumeTimes(2);
  8. consumer.registerMessageListener(new MessageListenerConcurrently() {
  9. @Override
  10. public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
  11. System.out.println(LocalDateTime.now().toString() + "---" + new String(msgs.get(0).getBody()));
  12. return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
  13. }
  14. });
  15. consumer.start();
  16. System.out.println("消费者已经就绪2。。。。。。");
  17. }

结果:

2. 广播

每个Consumer都消费所有消息

修改Consumer

  1. // 设置消费模型:广播
  2. consumer.setMessageModel(MessageModel.BROADCASTING);

9. Spring boot中使用

Rocketmq 集成在 Spring boot 中非常简单,只需要准备一个配置类,比如:

  1. @Configuration
  2. public class RocketmqConfig {
  3. /**
  4. * rocketmq 生产者
  5. * @return
  6. */
  7. @Bean
  8. public DefaultMQProducer defaultMQProducer(){
  9. DefaultMQProducer producer = new DefaultMQProducer("consumer");
  10. // 指定 nameserver 地址,把自己注册到nameserver上,同时获取 broker 信息
  11. producer.setNamesrvAddr("192.168.56.106:9876");
  12. try {
  13. producer.start();
  14. } catch (MQClientException e) {
  15. }
  16. return producer;
  17. }
  18. /**
  19. * rocketmq 消费者
  20. * @return
  21. */
  22. @Bean
  23. public DefaultMQPushConsumer defaultMQPushConsumer(){
  24. DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("producer");
  25. consumer.setNamesrvAddr("192.168.56.106:9876");
  26. try {
  27. consumer.subscribe("topic", "*");
  28. consumer.registerMessageListener(new MessageListenerConcurrently() {
  29. @Override
  30. public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
  31. ConsumeConcurrentlyContext context) {
  32. String url = new String(msgs.get(0).getBody());
  33. handelMovieService.handelUrl(url);
  34. return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;//表示消费成功
  35. }
  36. });
  37. //启动消费者
  38. consumer.start();
  39. } catch (MQClientException e) {
  40. }
  41. return consumer;
  42. }
  43. }
声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/你好赵伟/article/detail/603816
推荐阅读
相关标签
  

闽ICP备14008679号