赞
踩
在一个系统中,由生产者系统和消费者系统两个环节组成,生产者系统不停的把消息写入RocketMQ里去,然后消费者系统就负责从RocketMQ里消费消息。
该系统在生产环境的高峰期内,大概有100多万条消息进入MQ。然后消费者系统从MQ里获取消息,并依赖Redis去进行一些业务逻辑的实现。
某天在高峰期突然间出了问题。导致消费者系统也阻塞无法继续执行下去了。
消费者系统的阻塞停止运行,意味着不会再从RocketMQ里去消费数据和处理了。而此时的生产者系统依旧不间断往MQ里写入100多万的消息,那么这些消息都会积压在MQ里,而无法被及时消费和处理掉。
所以消息堆积本质
1.业务高峰期业务量激增,生产太快
2.消费者不充足
3.由于某个队列的消息迟迟不能被消费导致消费进度无法提交
4.生产者未感知Broker消费堆积持续向Broker推送消息。
如果这些消息是允许丢失的,那么就紧急修改消费者系统的代码,在代码中对所有的消息都获取到后直接丢弃,也就是不做任何的处理,这样可以快速的将积压的百多万消息处理掉,这是丢弃的方案。
而如果消息是不能丢弃掉的话,就可以考虑采取其他相应的处理方案。
假如Topic有20个MessageQueue,而目前只有4个消费者系统在消费,那么每个消费者系统会从5个MessageQueue里获取消息,所以此时仅仅依靠4个消费者系统是不能满足快速处理MQ中积压的百多万消息的。
此时可以再申请16台机器去部署16台消费者系统实例,然后20个消费者系统同时消费,每个实例消费一个MessageQueue的消息,此时的消费速度就会提高了5倍,很快就能将积压的百多万消息处理掉。
假如Topic有8个MessageQueue,而集群中的消费者系统也是正好8个,那么就是每个消费者系统去消费一个MessageQueue里的消息,此时就无法使用上面增加消费者实例的方案来提高消费速度了。
此时应该修改4个消费者系统的代码,让他们取到消息后直接把消息写入一个新的Topic,这样速度就会很快了,因为只是读写MQ而已。
然后新的Topic有20个MessageQueue,再部署20台消费者实例,去消费新的Topic的消息后写入到Redis中,这样就可以快速的增加消费者系统的并行处理能力,使用一个新的Topic来允许更多的消费者系统并行处理。
为了让消费实例的数量与Queue数量相同,微众基于RocketMQ定制化开发了队列随消费实例动态增减的特性。实现方式是动态调整Topic配置的ReadQueueNum和WriteQueueNum,在扩容时,首先增加可读队列个数,保证Consumer先完成监听,再增加可写队列个数,使得Producer可以往新增加的队列发消息。
队列缩容与扩容的过程相反,先缩小可写队列个数,不再往即将缩掉的队列发消息,等到Consumer将该队列里的消息全部消费完成后,再缩小可读队列个数,完成缩容过程。
假如Topic有8个MessageQueue,而集群中的消费者系统也是正好8个,那么就是每个消费者系统去消费一个MessageQueue里的消息。
此时可以先动态扩容可读队列数量为24,保证Consumer先完成监听,再增加可写队列个数为24,使得Producer可以往新增加的队列发消息。
然后再部署16台消费者, 和之前的8台消费者一起消费消息,此时消费速率可以提升到3倍。
参考链接:https://blog.csdn.net/weixin_42405670/article/details/119153035
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。