当前位置:   article > 正文

消息队列选型:rocketmq or rabbitmq_rabiitmq和 rocketmq用哪个

rabiitmq和 rocketmq用哪个

1.目前我们用的activemq,面临一些问题

  • activemq高可用基于leveldb的复制,但activemq从5.*开始,leveldb已经不再有任何更新

The LevelDB store has been deprecated and is no longer supported or recommended for use. The recommended store is KahaDB
但官方推荐的KahaDB存储方案,目前没有原生的高可用支持。

  • 我司使用leveldb过程中,遇到一些怪异的问题:队列被消费后,日志不自动清理;删除queue后日志才清理;日志手动清理后,activemq记录的“空间使用百分比”不减少,需要重启才能显示正确的值;“空间使用百分比”一旦100%,集群将不可用。官网上很多人提了类似issue,但都已经不再支持处理。
  • leveldb不支持延迟消息。
  • leveldb在节点failover时,会出现重启后“空间使用百分比”一直不减少的情况,只有将整个集群停止后,清空所有leveldb目录,重新初始化才能解决(预计整个集群停服1分钟)

基于以上原因,我建议考虑新的队列方案:rabbitmq或者rocketmq。

2.rocketmq介绍

  • Nameserver负责broke的自动发现(和dubbo中的zk角色类似);nameserver在rocketmq的早期版本是直接用zk做的,后来自己开发;nameserver无状态,可任意个节点。
  • Producer cluster,可以理解为一个应用的n个节点,rocketmq中用producer cluster的主要作用是一个produder挂掉后,可以通知别的producer继续执行其未完成的事务;
  • Consumer cluster,也可理解为一个应用的n个节点,主要是实现n个consumer的负载均衡。
  • 1个cluster中可以n个topic,1个topic中可以有n个broken,1个broken中可以有1个master和n个slave。
  • Producer向topic 发送消息,n个broken负载均衡,横向扩展;某个broken中的master宕机后,master被摘除,消息负载到其他broken,但原broken的slave仍然可以提供消费服务。

  • Rocketmq中的queue和别的mq不同,不是activemq或rabbitmq中的给特定生产者和消费者配对的逻辑对象,而是为了提高并发性设置的消息存储的sharding。
  • 生产者向每个queue轮询发送消息,消费者负载均衡地消费这些queue;每个broken节点中的queue数目默认16,可自定义,数目必须超过consumer数,否则会出现多出的consumer无法消费的情况。
  • Rocketmq中会对每个消息加特定的tag,这些tag和别的mq中的queue的概念类似,用于生产者和消费者配对特定的消息。

示例代码

  1. public class Producer {
  2. public static void main(String[] args) throws Exception {
  3. DefaultMQProducer producer = new DefaultMQProducer("sina-www");
  4. producer.setNamesrvAddr("10.40.20.200:9876,10.40.20.201:9876");
  5. producer.start();
  6. for (int i = 0; i < 100000; i++) {
  7. Message msg = new Message("topic1", "sina.www.abc", ("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET));
  8. SendResult sendResult = producer.send(msg);
  9. System.out.printf("%s%n", sendResult);
  10. }
  11. producer.shutdown();
  12. }
  13. }
  14. public class Consumer {
  15. public static void main(String[] args) throws InterruptedException, MQClientException {
  16. DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("sina-www");
  17. consumer.setNamesrvAddr("10.40.20.200:9876,10.40.20.201:9876");
  18. consumer.subscribe("topic1", "sina.www.abc");
  19. consumer.registerMessageListener(new MessageListenerConcurrently() {
  20. @Override
  21. public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,ConsumeConcurrentlyContext context) {
  22. return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
  23. }
  24. });
  25. consumer.start();
  26. }
  27. }

3.rabbitmq介绍

  • Exchange的职责类似于交换机,提供数据交换功能,将数据分发到特定的queue。
  • Binding: exchange和queue的绑定关系; exchange的不同类型,加上binging配置的routekey,可以实现exchange与queue的任意组合分发规则。
  • 一个cluster中每个节点都有相同的exchange和binding信息。
  • 每个exchange必须将消息发送至master queue,不管master是否与该exchange在一个节点;Master queue与slave queue自动同步,queue副本数可以指定,可以少于node数;Node宕机后,node上的master queue对应的别的节点的副本会升级为master。
     

示例程序

  1. public class Producer {
  2. private static String queueName = "sina.goods.updatestock";
  3. public static void main(String[] args) throws Exception{
  4. ConnectionFactory factory = new ConnectionFactory();
  5. factory.setHost("10.40.20.203");
  6. factory.setUsername("guest");
  7. factory.setPassword("guest");
  8. factory.setVirtualHost("/");
  9. factory.setPort(5672);
  10. Connection connection = factory.newConnection();
  11. Channel channel = connection.createChannel();
  12. channel.queueDeclare(queueName, true, false, false, null);
  13. channel.queueBind(queueName,"lbexchange", queueName);
  14. for (int i = 0; i < 100000; i++) {
  15. String message = "hello world + " + i;
  16. channel.basicPublish("lbexchange", queueName, MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes());
  17. System.out.println(" [x] Sent '" + i + "' ");
  18. }
  19. channel.close();
  20. connection.close();
  21. }
  22. }
  23. public class Consumer {
  24. private static String queueName = "sina.goods.updatestock";
  25. public static void main(String[] args) throws Exception {
  26. ConnectionFactory factory = new ConnectionFactory();
  27. factory.setHost("10.40.20.203");
  28. factory.setUsername("guest");
  29. factory.setPassword("guest");
  30. factory.setVirtualHost("/");
  31. factory.setPort(5672);
  32. Connection connection = factory.newConnection();
  33. Channel channel = connection.createChannel();
  34. channel.queueDeclare(queueName, true, false, false, null);
  35. QueueingConsumer consumer = new QueueingConsumer(channel);
  36. int prefetchCount = 1;
  37. channel.basicQos(prefetchCount);
  38. channel.basicConsume(queueName, true, consumer);
  39. while (true) {
  40. QueueingConsumer.Delivery delivery = consumer.nextDelivery();
  41. String message = new String(delivery.getBody());
  42. }
  43. }
  44. }

4.rabbitmq性能测试

因为rocketmq的性能肯定比rabbitmq强,因此只测试了rabbitmq的性能。
测试场景:10个生产者,10个消费者,消费者自动ACK,2节点集群,服务器配置8C8G
压测工具:github.com/rabbitmq/rabbitmq-perf-test
以下分别测试消息大小为10字节和1K、直连主节点和直连从节点,一共4种组合。

消息大小10字节,直连主节点
Cpu 负载:主65%,从25%

消息大小1K,直连主节点
Cpu 负载:主65%,从25%

消息大小10字节,直连从节点
Cpu 负载:主35%,从40%

消息大小1K,直连从节点
Cpu 负载:主30%,从35%

消息积压的表现
关闭消费者,让消息积压,消息大小1K

因为内存限制,消息积压到一定程度要page out到磁盘,因此生产者发送消息会有明显的抖动(page out时会block)。

5.使用总结

Rocketmq

  1. Rocketmq为高并发海量数据设计,架构很先进,支持容量横向扩展。
  2. Rocketmq没有我们传统意义上的queue对象,而是通过tag实现消息的分类。消息的负载数据(enqueue\dequeue\pending等)无法精确到tag维度,只能到broken或consumer group。
  3. 控制台没有包含到原生项目,而是在rocketmq-externals中;控制台更新缓慢,2017年6月第一版后再无新的版本;控制台做得很差
  4. 没有完全遵守AMQP,只支持java客户端;官方文档简陋。

Rabbitmq

  1. 一个2节点的集群能保守提供6-7K/S的请求量,性能满足要求(目前我司还没有单个应用队列吞吐超过该数量的请求);
  2. 准守AMQP 协议,支持多种客户端,如:Python、Ruby、.NET、Java、C、PHP等;
  3. 控制台很友好,很强大,能看每个queue最近24小时的生产、消费、积压走势图
     
声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/羊村懒王/article/detail/603569
推荐阅读
相关标签
  

闽ICP备14008679号