赞
踩
分布式事务的引入
InnoDB存储引擎提供了对XA事务的支持,并通过XA事务来支持分布式事务的实现。分布式事务指的是允许多个独立的事务资源参与到一个全局的事务中。事务资源通常是关系型数据库系统,但也可以是其他类型的资源。全局事务要求在其中的所有参与的事务要么都提交,要么都回滚,这对于事务原有的ACID要求又有了提高。
XA事务跨数据库:XA事务语允许不同数据库之间的分布式事务,如一台服务器是MySQL数据库的,另一台是Oracle数据库的,又可能还有一台服务器是SQL Server数据库的,只要参与在全局事务中的每个节点都支持XA事务。
我们考虑下面一种场景:当你发了工资之后,把你的当月工资¥10000从银行卡转到了支付宝中。如果在银行卡账户扣除¥10000之后,支付宝系统挂掉了,支付宝的账户并没有增加¥10000,这时候就出现了数据不一致的情况。(当然这种情况肯定不会出现了,只是一个假设)
在很多交易系统中(比如我们熟知的电商)都能找到上述情况的影子:
(1)在下单的时候,需要在订单表中插入一条数据,然后把库存减去一。
(2)在搜索的时候如果点击了广告,需要先记录该点击事件,然后通知商家系统扣除广告费。
在这种情况下,一定需要使用分布式事务来保证数据的安全。如果发生的操作不能全部提交或回滚,那么任何一个节点出现问题都会导致严重的结果。
可见与本地事务不同的是,分布式事务需要多一次的PREPARE操作,待收到所有节点的同意信息后,再进行COMMIT或是ROLLBACK操作。
现如今实现基于两阶段提交的分布式事务也没那么困难了,如果使用java,那么可以使用开源软件atomikos(http://www.atomikos.com/)或者TCC开源的框架来快速实现。
不过但凡使用过的上述两阶段提交的同学都可以发现性能实在是太差,根本不适合高并发的系统。为什么?
(1)两阶段提交涉及多次节点间的网络通信,通信时间太长!
(2)事务时间相对于变长了,锁定的资源的时间也变长了,造成资源等待时间也增加好多!
正是由于分布式事务存在很严重的性能问题,大部分高并发服务都在避免使用,往往通过其他途径来解决数据一致性问题。比如使用消息队列来避免分布式事务。
小结:分布式事务举例子(跨数据库的数据一致性):
1、跨数据库的银行转账;
2、电商网站:在下单的时候,需要在订单表中插入一条数据,然后把库存减去一;
3、电商网站:在搜索的时候如果点击了广告,需要先记录该点击事件,然后通知商家系统扣除广告费。
小结:两阶段提交的性能问题
1、两阶段提交涉及多次节点间的网络通信,通信时间太长!
2、事务时间相对于变长了,锁定的资源的时间也变长了,造成资源等待时间也增加好多!
XA定义:XA是由X/Open组织提出的分布式事务的规范(要想连接XA,先理解DTP)。
XA功能:XA规范主要定义了(全局)事务管理器™和(局部)资源管理器(RM)之间的接口,主流的关系型 数据库产品都是实现了XA接口的。
DTP是什么?
DTP全称为 X/Open DTP(X/Open Distributed Transaction Processing Reference Model) ,其中 X/Open 是组织名,所以,后面的DTP Distributed Transaction Processing Reference Model,翻译为 分布式事务流程引用模型,也可以翻译为 全局事务流程引用模型 (分布式事务可以翻译为全局事务)即 X/Open 这个组织定义了一套分布式事务的标准,也就是了定义了规范和API接口,由厂商进行具体的实现。
X/Open DTP 这个组织定义的这套 分布式事务/全局事务 模型, 定义了/包含了 三个组件: AP,TM,RM。
AP(Application Program):也就是应用程序,可以理解为使用DTP的程序
RM(Resource Manager):资源管理器,这里可以理解为一个DBMS(database manager system 数据库管理系统)系统,或者消息服务器管理系统,应用程序通过资源管理器对资源进行控制,资源必须实现XA定义的接口。
TM(Transaction Manager):事务管理器,负责协调和管理事务,提供给AP应用程序编程接口(TX协议) 和 管理资源管理器
三者通信:AP 可以和TM 以及 RM 通信,TM 和 RM 互相之间可以通信
TM和RM通信:为了实现TM和RM通信,DTP模型里面定义了XA接口,TM 和 RM 通过XA接口进行双向通信,例如:TM通知RM提交事务或者回滚事务,RM把提交结果通知给TM。
AP和RM通信:AP和RM之间则通过RM提供的Native API 进行资源控制,这个没有进行约API和规范,各个厂商自己实现自己的资源控制,比如Oracle自己的数据库驱动程序。
AP和TM通信:AP和TM之间通过 TX协议 来实现通信,AP通过 事务管理器TX 来控制事务。
在 DTP模型 定义了以下几个概念:
事务:一个事务是一个完整的工作单元,由多个独立的计算任务组成,这多个任务在逻辑上是原子的。
全局事务:对于一次性操作多个资源管理器的事务,就是全局事务
分支事务:在全局事务中,某一个资源管理器有自己独立的任务,这些任务的集合作为这个资源管理器的分支任务
控制线程:用来表示一个工作线程,主要是关联AP,TM,RM三者的一个线程,也就是事务上下文环境。简单的说,就是需要标识一个全局事务以及分支事务的关系。
1、 XA接口通信:XA接口是双向的系统接口,在事务管理器 (TM)以及一个或多个资源管理器(RM)之 间形成通信桥梁。
2、事务管理器CAP:XA之所以需要引入事务管理器是因为,在分布式系统中,根据CAP原则,两台机器理论上无法达到一致的状态,需要引入一个单点进行协调(就像从延后提交到 两阶段/三阶段 ,引入了协调者)。
3、由全局事务管理器管理和协调的事务,可以跨越多个资源(如数据库或JMS队列)和进程。 全局事务管理器一般使用 XA 二阶段提交协议 与数据库进行交互。
资源管理器(resource manager):用来管理系统资源,是通向事务资源的途径,数据库就是一种资源管理器,资源管理还应该具有管理事务提交或回滚的能力。
事务管理器(transaction manager):事务管理器是分布式事务的核心管理者。事务管理器通过XA接口与每个资源管理器(resource manager)进行通信,协调并完成事务的处理。事务的各个分支由唯一命名进行标识。
Xid 接口, Xid 接口是 X/Open 事务标识符 XID 结构的 Java 映射。此接口指定三个访问器方法,以检索全局事务格式 ID、全局事务 ID 和分支限定符。Xid 接口供事务管理器和资源管理器使用。此接口对应用程序不可见。
注意:XA 不能自动提交(所以,下面的代码中自动提交设置为false, con.setAutoCommit(false))。
分布式事务模型 使用二阶段提交协议2PC(Two-phaseCommit) 实现多数据源事务处理
准备阶段
事务协调者(事务管理器)给每个参与者(资源管理器)发送Prepare消息,每个参与者要么直接返回失败(如权限验证失败),要么在本地执行事务,写本地的redo和undo日志,但不提交,到达一种“万事俱备,只欠东风”的状态。
两阶段提交的rollback是通过MySQL undo日志实现,持久化是通过redo 日志实现的,自行脑补MySQL日志系统。
提交阶段
如果协调者收到了参与者的失败消息或者超时,直接给每个参与者发送回滚(Rollback)消息;否则,发送提交(Commit)消息;参与者根据协调者的指令执行提交或者回滚操作,释放所有事务处理过程中使用的锁资源。(注意:必须在最后阶段释放锁资源)
出现部分资源失败后的处理情况
二阶段提交看起来确实能够提供原子性的操作,但是不幸的事,二阶段提交还是有几个缺点的、
1、同步阻塞问题。执行过程中,所有参与节点都是事务阻塞型的。当参与者占有公共资源时,其他第三方节点访问公共资源不得不处于阻塞状态。
2、单点故障。由于协调者的重要性,一旦协调者发生故障。参与者会一直阻塞下去。尤其在第二阶段,协调者发生故障,那么所有的参与者还都处于锁定事务资源的状态中,而无法继续完成事务操作。(如果是协调者挂掉,可以重新选举一个协调者,但是无法解决因为协调者宕机导致的参与者处于阻塞状态的问题)
3、数据不一致。在二阶段提交的阶段二中,当协调者向参与者发送commit请求之后,发生了局部网络异常或者在发送commit请求过程中协调者发生了故障,这回导致只有一部分参与者接受到了commit请求。而在这部分参与者接到commit请求之后就会执行commit操作。但是其他部分未接到commit请求的机器则无法执行事务提交。于是整个分布式系统便出现了数据部一致性的现象。
4、二阶段无法解决的问题:协调者再发出commit消息之后宕机,而唯一接收到这条消息的参与者同时也宕机了。那么即使协调者通过选举协议产生了新的协调者,这条事务的状态也是不确定的,没人知道事务是否被已经提交。
特别注意:XA事务就是使用两阶段提交,不是三阶段提交
XA基于两阶段提交: prepare 和 commit.
第一阶段为 准备(prepare)阶段。即所有的参与者准备执行事务并锁住需要的资源。参与者ready时,向transaction manager报告已准备就绪。
第二阶段为提交阶段(commit)。当transaction manager确认所有参与者都ready后,向所有参与者发送commit命令。
假设有两个Connection, con1, con2, 大体的过程如下 .
con1 = XAResouce1.getConnection... // 从XAResource得到Connection
con2 = XAResouce2.getConnection...
con1 do some thing.
con2 do some thing.
after they finish.
pre1 = XAResouce1.prepare(); // 从XAConnectin得到pre,预准备阶段
pre2 = XAResouce2.prepare();
if( both pre1 and pre2 are OK){
XAResouce1 and 2 commit // 提交
}else {
XAResouce1 and 2 rollback // 回滚
}
XA事务
XA事务 是基于两阶段提交实现的,两阶段提交涉及一个协调者和多个参与者,在XA中,协调者是事务,mysql是参与者
两阶段提交是一种理论,XA事务 是这种理论的实现。
因为XA 事务是基于两阶段提交协议的,所以需要有一个事务协调者(transaction manager)来保证所有的事务参与者都完成了准备工作(第一阶段)。如果事务协调者(transaction manager)收到所有参与者都准备好的消息,就会通知所有的事务都可以提交了(第二阶段)。MySQL 在这个XA事务中扮演的是参与者的角色,而不是事务协调者(transaction manager)。
# 在mysql实例中开启一个XA事务,指定一个全局唯一标识; mysql> XA START 'any_unique_id'; # XA事务的操作结束; mysql> XA END 'any_unique_id'; # 告知mysql准备提交这个xa事务; mysql> XA PREPARE 'any_unique_id'; # 告知mysql提交这个xa事务; mysql> XA COMMIT 'any_unique_id'; # 告知mysql回滚这个xa事务; mysql> XA ROLLBACK 'any_unique_id'; # 查看本机mysql目前有哪些xa事务处于prepare状态; mysql> XA RECOVER;
xa事务的语法(六个命令 按顺序来)
xa start xa end,两个中包裹一些分布式原子化操作,就像本质事务transaction一样
xa prepare
xa commit | xa rollback
xa recover
在单个节点上运行分布式事务是没有意义的,起码两个节点才有意义。但是要在MySQL数据库的命令行下演示多个节点参与的分布式事务也是行不通的。通常来说,都是通过编程语言来完成分布式事务操作的。当前Java的JTA可以很好地支持MySQL的分布式事务
下面通过JAVA代码用一个简单的例子来演示:
import java.sql.Connection; import java.sql.Statement; import javax.sql.XAConnection; import javax.transaction.xa.XAResource; import javax.transaction.xa.Xid; import com.mysql.jdbc.jdbc2.optional.MysqlXADataSource; import com.mysql.jdbc.jdbc2.optional.MysqlXid; public class XaDemo { public static MysqlXADataSource getDataSource(String connStr, String user, String pwd) { try { MysqlXADataSource ds = new MysqlXADataSource(); ds.setUrl(connStr); ds.setUser(user); ds.setPassword(pwd); return ds; } catch (Exception e) { e.printStackTrace(); } return null; } public static void main(String[] arg) { String connStr1 = "jdbc:mysql://118.89.107.162:3306/wjq"; String connStr2 = "jdbc:mysql://118.89.107.162:3307/wjq"; try { //从不同数据库获取数据库数据源 MysqlXADataSource ds1 = getDataSource(connStr1, "root", "XXXXXXXX"); MysqlXADataSource ds2 = getDataSource(connStr2, "root", "XXXXXXXX"); //数据库1获取连接 XAConnection xaConnection1 = ds1.getXAConnection(); XAResource xaResource1 = xaConnection1.getXAResource(); Connection connection1 = xaConnection1.getConnection(); Statement statement1 = connection1.createStatement(); //数据库2获取连接 XAConnection xaConnection2 = ds2.getXAConnection(); XAResource xaResource2 = xaConnection2.getXAResource(); Connection connection2 = xaConnection2.getConnection(); Statement statement2 = connection2.createStatement(); //创建事务分支的xid Xid xid1 = new MysqlXid(new byte[] { 0x01 }, new byte[] { 0x02 }, 100); Xid xid2 = new MysqlXid(new byte[] { 0x011 }, new byte[] { 0x012 }, 100); try { //事务分支1关联分支事务sql语句 // 三步走: xa start 具体语句 xa end xaResource1.start(xid1, XAResource.TMNOFLAGS); int update1Result = statement1.executeUpdate("UPDATE accounts SET BALANCE = CAST('9700.00' AS DECIMAL) WHERE CUSTOMER_NO = '001'"); xaResource1.end(xid1, XAResource.TMSUCCESS); //事务分支2关联分支事务sql语句 // 三步走: xa start 具体语句 xa end xaResource2.start(xid2, XAResource.TMNOFLAGS); int update2Result = statement2.executeUpdate("INSERT INTO user_purchase_his(CUSTOMER_NO, SERIAL_NO, AMOUNT, CURRENCY, REMARK) " + " VALUES ('001', '20190303204700000001', 200, 'CNY', '购物消费')"); xaResource2.end(xid2, XAResource.TMSUCCESS); // 两阶段提交协议第一阶段 xa prepare int ret1 = xaResource1.prepare(xid1); int ret2 = xaResource2.prepare(xid2); // 两阶段提交协议第二阶段 xa conmit | xa rollback if (XAResource.XA_OK == ret1 && XAResource.XA_OK == ret2) { xaResource1.commit(xid1, false); xaResource2.commit(xid2, false); System.out.println("reslut1:" + update1Result + ", result2:" + update2Result); } } catch (Exception e) { e.printStackTrace(); } } catch (Exception e) { e.printStackTrace(); } } }
第一,关联分支事务sql语句 三步走: xa start 具体语句 xa end
第二,两阶段提交协议第一阶段 xa prepare
第三,两阶段提交协议第二阶段 xa conmit | xa rollback
好了,五条语句都用到了,xa recover语句放到下面,mysql crash之后处于已经xa prepare但是没有xa commit的状态
如果执行分布式事务的某个mysql crash了,MySQL按照如下逻辑进行恢复:
第一种情况,如果这个xa事务还没有prepare,那么XA分布式事务直接回滚 xa rollback它。
第二种情况,如果这个xa事务prepare了,还没commit,那么XA分布式事务使用xa recover命令将它恢复到prepare的状态,由用户去决定xa commit或xa rollback(举例:当mysql crash后重新启动之后,就进入到第二种情况,已经prepare但是没有commit,此时执行“XA RECOVER;”查看当前处于prepare状态的xa事务,然后commit或rollback它们即可。如果不去处理,那么它们占用的资源就一直不会释放,比如锁)
第三种情况,如果这个xa事务commit了,无能无力了,什么也不用做。
开启了xa事务就无法使用本地事务和锁表操作
root@localhost [3306][wjq]>xa start 'xatest';
Query OK, 0 rows affected (0.00 sec)
root@localhost [3306][wjq]>begin;
ERROR 1399 (XAE07): XAER_RMFAIL: The command cannot be executed when global transaction is in the ACTIVE state
root@localhost [3306][wjq]>lock table xatest read;
ERROR 1399 (XAE07): XAER_RMFAIL: The command cannot be executed when global transaction is in the ACTIVE state
开启了本地事务就无法使用xa事务
root@localhost [3306][wjq]>begin;
Query OK, 0 rows affected (0.00 sec)
root@localhost [3306][wjq]>xa start 'xatest';
ERROR 1400 (XAE09): XAER_OUTSIDE: Some work is done outside global transaction
所以如果在执行xa事务过程中有语句出错了,你也需要先xa end一下,然后才能xa rollback。
root@localhost [3306][wjq]>xa start 'xatest';
Query OK, 0 rows affected (0.00 sec)
root@localhost [3306][wjq]>xa rollback 'xatest';
ERROR 1399 (XAE07): XAER_RMFAIL: The command cannot be executed when global transaction is in the ACTIVE state
root@localhost [3306][wjq]>xa end 'xatest';
Query OK, 0 rows affected (0.00 sec)
root@localhost [3306][wjq]>xa rollback 'xatest';
Query OK, 0 rows affected (0.00 sec)
一直以来,MySQL数据库是支持分布式事务的,但是只能说是有限的支持,具体表现在:
1、已经prepare的事务,在客户端退出或者服务宕机的时候,2PC的事务会被回滚。
2、在服务器故障重启提交后,相应的Binlog被丢失。
上述问题存在于MySQL数据库长达数十年的时间,直到MySQL-5.7.7版本,官方才修复了该问题。
下面将会详细介绍下该问题的具体表现和官方修复方法,这里分别采用官方MySQL-5.6.27版本(未修复)和MySQL-5.7.9版本(已修复)进行验证。
注意:这里的MYSQL的测试版本是5.6版本,而非5.7版本,如果使用5.7版本测试,是无法复现下面的问题的;
先来看下存在的问题,我们先创建一个表如下:
root@localhost [3306][wjq]>create table xatest(id int auto_increment primary key,col1 int);
Query OK, 0 rows affected (0.01 sec)
对于上述表,通过如下操作进行数据插入:
root@localhost [3306][wjq]> XA START 'wjqmysql56';
Query OK, 0 rows affected (0.00 sec)
root@localhost [3306][wjq]> INSERT INTO xatest VALUES(1,1);
Query OK, 1 row affected (0.02 sec)
root@localhost [3306][wjq]> XA END 'wjqmysql56';
Query OK, 0 rows affected (0.00 sec)
root@localhost [3306][wjq]> XA PREPARE 'wjqmysql56';
Query OK, 0 rows affected (0.00 sec)
通过上面的操作,用户创建了一个分布式事务,并且prepare没有返回错误,说明该分布式事务可以被提交。通过命令XA RECOVER查看显示如下结果:
root@localhost [3306][wjq]> XA RECOVER;
+----------+--------------+--------------+------------+
| formatID | gtrid_length | bqual_length | data |
+----------+--------------+--------------+------------+
| 1 | 7 | 0 | wjqmysql56 |
+----------+--------------+--------------+------------+
若这时候用户退出客户端后重连,通过命令xa recover会发现刚才创建的2PC事务不见了。即prepare成功的事务丢失了,不符合2PC协议规范!!!
产生上述问题的主要原因在于:MySQL 5.6版本在客户端退出的时候,自动把已经prepare的事务回滚了,> > 问题1:那么MySQL为什么要这样做? why
解释1:这是MySQL的内部实现,MySQL 5.7以前的版本,对于prepare的事务,MySQL是不会记录binlog的(官方说是减少fsync,起到了优化的作用),而prepare以前的操作信息都保存在连接的IO_CACHE中,只有当分布式事务提交的时候才会把前面的操作写入binlog信息。
问题2:为什么要这样设计,为什么prepare之前的操作不保存到binlog中? why
解释2:因为对于binlog来说,分布式事务与普通的事务没有区别,binlog日志无法区分它记录的日志是本地事务日志还是分布式事务日志。在分布式事务两阶段提交下,如果将第一个阶段prepare的操作写入到binlog中,如果这个时候客户端退出了,以前的binlog信息都会被丢失(因为binlog在内存中),再次重连后允许提交的话,因为Binlog丢失,从而造成主从数据的不一致的操作。所以在设计在,官方在客户端退出的时候直接把已经prepare的事务都回滚了,不记录prepare操作的日志。
官方的做法,貌似干得很漂亮,牺牲了一点标准化的东西,至少保证了主从数据的一致性。但其实不然,若用户已经prepare后在客户端退出之前,MySQL发生了宕机,这个时候又会怎样?
MySQL在某个分布式事务prepare成功后宕机,宕机前操作该事务的连接并没有断开,这个时候已经prepare的事务并不会被回滚,所以在MySQL重新启动后,引擎层通过recover机制能恢复该事务。当然该事务的Binlog已经在宕机过程中被丢失,这个时候,如果去提交,则会造成主从数据的不一致,即提交没有记录Binlog,从上丢失该条数据。所以对于这种情况,官方一般建议直接回滚已经prepare的事务。
以上是MySQL 5.6及以前版本MySQL在分布式事务上的各种问题,那么MySQL 5.7版本官方做了哪些改进?从具体的操作上来分析下MySQL 5.7的改进方法。还是以上面同样的表结构进行同样的操作如下:
root@localhost [3306][wjq]>xa start 'wjqmysql5.7';
Query OK, 0 rows affected (0.00 sec)
root@localhost [3306][wjq]>insert into xatest values(1,1);
Query OK, 1 row affected (0.00 sec)
root@localhost [3306][wjq]>xa end 'wjqmysql5.7';
Query OK, 0 rows affected (0.00 sec)
root@localhost [3306][wjq]>xa prepare 'wjqmysql5.7';
Query OK, 0 rows affected (0.00 sec)
这个时候,我们通过mysqlbinlog来查看下Master上的Binlog,同时也对比下Slave上的Relay log。
通过上面的操作,明显发现在prepare以后,从XA START到XA PREPARE之间的操作都被记录到了Master的Binlog中,然后通过复制关系传到了Slave上。也就是说MySQL 5.7开始,MySQL对于分布式事务,在prepare的时候就完成了写Binlog的操作,通过新增一种叫XA_prepare_log_event的event类型来实现,这是与以前版本的主要区别(以前版本prepare时不写Binlog)。
当然仅靠这一点是不够的,因为我们知道Slave通过SQL thread来回放Relay log信息,由于prepare的事务能阻塞整个session,而回放的SQL thread只有一个(不考虑并行回放),那么SQL thread会不会因为被分布式事务的prepare阶段所阻塞,从而造成整个SQL thread回放出现问题?
这也正是官方要解决的第二个问题:怎么样能使SQL thread在回放到分布式事务的prepare阶段时,不阻塞后面event的回放?
其实这个实现也很简单,只要在SQL thread回放到prepare的时候,进行类似于客户端断开连接的处理即可(把相关cache与SQL thread的连接句柄脱离)。最后在Slave服务器上,用户通过命令XA RECOVER可以查到如下信息:
至于上面的事务什么时候提交,一般等到Master上进行XA COMMIT ‘wjqmysql5.7’后,slave上也同时会被提交。
root@localhost [3306][wjq]>xa recover;
+----------+--------------+--------------+-------------+
| formatID | gtrid_length | bqual_length | data |
+----------+--------------+--------------+-------------+
| 1 | 11 | 0 | wjqmysql5.7 |
+----------+--------------+--------------+-------------+
1 row in set (0.00 sec)
xa事务提交之后,就可以在从库那边看到数据了
root@localhost [3306][wjq]>xa commit 'wjqmysql5.7';
Query OK, 0 rows affected (0.00 sec)
root@localhost [3306][wjq]>
root@localhost [3306][wjq]>xa recover;
Empty set (0.00 sec)
root@localhost [3308][wjq]>select * from xatest;
+----+------+
| id | col1 |
+----+------+
| 1 | 1 |
+----+------+
1 row in set (0.00 sec)
小结:MySQL 5.7对于分布式事务的支持变得完美了。
import com.alibaba.druid.pool.xa.DruidXADataSource; import com.mysql.jdbc.jdbc2.optional.MysqlXid; import javax.sql.XAConnection; import javax.transaction.xa.XAException; import javax.transaction.xa.XAResource; import javax.transaction.xa.Xid; import java.io.IOException; import java.sql.Connection; import java.sql.SQLException; import java.sql.Statement; import java.util.Properties; class DistributeTransaction { private Properties props; // 属性 private String propertyfile = "jdbc.properties"; // 指定文件 private String sql_1 = "delete from test3 where pk_t=3;"; // 两条sql语句,要放在同一个事务中的 private String sql_2 = "INSERT INTO test(name) VALUES('tyz');"; DistributeTransaction() { // 分布式事务构造函数 DruidXADataSource xaDataSource_1 = null; // 第一个函数确定 从配置文件到XAdatasource DruidXADataSource xaDataSource_2 = null; XAConnection xaConnection_1 = null; // 第二个函数确定 从 XAdatasource 到 xaconnection XAConnection xaConnection_2 = null; XAResource xaResource_1 = null; // 第三个函数确定 从XAConnection 到 XAResource XAResource xaResource_2 = null; Connection connection_1 = null; // 第四个函数确定 从 XAConnection 到 Connection Connection connection_2 = null; Xid xid_1 = null; // 第五个函数确定 createXid Xid xid_2 = null; try { props = new Properties(); // 新建 props.load(getClass().getResourceAsStream(propertyfile)); // 加载文件 props对象和propertyfile联系在一起了 } catch (IOException io) { System.err.println("Error while accessing the properties file (" + propertyfile + "). Abort."); System.exit(1); } DruidXADataSource[] xaDataSources = initXADataSource(); // 调用第一个函数,返回datasource数组 xaDataSource_1 = xaDataSources[0]; // 得到的数组的两个元素分别赋值给成员变量 xaDataSource_2 = xaDataSources[1]; XAConnection[] xaConnections = initXAConnection(xaDataSource_1, xaDataSource_2); // 调用第二个函数,输入:两个DruidXADataSource,输出:初始化XA事务连接,组合成为两个数组 xaConnection_1 = xaConnections[0]; // 得到的数组的两个元素分别赋值给成员变量 xaConnection_2 = xaConnections[1]; xaResource_1 = initXAResource(xaConnection_1); // 调用第三个函数,输入:成员变量XA连接,输出:赋值给成员变量XA资源 xaResource_2 = initXAResource(xaConnection_2); connection_1 = getDatabaseConnection(xaConnection_1); // 调用第四个函数,输入:成员变量XA连接,输出:赋值给成员变量连接 connection_2 = getDatabaseConnection(xaConnection_2); // create a separate branch for a common transaction Xid[] xids = createXID(); // 调用第五个函数,创建两个xid,得到的数组元素,赋值给成员变量 xid_1 xid_2 xid_1 = xids[0]; xid_2 = xids[1]; try { execBranch(connection_1, xaResource_1, xid_1, sql_1); // 第一个函数 执行 XA事务是基于 两段式/三段式提交 处理的, 输入:Connection XAResource Xid Sql语句 execBranch(connection_2, xaResource_2, xid_2, sql_2); if (prepareCommit(xaResource_1, xid_1) == XAResource.XA_OK && prepareCommit(xaResource_2, xid_2) == XAResource.XA_OK) { // 第二个函数,预提交 commitBranch(xaResource_1, xid_1); // 第三个函数,提交 commitBranch(xaResource_2, xid_2); } else { throw new RuntimeException(); } } catch (Exception e) { rollbackBranch(xaResource_1, xid_1); // 第四个函数,发生错误回滚 rollbackBranch(xaResource_2, xid_2); } } DruidXADataSource[] initXADataSource() { // 第一个函数,输入:配置文件内容,输出:初始化XA的数据源,组合成为数组 System.out.print("Create a XADataSource_1 data source: "); // 新建一个XA数据源 DruidXADataSource xaDataSource_1 = new DruidXADataSource(); // 新建一个数据源对象 xaDataSource_1.setDbType(props.getProperty("db1.dbtype")); // 读取文件内存,设置内存对象 xaDataSource_1.setUrl(props.getProperty("db1.url")); // 读取文件内存,设置内存对象 xaDataSource_1.setUsername(props.getProperty("db1.username")); // 读取文件内存,设置内存对象 xaDataSource_1.setPassword(props.getProperty("db1.password")); // 读取文件内存,设置内存对象 System.out.println("Okay."); // 打印Okay. System.out.print("Create a XADataSource_2 data source: "); // 重复db1,做一次db2 DruidXADataSource xaDataSource_2 = new DruidXADataSource(); xaDataSource_2.setDbType(props.getProperty("db2.dbtype")); xaDataSource_2.setUrl(props.getProperty("db2.url")); xaDataSource_2.setUsername(props.getProperty("db2.username")); xaDataSource_2.setPassword(props.getProperty("db2.password")); System.out.println("Okay."); return new DruidXADataSource[]{xaDataSource_1, xaDataSource_2}; // 返回db1 db2包装好的数据源数组 } // 第二个函数,输入:两个DruidXADataSource,输出:初始化XA事务连接,组合成为两个数组 XAConnection[] initXAConnection(DruidXADataSource xaDataSource_1, DruidXADataSource xaDataSource_2) { XAConnection xaconn_1 = null; // 声明两个xa连接为null XAConnection xaconn_2 = null; try { System.out.print("Set up DB_1 XA connection: "); xaconn_1 = xaDataSource_1.getXAConnection(); // 拿到实参DruidXADataSource中的conn System.out.println("Okay."); System.out.print("Set up DB_2 XA connection: "); xaconn_2 = xaDataSource_2.getXAConnection(); System.out.println("Okay."); } catch (SQLException e) { sqlerr(e); } return new XAConnection[]{xaconn_1, xaconn_2}; // 放到XAConnection数组中,返回 } XAResource initXAResource(XAConnection xacon) { //从 XAConnection到 XAResource XAResource xares = null; try { System.out.print("Setting up a XA resource: "); xares = xacon.getXAResource(); //从 XAConnection到 XAResource System.out.println("Okay."); } catch (SQLException e) { sqlerr(e); } return xares; } Connection getDatabaseConnection(XAConnection xacon) { // 从 XAConnection 到 Connection Connection con = null; try { System.out.print("Establish database connection: "); con = xacon.getConnection(); // 从 XAConnection 到 Connection con.setAutoCommit(false); // Connection 设置自动提交为false,需要手动提交 System.out.println("Okay."); } catch (SQLException e) { sqlerr(e); } return con; } Xid[] createXID() { Xid xid_1 = null; byte[] gid_1 = new byte[1]; // 定义两个元素数为1的数组 byte数组 byte[] bid_1 = new byte[1]; gid_1[0] = (Byte.decode(props.getProperty("xid.global"))).byteValue(); // 从配置文件设置gid_1[0] bid_1[0] = (Byte.decode(props.getProperty("xid.branch.db_1"))).byteValue(); // 从配置文件设置bid_1[0] System.out.print("Creating an XID (" + Byte.toString(gid_1[0]) + ", " + Byte.toString(bid_1[0]) + ") for DB_1: "); xid_1 = new MysqlXid(gid_1, bid_1, 0); // 通过gid_1 和 bid_1 设置 xid_1 System.out.println("Okay."); // 完成了 xid_1 Xid xid_2 = null; byte[] gid_2 = new byte[1]; byte[] bid_2 = new byte[1]; gid_2[0] = (Byte.decode(props.getProperty("xid.global"))).byteValue(); bid_2[0] = (Byte.decode(props.getProperty("xid.branch.db_2"))).byteValue(); System.out.print("Creating an XID (" + Byte.toString(gid_2[0]) + ", " + Byte.toString(bid_2[0]) + ") for DB_2: "); xid_2 = new MysqlXid(gid_2, bid_2, 0); System.out.println("Okay."); return new Xid[]{xid_1, xid_2}; } void execBranch(Connection con, XAResource xares, Xid xid, String sql) { // 第一个函数, try { xares.start(xid, XAResource.TMNOFLAGS); // XAResource作为调用对象,Xid作为XATransaction的唯一标识,end操作 Statement stmt = con.createStatement(); // connection唯一作用,新建一个statement stmt.executeUpdate(sql); // sql 唯一作用,确定要执行的sql语句 xares.end(xid, XAResource.TMSUCCESS); // 一定要XAResource调用end,后面才能做 提交 操作 // XAResource作为调用对象,Xid作为XATransaction的唯一标识,end操作 // 小结,这个XAResource就是 XA事务,和以前的 单体事务Transaction,用法相同,就是用来包一层sql执行,后端@Transaction注解,数据库就Begin Transaction;和End Transaction; } catch (XAException e) { // 这是一个XAResource,调用的回滚操作,如果发生异常,一定是XAException异常 System.err.println("XA exception caught:"); System.err.println("Cause : " + e.getCause()); System.err.println("Message: " + e.getMessage()); e.printStackTrace(); throw new RuntimeException(e); } catch (SQLException e) { sqlerr(e); throw new RuntimeException(e); } } int prepareCommit(XAResource xares, Xid xid) { // 第二个函数, int rc = 0; System.out.print("Prepare XA branch (" + Byte.toString((xid.getGlobalTransactionId())[0]) + ", " + Byte.toString((xid.getBranchQualifier())[0]) + "): "); try { xares.prepare(xid); // XAResource作为调用对象,Xid作为XATransaction的唯一标识,预提交操作 } catch (XAException e) { // 这是一个XAResource,调用的回滚操作,如果发生异常,一定是XAException异常 xaerr(e); throw new RuntimeException(e); } System.out.println("Okay."); return rc; } void commitBranch(XAResource xares, Xid xid) { // 第三个函数, System.out.print("Commit XA branch (" + Byte.toString((xid.getGlobalTransactionId())[0]) + ", " + Byte.toString((xid.getBranchQualifier())[0]) + "): "); try { // second parameter is 'false' since we have a two phase commit 第二个参数设置为false,因为这里是两段式提交 xares.commit(xid, false); // XAResource作为调用对象,Xid作为XATransaction的唯一标识,提交操作 } catch (XAException e) { // 这是一个XAResource,调用的回滚操作,如果发生异常,一定是XAException异常 xaerr(e); throw new RuntimeException(e); } System.out.println("Okay."); } void rollbackBranch(XAResource xares, Xid xid) { // 第四个函数, System.out.print("Rollback XA branch (" + Byte.toString((xid.getGlobalTransactionId())[0]) + ", " + Byte.toString((xid.getBranchQualifier())[0]) + "): "); try { xares.rollback(xid); // XAResource作为调用对象,Xid作为XATransaction的唯一标识,回滚操作 } catch (XAException e) { // 这是一个XAResource,调用的回滚操作,如果发生异常,一定是XAException异常 xaerr(e); throw new RuntimeException(e); } System.out.println("Okay."); } void sqlerr(SQLException exception) { System.err.println("FAILED."); while (exception != null) { // 唯一作用,SQLException类型的异常不为null,把异常打印出来 System.err.println("==> SQL Exception caught"); System.err.println("--> SQLCODE : " + exception.getErrorCode()); System.err.println("--> SQLSTATE: " + exception.getSQLState()); System.err.println("--> Message : " + exception.getMessage()); exception = exception.getNextException(); } } void xaerr(XAException exception) { // 唯一作用,XAException类型的异常不为null,把异常打印出来 System.err.println("FAILED."); System.err.println("==> XA Exception caught"); System.err.println("--> Cause : " + exception.getCause()); System.err.println("--> Message: " + exception.getMessage()); exception.printStackTrace(); } public static void main (String args[]) { // main函数,执行入口 new DistributeTransaction(); // 新建分布式事务 } }
XA事务的两个性能局限性
1、效率低下,准备阶段的成本持久,全局事务状态的成本持久,性能与本地事务相差10倍左右;
2、提交前,出现故障难以恢复和隔离问题。
JTA(Java Transaction API)是符合X/Open DTP的一个编程模型,事务管理和资源管理器支架也是用了XA协议。
a、高层应用事务界定接口,供事务客户界定事务边界的
b、X/Open XA协议(资源之间的一种标准化的接口)的标准Java映射,它可以使事务性的资源管理器参与由外部事务管理器控制的事务中
c、高层事务管理器接口,允许应用程序服务器为其管理的应用程序界定事务的边界
位于javax.transaction包中
a、UserTransaction接口:让应用程序得以控制事务的开始、挂起、提交、回滚等。由Java客户端程序或EJB调用。
b、TransactionManager 接口:用于应用服务器管理事务状态
c、Transaction接口:用于执行相关事务操作
d、XAResource接口:用于在分布式事务环境下,协调事务管理器和资源管理器的工作
e、Xid接口:为事务标识符的Java映射
注:前3个接口位于Java EE版的类库 javaee.jar 中,Java SE中没有提供!UserTransaction是编程常用的接口
注意的是JTA只提供了接口,没有具体的实现。
JTS(Java Transaction Service)是服务OTS的JTA的实现。简单的说JTS实现了JTA接口,并且符合OTS的规范。
JTA的事务周期可横跨多个JDBC Connection生命周期,对众多Connection进行调度,实现其事务性要求。
JTA可以处理任何提供符合XA接口的资源。包括:JDBC连接,数据库,JMS,商业对象等等。
JOTM(Java Open Transaction Manager)是ObjectWeb的一个开源JTA实现,它本身也是开源应用程序服务器JOnAS(Java Open Application Server)的一部分,为其提供JTA分布式事务的功能。官网:http://jotm.objectweb.org
Atomikos 是一个为Java平台提供增值服务的并且开源类事务管理器提供 以下是包括在这个开源版本中的一些功能:
1 全面崩溃 / 重启恢复
2 兼容标准的SUN公司JTA API
3 嵌套事务
4 为XA和非XA提供内置的JDBC适配器
官网:https://www.atomikos.com/
使用jtom配置jta的事务实现,配置spring支持jtom的环境 (使用xapool管理数据源)
金手指:为什么要使用JTA?
标准答案:多数据源下必须使用JTA,
因为spring的
< tx:annotation-driven transaction-manager=“springTransactionManager”/> 只能指定一个事务管理器 所以多数据源下 肯定不能使用同一个事务管理器,必须使用jta事务
mysql 新建表 mymoney
插入记录,用户zs 余额 1000
CREATE DATABASE IF NOT EXISTS dtp;
USE dtp;
CREATE TABLE mymoney(
id INT PRIMARY KEY AUTO_INCREMENT,
NAME VARCHAR(20),
lostedmoney INT
);
INSERT INTO mymoney(id,NAME,lostedmoney) VALUES(1,'zs',1000);
oracle 新建表mymoney
插入记录:用户ls 余额300
oracle下 scott账号下 创建相同表
CREATE TABLE mymoney(
id number PRIMARY KEY ,
NAME VARCHAR2(20),
lostedmoney number
);
INSERT INTO mymoney(id,NAME,lostedmoney) VALUES(2,'ls',300);
模拟zs给ls转账 自然涉及到多数据源分布式事务
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> <modelVersion>4.0.0</modelVersion> <groupId>cn.et</groupId> <artifactId>spring</artifactId> <version>0.0.1-SNAPSHOT</version> <dependencies> <dependency> <groupId>org.springframework</groupId> <artifactId>spring-context</artifactId> <version>4.3.13.RELEASE</version> </dependency> <dependency> <groupId>org.springframework</groupId> <artifactId>spring-tx</artifactId> <version>4.3.13.RELEASE</version> </dependency> <dependency> <groupId>org.springframework</groupId> <artifactId>spring-aop</artifactId> <version>4.3.13.RELEASE</version> </dependency> <dependency> <groupId>org.springframework</groupId> <artifactId>spring-jdbc</artifactId> <version>4.3.13.RELEASE</version> </dependency> <!-- https://mvnrepository.com/artifact/jotm/jotm --> <dependency> <groupId>jotm</groupId> <artifactId>jotm</artifactId> <version>2.0.10</version> </dependency> <!-- https://mvnrepository.com/artifact/org.ow2.jotm/jotm-core --> <dependency> <groupId>org.ow2.jotm</groupId> <artifactId>jotm-core</artifactId> <version>2.2.3</version> </dependency> <!-- https://mvnrepository.com/artifact/org.ow2.jotm/jotm-datasource --> <dependency> <groupId>org.ow2.jotm</groupId> <artifactId>jotm-datasource</artifactId> <version>2.2.3</version> </dependency> <!-- https://mvnrepository.com/artifact/org.ow2.jotm/jotm-standalone --> <dependency> <groupId>org.ow2.jotm</groupId> <artifactId>jotm-standalone</artifactId> <version>2.2.3</version> </dependency> <!-- https://mvnrepository.com/artifact/javax.resource/javax.resource-api --> <dependency> <groupId>javax.resource</groupId> <artifactId>javax.resource-api</artifactId> <version>1.7</version> </dependency> <!-- https://mvnrepository.com/artifact/com.experlog/xapool --> <dependency> <groupId>com.experlog</groupId> <artifactId>xapool</artifactId> <version>1.5.0</version> </dependency> <!-- https://mvnrepository.com/artifact/mysql/mysql-connector-java --> <dependency> <groupId>mysql</groupId> <artifactId>mysql-connector-java</artifactId> <version>5.1.44</version> </dependency> <dependency> <groupId>com.jslsolucoes</groupId> <artifactId>ojdbc6</artifactId> <version>11.2.0.1.0</version> </dependency> </dependencies> </project>
spring配置文件中, 配置 两个数据源 , 并且将两个支持xa的数据源 绑定到一个资源管理器 UserTranscation中
<?xml version="1.0" encoding="UTF-8"?> <beans xmlns="http://www.springframework.org/schema/beans" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:aop="http://www.springframework.org/schema/aop" xmlns:tx="http://www.springframework.org/schema/tx" xmlns:context="http://www.springframework.org/schema/context" xsi:schemaLocation=" http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-4.0.xsd http://www.springframework.org/schema/tx http://www.springframework.org/schema/tx/spring-tx-4.0.xsd http://www.springframework.org/schema/aop http://www.springframework.org/schema/aop/spring-aop-4.0.xsd http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context-4.0.xsd "> <!-- 第一,基础:配置扫描路径和配置一个bean --> <context:component-scan base-package="cn"></context:component-scan> <bean id="txCurrent" class="org.objectweb.jotm.Current"></bean> <!-- 3、这就是一个--> <!-- 使用xapool配置支持xa的资源(mysql数据库)连接池 数据库连接四要素--> <bean id="xaMysqlDataSource" class="org.enhydra.jdbc.pool.StandardXAPoolDataSource" destroy-method="shutdown"> <property name="dataSource"> <!-- 定义支持mysql的xa数据源 --> <bean class="org.enhydra.jdbc.standard.StandardXADataSource" destroy-method="shutdown"> <property name="driverName" value="com.mysql.jdbc.Driver"></property> <property name="url" value="jdbc:mysql://localhost:3306/dtp"></property> <!-- 注意用户名密码不能配在这里 否则无法读取 --> <!-- 注册到txCurrent --> <property name="transactionManager" ref="txCurrent"></property> </bean> </property> <!-- 用户名密码配置在这里--> <property name="user" value="root"></property> <property name="password" value="123456"></property> </bean> <!-- 配置jdbc,mysql连接 配置一个mysql jdbc,它的dataSource是上面配置的bean--> <bean id="mysqlJdbcTemplate" class="org.springframework.jdbc.core.JdbcTemplate"> <property name="dataSource" ref="xaMysqlDataSource" /> </bean> <!-- 使用xapool配置支持xa的资源(oracle数据库)连接池 数据库连接四要素--> <bean id="xaOracleDataSource" class="org.enhydra.jdbc.pool.StandardXAPoolDataSource" destroy-method="shutdown"> <property name="dataSource"> <!-- 定义支持mysql的xa数据源 --> <bean class="org.enhydra.jdbc.standard.StandardXADataSource" destroy-method="shutdown"> <property name="driverName" value="oracle.jdbc.OracleDriver"></property> <property name="url" value="jdbc:oracle:thin:@localhost:1521:orcl"></property> <!-- 注册到txCurrent --> <property name="transactionManager" ref="txCurrent"></property> </bean> </property> <!-- 用户名密码配置在这里--> <property name="user" value="scott"></property> <property name="password" value="tiger"></property> </bean> <!-- 配置jdbc,oracle连接 配置一个oracle jdbc,它的dataSource是上面配置的bean--> <bean id="oracleJdbcTemplate" class="org.springframework.jdbc.core.JdbcTemplate"> <property name="dataSource" ref="xaOracleDataSource" /> </bean> <!-- 2、配置jta事务 userTransaction必须将所有的数据源(RM)注册到txCurrent中--> <bean id="springTransactionManager" class="org.springframework.transaction.jta.JtaTransactionManager"> <property name="userTransaction" ref="txCurrent" /> </bean> <!-- 1、注解驱动,事务管理器--> <tx:annotation-driven transaction-manager="springTransactionManager"/> </beans>
小结:
1、<tx:annotation-driven transaction-manager=“springTransactionManager”/>
2、 bean id=“springTransactionManager” class=“org.springframework.transaction.jta.JtaTransactionManager”>
property name=“userTransaction” ref=“txCurrent” />
/bean>
3、 bean id=“txCurrent” class=“org.objectweb.jotm.Current”>
4、 bean id=“xaMysqlDataSource” class=“org.enhydra.jdbc.pool.StandardXAPoolDataSource” destroy-method=“shutdown”>
1、注解驱动,事务管理器
2、jta作为这个事务管理器
3、事务管理器中的事务是 jotm
4、配置spring支持jtom的环境 (使用xapool管理数据源)
小结:只要在这里写好了配置文件,就可以了,下面代码中就可以完成多个数据源的分布式事务,数据一致性了。
package cn.et.dao; import javax.annotation.Resource; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.jdbc.core.JdbcTemplate; import org.springframework.stereotype.Repository; import org.springframework.transaction.annotation.Transactional; @Transactional // 加上事务注解 @Repository // 注解 + 扫描路径,放到spring ioc容器中 public class UserZsDao { @Autowired private UserLsDao userLsDao; @Resource(name="mysqlJdbcTemplate") JdbcTemplate mysqlJdbcTemplate; public void zsMinus(int money){ //zs的mysql扣款 String sql="update MYMONEY set lostedmoney=lostedmoney-"+money+" where id=1"; mysqlJdbcTemplate.execute(sql); //调用ls的oracle加钱 userLsDao.lsAdd(money); //模拟出现异常 查看数据两个操作数据是否都回滚 String a=null; a.toCharArray(); } }
处理ls的数据处理类
package cn.et.dao; import javax.annotation.Resource; import org.springframework.jdbc.core.JdbcTemplate; import org.springframework.stereotype.Repository; import org.springframework.transaction.annotation.Transactional; @Transactional // 加上事务注解 @Repository // 注解 + 扫描路径,放到spring ioc容器中 public class UserLsDao { @Resource(name="oracleJdbcTemplate") JdbcTemplate oracleJdbcTemplate; public void lsAdd(int money){ String sql="update MYMONEY set lostedmoney=lostedmoney+"+money+" where id=2"; oracleJdbcTemplate.execute(sql); } }
package cn.et.dao; import org.springframework.context.support.ClassPathXmlApplicationContext; public class Test { public static void main(String[] args) { // 加载配置文件 ClassPathXmlApplicationContext cpxac=new ClassPathXmlApplicationContext("applicationContext.xml"); // 配置文件中配置的bean会到spring ioc容器中,这里从spring ioc容器中拿到bean UserZsDao userZsDao=(UserZsDao)cpxac.getBean("userZsDao"); userZsDao.zsMinus(100); // 调用方法,出现一样, a=null; a.toCharArray(); cpxac.close(); // 关闭 } }
运行后 发现出现异常正常回滚
小结:分布式事务举例子(跨数据库的数据一致性):
1、跨数据库的银行转账;
2、电商网站:在下单的时候,需要在订单表中插入一条数据,然后把库存减去一;
3、电商网站:在搜索的时候如果点击了广告,需要先记录该点击事件,然后通知商家系统扣除广告费。
小结:两阶段提交的性能问题
1、两阶段提交涉及多次节点间的网络通信,通信时间太长!
2、事务时间相对于变长了,锁定的资源的时间也变长了,造成资源等待时间也增加好多!
MySQL中基于XA实现的分布式事务,完成了。
天天打码,天天进步!!!
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。