赞
踩
【黑马程序员RocketMQ系统精讲,电商分布式消息中间件,硬核揭秘双十一】
批量发送消息能显著提高传递小消息的性能。限制是这些批量消息应该有相同的topic,相同的waitStoreMsgOK,而且不能是延时消息。此外,这一批消息的总大小不应超过4MB。
在对吞吐率有一定要求的情况下,Apache RocketMQ可以将一些消息聚成一批以后进行发送,可以增加吞吐率,并减少API和网络调用次数。
之前我们 做的 循环不算,那是 一条一条发的
如果您每次只发送不超过4MB的消息,则很容易使用批处理,样例如下:
来一个 全新 的生产者
package com.dingjiaxiong.mq.rocketmq.batch;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.client.producer.SendStatus;
import org.apache.rocketmq.common.message.Message;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.TimeUnit;
// 生产者
public class Producer {
public static void main(String[] args) throws Exception {
// 1.创建消息生产者producer,并制定生产者组名
DefaultMQProducer producer = new DefaultMQProducer("group1");
// 2.指定Nameserver地址
producer.setNamesrvAddr("192.168.19.100:9876;192.168.19.102:9876");
// 3.启动producer
producer.start();
List<Message> msgs = new ArrayList<Message>();
// 4.创建消息对象,指定主题Topic、Tag和消息体
/**
* 参数1:消息主题Topic
* 参数2:消息Tag
* 参数3:消息内容
* */
Message message1 = new Message("BatchTopic", "Tag1", ("Hello world" + 1).getBytes());
Message message2= new Message("BatchTopic", "Tag1", ("Hello world" + 2).getBytes());
Message message3 = new Message("BatchTopic", "Tag1", ("Hello world" + 3).getBytes());
msgs.add(message1);
msgs.add(message2);
msgs.add(message3);
// 5. 发送消息
SendResult result = producer.send(msgs);
//发送状态
SendStatus status = result.getSendStatus();
// 消息ID
String msgId = result.getMsgId();
//消息接受队列ID
int queueId = result.getMessageQueue().getQueueId();
//打印
System.out.println("发送状态:" + status + ",消息ID:" + msgId + ",队列" + queueId);
TimeUnit.SECONDS.sleep(1); //让线程睡一秒钟
// 6.关闭生产者producer
producer.shutdown();
}
}
再来一个消费者
package com.dingjiaxiong.mq.rocketmq.batch;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.common.protocol.heartbeat.MessageModel;
import java.util.List;
// 消费者
public class Consumer {
public static void main(String[] args) throws Exception {
// 1. 创建消费者Consumer,制定消费者组名
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("group1");
// 2. 指定Nameserver地址
consumer.setNamesrvAddr("192.168.19.100:9876;192.168.19.102:9876");
// 3. 订阅主题Topic和Tag
consumer.subscribe("BatchTopic","Tag1");
// 4. 设置回调函数,处理消息
consumer.registerMessageListener(new MessageListenerConcurrently() {
//接收消息内容
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext consumeConcurrentlyContext) {
for (MessageExt msg : msgs) {
System.out.println(new String(msg.getBody()));
}
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
// 5. 启动消费者consumer
consumer.start();
System.out.println("消费者 启动了....");
}
}
OK, 直接先启动消费者 进行监听
OK, 再启动 生产者
OK,消息发送 成功 了,是“一堆”
OK, 消费者这边 直接拿到了 3 条消息,这就是 批量 发送和接收
如果您每次只发送不超过4MB的消息,则很容易使用批处理,样例如下:【就是我们 刚刚演示的 情况】
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-Qyk5wNHW-1672018673891)(G:\丁家雄自学javajava\12RocketMQ笔记\assets\1668998179424.png)]
但是,如果消息的总长度可能大于4MB时,这时候最好把消息进行分割
public class ListSplitter implements Iterator<List<Message>> {
private final int SIZE_LIMIT = 1024 * 1024 * 4;
private final List<Message> messages;
private int currIndex;
public ListSplitter(List<Message> messages) {
this.messages = messages;
}
@Override
public boolean hasNext() {
return currIndex < messages.size();
}
@Override
public List<Message> next() {
int nextIndex = currIndex;
int totalSize = 0;
for (; nextIndex < messages.size(); nextIndex++) {
Message message = messages.get(nextIndex);
int tmpSize = message.getTopic().length() + message.getBody().length;
Map<String, String> properties = message.getProperties();
for (Map.Entry<String, String> entry : properties.entrySet()) {
tmpSize += entry.getKey().length() + entry.getValue().length();
}
tmpSize = tmpSize + 20; // 增加日志的开销20字节
if (tmpSize > SIZE_LIMIT) {
//单个消息超过了最大的限制
//忽略,否则会阻塞分裂的进程
if (nextIndex - currIndex == 0) {
//假如下一个子列表没有元素,则添加这个子列表然后退出循环,否则只是退出循环
nextIndex++;
}
break;
}
if (tmpSize + totalSize > SIZE_LIMIT) {
break;
} else {
totalSize += tmpSize;
}
}
List<Message> subList = messages.subList(currIndex, nextIndex);
currIndex = nextIndex;
return subList;
}
}
//把大的消息分裂成若干个小的消息
ListSplitter splitter = new ListSplitter(messages);
while (splitter.hasNext()) {
try {
List<Message> listItem = splitter.next();
producer.send(listItem);
} catch (Exception e) {
e.printStackTrace();
//处理error
}
}
这里 黑马老师 也没有进行 实现,了解 下就行了,批量发送时,有大小限制
官网的解释
https://rocketmq.apache.org/zh/docs/4.x/producer/08message4
其实官网文档上,已经不是 4M 了,
果然啊, 视频太旧了【这就是 批量消息的发送与接收】
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。