赞
踩
前言:批量发送和消费消息在一定程度上可以提高吞吐量,减少带宽,那么Rocketmq 中的消息怎么进行批量的发送和批量的消费呢;
1 消息的批量发送:
1.1 批量发送的优点以及实现:
批量发送消息可以提高 RocketMQ 的生产者性能和吞吐量。由于批量发送消息可以减少网络 I/O 操作和降低消息发送延迟,因此它在以下情况下特别有用:
但是,批量发送消息也存在一些注意事项,需要注意以下几点:
批量发送消息是一种提高 RocketMQ 生产者性能和吞吐量的好方法,但需要注意消息列表大小和错误处理机制,以确保生产者的可靠性和稳定性。
public class SimpleBatchProducer { public static void main(String[] args) throws Exception { DefaultMQProducer producer = new DefaultMQProducer("BatchProducerGroupName"); producer.start(); //If you just send messages of no more than 1MiB at a time, it is easy to use batch //Messages of the same batch should have: same topic, same waitStoreMsgOK and no schedule support String topic = "BatchTest"; List<Message> messages = new ArrayList<>(); messages.add(new Message(topic, "Tag", "OrderID001", "Hello world 0".getBytes())); messages.add(new Message(topic, "Tag", "OrderID002", "Hello world 1".getBytes())); messages.add(new Message(topic, "Tag", "OrderID003", "Hello world 2".getBytes())); producer.send(messages); } }
1.2 批量发送消息为什么要限制maxMessageSize :
消息列表的大小不能超过生产者设置的 maxMessageSize 参数,主要是为了避免消息发送延迟和消息过大导致 broker 出现性能问题。如果尝试发送大于 maxMessageSize 的消息,RocketMQ 会抛出 MessageTooLargeException 异常,并且消息不会被发送到 broker。
如果开发者在开发时遇到了消息列表大小超过 maxMessageSize 的情况,可以考虑以下几种处理方式:
提升 maxMessageSize 参数的大小,这样可以容纳更大的消息列表。但是,需要注意在提升参数大小时,要考虑到 RocketMQ broker 的性能和网络带宽等因素。
考虑将消息列表进行拆分,然后分批发送。这样可以避免一次发送过多的消息。
计算消息的大小并进行压缩。可以使用一些压缩算法,如 LZ4、Snappy 等,对消息进行压缩,以减小消息的大小。
对超过 maxMessageSize 的消息进行过滤或其他处理。可以通过业务逻辑对消息进行分组或分类,对超过 maxMessageSize 的消息进行过滤或其他处理,以避免发送超出限制的消息。
开发者在开发时需要注意消息列表的大小限制,避免出现超出限制的情况。
2 消息的批量消费:
2.1 批量消费的优点:
批量消费消息可以提高 RocketMQ 的消费者性能和吞吐量,因为批量消费消息可以减少网络 I/O 操作和降低消息消费延迟。批量消费消息在以下情况下特别有用:
批量消费消息是一种提高 RocketMQ 消费者性能和吞吐量的好方法,但需要注意消息列表大小、消息顺序和事务性质等问题,以确保消费者的可靠性和稳定性。
2.2 推、拉和长轮询:MQ的消费模式可以大致分为两种,一种是推Push,一种是拉Pull
Push是服务端主动推送消息给客户端,优点是及时性较好,但如果客户端没有做好流控,一旦服务端推送大量消息到客户端时,就会导致客户端消息堆积甚至崩溃。
Pull是客户端需要主动到服务端取数据,优点是客户端可以依据自己的消费能力进行消费,但拉取的频率也需要用户自己控制,拉取频繁容易造成服务端和客户端的压力,拉取间隔长又容易造成消费不及时。
2.3 对pull(拉模式)的批量消费:
DefaultLitePullConsumer是RocketMQ中的拉模式消息消费者,其工作流程如下:
在使用DefaultLitePullConsumer时,需要注意控制拉取消息的速率(比如使用定时任务调用poll()方法)及消息处理的并发能力。根据实际业务去实现适当的处理策略,保证在消费者速率和处理能力之间达到一个平衡。
demo:
public static void main(String[] args) throws Exception {
DefaultLitePullConsumer litePullConsumer = new DefaultLitePullConsumer("lite_pull_consumer_test");
litePullConsumer.setNamesrvAddr("localhost:9876");
litePullConsumer.subscribe("test_topic", "*");
litePullConsumer.setPullBatchSize(20);
litePullConsumer.start();
try {
while (running) {
List<MessageExt> messageExts = litePullConsumer.poll();
System.out.printf("%s%n", messageExts);
}
} finally {
litePullConsumer.shutdown();
}
}
首先还是初始化DefaultLitePullConsumer并设置ConsumerGroupName,调用subscribe方法订阅topic并启动。与Push Consumer不同的是,LitePullConsumer拉取消息调用的是轮询poll接口,如果能拉取到消息则返回对应的消息列表,否则返回null。通过setPullBatchSize可以设置每一次拉取的最大消息数量,此外如果不额外设置,LitePullConsumer默认是自动提交位点。在subscribe模式下,同一个消费组下的多个LitePullConsumer会负载均衡消费。
2.3 对push(推模式)的批量消费:DefaultMQPushConsumer的工作流程如下:
请注意,在使用DefaultMQPushConsumer时,消费者的并发能力由MessageListener实现来保证。因此,在设计MessageListener实现时需要考虑到高并发处理能力。虽然RocketMQ客户端提供了设置消费线程池的配置选项,但还是推荐根据实际需求来实现合适的并发方案。‘
2.3.1 使用DefaultMQPushConsumer :
批量消费消息需要在消费者端设置 ConsumeMessageBatchMaxSize 参数,以指定每次批量消费的消息数量。
public static void main(String[] args) throws InterruptedException, MQClientException { // step1: 创建一个DefaultMQPushConsumer实例 DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("BatchPushConsumer"); consumer.setNamesrvAddr("localhost:9876"); consumer.setConsumeMessageBatchMaxSize(10); // 设置每次批量消费的消息数量 // step2: 为消费者订阅一个Topic consumer.subscribe("test_topic", "*"); // step3: 注册一个MessageListenerConcurrently,并实现批量消费逻辑 consumer.registerMessageListener(new MessageListenerConcurrently() { @Override public ConsumeConcurrentlyStatus consumeMessage( List<MessageExt> msgs, ConsumeConcurrentlyContext context) { System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs); // 处理批量消息 for (MessageExt msg : msgs) { // 在此处理每条消息,例如保存到数据库等 System.out.println("msg : " + new String(msg.getBody())); } // 返回消费状态 return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; } }); // step4: 启动消费者 consumer.start(); System.out.println("Consumer started!"); // 让主线程等待以保持进程不退出 TimeUnit.SECONDS.sleep(60); }
在上述示例中,每次批量消费的消息数量被设置为10。每次推送过来的消息数量可能并不总是达到这个数字,但是它不会超过这个数量。如果希望调整批量大小,可以通过consumer.setConsumeMessageBatchMaxSize();修改这个值。
2.3.2 :springboot 通过@RocketMQMessageListener 完成消息消费:
:默认的 RocketMQ Spring Boot Starter 并不支持直接设置批量消费模式,消息是一个个处理的。对于RocketMQ Listener可以管理多个线程同时处理消息。可以有多个消息同时处理。通过将顺序消息设置为并发模式并设置消费线程数。
import com.example.demo.MessageProcessor; import org.apache.rocketmq.client.exception.MQClientException; import org.apache.rocketmq.common.message.MessageExt; import org.apache.rocketmq.spring.annotation.RocketMQMessageListener; import org.apache.rocketmq.spring.core.RocketMQListener; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Service; @Service @RocketMQMessageListener(topic = "my-topic", consumerGroup = "myConsumerGroup", consumeMode = ConsumeMode.CONCURRENTLY, consumeThreadMax = 3) public class MyConsumer implements RocketMQListener<MessageExt> { @Autowired private MessageProcessor messageProcessor; @Override public void onMessage(MessageExt messageExt) { messageProcessor.process(messageExt); } }
在这个例子中,consumeMode = ConsumeMode.CONCURRENTLY表示消费者将启用并发模式,而consumeThreadMax = 3将同时处理三个消息。注意,这不是真正意义上的批量消费,而是通过多线程来同时处理多个消息。要实现批量消费,需要进一步处理MessageExt消息,这取决于的实际需求。例如,可以缓冲消息,等待足够多的消息可用后一次性处理它们。
在设置 consumeThreadMax 参数时,请确保它不要过大,以避免系统资源过载。同时,优化MessageProcessor中的相关逻辑,尽量减少处理每个消息所需的时间。通过这种机制,虽然无法实现真正意义上的批量消费,但仍然可以帮助提高消息处理的效率。
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。