赞
踩
目录
TransactionManager (TM) 模块维护了一个自定义的 XID 格式的文件,并提供接口供其他模块来查询某个事务的状态。
每个事物都有一个 XID,它表明了这个事物的唯一性。事物的 XID 从1开始不断递增。事物的状态包括:
0 - active,正在进行,尚未结束
1 - committed,已提交
2 - aborted,已撤销(回滚)
TM通过维护一个自定义的XID格式文件来保存当前和过去事物的状态。文件中前8个字节记录了文件管理的事务个数,从第9个字节开始,每个字节记录一个事物的状态。
TransactionManager 提供一些接口供其他模块调用,具体包括:
- public interface TransactionManager {
- long begin(); // 开启一个新事务
- void commit(long xid); // 提交一个事务
- void abort(long xid); // 取消一个事务
- boolean isActive(long xid); // 查询一个事务的状态是否是正在进行的状态
- boolean isCommitted(long xid); // 查询一个事务的状态是否是已提交
- boolean isAborted(long xid); // 查询一个事务的状态是否是已取消
- void close(); // 关闭TM
- }
另外还提供了两个静态方法,分别用于创建一个新的 XID 文件并返回 TM 对象,和从一个已有的 XID 文件创建 TM 对象。
首先根据文件路径创造 file 对象,因为file类涉及到关于文件或文件目录的创建、删除、重命名、修改时间、文件大小等方法,并未涉及到写入或读取文件内容的操作,所以当需要读取或写入文件内容,必须使用 IO 流来完成。接下来使用 RandomAccessFile 创建流对象关联文件 file,RandomAccessFile 的特点是它可以从指定的位置读写。接下来调用 getChannel 方法打开 fileChannel 实例。FileChannel.write() 方法向 FileChannel 写数据,该方法参数是一个 Buffer。我们就通过 ByteBuffer.wrap() 方法把一个 byte 数组包装成 ByteBuffer。
- public static TransactionManagerImpl create(String path) {
- File f = new File(path+TransactionManagerImpl.XID_SUFFIX);
- try {
- if(!f.createNewFile()) {
- Panic.panic(Error.FileExistsException);
- }
- } catch (Exception e) {
- Panic.panic(e);
- }
- if(!f.canRead() || !f.canWrite()) {
- Panic.panic(Error.FileCannotRWException);
- }
-
- FileChannel fc = null;
- RandomAccessFile raf = null;
- try {
- raf = new RandomAccessFile(f, "rw");
- fc = raf.getChannel();
- } catch (FileNotFoundException e) {
- Panic.panic(e);
- }
-
- // 写空XID文件头
- ByteBuffer buf = ByteBuffer.wrap(new byte[TransactionManagerImpl.LEN_XID_HEADER_LENGTH]);
- try {
- fc.position(0);
- fc.write(buf);
- } catch (IOException e) {
- Panic.panic(e);
- }
-
- return new TransactionManagerImpl(raf, fc);
- }
-
- public static TransactionManagerImpl open(String path) {
- File f = new File(path+TransactionManagerImpl.XID_SUFFIX);
- if(!f.exists()) {
- Panic.panic(Error.FileNotExistsException);
- }
- if(!f.canRead() || !f.canWrite()) {
- Panic.panic(Error.FileCannotRWException);
- }
-
- FileChannel fc = null;
- RandomAccessFile raf = null;
- try {
- raf = new RandomAccessFile(f, "rw");
- fc = raf.getChannel();
- } catch (FileNotFoundException e) {
- Panic.panic(e);
- }
-
- return new TransactionManagerImpl(raf, fc);
- }
- // XID文件头长度
- static final int LEN_XID_HEADER_LENGTH = 8;
- // 每个事务的占用长度
- private static final int XID_FIELD_SIZE = 1;
-
- // 事务的三种状态
- private static final byte FIELD_TRAN_ACTIVE = 0;
- private static final byte FIELD_TRAN_COMMITTED = 1;
- private static final byte FIELD_TRAN_ABORTED = 2;
-
- // 超级事务,永远为commited状态
- public static final long SUPER_XID = 0;
-
- static final String XID_SUFFIX = ".xid";
-
- private RandomAccessFile file;
- private FileChannel fc;
- private long xidCounter;
- private Lock counterLock;
构造方法传入 RandomAccessFile 和 FileChannel 对象,并调用 checkXIDCounter() 方法,检查XID文件的合法性。
- TransactionManagerImpl(RandomAccessFile raf, FileChannel fc) {
- this.file = raf;
- this.fc = fc;
- counterLock = new ReentrantLock();
- checkXIDCounter();
- }
每次创建 TM 对象时会检查XID文件是否合法,并为 xidCounter 赋值。首先判断文件长度是否小于记录了事务个数的8个字节,如果小于直接抛异常停机。其次,从开头8个字节中记录的事务个数推出文件应有的长度,并与实际长度比较,来判断是否合法。具体来说建立一个8字节的缓冲区,从 fileChannel 的0位置开始读,转换为 long 类型赋值给 xidCounter。
- private void checkXIDCounter() {
- long fileLen = 0;
- try {
- fileLen = file.length();
- } catch (IOException e1) {
- Panic.panic(Error.BadXIDFileException);
- }
- if(fileLen < LEN_XID_HEADER_LENGTH) {
- Panic.panic(Error.BadXIDFileException);
- }
-
- ByteBuffer buf = ByteBuffer.allocate(LEN_XID_HEADER_LENGTH);
- try {
- fc.position(0);
- fc.read(buf);
- } catch (IOException e) {
- Panic.panic(e);
- }
- //this.xidCounter = Parser.parseLong(buf.array());
- this.xidCounter = buf.getLong();
- long end = getXidPosition(this.xidCounter + 1);
- if(end != fileLen) {
- Panic.panic(Error.BadXIDFileException);
- }
- }
-
- // 根据事务xid取得其在xid文件中对应的位置
- private long getXidPosition(long xid) { return LEN_XID_HEADER_LENGTH + (xid-1)*XID_FIELD_SIZE; }
更新事务的状态为status。
- private void updateXID(long xid, byte status) {
- long offset = getXidPosition(xid);
- byte[] tmp = new byte[XID_FIELD_SIZE];
- tmp[0] = status;
- ByteBuffer buf = ByteBuffer.wrap(tmp);
- try {
- fc.position(offset);
- fc.write(buf);
- } catch (IOException e) {
- Panic.panic(e);
- }
- try {
- fc.force(false);
- } catch (IOException e) {
- Panic.panic(e);
- }
- }
提交与回滚事务,通过修改事务的状态完成。
- // 提交XID事务
- public void commit(long xid) {
- updateXID(xid, FIELD_TRAN_COMMITTED);
- }
-
- // 回滚XID事务
- public void abort(long xid) {
- updateXID(xid, FIELD_TRAN_ABORTED);
- }
检测事务的状态是否为status。
- private boolean checkXID(long xid, byte status) {
- long offset = getXidPosition(xid);
- ByteBuffer buf = ByteBuffer.wrap(new byte[XID_FIELD_SIZE]);
- try {
- fc.position(offset);
- fc.read(buf);
- } catch (IOException e) {
- Panic.panic(e);
- }
- return buf.array()[0] == status;
- }
检测事务的三种状态。
- public boolean isActive(long xid) {
- if(xid == SUPER_XID) return false;
- return checkXID(xid, FIELD_TRAN_ACTIVE);
- }
-
- public boolean isCommitted(long xid) {
- if(xid == SUPER_XID) return true;
- return checkXID(xid, FIELD_TRAN_COMMITTED);
- }
-
- public boolean isAborted(long xid) {
- if(xid == SUPER_XID) return false;
- return checkXID(xid, FIELD_TRAN_ABORTED);
- }
首先通过 ReentrantLock 加锁确保原子性。然后通过在 XID 中更新(新建)这个事务的状态完成写入。然后将 XID 文件的 Header 加一。
- public long begin() {
- counterLock.lock();
- try {
- long xid = xidCounter + 1;
- updateXID(xid, FIELD_TRAN_ACTIVE);
- incrXIDCounter();
- return xid;
- } finally {
- counterLock.unlock();
- }
- }
- private void incrXIDCounter() {
- xidCounter ++;
- ByteBuffer buf = ByteBuffer.wrap(Parser.long2Byte(xidCounter));
- try {
- fc.position(0);
- fc.write(buf);
- } catch (IOException e) {
- Panic.panic(e);
- }
- try {
- fc.force(false);
- } catch (IOException e) {
- Panic.panic(e);
- }
- }
- public void close() {
- try {
- fc.close();
- file.close();
- } catch (IOException e) {
- Panic.panic(e);
- }
- }
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。