赞
踩
RMQ的消息封装在org.apache.rocketmq.common.message类中,属性:
private String topic; //消息所属topic
private int flag; //消息flag
private Map<String, String> properties; //扩展属性
private byte[] body; //消息体
private String transactionId;
Message的基础属性包含消息所属的topic、消息的flag、扩展的属性、消息体等;
Message的Flag中定义的内容:
public final static int COMPRESSED_FLAG = 0x1;
public final static int MULTI_TAGS_FLAG = 0x1 << 1;
public final static int TRANSACTION_NOT_TYPE = 0;
public final static int TRANSACTION_PREPARED_TYPE = 0x1 << 2;
public final static int TRANSACTION_COMMIT_TYPE = 0x2 << 2;
public final static int TRANSACTION_ROLLBACK_TYPE = 0x3 << 2;
DefaultMQProducerImpl是默认的消息生产者实现类,它实现MQAdmin的接口,了解消息生产者就可以直接从该类入手;
消息生产者的启动,我们可以从DefaultMQProducerlmpl的start方法跟踪:
step1:检查productGroup是否符合要求;并改变生产者的instanceName为进程ID
this .checkConfig();
if (!this.defaultMQProducer.getProducerGroup().equals(
MixAll.CL ENT INNER PRODUCER GROUP)) {
this.defaultMQProducer.changeinstanceNameToPID();
}
step2:创建MQClientInstance实例。整个JVM实例中只存在一个MQClientManager实例,维护一个MQClientInstance缓存表ConcurrentMap<String,MQClientInstance> factoryTable,也就是同一个clientId只会创建一个MQClientInstance。
this.mQClientFactory = MQClientManager.getinstance().
getAndCreateMQClientinstance(this.defaultMQProducer, rpcHook);
public MQClientInstance getAndCreateMQClientInstance(final ClientConfig clientConfig, RPCHook rpcHook) { String clientId = clientConfig.buildMQClientId(); MQClientInstance instance = this.factoryTable.get(clientId); if (null == instance) { instance = new MQClientInstance(clientConfig.cloneClientConfig(), this.factoryIndexGenerator.getAndIncrement(), clientId, rpcHook); MQClientInstance prev = this.factoryTable.putIfAbsent(clientId, instance); if (prev != null) { instance = prev; log.warn("Returned Previous MQClientInstance for clientId:[{}]", clientId); } else { log.info("Created new MQClientInstance for clientId:[{}]", clientId); } } return instance; }
step3:向MQClientInstance注册,将当前生产者加入到MQClientInstance管理中,方便后续调用网络请求、进行心跳检测等。
boolean registerOK = mQClientFactory.registerProducer(this.defaultMQProducer.getProducerGroup(), this);
if (!registerOK) {
this.serviceState = ServiceState.CREATE_JUST;
throw new MQClientException("The producer group[" + this.defaultMQProducer.getProducerGroup()
+ "] has been created before, specify another name please." + FAQUrl.suggestTodo(FAQUrl.GROUP_NAME_DUPLICATE_URL),
null);
}
step4:启动MQClientInstance,如果MQClientInstance已经启动,则本次启动不会真正执行。
同样的,
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。