赞
踩
五个模块如下:
实现顺序:TM–>DM–>VM–>IM–>TBM
Java中,数组作为对象,以对象形式存储在内存中
public class SubArray {
public byte[] raw;
public int start;
public int end;
public SubArray(byte[] raw, int start, int end) {
this.raw = raw;
this.start = start;
this.end = end;
}
}
TM 通过 XID 文件维护事务的状态,并提供接口供其他模块查询某个事务的状态
create方法:创建事务管理器
public static TransactionManagerImpl create(String path) { File f = new File(path + TransactionManagerImpl.XID_SUFFIX); try { if (!f.createNewFile()) { // 文件已存在 Panic.panic(Error.FileExistsException); } } catch (IOException e) { Panic.panic(e); } if (!f.canRead() || !f.canWrite()) { // 文件不可读或不可写 Panic.panic(Error.FileCannotRWException); } FileChannel fc = null; RandomAccessFile raf = null; try { raf = new RandomAccessFile(f, "rw"); fc = raf.getChannel(); } catch (IOException e) { Panic.panic(e); } // 写空XID文件头 ByteBuffer buf = ByteBuffer.wrap(new byte[TransactionManagerImpl.LEN_XID_HEADER_LENGTH]); try { fc.position(0); fc.write(buf); } catch (IOException e) { Panic.panic(e); } return new TransactionManagerImpl(raf, fc); }
open方法:开启事务管理器
public static TransactionManagerImpl open(String path) { File f = new File(path + TransactionManagerImpl.XID_SUFFIX); if (!f.exists()) { Panic.panic(Error.FileNotExistsException); } if (!f.canRead() || !f.canWrite()) { Panic.panic(Error.FileCannotRWException); } FileChannel fc = null; RandomAccessFile raf = null; try { raf = new RandomAccessFile(f, "rw"); fc = raf.getChannel(); } catch (FileNotFoundException e) { Panic.panic(e); } return new TransactionManagerImpl(raf, fc); }
begin方法:开启事务
// 开启一个事务 并返回xid
public long begin() {
counterLock.lock();
try {
long xid = xidCounter + 1;
updateXID(xid, FIELD_TRAN_ACTIVE);
incrXIDCounter();
return xid;
} finally {
counterLock.unlock();
}
}
commit方法:提交事务
// 提交事务
@Override
public void commit(long xid) {
updateXID(xid, FIELD_TRAN_COMMITTED);
}
abort方法:回滚事务
// 回滚事务
@Override
public void abort(long xid) {
updateXID(xid, FIELD_TRAN_ABORTED);
}
Data Manager 是 MYDB 的数据管理核心
DM 直接管理数据库 DB 文件和日志文件
- 上层模块和文件系统之间的一个抽象层,向下直接读写文件,向上提供数据包装
- 日志功能
主要职责:
public class PageImpl implements Page{
private int pageNumber; // 页面页号,页号从1开始
private byte[] data; // 实际包含的字节数据
private boolean dirty; // 是否是脏页面,脏页面需要被写回磁盘
private Lock lock;
// 用来方便在拿到 Page 的引用时可以快速对这个页面的缓存进行释放操作
private PageCache pc;
}
public interface PageCache {
int newPage(byte[] initData); // 新增页面
Page getPage(int pgno) throws Exception; // 获取页数
// 抽象缓存框架中定义的方法
void close(); // 关闭缓存,写回所有资源
void release(Page page); // 释放缓存
void truncateByPgno(int maxPgno); // 根据页号截断缓存
int getPageNumber(); // 获取当前打开的数据库文件页数
void flushPage(Page pg); // 刷回数据源
}
@Override protected Page getForCache(long key) throws Exception { // 数据源就是文件系统 直接从文件中读取,并包裹成 Page int pgno = (int) key; long offset = PageCacheImpl.pageOffset(pgno); ByteBuffer buf = ByteBuffer.allocate(PAGE_SIZE); fileLock.lock(); try { fc.position(offset); fc.read(buf); } catch (IOException e) { Panic.panic(e); } fileLock.unlock(); return new PageImpl(pgno, buf.array(), this); } private static long pageOffset(int pgno) { // 页号从 1 开始 return (pgno-1) * PAGE_SIZE; } // =========================================== @Override protected void releaseForCache(Page pg) { if (pg.isDirty()) { flush(pg); pg.setDirty(false); } }
@Override
public int newPage(byte[] initData) {
// 打开时计算,新增页面时自增
int pgno = pageNumbers.incrementAndGet();
Page pg = new PageImpl(pgno, initData, null);
flush(pg); // 新建的页面需要立刻写回文件系统
return pgno;
}
数据库文件的第一页通常用作一些特殊用途,比如存储一些元数据,启动检查等
// 启动时设置初始字节 public static void setVcOpen(Page pg) { pg.setDirty(true); setVcOpen(pg.getData()); } private static void setVcOpen(byte[] raw) { System.arraycopy(RandomUtil.randomBytes(LEN_VC), 0, raw, OF_VC, LEN_VC); } // 关闭时拷贝字节 public static void setVcClose(Page pg) { pg.setDirty(true); setVcClose(pg.getData()); } private static void setVcClose(byte[] raw) { System.arraycopy(raw, OF_VC, raw, OF_VC + LEN_VC, LEN_VC); } // 校验字节 public static boolean checkVc(Page pg) { return checkVc(pg.getData()); } private static boolean checkVc(byte[] raw) { return Arrays.equals( Arrays.copyOfRange(raw, OF_VC, OF_VC + LEN_VC), Arrays.copyOfRange(raw, OF_VC + LEN_VC, OF_VC + 2 * LEN_VC) ); }
Free Space Offset(自由空间偏移量)通常用于数据库管理系统中的页分配和管理。在数据库中,数据通常存储在页(Page)中,每个页都有固定大小的存储空间。自由空间偏移量表示了一个页中空闲空间开始的位置,即从该偏移量开始的位置可以用来存储新的数据。当向页中插入新的数据时,数据库系统会根据当前的自由空间偏移量来确定新数据的存储位置,并更新自由空间偏移量以反映已使用的空间。
对普通页的管理,基本都是围绕着对 FSO 进行的
// 向页面插入数据 // 将raw插入pg中,返回插入位置 public static short insert(Page pg, byte[] raw) { pg.setDirty(true); short offset = getFSO(pg.getData()); // 获取到FSO // 将 raw 中的内容复制到 pg.getData() 中 offset 开始的地方 System.arraycopy(raw, 0, pg.getData(), offset, raw.length); setFSO(pg.getData(), (short)(offset + raw.length)); return offset; } /* recoverInsert()和recoverUpdate()用于在数据库崩溃后重新打开时, 恢复例程直接插入数据以及修改数据使用 */ // 将raw插入pg中的offset位置,并将pg的offset设置为较大的offset public static void recoverInsert(Page pg, byte[] raw, short offset) { pg.setDirty(true); System.arraycopy(raw, 0, pg.getData(), offset, raw.length); short rawFSO = getFSO(pg.getData()); if(rawFSO < offset + raw.length) { setFSO(pg.getData(), (short)(offset + raw.length)); } } // 将raw插入pg中的offset位置,不更新update public static void recoverUpdate(Page pg, byte[] raw, short offset) { pg.setDirty(true); System.arraycopy(raw, 0, pg.getData(), offset, raw.length); }
日志的二进制文件,格式如下:
// XChecksum 是一个四字节的整数,是对后续所有日志计算的校验和(Checksum的和)
// Log1 ~ LogN 是常规的日志数据
// BadTail 是在数据库崩溃时,没有来得及写完的日志数据,这个 BadTail 不一定存在
[XChecksum][Log1][Log2][Log3]...[LogN][BadTail]
// Size 是一个四字节整数,标识了 Data 段的字节数。
// Checksum 则是该条日志的校验和。
[Size][Checksum][Data]
// Checksum 通过种子生成
private int calChecksum(int xCheck, byte[] log) {
for (byte b : log) {
xCheck = xCheck * SEED + b;
}
return xCheck;
}
迭代器模式是一种行为型设计模式,它提供了一种顺序访问聚合对象中各个元素的方法,而不暴露其内部表示。
next() 方法提供了按顺序读取日志记录的功能,类似于迭代器模式中的 next() 方法用于按顺序访问集合元素。
@Override public byte[] next() { lock.lock(); try { byte[] log = internNext(); if (log == null) return null; return Arrays.copyOfRange(log, OF_DATA, log.length); } finally { lock.unlock(); } } /** * 从数据库文件中读取下一个记录 * @return 日志记录 */ private byte[] internNext() { // 检查当前位置 + 数据偏移量 和 文件大小的关系 if (position + OF_DATA >= fileSize) { return null; // 到达文件末尾 返回null } // ByteBuffer tmp = ByteBuffer.allocate(4); try { fc.position(position); fc.read(tmp); } catch (IOException e) { Panic.panic(e); } int size = Parser.parseInt(tmp.array()); if (position + size + OF_DATA > fileSize) { return null; } ByteBuffer buf = ByteBuffer.allocate(OF_DATA + size); try { fc.position(position); fc.read(buf); } catch (IOException e) { Panic.panic(e); } byte[] log = buf.array(); int checkSum1 = calChecksum(0, Arrays.copyOfRange(log, OF_DATA, log.length)); int checkSum2 = Parser.parseInt(Arrays.copyOfRange(log, OF_CHECKSUM, OF_DATA)); if (checkSum1 != checkSum2) { return null; } position += log.length; return log; }
// 检查并移除bad tail private void checkAndRemoveTail() { rewind(); int xCheck = 0; while (true) { byte[] log = internNext(); if (log == null) break; xCheck = calChecksum(xCheck, log); } if (xCheck != xChecksum) { Panic.panic(Error.BadLogFileException); } try { truncate(position); } catch (Exception e) { Panic.panic(e); } try { file.seek(position); } catch (IOException e) { Panic.panic(e); } rewind(); }
@Override public void log(byte[] data) { byte[] log = wrapLog(data); ByteBuffer buf = ByteBuffer.wrap(log); lock.lock(); try { fc.position(fc.size()); fc.write(buf); } catch (IOException e) { Panic.panic(e); } finally { lock.unlock(); } updateXChecksum(log); } // 更新校验和 private void updateXChecksum(byte[] log) { this.xChecksum = calChecksum(this.xChecksum, log); try { fc.position(0); fc.write(ByteBuffer.wrap(Parser.int2Byte(xChecksum))); fc.force(false); } catch (IOException e) { Panic.panic(e); } } // 包装数据为日志格式 private byte[] wrapLog(byte[] data) { byte[] checksum = Parser.int2Byte(calChecksum(0, data)); byte[] size = Parser.int2Byte(data.length); return Bytes.concat(size, checksum, data); }
恢复策略:在进行 I 和 U 操作之前,必须先进行对应的日志操作,在保证日志写入磁盘后,才进行数据操作。
// 不考虑并发情况下,日志的情况
(Ti, ?, ?), ..., (Tj, ?, ?), (Tk, ?, ?), ..., (Tk, ?, ?)
日志恢复步骤:
事务 T 进行redo步骤:
事务 T 进行undo步骤:
MYDB 中其实没有真正的删除操作,对于插入操作的 undo,只是将其中的标志位设置为 invalid。
先来看两个规定:
规定1:正在进行的事务,不会读取其他任何未提交的事务产生的数据
规定2:正在进行的事务,不会修改其他任何未提交的事务修改或产生的数据
// 情况一: T1 begin T2 begin T2 U(x) // T2 更新了 x T1 R(x) // T1 读取了 x ... T1 commit MYDB break down // T1 提交,T2 未提交 // 这种情况下,T2 需要撤销,那么 T1 也应当被撤销(级联回滚) // 但是 T1 已经提交(持久化) // 所以需要保证 规定1 //------------------------------------ // 情况二:x 初始值为 0 T1 begin T2 begin T1 set x = x+1 // 产生的日志为(T1, U, A, 0, 1) T2 set x = x+1 // 产生的日志为(T1, U, A, 1, 2) T2 commit MYDB break down // 这种情况下,需要对 T1 进行 undo,对 T2 进行 redo // 但是处理后的结果是错误的,原因在于 MYDB 日志和恢复方式太过简单 // 解决方法有:增加日志种类 or 限制数据库操作,我们选择 2 // 所以需要保证 规定2
在恢复后,数据库就会恢复到所有已完成事务结束,所有未完成事务尚未开始的状态。
// 定义两种日志格式 // updateLog: // [LogType] [XID] [UID] [OldRaw] [NewRaw] private static final byte LOG_TYPE_UPDATE = 1; // 更新 // insertLog: // [LogType] [XID] [Pgno] [Offset] [Raw] private static final byte LOG_TYPE_INSERT = 0; // 插入 /** * 重做所有已完成的事务 * * @param tm 事务 * @param lg 日志 * @param pc 页面缓存 */ private static void redoTranscations(TransactionManager tm, Logger lg, PageCache pc){ lg.rewind(); while(true) { byte[] log = lg.next(); if(log == null) break; if(isInsertLog(log)) { InsertLogInfo li = parseInsertLog(log); long xid = li.xid; if(!tm.isActive(xid)) { doInsertLog(pc, log, REDO); } } else { UpdateLogInfo xi = parseUpdateLog(log); long xid = xi.xid; if(!tm.isActive(xid)) { doUpdateLog(pc, log, REDO); } } } } /** * 撤销所有未完成的事务 * * @param tm 事务 * @param lg 日志 * @param pc 页面缓存 */ private static void undoTranscations(TransactionManager tm, Logger lg, PageCache pc) { Map<Long, List<byte[]>> logCache = new HashMap<>(); lg.rewind(); while(true) { byte[] log = lg.next(); if(log == null) break; if(isInsertLog(log)) { InsertLogInfo li = parseInsertLog(log); long xid = li.xid; if(tm.isActive(xid)) { if(!logCache.containsKey(xid)) { logCache.put(xid, new ArrayList<>()); } logCache.get(xid).add(log); } } else { UpdateLogInfo xi = parseUpdateLog(log); long xid = xi.xid; if(tm.isActive(xid)) { if(!logCache.containsKey(xid)) { logCache.put(xid, new ArrayList<>()); } logCache.get(xid).add(log); } } } // 对所有active log 进行倒序 undo for(Map.Entry<Long, List<byte[]>> entry : logCache.entrySet()) { List<byte[]> logs = entry.getValue(); for (int i = logs.size()-1; i >= 0; i --) { byte[] log = logs.get(i); if(isInsertLog(log)) { doInsertLog(pc, log, UNDO); } else { doUpdateLog(pc, log, UNDO); } } tm.abort(entry.getKey()); } } private static void doUpdateLog(PageCache pc, byte[] log, int flag) { int pgno; short offset; byte[] raw; if(flag == REDO) { UpdateLogInfo xi = parseUpdateLog(log); pgno = xi.pgno; offset = xi.offset; raw = xi.newRaw; } else { UpdateLogInfo xi = parseUpdateLog(log); pgno = xi.pgno; offset = xi.offset; raw = xi.oldRaw; } Page pg = null; try { pg = pc.getPage(pgno); } catch (Exception e) { Panic.panic(e); } try { PageX.recoverUpdate(pg, raw, offset); } finally { pg.release(); } } private static void doInsertLog(PageCache pc, byte[] log, int flag) { InsertLogInfo li = parseInsertLog(log); Page pg = null; try { pg = pc.getPage(li.pgno); } catch(Exception e) { Panic.panic(e); } try { if(flag == UNDO) { // DataItem 的有效位设置为无效 进行逻辑删除 DataItem.setDataItemRawInvalid(li.raw); } PageX.recoverInsert(pg, li.raw, li.offset); } finally { pg.release(); } }
页面索引,缓存了每一页的空闲空间。用于在上层模块进行插入操作时,能够快速找到一个合适空间的页面,而无需从磁盘或者缓存中检查每一个页面的信息。
实现方式:
public class PageIndex { // 将一页划成40个区间 private static final int INTERVALS_NO = 40; // 每个区间的内存大小 private static final int THRESHOLD = PageCache.PAGE_SIZE / INTERVALS_NO; private Lock lock; // 维护一个页面信息的 List数组,实现页面索引 private List<PageInfo>[] lists; @SuppressWarnings("unchecked") public PageIndex() { lock = new ReentrantLock(); lists = new List[INTERVALS_NO + 1]; for (int i = 0; i < INTERVALS_NO + 1; i++) { lists[i] = new ArrayList<>(); } } /** * 插入页面操作 * 前面被选择的页,会直接从 PageIndex 中移除, * 这意味着,同一个页面是不允许并发写的。 * 在上层模块使用完这个页面后,需要将其重新插入 PageIndex * * @param pgno 页号 * @param freeSpace 空闲空间大小 */ public void add(int pgno, int freeSpace) { lock.lock(); try { int number = freeSpace / THRESHOLD; lists[number].add(new PageInfo(pgno, freeSpace)); } finally { lock.unlock(); } } /** * 从 PageIndex 中获取页面 * 算出区间号,直接取 * * @param spaceSize * @return */ public PageInfo select(int spaceSize) { lock.lock(); try { // 计算出满足请求空间的区间号 int number = spaceSize / THRESHOLD; // 因为区间从1开始,所以要加1操作 if (number < INTERVALS_NO) number++; // 循环查找大于等于请求空间的页面 while (number <= INTERVALS_NO) { if (lists[number].size() == 0) { // 当前区间没有空余页面,后移一个区间 number++; continue; } // 从页面索引List中移除第一个满足的页面信息PageInfo return lists[number].remove(0); } return null; } finally { lock.unlock(); } } }
// 初始化pageIndex
void fillPageIndex() {
int pageNumber = pc.getPageNumber();
for(int i = 2; i <= pageNumber; i ++) {
Page pg = null;
try {
pg = pc.getPage(i);
} catch (Exception e) {
Panic.panic(e);
}
pIndex.add(pg.getPageNumber(), PageX.getFreeSpace(pg));
pg.release(); // 使用完 Page 后需要及时 release
}
}
public class DataItemImpl implements DataItem{ // 偏移量 static final int OF_VALID = 0; // ValueFlag 开始位置 static final int OF_SIZE = 1; // DataSize 开始位置 static final int OF_DATA = 3; // Data 开始位置 private SubArray raw; // 子区间数据,共享内存 private byte[] oldRaw; // 暂存需要修改的数据内容 private Lock rLock; // 读锁 private Lock wLock; // 写锁 // 释放依赖 dm 的释放 // 修改数据时落日志 private DataManagerImpl dm; private long uid;// DataItem缓存的key,uid = 页号 + 偏移量 private Page pg;
DataItem 中保存的数据,结构如下:
// ValidFlag占用一个字节,表示这个DI是否有效
// DataSize占用两个字节,表示后面Data的长度
[ValidFlag] [DataSize] [Data]
上层模块在获取到DataItem之后,可以通过data()方法,该方法返回的数组是数据共享的,而不是拷贝实现的:
/**
* 通过共享内存的方式获取指定的 DATA 数据
* @return
*/
@Override
public SubArray data() {
return new SubArray(raw.raw, raw.start+OF_DATA, raw.end);
}
在上层模块试图对 DataItem 进行修改时,需要遵循一定的流程:
整个流程,主要是为了保存前相数据,并及时落日志。
DM 会保证对 DataItem 的修改是原子性的。
/** * 修改数据之前的操作 * 包含了加写锁,设置脏页面,暂存需要修改的数据内容到oldRaw */ @Override public void before() { wLock.lock(); pg.setDirty(true); System.arraycopy(raw.raw, raw.start, oldRaw, 0, oldRaw.length); } /** * 撤销修改 * 将数据还原,关闭写锁 */ @Override public void unBefore() { System.arraycopy(oldRaw, 0, raw.raw, raw.start, oldRaw.length); wLock.unlock(); } /** * 修改数据完成后的操作 * 记录此事务的修改操作到日志,关闭写锁 * @param xid */ @Override public void after(long xid) { dm.logDataItem(xid, this); // dm中的 对修改操作落日志的方法 wLock.unlock(); } /** * 使用完要及时释放这个DataItem的缓存 */ @Override public void release() { dm.releaseDataItem(this); }
/** * DataItem 缓存,getForCache(), * 只需要从 key 中解析出页号, * 从 pageCache 中获取到页面, * 再根据偏移,解析出 DataItem 即可 * * @param uid dataItem的id,页面+偏移量,前32位是页号,后32位是偏移量 * @return DataItem */ @Override protected DataItem getForCache(long uid) throws Exception { short offset = (short)(uid & ((1L << 16) - 1)); uid >>>= 32; int pgno = (int)(uid & ((1L << 32) - 1)); Page pg = pc.getPage(pgno); return DataItem.parseDataItem(pg, offset, this); } /** * DataItem缓存释放 * 需要将 DataItem 写回数据源, * 由于对文件的读写是以页为单位进行的, * 只需要将 DataItem 所在的页 release 即可 * * @param di */ @Override protected void releaseForCache(DataItem di) { di.page().release(); }
从已有文件创建 DataManager 和从空文件创建 DataManager 的流程稍有不同:
/** * 空文件创建DataManager * * @param path * @param mem * @param tm * @return */ public static DataManager create(String path, long mem, TransactionManager tm) { PageCache pc = PageCache.create(path, mem); // 新建页面缓存 Logger lg = Logger.create(path); // 新建日志 DataManagerImpl dm = new DataManagerImpl(pc, lg, tm); // 新建 DataManager dm.initPageOne(); // 对第一页校验页面 进行初始化 return dm; } /** * 已有文件新建 DataManager * * @param path * @param mem * @param tm * @return */ public static DataManager open(String path, long mem, TransactionManager tm) { PageCache pc = PageCache.open(path, mem); // 打开页面缓存 Logger lg = Logger.open(path); // 打开日志 DataManagerImpl dm = new DataManagerImpl(pc, lg, tm); // 新建 DataManager // 是否执行恢复流程 if (!dm.loadCheckPageOne()) { // 数据库非正常关闭 执行恢复 Recover.recover(tm, lg, pc); } // 重新填写页面索引 dm.fillPageIndex(); // 重新设置 第一页 随机字节 PageOne.setVcOpen(dm.pageOne); // 第一页 刷回数据源 dm.pc.flushPage(dm.pageOne); return dm; }
// 在创建文件时初始化PageOne void initPageOne() { int pgno = pc.newPage(PageOne.InitRaw()); assert pgno == 1; try { pageOne = pc.getPage(pgno); } catch (Exception e) { Panic.panic(e); } pc.flushPage(pageOne); } // 在打开已有文件时时读入PageOne,并验证正确性 boolean loadCheckPageOne() { try { pageOne = pc.getPage(1); } catch (Exception e) { Panic.panic(e); } return PageOne.checkVc(pageOne); }
/** * 根据 UID 从缓存中获取 DataItem,并校验有效位 * * @param uid * @return * @throws Exception */ @Override public DataItem read(long uid) throws Exception { DataItemImpl di = (DataItemImpl)super.get(uid); // 获取 DataItem if(!di.isValid()) { di.release(); return null; } return di; } /** * 在 pageIndex 中获取一个足以存储插入内容的页面的页号, * 获取页面后,首先需要写入插入日志,接着才可以通过 pageX 插入数据,并返回插入位置的偏移。 * 最后需要将页面信息重新插入 pageIndex。 * * @param xid 事务id * @param data 插入数据 * @return * @throws Exception */ @Override public long insert(long xid, byte[] data) throws Exception { // 将数据打包为 DataItem 格式 byte[] raw = DataItem.wrapDataItemRaw(data); // 数据过大 抛出异常 if(raw.length > PageX.MAX_FREE_SPACE) { throw Error.DataTooLargeException; } PageInfo pi = null; // 在 pageIndex 中获取一个足以存储插入内容的页面的页号,最多尝试五次 for(int i = 0; i < 5; i ++) { // 尝试从页面索引中获取 pi = pIndex.select(raw.length); if (pi != null) { break; } else { // 获取失败说明已经存在的数据页没有足够的空闲空间插入数据,那么就新建一个数据页 int newPgno = pc.newPage(PageX.initRaw()); // 更新页面索引 pIndex.add(newPgno, PageX.MAX_FREE_SPACE); } } if(pi == null) { throw Error.DatabaseBusyException; } Page pg = null; int freeSpace = 0; try { // 获取插入页号 pg = pc.getPage(pi.pgno); // 写入插入日志 byte[] log = Recover.insertLog(xid, pg, raw); logger.log(log); // 完成页面数据插入, 返回在此页面中的插入位置偏移量 short offset = PageX.insert(pg, raw); // 释放页面的缓存 pg.release(); // 返回 uid return Types.addressToUid(pi.pgno, offset); } finally { // 更新pIndex 将取出的pg重新插入pIndex if(pg != null) { pIndex.add(pi.pgno, PageX.getFreeSpace(pg)); } else { pIndex.add(pi.pgno, freeSpace); } } }
/**
* DM 关闭
*/
@Override
public void close() {
super.close(); // 关闭缓存
logger.close(); // 关闭日志
PageOne.setVcClose(pageOne); // 设置第一页的校验字节
pageOne.release();
pc.close();
}
Version Manager 是 MYDB 的事务和数据版本的管理核心。
VM 基于两段锁协议实现调度序列的可串行化,并实现 MVCC ,实现两种隔离级别
只看更新操作(U)和读操作(R),两个操作只要满足下面三个条件,就可以说这两个操作相互冲突:
那么这样,对同一个数据操作的冲突,其实就只有下面这两种情况:
处理冲突的意义在于:交换两个互不冲突的操作的顺序,不会对最终的结果造成影响,而交换两个冲突操作的顺序,则是会有影响的
// 譬如,T1 已经因为 U1(x) 锁定了 x,
// 那么 T2 对 x 的读或者写操作都会被阻塞,
// T2 必须等待 T1 释放掉对 x 的锁。
T1 begin
T2 begin
R1(x) // T1读到0
R2(x) // T2读到0
U1(0+1) // T1尝试把x+1
U2(0+1) // T2尝试把x+1
T1 commit
T2 commit
记录和版本:
- DM 层向上层提供了数据项(Data Item)的概念,VM 通过管理所有的数据项,向上层提供了记录(Entry)的概念。上层模块通过 VM 操作数据的最小单位,就是记录。
- VM 则在其内部,为每个记录,维护了多个版本(Version)。每当上层模块对某个记录进行修改时,VM 就会为这个记录创建一个新的版本。
由于 2PL 和 MVCC,DM中定义的这两个条件都被很轻易地满足了:
一条记录存储在一条 Data Item 中,所以 Entry 中保存一个 DataItem 的引用即可:
public class Entry { private static final int OF_XMIN = 0; private static final int OF_XMAX = OF_XMIN+8; private static final int OF_DATA = OF_XMAX+8; private long uid; // 版本id private DataItem dataItem; // 数据项 private VersionManager vm; // 事物的版本管理器 // 读取一个 DataItem 打包成 entry public static Entry loadEntry(VersionManager vm, long uid) throws Exception { DataItem di = ((VersionManagerImpl)vm).dm.read(uid); return newEntry(vm, di, uid); } public void remove() { dataItem.release(); }
规定一条 Entry 中存储的数据格式如下:
// XMIN 是创建该条记录(版本)的事务编号
// XMAX 是删除该条记录(版本)的事务编号
// DATA 是这条记录持有的数据。
[XMIN] [XMAX] [DATA]
根据这个结构,在创建记录时调用的 wrapEntryRaw() 方法如下:
/**
* 将事务id和数据记录打包成一个 entry 格式
*
* @param xid 事务id
* @param data 记录数据
* @return
*/
public static byte[] wrapEntryRaw(long xid, byte[] data) {
byte[] xmin = Parser.long2Byte(xid);
byte[] xmax = new byte[8];
return Bytes.concat(xmin, xmax, data);
}
如果要获取记录中持有的数据,也就需要按照这个结构来解析:
/** * 获取记录中持有的数据 * 以拷贝的形式返回内容 * * @return */ public byte[] data() { dataItem.rLock(); try { SubArray sa = dataItem.data(); byte[] data = new byte[sa.end - sa.start - OF_DATA]; // 数组拷贝操作 System.arraycopy(sa.raw, sa.start+OF_DATA, data, 0, data.length); return data; } finally { dataItem.rUnLock(); } }
这里以拷贝的形式返回数据,如果需要修改的话,需要对 DataItem 执行 before() 方法,这个在设置 XMAX 的值中体现了:
/**
* 修改数据
* @param xid
*/
public void setXmax(long xid) {
// 修改数据前必须执行 包含了加写锁,设置脏页面,暂存需要修改的数据内容到oldRaw
dataItem.before();
try {
SubArray sa = dataItem.data();
System.arraycopy(Parser.long2Byte(xid), 0, sa.raw, sa.start+OF_XMAX, 8);
} finally {
// 修改数据后必须执行 记录此事务的修改操作到日志,关闭写锁
dataItem.after(xid);
}
}
上面提到,如果一个记录的最新版本被加锁,当另一个事务想要修改或读取这条记录时,MYDB 就会返回一个较旧的版本的数据。
这时就可以认为,最新的被加锁的版本,对于另一个事务来说,是不可见的。于是版本可见性的概念就诞生了。
MYDB 实现读提交,为每个版本维护了两个变量,就是上面提到的 XMIN 和 XMAX:
XMIN 应当在版本创建时填写,而 XMAX 则在版本被删除,或者有新版本出现时填写。
XMAX 这个变量,也就解释了为什么 DM 层不提供删除操作,当想删除一个版本时,只需要设置其 XMAX,这样,这个版本对每一个 XMAX 之后的事务都是不可见的,也就等价于删除了。
可以推导出,在读提交下,版本对事务的可见性逻辑如下:
(XMIN == Ti and // 由Ti创建且
XMAX == NULL // 还未被删除
)
or // 或
(XMIN is commited and // 由一个已提交的事务创建且
(XMAX == NULL or // 尚未删除或
(XMAX != Ti and XMAX is not commited) // 由一个未提交的事务删除
))
// 若条件为 true,则版本对 Ti 可见。
// 那么获取 Ti 适合的版本,只需要从最新版本开始,依次向前检查可见性,如果为 true,就可以直接返回。
/** * 判断某个记录(数据版本)对事务 t 是否可见 * * @param tm 事务管理器 * @param t 事务 * @param e 数据版本链 * @return 对事务t的可见性 */ private static boolean readCommitted(TransactionManager tm, Transaction t, Entry e) { long xid = t.xid; // 获取事务id (Transaction 结构提供) long xmin = e.getXmin(); // 获取数据版本链最新版本的操作事务id long xmax = e.getXmax(); // 获取数据版本链最新版本的删除事务(下一个事务)id if(xmin == xid && xmax == 0) // 当前数据版本是事务t创建的,并且没有被删除,则对事务t可见 return true; // 由一个已经提交的事务创建 if(tm.isCommitted(xmin)) { // 如果没有被删除,则对事务t可见 if(xmax == 0) return true; // 如果由一个未提交的事务删除当前版本,也对事务t可见 if(xmax != xid) { if(!tm.isCommitted(xmax)) { return true; } } } return false; }
读提交会导致不可重复读和幻读。
不可重复读:在⼀个事务范围内,两个相同的查询,读取同⼀条记录,却返回了不同的数据
幻读:事务 A 查询⼀个范围的结果集,另⼀个并发事务 B 往这个范围中插入 / 删除了数据,并静悄悄地提交,然后事务 A 再次查询相同的范围,两次读取得到的结果集不⼀样了
我们解决不可重复读的问题,如果想要避免这个情况,就需要引入更严格的隔离级别,即可重复读(repeatable read)。
我们增加一条规定:
规定:事务只能读取它开始时, 就已经结束的那些事务产生的数据版本
在此规定下,事务需要忽略:
可以推导出,在可重复读下,版本对事务的可见性逻辑如下:
(XMIN == Ti and // 由Ti创建且
(XMAX == NULL or // 尚未被删除
))
or // 或
(XMIN is commited and // 由一个已提交的事务创建且
XMIN < XID and // 这个事务小于Ti且
XMIN is not in SP(Ti) and // 这个事务在Ti开始前提交且
(XMAX == NULL or // 尚未被删除或
(XMAX != Ti and // 由其他事务删除但是
(XMAX is not commited or // 这个事务尚未提交或
XMAX > Ti or // 这个事务在Ti开始之后才开始或
XMAX is in SP(Ti) // 这个事务在Ti开始前还未提交
))))
于是,需要提供一个结构,来抽象一个事务,以保存快照数据:
public class Transaction { public long xid; // 事务id public int level; // 事务隔离等级,0:读已提交;1:可重复读 // 活跃事务的快照,用于实现可重复读 public Map<Long, Boolean> snapshot; public Exception err; // 事务的错误 public boolean autoAborted; // 自动回滚标记 // 静态工厂方法 用于创建 Transaction 对象 // active 保存着当前所有 active 的事务 public static Transaction newTransaction(long xid, int level, Map<Long, Transaction> active) { Transaction t = new Transaction(); t.xid = xid; t.level = level; // 只有可重复读才需要 活跃事务列表 if(level != 0) { t.snapshot = new HashMap<>(); for(Long x : active.keySet()) { t.snapshot.put(x, true); } } return t; } // 判断xid是否是活跃事务 public boolean isInSnapshot(long xid) { if(xid == TransactionManagerImpl.SUPER_XID) { return false; } return snapshot.containsKey(xid); } }
可重复读的隔离级别下,一个版本是否对事务可见的判断如下:
/** * 可重复读,多了一个记录活跃事务,简而言之活跃事务操作的数据版本都是不可见的 * 读取事务t操作的版本只要没被删除都是可见的; * 读取其他事务操作过的版本数据, * 只能读取在本事务开始前就已经提交的事务,并且没有在活跃事务列表里面也没有被删除 * * @param tm 事务管理器 * @param t 事务 * @param e 数据版本链 * @return 对事务t的可见性 */ private static boolean repeatableRead(TransactionManager tm, Transaction t, Entry e) { long xid = t.xid; long xmin = e.getXmin(); long xmax = e.getXmax(); // 读取自己操作的版本只要没被删除都是可见的 if(xmin == xid && xmax == 0) return true; // 大范围,只能读取在本事务开始前就已经提交的事务,并且没有在活跃事务列表里面 if(tm.isCommitted(xmin) && xmin < xid && !t.isInSnapshot(xmin)) { // 当前版本还不能被删除 if(xmax == 0) return true; // 删除的事务在本事务之后开始,或者未提交,再或者是活跃事务也是对当前事务可见的 if(xmax != xid) { if(!tm.isCommitted(xmax) || xmax > xid || t.isInSnapshot(xmax)) { return true; } } } return false; }
MVCC 的实现,使得 MYDB 在撤销或是回滚事务很简单:只需要将这个事务标记为 aborted 即可。
一个 aborted 事务产生的数据,不会对其他事务产生任何影响,也就相当于,这个事务不曾存在过
版本跳跃问题,考虑如下的情况,假设 X 最初只有 x0 版本,T1 和 T2 都是可重复读的隔离级别:
T1 begin
T2 begin
R1(X) // T1读取x0
R2(X) // T2读取x0
U1(X) // T1将X更新到x1
T1 commit
U2(X) // T2将X更新到x2
T2 commit
// 这种情况实际运行起来是没问题的,但是逻辑上不太正确。
// T1 将 X 从 x0 更新为了 x1,这是没错的。
// 但是 T2 则是将 X 从 x0 更新成了 x2,跳过了 x1 版本。
解决版本跳跃的思路也很简单:
上一节中总结了,Ti 不可见的 Tj,有两种情况:
版本跳跃的检查:取出要修改的数据 X 的最新提交版本,并检查该最新版本的创建者对当前事务是否可见:
/** * 检查是否存在版本跳跃: * 取出要修改的数据 X 的最新提交版本,并检查该最新版本的创建者对当前事务是否可见 * * @param tm * @param t * @param e * @return */ public static boolean isVersionSkip(TransactionManager tm, Transaction t, Entry e) { long xmax = e.getXmax(); // 如果事务隔离等级是 读已提交,就允许版本跳跃 if(t.level == 0) { return false; } else { // 已提交删除当前事务版本,并且这个删除的事务id是在此事务之后发生 或者 是一个未提交的活跃事务操作删除的,就存在版本跳跃 return tm.isCommitted(xmax) && (xmax > t.xid || t.isInSnapshot(xmax)); } }
MYDB 使用一个 LockTable 对象,在内存中维护这张图。维护结构如下:
public class LockTable {
private Map<Long, List<Long>> x2u; // 某个XID已经获得的资源的UID列表
private Map<Long, Long> u2x; // UID被某个XID持有
private Map<Long, List<Long>> wait; // 正在等待UID的XID列表
private Map<Long, Lock> waitLock; // 正在等待资源的XID的锁
private Map<Long, Long> waitU; // XID正在等待的UID
private Lock lock;
...
}
在每次出现等待的情况时,就尝试向图中增加一条边,并进行死锁检测。
如果检测到死锁,就撤销这条边,不允许添加,并撤销该事务。
/** * 向依赖等待图中添加一个等待记录 * 事务xid 阻塞等待 数据项uid,如果会造成死锁则返回上了锁的 Lock 对象 * * @param xid 事务id * @param uid 数据项key * @return 不需要等待则返回null,否则返回锁对象 * @throws Exception */ public Lock add(long xid, long uid) throws Exception { lock.lock(); try { // dataitem数据已经被事务xid获取到,不需要等待,返回null if(isInList(x2u, xid, uid)) { return null; } // 如果 uid 资源不被持有 xid获得该uid 加入持有列表 不需要等待 返回null if(!u2x.containsKey(uid)) { u2x.put(uid, xid); putIntoList(x2u, xid, uid); return null; } waitU.put(xid, uid); // 添加等待状态 putIntoList(wait, xid, uid); // 加入列表 // 死锁检测 if(hasDeadLock()) { waitU.remove(xid); removeFromList(wait, uid, xid); throw Error.DeadlockException; } // 返回上了锁的 Lock 对象 // 调用方在获取到该对象时,需要尝试获取该对象的锁,由此实现阻塞线程的目的 Lock l = new ReentrantLock(); l.lock(); waitLock.put(xid, l); return l; } finally { lock.unlock(); } }
调用 add,如果需要等待的话,会返回一个上了锁的 Lock 对象。
调用方在获取到该对象时,需要尝试获取该对象的锁,由此实现阻塞线程的目的,例如:
Lock l = lt.add(xid, uid);
if(l != null) {
l.lock(); // 阻塞在这一步
l.unlock();
}
图中是否有环:
// 死锁检测 private boolean hasDeadLock() { xidStamp = new HashMap<>(); stamp = 1; for(long xid : x2u.keySet()) { Integer s = xidStamp.get(xid); if(s != null && s > 0) { continue; } stamp++; if(dfs(xid)) { return true; } } return false; } /** * 深度优先搜索 * * @param xid * @return */ private boolean dfs(long xid) { Integer stp = xidStamp.get(xid); if(stp != null && stp == stamp) { return true; } if(stp != null && stp < stamp) { return false; } xidStamp.put(xid, stamp); Long uid = waitU.get(xid); if(uid == null) return false; Long x = u2x.get(uid); assert x != null; return dfs(x); }
在一个事务 commit 或者 abort 时,就可以释放所有它持有的锁,并将自身从等待图中删除。
/** * 在一个事务 commit 或者 abort 时,就可以释放所有它持有的锁,并将自身从等待图中删除 * * @param xid */ public void remove(long xid) { lock.lock(); try { List<Long> l = x2u.get(xid); if(l != null) { while(l.size() > 0) { Long uid = l.remove(0); selectNewXID(uid); // 释放的资源可以被获取 } } waitU.remove(xid); x2u.remove(xid); waitLock.remove(xid); } finally { lock.unlock(); } } // 从等待队列中选择一个xid来占用uid private void selectNewXID(long uid) { u2x.remove(uid); List<Long> l = wait.get(uid); if(l == null) return; assert l.size() > 0; // while 循环释放掉了这个线程所有持有的资源的锁,这些资源可以被等待的线程所获取: // 从 List 开头开始尝试解锁,还是个公平锁。 // 解锁时,将该 Lock 对象 unlock 即可,这样业务线程就获取到了锁,就可以继续执行了。 while(l.size() > 0) { long xid = l.remove(0); if(!waitLock.containsKey(xid)) { continue; } else { u2x.put(uid, xid); Lock lo = waitLock.remove(xid); waitU.remove(xid); lo.unlock(); break; } } if(l.size() == 0) wait.remove(uid); }
VM 层通过 VersionManager 接口,向上层提供功能,如下
public interface VersionManager { // 数据版本链管理 // 保证可见性的条件下,读取数据 DataItem byte[] read(long xid, long uid) throws Exception; // 通过事务 xid 插入数据 long insert(long xid, byte[] data) throws Exception; // 通过事务 xid 删除数据 boolean delete(long xid, long uid) throws Exception; // 事务管理 long begin(int level); // 事务开启隔离级别 void commit(long xid) throws Exception; // 提交事务 void abort(long xid); // 撤销事务 }
同时,VM 的实现类还被设计为 Entry 的缓存,需要继承 AbstractCache。需要实现的获取到缓存和从缓存释放的方法很简单:
/** * 获取到缓存 * * @param uid * @return * @throws Exception */ @Override protected Entry getForCache(long uid) throws Exception { Entry entry = Entry.loadEntry(this, uid); if(entry == null) { throw Error.NullEntryException; } return entry; } /** * 从缓存释放 * @param entry */ @Override protected void releaseForCache(Entry entry) { entry.remove(); }
begin() 开启一个事务,并初始化事务的结构,将其存放在 activeTransaction 中,用于检查和快照使用:
/** * 开启一个事务,并初始化事务的结构,将其存放在 activeTransaction 中,用于检查和快照使用 * * @param level 隔离等级 * @return */ @Override public long begin(int level) { lock.lock(); try { // 开启一个新事务 long xid = tm.begin(); // 初始化事务的结构 Transaction t = Transaction.newTransaction(xid, level, activeTransaction); // 将其存放在 activeTransaction 中,用于检查和快照使用 activeTransaction.put(xid, t); return xid; } finally { lock.unlock(); } }
commit() 方法提交一个事务,主要就是 free 掉相关的结构,并且释放持有的锁,并修改 TM 状态:
/** * 提交一个事务,主要就是 free 掉相关的结构,并且释放持有的锁,修改 TM 状态 * * @param xid * @throws Exception */ @Override public void commit(long xid) throws Exception { lock.lock(); Transaction t = activeTransaction.get(xid); lock.unlock(); try { if(t.err != null) { throw t.err; } } catch(NullPointerException n) { System.out.println(xid); System.out.println(activeTransaction.keySet()); Panic.panic(n); } lock.lock(); activeTransaction.remove(xid); lock.unlock(); lt.remove(xid); tm.commit(xid); }
abort 事务的方法则有两种,手动和自动。
// 手动回滚 @Override public void abort(long xid) { internAbort(xid, false); } /** * 自动回滚 * 1. 在事务被检测出出现死锁时,会自动撤销回滚事务; * 2. 出现版本跳跃时,也会自动回滚 * * @param xid 事务id * @param autoAborted 是否自动回滚 */ private void internAbort(long xid, boolean autoAborted) { lock.lock(); Transaction t = activeTransaction.get(xid); if(!autoAborted) { activeTransaction.remove(xid); } lock.unlock(); if(t.autoAborted) return; lt.remove(xid); tm.abort(xid); }
read() 方法读取一个 entry,注意判断下可见性即可:
@Override public byte[] read(long xid, long uid) throws Exception { lock.lock(); Transaction t = activeTransaction.get(xid); lock.unlock(); if(t.err != null) { throw t.err; } Entry entry = null; try { entry = super.get(uid); } catch(Exception e) { if(e == Error.NullEntryException) { return null; } else { throw e; } } try { if(Visibility.isVisible(tm, t, entry)) { return entry.data(); } else { return null; } } finally { entry.release(); } }
insert() 则是将数据包裹成 Entry,无脑交给 DM 插入即可:
@Override
public long insert(long xid, byte[] data) throws Exception {
lock.lock();
Transaction t = activeTransaction.get(xid);
lock.unlock();
if(t.err != null) {
throw t.err;
}
// 包裹成entry交给dm处理
byte[] raw = Entry.wrapEntryRaw(xid, data);
return dm.insert(xid, raw);
}
delete() 方法
/** * 删除操作只有一个设置 XMAX * * 需要的前置的三件事: * 1. 可见性判断 * 2. 获取资源的锁 * 3. 版本跳跃判断 * * @param xid * @param uid * @return * @throws Exception */ @Override public boolean delete(long xid, long uid) throws Exception { lock.lock(); Transaction t = activeTransaction.get(xid); lock.unlock(); if(t.err != null) { throw t.err; } Entry entry = null; try { entry = super.get(uid); } catch(Exception e) { if(e == Error.NullEntryException) { return false; } else { throw e; } } // 可见性判断 try { if(!Visibility.isVisible(tm, t, entry)) { return false; } Lock l = null; try { l = lt.add(xid, uid); // 添加到死锁检测 } catch(Exception e) { t.err = Error.ConcurrentUpdateException; internAbort(xid, true); // 自动回滚 t.autoAborted = true; throw t.err; } if(l != null) { l.lock(); l.unlock(); } if(entry.getXmax() == xid) { return false; } // 版本跳跃判断 if(Visibility.isVersionSkip(tm, t, entry)) { t.err = Error.ConcurrentUpdateException; internAbort(xid, true); t.autoAborted = true; throw t.err; } // 删除操作 entry.setXmax(xid); return true; } finally { entry.release(); } }
IM 实现了基于 B+ 树的索引,目前只支持基于索引查找数据,不支持全表扫描
二叉树由一个个 Node 组成,每个 Node 都存储在一条 DataItem 中。结构如下:
// LeafFlag 标记了该节点是否是个叶子节点
// KeyNumber 为该节点中 key 的个数
// SiblingUid 是其兄弟节点存储在 DM 中的 UID
// 后续是穿插的子节点(SonN)和 KeyN。
// 最后的一个 KeyN 始终为 MAX_VALUE,以此方便查找。
[LeafFlag][KeyNumber][SiblingUid]
[Son0][Key0][Son1][Key1]...[SonN][KeyN]
Node 类持有了其 B+ 树结构的引用,DataItem 的引用和 SubArray 的引用,用于方便快速修改数据和释放数据。
public class Node {
BPlusTree tree; // B+ 树结构的引用
DataItem dataItem; // DM 的数据项引用
SubArray raw; // 每个Node结点的内存地址
long uid; // DataItem 存储的 uid
...
}
于是生成一个根节点的数据可以写成如下:
/** * 生成一个非空根节点数据 * 该根节点的初始两个子节点为 left 和 right * * @param key 初始键值 * @return */ static byte[] newRootRaw(long left, long right, long key) { SubArray raw = new SubArray(new byte[NODE_SIZE], 0, NODE_SIZE); setRawIsLeaf(raw, false); // 设置[LeafFlag] setRawNoKeys(raw, 2); // 设置[KeyNumber] setRawSibling(raw, 0); // 设置[SiblingUid] setRawKthSon(raw, left, 0); // 插入left节点 setRawKthKey(raw, key, 0); setRawKthSon(raw, right, 1); // 插入right节点 setRawKthKey(raw, Long.MAX_VALUE, 1); return raw.raw; } /** * 生成一个空的根节点数据 * * @return */ static byte[] newNilRootRaw() { SubArray raw = new SubArray(new byte[NODE_SIZE], 0, NODE_SIZE); // 设置节点头部信息 setRawIsLeaf(raw, true); setRawNoKeys(raw, 0); setRawSibling(raw, 0); return raw.raw; }
Node 类有两个方法,用于辅助 B+ 树做插入和搜索操作,分别是 searchNext 方法和 leafSearchRange 方法。
class SearchNextRes { long uid; long siblingUid; } /* * serchNext()方法是在索引树上寻找下一个孩子结点,无限循环直到走到叶子结点为止; * 这个结点没有符合要求的就去下一个兄弟结点找 */ public SearchNextRes searchNext(long key) { dataItem.rLock(); try { SearchNextRes res = new SearchNextRes(); int noKeys = getRawNoKeys(raw); for (int i = 0; i < noKeys; i++) { long ik = getRawKthKey(raw, i); if (key < ik) { // 根据排序树的规则,小于的就往左下走就行了 res.uid = getRawKthSon(raw, i); res.siblingUid = 0; return res; } } res.uid = 0; res.siblingUid = getRawSibling(raw); return res; } finally { dataItem.rUnLock(); } }
leafSearchRange 方法在当前节点进行范围查找,范围是 [leftKey, rightKey],这里约定如果 rightKey 大于等于该节点的最大的 key, 则还同时返回兄弟节点的 UID,方便继续搜索下一个节点。
class LeafSearchRangeRes { List<Long> uids; long siblingUid; } /** * 在当前节点进行范围查找,范围是 [leftKey, rightKey], * 这里约定如果 rightKey 大于等于该节点的最大的 key, * 则返回兄弟节点的 UID,方便继续搜索下一个节点。 * * @param leftKey * @param rightKey * @return */ public LeafSearchRangeRes leafSearchRange(long leftKey, long rightKey) { dataItem.rLock(); try { int noKeys = getRawNoKeys(raw); int kth = 0; while (kth < noKeys) { long ik = getRawKthKey(raw, kth); if (ik >= leftKey) { break; } kth++; } List<Long> uids = new ArrayList<>(); while (kth < noKeys) { long ik = getRawKthKey(raw, kth); if (ik <= rightKey) { uids.add(getRawKthSon(raw, kth)); kth++; } else { break; } } long siblingUid = 0; if (kth == noKeys) { siblingUid = getRawSibling(raw); } LeafSearchRangeRes res = new LeafSearchRangeRes(); res.uids = uids; res.siblingUid = siblingUid; return res; } finally { dataItem.rUnLock(); } }
public class BPlusTree { DataManager dm; long bootUid; Lock bootLock; // 由于 B+ 树在插入删除时,会动态调整,根节点不是固定节点, // 于是设置一个 bootDataItem,该 DataItem 中存储了根节点的 UID。 DataItem bootDataItem; /** * @param dm * @return * @throws Exception */ public static long create(DataManager dm) throws Exception { byte[] rawRoot = Node.newNilRootRaw(); long rootUid = dm.insert(TransactionManagerImpl.SUPER_XID, rawRoot); return dm.insert(TransactionManagerImpl.SUPER_XID, Parser.long2Byte(rootUid)); } public static BPlusTree load(long bootUid, DataManager dm) throws Exception { DataItem bootDataItem = dm.read(bootUid); assert bootDataItem != null; BPlusTree t = new BPlusTree(); t.bootUid = bootUid; t.dm = dm; t.bootDataItem = bootDataItem; t.bootLock = new ReentrantLock(); return t; } private long rootUid() { bootLock.lock(); try { SubArray sa = bootDataItem.data(); return Parser.parseLong(Arrays.copyOfRange(sa.raw, sa.start, sa.start + 8)); } finally { bootLock.unlock(); } } private void updateRootUid(long left, long right, long rightKey) throws Exception { bootLock.lock(); try { byte[] rootRaw = Node.newRootRaw(left, right, rightKey); long newRootUid = dm.insert(TransactionManagerImpl.SUPER_XID, rootRaw); bootDataItem.before(); SubArray diRaw = bootDataItem.data(); System.arraycopy(Parser.long2Byte(newRootUid), 0, diRaw.raw, diRaw.start, 8); bootDataItem.after(TransactionManagerImpl.SUPER_XID); } finally { bootLock.unlock(); } } }
IM 对上层模块主要提供两种能力:插入索引和搜索节点。
IM 为什么不提供删除索引的能力?
当上层模块通过 VM 删除某个 Entry,实际的操作是设置其 XMAX。如果不去删除对应索引的话,当后续再次尝试读取该 Entry 时,是可以通过索引寻找到的,但是由于设置了 XMAX,寻找不到合适的版本而返回一个找不到内容的错误。
B+ 树在操作过程中,可能出现两种错误,分别是节点内部错误和节点间关系错误。
当节点内部错误发生时,即当 Ti 在对节点的数据进行更改时,MYDB 发生了崩溃。由于 IM 依赖于 DM,在数据库重启后,Ti 会被撤销(undo),对节点的错误影响会被消除。
如果出现了节点间错误,那么一定是下面这种情况:某次对 u 节点的插入操作创建了新节点 v, 此时 sibling(u)=v(兄弟节点),但是 v 却并没有被插入到父节点中。
[parent]
|
v
[u] -> [v]
// 正确的状态
[ parent ]
/ \
v v
[u] -> [v]
这时,如果要对节点进行插入或者搜索操作,如果失败,就会继续迭代它的兄弟节点,最终还是可以找到 v 节点。唯一的缺点仅仅是,无法直接通过父节点找到 v 了,只能间接地通过 u 获取到 v。
TBM 实现对字段和表的管理。同时,解析 SQL 语句,并根据语句操作表
Parser 实现了对类 SQL 语句的结构化解析,将语句中包含的信息封装为对应语句的类(在 backend.parser.statement 包下)
MYDB 实现的 SQL 语句语法如下:
<begin statement> begin [isolation level (read committed | repeatable read)] -- 开始事务,设置隔离级别为读提交或可重复读 begin isolation level read committed <commit statement> commit -- 提交事务 <abort statement> abort -- 中止事务 <create statement> create table <table name> <field name> <field type> <field name> <field type> ... <field name> <field type> [(index <field name list>)] -- 创建表格 -- 指定字段名和字段类型 -- 可选项:指定索引字段列表 create table students id int32, name string, age int32, (index id name) <drop statement> drop table <table name> -- 删除表格 drop table students <select statement> select (*|<field name list>) from <table name> [<where statement>] -- 查询数据 -- 可选项:指定字段名列表 -- 可选项:指定条件 select * from student where id = 1 select name from student where id > 1 and id < 4 select name, age, id from student where id = 12 <insert statement> insert into <table name> values <value list> -- 插入数据 insert into student values 5 "Zhang Yuanjia" 22 <delete statement> delete from <table name> [<where statement>] -- 删除数据 -- 可选项:指定条件 delete from student where name = "Zhang Yuanjia" <update statement> update <table name> set <field name>=<value> [<where statement>] -- 更新数据 -- 指定字段和值 -- 可选项:指定条件 update student set name = "ZYJ" where id = 5 <where statement> where <field name> (><=) <value> [(and|or) <field name> (><=) <value>] -- 指定条件 -- 可选项:使用and或or组合条件 where age > 10 or age < 3 <field name> <table name> [a-zA-Z][a-zA-Z0-9_]* <field type> int32 | int64 | string <value> .*
parser 包的 Tokenizer 类,对语句进行逐字节解析,根据空白符或者上述词法规则,将语句切割成多个 token。
对外提供了 peek()、pop() 方法方便取出 Token 进行解析。
Parser 类则直接对外提供了 Parse(byte[] statement) 方法,核心就是一个调用 Tokenizer 类分割 Token,并根据词法规则包装成具体的 Statement 类并返回。
解析过程很简单,仅仅是根据第一个 Token 来区分语句类型,并分别处理。
注意,这里的字段与表管理,不是管理各个条目中不同的字段的数值等信息,而是管理表和字段的结构数据,例如表名、表字段信息和字段索引等。
由于 TBM 基于 VM,单个字段信息和表信息都是直接保存在 Entry 中。字段的二进制表示如下:
[FieldName][TypeName][IndexUid]
这里 FieldName 和 TypeName,以及后面的索引UID,存储的都是字节形式的字符串。
这里规定一个字符串的存储方式,以明确其存储边界。
[StringLength][StringData]
TypeName 为字段的类型,限定为 int32、int64 和 string 类型。
如果这个字段有索引,那个 IndexUID 指向了索引二叉树的根,否则该字段为 0。
根据这个结构,通过一个 UID 从 VM 中读取并解析如下:
public static Field loadField(Table tb, long uid) { byte[] raw = null; try { raw = ((TableManagerImpl) tb.tbm).vm.read(TransactionManagerImpl.SUPER_XID, uid); } catch (Exception e) { Panic.panic(e); } assert raw != null; return new Field(uid, tb).parseSelf(raw); // 解析 } /* 解析 */ private Field parseSelf(byte[] raw) { int position = 0; ParseStringRes res = Parser.parseString(raw); fieldName = res.str; position += res.next; res = Parser.parseString(Arrays.copyOfRange(raw, position, raw.length)); fieldType = res.str; position += res.next; this.index = Parser.parseLong(Arrays.copyOfRange(raw, position, position + 8)); if (index != 0) { try { bt = BPlusTree.load(index, ((TableManagerImpl) tb.tbm).dm); } catch (Exception e) { Panic.panic(e); } } return this; }
创建一个字段的方法类似,将相关的信息通过 VM 持久化即可:
private void persistSelf(long xid) throws Exception {
byte[] nameRaw = Parser.string2Byte(fieldName);
byte[] typeRaw = Parser.string2Byte(fieldType);
byte[] indexRaw = Parser.long2Byte(index);
this.uid = ((TableManagerImpl)tb.tbm).vm.insert(xid, Bytes.concat(nameRaw, typeRaw, indexRaw));
}
一个数据库中存在多张表,TBM 使用链表的形式将其组织起来,每一张表都保存一个指向下一张表的 UID。
表的二进制结构如下:
[TableName][NextTable]
[Field1Uid][Field2Uid]...[FieldNUid]
这里由于每个 Entry 中的数据,字节数是确定的,于是无需保存字段的个数。根据 UID 从 Entry 中读取表数据的过程和读取字段的过程类似。
由于 TBM 的表管理,使用的是链表串起的 Table 结构,所以就必须保存一个链表的头节点,即第一个表的 UID,这样在 MYDB 启动时,才能快速找到表信息。
// 读取启动信息 public byte[] load() { byte[] buf = null; try { buf = Files.readAllBytes(file.toPath()); } catch (IOException e) { Panic.panic(e); } return buf; } // 更新启动信息 // 首先将内容写入一个 bt_tmp 文件中,随后将这个文件重命名为 bt 文件。 // 通过操作系统重命名文件的原子性,来保证操作的原子性。 public void update(byte[] data) { File tmp = new File(path + BOOTER_TMP_SUFFIX); try { tmp.createNewFile(); } catch (Exception e) { Panic.panic(e); } if(!tmp.canRead() || !tmp.canWrite()) { Panic.panic(Error.FileCannotRWException); } try(FileOutputStream out = new FileOutputStream(tmp)) { out.write(data); out.flush(); } catch(IOException e) { Panic.panic(e); } try { Files.move(tmp.toPath(), new File(path+BOOTER_SUFFIX).toPath(), StandardCopyOption.REPLACE_EXISTING); } catch(IOException e) { Panic.panic(e); } file = new File(path+BOOTER_SUFFIX); if(!file.canRead() || !file.canWrite()) { Panic.panic(Error.FileCannotRWException); } }
TBM 层对外提供服务的是 TableManager 接口,如下:
/** * TM 接口 * 由于 TableManager 已经是直接被最外层 Server 调用(MYDB 是 C/S 结构), * 这些方法直接返回执行的结果,例如错误信息或者结果信息的字节数组(可读) */ public interface TableManager { BeginRes begin(Begin begin); byte[] commit(long xid) throws Exception; byte[] abort(long xid); byte[] show(long xid); byte[] create(long xid, Create create) throws Exception; byte[] insert(long xid, Insert insert) throws Exception; byte[] read(long xid, Select select) throws Exception; byte[] update(long xid, Update update) throws Exception; byte[] delete(long xid, Delete delete) throws Exception; }
各个方法的具体实现很简单,不再赘述,无非是调用 VM 的相关方法。唯一值得注意的一个小点是,在创建新表时,采用的是头插法,所以每次创建表都需要更新 Booter 文件。
MYDB 被设计为 C/S 结构,类似于 MySQL。支持启动一个服务器,并有多个客户端去连接,通过 socket 通信,执行 SQL 返回结果。
MYDB 使用了一种特殊的二进制格式,用于客户端和服务端通信。
传输的最基本结构,是 Package:
// 将sql语句和错误一起打包
public class Package {
byte[] data; // sql语句
Exception err;
}
每个 Package 在发送前,由 Encoder 编码为字节数组,在对方收到后同样会由 Encoder 解码成 Package 对象。
编码和解码的规则如下:
// flag为0,表示发送的是数据,data即为这份数据本身;
// flag为1,表示发送的是错误,data是Exception.getMessage()的错误提示信息
[Flag][data]
public class Encoder { public byte[] encode(Package pkg) { if(pkg.getErr() != null) { Exception err = pkg.getErr(); String msg = "Intern server error!"; if(err.getMessage() != null) { msg = err.getMessage(); } return Bytes.concat(new byte[]{1}, msg.getBytes()); } else { return Bytes.concat(new byte[]{0}, pkg.getData()); } } public Package decode(byte[] data) throws Exception { if(data.length < 1) { throw Error.InvalidPkgDataException; } if(data[0] == 0) { return new Package(Arrays.copyOfRange(data, 1, data.length), null); } else if(data[0] == 1) { return new Package(null, new RuntimeException(new String(Arrays.copyOfRange(data, 1, data.length)))); } else { throw Error.InvalidPkgDataException; } } }
public class Transporter { private Socket socket; private BufferedReader reader; private BufferedWriter writer; public Transporter(Socket socket) throws IOException { this.socket = socket; this.reader = new BufferedReader(new InputStreamReader(socket.getInputStream())); this.writer = new BufferedWriter(new OutputStreamWriter(socket.getOutputStream())); } public void send(byte[] data) throws Exception { String raw = hexEncode(data); writer.write(raw); writer.flush(); } public byte[] receive() throws Exception { String line = reader.readLine(); if(line == null) { close(); } return hexDecode(line); } public void close() throws IOException { writer.close(); reader.close(); socket.close(); } private String hexEncode(byte[] buf) { return Hex.encodeHexString(buf, true)+"n"; } private byte[] hexDecode(String buf) throws DecoderException { return Hex.decodeHex(buf); } }
Packager 则是 Encoder 和 Transporter 的结合体,直接对外提供 send 和 receive 方法:
public class Packager { private Transporter transpoter; private Encoder encoder; public Packager(Transporter transpoter, Encoder encoder) { this.transpoter = transpoter; this.encoder = encoder; } public void send(Package pkg) throws Exception { byte[] data = encoder.encode(pkg); transpoter.send(data); } public Package receive() throws Exception { byte[] data = transpoter.receive(); return encoder.decode(data); } public void close() throws Exception { transpoter.close(); } }
Packager packager = null; try { Transporter t = new Transporter(socket); Encoder e = new Encoder(); packager = new Packager(t, e); } catch(IOException e) { e.printStackTrace(); try { socket.close(); } catch (IOException e1) { e1.printStackTrace(); } return; } Executor exe = new Executor(tbm); while(true) { Package pkg = null; try { pkg = packager.receive(); } catch(Exception e) { break; } byte[] sql = pkg.getData(); byte[] result = null; Exception e = null; try { result = exe.execute(sql); } catch (Exception e1) { e = e1; e.printStackTrace(); } pkg = new Package(result, e); try { packager.send(pkg); } catch (Exception e1) { e1.printStackTrace(); break; } }
处理的核心是 Executor 类,Executor 调用 Parser 获取到对应语句的结构化信息对象,并根据对象的类型,调用 TBM 的不同方法进行处理。
Launcher 类,则是服务器的启动入口。这个类解析了命令行参数。很重要的参数就是 -open 或者 -create。Launcher 根据两个参数,来决定是创建数据库文件,还是启动一个已有的数据库。
// 创建数据库文件 private static void createDB(String path) { TransactionManager tm = TransactionManager.create(path); // 新建tm DataManager dm = DataManager.create(path, DEFALUT_MEM, tm);// 新建dm VersionManager vm = new VersionManagerImpl(tm, dm); // 新建vm TableManager.create(path, vm, dm); // 新建tbm tm.close(); dm.close(); } // 开启数据库文件 private static void openDB(String path, long mem) { TransactionManager tm = TransactionManager.open(path); // 打开tm DataManager dm = DataManager.open(path, mem, tm); // 打开dm VersionManager vm = new VersionManagerImpl(tm, dm); // 打开vm TableManager tbm = TableManager.open(path, vm, dm); // 打开tbm new Server(port, tbm).start(); // 打开sql服务器 }
客户端连接服务器的过程,也是背板。客户端有一个简单的 Shell,实际上只是读入用户的输入,并调用 Client.execute()。
// 接收 shell 发过来的sql语句,并打包成pkg进行单次收发操作,得到执行结果并返回
public byte[] execute(byte[] stat) throws Exception {
Package pkg = new Package(stat, null);
Package resPkg = rt.roundTrip(pkg);
if(resPkg.getErr() != null) {
throw resPkg.getErr();
}
return resPkg.getData();
}
RoundTripper 类实际上实现了单次收发动作:
public Package roundTrip(Package pkg) throws Exception {
packager.send(pkg);
return packager.receive();
}
最后是客户端的启动入口,很简单,把 Shell run 起来即可:
public class Launcher {
public static void main(String[] args) throws UnknownHostException, IOException {
Socket socket = new Socket("127.0.0.1", 9999);
Encoder e = new Encoder();
Transporter t = new Transporter(socket);
Packager packager = new Packager(t, e);
Client client = new Client(packager);
Shell shell = new Shell(client);
shell.run();
}
}
本篇博客,参考前辈的项目和博客,是对自己学习和理解的一个记录,加油!
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。