当前位置:   article > 正文

手写数据库轮子项目 MYDB 之一 | TransactionManager (TM) 事务_手写简化版数据库--mydb

手写简化版数据库--mydb

目录

一、模块介绍

二、接口介绍

三、接口的实现类介绍

1. 属性

2. 构造方法

3. 检查XID文件的合法性

4. 更新事务的状态

5. 检测事务的状态

6. 开启一个事务并返回其xid

7. 将 XID 文件 Header 加一

8. 关闭 TM 模块


项目 github 地址

原作者博客主页


一、模块介绍

TransactionManager (TM) 模块维护了一个自定义的 XID 格式的文件,并提供接口供其他模块来查询某个事务的状态。

每个事物都有一个 XID,它表明了这个事物的唯一性。事物的 XID 从1开始不断递增。事物的状态包括:

0 - active,正在进行,尚未结束

1 - committed,已提交

2 - aborted,已撤销(回滚)

TM通过维护一个自定义的XID格式文件来保存当前和过去事物的状态。文件中前8个字节记录了文件管理的事务个数,从第9个字节开始,每个字节记录一个事物的状态。

二、接口介绍

TransactionManager 提供一些接口供其他模块调用,具体包括:

  1. public interface TransactionManager {
  2. long begin(); // 开启一个新事务
  3. void commit(long xid); // 提交一个事务
  4. void abort(long xid); // 取消一个事务
  5. boolean isActive(long xid); // 查询一个事务的状态是否是正在进行的状态
  6. boolean isCommitted(long xid); // 查询一个事务的状态是否是已提交
  7. boolean isAborted(long xid); // 查询一个事务的状态是否是已取消
  8. void close(); // 关闭TM
  9. }

另外还提供了两个静态方法,分别用于创建一个新的 XID 文件并返回 TM 对象,和从一个已有的 XID 文件创建 TM 对象。

首先根据文件路径创造 file 对象,因为file类涉及到关于文件或文件目录的创建、删除、重命名、修改时间、文件大小等方法,并未涉及到写入或读取文件内容的操作,所以当需要读取或写入文件内容,必须使用 IO 流来完成。接下来使用 RandomAccessFile 创建流对象关联文件 file,RandomAccessFile 的特点是它可以从指定的位置读写。接下来调用 getChannel 方法打开 fileChannel 实例。FileChannel.write() 方法向 FileChannel 写数据,该方法参数是一个 Buffer。我们就通过 ByteBuffer.wrap() 方法把一个 byte 数组包装成 ByteBuffer。

  1. public static TransactionManagerImpl create(String path) {
  2. File f = new File(path+TransactionManagerImpl.XID_SUFFIX);
  3. try {
  4. if(!f.createNewFile()) {
  5. Panic.panic(Error.FileExistsException);
  6. }
  7. } catch (Exception e) {
  8. Panic.panic(e);
  9. }
  10. if(!f.canRead() || !f.canWrite()) {
  11. Panic.panic(Error.FileCannotRWException);
  12. }
  13. FileChannel fc = null;
  14. RandomAccessFile raf = null;
  15. try {
  16. raf = new RandomAccessFile(f, "rw");
  17. fc = raf.getChannel();
  18. } catch (FileNotFoundException e) {
  19. Panic.panic(e);
  20. }
  21. // 写空XID文件头
  22. ByteBuffer buf = ByteBuffer.wrap(new byte[TransactionManagerImpl.LEN_XID_HEADER_LENGTH]);
  23. try {
  24. fc.position(0);
  25. fc.write(buf);
  26. } catch (IOException e) {
  27. Panic.panic(e);
  28. }
  29. return new TransactionManagerImpl(raf, fc);
  30. }
  31. public static TransactionManagerImpl open(String path) {
  32. File f = new File(path+TransactionManagerImpl.XID_SUFFIX);
  33. if(!f.exists()) {
  34. Panic.panic(Error.FileNotExistsException);
  35. }
  36. if(!f.canRead() || !f.canWrite()) {
  37. Panic.panic(Error.FileCannotRWException);
  38. }
  39. FileChannel fc = null;
  40. RandomAccessFile raf = null;
  41. try {
  42. raf = new RandomAccessFile(f, "rw");
  43. fc = raf.getChannel();
  44. } catch (FileNotFoundException e) {
  45. Panic.panic(e);
  46. }
  47. return new TransactionManagerImpl(raf, fc);
  48. }

三、接口的实现类介绍

1. 属性

  1. // XID文件头长度
  2. static final int LEN_XID_HEADER_LENGTH = 8;
  3. // 每个事务的占用长度
  4. private static final int XID_FIELD_SIZE = 1;
  5. // 事务的三种状态
  6. private static final byte FIELD_TRAN_ACTIVE = 0;
  7. private static final byte FIELD_TRAN_COMMITTED = 1;
  8. private static final byte FIELD_TRAN_ABORTED = 2;
  9. // 超级事务,永远为commited状态
  10. public static final long SUPER_XID = 0;
  11. static final String XID_SUFFIX = ".xid";
  12. private RandomAccessFile file;
  13. private FileChannel fc;
  14. private long xidCounter;
  15. private Lock counterLock;

2. 构造方法

构造方法传入 RandomAccessFile 和 FileChannel 对象,并调用 checkXIDCounter() 方法,检查XID文件的合法性。

  1. TransactionManagerImpl(RandomAccessFile raf, FileChannel fc) {
  2. this.file = raf;
  3. this.fc = fc;
  4. counterLock = new ReentrantLock();
  5. checkXIDCounter();
  6. }

3. 检查XID文件的合法性

每次创建 TM 对象时会检查XID文件是否合法,并为 xidCounter 赋值。首先判断文件长度是否小于记录了事务个数的8个字节,如果小于直接抛异常停机。其次,从开头8个字节中记录的事务个数推出文件应有的长度,并与实际长度比较,来判断是否合法。具体来说建立一个8字节的缓冲区,从 fileChannel 的0位置开始读,转换为 long 类型赋值给 xidCounter。

  1. private void checkXIDCounter() {
  2. long fileLen = 0;
  3. try {
  4. fileLen = file.length();
  5. } catch (IOException e1) {
  6. Panic.panic(Error.BadXIDFileException);
  7. }
  8. if(fileLen < LEN_XID_HEADER_LENGTH) {
  9. Panic.panic(Error.BadXIDFileException);
  10. }
  11. ByteBuffer buf = ByteBuffer.allocate(LEN_XID_HEADER_LENGTH);
  12. try {
  13. fc.position(0);
  14. fc.read(buf);
  15. } catch (IOException e) {
  16. Panic.panic(e);
  17. }
  18. //this.xidCounter = Parser.parseLong(buf.array());
  19. this.xidCounter = buf.getLong();
  20. long end = getXidPosition(this.xidCounter + 1);
  21. if(end != fileLen) {
  22. Panic.panic(Error.BadXIDFileException);
  23. }
  24. }
  25. // 根据事务xid取得其在xid文件中对应的位置
  26. private long getXidPosition(long xid) { return LEN_XID_HEADER_LENGTH + (xid-1)*XID_FIELD_SIZE; }

4. 更新事务的状态

更新事务的状态为status。

  1. private void updateXID(long xid, byte status) {
  2. long offset = getXidPosition(xid);
  3. byte[] tmp = new byte[XID_FIELD_SIZE];
  4. tmp[0] = status;
  5. ByteBuffer buf = ByteBuffer.wrap(tmp);
  6. try {
  7. fc.position(offset);
  8. fc.write(buf);
  9. } catch (IOException e) {
  10. Panic.panic(e);
  11. }
  12. try {
  13. fc.force(false);
  14. } catch (IOException e) {
  15. Panic.panic(e);
  16. }
  17. }

提交与回滚事务,通过修改事务的状态完成。

  1. // 提交XID事务
  2. public void commit(long xid) {
  3. updateXID(xid, FIELD_TRAN_COMMITTED);
  4. }
  5. // 回滚XID事务
  6. public void abort(long xid) {
  7. updateXID(xid, FIELD_TRAN_ABORTED);
  8. }

5. 检测事务的状态

检测事务的状态是否为status。

  1. private boolean checkXID(long xid, byte status) {
  2. long offset = getXidPosition(xid);
  3. ByteBuffer buf = ByteBuffer.wrap(new byte[XID_FIELD_SIZE]);
  4. try {
  5. fc.position(offset);
  6. fc.read(buf);
  7. } catch (IOException e) {
  8. Panic.panic(e);
  9. }
  10. return buf.array()[0] == status;
  11. }

检测事务的三种状态。

  1. public boolean isActive(long xid) {
  2. if(xid == SUPER_XID) return false;
  3. return checkXID(xid, FIELD_TRAN_ACTIVE);
  4. }
  5. public boolean isCommitted(long xid) {
  6. if(xid == SUPER_XID) return true;
  7. return checkXID(xid, FIELD_TRAN_COMMITTED);
  8. }
  9. public boolean isAborted(long xid) {
  10. if(xid == SUPER_XID) return false;
  11. return checkXID(xid, FIELD_TRAN_ABORTED);
  12. }

6. 开启一个事务并返回其xid

首先通过 ReentrantLock 加锁确保原子性。然后通过在 XID 中更新(新建)这个事务的状态完成写入。然后将 XID 文件的 Header 加一。

  1. public long begin() {
  2. counterLock.lock();
  3. try {
  4. long xid = xidCounter + 1;
  5. updateXID(xid, FIELD_TRAN_ACTIVE);
  6. incrXIDCounter();
  7. return xid;
  8. } finally {
  9. counterLock.unlock();
  10. }
  11. }

7. 将 XID 文件 Header 加一

  1. private void incrXIDCounter() {
  2. xidCounter ++;
  3. ByteBuffer buf = ByteBuffer.wrap(Parser.long2Byte(xidCounter));
  4. try {
  5. fc.position(0);
  6. fc.write(buf);
  7. } catch (IOException e) {
  8. Panic.panic(e);
  9. }
  10. try {
  11. fc.force(false);
  12. } catch (IOException e) {
  13. Panic.panic(e);
  14. }
  15. }

8. 关闭 TM 模块

  1. public void close() {
  2. try {
  3. fc.close();
  4. file.close();
  5. } catch (IOException e) {
  6. Panic.panic(e);
  7. }
  8. }

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/木道寻08/article/detail/869410
推荐阅读
相关标签
  

闽ICP备14008679号