当前位置:   article > 正文

RocketMq 重要知识点_rocketmq offsetstore

rocketmq offsetstore

OffserStore和信息存储位置

       实际运行系统,难免会遇到重新消费某条消息,跳过一段时间内的消息等情况。这些异常情况的处理都和offset有关。本节主要分析存储位置以及如何根据需要调整offset的值。

      首先先来明确一下offset的含义,rocketmq中一种类型的消息会放到一个Topic里,为了能够并行,一般一个Topic会有多个message queue,offset是某个Topic下的一条消息在某个message queue里的位置,通过offset 的值可以定位到这条消息或者指示Consume从这条消息开始向后继续处理。

      如下是offset的类结构,主要分为本地文件类型和broker代存类型两种。对于DefaultMQPushConsume 来说,默认是集群模式,也就是同一个消费者组里的多个消费者每个人消费一部分,各自收到的消息内容就不一样,这种情况下,由broker端存储和控制offset的值,使用RomoteBrokerOffsetStore  结构。

备注:(Diagrams可以展示父类,实现类可以用实现implemention或者光标移到该类上右击Browse Type Hierarchy)

     在广播模式下,每个消费者都收到这个topic的全部消息,各个消费者之间互不干扰,rocketmq使用LocalFileOffsetStore,把offset存到本地,offsetStroe  使用json格式存储,简单明了。

    在使用DefaultMQPushConsume 的时候,我们不用关心OffsetStore的事,但是如果是PullConsume,我们就要自己处理OffsetStore了。在上一篇博文中,pullConsume的示例中,代码把offset存到了内存,没有持久化存储,这样就有可能因为程序的异常或者重启而丢失offset,实际应用中并不推荐这么做。为了能让我们看清楚OfsetStore究竟是何物?

1.首先通过广播模式去消费某条topic 中的消息(LocalFileOffsetStore文件类型是以本地存储的,必须要是广播模式下,广播模式下接受该topic所有的消息,包括历史消息)

  1. public class ProducerQuickStart {
  2. public static void main(String[] args) throws MQClientException,InterruptedException {
  3. DefaultMQProducer producer = new DefaultMQProducer("unique_producer_group__name");
  4. producer.setInstanceName("instance1");
  5. producer.setRetryTimesWhenSendFailed(3);//重试次数 192.168.138.47 192.168.142.133 192.168.0.102
  6. producer.setNamesrvAddr("192.168.139.151:9876");//多个用;分割 192.168.138.47
  7. producer.start();
  8. for (int i = 0; i < 1; i++) {
  9. Date date = new Date();
  10. SimpleDateFormat sdf = new SimpleDateFormat();
  11. String format = sdf.format(date);
  12. System.out.println("准备发送:" + format);
  13. Message message = new Message("topicName", String.valueOf(i),format.getBytes());
  14. SendResult sendResult= new SendResult();
  15. try {
  16. sendResult = producer.send(message);
  17. } catch (RemotingException |MQBrokerException | InterruptedException e) {
  18. System.out.println("消息发送失败:" + sendResult.getMsgId());
  19. e.printStackTrace();
  20. }
  21. System.out.println("key:"+i + "消息的发送结果为:" + sendResult.toString() + "消息ID为:" + sendResult.getMsgId());
  22. }
  23. producer.shutdown();
  24. }
  25. }
  1. public class ConsumeQuickStart {
  2. public static void main(String[] args) throws MQClientException {
  3. DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("unique_consume_group_name");
  4. consumer.setNamesrvAddr("192.168.139.151:9876");//
  5. consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
  6. consumer.setMessageModel(MessageModel.BROADCASTING);//默认是集群模式
  7. consumer.subscribe("topicName",null);
  8. //如果要检查配置信息
  9. consumer.registerMessageListener((MessageListenerConcurrently) (listMsg, consumeConcurrentlyContext) -> {
  10. byte[] body = listMsg.get(0).getBody();
  11. try {
  12. String ms = new String(body,"utf-8");
  13. System.out.println(Thread.currentThread().getName()+"收到消息:" + ms);
  14. } catch (UnsupportedEncodingException e) {
  15. e.printStackTrace();
  16. }
  17. return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
  18. });
  19. consumer.start();
  20. }
  21. }

2.  查看LocalFileOffsetStore 本次文件存储的位置(LocalFileOffsetStore类的源码)

  1. public final static String LocalOffsetStoreDir = System.getProperty(
  2. "rocketmq.client.localOffsetStoreDir",
  3. System.getProperty("user.home") + File.separator + ".rocketmq_offsets");///Users/humingming/.rocketmq_offsets
  4. this.storePath = LocalOffsetStoreDir + File.separator + //
  5. this.mQClientFactory.getClientId() + File.separator + //
  6. this.groupName + File.separator + //
  7. "offsets.json";//+ 192.168.0.102@DEFAULT/unique_consume_group_name/offsets.json

        我这里地址就是:/Users/humingming/.rocketmq_offsets/192.168.0.102@DEFAULT/unique_consume_group_name/offset.json

所以可以看出其大概组成就是:用户根目录 + .rocketmq_offsets + mq服务端IP地址@? + 消费者组名 + offset.json,因为我消费端起了两个服务(之前向两个服务端发送过两次消息,所以会有两个json文件)

  1. {
  2. "offsetTable":{{
  3. "brokerName":"humingmingdeMacBook-Pro.local",
  4. "queueId":3,
  5. "topic":"topicName"
  6. }:8,{
  7. "brokerName":"humingmingdeMacBook-Pro.local",
  8. "queueId":1,
  9. "topic":"topicName"
  10. }:12,{
  11. "brokerName":"humingmingdeMacBook-Pro.local",
  12. "queueId":2,
  13. "topic":"topicName"
  14. }:8,{
  15. "brokerName":"humingmingdeMacBook-Pro.local",
  16. "queueId":0,
  17. "topic":"topicName"
  18. }:31
  19. }
  20. }
  21. // 可以看出当前消费端已经消费 各个队列最大offset {key:messagqqueue value:bigOffset}

        值得注意的一点就是:因为广播模式下是offset存储在本地,当有消息发送过来时,此时服务端只要没收到消息或者消息接受失败,自然是不会去更改本地存储的OffsetStore 的json文件的,所以你下一次消息消费时候,会先去看该消息队列的最大offset,然后在看本地json文件的该消息队列的value值,如果相同则认为没有新的消息,如果队列的offset > json文件该队列的value 则认为有新的消息过来,则会去更新或者新增此文件,所以我们可以主动去更改json文件的value值来让消费端收不到消息或者重复消费之前消费过的消息。目前据我所知道这仅仅限定于广播模式。

      那么问题来了,因为我们之前有过利用pull模式去拉取消息,但是当时仅仅是将每个队列的offset存于内存中,比如(这里是存于map)

  1. package rocketmq.day04;
  2. import com.alibaba.rocketmq.client.consumer.DefaultMQPullConsumer;
  3. import com.alibaba.rocketmq.client.consumer.PullResult;
  4. import com.alibaba.rocketmq.client.exception.MQClientException;
  5. import com.alibaba.rocketmq.common.message.MessageExt;
  6. import com.alibaba.rocketmq.common.message.MessageQueue;
  7. import java.io.UnsupportedEncodingException;
  8. import java.util.List;
  9. import java.util.Set;
  10. import java.util.concurrent.ConcurrentHashMap;
  11. import java.util.concurrent.atomic.AtomicLong;
  12. /**
  13. * @author heian
  14. * @create 2020-01-10-11:14 上午
  15. * @description
  16. */
  17. public class OffsetPersistence {
  18. private ConcurrentHashMap<MessageQueue, AtomicLong> offsetTable = new ConcurrentHashMap<>();
  19. private long getMessageQueueOffset(MessageQueue mq) {
  20. AtomicLong atomicLong = offsetTable.get(mq);
  21. if (atomicLong != null)
  22. return atomicLong.get();
  23. return 0;
  24. }
  25. private void putMessageQueueOffset(MessageQueue mq, long offset) {
  26. offsetTable.put(mq, new AtomicLong(offset));
  27. }
  28. public void pullMsg() throws MQClientException {
  29. DefaultMQPullConsumer consumer = new DefaultMQPullConsumer("pull_consume_group");
  30. consumer.setNamesrvAddr("192.168.139.151:9876");// 192.168.0.102
  31. consumer.start();
  32. Set<MessageQueue> mqs = consumer.fetchSubscribeMessageQueues("topicName");
  33. for (int j=0;j<=0;j++){//不断轮询 演示的话就调一次算了
  34. for (MessageQueue mq : mqs) {
  35. SINGLE_MQ:
  36. while (true) {
  37. try {
  38. //第一次拉去将获得队列的所有消息写入到数据库,保存当时的 nextBeginOffset,
  39. //下一次轮询的时候,在与该队列比较其nextBeginOffset 如果变化了,则说明有消息进来了,则返回FOUND状态
  40. //我这里是100 一拉取 假设某条队列的nextBeginOffset=12 则会去拉去两次 一次拉取8 状态为FOUND,第二次拉取4 状态为FOUND 第三次拉去0 状态为NO_NEW_MSG
  41. PullResult pullResult = consumer.pullBlockIfNotFound(mq, null, getMessageQueueOffset(mq), 100);
  42. System.out.println("拉取结果:"+pullResult + "队列为:" +mq );
  43. putMessageQueueOffset(mq, pullResult.getNextBeginOffset());//可以存在数据库或者redis中 持久化
  44. switch (pullResult.getPullStatus()) {
  45. case FOUND:
  46. List<MessageExt> listMsg = pullResult.getMsgFoundList();
  47. for (int i = 0; i < listMsg.size(); i++) {
  48. byte[] body = listMsg.get(i).getBody();
  49. try {
  50. String ms = new String(body,"utf-8");
  51. //System.out.println("收到消息:" + ms);
  52. } catch (UnsupportedEncodingException e) {
  53. e.printStackTrace();
  54. }
  55. }
  56. break;
  57. case NO_MATCHED_MSG:
  58. break;
  59. case NO_NEW_MSG:
  60. break SINGLE_MQ;
  61. case OFFSET_ILLEGAL:
  62. break;
  63. default:
  64. break;
  65. }
  66. } catch (Exception e) {
  67. e.printStackTrace();
  68. }
  69. }
  70. }
  71. }
  72. }
  73. public static void main(String[] args) throws MQClientException {
  74. OffsetPersistence offsetPersistence = new OffsetPersistence();
  75. offsetPersistence.pullMsg();
  76. }
  77. }

        那么我们怎么将内存中的map持久化到硬盘中呢,其实参考源码LocalFileOffsetStore类也不难,无非就是将map转为字符串,将字符串写入到file.json中,所以改造下如下:

  1. package rocketmq.day04;
  2. import com.alibaba.rocketmq.client.consumer.DefaultMQPullConsumer;
  3. import com.alibaba.rocketmq.client.consumer.PullResult;
  4. import com.alibaba.rocketmq.client.consumer.store.OffsetSerializeWrapper;
  5. import com.alibaba.rocketmq.client.exception.MQClientException;
  6. import com.alibaba.rocketmq.common.MixAll;
  7. import com.alibaba.rocketmq.common.message.MessageExt;
  8. import com.alibaba.rocketmq.common.message.MessageQueue;
  9. import java.io.IOException;
  10. import java.io.UnsupportedEncodingException;
  11. import java.util.List;
  12. import java.util.Map;
  13. import java.util.Set;
  14. import java.util.concurrent.ConcurrentHashMap;
  15. import java.util.concurrent.atomic.AtomicLong;
  16. /**
  17. * @author heian
  18. * @create 2020-01-10-11:14 上午
  19. * @description
  20. */
  21. public class OffsetPersistence2 {
  22. private ConcurrentHashMap<MessageQueue, AtomicLong> offsetTable = new ConcurrentHashMap<>();
  23. //offsetStore 持久化地址 这里我就写死了
  24. private static final String groupName="pull_consume_group";
  25. private static final String storePath = "/Users/humingming/.rocketmq_offsets/"+groupName+"/offset.json";
  26. private long getMessageQueueOffset(MessageQueue mq) {
  27. AtomicLong atomicLong = offsetTable.get(mq);
  28. if (atomicLong != null)
  29. return atomicLong.get();
  30. return 0;
  31. }
  32. /**
  33. * 通过message和offset 得到map (存在此条消息则put增加,不存在则去set值更新)
  34. */
  35. public void putMessageQueueOffset(final MessageQueue mq,long offset){
  36. if (mq != null){
  37. AtomicLong offsetOld = this.offsetTable.get(mq);
  38. if (offsetOld == null){
  39. this.offsetTable.put(mq,new AtomicLong(offset));
  40. }else {
  41. offsetOld.set(offset);
  42. }
  43. }
  44. }
  45. //将set集合中的消息队列与公共变量的消息队列求并集,并存到对应的磁盘中
  46. public void persistAll(Set<MessageQueue> mqs){
  47. if (null == mqs || mqs.isEmpty()){
  48. return;
  49. }
  50. OffsetSerializeWrapper offsetSerializeWrapper = new OffsetSerializeWrapper();
  51. //map中的示例逐个与set集合中比较,有则将map中的消息存入 offsetSerializeWrapper(实际上就是一个可以序列化的map)
  52. for (Map.Entry<MessageQueue,AtomicLong> entry:this.offsetTable.entrySet()){
  53. if (mqs.contains(entry.getKey())){
  54. AtomicLong offset = entry.getValue();
  55. offsetSerializeWrapper.getOffsetTable().put(entry.getKey(), offset);
  56. }
  57. }
  58. String jsonStr = offsetSerializeWrapper.toJson();
  59. if (jsonStr != null){
  60. try {
  61. MixAll.string2File(jsonStr,this.storePath);
  62. }catch (IOException e){
  63. e.printStackTrace();
  64. }
  65. }
  66. }
  67. public void pullMsg() throws MQClientException {
  68. DefaultMQPullConsumer consumer = new DefaultMQPullConsumer("pull_consume_group");
  69. consumer.setNamesrvAddr("192.168.139.151:9876");// 192.168.0.102
  70. consumer.start();
  71. Set<MessageQueue> mqs = consumer.fetchSubscribeMessageQueues("topicName");
  72. for (int j=0;j<=0;j++){
  73. for (MessageQueue mq : mqs) {
  74. SINGLE_MQ:
  75. while (true) {
  76. try {
  77. //第一次拉去将获得队列的所有消息写入到数据库,保存当时的 nextBeginOffset,
  78. //下一次轮询的时候,在与该队列比较其nextBeginOffset 如果变化了,则说明有消息进来了,则返回FOUND状态
  79. //我这里是100 一拉取 假设某条队列的nextBeginOffset=12 则会去拉去两次 一次拉取8 状态为FOUND,第二次拉取4 状态为FOUND 第三次拉去0 状态为NO_NEW_MSG
  80. PullResult pullResult = consumer.pullBlockIfNotFound(mq, null, getMessageQueueOffset(mq), 100);
  81. System.out.println("拉取结果:"+pullResult + "队列为:" +mq );
  82. putMessageQueueOffset(mq, pullResult.getNextBeginOffset());//将队列信息存于map
  83. switch (pullResult.getPullStatus()) {
  84. case FOUND:
  85. List<MessageExt> listMsg = pullResult.getMsgFoundList();
  86. for (int i = 0; i < listMsg.size(); i++) {
  87. byte[] body = listMsg.get(i).getBody();
  88. try {
  89. String ms = new String(body,"utf-8");
  90. //System.out.println("收到消息:" + ms);
  91. } catch (UnsupportedEncodingException e) {
  92. e.printStackTrace();
  93. }
  94. }
  95. break;
  96. case NO_MATCHED_MSG:
  97. break;
  98. case NO_NEW_MSG:
  99. break SINGLE_MQ;
  100. case OFFSET_ILLEGAL:
  101. break;
  102. default:
  103. break;
  104. }
  105. } catch (Exception e) {
  106. e.printStackTrace();
  107. }
  108. }
  109. }
  110. //持久化
  111. persistAll(mqs);
  112. }
  113. }
  114. public static void main(String[] args) throws MQClientException {
  115. OffsetPersistence2 offsetPersistence = new OffsetPersistence2();
  116. offsetPersistence.pullMsg();
  117. }
  118. }

 所以会在/Users/humingming/.rocketmq_offsets/pull_consume_group 生成一个json文件 如下: 

控制台打印信息如下:

  1. 拉取结果:PullResult [pullStatus=FOUND, nextBeginOffset=8, minOffset=0, maxOffset=8, msgFoundList=8]队列为:MessageQueue [topic=topicName, brokerName=bogon, queueId=2]
  2. 拉取结果:PullResult [pullStatus=NO_NEW_MSG, nextBeginOffset=8, minOffset=0, maxOffset=8, msgFoundList=0]队列为:MessageQueue [topic=topicName, brokerName=bogon, queueId=2]
  3. 拉取结果:PullResult [pullStatus=FOUND, nextBeginOffset=12, minOffset=0, maxOffset=12, msgFoundList=12]队列为:MessageQueue [topic=topicName, brokerName=bogon, queueId=1]
  4. 拉取结果:PullResult [pullStatus=NO_NEW_MSG, nextBeginOffset=12, minOffset=0, maxOffset=12, msgFoundList=0]队列为:MessageQueue [topic=topicName, brokerName=bogon, queueId=1]
  5. 拉取结果:PullResult [pullStatus=FOUND, nextBeginOffset=32, minOffset=0, maxOffset=38, msgFoundList=32]队列为:MessageQueue [topic=topicName, brokerName=bogon, queueId=0]
  6. 拉取结果:PullResult [pullStatus=FOUND, nextBeginOffset=38, minOffset=0, maxOffset=38, msgFoundList=6]队列为:MessageQueue [topic=topicName, brokerName=bogon, queueId=0]
  7. 拉取结果:PullResult [pullStatus=NO_NEW_MSG, nextBeginOffset=38, minOffset=0, maxOffset=38, msgFoundList=0]队列为:MessageQueue [topic=topicName, brokerName=bogon, queueId=0]
  8. 拉取结果:PullResult [pullStatus=FOUND, nextBeginOffset=8, minOffset=0, maxOffset=8, msgFoundList=8]队列为:MessageQueue [topic=topicName, brokerName=bogon, queueId=3]
  9. 拉取结果:PullResult [pullStatus=NO_NEW_MSG, nextBeginOffset=8, minOffset=0, maxOffset=8, msgFoundList=0]队列为:MessageQueue [topic=topicName, brokerName=bogon, queueId=3]

 

 

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/Cpp五条/article/detail/404698
推荐阅读
  

闽ICP备14008679号