赞
踩
MYDB提供数据库崩溃后的恢复功能,DM 模块在每次对底层数据操作时,都会记录一条日志到磁盘上。在数据库崩溃之后(即第一页校验不对时),再次启动时,可以根据日志的内容,恢复数据文件,保证其一致性。
日志是一个自定义的 log 格式的二进制文件,按照如下的格式进行排布:
[XChecksum][Log1][Log2][Log3]...[LogN][BadTail]
其中, XChecksum 是一个四字节的整数,是对后面所有日志计算的校验和(注意不包括BadTail)。Log1 ~ LogN 是常规的日志数据,BadTail 是在数据库崩溃时,没有来得及写完的日志数据,这个 BadTail 不一定存在。
每条日志 [LogN] 的格式如下:
[Size][Checksum][Data]
其中,Size 是一个四字节整数,标识了 Data 段的字节数。Checksum 则是该条日志的校验和。
单条日志的校验和,其实就是通过一个指定的种子实现的,对所有日志求出校验和,求和就能得到日志文件的校验和了。
- private int calChecksum(int xCheck, byte[] log) {
- for (byte b : log) {
- xCheck = xCheck * SEED + b;
- }
- return xCheck;
- }
Logger 被实现成迭代器模式,通过 next() 方法,不断地从文件中读取下一条日志,并将其中的 Data 解析出来并返回。 next() 方法的实现主要依靠 internNext() 。
- public byte[] next() {
- lock.lock();
- try {
- byte[] log = internNext();
- if(log == null) return null;
- /// 返回这条[Log]的[Data]
- return Arrays.copyOfRange(log, OF_DATA, log.length);
- } finally {
- lock.unlock();
- }
- }
- private byte[] internNext() {
- /// position是当前日志文件中的指针位置,表示[LogN]的起始,OF_DATA为8,表示一个[LogN]中,[Data]的起始
- if(position + OF_DATA >= fileSize) {
- return null;
- }
- ByteBuffer tmp = ByteBuffer.allocate(4);
- try {
- fc.position(position);
- fc.read(tmp); /// 将[Size]读到tmp中
- } catch(IOException e) {
- Panic.panic(e);
- }
- int size = Parser.parseInt(tmp.array());
- /// size + OF_DATA 是当前position所指[Log]的长度
- if(position + size + OF_DATA > fileSize) {
- return null;
- }
-
- ByteBuffer buf = ByteBuffer.allocate(OF_DATA + size);
- try {
- fc.position(position);
- fc.read(buf); /// 将[LogN]读到buf中
- } catch(IOException e) {
- Panic.panic(e);
- }
-
- byte[] log = buf.array();
- /// 根据一条日志的[Data]计算出其校验和
- int checkSum1 = calChecksum(0, Arrays.copyOfRange(log, OF_DATA, log.length));
- /// 获取一条日志的[CheckSum]
- int checkSum2 = Parser.parseInt(Arrays.copyOfRange(log, OF_CHECKSUM, OF_DATA));
- /// 此[Log]是BadTail
- if(checkSum1 != checkSum2) {
- return null;
- }
- /// position指向下一条[Log]的起始
- position += log.length;
- return log;
- }

在打开一个日志文件时,需要首先校验日志文件的 XChecksum,并移除文件尾部可能存在的 BadTail,由于 BadTail 该条日志尚未写入完成,文件的校验和也就不会包含该日志的校验和,去掉 BadTail 即可保证日志文件的一致性。
- private void checkAndRemoveTail() {
- /// 将position置为4
- rewind();
-
- int xCheck = 0;
- while(true) {
- /// internNext会改变position的值,循环结束后position指向最后一个正常[Log]的末尾
- byte[] log = internNext();
- if(log == null) break;
- /// xCheck是累加的
- xCheck = calChecksum(xCheck, log);
- }
- if(xCheck != xChecksum) {
- Panic.panic(Error.BadLogFileException);
- }
-
- try {
- /// 将文件截断至position位置,即只保留position前的部分
- truncate(position);
- } catch (Exception e) {
- Panic.panic(e);
- }
- try {
- /// 将文件指针设置至position
- file.seek(position);
- } catch (IOException e) {
- Panic.panic(e);
- }
- rewind();
- }

- public void truncate(long x) throws Exception {
- lock.lock();
- try {
- fc.truncate(x);
- } finally {
- lock.unlock();
- }
- }
向日志文件写入日志时,也是首先将数据包裹成日志格式,写入文件后,再更新文件的校验和,更新校验和时,会刷新缓冲区,保证内容写入磁盘。
- public void log(byte[] data) {
- /// 将data包裹为[Log]格式
- byte[] log = wrapLog(data);
- ByteBuffer buf = ByteBuffer.wrap(log);
- lock.lock();
- try {
- fc.position(fc.size());
- fc.write(buf);
- } catch(IOException e) {
- Panic.panic(e);
- } finally {
- lock.unlock();
- }
- updateXChecksum(log);
- }
-
- /// 更新XChecksum
- private void updateXChecksum(byte[] log) {
- this.xChecksum = calChecksum(this.xChecksum, log);
- try {
- fc.position(0);
- fc.write(ByteBuffer.wrap(Parser.int2Byte(xChecksum)));
- fc.force(false);
- } catch(IOException e) {
- Panic.panic(e);
- }
- }
-
- private byte[] wrapLog(byte[] data) {
- byte[] checksum = Parser.int2Byte(calChecksum(0, data));
- byte[] size = Parser.int2Byte(data.length);
- /// Google Guava库中的方法,将多个字节数组连接在一起形成一个新的字节数组
- return Bytes.concat(size, checksum, data);
- }

DM 为上层模块,提供了两种操作,分别是插入新数据(I)和更新现有数据(U)。至于为啥没有删除数据,这个会在 VM 一节叙述。
DM的日志策略:在进行 I 和 U 操作之前,必须先进行对应的日志操作,在保证日志写入磁盘后,才进行数据操作。
对于 I 和 U 操作,DM 记录的格式如下,由两个静态内部类实现。
插入操作 I:(xid, pgno, offset, x) 表示事务 xid 在位置 offset 插入了一条数据 x。
更新操作 U:(xid, pgno, offset, oldx, newx) 表示事务 xid 在位置 offset 将 oldx 更新为 newx。
假设日志中最后一个事务是 Ti,恢复时需要:
对 Ti 前的所有事务日志重做(redo),
在 XID 文件中检查 Ti 的状态, 如果 Ti 的状态是已完成(包括 committed 和 aborted),就将 Ti 重做,否则进行撤销(undo)。
接下来,如何对事务 T 重做:
正序扫描事务 T 的所有日志,
如果日志是插入操作 (xid, pgno, offset, x),就将 x 重新插入,
如果日志是更新操作 (xid, pgno, offset, oldx, newx) ,就将 A值设置为 newx。
如何对事务 T 撤销:
倒序扫描事务 T 的所有日志,
如果日志是插入操作(xid, pgno, offset, x),就将数据删除,
如果日志是更新操作(xid, pgno, offset, oldx, newx) ,就将值设置为 oldx。
单线程的恢复策略在多线程下会出现问题。
第一种:事务 T1 读到了 T2 未提交的数据:
- T1 begin
- T2 begin
- T2 update(x)
- T1 read(x)
- ...
- T1 commit
- DB break down
在系统崩溃时,T2 仍然是活跃状态。那么当数据库重新启动,执行恢复例程时,会撤销 T2,它对数据库的影响会被消除。但是由于 T1 读取了 T2 更新的值,既然 T2 被撤销,那么 T1 也应当被撤销。这种情况,就是级联回滚。但是,T1 已经 commit 了,所有 commit 的事务的影响,应当被持久化。这里就造成了矛盾。所以这里需要保证:
正在进行的事务,不会读取其他事务未提交的数据。
第二种:事务T2修改了事务T1修改后但是并未提交的数据:
- x = 0
- T1 begin
- T2 begin
- T1 set x = x + 1 // x = 1
- T2 set x = x + 1 // x = 2
- T2 commit
- DB break down
在系统崩溃时,T1 仍然是活跃状态。那么当数据库重新启动,执行恢复例程时,会对 T1 进行撤销,对 T2 进行重做,但是,如果先撤销T1,再重做T2,x变为2,如果先重做T2,再撤销T1,x变为0,都不是1,都是错误的(所有 commit 的事务的影响,应当被持久化)。所以这里需要保证:
正在进行的事务,不会修改取其他事务未提交的修改或产生的数据。
由于 VM层 的存在,传递到 DM 层,真正执行的操作序列,都可以保证规定 1 和规定 2,所以在DM层无需另外的代码来保证这两个规定。
- private static final byte LOG_TYPE_INSERT = 0;
- private static final byte LOG_TYPE_UPDATE = 1;
-
-
- // 即上文中 [Log] 的 [Data] 部分
- updateLog:
- [LogType] [XID] [UID] [OldRaw] [NewRaw] // UID 经过转化变为 Pgno 和 Offset
-
- insertLog:
- [LogType] [XID] [Pgno] [Offset] [Raw]
- public static void recover(TransactionManager tm, Logger lg, PageCache pc) {
- System.out.println("Recovering...");
-
- lg.rewind(); /// 设置 lg 中指针的位置为4,即第一个[Log]的起始位置
- int maxPgno = 0;
- while(true) {
- byte[] log = lg.next(); /// 拿到[Log]中的[data]
- if(log == null) break;
- int pgno;
- if(isInsertLog(log)) { /// 判断log[0]是否是插入类型
- InsertLogInfo li = parseInsertLog(log); ///将[data]转换为对象
- pgno = li.pgno;
- } else {
- UpdateLogInfo li = parseUpdateLog(log);
- pgno = li.pgno;
- }
- if(pgno > maxPgno) {
- maxPgno = pgno;
- }
- }
- if(maxPgno == 0) {
- maxPgno = 1;
- }
- pc.truncateByBgno(maxPgno); /// 更新页面文件,保留maxPgno以内的部分
- System.out.println("Truncate to " + maxPgno + " pages.");
-
- redoTranscations(tm, lg, pc);
- System.out.println("Redo Transactions Over.");
-
- undoTranscations(tm, lg, pc);
- System.out.println("Undo Transactions Over.");
-
- System.out.println("Recovery Over.");
- }

redo 的实现
- private static void redoTranscations(TransactionManager tm, Logger lg, PageCache pc) {
- lg.rewind(); /// 设置 lg 中指针的位置为4,即第一个[Log]的起始位置
- while(true) {
- byte[] log = lg.next(); /// 拿到[Log]中的[data]
- if(log == null) break;
- if(isInsertLog(log)) { /// 判断log[0]是否是插入类型
- InsertLogInfo li = parseInsertLog(log);
- long xid = li.xid;
- if(!tm.isActive(xid)) { /// 判断事务是否不处于进行中
- doInsertLog(pc, log, REDO); /// redo 插入操作
- }
- } else {
- UpdateLogInfo xi = parseUpdateLog(log);
- long xid = xi.xid;
- if(!tm.isActive(xid)) {
- doUpdateLog(pc, log, REDO); /// redo 更新操作
- }
- }
- }
- }

undo 的实现需要将一个事务内的多条日志倒序进行撤销
- private static void undoTranscations(TransactionManager tm, Logger lg, PageCache pc) {
- Map<Long, List<byte[]>> logCache = new HashMap<>(); /// key 是 xid,value 是日志数据
- lg.rewind();
- while(true) {
- byte[] log = lg.next(); /// 拿到[Log]中的[data]
- if(log == null) break;
- if(isInsertLog(log)) {
- InsertLogInfo li = parseInsertLog(log);
- long xid = li.xid;
- if(tm.isActive(xid)) { /// 判断事务是否处于进行中
- if(!logCache.containsKey(xid)) {
- logCache.put(xid, new ArrayList<>());
- }
- logCache.get(xid).add(log);
- }
- } else {
- UpdateLogInfo xi = parseUpdateLog(log);
- long xid = xi.xid;
- if(tm.isActive(xid)) {
- if(!logCache.containsKey(xid)) {
- logCache.put(xid, new ArrayList<>());
- }
- logCache.get(xid).add(log);
- }
- }
- }
-
- // 对所有active log进行倒序undo
- for(Entry<Long, List<byte[]>> entry : logCache.entrySet()) {
- List<byte[]> logs = entry.getValue();
- for (int i = logs.size()-1; i >= 0; i --) {
- byte[] log = logs.get(i);
- if(isInsertLog(log)) {
- doInsertLog(pc, log, UNDO);
- } else {
- doUpdateLog(pc, log, UNDO);
- }
- }
- tm.abort(entry.getKey()); /// 将事务 xid 状态改为已撤销(回滚)
- }
- }

doInsertLog 中的删除使用的是 setDataItemRawInvalid,将该条数据的有效位设置为无效,进行逻辑删除。
- private static void doInsertLog(PageCache pc, byte[] log, int flag) {
- InsertLogInfo li = parseInsertLog(log);
- Page pg = null;
- try {
- pg = pc.getPage(li.pgno);
- } catch(Exception e) {
- Panic.panic(e);
- }
- try {
- if(flag == UNDO) {
- DataItem.setDataItemRawInvalid(li.raw);
- }
- PageX.recoverInsert(pg, li.raw, li.offset);
- } finally {
- pg.release();
- }
- }

- private static void doUpdateLog(PageCache pc, byte[] log, int flag) {
- int pgno;
- short offset;
- byte[] raw;
- if(flag == REDO) {
- UpdateLogInfo xi = parseUpdateLog(log);
- pgno = xi.pgno;
- offset = xi.offset;
- raw = xi.newRaw; /// redo 的话,重做为新数据
- } else {
- UpdateLogInfo xi = parseUpdateLog(log);
- pgno = xi.pgno;
- offset = xi.offset;
- raw = xi.oldRaw; /// undo 的话,撤销为旧数据
- }
- Page pg = null;
- try {
- pg = pc.getPage(pgno);
- } catch (Exception e) {
- Panic.panic(e);
- }
- try {
- PageX.recoverUpdate(pg, raw, offset); /// 将数据放到缓存中
- } finally {
- pg.release(); /// 释放缓存,将页面数据写入磁盘
- }
- }

Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。