PostgreSQL Distributed Transactions: Integrating the Atomikos JTA Implementation with Spring

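For background on how JTA distributed transactions work, see: http://www.ibm.com/developerworks/cn/java/j-lo-jta/

For an overview of JTA implementations, see: http://blog.chinaunix.net/uid-122937-id-3793220.html

The Atomikos website was unreachable at the time of writing, but the Atomikos artifacts are available in Maven Central. There are already many articles on integrating Atomikos with Spring, Hibernate and MyBatis; this article instead tests Atomikos's JTA implementation from a plain Java SE program, using Spring only for configuration.

The goal is simple: take two (or more) databases, operate on each of them within a single transaction, and verify that the operations are atomic, that is, either the operations on both databases succeed or they all fail.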
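To make the all-or-nothing requirement concrete, here is a minimal, self-contained sketch of the two-phase commit idea that XA transaction managers build on. It is an illustration only, not Atomikos code: `Participant`, `InMemoryParticipant` and `TwoPhaseCommitSketch` are hypothetical names invented for this example, and a participant votes "no" simply when a pending value is longer than an assumed column width (a real database would typically reject the insert earlier).

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical stand-in for one database taking part in a global transaction. */
interface Participant {
    boolean prepare();   // phase 1: vote on whether the pending work can be committed
    void commit();       // phase 2: make the pending work permanent
    void rollback();     // phase 2: discard the pending work
}

/** Toy participant: stores strings and refuses values longer than maxLength. */
class InMemoryParticipant implements Participant {
    private final int maxLength;
    private final List<String> committed = new ArrayList<>();
    private final List<String> pending = new ArrayList<>();

    InMemoryParticipant(int maxLength) {
        this.maxLength = maxLength;
    }

    void insert(String value) {
        pending.add(value);
    }

    @Override
    public boolean prepare() {
        for (String v : pending) {
            if (v.length() > maxLength) {
                return false;
            }
        }
        return true;
    }

    @Override
    public void commit() {
        committed.addAll(pending);
        pending.clear();
    }

    @Override
    public void rollback() {
        pending.clear();
    }

    List<String> rows() {
        return new ArrayList<>(committed);
    }
}

class TwoPhaseCommitSketch {
    /** Commits on every participant only if all of them vote yes; otherwise rolls all of them back. */
    static boolean runTransaction(List<? extends Participant> participants) {
        boolean allPrepared = true;
        for (Participant p : participants) {
            if (!p.prepare()) {
                allPrepared = false;
                break;
            }
        }
        for (Participant p : participants) {
            if (allPrepared) {
                p.commit();
            } else {
                p.rollback();
            }
        }
        return allPrepared;
    }
}
```

With two participants limited to three characters, inserting "abc" into both commits on both, while inserting "123=" into one and "123" into the other leaves neither value stored, which is the behaviour the tests in section 3 check against real databases.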

1. Preparation

1.1 Add the dependencies to the Maven pom.xml

atomikos (the latest version at the time of writing):

		<dependency>
			<groupId>com.atomikos</groupId>
			<artifactId>transactions-jdbc</artifactId>
			<version>3.9.3</version>
		</dependency>

[Figure: JAR dependency graph]

PostgreSQL database driver:

		<dependency>
			<groupId>org.postgresql</groupId>
			<artifactId>postgresql</artifactId>
			<version>9.2-1004-jdbc4</version>
		</dependency>

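The Spring and JUnit dependencies are omitted here.

1.2 Create the databases and tables

The two databases are javaee and tomdb. The tests in section 3 insert into a table named jta_temp, with a value column, in each database.

2. Add the configuration files to the project

Two files are needed: spring-jta.xml, placed under src/main/resources/integration, and transaction.properties, placed under src/main/resources/.

2.1 Configure the two XADataSources in spring-jta.xml: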

	<!-- For distributed transactions, set PostgreSQL's max_prepared_transactions to a value greater than 0; the default is 0 -->
	<!-- Database A -->
	<bean id="a" class="com.atomikos.jdbc.AtomikosDataSourceBean"
		init-method="init" destroy-method="close">
		<property name="uniqueResourceName" value="pg/a" />
		<property name="xaDataSourceClassName" value="org.postgresql.xa.PGXADataSource" />
		<property name="xaProperties">
			<props>
				<prop key="user">postgres</prop>
				<prop key="password">postgres</prop>
				<prop key="serverName">localhost</prop>
				<prop key="portNumber">5432</prop>
				<prop key="databaseName">tomdb</prop>
			</props>
		</property>
		<property name="poolSize" value="10" />
		<property name="reapTimeout" value="20000" />
	</bean>
	<bean id="b" class="com.atomikos.jdbc.AtomikosDataSourceBean"
		init-method="init" destroy-method="close">
		<property name="uniqueResourceName" value="pg/b" />
		<property name="xaDataSourceClassName" value="org.postgresql.xa.PGXADataSource" />
		<property name="xaProperties">
			<props>
				<prop key="user">postgres</prop>
				<prop key="password">postgres</prop>
				<prop key="serverName">localhost</prop>
				<prop key="portNumber">5432</prop>
				<prop key="databaseName">javaee</prop>
			</props>
		</property>
		<property name="poolSize" value="10" />
		<property name="reapTimeout" value="20000" />
	</bean>

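Notes:

PostgreSQL's max_prepared_transactions parameter defaults to 0; to enable distributed transactions it must be set to a value greater than 0. The parameter is in the file PostgreSQL\9.3\data\postgresql.conf.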
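One way to confirm the setting before running the tests is to ask the server through plain JDBC. The sketch below assumes an already opened `java.sql.Connection` to the PostgreSQL server; the class and method names are hypothetical, and only the `SHOW max_prepared_transactions` statement comes from PostgreSQL itself.

```java
import java.sql.Connection;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;

class PreparedTransactionCheck {
    /** Reads max_prepared_transactions through an already opened JDBC connection. */
    static int readMaxPreparedTransactions(Connection conn) throws SQLException {
        try (Statement st = conn.createStatement();
             ResultSet rs = st.executeQuery("SHOW max_prepared_transactions")) {
            if (!rs.next()) {
                throw new SQLException("SHOW max_prepared_transactions returned no row");
            }
            return Integer.parseInt(rs.getString(1).trim());
        }
    }

    /** Two-phase commit needs at least one prepared-transaction slot on the server. */
    static boolean supportsTwoPhaseCommit(int maxPreparedTransactions) {
        return maxPreparedTransactions > 0;
    }
}
```

If the check reports 0, edit postgresql.conf and restart the server, since this parameter is only read at server start.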

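BaseDataSource, the parent class of PGXADataSource, has no url property in the driver version used here, so serverName, portNumber, databaseName and so on must be set individually. Other database drivers handle this differently.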
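If the connection details are only available as a JDBC URL, they can be split into the individual properties listed above. The following is a minimal sketch using only the JDK; `PgXaProperties` and `fromJdbcUrl` are hypothetical names, and the code assumes a simple URL of the form jdbc:postgresql://host:port/database.

```java
import java.net.URI;
import java.util.Properties;

class PgXaProperties {
    /** Splits a URL such as jdbc:postgresql://localhost:5432/tomdb into XA properties. */
    static Properties fromJdbcUrl(String jdbcUrl, String user, String password) {
        String prefix = "jdbc:";
        if (!jdbcUrl.startsWith(prefix + "postgresql://")) {
            throw new IllegalArgumentException("Not a PostgreSQL JDBC URL: " + jdbcUrl);
        }
        // Dropping "jdbc:" leaves a regular hierarchical URI that java.net.URI can parse.
        URI uri = URI.create(jdbcUrl.substring(prefix.length()));
        String path = uri.getPath();
        if (uri.getHost() == null || path == null || path.length() <= 1) {
            throw new IllegalArgumentException("Host or database name missing: " + jdbcUrl);
        }
        Properties props = new Properties();
        props.setProperty("user", user);
        props.setProperty("password", password);
        props.setProperty("serverName", uri.getHost());
        // URI.getPort() returns -1 when no port is given; fall back to PostgreSQL's default 5432.
        int port = uri.getPort() == -1 ? 5432 : uri.getPort();
        props.setProperty("portNumber", String.valueOf(port));
        // getPath() is "/tomdb"; drop the leading slash.
        props.setProperty("databaseName", path.substring(1));
        return props;
    }
}
```

The resulting Properties object carries the same keys as the xaProperties blocks in spring-jta.xml, so it could be handed to the xaProperties property of an AtomikosDataSourceBean when configuring the data source in code rather than XML.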

2.2 Configure the transaction manager and the UserTransaction implementation

<!-- Atomikos transaction management -->
	<bean id="atomikosUserTransactionManager" class="com.atomikos.icatch.jta.UserTransactionManager">
		<description>UserTransactionManager</description>
		<property name="forceShutdown" value="true" />
	</bean>

	<bean id="atomikosUserTransaction" class="com.atomikos.icatch.jta.UserTransactionImp">
		<property name="transactionTimeout" value="300" />
	</bean>

	<bean id="transactionManager"
		class="org.springframework.transaction.jta.JtaTransactionManager">
		<property name="transactionManager" ref="atomikosUserTransactionManager"></property>
	</bean>

Each of the three beans above can be used on its own to control transactions; see section 3 below.

3. Writing the tests

3.1 Test using the atomikosUserTransactionManager object (TestAtomikos1.java)

package secondriver.springsubway.example.jta;

import java.sql.Connection;
import java.sql.SQLException;

import javax.transaction.HeuristicMixedException;
import javax.transaction.HeuristicRollbackException;
import javax.transaction.NotSupportedException;
import javax.transaction.RollbackException;
import javax.transaction.Status;
import javax.transaction.SystemException;

import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Test;
import org.springframework.context.ApplicationContext;
import org.springframework.context.support.ClassPathXmlApplicationContext;

import com.atomikos.icatch.jta.UserTransactionManager;
import com.atomikos.jdbc.AtomikosDataSourceBean;

public class TestAtomikos1 {

	public static ApplicationContext ctx;

	@BeforeClass
	public static void beforeClass() {
		ctx = new ClassPathXmlApplicationContext(
				"classpath:integration/spring-jta.xml");
	}

	@AfterClass
	public static void afterClass() {
		ctx = null;
	}

	@Test
	public void test1() {
		exe("abc", "abc");
	}

	@Test
	public void test2() {
		exe("123=", "123");
	}

	public void exe(String av, String bv) {
		AtomikosDataSourceBean adsA = (AtomikosDataSourceBean) ctx.getBean("a");
		AtomikosDataSourceBean adsB = (AtomikosDataSourceBean) ctx.getBean("b");
		UserTransactionManager utm = (UserTransactionManager) ctx
				.getBean("atomikosUserTransactionManager");
		try {
			utm.begin();
			// Connections are obtained after begin() so that they join the transaction;
			// try-with-resources returns them to the pool before the commit.
			try (Connection connA = adsA.getConnection();
					Connection connB = adsB.getConnection()) {
				connA.prepareStatement(
						"insert into jta_temp (value) values('" + av + "')")
						.execute();
				connB.prepareStatement(
						"insert into jta_temp (value) values('" + bv + "')")
						.execute();
			}
			utm.commit();
		} catch (SQLException | NotSupportedException | SystemException
				| SecurityException | IllegalStateException | RollbackException
				| HeuristicMixedException | HeuristicRollbackException e) {
			e.printStackTrace();
			try {
				// Roll back both databases if a transaction is still bound to this thread.
				if (utm.getStatus() != Status.STATUS_NO_TRANSACTION) {
					utm.rollback();
				}
			} catch (SystemException | IllegalStateException | SecurityException ex) {
				ex.printStackTrace();
			}
		}
	}
}

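3.2 Test using Spring's JtaTransactionManager object (TestAtomikos2.java; only the exe method of TestAtomikos1.java needs to change)

	@Test
	public void test1() {
		exe("abc", "abc");
	}

	@Test
	public void test2() {
		exe("123=", "123");
	}

	// Additional imports: javax.transaction.Status, javax.transaction.Transaction,
	// org.springframework.dao.DataAccessException,
	// org.springframework.jdbc.core.JdbcTemplate,
	// org.springframework.transaction.jta.TransactionFactory
	public void exe(String av, String bv) {
		// JtaTransactionManager implements Spring's TransactionFactory interface.
		TransactionFactory txm = (TransactionFactory) ctx
				.getBean("transactionManager");

		// jdbcTemplateA and jdbcTemplateB are JdbcTemplate beans over data sources "a" and "b";
		// they must be defined in spring-jta.xml (not shown in section 2).
		JdbcTemplate a = (JdbcTemplate) ctx.getBean("jdbcTemplateA");
		JdbcTemplate b = (JdbcTemplate) ctx.getBean("jdbcTemplateB");
		Transaction tx = null;
		try {
			// The second argument is the transaction timeout in seconds.
			tx = txm.createTransaction("tx-name-define", 10000);
			a.update("insert into jta_temp (value) values('" + av + "')");
			b.update("insert into jta_temp (value) values('" + bv + "')");
			tx.commit();
		} catch (DataAccessException | NotSupportedException | SystemException
				| SecurityException | RollbackException | HeuristicMixedException
				| HeuristicRollbackException e) {
			e.printStackTrace();
			try {
				// JdbcTemplate reports SQL failures as unchecked DataAccessException;
				// roll back if the transaction is still active.
				if (tx != null && tx.getStatus() != Status.STATUS_NO_TRANSACTION) {
					tx.rollback();
				}
			} catch (SystemException | IllegalStateException ex) {
				ex.printStackTrace();
			}
		}
	}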
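The exe method in 3.2 looks up two beans, jdbcTemplateA and jdbcTemplateB, that do not appear in the spring-jta.xml shown in section 2. A plausible definition, assuming they are plain Spring JdbcTemplate instances wrapping the data sources "a" and "b" (and that spring-jdbc is on the classpath), would be:

```xml
	<!-- Assumed definitions: JdbcTemplate wrappers around data sources "a" and "b" -->
	<bean id="jdbcTemplateA" class="org.springframework.jdbc.core.JdbcTemplate">
		<property name="dataSource" ref="a" />
	</bean>
	<bean id="jdbcTemplateB" class="org.springframework.jdbc.core.JdbcTemplate">
		<property name="dataSource" ref="b" />
	</bean>
```

Because each JdbcTemplate borrows its connection from an AtomikosDataSourceBean, the updates should join whatever JTA transaction is active on the calling thread.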

3.3 Test using the atomikosUserTransaction bean (TestAtomikos3.java; only the exe method of TestAtomikos1.java needs to change)

	@Test
	public void test1() {
		exe("abc", "abc");
	}

	@Test
	public void test2() {
		exe("123", "123=");
	}

	// Additional imports: javax.transaction.Status, javax.transaction.UserTransaction
	public void exe(String av, String bv) {
		AtomikosDataSourceBean adsA = (AtomikosDataSourceBean) ctx.getBean("a");
		AtomikosDataSourceBean adsB = (AtomikosDataSourceBean) ctx.getBean("b");
		UserTransaction utx = (UserTransaction) ctx
				.getBean("atomikosUserTransaction");
		try {
			utx.begin();
			// Connections are obtained after begin() so that they join the transaction;
			// try-with-resources returns them to the pool before the commit.
			try (Connection connA = adsA.getConnection();
					Connection connB = adsB.getConnection()) {
				connA.prepareStatement(
						"insert into jta_temp (value) values('" + av + "')")
						.execute();
				connB.prepareStatement(
						"insert into jta_temp (value) values('" + bv + "')")
						.execute();
			}
			utx.commit();
		} catch (SQLException | NotSupportedException | SystemException
				| SecurityException | IllegalStateException | RollbackException
				| HeuristicMixedException | HeuristicRollbackException e) {
			e.printStackTrace();
			try {
				// Roll back both databases if a transaction is still bound to this thread.
				if (utx.getStatus() != Status.STATUS_NO_TRANSACTION) {
					utx.rollback();
				}
			} catch (SystemException | IllegalStateException | SecurityException ex) {
				ex.printStackTrace();
			}
		}
	}

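Each of the three approaches above was tested. In every case test1 succeeds, while test2 fails because one of the inserted values exceeds the column's length limit. If distributed transaction control works correctly, neither of the two tables should contain a numeric value afterwards, because the failed insert on one database forces the insert on the other to be rolled back as well.

[Figure: contents of the two jta_temp tables after the tests]

Comparing the table contents during testing confirmed that distributed transaction control worked as intended.

The article on JTA principles mentioned at the beginning is detailed and clear, and is well worth a careful read.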

For reference, a complete Spring configuration that uses Atomikos with MySQL and MyBatis:

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
	xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:p="http://www.springframework.org/schema/p"
	xmlns:aop="http://www.springframework.org/schema/aop" xmlns:tx="http://www.springframework.org/schema/tx"
	xmlns:context="http://www.springframework.org/schema/context"
	xmlns:jdbc="http://www.springframework.org/schema/jdbc"
	xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
		http://www.springframework.org/schema/aop http://www.springframework.org/schema/aop/spring-aop.xsd
		http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context.xsd
		http://www.springframework.org/schema/jdbc http://www.springframework.org/schema/jdbc/spring-jdbc.xsd
		http://www.springframework.org/schema/tx http://www.springframework.org/schema/tx/spring-tx.xsd">
	<aop:aspectj-autoproxy />
	<!-- Configure beans via annotations -->
	<context:annotation-config />
	<bean id="springContextHolder"
		class="com.hihsoft.framework.core.utils.SpringContextHolder"
		lazy-init="false" />
	<bean id="propertyConfigurerForProject_framework"
		class="org.springframework.beans.factory.config.PropertyPlaceholderConfigurer">
		<property name="order" value="1" />
		<property name="ignoreUnresolvablePlaceholders" value="true" />
		<property name="location">
			<value>classpath:jdbc.properties</value>
		</property>
	</bean>
	<bean id="db1_dataSource" class="com.atomikos.jdbc.AtomikosDataSourceBean"
		init-method="init" destroy-method="close">
		<property name="uniqueResourceName" value="db1_dataSource" />
		<property name="xaDataSourceClassName"
			value="com.mysql.jdbc.jdbc2.optional.MysqlXADataSource" />
		<property name="xaProperties">
			<props>
				<prop key="user">${jdbc.username}</prop>
				<prop key="password">${jdbc.password}</prop>
				<prop key="URL">${jdbc.url}</prop>
			</props>
		</property>
		<!-- Number of connections in the pool -->
		<property name="poolSize" value="3" />
	</bean>
	<bean id="db2_dataSource" class="com.atomikos.jdbc.AtomikosDataSourceBean"
		init-method="init" destroy-method="close">
		<property name="uniqueResourceName" value="db2_dataSource" />
		<property name="xaDataSourceClassName"
			value="com.mysql.jdbc.jdbc2.optional.MysqlXADataSource" />
		<property name="xaProperties">
			<props>
				<prop key="user">${jdbc.username.hihsoft}</prop>
				<prop key="password">${jdbc.password.hihsoft}</prop>
				<prop key="URL">${jdbc.url.hihsoft}</prop>
			</props>
		</property>
		<!-- Number of connections in the pool -->
		<property name="poolSize" value="3" />
	</bean>
	<!-- enable transaction demarcation with annotations -->
	<tx:annotation-driven transaction-manager="jtaTransactionManager"
		proxy-target-class="false" />
	<!-- Atomikos transaction manager -->
	<bean id="atomikosTransactionManager" class="com.atomikos.icatch.jta.UserTransactionManager"
		init-method="init" destroy-method="close">
		<property name="forceShutdown" value="true" />
	</bean>
	<bean id="atomikosUserTransaction" class="com.atomikos.icatch.jta.UserTransactionImp">
		<property name="transactionTimeout" value="300" />
	</bean>
	<!-- Spring transaction manager -->
	<bean id="jtaTransactionManager"
		class="org.springframework.transaction.jta.JtaTransactionManager">
		<property name="transactionManager" ref="atomikosTransactionManager" />
		<property name="userTransaction" ref="atomikosUserTransaction" />
		<property name="allowCustomIsolationLevels" value="true" />
	</bean>
	<bean id="db1_sqlSessionFactory" class="org.mybatis.spring.SqlSessionFactoryBean">
		<property name="dataSource" ref="db1_dataSource" />
		<property name="mapperLocations"
			value="classpath*:com/hihsoft/db1/persistence/**/*.xml" />
	</bean>
	<bean id="db1MapperScannerConfigurer" class="org.mybatis.spring.mapper.MapperScannerConfigurer">
		<property name="basePackage" value="com.hihsoft.db1.persistence" />
		<property name="sqlSessionFactoryBeanName" value="db1_sqlSessionFactory" />
	</bean>
	<bean id="db2_sqlSessionFactory" class="org.mybatis.spring.SqlSessionFactoryBean">
		<property name="dataSource" ref="db2_dataSource" />
		<property name="mapperLocations"
			value="classpath*:com/hihsoft/db2/persistence/**/*.xml" />
	</bean>
	<bean id="db2MapperScannerConfigurer" class="org.mybatis.spring.mapper.MapperScannerConfigurer">
		<property name="basePackage" value="com.hihsoft.db2.persistence" />
		<property name="sqlSessionFactoryBeanName" value="db2_sqlSessionFactory" />
	</bean>
	<!-- MyBatis logging output -->
	<bean class="org.apache.ibatis.logging.LogFactory" init-method="useStdOutLogging"></bean>
	<!-- Initial data import via Spring JDBC for iBATIS/MyBatis -->
	<jdbc:initialize-database data-source="db1_dataSource"
		enabled="false" ignore-failures="ALL">
		<jdbc:script location="classpath:/resources/dbcfg/initframework_db1.sql"
			encoding="UTF-8" />
	</jdbc:initialize-database>
	<jdbc:initialize-database data-source="db2_dataSource"
		enabled="false" ignore-failures="ALL">
		<jdbc:script location="classpath:/resources/dbcfg/initframework_db2.sql"
			encoding="UTF-8" />
	</jdbc:initialize-database>
	<!-- Component scan scope -->
	<context:component-scan base-package="com.hihsoft">
		<context:exclude-filter type="annotation"
			expression="org.springframework.stereotype.Controller" />
	</context:component-scan>
	<!-- Bean lookup utility injection -->
</beans>

