赞
踩
在高并发、分布式场景下处理消息积压的问题,就像是一个繁忙的邮局在节假日前夕面对成堆的包裹和信件。想象一下,随着节日的临近,邮局里的每个角落都堆满了待发送的礼物和问候。邮递员们忙得不可开交,尽管他们尽力加快速度,包裹和信件还是越积越多。
RocketMQ,在高并发、分布式场景下的表现,也正如这个邮局的故事。当面对巨大的消息流量时,如何有效地处理积压的消息成为了一项挑战。
下面跟大家分享10个小技巧,帮助大家在工作或面试中更好的应对。
提升消费者的并发处理能力,可以有效加快消息的消费速度。
比如:提高消费者实例的并发消费的线程数
// 创建消费者实例,指定消费者组 DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("consumerGroup"); // 设置NameServer地址 consumer.setNamesrvAddr("127.0.0.1:9876"); // 设置并发消费的线程数 consumer.setConsumeThreadMin(20); consumer.setConsumeThreadMax(40); // 订阅主题和标签 consumer.subscribe("TopicTest", "*"); // 注册消息监听器 consumer.registerMessageListener((MessageListenerConcurrently) (msgs, context) -> { // 实现消息处理逻辑 return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; }); // 启动消费者 consumer.start();
通过设置消费者批量消费消息,减少网络通信和消息处理的次数,提升效率。
// 设置批量消费的大小
consumer.setConsumeMessageBatchMaxSize(10);
调整消费者的拉取策略,平衡消费者的负载,避免一次拉取过多导致处理不过来。
// 设置每次拉取的间隔时间
consumer.setPullInterval(1000);
// 设置每次拉取的消息数量
consumer.setPullBatchSize(32);
对于处理逻辑复杂或耗时的消息,采用异步处理也不失为一个好办法。
比如我们可以开启一个异步线程池,慢慢处理消息的业务逻辑处理
consumer.registerMessageListener((MessageListenerConcurrently) (msgs, context) -> {
msgs.forEach(msg -> CompletableFuture.runAsync(() -> {
// 异步处理消息内容
}));
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
});
在消费端提高tag标签实现消息过滤,只消费自己需要的消息,减少无效处理。
// 订阅时只消费Tag为"TagA"或"TagB"的消息
consumer.subscribe("TopicTest", "TagA || TagB");
RocketMQ的消息持久化策略,影响消息存取的延迟。
比如如果开启了同步刷盘策略,每当有消息写入CommitLog(消息文件)后,必须立刻刷盘,确保消息持久化到磁盘。这种模式下,消息的可靠性最高,但由于IO操作的延迟,会影响消息的发送速度。
所以建议将RocketMQ的消息持久化存在策略改为异步刷盘机制
异步刷盘策略允许消息先写入内存,然后由后台线程定期将内存中的数据刷写到磁盘。这种方式提高了消息的发送速度,但在发生故障时,可能会导致最近写入的部分消息丢失。
对于不急于处理的消息,可以设置延时级别,错峰处理。
比如在双十一抢购期间,通知商家发货的消息就可以延迟一段时间再发送,避过双十一抢购峰值
// 创建消息实例,设置延时级别
Message msg = new Message("TopicTest", "TagA", "OrderID001", "Hello world".getBytes());
msg.setDelayTimeLevel(3); // 设置延时级别
// 发送消息
producer.send(msg);
建立良好的监控和告警机制,对于一些对消息延迟比较敏感的业务,对消息挤压进行实时监控,及时响应,人工介入。
比如
增加消费者实例(加机器),此种方案仅在RocketMQ的Group大于机器数的时候
判断是否有消息堵塞,调整死信队列策略,将消息尽快丢到死信队列中
判断下游消费者应用性能是否下降,导致消息堵塞等等
合理配置消息重试机制,确保消息能够在处理失败后被再次消费,但是又不会长时间堵塞后续消息。
比如:
// 设置消息重试次数
consumer.setMaxReconsumeTimes(3);
根据监控数据,动态调整消费者资源,应对流量高峰。
线上突发消息积压时,最快的方式就是可以通过增加消费者数量或者提升机器配置实现(比如机器从4核8G升级为8核16G)
如何处理高并发、分布式场景下的消息积压,需要智慧和策略。跟大家分享的10个优化技巧策略,每一个技巧都需要根据实际情况来灵活应用,没有一劳永逸的解决方案。在实际场景中,我们需要持续监控、优化,不断调整,根据实际业务场景,选择最适合自己的解决方案。
没有最优的技术方案,只有最适合的方案。在合适的时间选择最适合的技术方案,是一个架构师的必备素质。
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。