当前位置:   article > 正文

RocketMQ源码学习

rocketmq源码

文档基于RocketMQ版本:4.2.0

目录

  1. 启动NameServer
  2. 启动Broker
  3. 发送消息
  4. 接收消息

1. 启动NameServer NamesrvStartup

1.启动nameserver
  1. 组装log配置,使用logback日志工具
  2. 创建netty配置(默认端口号:9876,解析配置中会重写该配置),NameServer配置namesrvConfig
  3. 创建NamesrvController
  1. 绑定namesrvConfig、nettyServerConfig
  2. 创建KVConfigManager、RouteInfoManager、BrokerHousekeepingService、Configuration(绑定nameserver、nettyserver配置)
  3. 设置存储路径从config配置中获取(即从namesrv配置中获取configStorePath属性的值)
  1. 将NameServer配置注册至Controller
  2. 初始化NamesrvController
  3. 启动NamesrvController
2.初始化控制器
  1. kv配置管理器加载kvConfigManager.load,加载namesrv的kvConfig.json,将json反序列化为KVConfigSerializeWrapper对象
  2. 创建服务端NettyRemotingServer
  1. 创建ServerBootstrap,绑定nettyServerConfig配置,绑定通道事件监听器channelEventListener(brokerHousekeepingService)
  2. 从nettyServer配置中获取回调线程数配置:publicThreadNums,如果小于等于0则降级为4
  3. 创建发布线程池publicExecutor
    controller启启动broker Start Broker
    ,线程数为publicThreadNums
  4. 创建NioEventLoopGroup:eventLoopGroupBoss,线程数:1
  5. 如果是linux平台并且nettyServer配置配置使用epoll并且系统开启了epoll配置,则使用epoll选择器EpollEventLoopGroup作为eventLoopGroupSelector,否则使用NioEventLoopGroup
  6. 判断是否使用SSL证书
  1. 创建remotingExecutor线程池,线程数默认为8:nettyServerConfig.getServerWorkerThreads()
  2. 注册DefaultRequestProcessor至server,使用remotingExecutor线程池
  3. 启动每10秒执行一次的定时任务扫描非活跃的broker,线程名称为:NSScheduledThread规则,线程池为单线程类型,工作线程为NamesrvController.this.routeInfoManager.scanNotActiveBroker()
  1. 如果超过两分钟没有更新心跳时间,则说明已超时,从活跃列表中移除,并销毁channel
  1. 启动每10分钟执行一次的定时任务打印所有KVConfigManager.configTable中的配置,printAllPeriodically
3. 启动控制器
  1. 启动nettyServer:remotingServer
  2. 创建默认的事件处理线程池DefaultEventExecutorGroup,默认线程数8:nettyServerConfig.getServerWorkerThreads()
  3. 为服务绑定线程池defaultEventExecutorGroup,序列化:NettyEncoder,反序列化:NettyDecoder,handle:IdleStateHandler、NettyConnectManageHandler、NettyServerHandler
  4. 绑定端口启动namesrv服务
  5. 如果channelEventListener监听器不为空则启动netty事件执行器:nettyEventExecutor
  6. 启动NettyEventExecutor,死循环读取eventQueue事件队列,处理事件,事件类型:IDLE、CLOSE、CONNECT、EXCEPTION,委派监听器处理事件,即BrokerHousekeepingService,所以上一步判断channelEventListener监听器不能为空
  7. 使用Time(ServerHouseKeepingService)创建每秒执行一次的定时任务,扫描responseTable,工作线程:NettyRemotingServer.this.scanResponseTable()
  8. 扫描responseTable,如果响应已经超时,则移除并释放response,并调用回调函数,使用回调线程池:publicExecutor
  9. controller启动完成

2. 启动Broker BrokerStartup

1.创建BrokerController
  1. 创建broker控制器,createBrokerController
  2. 如果socket send buffer设置为空则使用131072,如果socket receive buffer设置为空则使用131072
  3. 创建broker配置BrokerConfig,NettyServerConfig,NettyClientConfig
  4. nettyServerConfig设置默认监听端口10911,解析配置中会重写该配置
  5. 创建消息存储配置messageStoreConfig
  6. 设置消息存储监听端口为nettyServerConfig监听端口+1即:10912
  7. 配置日志工具,使用logback
  8. 创建BrokerController
  1. 绑定broker配置brokerConfig
  2. 绑定nettyServerConfig配置
  3. 绑定nettyClientConfig配置
  4. 绑定messageStoreConfig配置
  5. 创建消费者下标管理器ConsumerOffsetManager
  6. 创建主题配置管理器TopicConfigManager
  7. 创建拉取消息处理器PullMessageProcessor
  8. 创建拉取消息hold服务PullRequestHoldService
  9. 创建消息到达监听NotifyMessageArrivingListener
  10. 创建消费者ids变更监听DefaultConsumerIdsChangeListener
  11. 创建消费者管理器ConsumerManager
  12. 创建消费者过滤管理器ConsumerFilterManager
  13. 创建生产者管理器ProducerManager
  14. 创建客户端客房服务ClientHousekeepingService
  15. 创建Broker2Client
  16. 创建订阅组管理器SubscriptionGroupManager
  17. 创建BrokerOuterAPI
  18. 创建过滤server管理器FilterServerManager
  19. 创建slave同步SlaveSynchronize
  20. 创建发送线程池队列,默认10000
  21. 创建拉取线程池队列,默认10W
  22. 创建查询线程池队列,默认20000
  23. 创建客户端管理器线程池队列,默认100W
  24. 创建消费者管理器线程池队列,默认100W
  25. 创建broker状态管理器BrokerStatsManager
  26. 创建broker快速失败机制BrokerFastFailure
2.初始化BrokerController
  1. 主题配置管理器加载,反序列化主题配置为TopicConfigSerializeWrapper
  2. 如果成功则继续消费者下标管理器加载,反序列化消费者下标管理器配置为ConsumerOffsetManager
  3. 如果成功则继续订阅组管理器加载,反序列化订阅组管理器配置为SubscriptionGroupManager
  4. 如果成功则继续消费者过滤管理器加载,反序列化消费者过滤管理器配置为ConsumerFilterManager
  5. 如果成功则继续创建消息存储对象DefaultMessageStore、broker状态BrokerStats、消息存储插件上下文MessageStorePluginContext
  6. 根据消息存储插件上下文(使用上下文中的插件配置),默认消息存储作为入参构造消息存储插件
  7. 向消息存储的dispatch列表中添加CommitLogDispatcherCalcBitMap至列表的头部
  8. 如果成功则继续消息存储加载
  1. 如果scheduleMessageService不为空则加载,即加载ScheduleMessageService,反序列化调度消息服务配置至DelayOffsetSerializeWrapper
  2. 解析消息存储延迟级别配置
  3. 如果成功则继续加载commit日志CommitLog,即加载MappedFileQueue
  1. 如果成功则继续消费者队列加载
  2. 如果成功则继续创建存储检查点StoreCheckpoint,索引服务加载,recover恢复
  3. 如果成功则继续
  4. 创建NettyServer服务NettyRemotingServer
  5. 创建fastRemotingServer服务NettyRemotingServer,端口号为NettyServer端口-2
  6. 线程池均为:BrokerFixedThreadPoolExecutor实现
  7. 创建发送消息线程池,默认线程数1
  8. 创建拉取消息线程池,默认线程数:16+cpu核心数*2
  9. 创建查询消息线程池,默认线程数:8+cpu核心数
  10. 创建adminBroker线程池,默认线程数16
  11. 创建客户端管理器线程池,默认线程数32
  12. 创建消费者管理器线程池,默认线程数32
  13. 注册处理器registerProcessor:SendMessageProcessor、PullMessageProcessor、QueryMessageProcessor、ClientManageProcessor、ConsumerManageProcessor、EndTransactionProcessor、AdminBrokerProcessor(默认处理器DefaultProcessor)
  14. 启动一天执行一次的定时任务(延迟至凌晨0点执行,即每天0点执行),打印昨天一天put,get消息数量
  15. 启动默认5秒(org.apache.rocketmq.common.BrokerConfig#getFlushConsumerOffsetInterval)执行一次的定时任务,持久化消费者下标:org.apache.rocketmq.broker.BrokerController#consumerOffsetManager.persist()
  16. 启动3分钟执行一次的定时任务,保护broker,如果启用当消费者读取很慢时则禁用消费者功能,遍历brokerStats,如果消费者读取消息延迟超过阈值(默认为16Mb),则订阅组管理器禁用改订阅组
  17. 启动每秒执行一次的定时任务打印读取队列一个元素cost时间及水位线:发送线程池队列,拉取线程池队列,查询线程池队列
  18. 启动每分钟执行一次的定时任务打印已经存储至commitLog但还没有调度至消费者队列中的消息字节数
  19. 启动每两分钟执行一次的定时任务更新nameServer地址
  20. 如果当前broker角色是slave,如果存在ha配置,更新ha配置至messageStore,启动每分钟执行一次的定时任务同步master数据至本地syncAll,BrokerController.this.slaveSynchronize.syncAll()
  1. 同步主题配置syncTopicConfig:RequestCode.GET_ALL_TOPIC_CONFIG
  2. 同步消费者下标
  3. 同步延迟下标
  4. 同步订阅组配置
  1. 如果当前broker不是slave角色(同步或者异步master角色),启动每分钟执行一次的定时任务打印master与slave的diff信息,master与slave的消息相差字节数
3.启动BrokerController
1. 启动messageStore
  1. 获取文件锁mqhome/store/lock
  2. 启动刷新消费者队列服务flushConsumeQueueService,一分钟刷新一次消费者队列至文件
  3. 启动commitLog
  1. 刷新提交日志服务启动:flushCommitLogService:
  2. 如果是SYNC_FLUSH使用GroupCommitService,
  3. 否则使用FlushRealTimeService,
  4. 同步刷新:刷新commit日志至文件,如果存在读请求,判断读请求的下标是否已经被刷新至文件,没有的话继续刷新至文件,刷新文件的下标大于请求的下标时,唤醒消费者继续消费;
  5. 异步刷新:每10秒刷新一次,如果是调度类型,则按照调度周期刷新,否则实时刷新至文件。如果刷新配置实时存储池启用(transientStorePoolEnable=true),并且是异步刷新类型,启动commitLogService,提交日志时唤醒刷新服务。
  6. 异步存储消息:org.apache.rocketmq.store.DefaultMessageStore#asyncPutMessage异步提交消息;org.apache.rocketmq.store.CommitLog#asyncPutMessage异步提交消息
  7. 如果是延迟消息(org.apache.rocketmq.common.message.Message#getDelayTimeLevel>0),且消息类型非事务类型:TRANSACTION_NOT_TYPE,或者是事务提交消息类型:TRANSACTION_COMMIT_TYPE,将消息写入延迟消息主题:RMQ_SYS_SCHEDULE_TOPIC,队列queueId为延迟级别delayLevel-1。delayLevel(org.apache.rocketmq.store.config.MessageStoreConfig#messageDelayLevel)对应延迟时间:“1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h”,1对应1s,2对应5s,3对应10s以此类推
  1. 启动storeStats服务,报告存储状态状态
  2. 如果存在调度消息服务ScheduleMessageService并且当前角色不是slave,启动ScheduleMessageService
  3. 设置reput下标,如果允许重复,使用确认的下标ConfirmOffset,否则使用MaxOffset
  4. 启动reputMessageService
  5. 启动haService
2. 启动remotingServer
  1. 启动netty服务
3. 启动fastRemotingServer
  1. 启动fastRemotingServer服务
4. 启动brokerOuterAPI
  1. 根据nettyClientConfig启动nettyClient
5. 启动pullRequestHoldService

处理拉取请求入口
a. netty服务端请求处理时:org.apache.rocketmq.remoting.netty.NettyRemotingAbstract#processRequestCommand,处理方法:org.apache.rocketmq.broker.processor.PullMessageProcessor#processRequest(io.netty.channel.ChannelHandlerContext, org.apache.rocketmq.remoting.protocol.RemotingCommand)
b. ldq处理器或PullRequestHoldService启动时:org.apache.rocketmq.broker.longpolling.PullRequestHoldService#checkHoldRequest,处理方法:PullMessageProcessor(org.apache.rocketmq.broker.processor.PullMessageProcessor#executeRequestWhenWakeup)

  1. 处理拉取请求
  2. 如果处理PullRequest成功,如果brokerAllowSuspend=true&&hasCommitOffsetFlag=true&&当前broker=master。则提交offset:org.apache.rocketmq.broker.offset.ConsumerOffsetManager#commitOffset(java.lang.String, java.lang.String, java.lang.String, int, long)。request请求如果携带了要提交的下标,则提交下标
  3. 处理请求的callback回调中响应客户端response对象
6. 启动clientHousekeepingService
  1. 处理发生异常非活跃状态的channel
7. 启动filterServerManager
8. 注册BrokerAll,registerBrokerAll
  1. BrokerOuterAPI将broker信息注册至所有NameServer,发送RequestCode.REGISTER_BROKER类型Request至NameServer
9. 启动每30秒执行一次的定时任务注册registerBrokerAll
10. 启动brokerStatsManager
11. 启动brokerFastFailure

3. 发送消息

以TransactionMQProducer生产者为例

3.1 创建生产者
  1. 创建生产者并完成初始化
  2. 根据配置设置生产者group组名称、nameserver地址、设置defaultTopicQueueNums
  3. 启动生产者producer.start()
  4. 初始化事务环境initTransactionEnv,创建check校验线程池checkExecutor
  5. 启动生产者默认实现
public void start() throws MQClientException {
    this.defaultMQProducerImpl.start();
}
  • 1
  • 2
  • 3
  1. 获取客户端mQClientFactory:MQClientInstance
  2. 注册生产者registerProducer至mQClientFactory的producerTable缓存
  3. 主题发布信息表中放入默认主题信息topicPublishInfoTable
  4. 如果指定启动工厂,则启动客户端工厂mQClientFactory
  1. 获取clientConfig中NameServer地址配置,如果为空,则尝试从系统配置中获取rocketmq.namesrv.domain配置,默认为:jmenv.tbsite.net,系统配置中获取rocketmq.namesrv.domain.subgroup配置,默认为:nsaddr,组装http请求url,请求获取NameServer地址,默认为:http://jmenv.tbsite.net:8080/rocketmq/nsaddr
  2. 启动mQClientAPIImpl,根据配置工作线程数创建nettyClient工作线程池,每秒扫描一次responseTable响应表,如果channel监听器不为空,则启动netty事件线程池NettyEventExecuter,如果eventQueue事件队列为空休眠3秒,负责一直处理事件,根据事件类型回调监听器。主要委派ClientRemotingProcessor根据类型处理请求。例如:CHECK_TRANSACTION_STATE检查本地事务状态
  3. 启动拉取消息服务pullMessageService,如果pullRequestQueue拉取请求队列为空则阻塞,否则一直处理
  4. 启动负载均衡服务RebalanceService
  5. 启动defaultMQProducer.getDefaultMQProducerImpl().start(false)
  1. 向所有broker发送心跳信息sendHeartbeatToAllBrokerWithLock
  2. 生产者启动完成
3.2 发送消息
  1. 发送消息send
  2. 调用父类DefaultMQProducer发送,委派DefaultMQProducerImpl实现发送消息
  3. 异步发送消息sendDefaultImpl(msg, CommunicationMode.ASYNC, sendCallback, timeout)
  4. 根据topic名称获取主题发布信息TopicPublishInfo:tryToFindTopicPublishInfo
  5. 检查本地topicPublishInfoTable缓存中是否存在主题发布信息并且状态为ok
  6. 不存在则创建新主题发布信息TopicPublishInfo并缓存至本地topicPublishInfoTable,并上报至name server:org.apache.rocketmq.client.impl.factory.MQClientInstance#updateTopicRouteInfoFromNameServer(java.lang.String)
  7. 如果TopicPublishInfo中不包含路由信息并且messageQueueList为空,则从nameserver获取最新的主题路由信息org.apache.rocketmq.client.impl.factory.MQClientInstance#updateTopicRouteInfoFromNameServer(java.lang.String, boolean, org.apache.rocketmq.client.producer.DefaultMQProducer)。更新生产者信息:org.apache.rocketmq.client.impl.factory.MQClientInstance#topicRouteData2TopicPublishInfo(基于writeQueueNums或者orderTopicConf如果配置了的话,创建队列),更新消费者信息:org.apache.rocketmq.client.impl.factory.MQClientInstance#topicRouteData2TopicSubscribeInfo(基于readQueueNums创建队列)。如果使用defaultMQProducer,则路由信息中readQueueNums writeQueueNums取读写队列数的最小值:Math.min(defaultMQProducer.getDefaultTopicQueueNums(), data.getReadQueueNums());
  8. 创建RequestCode.GET_ROUTEINTO_BY_TOPIC ,同步方式,3秒的超时,请求nameserver获取主题的路由信息
  9. 响应不存在主题则抛出异常,成功则返回主题路由信息
  10. NameServer端处理request请求:org.apache.rocketmq.namesrv.processor.DefaultRequestProcessor,创建TopicRouteData,从主题队列表topicQueueTable中获取主题对应的队列数据,如果不为空,遍历队列获取每个队列信息对应的brokerName,遍历brokerName,根据brokerName从brokerAddrTable中获取brokerData放入brokerDataList中,获取brokerData中的地址,根据地址获取filterServerTable中对应的过滤的server地址列表放入路由信息中,返回路由
  11. 如果主题路由信息不为空,更新topicRouteTable中的主题路由信息
  12. 如果主题路由信息为空打印警告信息
  13. 选择一个消息队列org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl#selectOneMessageQueue,根据tpInfo获取队列:org.apache.rocketmq.client.impl.producer.TopicPublishInfo#selectOneMessageQueue(),如果notBestBroker不为空,则取tpInfo中的writeQueueNums数据,sendWhichQueue递增并对其取余数
  14. 发送消息核心实现sendKernelImpl
  15. 根据brokerName获取broker地址表brokerAddrTable,findBrokerAddressInPublish,从地址表中按照masterId(常量0)获取地址
  16. 发送消息至broker:RequestCode.SEND_MESSAGE
  17. NettyRemotingClient异步或同步发送消息至broker

4. 接收消息

DefaultMQPushConsumer 推送模式的消费者
  1. 创建消费者指定消费组consumerGroup
  2. 设置从哪开始消费setConsumeFromWhere
public enum ConsumeFromWhere {
    CONSUME_FROM_LAST_OFFSET,

    @Deprecated
    CONSUME_FROM_LAST_OFFSET_AND_FROM_MIN_WHEN_BOOT_FIRST,
    @Deprecated
    CONSUME_FROM_MIN_OFFSET,
    @Deprecated
    CONSUME_FROM_MAX_OFFSET,
    CONSUME_FROM_FIRST_OFFSET,
    CONSUME_FROM_TIMESTAMP,
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  1. 订阅主题subscribe
  1. 构建订阅data并存储至RebalanceImpl.subscriptionInner
  2. 如果mQClientFactory不为空发送心跳信息至NameServer
  1. 注册消息的监听器registerMessageListener,将listener与DefaultMQPushConsumerImpl.messageListenerInner绑定
  2. 启动消费者consumer.start(),委派:this.defaultMQPushConsumerImpl.start();
  3. 校验配置checkConfig
  4. 复制订阅信息copySubscription,将defaultMQPushConsumer订阅信息复制到defaultMQPushConsumerImpl实例
  5. 如果消息模型是CLUSTERING类型,则将实例名称转为pid
  6. 获取或创建客户端工厂mQClientFactory
  7. rebalanceImpl设置消费主题、消息模型(广播或集群)、分配消息队列策略(默认为平均散列算法)、客户端工厂实例
  8. 创建拉取消息API:PullAPIWrapper(客户端工厂、消费组、是否unit模式)
  9. PullAPIWrapper注册过滤消息钩子列表registerFilterMessageHook
  10. 如果defaultMQPushConsumer存在offsetStore复制到Impl实现实例中的offsetStore
  11. 如果offsetStore为空,则判断消息模型,如果是广播BROADCASTING类型则创建为LocalFileOffsetStore;CLUSTERING类型创建RemoteBrokerOffsetStore
  12. 加载下标offsetStore.load,RemoteBrokerOffsetStore加载方法为空方法,什么都不做。LocalFileOffsetStore加载读取本地存储文件:…/offsets.json
  13. 设置消费消息服务consumeMessageService
  1. 如果消费者指定的监听器是MessageListenerOrderly类型,创建顺序消费服务ConsumeMessageOrderlyService
  2. 如果指定监听器是MessageListenerConcurrently类型,创建并发消费服务ConsumeMessageConcurrentlyService
  1. 启动消费服务,假定ConsumeMessageConcurrentlyService类型,默认为15分钟清楚一次超时消息cleanExpireMsg
  2. 注册消费者,mQClientFactory.registerConsumer,将实例缓存至客户端工厂的消费者表consumerTable.putIfAbsent(group, consumer)
  3. 启动mq客户端工厂mQClientFactory.start
  1. 启动mq客户端mQClientAPIImpl
  2. 启动拉取消息服务pullMessageService,服务等待PullRequest进入pullRequestQueue队列,选择消费者org.apache.rocketmq.client.impl.factory.MQClientInstance#selectConsumer,使用选消费拉取消息:org.apache.rocketmq.client.impl.consumer.DefaultMQPushConsumerImpl#pullMessage
  3. 启动负载均衡服务rebalanceService
  4. 启动默认生产者服务,不启动客户端工厂mQClientFactory
  1. 启动完成
  2. 从NameServer刷新主题路由信息this.updateTopicSubscribeInfoWhenSubscriptionChanged()
  3. 消费者表遍历,如果订阅是tag类型跳过,根据主题获取broker地址(轮询获取其中一个地址),发送校验客户端配置请求RequestCode.CHECK_CLIENT_CONFIG至broker,this.mQClientFactory.checkClientInBroker();
  4. 发送心跳至所有broker,this.mQClientFactory.sendHeartbeatToAllBrokerWithLock();
  5. 立即对客户端实例MQClientInstance执行一次均衡,this.mQClientFactory.rebalanceImmediately();
  6. wakeup唤醒均衡服务
public void rebalanceImmediately() {
    this.rebalanceService.wakeup();
}
  • 1
  • 2
  • 3
  1. 均衡服务默认每20秒执行一次均衡,中途可通过wakeup唤醒,默认时间可通过rocketmq.client.rebalance.waitInterval配置执行;调用客户端工厂MQClientInstance.doRebalance均衡方法
  2. 遍历消费者表consumerTable,调用消费者均衡方法org.apache.rocketmq.client.impl.factory.MQClientInstance#doRebalance
  3. 调用RebalancePushImpl均衡方法,继承至父类:org.apache.rocketmq.client.impl.consumer.RebalanceImpl#rebalanceByTopic
  4. 遍历subscriptionInner订阅表,获取订阅表的主题名称,根据主题、是否顺序消费调用均衡方法rebalanceByTopic
根据主题均衡消费消息 rebalanceByTopic

根据主题、消费组、客户端id、mq列表、消费者id列表分配策略分配消息队列

广播模式
  1. 从主题订阅信息表topicSubscribeInfoTable中根据主题名称获取消息队列信息集合
  2. 在均衡实例中更新处理队列表updateProcessQueueTableInRebalance,详情在集群模式中讲述
集群模式
  1. 从主题订阅信息表topicSubscribeInfoTable中根据主题名称获取消息队列信息集合
  2. 根据主题、消费组发送请求至broker获取消费者id列表findConsumerIdList
  3. 排序MessageQueue集合、消费者id集合
  4. 按照分配消息队列策略allocateMessageQueueStrategy分配allocate,选择一个MessageQueue
  5. 在均衡实例中更新处理队列表updateProcessQueueTableInRebalance
  6. 遍历处理队列表processQueueTable,如果处理中的MessageQueue的主题与入参需要消费的remote主题相同,并且入参需要消费remote的mqSet不包含遍历的mq(处理中的mq),表示老的处理中的mq在remote服务端已经不存在,丢弃并删除removeUnnecessaryMessageQueue;如果处理中的mq在remote服务端的mq列表中存在,如果拉取超时,并且消费类型是push则丢弃并删除,否则继续
  7. 遍历需要消费remote服务端的消息队列集合mqSet,如果处理中的队列表processQueueTable不包含remote服务端的MessageQueue
  8. 移除脏下标removeDirtyOffset重新计算消费标并缓存至处理中的队列表processQueueTable
  9. 创建新ProcessQueue
  10. 计算从哪个位置消费computePullFromWhere,下标读取类型:READ_FROM_STORE(读取文件),READ_FROM_MEMORY(读取内存),MEMORY_FIRST_THEN_STORE
  11. pull类型直接返回0
  12. push类型根据类型返回
  13. 从最后的下标消费:CONSUME_FROM_LAST_OFFSET。
  1. 根据mq读取本地上次的消费下标;类型LocalFileOffsetStore读取本地存储文件;类型RemoteBrokerOffsetStore从Broker根据key(topic+TOPIC_GROUP_SEPARATOR+group)读取最新下标(如果小于0则根据主题、队列id读取最小下标返回
  2. 如果读取到的下标为-1,如果主题开始名称为%RETRY%,则使用0,否则使用getMQAdminImpl根据主题、队列id获取最大下标
  3. 均失败则使用-1
  1. 从开始下标读取:CONSUME_FROM_FIRST_OFFSET
  2. 根据时间戳读取下标:CONSUME_FROM_TIMESTAMP
  3. 如果下标大于等于0则将MessageQueue,ProcessQueue缓存至processQueueTable
  4. 如果存在老ProcessQueue打印日志
  5. 如果不存在,封装PullRequest
  6. 分派拉取请求dispatchPullRequest
  7. 遍历放入拉取请求队列executePullRequestImmediately
public void executePullRequestImmediately(final PullRequest pullRequest) {
    try {
        this.pullRequestQueue.put(pullRequest);
    } catch (InterruptedException e) {
        log.error("executePullRequestImmediately pullRequestQueue.put", e);
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  1. 如果超过阈值进行流控(com.alibaba.rocketmq.client.impl.consumer.DefaultMQPushConsumerImpl#pullMessage,count个数默认1000,size大小默认100 MiB,span跨度(仅针对无序消费场景)默认2000)并打印日志,50毫秒休眠
  2. 如果是并发则不需加锁,否则加processQueue锁
  3. 如果获取订阅信息发生异常延迟3秒再请求拉取消息
  4. 创建拉取消息回调对象PullCallback
  1. FOUND
  2. NO_NEW_MSG
  3. NO_MATCHED_MSG
  4. OFFSET_ILLEGAL
  1. 拉取消息pullAPIWrapper.pullKernelImpl
  2. 拉取消息成功,回调PullCallback,提交最新消息给消费者:org.apache.rocketmq.client.impl.consumer.DefaultMQPushConsumerImpl#consumeMessageService.submitConsumeRequest
  3. 封装消费请求提交至线程池等待消费:ConsumeRequest,线程池执行ConsumeRequest
  4. ConsumeRequest(org.apache.rocketmq.client.impl.consumer.ConsumeMessageConcurrentlyService.ConsumeRequest#run或者org.apache.rocketmq.client.impl.consumer.ConsumeMessageOrderlyService.ConsumeRequest#run)执行消费成功(ConsumeConcurrentlyStatus.CONSUME_SUCCESS)上报消费下标,如果是顺序消费这尝试lock队列随后上报下标:org.apache.rocketmq.client.impl.consumer.ConsumeMessageConcurrentlyService#defaultMQPushConsumerImpl.getOffsetStore().updateOffset
  5. 等待异步线程持久化下标至remote,即上报消费者消费下标至broker:org.apache.rocketmq.client.impl.factory.MQClientInstance#persistAllConsumerOffset,或者shutdown钩子等方法触发上报持久化
  6. broker端处理更新下标请求:org.apache.rocketmq.broker.processor.ConsumerManageProcessor#updateConsumerOffset
DefaultMQPullConsumer 拉取模式的消费者

与DefaultMQPushConsumer类似,但是客户端不需要指定哪种类型的下标消费,因为可以任意指定下标,本地管理消费的下标

总结

  1. push模式下消费下标默认类型:CONSUME_FROM_LAST_OFFSET。广播模式下:如果从本地读取下标返回-1则从服务端获取最大下标开始消费
  2. 消费者默认消息模型类型:CLUSTERING
  3. 客户端从服务端查询消费下标,服务端获取下标优先级由高到低:
    a. 服务器端按照格式:topic + TOPIC_GROUP_SEPARATOR + group 组装key从缓存中按照queueId获取消费下标
    b. 根据topic按照queueId获取队列中最小下标(偏移量为CQ_STORE_UNIT_SIZE=20倍数,所以获取时要除以偏移量)
    c. 如果下标0已经交换至磁盘则返回0
    d. 响应查询失败:QUERY_NOT_FOUND
下标管理
  1. 客户端本地管理
  2. 远程管理:broker端,如果收到客户端上报的下标,或者(PullRequest携带了客户端已经消费成功的下标,并且brokerAllowSuspend,且是master),提交下标至内存,异步持久化下标至磁盘。

问题1:如果broker响应客户端失败,下标却异步持久化成功,会不会有问题?
答:不会,客户端拉取消息如果没有收到broker响应会再次retry

问题2:远程管理下标场景,何时上报消费者下标?
答:

  1. 消费成功后上报,注意是会提交至本地缓存,等待定时任务定时上报。会不会重复消费?常规情况下不会,因为一个队列对应一个唯一的clientId(一个队列不会同时属于两个client)。当然上报前发生rebalance会造成重复消费。
  2. 拉取消息的请求会携带上次消费成功的下标,并上报下标给broker。
  3. 定时任务兜底,定时上报

问题3: 批次消费消息,如果其中一个消息失败了怎么上报offset?
答:

  1. 不存在一个消息失败场景,因为接口定义入参也是批量消息,可以理解为是一个事务,要么全失败,要么全成功。例如:org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently#consumeMessage(final List msgs,
    final ConsumeConcurrentlyContext context)
  2. 但是也不是完全事务,如果消费成功了,可以指定某个下标之后的消息重新消费,可以参考org.apache.rocketmq.client.impl.consumer.ConsumeMessageConcurrentlyService#processConsumeResult方法中的ackIndex属性逻辑,可以理解为一刀切,ackIndex之前的消息全部上报不再重新消费。ackIndex之后的消息全部重新消费
声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/凡人多烦事01/article/detail/603747
推荐阅读
相关标签
  

闽ICP备14008679号