当前位置:   article > 正文

RocketMQ之Producer篇:可靠的消息传输_消息 topic 数据结构

消息 topic 数据结构

一、消息数据结构

RMQ的消息封装在org.apache.rocketmq.common.message类中,属性:

    private String topic;                   //消息所属topic
    private int flag;                       //消息flag
    private Map<String, String> properties; //扩展属性
    private byte[] body;                    //消息体
    private String transactionId;
  • 1
  • 2
  • 3
  • 4
  • 5

Message的基础属性包含消息所属的topic、消息的flag、扩展的属性、消息体等;

Message的Flag中定义的内容:

    public final static int COMPRESSED_FLAG = 0x1;
    public final static int MULTI_TAGS_FLAG = 0x1 << 1;
    public final static int TRANSACTION_NOT_TYPE = 0;
    public final static int TRANSACTION_PREPARED_TYPE = 0x1 << 2;
    public final static int TRANSACTION_COMMIT_TYPE = 0x2 << 2;
    public final static int TRANSACTION_ROLLBACK_TYPE = 0x3 << 2;
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6

二、生产者启动流程

DefaultMQProducerImpl是默认的消息生产者实现类,它实现MQAdmin的接口,了解消息生产者就可以直接从该类入手;
消息生产者的启动,我们可以从DefaultMQProducerlmpl的start方法跟踪:
在这里插入图片描述

step1:检查productGroup是否符合要求;并改变生产者的instanceName为进程ID

    this .checkConfig();
    if (!this.defaultMQProducer.getProducerGroup().equals(
        MixAll.CL ENT INNER PRODUCER GROUP)) {
            this.defaultMQProducer.changeinstanceNameToPID();
    }    
  • 1
  • 2
  • 3
  • 4
  • 5

step2:创建MQClientInstance实例。整个JVM实例中只存在一个MQClientManager实例,维护一个MQClientInstance缓存表ConcurrentMap<String,MQClientInstance> factoryTable,也就是同一个clientId只会创建一个MQClientInstance。

    this.mQClientFactory = MQClientManager.getinstance().
    getAndCreateMQClientinstance(this.defaultMQProducer, rpcHook); 
  • 1
  • 2
    public MQClientInstance getAndCreateMQClientInstance(final ClientConfig clientConfig, RPCHook rpcHook) {
        String clientId = clientConfig.buildMQClientId();
        MQClientInstance instance = this.factoryTable.get(clientId);
        if (null == instance) {
            instance =
                new MQClientInstance(clientConfig.cloneClientConfig(),
                    this.factoryIndexGenerator.getAndIncrement(), clientId, rpcHook);
            MQClientInstance prev = this.factoryTable.putIfAbsent(clientId, instance);
            if (prev != null) {
                instance = prev;
                log.warn("Returned Previous MQClientInstance for clientId:[{}]", clientId);
            } else {
                log.info("Created new MQClientInstance for clientId:[{}]", clientId);
            }
        }
        return instance;
    }
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17

step3:向MQClientInstance注册,将当前生产者加入到MQClientInstance管理中,方便后续调用网络请求、进行心跳检测等。

    boolean registerOK = mQClientFactory.registerProducer(this.defaultMQProducer.getProducerGroup(), this);
    if (!registerOK) {
        this.serviceState = ServiceState.CREATE_JUST;
        throw new MQClientException("The producer group[" + this.defaultMQProducer.getProducerGroup()
            + "] has been created before, specify another name please." + FAQUrl.suggestTodo(FAQUrl.GROUP_NAME_DUPLICATE_URL),
            null);
    }
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7

step4:启动MQClientInstance,如果MQClientInstance已经启动,则本次启动不会真正执行。

三、消息发送流程

在这里插入图片描述

同样的,

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/我家自动化/article/detail/256907
推荐阅读
相关标签
  

闽ICP备14008679号