赞
踩
因为broker东西比较多,所以放到最后。今天来学习下 rocketmq-store
1.看看消息如何做的持久化
2.看看如何做的主从同步
- public enum BrokerRole {
- ASYNC_MASTER,
- SYNC_MASTER,
- SLAVE;
- }
- public enum FlushDiskType {
- SYNC_FLUSH,
- ASYNC_FLUSH
- }
- /**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
- package com.alibaba.rocketmq.store.config;
-
- import com.alibaba.rocketmq.common.annotation.ImportantField;
- import com.alibaba.rocketmq.store.ConsumeQueue;
-
- import java.io.File;
-
-
- /**
- * @author vongosling
- * @author shijia.wxr
- */
- public class MessageStoreConfig {
- //The root directory in which the log data is kept
- @ImportantField
- private String storePathRootDir = System.getProperty("user.home") + File.separator + "store";
-
- //The directory in which the commitlog is kept
- @ImportantField
- private String storePathCommitLog = System.getProperty("user.home") + File.separator + "store"
- + File.separator + "commitlog";
-
- // CommitLog file size,default is 1G
- private int mapedFileSizeCommitLog = 1024 * 1024 * 1024;
- // ConsumeQueue file size, default is 30W
- private int mapedFileSizeConsumeQueue = 300000 * ConsumeQueue.CQStoreUnitSize;
- // CommitLog flush interval
- @ImportantField
- private int flushIntervalCommitLog = 1000;
- // Whether schedule flush,default is real-time
- @ImportantField
- private boolean flushCommitLogTimed = false;
- // ConsumeQueue flush interval
- private int flushIntervalConsumeQueue = 1000;
- // Resource reclaim interval
- private int cleanResourceInterval = 10000;
- // CommitLog removal interval
- private int deleteCommitLogFilesInterval = 100;
- // ConsumeQueue removal interval
- private int deleteConsumeQueueFilesInterval = 100;
- private int destroyMapedFileIntervalForcibly = 1000 * 120;
- private int redeleteHangedFileInterval = 1000 * 120;
- // When to delete,default is at 4 am
- @ImportantField
- private String deleteWhen = "04";
- private int diskMaxUsedSpaceRatio = 75;
- // The number of hours to keep a log file before deleting it (in hours)
- @ImportantField
- private int fileReservedTime = 72;
- // Flow control for ConsumeQueue
- private int putMsgIndexHightWater = 600000;
- // The maximum size of a single log file,default is 512K
- private int maxMessageSize = 1024 * 1024 * 4;
- // Whether check the CRC32 of the records consumed.
- // This ensures no on-the-wire or on-disk corruption to the messages occurred.
- // This check adds some overhead, so it may be disabled in cases seeking extreme performance.
- private boolean checkCRCOnRecover = true;
- // How many pages are to be flushed when flush CommitLog
- private int flushCommitLogLeastPages = 4;
- // Flush page size when the disk in warming state
- private int flushLeastPagesWhenWarmMapedFile = 1024 / 4 * 16;
- // How many pages are to be flushed when flush ConsumeQueue
- private int flushConsumeQueueLeastPages = 2;
- private int flushCommitLogThoroughInterval = 1000 * 10;
- private int flushConsumeQueueThoroughInterval = 1000 * 60;
- @ImportantField
- private int maxTransferBytesOnMessageInMemory = 1024 * 256;
- @ImportantField
- private int maxTransferCountOnMessageInMemory = 32;
- @ImportantField
- private int maxTransferBytesOnMessageInDisk = 1024 * 64;
- @ImportantField
- private int maxTransferCountOnMessageInDisk = 8;
- @ImportantField
- private int accessMessageInMemoryMaxRatio = 40;
- @ImportantField
- private boolean messageIndexEnable = true;
- private int maxHashSlotNum = 5000000;
- private int maxIndexNum = 5000000 * 4;
- private int maxMsgsNumBatch = 64;
- @ImportantField
- private boolean messageIndexSafe = false;
- private int haListenPort = 10912;
- private int haSendHeartbeatInterval = 1000 * 5;
- private int haHousekeepingInterval = 1000 * 20;
- private int haTransferBatchSize = 1024 * 32;
- @ImportantField
- private String haMasterAddress = null;
- private int haSlaveFallbehindMax = 1024 * 1024 * 256;
- @ImportantField
- private BrokerRole brokerRole = BrokerRole.ASYNC_MASTER;
- @ImportantField
- private FlushDiskType flushDiskType = FlushDiskType.ASYNC_FLUSH;
- private int syncFlushTimeout = 1000 * 5;
- private String messageDelayLevel = "1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h";
- private long flushDelayOffsetInterval = 1000 * 10;
- @ImportantField
- private boolean cleanFileForciblyEnable = true;
- private boolean warmMapedFileEnable = false;
- private boolean offsetCheckInSlave = false;
- private boolean debugLockEnable = false;
- private boolean duplicationEnable = false;
- private boolean diskFallRecorded = true;
- private long osPageCacheBusyTimeOutMills = 1000;
- private int defaultQueryMaxNum = 32;
- public void start() throws Exception {
- this.flushConsumeQueueService.start();
- this.commitLog.start();
- this.storeStatsService.start();
-
-
- if (this.scheduleMessageService != null && SLAVE != messageStoreConfig.getBrokerRole()) {
- this.scheduleMessageService.start();
- }
-
- if (this.getMessageStoreConfig().isDuplicationEnable()) {
- this.reputMessageService.setReputFromOffset(this.commitLog.getConfirmOffset());
- } else {
- this.reputMessageService.setReputFromOffset(this.commitLog.getMaxOffset());
- }
- this.reputMessageService.start();
-
- this.haService.start();
-
- this.createTempFile();
- this.addScheduleTask();
- this.shutdown = false;
- }
- private void doFlush(int retryTimes) {
- int flushConsumeQueueLeastPages = DefaultMessageStore.this.getMessageStoreConfig().getFlushConsumeQueueLeastPages();
-
- if (retryTimes == RetryTimesOver) {
- flushConsumeQueueLeastPages = 0;
- }
-
- long logicsMsgTimestamp = 0;
-
-
- int flushConsumeQueueThoroughInterval = DefaultMessageStore.this.getMessageStoreConfig().getFlushConsumeQueueThoroughInterval();
- long currentTimeMillis = System.currentTimeMillis();
- if (currentTimeMillis >= (this.lastFlushTimestamp + flushConsumeQueueThoroughInterval)) {
- this.lastFlushTimestamp = currentTimeMillis;
- flushConsumeQueueLeastPages = 0;
- logicsMsgTimestamp = DefaultMessageStore.this.getStoreCheckpoint().getLogicsMsgTimestamp();
- }
-
- ConcurrentHashMap<String, ConcurrentHashMap<Integer, ConsumeQueue>> tables = DefaultMessageStore.this.consumeQueueTable;
-
- for (ConcurrentHashMap<Integer, ConsumeQueue> maps : tables.values()) {
- for (ConsumeQueue cq : maps.values()) {
- boolean result = false;
- for (int i = 0; i < retryTimes && !result; i++) {
- result = cq.commit(flushConsumeQueueLeastPages);
- }
- }
- }
-
- if (0 == flushConsumeQueueLeastPages) {
- if (logicsMsgTimestamp > 0) {
- DefaultMessageStore.this.getStoreCheckpoint().setLogicsMsgTimestamp(logicsMsgTimestamp);
- }
- DefaultMessageStore.this.getStoreCheckpoint().flush();
- }
- }
- public void run() {
- log.info(this.getServiceName() + " service started");
-
- while (!this.isStoped()) {
- try {
- this.waitForRunning(FrequencyOfSampling);
-
- this.sampling();
-
- this.printTps();
- } catch (Exception e) {
- log.warn(this.getServiceName() + " service has exception. ", e);
- }
- }
-
- log.info(this.getServiceName() + " service end");
- }
- public void start() {
-
- for (Map.Entry<Integer, Long> entry : this.delayLevelTable.entrySet()) {
- Integer level = entry.getKey();
- Long timeDelay = entry.getValue();
- Long offset = this.offsetTable.get(level);
- if (null == offset) {
- offset = 0L;
- }
-
- if (timeDelay != null) {
- this.timer.schedule(new DeliverDelayedMessageTimerTask(level, offset), FIRST_DELAY_TIME);
- }
- }
-
- this.timer.scheduleAtFixedRate(new TimerTask() {
-
- @Override
- public void run() {
- try {
- ScheduleMessageService.this.persist();
- } catch (Exception e) {
- log.error("scheduleAtFixedRate flush exception", e);
- }
- }
- }, 10000, this.defaultMessageStore.getMessageStoreConfig().getFlushDelayOffsetInterval());
- }
- private void doReput() {
- for (boolean doNext = true; this.isCommitLogAvailable() && doNext; ) {
-
- if (DefaultMessageStore.this.getMessageStoreConfig().isDuplicationEnable() //
- && this.reputFromOffset >= DefaultMessageStore.this.getConfirmOffset()) {
- break;
- }
-
- SelectMapedBufferResult result = DefaultMessageStore.this.commitLog.getData(reputFromOffset);
- if (result != null) {
- try {
- this.reputFromOffset = result.getStartOffset();
-
- for (int readSize = 0; readSize < result.getSize() && doNext; ) {
- DispatchRequest dispatchRequest =
- DefaultMessageStore.this.commitLog.checkMessageAndReturnSize(result.getByteBuffer(), false, false);
- int size = dispatchRequest.getMsgSize();
-
- if (dispatchRequest.isSuccess()) {
- if (size > 0) {
- DefaultMessageStore.this.doDispatch(dispatchRequest);
-
- if (BrokerRole.SLAVE != DefaultMessageStore.this.getMessageStoreConfig().getBrokerRole()
- && DefaultMessageStore.this.brokerConfig.isLongPollingEnable()) {
- DefaultMessageStore.this.messageArrivingListener.arriving(dispatchRequest.getTopic(),
- dispatchRequest.getQueueId(), dispatchRequest.getConsumeQueueOffset() + 1,
- dispatchRequest.getTagsCode());
- }
- // FIXED BUG By shijia
- this.reputFromOffset += size;
- readSize += size;
- if (DefaultMessageStore.this.getMessageStoreConfig().getBrokerRole() == BrokerRole.SLAVE) {
- DefaultMessageStore.this.storeStatsService
- .getSinglePutMessageTopicTimesTotal(dispatchRequest.getTopic()).incrementAndGet();
- DefaultMessageStore.this.storeStatsService
- .getSinglePutMessageTopicSizeTotal(dispatchRequest.getTopic())
- .addAndGet(dispatchRequest.getMsgSize());
- }
- }
-
- else if (size == 0) {
- this.reputFromOffset = DefaultMessageStore.this.commitLog.rollNextFile(this.reputFromOffset);
- readSize = result.getSize();
- }
- } else if (!dispatchRequest.isSuccess()) {
-
-
- if (size > 0) {
- log.error("[BUG]read total count not equals msg total size. reputFromOffset={}", reputFromOffset);
- this.reputFromOffset += size;
- }
- else {
- doNext = false;
- if (DefaultMessageStore.this.brokerConfig.getBrokerId() == MixAll.MASTER_ID) {
- log.error("[BUG]the master dispatch message to consume queue error, COMMITLOG OFFSET: {}",
- this.reputFromOffset);
-
- this.reputFromOffset += (result.getSize() - readSize);
- }
- }
- }
- }
- } finally {
- result.release();
- }
- } else {
- doNext = false;
- }
- }
- }
- public void start() {
- this.acceptSocketService.beginAccept();
- this.acceptSocketService.start();
- this.groupTransferService.start();
- this.haClient.start();
- }
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。