当前位置:   article > 正文

RocketMQ—Producer(一)启动流程解密_the producer service state not ok, maybe started o

the producer service state not ok, maybe started once, running

以下为RocketMQ—生产者系列文章索引:

1:启动流程(本文)

2:路由动态更新

3:发送方式分析+消息类型区别

4:生产者消息的发送流程

5:路由队列选择,客户端冗错策略(问题答疑)

一、Producer介绍

Producer 是 RocketMQ 消息的投递者,负责生产消息。

它会与NameServer集群中的其中一个节点(随机)建立长连接(Keep-alive),定期从NameServer读取Topic路由信息,将路由信息保存在本地内存中;

它向提供Topic服务的Master Broker建立长连接,且定时向Master Broker发送心跳;

它只会向Master Broker发送消息,从Message Queue列表中选择合适的Queue发送消息,实现负载均衡;

它支持发送消息类型有多种,例如:普通消息、事物消息、定时消息等;

它发送消息的方式支持三种:同步、异步、单向方式等.

可简单查看生产端与Master Broker 和NameServer简单交互图:

备注:生产者还可向broker查询消息等其他功能交互。

二、生产者启动流程:

在了解具体生产启动流程之前,我们先提出出几个问题,带着问题去分析源码:

  1. 消息生产者启动时具体做了什么?

  2. 一个应用需要发送多个topic,不同topic需要发送到不同集群的broker,如何处理?

我们可先了解和分析生产者相关的类图关系:

从类图中可以看出,MQProducer有两种实现方式。

一个是 DefaultMQProducer(非事务消息生产者); 

一个是 TransactionMQProducer(支持事务消息); 

接下来先对接个类核心参数或方法进行简单分析:

2.1 MqAdmin

MqAdmin:核心方法解析(Mq管理基础接口)    

  1. //创建一个主题
  2. void createTopic(final String key, final String newTopic, final int queueNum) throws MQClientException;
  3. //根据 时间戳从队列中 查找其偏移量
  4. long searchOffset(final MessageQueue mq, final long timestamp) throws MQClientException;
  5. //查找该消息 队列中 最大的物理偏移量
  6. long maxOffset(final MessageQueue mq) throws MQClientException;
  7. //查找该消息队列中最小物理偏移量。
  8. long minOffset(final MessageQueue mq) throws MQClientException;
  9. //获取最早的存储消息时间
  10. long earliestMsgStoreTime(final MessageQueue mq) throws MQClientException;
  11. //根据消息偏移量查找消息
  12. MessageExt viewMessage(final String offsetMsgId) throws RemotingException, MQBrokerException, InterruptedException, MQClientException;
  13. //根据条件查询消息
  14. QueryResult queryMessage(final String topic, final String key, final int maxNum, final long begin, final long end) throws MQClientException, InterruptedException;
  15. //根据 主题 与 消息ID 查找消息 。
  16. MessageExt viewMessage(String topic, String msgId) throws RemotingException, MQBrokerException, InterruptedException, MQClientException;

2.2 核心方法解析

MQProducer:核心方法解析(生产者基础接口):

  1. //启动
  2. void start() throws MQClientException;
  3. //关闭
  4. void shutdown();
  5. //根据topic获取对应队列信息
  6. List<MessageQueue> fetchPublishMessageQueues(final String topic) throws MQClientException;
  7. //同步-消息发送
  8. SendResult send(final Message msg, final MessageQueue mq) throws MQClientException,RemotingException, MQBrokerException, InterruptedException;
  9. //异步-消息发送
  10. void send(final Message msg, final MessageQueueSelector selector, final Object arg, final SendCallback sendCallback) throws MQClientException, RemotingException, InterruptedException;
  11. //同步-选择队列消息发送
  12. SendResult send(final Message msg, final MessageQueueSelector selector, final Object arg, final long timeout) throws MQClientException, RemotingException, MQBrokerException,InterruptedException;
  13. //单向-消息发送
  14. void sendOneway(final Message msg, final MessageQueue mq) throws MQClientException, RemotingException, InterruptedException;
  15. //事务消息-发送
  16. TransactionSendResult sendMessageInTransaction(final Message msg, final Object arg) throws MQClientException;
  17. //批量消息-发送
  18. SendResult send(final Collection<Message> msgs) throws MQClientException, RemotingException, MQBrokerException, InterruptedException;

备注:其中启动start()和关闭shutdown()表示生产者的启动和关闭、

2.3 clientConfig

clientConfig:核心属性方法解析(客户端配置)

  1. //nameServer-地址,默认从:系统属性:rocketmq.namesrv.addr 或 环境变量:NAMESRV_ADDR 中获取
  2. private String namesrvAddr = System.getProperty(MixAll.NAMESRV_ADDR_PROPERTY, System.getenv(MixAll.NAMESRV_ADDR_ENV));
  3. //实例名字,默认:DEFAULT 或者 系统属性-rocketmq.client.name
  4. private String instanceName = System.getProperty("rocketmq.client.name", "DEFAULT");
  5. //构建 mq客户端的 id,例子:ip@instanceName@unitName : 172.16.62.75@19312@unitName
  6. public String buildMQClientId() {
  7. StringBuilder sb = new StringBuilder();
  8. sb.append(this.getClientIP());
  9. sb.append("@");
  10. sb.append(this.getInstanceName());
  11. if (!UtilAll.isBlank(this.unitName)) {
  12. sb.append("@");
  13. sb.append(this.unitName);
  14. }
  15. return sb.toString();
  16. }
  17. //设置namesrv地址
  18. public void setNamesrvAddr(String namesrvAddr) {
  19. this.namesrvAddr = namesrvAddr;
  20. }

备注:namesrvAddr表示nameServer地址,可调用setNamesrvAddr方法设置,或者通过环境变量、系统属性设置;buildMQClientId表示设置生产者Id.

三、TransactionMQProducer:(事务消息,后续单独讲解,本章忽略)

(略)

四、DefaultMQProducer 核心属性方法解析:(非事务消息生产者)

  1. // 构造器
  2. public DefaultMQProducer(final String producerGroup, RPCHook rpcHook) {
  3. this.producerGroup = producerGroup;
  4. defaultMQProducerImpl = new DefaultMQProducerImpl(this, rpcHook);
  5. }
  6. // 各种发送消息
  7. public SendResult send(
  8. Message msg) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
  9. return this.defaultMQProducerImpl.send(msg);
  10. }
  11. // 启动方法
  12. public void start() throws MQClientException {
  13. this.defaultMQProducerImpl.start();
  14. if (null != traceDispatcher) {
  15. try {
  16. traceDispatcher.start(this.getNamesrvAddr());
  17. } catch (MQClientException e) {
  18. log.warn("trace dispatcher start failed ", e);
  19. }
  20. }
  21. }

备注:DefaultMQProducer的构造器,send和start等相关的方法,其实都是围绕DefaultMQProducerImpl来转,defaultMQProducerImpl:默认生产者的实现类,其start方法作为生产者启动的核心方法,接下来将核心分析其start方法的实现.

DefaultMQProducerImpl#start

  1. /**
  2. * mq-producer 启动
  3. * @param startFactory
  4. * @throws MQClientException
  5. */
  6. public void start(final boolean startFactory) throws MQClientException {
  7. switch (this.serviceState) {
  8. case CREATE_JUST:
  9. // 0-服务状态设置
  10. this.serviceState = ServiceState.START_FAILED;
  11. //1-检测配置
  12. this.checkConfig();
  13. //2-并改变生产者的 instanceName为进程 ID。
  14. if (!this.defaultMQProducer.getProducerGroup().equals(MixAll.CLIENT_INNER_PRODUCER_GROUP)) {
  15. this.defaultMQProducer.changeInstanceNameToPID();
  16. }
  17. //3-创建 MQClientlnstance实例
  18. this.mQClientFactory = MQClientManager.getInstance().getAndCreateMQClientInstance(this.defaultMQProducer, rpcHook);
  19. //4-向 MQClientlnstance注册生产者。
  20. boolean registerOK = mQClientFactory.registerProducer(this.defaultMQProducer.getProducerGroup(), this);
  21. if (!registerOK) {
  22. this.serviceState = ServiceState.CREATE_JUST;
  23. throw new MQClientException("The producer group[" + this.defaultMQProducer.getProducerGroup()
  24. + "] has been created before, specify another name please." + FAQUrl.suggestTodo(FAQUrl.GROUP_NAME_DUPLICATE_URL),
  25. null);
  26. }
  27. //5-默认topic信息缓存( this.defaultMQProducer.getCreateTopicKey() = 'TBW102' )
  28. this.topicPublishInfoTable.put(this.defaultMQProducer.getCreateTopicKey(), new TopicPublishInfo());
  29. //6-是否启动-mQClientFactory
  30. if (startFactory) {
  31. mQClientFactory.start();
  32. }
  33. log.info("the producer [{}] start OK. sendMessageWithVIPChannel={}", this.defaultMQProducer.getProducerGroup(),
  34. this.defaultMQProducer.isSendMessageWithVIPChannel());
  35. this.serviceState = ServiceState.RUNNING;//设置状态为 运行中
  36. break;
  37. case RUNNING:
  38. case START_FAILED:
  39. case SHUTDOWN_ALREADY:
  40. throw new MQClientException("The producer service state not OK, maybe started once, "
  41. + this.serviceState
  42. + FAQUrl.suggestTodo(FAQUrl.CLIENT_SERVICE_NOT_OK),
  43. null);
  44. default:
  45. break;
  46. }
  47. //7-发送心跳到所有broker
  48. this.mQClientFactory.sendHeartbeatToAllBrokerWithLock();
  49. }

分析如下: 

0-服务状态设置:

设置状态值的意义是为了防止重复启动,其枚举类为:ServiceState; 如果初始化状态不等于:CREATE_JUST,则异常跑出

1-检测配置:

  1. private void checkConfig() throws MQClientException {
  2. Validators.checkGroup(this.defaultMQProducer.getProducerGroup());
  3. if (null == this.defaultMQProducer.getProducerGroup()) {
  4. throw new MQClientException("producerGroup is null", null);
  5. }
  6. //生产所属组 不能等于 DEFAULT_PRODUCER
  7. if (this.defaultMQProducer.getProducerGroup().equals(MixAll.DEFAULT_PRODUCER_GROUP)) {
  8. throw new MQClientException("producerGroup can not equal " + MixAll.DEFAULT_PRODUCER_GROUP + ", please specify another one.",
  9. null);
  10. }
  11. }

备注:为了检测-producerGroup的合法性

2-并改变生产者的instanceName为进程 ID。

  1. // 判断producerGroup是否等于CLIENT_INNER_PRODUCER
  2. if (!this.defaultMQProducer.getProducerGroup().equals(MixAll.CLIENT_INNER_PRODUCER_GROUP)) {
  3. this.defaultMQProducer.changeInstanceNameToPID();
  4. }
  5. 是调用ClientConfig#changeInstanceNameToPID
  6. public void changeInstanceNameToPID() {
  7. if (this.instanceName.equals("DEFAULT")) {
  8. this.instanceName = String.valueOf(UtilAll.getPid());
  9. }
  10. }

备注:instanceName == DEFAULT, 将其改为 启动的 进程ID,目的是为了MQClientInstance的构建

3-创建MQClientlnstance实例

MQClientManager管理MQClientInstance,其内部维护的数据结构为:ConcurrentHashMap,key:clientId,且MQClientManager本身是单例模式,核心方法分析如下: 

MQClientManager

  1. private static MQClientManager instance = new MQClientManager();//-单列模式
  2. private AtomicInteger factoryIndexGenerator = new AtomicInteger();//index的工厂
  3. // MQClientInstance 缓存
  4. private ConcurrentMap<String/* clientId */, MQClientInstance> factoryTable = new ConcurrentHashMap<String, MQClientInstance>();
  5. //构建返回MQClientInstance
  6. public MQClientInstance getAndCreateMQClientInstance(final ClientConfig clientConfig, RPCHook rpcHook) {
  7. String clientId = clientConfig.buildMQClientId();//构建mq客户端的 id
  8. MQClientInstance instance = this.factoryTable.get(clientId);
  9. if (null == instance) {
  10. instance = new MQClientInstance(clientConfig.cloneClientConfig(),
  11. this.factoryIndexGenerator.getAndIncrement(), clientId, rpcHook);
  12. MQClientInstance prev = this.factoryTable.putIfAbsent(clientId, instance);
  13. if (prev != null) {
  14. instance = prev;
  15. log.warn("Returned Previous MQClientInstance for clientId:[{}]", clientId);
  16. } else {
  17. log.info("Created new MQClientInstance for clientId:[{}]", clientId);
  18. }
  19. }
  20. return instance;
  21. }

备注:

ClientConfig.buildMQClientId 在上面已分析,是为了构建clientId;getAndCreateMQClientInstance此方法的目的就是为了构建或查询MQClientInstance.   

 MQClientInstance:封装了 RocketMQ 网络处理 API,是消息生产者( Producer)、消息消费者 (Consumer)与 NameServer、 Broker打交道的网络通道.

接下来分析多个生产者公用同一个MQClientInstance的优点和缺点:

  1. 优点:一般来讲,为了减少客户端的使用资源,如果将所有的instanceName和unitName设置为同样的值,就会只创建一个MQClientInstance实例(用于生产者的topic发送消息在同一套broker集群) 

  2. 缺点:如果多个topic复用MQClientInstance会有怎么的结果呢?这种情况会出现在你在一个JVM里启动了多个Producer时,且没有设置instanceName和unitName,那么这两个Producer会公用一个MQClientInstance,发送的消息会路由到同一个集群。

例如,你起了两个Producer,并且配置的NameServer地址不一样,本意是让这两个Producer往不同集群上分配消息,但是由于共用了一个MQClientInstance,这个MQClientInstance是基于先来的Producer配置构建的,第二个Producer和他公用后被认为是同一instance,配置是相同的,消息的路由就是相同的,就没有达到你想要的效果。

4-向MQClientInstance注册生产者。

  1. //key:group, value: 生产者
  2. private final ConcurrentMap<String/* group */, MQProducerInner> producerTable = new ConcurrentHashMap<String, MQProducerInner>();
  3. // 将当前生产者加入到 MQClientlnstance管理中
  4. public boolean registerProducer(final String group, final DefaultMQProducerImpl producer) {
  5. if (null == group || null == producer) {
  6. return false;
  7. }
  8. MQProducerInner prev = this.producerTable.putIfAbsent(group, producer);
  9. if (prev != null) {
  10. log.warn("the producer group[{}] exist already.", group);
  11. return false;
  12. }
  13. return true;
  14. }

备注:DefaultMQProducerImpl实现的接口类为:MQProducerInner

5-添加默认topic信息缓存,此处需要理解topicPublishInfoTable数据结构的意思

  1. //key:topic value:TopicPublishInfo-路由相关信息,用于消息发送
  2. private final ConcurrentMap<String/* topic */, TopicPublishInfo> topicPublishInfoTable =
  3. new ConcurrentHashMap<String, TopicPublishInfo>();

TopicPublishInfo:

分析,熟悉的佩服熟悉的味道,MessageQueue和TopicRouteData在NameServer已分析相当清除,分析如下:

  1. public class TopicPublishInfo {
  2. //是否是顺序消息
  3. private boolean orderTopic = false;
  4. //是否包含路由信息
  5. private boolean haveTopicRouterInfo = false;
  6. //该主题队列的消息队列
  7. private List<MessageQueue> messageQueueList = new ArrayList<MessageQueue>();
  8. //每选择一次消息 队列, 该值会自增1,如果 Integer.MAX_VALUE, 则重置为 0,用于选择消息队列。
  9. private volatile ThreadLocalIndex sendWhichQueue = new ThreadLocalIndex();
  10. //路由信息
  11. private TopicRouteData topicRouteData;
  12. //选择队列方法,lastBrokerName其实上一次发送失败的brokerName,如果不为空,本次选择队列发送所在的brokerName则选择其他的brokerName
  13. public MessageQueue selectOneMessageQueue(final String lastBrokerName) {
  14. if (lastBrokerName == null) {
  15. return selectOneMessageQueue();
  16. } else {
  17. //如果消息发 送再失败的话 , 下次进行 消息队列选择 时规避上次 MesageQueue 所 在的 Broker, 否 则还 是 很有可能再次失败 。
  18. int index = this.sendWhichQueue.getAndIncrement();
  19. for (int i = 0; i < this.messageQueueList.size(); i++) {
  20. int pos = Math.abs(index++) % this.messageQueueList.size();
  21. if (pos < 0)
  22. pos = 0;
  23. MessageQueue mq = this.messageQueueList.get(pos);
  24. if (!mq.getBrokerName().equals(lastBrokerName)) {
  25. return mq;
  26. }
  27. }
  28. return selectOneMessageQueue();
  29. }
  30. }
  31. //直接用 sendWhichQueue 自增再获取值 , 与当前路由 表 中消息 队列个数取模, 返回该位置的 MessageQueue(selectOneMessageQueue()方法)
  32. public MessageQueue selectOneMessageQueue() {
  33. int index = this.sendWhichQueue.getAndIncrement();
  34. int pos = Math.abs(index) % this.messageQueueList.size();
  35. if (pos < 0)
  36. pos = 0;
  37. return this.messageQueueList.get(pos);
  38. }
  39. //messageQueueList-长度大于0
  40. public boolean ok() {
  41. return null != this.messageQueueList && !this.messageQueueList.isEmpty();
  42. }
  43. //是否包含路由信息
  44. public boolean isHaveTopicRouterInfo() {
  45. return haveTopicRouterInfo;
  46. }

6-启动-MQClientInstance

   MQClientInstance#start

  1. public void start() throws MQClientException {
  2. synchronized (this) {
  3. switch (this.serviceState) {
  4. case CREATE_JUST:
  5. this.serviceState = ServiceState.START_FAILED; //1>状态-设置启动失败
  6. // If not specified,looking address from name server
  7. if (null == this.clientConfig.getNamesrvAddr()) { //2>判断nameSrvAddr地址是否为空,http获取nameSrvAddr
  8. this.mQClientAPIImpl.fetchNameServerAddr();
  9. }
  10. // Start request-response channel : netty
  11. this.mQClientAPIImpl.start(); // 3>启动netty相关
  12. // Start various schedule tasks
  13. this.startScheduledTask(); //4>【重要】启动相关定时任务
  14. // Start pull service
  15. this.pullMessageService.start(); //5>消费端相关,后续讲解
  16. // Start rebalance service
  17. this.rebalanceService.start(); //6>消费端相关,后续讲解
  18. // Start push service
  19. this.defaultMQProducer.getDefaultMQProducerImpl().start(false); //7>内部启动一个mqProducter,startFactory=false
  20. log.info("the client factory [{}] start OK", this.clientId);
  21. this.serviceState = ServiceState.RUNNING; >8 状态设置运行中
  22. break;
  23. case RUNNING:
  24. break;
  25. case SHUTDOWN_ALREADY:
  26. break;
  27. case START_FAILED:
  28. throw new MQClientException("The Factory object[" + this.getClientId() + "] has been created before, and failed.", null);
  29. default:
  30. break;
  31. }
  32. }
  33.     }

备注:后续单独讲解:this.startScheduledTask();

7-发送心跳到所有broker

(this.mQClientFactory.sendHeartbeatToAllBrokerWithLock();)

  1. public void sendHeartbeatToAllBrokerWithLock() {
  2. if (this.lockHeartbeat.tryLock()) {
  3. try {
  4. this.sendHeartbeatToAllBroker(); 1> 发送心跳到所有broker
  5. this.uploadFilterClassSource(); 2> 更新过滤filterSource 可忽略不看
  6. } catch (final Exception e) {
  7. log.error("sendHeartbeatToAllBroker exception", e);
  8. } finally {
  9. this.lockHeartbeat.unlock();
  10. }
  11. } else {
  12. log.warn("lock heartBeat, but failed.");
  13. }
  14. }

备注:sendHeartbeatToAllBroker,相对简单,

对返回结果维护了brokerVersionTable(ConcurrentHashMap),你不可错过.因为会有定时任务定时发送心跳至所有broker

小结:通过7个步骤我们已经了解到生产者的启动流程,大致分为:检测相关配置、注册构建相关类(例如:MQClientInstance相关、netty相关等)、然后启动相关定时任务;简单总结生产者的启动流程,如下:


文章导航

类别标题发布
RedisRedis(一):单线程为何还能这么快己发布
Redis(二):内存模型及回收算法己发布
Redis(三):持久化己发布
Redis(四):主从同步2021.11.05
Redis(五):集群搭建即将上线
Redis(六):实战应用即将上线
ElasticsearchElasticsearch:概述己发布
Elasticsearch:核心己发布
Elasticsearch:实战己发布
Elasticsearch写入流程详解即将上线
Elasticsearch查询流程详解即将上线
Elasticsearch集群一致性即将上线
Lucene的基本概念即将上线
Elasticsearch部署架构和容量规划即将上线
RocketMQRocketMQ—NameServer总结及核心源码剖析己发布
RocketMQ—Producer(一)启动流程解密本文章
RocketMQ—Producer(二)路由动态更新2021.11.08
RocketMQ—Producer(三)发送方式和消息类型2021.11.10
RocketMQ—Producer(四)消息发送流程2021.11.12
RocketMQ—Producer(五)路由队列选择,客户端冗错策略
Broker—启动流程源码解密即将上线
Broker—接受消息处理流程解密即将上线
Tomcat源码分析Tomcat(一):项目结构及架构分析即将上线
Tomcat(二):启动关闭流程分析即将上线
Tomcat(三):应用加载原理分析即将上线
Tomcat(四):网络请求原理分析即将上线
Tomcat(五):嵌入式及性能调优即将上线
NacosNacos项目结构及架构分析即将上线
Nacos服务注册源码解析即将上线
Nacos配置管理源码解析即将上线
Nacos2.0版本优化功能解析即将上线
Netty计算机网络&nio核心原理即将上线
详细解读kafka是如何基于原生nio封装网络通信组件的?即将上线
netty初识之核心组件介绍即将上线
源码解析netty服务端,端口是如何拉起来的?新连接接入又如何处理?即将上线
深入netty中各种事件如何在pipeline中传播的?即将上线
网络编程中拆包粘包是什么?kafka以及netty是如何解决的?即将上线
深入解读netty中最为复杂的缓存分配是如何进行的?即将上线
源码分析netty、kafka、sentinel中不同时间轮实现方式以及细节即将上线
尝试从上帝角度对比kafka&netty中的性能优化,各种设计模式的丰富运用即将上线
Netty在Rocketmq中的实践,RocketMq的消息协议解读即将上线

​​​

关注IT巅峰技术,私信作者,获取以下2021全球架构师峰会PDF资料。

​​

图片​​

图片

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/2023面试高手/article/detail/257000
推荐阅读
相关标签
  

闽ICP备14008679号