赞
踩
The LevelDB store has been deprecated and is no longer supported or recommended for use. The recommended store is KahaDB
但官方推荐的KahaDB存储方案,目前没有原生的高可用支持。
基于以上原因,我建议考虑新的队列方案:rabbitmq或者rocketmq。
示例代码
- public class Producer {
- public static void main(String[] args) throws Exception {
- DefaultMQProducer producer = new DefaultMQProducer("sina-www");
- producer.setNamesrvAddr("10.40.20.200:9876,10.40.20.201:9876");
- producer.start();
- for (int i = 0; i < 100000; i++) {
- Message msg = new Message("topic1", "sina.www.abc", ("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET));
- SendResult sendResult = producer.send(msg);
- System.out.printf("%s%n", sendResult);
- }
- producer.shutdown();
- }
- }
-
- public class Consumer {
- public static void main(String[] args) throws InterruptedException, MQClientException {
- DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("sina-www");
- consumer.setNamesrvAddr("10.40.20.200:9876,10.40.20.201:9876");
- consumer.subscribe("topic1", "sina.www.abc");
- consumer.registerMessageListener(new MessageListenerConcurrently() {
- @Override
- public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,ConsumeConcurrentlyContext context) {
- return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
- }
- });
- consumer.start();
- }
- }
示例程序
- public class Producer {
- private static String queueName = "sina.goods.updatestock";
- public static void main(String[] args) throws Exception{
- ConnectionFactory factory = new ConnectionFactory();
- factory.setHost("10.40.20.203");
- factory.setUsername("guest");
- factory.setPassword("guest");
- factory.setVirtualHost("/");
- factory.setPort(5672);
- Connection connection = factory.newConnection();
- Channel channel = connection.createChannel();
- channel.queueDeclare(queueName, true, false, false, null);
- channel.queueBind(queueName,"lbexchange", queueName);
- for (int i = 0; i < 100000; i++) {
- String message = "hello world + " + i;
- channel.basicPublish("lbexchange", queueName, MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes());
- System.out.println(" [x] Sent '" + i + "' ");
- }
- channel.close();
- connection.close();
- }
- }
-
- public class Consumer {
- private static String queueName = "sina.goods.updatestock";
- public static void main(String[] args) throws Exception {
- ConnectionFactory factory = new ConnectionFactory();
- factory.setHost("10.40.20.203");
- factory.setUsername("guest");
- factory.setPassword("guest");
- factory.setVirtualHost("/");
- factory.setPort(5672);
- Connection connection = factory.newConnection();
- Channel channel = connection.createChannel();
- channel.queueDeclare(queueName, true, false, false, null);
- QueueingConsumer consumer = new QueueingConsumer(channel);
- int prefetchCount = 1;
- channel.basicQos(prefetchCount);
- channel.basicConsume(queueName, true, consumer);
- while (true) {
- QueueingConsumer.Delivery delivery = consumer.nextDelivery();
- String message = new String(delivery.getBody());
- }
- }
- }
因为rocketmq的性能肯定比rabbitmq强,因此只测试了rabbitmq的性能。
测试场景:10个生产者,10个消费者,消费者自动ACK,2节点集群,服务器配置8C8G
压测工具:github.com/rabbitmq/rabbitmq-perf-test
以下分别测试消息大小为10字节和1K、直连主节点和直连从节点,一共4种组合。
消息大小10字节,直连主节点
Cpu 负载:主65%,从25%
消息大小1K,直连主节点
Cpu 负载:主65%,从25%
消息大小10字节,直连从节点
Cpu 负载:主35%,从40%
消息大小1K,直连从节点
Cpu 负载:主30%,从35%
消息积压的表现
关闭消费者,让消息积压,消息大小1K
因为内存限制,消息积压到一定程度要page out到磁盘,因此生产者发送消息会有明显的抖动(page out时会block)。
Rocketmq
Rabbitmq
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。