赞
踩
以下为RocketMQ—生产者系列文章索引:
1:启动流程(本文)
2:路由动态更新
3:发送方式分析+消息类型区别
4:生产者消息的发送流程
5:路由队列选择,客户端冗错策略(问题答疑)
Producer 是 RocketMQ 消息的投递者,负责生产消息。
它会与NameServer集群中的其中一个节点(随机)建立长连接(Keep-alive),定期从NameServer读取Topic路由信息,将路由信息保存在本地内存中;
它向提供Topic服务的Master Broker建立长连接,且定时向Master Broker发送心跳;
它只会向Master Broker发送消息,从Message Queue列表中选择合适的Queue发送消息,实现负载均衡;
它支持发送消息类型有多种,例如:普通消息、事物消息、定时消息等;
它发送消息的方式支持三种:同步、异步、单向方式等.
可简单查看生产端与Master Broker 和NameServer简单交互图:
备注:生产者还可向broker查询消息等其他功能交互。
在了解具体生产启动流程之前,我们先提出出几个问题,带着问题去分析源码:
消息生产者启动时具体做了什么?
一个应用需要发送多个topic,不同topic需要发送到不同集群的broker,如何处理?
我们可先了解和分析生产者相关的类图关系:
从类图中可以看出,MQProducer有两种实现方式。
一个是 DefaultMQProducer(非事务消息生产者);
一个是 TransactionMQProducer(支持事务消息);
接下来先对接个类核心参数或方法进行简单分析:
MqAdmin:核心方法解析(Mq管理基础接口)
- //创建一个主题
- void createTopic(final String key, final String newTopic, final int queueNum) throws MQClientException;
- //根据 时间戳从队列中 查找其偏移量
- long searchOffset(final MessageQueue mq, final long timestamp) throws MQClientException;
- //查找该消息 队列中 最大的物理偏移量
- long maxOffset(final MessageQueue mq) throws MQClientException;
- //查找该消息队列中最小物理偏移量。
- long minOffset(final MessageQueue mq) throws MQClientException;
- //获取最早的存储消息时间
- long earliestMsgStoreTime(final MessageQueue mq) throws MQClientException;
- //根据消息偏移量查找消息
- MessageExt viewMessage(final String offsetMsgId) throws RemotingException, MQBrokerException, InterruptedException, MQClientException;
- //根据条件查询消息
- QueryResult queryMessage(final String topic, final String key, final int maxNum, final long begin, final long end) throws MQClientException, InterruptedException;
- //根据 主题 与 消息ID 查找消息 。
- MessageExt viewMessage(String topic, String msgId) throws RemotingException, MQBrokerException, InterruptedException, MQClientException;
MQProducer:核心方法解析(生产者基础接口):
- //启动
- void start() throws MQClientException;
- //关闭
- void shutdown();
- //根据topic获取对应队列信息
- List<MessageQueue> fetchPublishMessageQueues(final String topic) throws MQClientException;
- //同步-消息发送
- SendResult send(final Message msg, final MessageQueue mq) throws MQClientException,RemotingException, MQBrokerException, InterruptedException;
- //异步-消息发送
- void send(final Message msg, final MessageQueueSelector selector, final Object arg, final SendCallback sendCallback) throws MQClientException, RemotingException, InterruptedException;
- //同步-选择队列消息发送
- SendResult send(final Message msg, final MessageQueueSelector selector, final Object arg, final long timeout) throws MQClientException, RemotingException, MQBrokerException,InterruptedException;
- //单向-消息发送
- void sendOneway(final Message msg, final MessageQueue mq) throws MQClientException, RemotingException, InterruptedException;
- //事务消息-发送
- TransactionSendResult sendMessageInTransaction(final Message msg, final Object arg) throws MQClientException;
- //批量消息-发送
- SendResult send(final Collection<Message> msgs) throws MQClientException, RemotingException, MQBrokerException, InterruptedException;
备注:其中启动start()和关闭shutdown()表示生产者的启动和关闭、
clientConfig:核心属性方法解析(客户端配置)
- //nameServer-地址,默认从:系统属性:rocketmq.namesrv.addr 或 环境变量:NAMESRV_ADDR 中获取
- private String namesrvAddr = System.getProperty(MixAll.NAMESRV_ADDR_PROPERTY, System.getenv(MixAll.NAMESRV_ADDR_ENV));
- //实例名字,默认:DEFAULT 或者 系统属性-rocketmq.client.name
- private String instanceName = System.getProperty("rocketmq.client.name", "DEFAULT");
- //构建 mq客户端的 id,例子:ip@instanceName@unitName : 172.16.62.75@19312@unitName
- public String buildMQClientId() {
- StringBuilder sb = new StringBuilder();
- sb.append(this.getClientIP());
- sb.append("@");
- sb.append(this.getInstanceName());
- if (!UtilAll.isBlank(this.unitName)) {
- sb.append("@");
- sb.append(this.unitName);
- }
- return sb.toString();
- }
-
- //设置namesrv地址
- public void setNamesrvAddr(String namesrvAddr) {
- this.namesrvAddr = namesrvAddr;
- }
备注:namesrvAddr表示nameServer地址,可调用setNamesrvAddr方法设置,或者通过环境变量、系统属性设置;buildMQClientId表示设置生产者Id.
(略)
- // 构造器
- public DefaultMQProducer(final String producerGroup, RPCHook rpcHook) {
- this.producerGroup = producerGroup;
- defaultMQProducerImpl = new DefaultMQProducerImpl(this, rpcHook);
- }
- // 各种发送消息
- public SendResult send(
- Message msg) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
- return this.defaultMQProducerImpl.send(msg);
- }
- // 启动方法
- public void start() throws MQClientException {
- this.defaultMQProducerImpl.start();
- if (null != traceDispatcher) {
- try {
- traceDispatcher.start(this.getNamesrvAddr());
- } catch (MQClientException e) {
- log.warn("trace dispatcher start failed ", e);
- }
- }
- }
备注:DefaultMQProducer的构造器,send和start等相关的方法,其实都是围绕DefaultMQProducerImpl来转,defaultMQProducerImpl:默认生产者的实现类,其start方法作为生产者启动的核心方法,接下来将核心分析其start方法的实现.
DefaultMQProducerImpl#start
- /**
- * mq-producer 启动
- * @param startFactory
- * @throws MQClientException
- */
- public void start(final boolean startFactory) throws MQClientException {
- switch (this.serviceState) {
- case CREATE_JUST:
- // 0-服务状态设置
- this.serviceState = ServiceState.START_FAILED;
- //1-检测配置
- this.checkConfig();
- //2-并改变生产者的 instanceName为进程 ID。
- if (!this.defaultMQProducer.getProducerGroup().equals(MixAll.CLIENT_INNER_PRODUCER_GROUP)) {
- this.defaultMQProducer.changeInstanceNameToPID();
- }
- //3-创建 MQClientlnstance实例
- this.mQClientFactory = MQClientManager.getInstance().getAndCreateMQClientInstance(this.defaultMQProducer, rpcHook);
- //4-向 MQClientlnstance注册生产者。
- boolean registerOK = mQClientFactory.registerProducer(this.defaultMQProducer.getProducerGroup(), this);
- if (!registerOK) {
- this.serviceState = ServiceState.CREATE_JUST;
- throw new MQClientException("The producer group[" + this.defaultMQProducer.getProducerGroup()
- + "] has been created before, specify another name please." + FAQUrl.suggestTodo(FAQUrl.GROUP_NAME_DUPLICATE_URL),
- null);
- }
- //5-默认topic信息缓存( this.defaultMQProducer.getCreateTopicKey() = 'TBW102' )
- this.topicPublishInfoTable.put(this.defaultMQProducer.getCreateTopicKey(), new TopicPublishInfo());
- //6-是否启动-mQClientFactory
- if (startFactory) {
- mQClientFactory.start();
- }
-
- log.info("the producer [{}] start OK. sendMessageWithVIPChannel={}", this.defaultMQProducer.getProducerGroup(),
- this.defaultMQProducer.isSendMessageWithVIPChannel());
- this.serviceState = ServiceState.RUNNING;//设置状态为 运行中
- break;
- case RUNNING:
- case START_FAILED:
- case SHUTDOWN_ALREADY:
- throw new MQClientException("The producer service state not OK, maybe started once, "
- + this.serviceState
- + FAQUrl.suggestTodo(FAQUrl.CLIENT_SERVICE_NOT_OK),
- null);
- default:
- break;
- }
- //7-发送心跳到所有broker
- this.mQClientFactory.sendHeartbeatToAllBrokerWithLock();
- }
分析如下:
0-服务状态设置:
设置状态值的意义是为了防止重复启动,其枚举类为:ServiceState; 如果初始化状态不等于:CREATE_JUST,则异常跑出
1-检测配置:
- private void checkConfig() throws MQClientException {
- Validators.checkGroup(this.defaultMQProducer.getProducerGroup());
- if (null == this.defaultMQProducer.getProducerGroup()) {
- throw new MQClientException("producerGroup is null", null);
- }
- //生产所属组 不能等于 DEFAULT_PRODUCER
- if (this.defaultMQProducer.getProducerGroup().equals(MixAll.DEFAULT_PRODUCER_GROUP)) {
- throw new MQClientException("producerGroup can not equal " + MixAll.DEFAULT_PRODUCER_GROUP + ", please specify another one.",
- null);
- }
- }
备注:为了检测-producerGroup的合法性
2-并改变生产者的instanceName为进程 ID。
- // 判断producerGroup是否等于CLIENT_INNER_PRODUCER
- if (!this.defaultMQProducer.getProducerGroup().equals(MixAll.CLIENT_INNER_PRODUCER_GROUP)) {
- this.defaultMQProducer.changeInstanceNameToPID();
- }
-
- 是调用ClientConfig#changeInstanceNameToPID
- public void changeInstanceNameToPID() {
- if (this.instanceName.equals("DEFAULT")) {
- this.instanceName = String.valueOf(UtilAll.getPid());
- }
- }
备注:instanceName == DEFAULT, 将其改为 启动的 进程ID,目的是为了MQClientInstance的构建
3-创建MQClientlnstance实例
MQClientManager管理MQClientInstance,其内部维护的数据结构为:ConcurrentHashMap,key:clientId,且MQClientManager本身是单例模式,核心方法分析如下:
MQClientManager
- private static MQClientManager instance = new MQClientManager();//-单列模式
- private AtomicInteger factoryIndexGenerator = new AtomicInteger();//index的工厂
- // MQClientInstance 缓存
- private ConcurrentMap<String/* clientId */, MQClientInstance> factoryTable = new ConcurrentHashMap<String, MQClientInstance>();
-
- //构建返回MQClientInstance
- public MQClientInstance getAndCreateMQClientInstance(final ClientConfig clientConfig, RPCHook rpcHook) {
- String clientId = clientConfig.buildMQClientId();//构建mq客户端的 id
- MQClientInstance instance = this.factoryTable.get(clientId);
- if (null == instance) {
- instance = new MQClientInstance(clientConfig.cloneClientConfig(),
- this.factoryIndexGenerator.getAndIncrement(), clientId, rpcHook);
- MQClientInstance prev = this.factoryTable.putIfAbsent(clientId, instance);
- if (prev != null) {
- instance = prev;
- log.warn("Returned Previous MQClientInstance for clientId:[{}]", clientId);
- } else {
- log.info("Created new MQClientInstance for clientId:[{}]", clientId);
- }
- }
- return instance;
- }
备注:
ClientConfig.buildMQClientId 在上面已分析,是为了构建clientId;getAndCreateMQClientInstance此方法的目的就是为了构建或查询MQClientInstance.
MQClientInstance:封装了 RocketMQ 网络处理 API,是消息生产者( Producer)、消息消费者 (Consumer)与 NameServer、 Broker打交道的网络通道.
接下来分析多个生产者公用同一个MQClientInstance的优点和缺点:
优点:一般来讲,为了减少客户端的使用资源,如果将所有的instanceName和unitName设置为同样的值,就会只创建一个MQClientInstance实例(用于生产者的topic发送消息在同一套broker集群)
缺点:如果多个topic复用MQClientInstance会有怎么的结果呢?这种情况会出现在你在一个JVM里启动了多个Producer时,且没有设置instanceName和unitName,那么这两个Producer会公用一个MQClientInstance,发送的消息会路由到同一个集群。
例如,你起了两个Producer,并且配置的NameServer地址不一样,本意是让这两个Producer往不同集群上分配消息,但是由于共用了一个MQClientInstance,这个MQClientInstance是基于先来的Producer配置构建的,第二个Producer和他公用后被认为是同一instance,配置是相同的,消息的路由就是相同的,就没有达到你想要的效果。
4-向MQClientInstance注册生产者。
- //key:group, value: 生产者
- private final ConcurrentMap<String/* group */, MQProducerInner> producerTable = new ConcurrentHashMap<String, MQProducerInner>();
- // 将当前生产者加入到 MQClientlnstance管理中
- public boolean registerProducer(final String group, final DefaultMQProducerImpl producer) {
- if (null == group || null == producer) {
- return false;
- }
- MQProducerInner prev = this.producerTable.putIfAbsent(group, producer);
- if (prev != null) {
- log.warn("the producer group[{}] exist already.", group);
- return false;
- }
- return true;
- }
备注:DefaultMQProducerImpl实现的接口类为:MQProducerInner
5-添加默认topic信息缓存,此处需要理解topicPublishInfoTable数据结构的意思
- //key:topic value:TopicPublishInfo-路由相关信息,用于消息发送
- private final ConcurrentMap<String/* topic */, TopicPublishInfo> topicPublishInfoTable =
- new ConcurrentHashMap<String, TopicPublishInfo>();
TopicPublishInfo:
分析,熟悉的佩服熟悉的味道,MessageQueue和TopicRouteData在NameServer已分析相当清除,分析如下:
- public class TopicPublishInfo {
- //是否是顺序消息
- private boolean orderTopic = false;
- //是否包含路由信息
- private boolean haveTopicRouterInfo = false;
- //该主题队列的消息队列
- private List<MessageQueue> messageQueueList = new ArrayList<MessageQueue>();
- //每选择一次消息 队列, 该值会自增1,如果 Integer.MAX_VALUE, 则重置为 0,用于选择消息队列。
- private volatile ThreadLocalIndex sendWhichQueue = new ThreadLocalIndex();
- //路由信息
- private TopicRouteData topicRouteData;
- //选择队列方法,lastBrokerName其实上一次发送失败的brokerName,如果不为空,本次选择队列发送所在的brokerName则选择其他的brokerName
- public MessageQueue selectOneMessageQueue(final String lastBrokerName) {
- if (lastBrokerName == null) {
- return selectOneMessageQueue();
- } else {
- //如果消息发 送再失败的话 , 下次进行 消息队列选择 时规避上次 MesageQueue 所 在的 Broker, 否 则还 是 很有可能再次失败 。
- int index = this.sendWhichQueue.getAndIncrement();
- for (int i = 0; i < this.messageQueueList.size(); i++) {
- int pos = Math.abs(index++) % this.messageQueueList.size();
- if (pos < 0)
- pos = 0;
- MessageQueue mq = this.messageQueueList.get(pos);
- if (!mq.getBrokerName().equals(lastBrokerName)) {
- return mq;
- }
- }
- return selectOneMessageQueue();
- }
- }
- //直接用 sendWhichQueue 自增再获取值 , 与当前路由 表 中消息 队列个数取模, 返回该位置的 MessageQueue(selectOneMessageQueue()方法)
- public MessageQueue selectOneMessageQueue() {
- int index = this.sendWhichQueue.getAndIncrement();
- int pos = Math.abs(index) % this.messageQueueList.size();
- if (pos < 0)
- pos = 0;
- return this.messageQueueList.get(pos);
- }
- //messageQueueList-长度大于0
- public boolean ok() {
- return null != this.messageQueueList && !this.messageQueueList.isEmpty();
- }
- //是否包含路由信息
- public boolean isHaveTopicRouterInfo() {
- return haveTopicRouterInfo;
- }
6-启动-MQClientInstance
MQClientInstance#start
-
- public void start() throws MQClientException {
- synchronized (this) {
- switch (this.serviceState) {
- case CREATE_JUST:
- this.serviceState = ServiceState.START_FAILED; //1>状态-设置启动失败
- // If not specified,looking address from name server
- if (null == this.clientConfig.getNamesrvAddr()) { //2>判断nameSrvAddr地址是否为空,http获取nameSrvAddr
- this.mQClientAPIImpl.fetchNameServerAddr();
- }
- // Start request-response channel : netty
- this.mQClientAPIImpl.start(); // 3>启动netty相关
- // Start various schedule tasks
- this.startScheduledTask(); //4>【重要】启动相关定时任务
- // Start pull service
- this.pullMessageService.start(); //5>消费端相关,后续讲解
- // Start rebalance service
- this.rebalanceService.start(); //6>消费端相关,后续讲解
- // Start push service
- this.defaultMQProducer.getDefaultMQProducerImpl().start(false); //7>内部启动一个mqProducter,startFactory=false
- log.info("the client factory [{}] start OK", this.clientId);
- this.serviceState = ServiceState.RUNNING; >8 状态设置运行中
- break;
- case RUNNING:
- break;
- case SHUTDOWN_ALREADY:
- break;
- case START_FAILED:
- throw new MQClientException("The Factory object[" + this.getClientId() + "] has been created before, and failed.", null);
- default:
- break;
- }
- }
- }
备注:后续单独讲解:this.startScheduledTask();
7-发送心跳到所有broker
(this.mQClientFactory.sendHeartbeatToAllBrokerWithLock();)
- public void sendHeartbeatToAllBrokerWithLock() {
- if (this.lockHeartbeat.tryLock()) {
- try {
- this.sendHeartbeatToAllBroker(); 1> 发送心跳到所有broker
- this.uploadFilterClassSource(); 2> 更新过滤filterSource 可忽略不看
- } catch (final Exception e) {
- log.error("sendHeartbeatToAllBroker exception", e);
- } finally {
- this.lockHeartbeat.unlock();
- }
- } else {
- log.warn("lock heartBeat, but failed.");
- }
- }
备注:sendHeartbeatToAllBroker,相对简单,
对返回结果维护了brokerVersionTable(ConcurrentHashMap),你不可错过.因为会有定时任务定时发送心跳至所有broker
小结:通过7个步骤我们已经了解到生产者的启动流程,大致分为:检测相关配置、注册构建相关类(例如:MQClientInstance相关、netty相关等)、然后启动相关定时任务;简单总结生产者的启动流程,如下:
文章导航
类别 | 标题 | 发布 |
Redis | Redis(一):单线程为何还能这么快 | 己发布 |
Redis(二):内存模型及回收算法 | 己发布 | |
Redis(三):持久化 | 己发布 | |
Redis(四):主从同步 | 2021.11.05 | |
Redis(五):集群搭建 | 即将上线 | |
Redis(六):实战应用 | 即将上线 | |
Elasticsearch | Elasticsearch:概述 | 己发布 |
Elasticsearch:核心 | 己发布 | |
Elasticsearch:实战 | 己发布 | |
Elasticsearch写入流程详解 | 即将上线 | |
Elasticsearch查询流程详解 | 即将上线 | |
Elasticsearch集群一致性 | 即将上线 | |
Lucene的基本概念 | 即将上线 | |
Elasticsearch部署架构和容量规划 | 即将上线 | |
RocketMQ | RocketMQ—NameServer总结及核心源码剖析 | 己发布 |
RocketMQ—Producer(一)启动流程解密 | 本文章 | |
RocketMQ—Producer(二)路由动态更新 | 2021.11.08 | |
RocketMQ—Producer(三)发送方式和消息类型 | 2021.11.10 | |
RocketMQ—Producer(四)消息发送流程 | 2021.11.12 | |
RocketMQ—Producer(五)路由队列选择,客户端冗错策略 | ||
Broker—启动流程源码解密 | 即将上线 | |
Broker—接受消息处理流程解密 | 即将上线 | |
Tomcat源码分析 | Tomcat(一):项目结构及架构分析 | 即将上线 |
Tomcat(二):启动关闭流程分析 | 即将上线 | |
Tomcat(三):应用加载原理分析 | 即将上线 | |
Tomcat(四):网络请求原理分析 | 即将上线 | |
Tomcat(五):嵌入式及性能调优 | 即将上线 | |
Nacos | Nacos项目结构及架构分析 | 即将上线 |
Nacos服务注册源码解析 | 即将上线 | |
Nacos配置管理源码解析 | 即将上线 | |
Nacos2.0版本优化功能解析 | 即将上线 | |
Netty | 计算机网络&nio核心原理 | 即将上线 |
详细解读kafka是如何基于原生nio封装网络通信组件的? | 即将上线 | |
netty初识之核心组件介绍 | 即将上线 | |
源码解析netty服务端,端口是如何拉起来的?新连接接入又如何处理? | 即将上线 | |
深入netty中各种事件如何在pipeline中传播的? | 即将上线 | |
网络编程中拆包粘包是什么?kafka以及netty是如何解决的? | 即将上线 | |
深入解读netty中最为复杂的缓存分配是如何进行的? | 即将上线 | |
源码分析netty、kafka、sentinel中不同时间轮实现方式以及细节 | 即将上线 | |
尝试从上帝角度对比kafka&netty中的性能优化,各种设计模式的丰富运用 | 即将上线 | |
Netty在Rocketmq中的实践,RocketMq的消息协议解读 | 即将上线 | |
关注IT巅峰技术,私信作者,获取以下2021全球架构师峰会PDF资料。
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。