当前位置:   article > 正文

第十二章-Broker-同步刷盘(一)

第十二章-Broker-同步刷盘(一)

12.1 刷盘

CommitLog.handleDiskFlush

public void handleDiskFlush(AppendMessageResult result, PutMessageResult putMessageResult, MessageExt messageExt) {
    // 同步刷盘
    if (FlushDiskType.SYNC_FLUSH == this.defaultMessageStore.getMessageStoreConfig().getFlushDiskType()) {
        // 同步刷盘用 GroupCommitService 服务
        final GroupCommitService service = (GroupCommitService) this.flushCommitLogService;
        if (messageExt.isWaitStoreMsgOK()) { // 判断同步刷盘情况下,要不要等待刷盘结束,默认是要的
            // 创建刷盘提交请求
            GroupCommitRequest request = new GroupCommitRequest(result.getWroteOffset() + result.getWroteBytes());
            // 将请求放到缓存List中,供 GroupCommitService 服务线程执行刷盘
            service.putRequest(request);
            // 线程等待刷盘结果
            boolean flushOK = request.waitForFlush(this.defaultMessageStore.getMessageStoreConfig().getSyncFlushTimeout());
            if (!flushOK) {
                log.error("do groupcommit, wait for flush failed, topic: " + messageExt.getTopic() + " tags: " + messageExt.getTags()
                          + " client address: " + messageExt.getBornHostString());
                // 等待超时,返回结果为刷盘超时,实际这个时候可能已经刷成功了,也可能没成功
                putMessageResult.setPutMessageStatus(PutMessageStatus.FLUSH_DISK_TIMEOUT);
            }
        } else {
            // 如不等待,则直接唤醒刷盘服务线程,当前方法的执行就结束了
            service.wakeup();
        }
    }
    // 异步刷盘
    else {
        if (!this.defaultMessageStore.getMessageStoreConfig().isTransientStorePoolEnable()) {
            flushCommitLogService.wakeup();
        } else {
            // 异步刷盘用 CommitRealTimeService 服务,看`章节12.1.2`
            commitLogService.wakeup();
        }
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33

putRequest 请求放到缓存List中

public synchronized void putRequest(final GroupCommitRequest request) {
    synchronized (this.requestsWrite) {
        // 将请求添加到List
        this.requestsWrite.add(request);
    }
    // CAS hasNotified,默认是false,可以通过CAS处理
    if (hasNotified.compareAndSet(false, true)) {
        waitPoint.countDown(); // 通知等待在 waitPoint 的线程执行
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10

12.1.1 同步刷盘

同步刷盘用的是类 GroupCommitService 来处理,这个类也是一个线程任务类,实现了Runnable,那么就需要启动线程,通过以下调用链启动该任务:

BrokerController.start()

​ -> DefaultMessageStore.start()

​ ->CommitLog.start()

​ ->GroupCommitService.start()

直接进入 GroupCommitService.run()方法

public void run() {
    CommitLog.log.info(this.getServiceName() + " service started");

    while (!this.isStopped()) {// 状态判断
        try {
            // 等待请求任务过来后再执行
            this.waitForRunning(10);
            // 执行提交的请求任务
            this.doCommit();
        } catch (Exception e) {
            CommitLog.log.warn(this.getServiceName() + " service has exception. ", e);
        }
    }

    // Under normal circumstances shutdown, wait for the arrival of the
    // request, and then flush
    try {
        Thread.sleep(10);
    } catch (InterruptedException e) {
        CommitLog.log.warn("GroupCommitService Exception, ", e);
    }

    synchronized (this) {
        this.swapRequests();
    }

    this.doCommit();

    CommitLog.log.info(this.getServiceName() + " service end");
}

protected void waitForRunning(long interval) {
    // 前面提交请求时,已经将 hasNotified CAS 成 true,所以这一步能成功CAS为false
    if (hasNotified.compareAndSet(true, false)) {
        this.onWaitEnd(); // 表示收到请求通知,这里等待结束
        return;
    }

    // 没有收到请求任务通知,重置 waitPoint 的 countDown值
    waitPoint.reset();

    try {
        // 继续等待 interval 时间
        waitPoint.await(interval, TimeUnit.MILLISECONDS);
    } catch (InterruptedException e) {
        log.error("Interrupted", e);
    } finally {
        // 等待完成,一定要设置为false,否则下一次请求进来无法通知线程执行,看 putRequest 方法
        hasNotified.set(false);
        this.onWaitEnd();
    }
}

protected void onWaitEnd() {
    // 交换请求,将提交请求和任务执行请求分开来处理,不使用同一个List,保证线程安全和数据一致性
    this.swapRequests();
}


private void doCommit() {
    synchronized (this.requestsRead) {
        // 判断任务要提取的请求不为空
        if (!this.requestsRead.isEmpty()) {
            for (GroupCommitRequest req : this.requestsRead) { // 遍历List
                // There may be a message in the next file, so a maximum of
                // two times the flush
                boolean flushOK = false;
                // 这里为啥要2次执行,也能理解,requestsRead 支持多消息提交刷盘,多消息就可能涉及2个文件,前一个文件尾部,后一个文件头,但是4.5.1版本的RocketMQ,默认是不存在这种情况的,不管是单条消息还是批量消息,都发生在一个文件中,并作为一个 requestsRead 请求提交,所以实现上,该版本不会存在2个文件的情况。这里可能只是一个扩展功能
                for (int i = 0; i < 2 && !flushOK; i++) {
                    // 判断是不是已经刷过盘(注:每次刷盘都会记录偏移位置)
                    flushOK = CommitLog.this.mappedFileQueue.getFlushedWhere() >= req.getNextOffset();

                    if (!flushOK) {
                        // 确认没有刷盘,那就直接去刷
                        CommitLog.this.mappedFileQueue.flush(0);
                    }
                }
			   // 通知刷盘成功,通知谁呢,就是通知提交线程,因为入口方法 handleDiskFlush 的线程还在等着结果呢
                req.wakeupCustomer(flushOK);
            }
		   // 记录消息存储的时间
            long storeTimestamp = CommitLog.this.mappedFileQueue.getStoreTimestamp();
            if (storeTimestamp > 0) {
                CommitLog.this.defaultMessageStore.getStoreCheckpoint().setPhysicMsgTimestamp(storeTimestamp);
            }
			// 清空请求列表
            this.requestsRead.clear();
        } else {
            // 这种情况就是不等待刷盘结果,直接刷盘
            CommitLog.this.mappedFileQueue.flush(0);
        }
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60
  • 61
  • 62
  • 63
  • 64
  • 65
  • 66
  • 67
  • 68
  • 69
  • 70
  • 71
  • 72
  • 73
  • 74
  • 75
  • 76
  • 77
  • 78
  • 79
  • 80
  • 81
  • 82
  • 83
  • 84
  • 85
  • 86
  • 87
  • 88
  • 89
  • 90
  • 91
  • 92
  • 93

MappedFileQueue.flush

public boolean flush(final int flushLeastPages) {
    boolean result = true;
    // 根据刷新的位置,确定要刷新的是哪个文件的映射
    MappedFile mappedFile = this.findMappedFileByOffset(this.flushedWhere, this.flushedWhere == 0);
    if (mappedFile != null) {
        long tmpTimeStamp = mappedFile.getStoreTimestamp();
        // 刷盘操作,重点看这一句,继续往里跟
        int offset = mappedFile.flush(flushLeastPages);
        // 刷盘成功后要增加已被刷的位置,用于下次刷盘的判断
        long where = mappedFile.getFileFromOffset() + offset;
        result = where == this.flushedWhere;
        this.flushedWhere = where;
        if (0 == flushLeastPages) {
            this.storeTimestamp = tmpTimeStamp;
        }
    }

    return result;
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19

MappedFile.flush

public int flush(final int flushLeastPages) {
    if (this.isAbleToFlush(flushLeastPages)) {
        if (this.hold()) {
            int value = getReadPosition();
		   // 刷盘这一块的操作通过 try...catch... 包裹出来,也就是说真实刷盘 force() 有可能产生异常
            try {
                //We only append data to fileChannel or mappedByteBuffer, never both.
                if (writeBuffer != null || this.fileChannel.position() != 0) {
                    this.fileChannel.force(false);
                } else {
                    // 这里底层调用的是 msync,可以看jvm源码,下面有贴出来
                    this.mappedByteBuffer.force();
                }
            } catch (Throwable e) {
                log.error("Error occurred when force data to disk.", e);
            }
		   // 记录已刷盘的最新位置。但是这段代码在try...catch...范围外,也就是说,即使force()产生异常,这一段还是会执行,那么就存在刷盘失败时,依然记录新的刷盘位置,这种极端情况就会导致消息数据丢失,而用户收到的却是成功,所以说,RocketMQ 也是存在数据丢失的。
            this.flushedPosition.set(value);
            this.release();
        } else {
            log.warn("in flush, hold failed, flush offset = " + this.flushedPosition.get());
            this.flushedPosition.set(getReadPosition());
        }
    }
    return this.getFlushedPosition();
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26

force()的JVM底层实现
这一段源码,在MappedByteBuffer.c 文件中,有兴趣的读者可以去看看。

msync系统调用失败的原因有很多,包含但不限于:

1、文件描述符无效或未正确打开

2、内存映射区域对应的文件被破坏

3、权限被篡改

4、系统资源不足

5、内核错误

6、msync同步问题:多个进程共同操作 msync

JNIEXPORT void JNICALL
    Java_java_nio_MappedByteBuffer_force0(JNIEnv *env, jobject obj, jobject fdo,
                                          jlong address, jlong len)
{
    void* a = (void *)jlong_to_ptr(address);
    // 看这一行,这其实就是一个系统调用API,作用就是将mmap中映射的内存内容同步到相应的文件
    int result = msync(a, (size_t)len, MS_SYNC);
    if (result == -1) {
        JNU_ThrowIOExceptionWithLastError(env, "msync failed");
    }
}

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/小小林熬夜学编程/article/detail/469571
推荐阅读
相关标签
  

闽ICP备14008679号