赞
踩
本篇详细介绍消息发送、消息消费、RocketMQ queryMsgById 命令以及 rocketmq-console 等使用场景中究竟是用的哪一个ID。
package org.apache.rocketmq.example.quickstart; import org.apache.rocketmq.client.producer.DefaultMQProducer; import org.apache.rocketmq.client.producer.SendResult; import org.apache.rocketmq.common.message.Message; import org.apache.rocketmq.remoting.common.RemotingHelper; public class Producer { public static void main(String[] args) { try { DefaultMQProducer producer = new DefaultMQProducer("please_rename_unique_group_name"); producer.setNamesrvAddr("127.0.0.1:9876"); producer.start(); Message msg = new Message("TestTopic" /* Topic */,null /* Tag */, ("Hello RocketMQ test1" ).getBytes(RemotingHelper.DEFAULT_CHARSET) /* Message body */); SendResult sendResult = producer.send(msg); System.out.printf("%s%n", sendResult); producer.shutdown(); } catch (Throwable e) { e.printStackTrace(); } } }
执行效果如图所示:
即消息发送会返回 msgId 与 offsetMsgId。
package org.apache.rocketmq.example.quickstart; import java.util.List; import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer; import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext; import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus; import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently; import org.apache.rocketmq.client.exception.MQClientException; import org.apache.rocketmq.common.consumer.ConsumeFromWhere; import org.apache.rocketmq.common.message.MessageExt; public class Consumer { public static void main(String[] args) throws InterruptedException, MQClientException { DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("please_rename_unique_group_name_1"); consumer.setNamesrvAddr("127.0.0.1:9876"); consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET); consumer.subscribe("TestTopic", "*"); consumer.registerMessageListener(new MessageListenerConcurrently() { public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) { System.out.println("MessageExt msg.getMsgId():" + msgs.get(0).getMsgId()); System.out.println("-------------------分割线-----------------"); System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs); return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; } }); consumer.start(); System.out.printf("Consumer Started.%n"); } }
执行效果如图所示:
不知道大家是否有注意到,调用 msgs.get(0).getMsgId()返回的msgId 与直接输出msgs中的 msgId 不一样,那这又是为什么呢?答案在本文的第二部分有详细分析。
从消息发送的结果可以得知,RocketMQ 发送的返回结果会返回msgId 与 offsetMsgId,那这两个 msgId 分别是代表什么呢?
从这张图可以看出,msgId确实是客户端生成的,接下来我们详细分析一下其生成算法。
MessageClientIDSetter#createUniqID
public static String createUniqID() {
StringBuilder sb = new StringBuilder(LEN * 2);
sb.append(FIX_STRING); // @1
sb.append(UtilAll.bytes2string(createUniqIDBuffer())); // @2
return sb.toString();
}
一个 uniqID 的构建主要分成两个部分:FIX_STRING 与唯一 ID 生成算法,顾名思义,FIX_STRING 就是一个客户端固定一个前缀,那接下来先看一下固定字符串的生成规则。
MessageClientIDSetter静态代码块
static { byte[] ip; try { ip = UtilAll.getIP(); } catch (Exception e) { ip = createFakeIP(); } LEN = ip.length + 2 + 4 + 4 + 2; ByteBuffer tempBuffer = ByteBuffer.allocate(ip.length + 2 + 4); tempBuffer.position(0); tempBuffer.put(ip); tempBuffer.position(ip.length); tempBuffer.putInt(UtilAll.getPid()); tempBuffer.position(ip.length + 2); tempBuffer.putInt(MessageClientIDSetter.class.getClassLoader().hashCode()); FIX_STRING = UtilAll.bytes2string(tempBuffer.array()); setStartTime(System.currentTimeMillis()); COUNTER = new AtomicInteger(0); }
从这里可以看出 FIX_STRING 的主要由:客户端的IP、进程ID、加载 MessageClientIDSetter 的类加载器的 hashcode。
msgId 的唯一性算法由 MessageClientIDSetter 的createUniqIDBuffer 方法实现。
private static byte[] createUniqIDBuffer() {
ByteBuffer buffer = ByteBuffer.allocate(4 + 2);
long current = System.currentTimeMillis();
if (current >= nextStartTime) {
setStartTime(current);
}
buffer.position(0);
buffer.putInt((int) (System.currentTimeMillis() - startTime));
buffer.putShort((short) COUNTER.getAndIncrement());
return buffer.array();
}
可以得出 msgId 的后半段主要由:当前时间与系统启动时间的差值,以及自增序号。
在消息 Broker 服务端将消息追加到内存后会返回其物理偏移量,即在 commitlog 文件中的文件,然后会再次生成一个id,代码中虽然也叫 msgId,其实这里就是我们常说的 offsetMsgId,即记录了消息的物理偏移量,故我们重点来看一下其具体生成规则:
MessageDecoder#createMessageId
public static String createMessageId(final ByteBuffer input ,
final ByteBuffer addr, final long offset) {
input.flip();
int msgIDLength = addr.limit() == 8 ? 16 : 28;
input.limit(msgIDLength);
input.put(addr);
input.putLong(offset);
return UtilAll.bytes2string(input.array());
}
首先结合该方法的调用上下文,先解释一下该方法三个入参的含义:
温馨提示:即在 RocketMQ中,只需要提供 offsetMsgId,可用不必知道该消息所属的topic信息即可查询该条消息的内容。
消息发送时会在 SendSesult中返回 msgId、offsetMsgId,在了解了这个两个 ID 的含义时则问题不大,接下来重点介绍一下消息消费时返回的 msgId 到底是哪一个。
在消息消费时,我们更加希望因为 msgId (即客户端生成的全局唯一性ID),因为该全局性 ID 非常方便实现消费端的幂等。
在本文的1.2节我们也提到一个现象,为什么如下图代码中输出的 msgId 会不一样呢?
在客户端返回的 msg 信息,其最终返回的对象是 MessageClientExt ,继承自 MessageExt。
那我们接下来分别看一下其 getMsgId() 方法与 toString 方法即可。
@Override
public String getMsgId() {
String uniqID = MessageClientIDSetter.getUniqID(this);
if (uniqID == null) {
return this.getOffsetMsgId();
} else {
return uniqID;
}
}
原来在调用 MessageClientExt 中的 getMsgId 方法时,如果消息的属性中存在其唯一ID,则返回消息的全局唯一ID,否则返回消息的 offsetMsgId。
而 MessageClientExt 方法并没有重写 MessageExt 的 toString 方法,其实现如图所示:
故返回的是 MessageExt中 的 msgId,该 msgId 存放的是offsetMsgId,所以才造成了困扰。
温馨提示:如果消息消费失败需要重试,RocketMQ 的做法是将消息重新发送到 Broker 服务器,此时全局 msgId 是不会发送变化的,但该消息的 offsetMsgId 会发送变化,因为其存储在服务器中的位置发生了变化。
在回答了消息发送与消息消费关于msgId与offsetMsgId的困扰后,再来介绍一下如果根据msgId去查询消息。
想必大家对 rocketmq-console ,那在消息查找界面,展示的消息列表中返回的 msgId 又是哪一个呢?
这里的 Message ID 返回的是消息的全局唯一ID。
其实 RokcetMQ 也提供了 queryMsgById 命令来查看消息的内容,不过这里的 msgId 是 offsetMsgId,我们首先将全局唯一ID传入命令,其执行效果如下:
发现报错,那我们将 offsetMsgId 传入其执行效果如图所示:
但在 rocketmq-console 的根据消息ID去查找消息,无论传入哪个msgId,下图该功能都能返回正确的结果:
这是因为 rocketmq-console 做了兼容,首先将传入的 msgId 用 queryMsgById 该命令去查,如果报错,则当成 uniqID(全局ID)去查,首先全局ID会存储在消息的属性中,并会创建 Hash 索引,即可用通过 indexfile 快速定位到该条消息。
关于 RocketMQ 消息ID的相关问题就介绍到这里了,希望能得到您的认可,帮忙点个赞,谢谢。
见文如面,我是威哥,热衷于成体系剖析JAVA主流中间件,关注公众号『中间件兴趣圈』,回复专栏可获取成体系专栏导航,回复资料可以获取笔者的学习思维导图。
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。