当前位置:   article > 正文

手写数据库轮子项目 MYDB 之四 | DataManager (DM) 数据之日志文件与恢复策略_手写数据库mydb

手写数据库mydb

MYDB提供数据库崩溃后的恢复功能,DM 模块在每次对底层数据操作时,都会记录一条日志到磁盘上。在数据库崩溃之后(即第一页校验不对时),再次启动时,可以根据日志的内容,恢复数据文件,保证其一致性。

一、日志读写

1. 日志格式

日志是一个自定义的 log 格式的二进制文件,按照如下的格式进行排布:

[XChecksum][Log1][Log2][Log3]...[LogN][BadTail]

其中, XChecksum 是一个四字节的整数,是对后面所有日志计算的校验和(注意不包括BadTail)。Log1 ~ LogN 是常规的日志数据,BadTail 是在数据库崩溃时,没有来得及写完的日志数据,这个 BadTail 不一定存在。

每条日志 [LogN] 的格式如下:

[Size][Checksum][Data]

其中,Size 是一个四字节整数,标识了 Data 段的字节数。Checksum 则是该条日志的校验和。

2. 校验日志

单条日志的校验和,其实就是通过一个指定的种子实现的,对所有日志求出校验和,求和就能得到日志文件的校验和了。

  1. private int calChecksum(int xCheck, byte[] log) {
  2. for (byte b : log) {
  3. xCheck = xCheck * SEED + b;
  4. }
  5. return xCheck;
  6. }

3. 读取日志

Logger 被实现成迭代器模式,通过 next() 方法,不断地从文件中读取下一条日志,并将其中的 Data 解析出来并返回。 next() 方法的实现主要依靠 internNext() 。

  1. public byte[] next() {
  2. lock.lock();
  3. try {
  4. byte[] log = internNext();
  5. if(log == null) return null;
  6. /// 返回这条[Log]的[Data]
  7. return Arrays.copyOfRange(log, OF_DATA, log.length);
  8. } finally {
  9. lock.unlock();
  10. }
  11. }
  1. private byte[] internNext() {
  2. /// position是当前日志文件中的指针位置,表示[LogN]的起始,OF_DATA为8,表示一个[LogN]中,[Data]的起始
  3. if(position + OF_DATA >= fileSize) {
  4. return null;
  5. }
  6. ByteBuffer tmp = ByteBuffer.allocate(4);
  7. try {
  8. fc.position(position);
  9. fc.read(tmp); /// 将[Size]读到tmp中
  10. } catch(IOException e) {
  11. Panic.panic(e);
  12. }
  13. int size = Parser.parseInt(tmp.array());
  14. /// size + OF_DATA 是当前position所指[Log]的长度
  15. if(position + size + OF_DATA > fileSize) {
  16. return null;
  17. }
  18. ByteBuffer buf = ByteBuffer.allocate(OF_DATA + size);
  19. try {
  20. fc.position(position);
  21. fc.read(buf); /// 将[LogN]读到buf中
  22. } catch(IOException e) {
  23. Panic.panic(e);
  24. }
  25. byte[] log = buf.array();
  26. /// 根据一条日志的[Data]计算出其校验和
  27. int checkSum1 = calChecksum(0, Arrays.copyOfRange(log, OF_DATA, log.length));
  28. /// 获取一条日志的[CheckSum]
  29. int checkSum2 = Parser.parseInt(Arrays.copyOfRange(log, OF_CHECKSUM, OF_DATA));
  30. /// 此[Log]是BadTail
  31. if(checkSum1 != checkSum2) {
  32. return null;
  33. }
  34. /// position指向下一条[Log]的起始
  35. position += log.length;
  36. return log;
  37. }

在打开一个日志文件时,需要首先校验日志文件的 XChecksum,并移除文件尾部可能存在的 BadTail,由于 BadTail 该条日志尚未写入完成,文件的校验和也就不会包含该日志的校验和,去掉 BadTail 即可保证日志文件的一致性。

  1. private void checkAndRemoveTail() {
  2. /// 将position置为4
  3. rewind();
  4. int xCheck = 0;
  5. while(true) {
  6. /// internNext会改变position的值,循环结束后position指向最后一个正常[Log]的末尾
  7. byte[] log = internNext();
  8. if(log == null) break;
  9. /// xCheck是累加的
  10. xCheck = calChecksum(xCheck, log);
  11. }
  12. if(xCheck != xChecksum) {
  13. Panic.panic(Error.BadLogFileException);
  14. }
  15. try {
  16. /// 将文件截断至position位置,即只保留position前的部分
  17. truncate(position);
  18. } catch (Exception e) {
  19. Panic.panic(e);
  20. }
  21. try {
  22. /// 将文件指针设置至position
  23. file.seek(position);
  24. } catch (IOException e) {
  25. Panic.panic(e);
  26. }
  27. rewind();
  28. }
  1. public void truncate(long x) throws Exception {
  2. lock.lock();
  3. try {
  4. fc.truncate(x);
  5. } finally {
  6. lock.unlock();
  7. }
  8. }

4. 写入日志

向日志文件写入日志时,也是首先将数据包裹成日志格式,写入文件后,再更新文件的校验和,更新校验和时,会刷新缓冲区,保证内容写入磁盘。

  1. public void log(byte[] data) {
  2. /// 将data包裹为[Log]格式
  3. byte[] log = wrapLog(data);
  4. ByteBuffer buf = ByteBuffer.wrap(log);
  5. lock.lock();
  6. try {
  7. fc.position(fc.size());
  8. fc.write(buf);
  9. } catch(IOException e) {
  10. Panic.panic(e);
  11. } finally {
  12. lock.unlock();
  13. }
  14. updateXChecksum(log);
  15. }
  16. /// 更新XChecksum
  17. private void updateXChecksum(byte[] log) {
  18. this.xChecksum = calChecksum(this.xChecksum, log);
  19. try {
  20. fc.position(0);
  21. fc.write(ByteBuffer.wrap(Parser.int2Byte(xChecksum)));
  22. fc.force(false);
  23. } catch(IOException e) {
  24. Panic.panic(e);
  25. }
  26. }
  27. private byte[] wrapLog(byte[] data) {
  28. byte[] checksum = Parser.int2Byte(calChecksum(0, data));
  29. byte[] size = Parser.int2Byte(data.length);
  30. /// Google Guava库中的方法,将多个字节数组连接在一起形成一个新的字节数组
  31. return Bytes.concat(size, checksum, data);
  32. }

二、日志恢复

DM 为上层模块,提供了两种操作,分别是插入新数据(I)和更新现有数据(U)。至于为啥没有删除数据,这个会在 VM 一节叙述。

DM的日志策略:在进行 I 和 U 操作之前,必须先进行对应的日志操作,在保证日志写入磁盘后,才进行数据操作。

对于 I 和 U 操作,DM 记录的格式如下,由两个静态内部类实现。

插入操作 I:(xid, pgno, offset, x) 表示事务 xid 在位置 offset 插入了一条数据 x。

更新操作 U:(xid, pgno, offset, oldx, newx) 表示事务 xid 在位置 offset 将 oldx 更新为 newx。

1. 单线程下的恢复策略

假设日志中最后一个事务是 Ti,恢复时需要:

对 Ti 前的所有事务日志重做(redo),

在 XID 文件中检查 Ti 的状态, 如果 Ti 的状态是已完成(包括 committed 和 aborted),就将 Ti 重做,否则进行撤销(undo)。

接下来,如何对事务 T 重做:

正序扫描事务 T 的所有日志,

如果日志是插入操作 (xid, pgno, offset, x),就将 x 重新插入,

如果日志是更新操作 (xid, pgno, offset, oldx, newx) ,就将 A值设置为 newx。

如何对事务 T 撤销:

倒序扫描事务 T 的所有日志,

如果日志是插入操作(xid, pgno, offset, x),就将数据删除,

如果日志是更新操作(xid, pgno, offset, oldx, newx) ,就将值设置为 oldx。

2. 多线程下的恢复策略

单线程的恢复策略在多线程下会出现问题。

第一种:事务 T1 读到了 T2 未提交的数据:

  1. T1 begin
  2. T2 begin
  3. T2 update(x)
  4. T1 read(x)
  5. ...
  6. T1 commit
  7. DB break down

在系统崩溃时,T2 仍然是活跃状态。那么当数据库重新启动,执行恢复例程时,会撤销 T2,它对数据库的影响会被消除。但是由于 T1 读取了 T2 更新的值,既然 T2 被撤销,那么 T1 也应当被撤销。这种情况,就是级联回滚。但是,T1 已经 commit 了,所有 commit 的事务的影响,应当被持久化。这里就造成了矛盾。所以这里需要保证:
正在进行的事务,不会读取其他事务未提交的数据。

第二种:事务T2修改了事务T1修改后但是并未提交的数据:

  1. x = 0
  2. T1 begin
  3. T2 begin
  4. T1 set x = x + 1 // x = 1
  5. T2 set x = x + 1 // x = 2
  6. T2 commit
  7. DB break down

在系统崩溃时,T1 仍然是活跃状态。那么当数据库重新启动,执行恢复例程时,会对 T1 进行撤销,对 T2 进行重做,但是,如果先撤销T1,再重做T2,x变为2,如果先重做T2,再撤销T1,x变为0,都不是1,都是错误的(所有 commit 的事务的影响,应当被持久化)。所以这里需要保证:
正在进行的事务,不会修改取其他事务未提交的修改或产生的数据。

由于 VM层 的存在,传递到 DM 层,真正执行的操作序列,都可以保证规定 1 和规定 2,所以在DM层无需另外的代码来保证这两个规定。

3. 恢复策略的实现

1)日志格式

  1. private static final byte LOG_TYPE_INSERT = 0;
  2. private static final byte LOG_TYPE_UPDATE = 1;
  3. // 即上文中 [Log] 的 [Data] 部分
  4. updateLog:
  5. [LogType] [XID] [UID] [OldRaw] [NewRaw] // UID 经过转化变为 Pgno 和 Offset
  6. insertLog:
  7. [LogType] [XID] [Pgno] [Offset] [Raw]

2)恢复

  1. public static void recover(TransactionManager tm, Logger lg, PageCache pc) {
  2. System.out.println("Recovering...");
  3. lg.rewind(); /// 设置 lg 中指针的位置为4,即第一个[Log]的起始位置
  4. int maxPgno = 0;
  5. while(true) {
  6. byte[] log = lg.next(); /// 拿到[Log]中的[data]
  7. if(log == null) break;
  8. int pgno;
  9. if(isInsertLog(log)) { /// 判断log[0]是否是插入类型
  10. InsertLogInfo li = parseInsertLog(log); ///将[data]转换为对象
  11. pgno = li.pgno;
  12. } else {
  13. UpdateLogInfo li = parseUpdateLog(log);
  14. pgno = li.pgno;
  15. }
  16. if(pgno > maxPgno) {
  17. maxPgno = pgno;
  18. }
  19. }
  20. if(maxPgno == 0) {
  21. maxPgno = 1;
  22. }
  23. pc.truncateByBgno(maxPgno); /// 更新页面文件,保留maxPgno以内的部分
  24. System.out.println("Truncate to " + maxPgno + " pages.");
  25. redoTranscations(tm, lg, pc);
  26. System.out.println("Redo Transactions Over.");
  27. undoTranscations(tm, lg, pc);
  28. System.out.println("Undo Transactions Over.");
  29. System.out.println("Recovery Over.");
  30. }

3)redo 和 undo 的实现

redo 的实现

  1. private static void redoTranscations(TransactionManager tm, Logger lg, PageCache pc) {
  2. lg.rewind(); /// 设置 lg 中指针的位置为4,即第一个[Log]的起始位置
  3. while(true) {
  4. byte[] log = lg.next(); /// 拿到[Log]中的[data]
  5. if(log == null) break;
  6. if(isInsertLog(log)) { /// 判断log[0]是否是插入类型
  7. InsertLogInfo li = parseInsertLog(log);
  8. long xid = li.xid;
  9. if(!tm.isActive(xid)) { /// 判断事务是否不处于进行中
  10. doInsertLog(pc, log, REDO); /// redo 插入操作
  11. }
  12. } else {
  13. UpdateLogInfo xi = parseUpdateLog(log);
  14. long xid = xi.xid;
  15. if(!tm.isActive(xid)) {
  16. doUpdateLog(pc, log, REDO); /// redo 更新操作
  17. }
  18. }
  19. }
  20. }

undo 的实现需要将一个事务内的多条日志倒序进行撤销

  1. private static void undoTranscations(TransactionManager tm, Logger lg, PageCache pc) {
  2. Map<Long, List<byte[]>> logCache = new HashMap<>(); /// key 是 xid,value 是日志数据
  3. lg.rewind();
  4. while(true) {
  5. byte[] log = lg.next(); /// 拿到[Log]中的[data]
  6. if(log == null) break;
  7. if(isInsertLog(log)) {
  8. InsertLogInfo li = parseInsertLog(log);
  9. long xid = li.xid;
  10. if(tm.isActive(xid)) { /// 判断事务是否处于进行中
  11. if(!logCache.containsKey(xid)) {
  12. logCache.put(xid, new ArrayList<>());
  13. }
  14. logCache.get(xid).add(log);
  15. }
  16. } else {
  17. UpdateLogInfo xi = parseUpdateLog(log);
  18. long xid = xi.xid;
  19. if(tm.isActive(xid)) {
  20. if(!logCache.containsKey(xid)) {
  21. logCache.put(xid, new ArrayList<>());
  22. }
  23. logCache.get(xid).add(log);
  24. }
  25. }
  26. }
  27. // 对所有active log进行倒序undo
  28. for(Entry<Long, List<byte[]>> entry : logCache.entrySet()) {
  29. List<byte[]> logs = entry.getValue();
  30. for (int i = logs.size()-1; i >= 0; i --) {
  31. byte[] log = logs.get(i);
  32. if(isInsertLog(log)) {
  33. doInsertLog(pc, log, UNDO);
  34. } else {
  35. doUpdateLog(pc, log, UNDO);
  36. }
  37. }
  38. tm.abort(entry.getKey()); /// 将事务 xid 状态改为已撤销(回滚)
  39. }
  40. }

4)插入和更新的 redo 和 undo

doInsertLog 中的删除使用的是 setDataItemRawInvalid,将该条数据的有效位设置为无效,进行逻辑删除。

  1. private static void doInsertLog(PageCache pc, byte[] log, int flag) {
  2. InsertLogInfo li = parseInsertLog(log);
  3. Page pg = null;
  4. try {
  5. pg = pc.getPage(li.pgno);
  6. } catch(Exception e) {
  7. Panic.panic(e);
  8. }
  9. try {
  10. if(flag == UNDO) {
  11. DataItem.setDataItemRawInvalid(li.raw);
  12. }
  13. PageX.recoverInsert(pg, li.raw, li.offset);
  14. } finally {
  15. pg.release();
  16. }
  17. }
  1. private static void doUpdateLog(PageCache pc, byte[] log, int flag) {
  2. int pgno;
  3. short offset;
  4. byte[] raw;
  5. if(flag == REDO) {
  6. UpdateLogInfo xi = parseUpdateLog(log);
  7. pgno = xi.pgno;
  8. offset = xi.offset;
  9. raw = xi.newRaw; /// redo 的话,重做为新数据
  10. } else {
  11. UpdateLogInfo xi = parseUpdateLog(log);
  12. pgno = xi.pgno;
  13. offset = xi.offset;
  14. raw = xi.oldRaw; /// undo 的话,撤销为旧数据
  15. }
  16. Page pg = null;
  17. try {
  18. pg = pc.getPage(pgno);
  19. } catch (Exception e) {
  20. Panic.panic(e);
  21. }
  22. try {
  23. PageX.recoverUpdate(pg, raw, offset); /// 将数据放到缓存中
  24. } finally {
  25. pg.release(); /// 释放缓存,将页面数据写入磁盘
  26. }
  27. }

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/神奇cpp/article/detail/869428
推荐阅读
相关标签
  

闽ICP备14008679号