当前位置:   article > 正文

RocketMQ原理_rocketmqmessagelistener

rocketmqmessagelistener

目录

RocketMQ概览

简介

支持的特性

架构图

对比其他mq

消息存储

producer端发现

消费offset的存储

consumer负载均衡

Name Server和zk

RocketMQ顺序消息

如何保证顺序消息

RocketMQ实现事务消息

RocketMQ广播消息

生产者

消费者


 

 

 

 

RocketMQ概览

概览

上图是一个典型的消息中间件收发消息的模型,RocketMQ也是这样的设计,简单说来,RocketMQ具有以下特点:

  • 是一个队列模型的消息中间件,具有高性能、高可靠、高实时、分布式特点。
  • Producer、Consumer、队列都可以分布式。
  • Producer向一些队列轮流发送消息,队列集合称为Topic,Consumer如果做广播消费,则一个consumer实例消费这个Topic对应的所有队列,如果做集群消费,则多个Consumer实例平均消费这个topic对应的队列集合。
  • 能够保证严格的消息顺序
  • 提供丰富的消息拉取模式
  • 高效的订阅者水平扩展能力
  • 实时的消息订阅机制
  • 亿级消息堆积能力
  • 较少的依赖

rocketmq的物理部署结构:

  • Name Server是一个几乎无状态节点,可集群部署,节点之间无任何信息同步。
  • Broker部署相对复杂,Broker分为Master与Slave,一个Master可以对应多个Slave,但是一个Slave只能对应一个Master,Master与Slave的对应关系通过指定相同的BrokerName,不同的BrokerId来定义,BrokerId为0表示Master,非0表示Slave。Master也可以部署多个。每个Broker与Name Server集群中的所有节点建立长连接,定时注册Topic信息到所有Name Server。
  • Producer与Name Server集群中的其中一个节点(随机选择)建立长连接,定期从Name Server取Topic路由信息,并向提供Topic服务的Master建立长连接,且定时向Master发送心跳。Producer完全无状态,可集群部署。

支持的特性

  • 发布/订阅。
  • 优先级,支持队列优先级,数字表达。
  • 消息顺序的严格保证。
  • 消息过滤,Broker端过滤:支持类型,tag,语法表达式过滤;Consumer端过滤:自定义实现即可。
  • 持久化,充分利用linux系统内存cache提升性能。
  • 消息可靠性,支持异步,同步双写。
  • 低延迟,在消息不堆积情况下,消息到达Broker后,能立刻到达Consumer。RocketMQ使用长轮询Pull方式 可保证消息非常实时,消息实时性不低于Push。
  • 每个消息必须消费并ack一次。
  • 队列持久化,定期删除某段时间之前的数据。
  • 回溯消费,支持往前,往后,按照时间,可达毫秒级别。
  • 消息堆积,
  • 分布式事务,根据offset更改msg状态。
  • 定时消息,支持级别,5s,5s,1m。
  • 消息重试,

架构图

  • producer集群:拥有相同的producerGroup,一般来讲,Producer不必要有集群的概念,这里的集群仅仅在RocketMQ的分布式事务中有用到
  • Name Server集群:提供topic的路由信息,路由信息数据存储在内存中,broker会定时的发送路由信息到nameserver中的每一个机器,来进行更新,所以name server集群可以简单理解为无状态(实际情况下可能存在每个nameserver机器上的数据有短暂的不一致现象,但是通过定时更新,大部分情况下都是一致的)
  • broker集群:一个集群有一个统一的名字,即brokerClusterName,默认是DefaultCluster。一个集群下有多个master,每个master下有多个slave。master和slave算是一组,拥有相同的brokerName,不同的brokerId,master的brokerId是0,而slave则是大于0的值。master和slave之间可以进行同步复制或者是异步复制。
  • consumer集群:拥有相同的consumerGroup。

通信关系:

对比其他mq

消息存储

为提高消息读写并发能力,将一个topic消息进行拆分,kafka称为分区,rocketmq称为队列。

  • 对于kafka:为了防止一个分区的消息文件过大,会拆分成一个个固定大小的文件,所以一个分区就对应了一个目录。分区与分区之间是相互隔离的。
  • 对于RocketMQ:RocketMQ采用了单一的日志文件,即把同1台机器上面所有topic的所有queue的消息,存放在一个文件里面,从而避免了随机的磁盘写入。所有消息都存在一个单一的CommitLog文件里面,然后有后台线程异步的同步到ConsumeQueue,再由Consumer进行消费。所有topic的数据混在一起进行存储,默认超过1G的话,则重新创建一个新的文件。消息的写入过程即写入该混杂的文件中,然后又有一个线程服务,在不断的读取分析该混杂文件,将消息进行分拣,然后存储在对应队列目录中(存储的是简要信息,如消息在混杂文件中的offset,消息大小等)
  • 所以RocketMQ需要2次寻找,第一次先找队列中的消息概要信息,拿到概要信息中的offset,根据这个offset再到混杂文件中找到想要的消息。而kafka则只需要直接读取分区中的文件即可得到想要的消息。

  • RocketMq并不会立即删除消息,所以消息是可以被重复消费的。 RocketMq的消息时定期清除,默认3天。

producer端发现

Producer端如何来发现新的broker地址。

  • 对于kafka来说:Producer端需要配置broker的列表地址,Producer也从一个broker中来更新broker列表地址(从中发现新加入的broker)。

  • 对于RocketMQ来说:Producer端需要Name Server的列表地址,同时还可以定时从一个HTTP地址中来获取最新的Name Server的列表地址,然后从其中的一台Name Server来获取全部的路由信息,从中发现新的broker。

消费offset的存储

  • 对于kafka:Consumer将消费的offset定时存储到ZooKeeper上,利用ZooKeeper保障了offset的高可用问题。

  • 对于RocketMQ:Consumer将消费的offset定时存储到broker所在的机器上,这个broker优先是master,如果master挂了的话,则会选择slave来存储,broker也是将这些offset定时刷新到本地磁盘上,同时slave会定时的访问master来获取这些offset。


consumer负载均衡

对于负载均衡,在出现分区或者队列增加或者减少的时候、Consumer增加或者减少的时候都会进行reblance操作。

  • 对于RocketMQ:客户端自己会定时对所有的topic的进行reblance操作,对于每个topic,会从broker获取所有Consumer列表,从broker获取队列列表,按照负载均衡策略,计算各自负责哪些队列。这种就要求进行负载均衡的时候,各个Consumer获取的数据是一致的,不然不同的Consumer的reblance结果就不同。

  • 对于kafka:kafka之前也是客户端自己进行reblance,依靠ZooKeeper的监听,来监听上述2种情况的出现,一旦出现则进行reblance。现在的版本则将这个reblance操作转移到了broker端来做,不但解决了RocketMQ上述的问题,同时减轻了客户端的操作,是的客户端更加轻量级,减少了和其他语言集成的工作量

Name Server和zk

Name Server和ZooKeeper的作用大致是相同的,从宏观上来看,Name Server做的东西很少,就是保存一些运行数据,Name Server之间不互连,这就需要broker端连接所有的Name Server,运行数据的改动要发送到每一个Name Server来保证运行数据的一致性(这个一致性确实有点弱),这样就变成了Name Server很轻量级,但是broker端就要做更多的东西了。

而ZooKeeper呢,broker只需要连接其中的一台机器,运行数据分发、一致性都交给了ZooKeeper来完成。

 

RocketMQ顺序消息

 

如何保证顺序消息

每一个Topic默认会创建多个队列,消息发送默认是会采用轮询的方式发送到不通的MQ,而消费端消费的时候,是会分配到多个queue的,多个queue是同时拉取提交消费,多个消费者同时消费多个MQ。如果要保证顺序消息只要满足下面两点:

1.producer端保证发送消息有序,且发送到同一个队列。
2.consumer端保证消费同一个队列。

消息发送到同一个队列,和消费者消费同一个队列的消息这两个条件比较好满足。保证producer端发送消息的有序,这点需要我们应用自己控制,消息发送有重试机制,在网络不稳定的情况下,第一个消息发送失败,在没有收到服务器发送成功的消息之前,是不能发送第二个消息的,不然会导致消息无序。

 

producer

  1. package com.yunsheng.orderExample;
  2. import com.alibaba.rocketmq.client.producer.DefaultMQProducer;
  3. import com.alibaba.rocketmq.client.producer.MQProducer;
  4. import com.alibaba.rocketmq.client.producer.MessageQueueSelector;
  5. import com.alibaba.rocketmq.client.producer.SendResult;
  6. import com.alibaba.rocketmq.common.message.Message;
  7. import com.alibaba.rocketmq.common.message.MessageQueue;
  8. import java.util.List;
  9. public class OrderedProducer {
  10. public static void main(String[] args) throws Exception {
  11. //Instantiate with a producer group name.
  12. DefaultMQProducer producer = new DefaultMQProducer("example_group_name");
  13. producer.setNamesrvAddr("10.135.17.26:9876;10.135.17.27:9876");
  14. //Launch the instance.
  15. producer.start();
  16. for (int i = 0; i < 10; i++) {
  17. int orderId = 0;
  18. //Create a message instance, specifying topic, tag and message body.
  19. Message msg = new Message("TopicOrder","TagA", "KEY" + i,
  20. ("Hello RocketMQ " + i).getBytes());
  21. SendResult sendResult = producer.send(msg, new MessageQueueSelector() {
  22. public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
  23. Integer id = (Integer) arg;
  24. int index = id % mqs.size();
  25. return mqs.get(index);
  26. }
  27. }, orderId);
  28. System.out.printf("%s%n", sendResult);
  29. }
  30. //server shutdown
  31. producer.shutdown();
  32. }
  33. }

解析:
要保证消息的顺序性,在发送消息时,这一组消息必须发送到同一个queue中。(一个broker默认4个queue)。
在上面的代码中,orderId表示一个订单号。
在send方法中实现了一个选择器。这个选择器的作用就是根据orderId对queue的数量取模,保证同一个orderId的所有消息落到同一个queue上。


consumer

  1. package com.yunsheng.orderExample;
  2. import com.alibaba.rocketmq.client.consumer.DefaultMQPushConsumer;
  3. import com.alibaba.rocketmq.client.consumer.listener.ConsumeOrderlyContext;
  4. import com.alibaba.rocketmq.client.consumer.listener.ConsumeOrderlyStatus;
  5. import com.alibaba.rocketmq.client.consumer.listener.MessageListenerOrderly;
  6. import com.alibaba.rocketmq.common.consumer.ConsumeFromWhere;
  7. import com.alibaba.rocketmq.common.message.MessageExt;
  8. import com.yunsheng.Factory;
  9. import java.util.List;
  10. import java.util.Random;
  11. import java.util.concurrent.atomic.AtomicLong;
  12. public class OrderedConsumer {
  13. public static void main(String[] args) throws Exception {
  14. DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("example_group_name");
  15. consumer.setNamesrvAddr("10.135.17.26:9876;10.135.17.27:9876");
  16. consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
  17. consumer.subscribe("TopicOrder", "TagA");
  18. consumer.registerMessageListener(new MessageListenerOrderly() {
  19. Random random = new Random(10);
  20. public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs,
  21. ConsumeOrderlyContext context) {
  22. context.setAutoCommit(true);
  23. for (MessageExt msg : msgs) {
  24. System.out.printf(Thread.currentThread().getName() + " Receive New Messages: " + new String(msg.getBody()) + "%n");
  25. }
  26. try {
  27. Thread.sleep(random.nextInt());
  28. } catch (InterruptedException e) {
  29. e.printStackTrace();
  30. }
  31. return ConsumeOrderlyStatus.SUCCESS;
  32. }
  33. });
  34. consumer.start();
  35. System.out.printf("Consumer Started.%n");
  36. }
  37. }

解析:
上面保证了生产端的消息顺序性,那么消费端必须保证消息被顺序的消费。使用MessageListenerOrderly。作用是,必须等前面的消息消费完,后面的消息才能进行消费。
在代码里加了sleep验证。

  1. ConsumeMessageThread_1 Receive New Messages: Hello RocketMQ 1
  2. ConsumeMessageThread_2 Receive New Messages: Hello RocketMQ 2
  3. ConsumeMessageThread_4 Receive New Messages: Hello RocketMQ 3
  4. ConsumeMessageThread_6 Receive New Messages: Hello RocketMQ 4
  5. ConsumeMessageThread_5 Receive New Messages: Hello RocketMQ 5
  6. ConsumeMessageThread_7 Receive New Messages: Hello RocketMQ 6
  7. ConsumeMessageThread_8 Receive New Messages: Hello RocketMQ 7
  8. ConsumeMessageThread_9 Receive New Messages: Hello RocketMQ 8
  9. ConsumeMessageThread_10 Receive New Messages: Hello RocketMQ 9

可以看到并不是单线程处理的,但是保证了顺序消费。

 

RocketMQ实现事务消息

RocketMQ基于二阶段提交方式来实现事务消息,可以保证消息发送和本地事件逻辑同时成功和失败。我们看下面的步骤:

  1. producer向RocketMQ发送待确认消息(consumer不可见)。
  2. RocketMQ收到待确认消息并持久化成功后,向发送方回复消息已经发送成功。
  3. producer开始执行本地事件逻辑。
  4. producer根据本地事件执行结果向RocketMQ发送二次确认(Commit或Rollback)消息,RocketMQ收到Commit状态则标记消息为可投递(consumer可见),收到Rollback则删除消息。
  5. 如果出现异常情况(比如网络波动)导致提交的二次确认没有到达RocketMQ,RocketMQ在经过固定的时间后对待确认消息发起回查请求。

producer收到回查请求后,根据本地事件执行结果重新发送二次确认消息。

 

RocketMQ广播消息

如果希望消息被所有的订阅者消费,可以使用广播机制。

生产者

  1. package com.yunsheng.broadcast;
  2. import com.alibaba.rocketmq.client.producer.DefaultMQProducer;
  3. import com.alibaba.rocketmq.client.producer.SendResult;
  4. import com.alibaba.rocketmq.common.message.Message;
  5. public class BroadcastProducer {
  6. public static void main(String[] args) throws Exception {
  7. DefaultMQProducer producer = new DefaultMQProducer("ProducerGroupName");
  8. producer.setNamesrvAddr("10.135.17.26:9876;10.135.17.27:9876");
  9. producer.start();
  10. for (int i = 0; i < 10; i++) {
  11. Message msg = new Message("broadcastTopic",
  12. "TagA",
  13. "OrderID188",
  14. "Hello world".getBytes());
  15. SendResult sendResult = producer.send(msg);
  16. System.out.printf("%s%n", sendResult);
  17. }
  18. producer.shutdown();
  19. }
  20. }

producer就是最简单的发送代码。

消费者

  1. package com.yunsheng.broadcast;
  2. import com.alibaba.rocketmq.client.consumer.DefaultMQPushConsumer;
  3. import com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
  4. import com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
  5. import com.alibaba.rocketmq.client.consumer.listener.MessageListenerConcurrently;
  6. import com.alibaba.rocketmq.common.consumer.ConsumeFromWhere;
  7. import com.alibaba.rocketmq.common.message.MessageExt;
  8. import com.alibaba.rocketmq.common.protocol.heartbeat.MessageModel;
  9. import java.util.List;
  10. public class BroadcastConsumer1 {
  11. public static void main(String[] args) throws Exception {
  12. DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("example_group_name");
  13. consumer.setNamesrvAddr("10.135.17.26:9876;10.135.17.27:9876");
  14. consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
  15. //set to broadcast mode
  16. consumer.setMessageModel(MessageModel.BROADCASTING);
  17. consumer.subscribe("broadcastTopic", "TagA || TagC || TagD");
  18. consumer.registerMessageListener(new MessageListenerConcurrently() {
  19. public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
  20. ConsumeConcurrentlyContext context) {
  21. System.out.printf(Thread.currentThread().getName() + " Receive New Messages: " + msgs + "%n");
  22. return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
  23. }
  24. });
  25. consumer.start();
  26. System.out.printf("Broadcast Consumer Started.%n");
  27. }
  28. }

再复制一份作BroadcastConsumer2

测试

启动BroadcastConsumer1,BroadcastConsumer2,BroadcastProducer。
可以看到两个consumer都消费了10个消息。

将consumer的consumer.setMessageModel(MessageModel.BROADCASTING);注释掉。
再测试一次,可以发现两个consumer分别消费了部分消息。

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/凡人多烦事01/article/detail/603650
推荐阅读
相关标签
  

闽ICP备14008679号