当前位置:   article > 正文

rocketmq学习笔记 五 源码之rocketmq-store_rocketmq服务端修改store

rocketmq服务端修改store

因为broker东西比较多,所以放到最后。今天来学习下 rocketmq-store


核心流程


问题

1.看看消息如何做的持久化

2.看看如何做的主从同步


config 存储的配置信息

  1. public enum BrokerRole {
  2. ASYNC_MASTER,
  3. SYNC_MASTER,
  4. SLAVE;
  5. }

  1. public enum FlushDiskType {
  2. SYNC_FLUSH,
  3. ASYNC_FLUSH
  4. }

  1. /**
  2. * Licensed to the Apache Software Foundation (ASF) under one or more
  3. * contributor license agreements. See the NOTICE file distributed with
  4. * this work for additional information regarding copyright ownership.
  5. * The ASF licenses this file to You under the Apache License, Version 2.0
  6. * (the "License"); you may not use this file except in compliance with
  7. * the License. You may obtain a copy of the License at
  8. *
  9. * http://www.apache.org/licenses/LICENSE-2.0
  10. *
  11. * Unless required by applicable law or agreed to in writing, software
  12. * distributed under the License is distributed on an "AS IS" BASIS,
  13. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  14. * See the License for the specific language governing permissions and
  15. * limitations under the License.
  16. */
  17. package com.alibaba.rocketmq.store.config;
  18. import com.alibaba.rocketmq.common.annotation.ImportantField;
  19. import com.alibaba.rocketmq.store.ConsumeQueue;
  20. import java.io.File;
  21. /**
  22. * @author vongosling
  23. * @author shijia.wxr
  24. */
  25. public class MessageStoreConfig {
  26. //The root directory in which the log data is kept
  27. @ImportantField
  28. private String storePathRootDir = System.getProperty("user.home") + File.separator + "store";
  29. //The directory in which the commitlog is kept
  30. @ImportantField
  31. private String storePathCommitLog = System.getProperty("user.home") + File.separator + "store"
  32. + File.separator + "commitlog";
  33. // CommitLog file size,default is 1G
  34. private int mapedFileSizeCommitLog = 1024 * 1024 * 1024;
  35. // ConsumeQueue file size, default is 30W
  36. private int mapedFileSizeConsumeQueue = 300000 * ConsumeQueue.CQStoreUnitSize;
  37. // CommitLog flush interval
  38. @ImportantField
  39. private int flushIntervalCommitLog = 1000;
  40. // Whether schedule flush,default is real-time
  41. @ImportantField
  42. private boolean flushCommitLogTimed = false;
  43. // ConsumeQueue flush interval
  44. private int flushIntervalConsumeQueue = 1000;
  45. // Resource reclaim interval
  46. private int cleanResourceInterval = 10000;
  47. // CommitLog removal interval
  48. private int deleteCommitLogFilesInterval = 100;
  49. // ConsumeQueue removal interval
  50. private int deleteConsumeQueueFilesInterval = 100;
  51. private int destroyMapedFileIntervalForcibly = 1000 * 120;
  52. private int redeleteHangedFileInterval = 1000 * 120;
  53. // When to delete,default is at 4 am
  54. @ImportantField
  55. private String deleteWhen = "04";
  56. private int diskMaxUsedSpaceRatio = 75;
  57. // The number of hours to keep a log file before deleting it (in hours)
  58. @ImportantField
  59. private int fileReservedTime = 72;
  60. // Flow control for ConsumeQueue
  61. private int putMsgIndexHightWater = 600000;
  62. // The maximum size of a single log file,default is 512K
  63. private int maxMessageSize = 1024 * 1024 * 4;
  64. // Whether check the CRC32 of the records consumed.
  65. // This ensures no on-the-wire or on-disk corruption to the messages occurred.
  66. // This check adds some overhead, so it may be disabled in cases seeking extreme performance.
  67. private boolean checkCRCOnRecover = true;
  68. // How many pages are to be flushed when flush CommitLog
  69. private int flushCommitLogLeastPages = 4;
  70. // Flush page size when the disk in warming state
  71. private int flushLeastPagesWhenWarmMapedFile = 1024 / 4 * 16;
  72. // How many pages are to be flushed when flush ConsumeQueue
  73. private int flushConsumeQueueLeastPages = 2;
  74. private int flushCommitLogThoroughInterval = 1000 * 10;
  75. private int flushConsumeQueueThoroughInterval = 1000 * 60;
  76. @ImportantField
  77. private int maxTransferBytesOnMessageInMemory = 1024 * 256;
  78. @ImportantField
  79. private int maxTransferCountOnMessageInMemory = 32;
  80. @ImportantField
  81. private int maxTransferBytesOnMessageInDisk = 1024 * 64;
  82. @ImportantField
  83. private int maxTransferCountOnMessageInDisk = 8;
  84. @ImportantField
  85. private int accessMessageInMemoryMaxRatio = 40;
  86. @ImportantField
  87. private boolean messageIndexEnable = true;
  88. private int maxHashSlotNum = 5000000;
  89. private int maxIndexNum = 5000000 * 4;
  90. private int maxMsgsNumBatch = 64;
  91. @ImportantField
  92. private boolean messageIndexSafe = false;
  93. private int haListenPort = 10912;
  94. private int haSendHeartbeatInterval = 1000 * 5;
  95. private int haHousekeepingInterval = 1000 * 20;
  96. private int haTransferBatchSize = 1024 * 32;
  97. @ImportantField
  98. private String haMasterAddress = null;
  99. private int haSlaveFallbehindMax = 1024 * 1024 * 256;
  100. @ImportantField
  101. private BrokerRole brokerRole = BrokerRole.ASYNC_MASTER;
  102. @ImportantField
  103. private FlushDiskType flushDiskType = FlushDiskType.ASYNC_FLUSH;
  104. private int syncFlushTimeout = 1000 * 5;
  105. private String messageDelayLevel = "1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h";
  106. private long flushDelayOffsetInterval = 1000 * 10;
  107. @ImportantField
  108. private boolean cleanFileForciblyEnable = true;
  109. private boolean warmMapedFileEnable = false;
  110. private boolean offsetCheckInSlave = false;
  111. private boolean debugLockEnable = false;
  112. private boolean duplicationEnable = false;
  113. private boolean diskFallRecorded = true;
  114. private long osPageCacheBusyTimeOutMills = 1000;
  115. private int defaultQueryMaxNum = 32;


DefaultMessageStore


看下其启动任务,包含了哪些东西

  1. public void start() throws Exception {
  2. this.flushConsumeQueueService.start();
  3. this.commitLog.start();
  4. this.storeStatsService.start();
  5. if (this.scheduleMessageService != null && SLAVE != messageStoreConfig.getBrokerRole()) {
  6. this.scheduleMessageService.start();
  7. }
  8. if (this.getMessageStoreConfig().isDuplicationEnable()) {
  9. this.reputMessageService.setReputFromOffset(this.commitLog.getConfirmOffset());
  10. } else {
  11. this.reputMessageService.setReputFromOffset(this.commitLog.getMaxOffset());
  12. }
  13. this.reputMessageService.start();
  14. this.haService.start();
  15. this.createTempFile();
  16. this.addScheduleTask();
  17. this.shutdown = false;
  18. }

FlushConsumeQueueService


  1. private void doFlush(int retryTimes) {
  2. int flushConsumeQueueLeastPages = DefaultMessageStore.this.getMessageStoreConfig().getFlushConsumeQueueLeastPages();
  3. if (retryTimes == RetryTimesOver) {
  4. flushConsumeQueueLeastPages = 0;
  5. }
  6. long logicsMsgTimestamp = 0;
  7. int flushConsumeQueueThoroughInterval = DefaultMessageStore.this.getMessageStoreConfig().getFlushConsumeQueueThoroughInterval();
  8. long currentTimeMillis = System.currentTimeMillis();
  9. if (currentTimeMillis >= (this.lastFlushTimestamp + flushConsumeQueueThoroughInterval)) {
  10. this.lastFlushTimestamp = currentTimeMillis;
  11. flushConsumeQueueLeastPages = 0;
  12. logicsMsgTimestamp = DefaultMessageStore.this.getStoreCheckpoint().getLogicsMsgTimestamp();
  13. }
  14. ConcurrentHashMap<String, ConcurrentHashMap<Integer, ConsumeQueue>> tables = DefaultMessageStore.this.consumeQueueTable;
  15. for (ConcurrentHashMap<Integer, ConsumeQueue> maps : tables.values()) {
  16. for (ConsumeQueue cq : maps.values()) {
  17. boolean result = false;
  18. for (int i = 0; i < retryTimes && !result; i++) {
  19. result = cq.commit(flushConsumeQueueLeastPages);
  20. }
  21. }
  22. }
  23. if (0 == flushConsumeQueueLeastPages) {
  24. if (logicsMsgTimestamp > 0) {
  25. DefaultMessageStore.this.getStoreCheckpoint().setLogicsMsgTimestamp(logicsMsgTimestamp);
  26. }
  27. DefaultMessageStore.this.getStoreCheckpoint().flush();
  28. }
  29. }



CommitLog


StoreStatsService

统计输出message store的TPS

  1. public void run() {
  2. log.info(this.getServiceName() + " service started");
  3. while (!this.isStoped()) {
  4. try {
  5. this.waitForRunning(FrequencyOfSampling);
  6. this.sampling();
  7. this.printTps();
  8. } catch (Exception e) {
  9. log.warn(this.getServiceName() + " service has exception. ", e);
  10. }
  11. }
  12. log.info(this.getServiceName() + " service end");
  13. }


ScheduleMessageService



  1. public void start() {
  2. for (Map.Entry<Integer, Long> entry : this.delayLevelTable.entrySet()) {
  3. Integer level = entry.getKey();
  4. Long timeDelay = entry.getValue();
  5. Long offset = this.offsetTable.get(level);
  6. if (null == offset) {
  7. offset = 0L;
  8. }
  9. if (timeDelay != null) {
  10. this.timer.schedule(new DeliverDelayedMessageTimerTask(level, offset), FIRST_DELAY_TIME);
  11. }
  12. }
  13. this.timer.scheduleAtFixedRate(new TimerTask() {
  14. @Override
  15. public void run() {
  16. try {
  17. ScheduleMessageService.this.persist();
  18. } catch (Exception e) {
  19. log.error("scheduleAtFixedRate flush exception", e);
  20. }
  21. }
  22. }, 10000, this.defaultMessageStore.getMessageStoreConfig().getFlushDelayOffsetInterval());
  23. }


ReputMessageService

  1. private void doReput() {
  2. for (boolean doNext = true; this.isCommitLogAvailable() && doNext; ) {
  3. if (DefaultMessageStore.this.getMessageStoreConfig().isDuplicationEnable() //
  4. && this.reputFromOffset >= DefaultMessageStore.this.getConfirmOffset()) {
  5. break;
  6. }
  7. SelectMapedBufferResult result = DefaultMessageStore.this.commitLog.getData(reputFromOffset);
  8. if (result != null) {
  9. try {
  10. this.reputFromOffset = result.getStartOffset();
  11. for (int readSize = 0; readSize < result.getSize() && doNext; ) {
  12. DispatchRequest dispatchRequest =
  13. DefaultMessageStore.this.commitLog.checkMessageAndReturnSize(result.getByteBuffer(), false, false);
  14. int size = dispatchRequest.getMsgSize();
  15. if (dispatchRequest.isSuccess()) {
  16. if (size > 0) {
  17. DefaultMessageStore.this.doDispatch(dispatchRequest);
  18. if (BrokerRole.SLAVE != DefaultMessageStore.this.getMessageStoreConfig().getBrokerRole()
  19. && DefaultMessageStore.this.brokerConfig.isLongPollingEnable()) {
  20. DefaultMessageStore.this.messageArrivingListener.arriving(dispatchRequest.getTopic(),
  21. dispatchRequest.getQueueId(), dispatchRequest.getConsumeQueueOffset() + 1,
  22. dispatchRequest.getTagsCode());
  23. }
  24. // FIXED BUG By shijia
  25. this.reputFromOffset += size;
  26. readSize += size;
  27. if (DefaultMessageStore.this.getMessageStoreConfig().getBrokerRole() == BrokerRole.SLAVE) {
  28. DefaultMessageStore.this.storeStatsService
  29. .getSinglePutMessageTopicTimesTotal(dispatchRequest.getTopic()).incrementAndGet();
  30. DefaultMessageStore.this.storeStatsService
  31. .getSinglePutMessageTopicSizeTotal(dispatchRequest.getTopic())
  32. .addAndGet(dispatchRequest.getMsgSize());
  33. }
  34. }
  35. else if (size == 0) {
  36. this.reputFromOffset = DefaultMessageStore.this.commitLog.rollNextFile(this.reputFromOffset);
  37. readSize = result.getSize();
  38. }
  39. } else if (!dispatchRequest.isSuccess()) {
  40. if (size > 0) {
  41. log.error("[BUG]read total count not equals msg total size. reputFromOffset={}", reputFromOffset);
  42. this.reputFromOffset += size;
  43. }
  44. else {
  45. doNext = false;
  46. if (DefaultMessageStore.this.brokerConfig.getBrokerId() == MixAll.MASTER_ID) {
  47. log.error("[BUG]the master dispatch message to consume queue error, COMMITLOG OFFSET: {}",
  48. this.reputFromOffset);
  49. this.reputFromOffset += (result.getSize() - readSize);
  50. }
  51. }
  52. }
  53. }
  54. } finally {
  55. result.release();
  56. }
  57. } else {
  58. doNext = false;
  59. }
  60. }
  61. }


HAService

  1. public void start() {
  2. this.acceptSocketService.beginAccept();
  3. this.acceptSocketService.start();
  4. this.groupTransferService.start();
  5. this.haClient.start();
  6. }




声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/小丑西瓜9/article/detail/603714
推荐阅读
相关标签
  

闽ICP备14008679号