赞
踩
目录
上图是一个典型的消息中间件收发消息的模型,RocketMQ也是这样的设计,简单说来,RocketMQ具有以下特点:
rocketmq的物理部署结构:
通信关系:
为提高消息读写并发能力,将一个topic消息进行拆分,kafka称为分区,rocketmq称为队列。
所以RocketMQ需要2次寻找,第一次先找队列中的消息概要信息,拿到概要信息中的offset,根据这个offset再到混杂文件中找到想要的消息。而kafka则只需要直接读取分区中的文件即可得到想要的消息。
RocketMq并不会立即删除消息,所以消息是可以被重复消费的。 RocketMq的消息时定期清除,默认3天。
Producer端如何来发现新的broker地址。
对于kafka来说:Producer端需要配置broker的列表地址,Producer也从一个broker中来更新broker列表地址(从中发现新加入的broker)。
对于RocketMQ来说:Producer端需要Name Server的列表地址,同时还可以定时从一个HTTP地址中来获取最新的Name Server的列表地址,然后从其中的一台Name Server来获取全部的路由信息,从中发现新的broker。
对于kafka:Consumer将消费的offset定时存储到ZooKeeper上,利用ZooKeeper保障了offset的高可用问题。
对于RocketMQ:Consumer将消费的offset定时存储到broker所在的机器上,这个broker优先是master,如果master挂了的话,则会选择slave来存储,broker也是将这些offset定时刷新到本地磁盘上,同时slave会定时的访问master来获取这些offset。
对于负载均衡,在出现分区或者队列增加或者减少的时候、Consumer增加或者减少的时候都会进行reblance操作。
对于RocketMQ:客户端自己会定时对所有的topic的进行reblance操作,对于每个topic,会从broker获取所有Consumer列表,从broker获取队列列表,按照负载均衡策略,计算各自负责哪些队列。这种就要求进行负载均衡的时候,各个Consumer获取的数据是一致的,不然不同的Consumer的reblance结果就不同。
对于kafka:kafka之前也是客户端自己进行reblance,依靠ZooKeeper的监听,来监听上述2种情况的出现,一旦出现则进行reblance。现在的版本则将这个reblance操作转移到了broker端来做,不但解决了RocketMQ上述的问题,同时减轻了客户端的操作,是的客户端更加轻量级,减少了和其他语言集成的工作量
Name Server和ZooKeeper的作用大致是相同的,从宏观上来看,Name Server做的东西很少,就是保存一些运行数据,Name Server之间不互连,这就需要broker端连接所有的Name Server,运行数据的改动要发送到每一个Name Server来保证运行数据的一致性(这个一致性确实有点弱),这样就变成了Name Server很轻量级,但是broker端就要做更多的东西了。
而ZooKeeper呢,broker只需要连接其中的一台机器,运行数据分发、一致性都交给了ZooKeeper来完成。
每一个Topic默认会创建多个队列,消息发送默认是会采用轮询的方式发送到不通的MQ,而消费端消费的时候,是会分配到多个queue的,多个queue是同时拉取提交消费,多个消费者同时消费多个MQ。如果要保证顺序消息只要满足下面两点:
1.producer端保证发送消息有序,且发送到同一个队列。
2.consumer端保证消费同一个队列。
消息发送到同一个队列,和消费者消费同一个队列的消息这两个条件比较好满足。保证producer端发送消息的有序,这点需要我们应用自己控制,消息发送有重试机制,在网络不稳定的情况下,第一个消息发送失败,在没有收到服务器发送成功的消息之前,是不能发送第二个消息的,不然会导致消息无序。
producer
- package com.yunsheng.orderExample;
-
- import com.alibaba.rocketmq.client.producer.DefaultMQProducer;
- import com.alibaba.rocketmq.client.producer.MQProducer;
- import com.alibaba.rocketmq.client.producer.MessageQueueSelector;
- import com.alibaba.rocketmq.client.producer.SendResult;
- import com.alibaba.rocketmq.common.message.Message;
- import com.alibaba.rocketmq.common.message.MessageQueue;
-
- import java.util.List;
-
-
- public class OrderedProducer {
- public static void main(String[] args) throws Exception {
- //Instantiate with a producer group name.
- DefaultMQProducer producer = new DefaultMQProducer("example_group_name");
- producer.setNamesrvAddr("10.135.17.26:9876;10.135.17.27:9876");
- //Launch the instance.
- producer.start();
- for (int i = 0; i < 10; i++) {
- int orderId = 0;
- //Create a message instance, specifying topic, tag and message body.
- Message msg = new Message("TopicOrder","TagA", "KEY" + i,
- ("Hello RocketMQ " + i).getBytes());
- SendResult sendResult = producer.send(msg, new MessageQueueSelector() {
- public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
- Integer id = (Integer) arg;
- int index = id % mqs.size();
- return mqs.get(index);
- }
- }, orderId);
-
- System.out.printf("%s%n", sendResult);
- }
- //server shutdown
- producer.shutdown();
- }
- }
解析:
要保证消息的顺序性,在发送消息时,这一组消息必须发送到同一个queue中。(一个broker默认4个queue)。
在上面的代码中,orderId表示一个订单号。
在send方法中实现了一个选择器。这个选择器的作用就是根据orderId对queue的数量取模,保证同一个orderId的所有消息落到同一个queue上。
consumer
- package com.yunsheng.orderExample;
-
- import com.alibaba.rocketmq.client.consumer.DefaultMQPushConsumer;
- import com.alibaba.rocketmq.client.consumer.listener.ConsumeOrderlyContext;
- import com.alibaba.rocketmq.client.consumer.listener.ConsumeOrderlyStatus;
- import com.alibaba.rocketmq.client.consumer.listener.MessageListenerOrderly;
- import com.alibaba.rocketmq.common.consumer.ConsumeFromWhere;
- import com.alibaba.rocketmq.common.message.MessageExt;
- import com.yunsheng.Factory;
-
- import java.util.List;
- import java.util.Random;
- import java.util.concurrent.atomic.AtomicLong;
-
- public class OrderedConsumer {
- public static void main(String[] args) throws Exception {
- DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("example_group_name");
- consumer.setNamesrvAddr("10.135.17.26:9876;10.135.17.27:9876");
-
- consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
-
- consumer.subscribe("TopicOrder", "TagA");
-
- consumer.registerMessageListener(new MessageListenerOrderly() {
-
- Random random = new Random(10);
- public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs,
- ConsumeOrderlyContext context) {
- context.setAutoCommit(true);
-
- for (MessageExt msg : msgs) {
- System.out.printf(Thread.currentThread().getName() + " Receive New Messages: " + new String(msg.getBody()) + "%n");
-
- }
-
- try {
- Thread.sleep(random.nextInt());
- } catch (InterruptedException e) {
- e.printStackTrace();
- }
-
- return ConsumeOrderlyStatus.SUCCESS;
-
- }
- });
-
- consumer.start();
-
- System.out.printf("Consumer Started.%n");
- }
- }
解析:
上面保证了生产端的消息顺序性,那么消费端必须保证消息被顺序的消费。使用MessageListenerOrderly。作用是,必须等前面的消息消费完,后面的消息才能进行消费。
在代码里加了sleep验证。
- ConsumeMessageThread_1 Receive New Messages: Hello RocketMQ 1
- ConsumeMessageThread_2 Receive New Messages: Hello RocketMQ 2
- ConsumeMessageThread_4 Receive New Messages: Hello RocketMQ 3
- ConsumeMessageThread_6 Receive New Messages: Hello RocketMQ 4
- ConsumeMessageThread_5 Receive New Messages: Hello RocketMQ 5
- ConsumeMessageThread_7 Receive New Messages: Hello RocketMQ 6
- ConsumeMessageThread_8 Receive New Messages: Hello RocketMQ 7
- ConsumeMessageThread_9 Receive New Messages: Hello RocketMQ 8
- ConsumeMessageThread_10 Receive New Messages: Hello RocketMQ 9
可以看到并不是单线程处理的,但是保证了顺序消费。
RocketMQ基于二阶段提交方式来实现事务消息,可以保证消息发送和本地事件逻辑同时成功和失败。我们看下面的步骤:
producer收到回查请求后,根据本地事件执行结果重新发送二次确认消息。
如果希望消息被所有的订阅者消费,可以使用广播机制。
- package com.yunsheng.broadcast;
-
- import com.alibaba.rocketmq.client.producer.DefaultMQProducer;
- import com.alibaba.rocketmq.client.producer.SendResult;
- import com.alibaba.rocketmq.common.message.Message;
-
- public class BroadcastProducer {
- public static void main(String[] args) throws Exception {
- DefaultMQProducer producer = new DefaultMQProducer("ProducerGroupName");
- producer.setNamesrvAddr("10.135.17.26:9876;10.135.17.27:9876");
- producer.start();
-
- for (int i = 0; i < 10; i++) {
- Message msg = new Message("broadcastTopic",
- "TagA",
- "OrderID188",
- "Hello world".getBytes());
- SendResult sendResult = producer.send(msg);
- System.out.printf("%s%n", sendResult);
- }
- producer.shutdown();
- }
- }
producer就是最简单的发送代码。
- package com.yunsheng.broadcast;
-
- import com.alibaba.rocketmq.client.consumer.DefaultMQPushConsumer;
- import com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
- import com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
- import com.alibaba.rocketmq.client.consumer.listener.MessageListenerConcurrently;
- import com.alibaba.rocketmq.common.consumer.ConsumeFromWhere;
- import com.alibaba.rocketmq.common.message.MessageExt;
- import com.alibaba.rocketmq.common.protocol.heartbeat.MessageModel;
-
- import java.util.List;
-
- public class BroadcastConsumer1 {
- public static void main(String[] args) throws Exception {
- DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("example_group_name");
- consumer.setNamesrvAddr("10.135.17.26:9876;10.135.17.27:9876");
- consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
-
- //set to broadcast mode
- consumer.setMessageModel(MessageModel.BROADCASTING);
-
- consumer.subscribe("broadcastTopic", "TagA || TagC || TagD");
-
- consumer.registerMessageListener(new MessageListenerConcurrently() {
-
- public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
- ConsumeConcurrentlyContext context) {
- System.out.printf(Thread.currentThread().getName() + " Receive New Messages: " + msgs + "%n");
- return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
- }
- });
-
- consumer.start();
- System.out.printf("Broadcast Consumer Started.%n");
- }
- }
再复制一份作BroadcastConsumer2
测试
启动BroadcastConsumer1,BroadcastConsumer2,BroadcastProducer。
可以看到两个consumer都消费了10个消息。
将consumer的consumer.setMessageModel(MessageModel.BROADCASTING);注释掉。
再测试一次,可以发现两个consumer分别消费了部分消息。
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。