赞
踩
文档基于RocketMQ版本:4.2.0
- 绑定namesrvConfig、nettyServerConfig
- 创建KVConfigManager、RouteInfoManager、BrokerHousekeepingService、Configuration(绑定nameserver、nettyserver配置)
- 设置存储路径从config配置中获取(即从namesrv配置中获取configStorePath属性的值)
- 创建ServerBootstrap,绑定nettyServerConfig配置,绑定通道事件监听器channelEventListener(brokerHousekeepingService)
- 从nettyServer配置中获取回调线程数配置:publicThreadNums,如果小于等于0则降级为4
- 创建发布线程池publicExecutor
controller启启动broker Start Broker
,线程数为publicThreadNums- 创建NioEventLoopGroup:eventLoopGroupBoss,线程数:1
- 如果是linux平台并且nettyServer配置配置使用epoll并且系统开启了epoll配置,则使用epoll选择器EpollEventLoopGroup作为eventLoopGroupSelector,否则使用NioEventLoopGroup
- 判断是否使用SSL证书
- 如果超过两分钟没有更新心跳时间,则说明已超时,从活跃列表中移除,并销毁channel
- 绑定broker配置brokerConfig
- 绑定nettyServerConfig配置
- 绑定nettyClientConfig配置
- 绑定messageStoreConfig配置
- 创建消费者下标管理器ConsumerOffsetManager
- 创建主题配置管理器TopicConfigManager
- 创建拉取消息处理器PullMessageProcessor
- 创建拉取消息hold服务PullRequestHoldService
- 创建消息到达监听NotifyMessageArrivingListener
- 创建消费者ids变更监听DefaultConsumerIdsChangeListener
- 创建消费者管理器ConsumerManager
- 创建消费者过滤管理器ConsumerFilterManager
- 创建生产者管理器ProducerManager
- 创建客户端客房服务ClientHousekeepingService
- 创建Broker2Client
- 创建订阅组管理器SubscriptionGroupManager
- 创建BrokerOuterAPI
- 创建过滤server管理器FilterServerManager
- 创建slave同步SlaveSynchronize
- 创建发送线程池队列,默认10000
- 创建拉取线程池队列,默认10W
- 创建查询线程池队列,默认20000
- 创建客户端管理器线程池队列,默认100W
- 创建消费者管理器线程池队列,默认100W
- 创建broker状态管理器BrokerStatsManager
- 创建broker快速失败机制BrokerFastFailure
- 如果scheduleMessageService不为空则加载,即加载ScheduleMessageService,反序列化调度消息服务配置至DelayOffsetSerializeWrapper
- 解析消息存储延迟级别配置
- 如果成功则继续加载commit日志CommitLog,即加载MappedFileQueue
- 同步主题配置syncTopicConfig:RequestCode.GET_ALL_TOPIC_CONFIG
- 同步消费者下标
- 同步延迟下标
- 同步订阅组配置
- 刷新提交日志服务启动:flushCommitLogService:
- 如果是SYNC_FLUSH使用GroupCommitService,
- 否则使用FlushRealTimeService,
- 同步刷新:刷新commit日志至文件,如果存在读请求,判断读请求的下标是否已经被刷新至文件,没有的话继续刷新至文件,刷新文件的下标大于请求的下标时,唤醒消费者继续消费;
- 异步刷新:每10秒刷新一次,如果是调度类型,则按照调度周期刷新,否则实时刷新至文件。如果刷新配置实时存储池启用(transientStorePoolEnable=true),并且是异步刷新类型,启动commitLogService,提交日志时唤醒刷新服务。
- 异步存储消息:org.apache.rocketmq.store.DefaultMessageStore#asyncPutMessage异步提交消息;org.apache.rocketmq.store.CommitLog#asyncPutMessage异步提交消息
- 如果是延迟消息(org.apache.rocketmq.common.message.Message#getDelayTimeLevel>0),且消息类型非事务类型:TRANSACTION_NOT_TYPE,或者是事务提交消息类型:TRANSACTION_COMMIT_TYPE,将消息写入延迟消息主题:RMQ_SYS_SCHEDULE_TOPIC,队列queueId为延迟级别delayLevel-1。delayLevel(org.apache.rocketmq.store.config.MessageStoreConfig#messageDelayLevel)对应延迟时间:“1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h”,1对应1s,2对应5s,3对应10s以此类推
处理拉取请求入口
a. netty服务端请求处理时:org.apache.rocketmq.remoting.netty.NettyRemotingAbstract#processRequestCommand,处理方法:org.apache.rocketmq.broker.processor.PullMessageProcessor#processRequest(io.netty.channel.ChannelHandlerContext, org.apache.rocketmq.remoting.protocol.RemotingCommand)
b. ldq处理器或PullRequestHoldService启动时:org.apache.rocketmq.broker.longpolling.PullRequestHoldService#checkHoldRequest,处理方法:PullMessageProcessor(org.apache.rocketmq.broker.processor.PullMessageProcessor#executeRequestWhenWakeup)
以TransactionMQProducer生产者为例
public void start() throws MQClientException {
this.defaultMQProducerImpl.start();
}
- 获取clientConfig中NameServer地址配置,如果为空,则尝试从系统配置中获取rocketmq.namesrv.domain配置,默认为:jmenv.tbsite.net,系统配置中获取rocketmq.namesrv.domain.subgroup配置,默认为:nsaddr,组装http请求url,请求获取NameServer地址,默认为:http://jmenv.tbsite.net:8080/rocketmq/nsaddr
- 启动mQClientAPIImpl,根据配置工作线程数创建nettyClient工作线程池,每秒扫描一次responseTable响应表,如果channel监听器不为空,则启动netty事件线程池NettyEventExecuter,如果eventQueue事件队列为空休眠3秒,负责一直处理事件,根据事件类型回调监听器。主要委派ClientRemotingProcessor根据类型处理请求。例如:CHECK_TRANSACTION_STATE检查本地事务状态
- 启动拉取消息服务pullMessageService,如果pullRequestQueue拉取请求队列为空则阻塞,否则一直处理
- 启动负载均衡服务RebalanceService
- 启动defaultMQProducer.getDefaultMQProducerImpl().start(false)
public enum ConsumeFromWhere {
CONSUME_FROM_LAST_OFFSET,
@Deprecated
CONSUME_FROM_LAST_OFFSET_AND_FROM_MIN_WHEN_BOOT_FIRST,
@Deprecated
CONSUME_FROM_MIN_OFFSET,
@Deprecated
CONSUME_FROM_MAX_OFFSET,
CONSUME_FROM_FIRST_OFFSET,
CONSUME_FROM_TIMESTAMP,
}
- 构建订阅data并存储至RebalanceImpl.subscriptionInner
- 如果mQClientFactory不为空发送心跳信息至NameServer
- 如果消费者指定的监听器是MessageListenerOrderly类型,创建顺序消费服务ConsumeMessageOrderlyService
- 如果指定监听器是MessageListenerConcurrently类型,创建并发消费服务ConsumeMessageConcurrentlyService
- 启动mq客户端mQClientAPIImpl
- 启动拉取消息服务pullMessageService,服务等待PullRequest进入pullRequestQueue队列,选择消费者org.apache.rocketmq.client.impl.factory.MQClientInstance#selectConsumer,使用选消费拉取消息:org.apache.rocketmq.client.impl.consumer.DefaultMQPushConsumerImpl#pullMessage
- 启动负载均衡服务rebalanceService
- 启动默认生产者服务,不启动客户端工厂mQClientFactory
public void rebalanceImmediately() {
this.rebalanceService.wakeup();
}
根据主题、消费组、客户端id、mq列表、消费者id列表分配策略分配消息队列
- 根据mq读取本地上次的消费下标;类型LocalFileOffsetStore读取本地存储文件;类型RemoteBrokerOffsetStore从Broker根据key(topic+TOPIC_GROUP_SEPARATOR+group)读取最新下标(如果小于0则根据主题、队列id读取最小下标返回)
- 如果读取到的下标为-1,如果主题开始名称为%RETRY%,则使用0,否则使用getMQAdminImpl根据主题、队列id获取最大下标
- 均失败则使用-1
public void executePullRequestImmediately(final PullRequest pullRequest) {
try {
this.pullRequestQueue.put(pullRequest);
} catch (InterruptedException e) {
log.error("executePullRequestImmediately pullRequestQueue.put", e);
}
}
- FOUND
- NO_NEW_MSG
- NO_MATCHED_MSG
- OFFSET_ILLEGAL
与DefaultMQPushConsumer类似,但是客户端不需要指定哪种类型的下标消费,因为可以任意指定下标,本地管理消费的下标
问题1:如果broker响应客户端失败,下标却异步持久化成功,会不会有问题?
答:不会,客户端拉取消息如果没有收到broker响应会再次retry
问题2:远程管理下标场景,何时上报消费者下标?
答:
问题3: 批次消费消息,如果其中一个消息失败了怎么上报offset?
答:
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。