赞
踩
CommitLog.handleDiskFlush
public void handleDiskFlush(AppendMessageResult result, PutMessageResult putMessageResult, MessageExt messageExt) { // 同步刷盘 if (FlushDiskType.SYNC_FLUSH == this.defaultMessageStore.getMessageStoreConfig().getFlushDiskType()) { // 同步刷盘用 GroupCommitService 服务 final GroupCommitService service = (GroupCommitService) this.flushCommitLogService; if (messageExt.isWaitStoreMsgOK()) { // 判断同步刷盘情况下,要不要等待刷盘结束,默认是要的 // 创建刷盘提交请求 GroupCommitRequest request = new GroupCommitRequest(result.getWroteOffset() + result.getWroteBytes()); // 将请求放到缓存List中,供 GroupCommitService 服务线程执行刷盘 service.putRequest(request); // 线程等待刷盘结果 boolean flushOK = request.waitForFlush(this.defaultMessageStore.getMessageStoreConfig().getSyncFlushTimeout()); if (!flushOK) { log.error("do groupcommit, wait for flush failed, topic: " + messageExt.getTopic() + " tags: " + messageExt.getTags() + " client address: " + messageExt.getBornHostString()); // 等待超时,返回结果为刷盘超时,实际这个时候可能已经刷成功了,也可能没成功 putMessageResult.setPutMessageStatus(PutMessageStatus.FLUSH_DISK_TIMEOUT); } } else { // 如不等待,则直接唤醒刷盘服务线程,当前方法的执行就结束了 service.wakeup(); } } // 异步刷盘 else { if (!this.defaultMessageStore.getMessageStoreConfig().isTransientStorePoolEnable()) { flushCommitLogService.wakeup(); } else { // 异步刷盘用 CommitRealTimeService 服务,看`章节12.1.2` commitLogService.wakeup(); } } }
putRequest 请求放到缓存List中
public synchronized void putRequest(final GroupCommitRequest request) {
synchronized (this.requestsWrite) {
// 将请求添加到List
this.requestsWrite.add(request);
}
// CAS hasNotified,默认是false,可以通过CAS处理
if (hasNotified.compareAndSet(false, true)) {
waitPoint.countDown(); // 通知等待在 waitPoint 的线程执行
}
}
同步刷盘用的是类 GroupCommitService 来处理,这个类也是一个线程任务类,实现了Runnable,那么就需要启动线程,通过以下调用链启动该任务:
BrokerController.start()
-> DefaultMessageStore.start()
->CommitLog.start()
->GroupCommitService.start()
直接进入 GroupCommitService.run()方法
public void run() { CommitLog.log.info(this.getServiceName() + " service started"); while (!this.isStopped()) {// 状态判断 try { // 等待请求任务过来后再执行 this.waitForRunning(10); // 执行提交的请求任务 this.doCommit(); } catch (Exception e) { CommitLog.log.warn(this.getServiceName() + " service has exception. ", e); } } // Under normal circumstances shutdown, wait for the arrival of the // request, and then flush try { Thread.sleep(10); } catch (InterruptedException e) { CommitLog.log.warn("GroupCommitService Exception, ", e); } synchronized (this) { this.swapRequests(); } this.doCommit(); CommitLog.log.info(this.getServiceName() + " service end"); } protected void waitForRunning(long interval) { // 前面提交请求时,已经将 hasNotified CAS 成 true,所以这一步能成功CAS为false if (hasNotified.compareAndSet(true, false)) { this.onWaitEnd(); // 表示收到请求通知,这里等待结束 return; } // 没有收到请求任务通知,重置 waitPoint 的 countDown值 waitPoint.reset(); try { // 继续等待 interval 时间 waitPoint.await(interval, TimeUnit.MILLISECONDS); } catch (InterruptedException e) { log.error("Interrupted", e); } finally { // 等待完成,一定要设置为false,否则下一次请求进来无法通知线程执行,看 putRequest 方法 hasNotified.set(false); this.onWaitEnd(); } } protected void onWaitEnd() { // 交换请求,将提交请求和任务执行请求分开来处理,不使用同一个List,保证线程安全和数据一致性 this.swapRequests(); } private void doCommit() { synchronized (this.requestsRead) { // 判断任务要提取的请求不为空 if (!this.requestsRead.isEmpty()) { for (GroupCommitRequest req : this.requestsRead) { // 遍历List // There may be a message in the next file, so a maximum of // two times the flush boolean flushOK = false; // 这里为啥要2次执行,也能理解,requestsRead 支持多消息提交刷盘,多消息就可能涉及2个文件,前一个文件尾部,后一个文件头,但是4.5.1版本的RocketMQ,默认是不存在这种情况的,不管是单条消息还是批量消息,都发生在一个文件中,并作为一个 requestsRead 请求提交,所以实现上,该版本不会存在2个文件的情况。这里可能只是一个扩展功能 for (int i = 0; i < 2 && !flushOK; i++) { // 判断是不是已经刷过盘(注:每次刷盘都会记录偏移位置) flushOK = CommitLog.this.mappedFileQueue.getFlushedWhere() >= req.getNextOffset(); if (!flushOK) { // 确认没有刷盘,那就直接去刷 CommitLog.this.mappedFileQueue.flush(0); } } // 通知刷盘成功,通知谁呢,就是通知提交线程,因为入口方法 handleDiskFlush 的线程还在等着结果呢 req.wakeupCustomer(flushOK); } // 记录消息存储的时间 long storeTimestamp = CommitLog.this.mappedFileQueue.getStoreTimestamp(); if (storeTimestamp > 0) { CommitLog.this.defaultMessageStore.getStoreCheckpoint().setPhysicMsgTimestamp(storeTimestamp); } // 清空请求列表 this.requestsRead.clear(); } else { // 这种情况就是不等待刷盘结果,直接刷盘 CommitLog.this.mappedFileQueue.flush(0); } } }
MappedFileQueue.flush
public boolean flush(final int flushLeastPages) { boolean result = true; // 根据刷新的位置,确定要刷新的是哪个文件的映射 MappedFile mappedFile = this.findMappedFileByOffset(this.flushedWhere, this.flushedWhere == 0); if (mappedFile != null) { long tmpTimeStamp = mappedFile.getStoreTimestamp(); // 刷盘操作,重点看这一句,继续往里跟 int offset = mappedFile.flush(flushLeastPages); // 刷盘成功后要增加已被刷的位置,用于下次刷盘的判断 long where = mappedFile.getFileFromOffset() + offset; result = where == this.flushedWhere; this.flushedWhere = where; if (0 == flushLeastPages) { this.storeTimestamp = tmpTimeStamp; } } return result; }
MappedFile.flush
public int flush(final int flushLeastPages) { if (this.isAbleToFlush(flushLeastPages)) { if (this.hold()) { int value = getReadPosition(); // 刷盘这一块的操作通过 try...catch... 包裹出来,也就是说真实刷盘 force() 有可能产生异常 try { //We only append data to fileChannel or mappedByteBuffer, never both. if (writeBuffer != null || this.fileChannel.position() != 0) { this.fileChannel.force(false); } else { // 这里底层调用的是 msync,可以看jvm源码,下面有贴出来 this.mappedByteBuffer.force(); } } catch (Throwable e) { log.error("Error occurred when force data to disk.", e); } // 记录已刷盘的最新位置。但是这段代码在try...catch...范围外,也就是说,即使force()产生异常,这一段还是会执行,那么就存在刷盘失败时,依然记录新的刷盘位置,这种极端情况就会导致消息数据丢失,而用户收到的却是成功,所以说,RocketMQ 也是存在数据丢失的。 this.flushedPosition.set(value); this.release(); } else { log.warn("in flush, hold failed, flush offset = " + this.flushedPosition.get()); this.flushedPosition.set(getReadPosition()); } } return this.getFlushedPosition(); }
force()的JVM底层实现
这一段源码,在MappedByteBuffer.c 文件中,有兴趣的读者可以去看看。
msync系统调用失败的原因有很多,包含但不限于:
1、文件描述符无效或未正确打开
2、内存映射区域对应的文件被破坏
3、权限被篡改
4、系统资源不足
5、内核错误
6、msync同步问题:多个进程共同操作 msync
JNIEXPORT void JNICALL
Java_java_nio_MappedByteBuffer_force0(JNIEnv *env, jobject obj, jobject fdo,
jlong address, jlong len)
{
void* a = (void *)jlong_to_ptr(address);
// 看这一行,这其实就是一个系统调用API,作用就是将mmap中映射的内存内容同步到相应的文件
int result = msync(a, (size_t)len, MS_SYNC);
if (result == -1) {
JNU_ThrowIOExceptionWithLastError(env, "msync failed");
}
}
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。