赞
踩
这两个结构体定义代码在xlog.c,它们在日志落盘过程中非常重要,会反反复复出现。
- typedef struct XLogwrtRqst
- {
- XLogRecPtr Write; /* last byte + 1 to write out */
- XLogRecPtr Flush; /* last byte + 1 to flush */
- } XLogwrtRqst;
-
- typedef struct XLogwrtResult
- {
- XLogRecPtr Write; /* last byte + 1 written out */
- XLogRecPtr Flush; /* last byte + 1 flushed */
- } XLogwrtResult;
Write与Flush的区别
源码中关于这两个结构体的注释很长,值得关注的是:
XLogFlush函数用于将record之前的所有XLog全部落盘,它是XLogWrite的上层函数,XLogWrite是真正的落盘函数,下一篇我们会讨论。
在上面三种情况中,最简单也是最可控的就是commit之前的XLOG落盘,为了方便调试排除其他干扰因素,我们需要首先将后台落盘的进程walwriter挂起,以免在commit之前后台进程将XLOG进行落盘。后台进程挂起后,在commit之前,事务中的相关XLOG一定没有落盘,就可以很好地观察了。
断点函数为XLogFlush
insert into t_insert values(1,'11','12','13');
函数入参,待刷入的LSN 24119528
比较record与本地缓存的XLogwrtResult.Flush值。LogwrtResult.Flush表示已经刷入的LSN 24118560,小于24119528,说明入参的LSN 还未被刷入磁盘。如果已经刷入,则直接结束函数。
LogwrtRqst由info_lck保护,读取需要先获取锁,如果没能获取,则循环等待。参考postgresql源码学习(21)—— 事务日志②-日志初始化_Hehuyi_In的博客-CSDN博客
代码注释有提到,fsync是一个昂贵的操作,因此我们尽量每次多刷一些。获取锁后if语句对比请求写入的LSN和入参LSN,并将请求写入的WriteRqstPtr 设为其中较大的值,可以多写一点。这里WriteRqstPtr的值更大,因此直接用这个值即可。
这里遇到一个问题是调试过程中pg挂掉了,怀疑跟挂起了walwriter进程有关。下面重新debug了一次,所以LSN号变了。
加锁后获取全局XLogwrtResult,以更新本地XLogwrtResult。
更新本地XLogwrtResult后再次判断record之前的XLOG是否已经落盘。这是一个典型的乐观锁方式,以此提高并发性。
这里如果没有挂起WAL后台进程,很有可能在调试过程中它已经把record对应LSN给落盘了,就会出现record<=LogwrtResult.Flush 退出循环的情况。因为我们挂起了WAL后台进程,目前record还是大于LogwrtResult.Flush,即还未落盘。
再往下走,遇到一个老朋友WaitXLogInsertionsToFinish,这就是前篇提到的,日志落盘之前必须先确认相关进程XLOG都已写入WAL Buffer。
https://blog.csdn.net/Hehuyi_In/article/details/125481617?spm=1001.2014.3001.5502
然后,我们需要获取WALWriteLock锁,前面提到过必须拿到两个锁才能更新。如果还没获取到,则进入下一次循环,继续等待。
获取锁之后,需要再次读取全局XLogwrtResult,然后判断record之前的XLOG是否已经落盘。如果已经刷入,则释放锁,退出循环。
判断有没有设置CommitDelay(组提交)、enableFsync是否为true,最小活跃事务数量是否大于CommitSiblings(值为5)。 这里CommitDelay=0 ,所以不会进这个if。如果进的话需要sleep CommitDelay的时间,另外再次调用WaitXLogInsertionsToFinish函数。
insertpos表示最旧的、仍在进行WAL写入的LSN,在这点之前的所有WAL数据均已写入WAL buffer,可以被刷入磁盘。
因此将insertpos赋值给WriteRqst.Write和WriteRqst.Flush,表示希望将该LSN之前的数据全部刷入磁盘。
下面调用XLogWrite函数,真正将日志刷入磁盘(我们下篇来看)。
最后判断,确保已落盘的LSN大于入参LSN,否则说明落盘失败了。
- /*
- * Ensure that all XLOG data through the given position is flushed to disk.
- *
- * NOTE: this differs from XLogWrite mainly in that the WALWriteLock is not
- * already held, and we try to avoid acquiring it if possible.
- */
- void
- XLogFlush(XLogRecPtr record)
- {
- XLogRecPtr WriteRqstPtr;
- XLogwrtRqst WriteRqst;
-
- /*
- * During REDO, we are reading not writing WAL. Therefore, instead of
- * trying to flush the WAL, we should update minRecoveryPoint instead. We
- * test XLogInsertAllowed(), not InRecovery, because we need checkpointer
- * to act this way too, and because when it tries to write the
- * end-of-recovery checkpoint, it should indeed flush.
- */
- if (!XLogInsertAllowed())
- {
- UpdateMinRecoveryPoint(record, false);
- return;
- }
-
- /* Quick exit if already known flushed,比较record LSN与本地缓存的XLogwrtResult.Flush LSN值。如果已经刷入,则直接结束函数。 */
- if (record <= LogwrtResult.Flush)
- return;
-
- #ifdef WAL_DEBUG
- if (XLOG_DEBUG)
- elog(LOG, "xlog flush request %X/%X; write %X/%X; flush %X/%X",
- LSN_FORMAT_ARGS(record),
- LSN_FORMAT_ARGS(LogwrtResult.Write),
- LSN_FORMAT_ARGS(LogwrtResult.Flush));
- #endif
-
- START_CRIT_SECTION();
-
- /*
- * fsync是一个昂贵的操作,因此我们尽量每次多刷一些。获取锁后if语句对比请求写入的LSN和入参LSN,并将请求写入的WriteRqstPtr 设为其中较大的值,可以多写一点。
- */
-
- /* initialize to given target; may increase below,初始化请求写入的位置为传入的参数record */
- WriteRqstPtr = record;
-
- /*
- * Now wait until we get the write lock, or someone else does the flush for us. 读取需要先获取锁,如果没能获取,则循环等待。或者等到有别人(例如walwriter)替我们完成日志落盘。
- */
- for (;;)
- {
- XLogRecPtr insertpos;
-
- /* read LogwrtResult and update local state,获取info_lck锁,读取全局变量XLogwrtRqst和XLogwrtResult,以更新本地XLogwrtResult */
- SpinLockAcquire(&XLogCtl->info_lck);
- if (WriteRqstPtr < XLogCtl->LogwrtRqst.Write)
- WriteRqstPtr = XLogCtl->LogwrtRqst.Write;
- LogwrtResult = XLogCtl->LogwrtResult;
- SpinLockRelease(&XLogCtl->info_lck);
-
- /* done already? 当前记录是否已经落盘?这就是所谓的等到有别人(例如walwriter)替我们完成日志落盘。*/
- if (record <= LogwrtResult.Flush)
- break;
-
- /*
- * Before actually performing the write, wait for all in-flight
- * insertions to the pages we're about to write to finish. 老朋友WaitXLogInsertionsToFinish,这就是前篇提到的,日志落盘之前必须先确认相关进程XLOG都已写入WAL Buffer。
- */
- insertpos = WaitXLogInsertionsToFinish(WriteRqstPtr);
-
- /*
- * 然后,我们需要获取WALWriteLock锁,前面提到过必须拿到两个锁才能更新。如果还没获取到,则进入下一次循环,继续等待。
- */
- if (!LWLockAcquireOrWait(WALWriteLock, LW_EXCLUSIVE))
- {
- /*
- * The lock is now free, but we didn't acquire it yet. Before we
- * do, loop back to check if someone else flushed the record for
- * us already.
- */
- continue;
- }
-
- /* Got the lock; recheck whether request is satisfied. 获取锁之后,需要再次读取全局XLogwrtResult,然后判断record之前的XLOG是否已经落盘。如果已经刷入,则释放锁,退出循环。 */
- LogwrtResult = XLogCtl->LogwrtResult;
- if (record <= LogwrtResult.Flush)
- {
- LWLockRelease(WALWriteLock);
- break;
- }
-
- /*
- * 判断有没有设置CommitDelay(组提交)、enableFsync是否为true,最小活跃事务数量是否大于CommitSiblings
- */
- if (CommitDelay > 0 && enableFsync &&
- MinimumActiveBackends(CommitSiblings))
- {
- pg_usleep(CommitDelay);
-
- /*
- * 如果进这个if,需要sleep CommitDelay的时间,另外再次调用WaitXLogInsertionsToFinish函数。
- */
- insertpos = WaitXLogInsertionsToFinish(insertpos);
- }
-
- /* try to write/flush later additions to XLOG as well,insertpos表示最旧的、仍在进行WAL写入的LSN,在这点之前的所有WAL数据均已写入WAL buffer,可以被刷入磁盘。因此将insertpos赋值给WriteRqst.Write和WriteRqst.Flush,表示希望将该LSN之前的数据全部刷入磁盘。*/
- WriteRqst.Write = insertpos;
- WriteRqst.Flush = insertpos;
-
- // 调用XLogWrite函数,真正将日志刷入磁盘(我们下篇来看)
- XLogWrite(WriteRqst, false);
- // 释放锁
- LWLockRelease(WALWriteLock);
- /* done */
- break;
- }
-
- END_CRIT_SECTION();
-
- /* wake up walsenders now that we've released heavily contended locks */
- WalSndWakeupProcessRequests();
-
- /*
- * 最后判断,确保已落盘的LSN大于入参LSN,否则说明落盘失败了。
- */
- if (LogwrtResult.Flush < record)
- elog(ERROR,
- "xlog flush request %X/%X is not satisfied --- flushed only to %X/%X",
- LSN_FORMAT_ARGS(record),
- LSN_FORMAT_ARGS(LogwrtResult.Flush));
- }
参考
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。