赞
踩
批量发送消息能显著提高传递小消息的性能。限制是这些批量消息应该有相同的topic,相同的waitStoreMsgOK,而且不能是延时消息。此外,这一批消息的总大小不应超过4MB。
生产者发送消息调用的方法是:send(Collection<Message> msgs)
,与发送同步消息的区别是批量消息发送传入了一个消息的集合。
待发送的消息有如下的限制条件:
public class BatchProducer { public static void main(String[] args) throws Exception { // 1.关联生产者组 DefaultMQProducer defaultMQProducer = new DefaultMQProducer(MyRocketMqConstant.BatchLearn.BATCH_LEARN_PRODUCER_GROUP); // 2.关联注册中心 defaultMQProducer.setNamesrvAddr(MyRocketMqConstant.NAME_SRV); // 3.启动生产者 defaultMQProducer.start(); // 构造消息 List<Message> messages = buildMessages(); SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"); // 4.发送消息 SendResult sendResult = defaultMQProducer.send(messages); System.out.println(String.format("SendResult status:%s,queueId:%d,body:%s" ,sendResult.getSendStatus() ,sendResult.getMessageQueue().getQueueId() ,"批量消息体....."+sdf.format(new Date()))); // 5.关闭生产者 defaultMQProducer.shutdown(); } public static List<Message> buildMessages() { List<Message> messages = new ArrayList<>(); for (int i = 0; i < 100; i++) { String str = "Hello World " + i; Message message = new Message(MyRocketMqConstant.BatchLearn.BATCH_LEARN_TOPIC , MyRocketMqConstant.BatchLearn.BATCH_LEARN_TAG_A , "KEY " + i , str.getBytes(StandardCharsets.UTF_8)); messages.add(message); } return messages; } }
public class BatchConsumer { public static void main(String[] args) throws Exception { // 1.关联消费者组 DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(MyRocketMqConstant.BatchLearn.BATCH_LEARN_CONSUMER_GROUP); // 2.关联注册中心 consumer.setNamesrvAddr(MyRocketMqConstant.NAME_SRV); // 3.订阅消息 consumer.subscribe(MyRocketMqConstant.BatchLearn.BATCH_LEARN_TOPIC, MyRocketMqConstant.BatchLearn.BATCH_LEARN_TAG_A); // 4.注册监听器 consumer.registerMessageListener(new MessageListenerOrderly() { Random random = new Random(); SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"); @Override public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs, ConsumeOrderlyContext context) { for (MessageExt msg : msgs) { Date msgDate = new Date(msg.getStoreTimestamp()); Date now = new Date(); System.out.println("[consumerThread=" + Thread.currentThread().getName() + "] [broker=" + msg.getBrokerName() + msg.getStoreHost() + "] [queueId=" + msg.getQueueId() + "] [now" + sdf.format(now) + " storeDate" + sdf.format(msgDate) + "] [content=" + new String(msg.getBody()) + "]"); } try { TimeUnit.SECONDS.sleep(random.nextInt(10)); } catch (InterruptedException e) { e.printStackTrace(); } return ConsumeOrderlyStatus.SUCCESS; } }); // 5.启动消费者 System.out.println("消费者启动"); consumer.start(); } }
我将生产者启动了两次,一次是09:51:47,一次是09:55:09。能够发现两次批量消息放入的消息队列id不相同,且在各自的队列中是有序的。
[consumerThread=ConsumeMessageThread_3] [broker=broker-a/192.168.15.15:10911] [queueId=1] [now2021-04-19 09:55:12 storeDate2021-04-19 09:51:47] [content=Hello World 46]
[consumerThread=ConsumeMessageThread_3] [broker=broker-a/192.168.15.15:10911] [queueId=1] [now2021-04-19 09:55:14 storeDate2021-04-19 09:51:47] [content=Hello World 47]
[consumerThread=ConsumeMessageThread_4] [broker=broker-a/192.168.15.15:10911] [queueId=3] [now2021-04-19 09:55:14 storeDate2021-04-19 09:55:09] [content=Hello World 0]
[consumerThread=ConsumeMessageThread_3] [broker=broker-a/192.168.15.15:10911] [queueId=1] [now2021-04-19 09:55:19 storeDate2021-04-19 09:51:47] [content=Hello World 48]
[consumerThread=ConsumeMessageThread_4] [broker=broker-a/192.168.15.15:10911] [queueId=3] [now2021-04-19 09:55:22 storeDate2021-04-19 09:55:09] [content=Hello World 1]
[consumerThread=ConsumeMessageThread_5] [broker=broker-a/192.168.15.15:10911] [queueId=1] [now2021-04-19 09:55:28 storeDate2021-04-19 09:51:47] [content=Hello World 49]
摘录RocketMQ示例中的描述:
复杂度只有当你发送大批量时才会增长,你可能不确定它是否超过了大小限制(4MB)。这时候你最好把你的消息列表分割一下
我们需要知道其中的大小限制是什么?
org.apache.rocketmq.client.exception.MQClientException: CODE: 13 DESC: the message body size over max value, MAX: 4194304
public class ListSplitter implements Iterator<List<Message>> { private final int SIZE_LIMIT = 1024 * 1024 *4; private final List<Message> messages; private int currIndex; public ListSplitter(List<Message> messages) { this.messages = messages; } @Override public boolean hasNext() { return currIndex < messages.size(); } @Override public List<Message> next() { // 获取开始切割的消息索引(消息字节小于4MB的消息索引) int startIndex = getStartIndex(); // 切割结束位置 int nextIndex = startIndex; // 临时存放切割出的消息列表的总字节数 int totalSize = 0; // 切割消息列表,从起始位置切割,直到切割出的消息列表的总字节数超过4MB为止 for (; nextIndex < messages.size() ; nextIndex++) { // 计算遍历的消息的字节并累加,当超过4MB时退出循环,记录nextIndex;用于切分list Message message = messages.get(nextIndex); int tmpSize = calcMessageSize(message); if (tmpSize + totalSize > SIZE_LIMIT){ break; }else { totalSize += tmpSize; } } List<Message> subList = this.messages.subList(startIndex, nextIndex); // 存放切割结束位置 currIndex = nextIndex; return subList; } /** * 计算开始切割的位置,切割的对象是消息列表,最小粒度就是一个消息对象(一条消息的字节大于4MB不会对该消息对象进行切割) * 获取起始切割位置就是找到单条消息字节小于4MB的消息索引。对大于4MB的消息跳过。 */ private int getStartIndex(){ // 计算开始索引的消息的字节数 Message currMessage = messages.get(currIndex); int tmpSize = calcMessageSize(currMessage); // 下一个开始位置的消息的字节数不能大于4MB while (tmpSize > SIZE_LIMIT) { currIndex ++; Message message = messages.get(currIndex); tmpSize = calcMessageSize(message); } return currIndex; } private int calcMessageSize(Message message){ int tmpSize = message.getTopic().length() + message.getBody().length; Map<String, String> properties = message.getProperties(); for (Map.Entry<String, String> entry : properties.entrySet()) { tmpSize += entry.getKey().length() + entry.getValue().length(); } // 增加日志的开销20字节 tmpSize = tmpSize+20; return tmpSize; } }
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。