当前位置:   article > 正文

RocketMQ 第一章 核心功能 4 消息发送样例 4.5 批量消息_rocketmq批量发送 指定队列

rocketmq批量发送 指定队列

RocketMQ

【黑马程序员RocketMQ系统精讲,电商分布式消息中间件,硬核揭秘双十一】

第一章 核心功能

4 消息发送样例

4.5 批量消息

批量发送消息能显著提高传递小消息的性能。限制是这些批量消息应该有相同的topic,相同的waitStoreMsgOK,而且不能是延时消息。此外,这一批消息的总大小不应超过4MB。

在对吞吐率有一定要求的情况下,Apache RocketMQ可以将一些消息聚成一批以后进行发送,可以增加吞吐率,并减少API和网络调用次数。

在这里插入图片描述

4.5.1 发送和接收批量消息

之前我们 做的 循环不算,那是 一条一条发的

如果您每次只发送不超过4MB的消息,则很容易使用批处理,样例如下:

来一个 全新 的生产者

package com.dingjiaxiong.mq.rocketmq.batch;


import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.client.producer.SendStatus;
import org.apache.rocketmq.common.message.Message;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.TimeUnit;

// 生产者
public class Producer {


    public static void main(String[] args) throws Exception {


        // 1.创建消息生产者producer,并制定生产者组名

        DefaultMQProducer producer = new DefaultMQProducer("group1");

        // 2.指定Nameserver地址
        producer.setNamesrvAddr("192.168.19.100:9876;192.168.19.102:9876");

        // 3.启动producer
        producer.start();

        List<Message> msgs = new ArrayList<Message>();

        // 4.创建消息对象,指定主题Topic、Tag和消息体
        /**
         * 参数1:消息主题Topic
         * 参数2:消息Tag
         * 参数3:消息内容
         * */
        Message message1 = new Message("BatchTopic", "Tag1", ("Hello world" + 1).getBytes());
        Message message2= new Message("BatchTopic", "Tag1", ("Hello world" + 2).getBytes());
        Message message3 =  new Message("BatchTopic", "Tag1", ("Hello world" + 3).getBytes());


        msgs.add(message1);
        msgs.add(message2);
        msgs.add(message3);
        
        // 5. 发送消息
        SendResult result = producer.send(msgs);

        //发送状态
        SendStatus status = result.getSendStatus();
        // 消息ID
        String msgId = result.getMsgId();

        //消息接受队列ID
        int queueId = result.getMessageQueue().getQueueId();
        //打印
        System.out.println("发送状态:" + status + ",消息ID:" + msgId + ",队列" + queueId);

        TimeUnit.SECONDS.sleep(1); //让线程睡一秒钟
        

        // 6.关闭生产者producer
        producer.shutdown();


    }

}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60
  • 61
  • 62
  • 63
  • 64
  • 65
  • 66
  • 67
  • 68
  • 69

再来一个消费者

package com.dingjiaxiong.mq.rocketmq.batch;

import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.common.protocol.heartbeat.MessageModel;

import java.util.List;

// 消费者
public class Consumer {
    
    public static void main(String[] args) throws Exception {

        // 1. 创建消费者Consumer,制定消费者组名
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("group1");
        // 2. 指定Nameserver地址
        consumer.setNamesrvAddr("192.168.19.100:9876;192.168.19.102:9876");

        // 3. 订阅主题Topic和Tag
        consumer.subscribe("BatchTopic","Tag1");

        // 4. 设置回调函数,处理消息
        consumer.registerMessageListener(new MessageListenerConcurrently() {

            //接收消息内容
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext consumeConcurrentlyContext) {
                for (MessageExt msg : msgs) {

                    System.out.println(new String(msg.getBody()));
                }
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });

        // 5. 启动消费者consumer
        consumer.start();

        System.out.println("消费者 启动了....");

    }
    
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45

OK, 直接先启动消费者 进行监听

在这里插入图片描述

OK, 再启动 生产者

在这里插入图片描述

OK,消息发送 成功 了,是“一堆”

在这里插入图片描述

OK, 消费者这边 直接拿到了 3 条消息,这就是 批量 发送和接收

4.5.2 注意事项

如果您每次只发送不超过4MB的消息,则很容易使用批处理,样例如下:【就是我们 刚刚演示的 情况】

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-Qyk5wNHW-1672018673891)(G:\丁家雄自学javajava\12RocketMQ笔记\assets\1668998179424.png)]

但是,如果消息的总长度可能大于4MB时,这时候最好把消息进行分割

public class ListSplitter implements Iterator<List<Message>> {
   private final int SIZE_LIMIT = 1024 * 1024 * 4;
   private final List<Message> messages;
   private int currIndex;
   public ListSplitter(List<Message> messages) {
           this.messages = messages;
   }
    @Override 
    public boolean hasNext() {
       return currIndex < messages.size();
   }
   	@Override 
    public List<Message> next() {
       int nextIndex = currIndex;
       int totalSize = 0;
       for (; nextIndex < messages.size(); nextIndex++) {
           Message message = messages.get(nextIndex);
           int tmpSize = message.getTopic().length() + message.getBody().length;
           Map<String, String> properties = message.getProperties();
           for (Map.Entry<String, String> entry : properties.entrySet()) {
               tmpSize += entry.getKey().length() + entry.getValue().length();
           }
           tmpSize = tmpSize + 20; // 增加日志的开销20字节
           if (tmpSize > SIZE_LIMIT) {
               //单个消息超过了最大的限制
               //忽略,否则会阻塞分裂的进程
               if (nextIndex - currIndex == 0) {
                  //假如下一个子列表没有元素,则添加这个子列表然后退出循环,否则只是退出循环
                  nextIndex++;
               }
               break;
           }
           if (tmpSize + totalSize > SIZE_LIMIT) {
               break;
           } else {
               totalSize += tmpSize;
           }

       }
       List<Message> subList = messages.subList(currIndex, nextIndex);
       currIndex = nextIndex;
       return subList;
   }
}
//把大的消息分裂成若干个小的消息
ListSplitter splitter = new ListSplitter(messages);
while (splitter.hasNext()) {
  try {
      List<Message>  listItem = splitter.next();
      producer.send(listItem);
  } catch (Exception e) {
      e.printStackTrace();
      //处理error
  }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55

这里 黑马老师 也没有进行 实现,了解 下就行了,批量发送时,有大小限制

官网的解释

https://rocketmq.apache.org/zh/docs/4.x/producer/08message4

其实官网文档上,已经不是 4M 了,

在这里插入图片描述

果然啊, 视频太旧了【这就是 批量消息的发送与接收】

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/Gausst松鼠会/article/detail/715401
推荐阅读
相关标签
  

闽ICP备14008679号