当前位置:   article > 正文

分布式- Mysql + Mongo多数据源的分布式事务回滚_transactionsynchronizationmanager.getresource(mong

transactionsynchronizationmanager.getresource(mongotransactionmanager.getdbf

1. XA两阶段提交模型

如上图,XA规范实现的两阶段提交流程:(下面全部翻译自XA规范原文)

阶段1

  TM要求所有RMs准备提交(或准备)事务分支。这询问RM是否能够保证提交事务分支的能力。RM可能会查询该RM内部的其他实例。CRM被要求准备它们创建的事务分支,将prepare请求发送到远程站点并接收结果。在返回失败并回滚其工作之后,RM可以丢弃事务分支的信息。

阶段2

  TM根据实际情况向所有RMs发出提交或回滚事务分支的请求。CRM被要求提交或回滚它们创建的事务分支,向远程站点发送提交或回滚请求并接收结果。所有RMs提交或回滚对共享资源的更改,然后将状态返回给TM。然后TM可以丢弃全局事务的信息。

1.1 XA对2PC的优化

  • 只读断言

  当事务分支没有更新共享资源时,这个RM会断言并响应给TM的prepare请求。也就免去了阶段2。但是,如果一个RM在全局事务的所有RMs返回prepared之前返回了只读优化,该RM释放事务上下文,例如read locks。这时候其他事务就有机会去改变这些数据(可能是写锁),显然全局序列化被破坏。同样CRM也可以断言,当TM挂起或终止线程与事务分支的关联时,它不是某个特定线程中活动的事务分支的参与者。

  • 一阶段提交

  如果一个TM知道DTP系统中只有一个RM在修改共享资源,那么它可以使用单阶段提交。即TM免去了阶段1的prepare,直接执行了阶段2的commit。

1.2 2PC的缺点

  • 资源阻塞

由于协调者的重要性,一旦协调者TM发生故障。参与者RM会一直阻塞下去。尤其在第二阶段,协调者发生故障,那么所有的参与者还都处于锁定事务资源的状态中,而无法继续完成事务操作。(如果是协调者挂掉,可以重新选举一个协调者,但是无法解决因为协调者宕机导致的参与者处于阻塞状态的问题)

  • 数据不一致

在阶段二,当协调者向参与者发送commit请求之后,发生了局部网络异常或者在发送commit请求过程中协调者发生了故障,导致只有一部分参与者接受到了commit请求。而在这部分参与者接到commit请求之后就会执行commit操作。但是其他部分未接到commit请求的机器则无法执行事务提交。于是整个分布式系统便出现了数据不一致性的现象。

由于二阶段提交存在着这些缺陷,所以,研究者们在二阶段提交的基础上做了改进,提出了三阶段提交(该篇文章暂不涉及)

2. Spring集成jta-atomikos实现Mysql + Mongo多数据源事务回滚

(1)项目环境为SpringBoot2.1.6

(2)依赖版本管理

  1. <?xml version="1.0" encoding="UTF-8"?>
  2. <project xmlns="http://maven.apache.org/POM/4.0.0"
  3. xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
  4. xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
  5. <parent>
  6. <artifactId>std-boot-starter-transaction</artifactId>
  7. <groupId>priv.whh.std</groupId>
  8. <version>1.0.0-SNAPSHOT</version>
  9. <relativePath>../pom.xml</relativePath>
  10. </parent>
  11. <modelVersion>4.0.0</modelVersion>
  12. <artifactId>std-boot-jta</artifactId>
  13. <properties>
  14. <druid.version>1.1.23</druid.version>
  15. <mybatis-spring-boot-starter.version>1.3.2</mybatis-spring-boot-starter.version>
  16. <mybatis.version>3.4.6</mybatis.version>
  17. <mybatis-spring.version>1.3.2</mybatis-spring.version>
  18. <mysql-connector-java.version>8.0.11</mysql-connector-java.version>
  19. </properties>
  20. <dependencies>
  21. <dependency>
  22. <groupId>org.springframework.boot</groupId>
  23. <artifactId>spring-boot-starter-data-mongodb</artifactId>
  24. </dependency>
  25. <dependency>
  26. <groupId>org.springframework.boot</groupId>
  27. <artifactId>spring-boot-starter-jta-atomikos</artifactId>
  28. </dependency>
  29. <dependency>
  30. <groupId>com.alibaba</groupId>
  31. <artifactId>druid</artifactId>
  32. </dependency>
  33. <dependency>
  34. <groupId>org.projectlombok</groupId>
  35. <artifactId>lombok</artifactId>
  36. </dependency>
  37. <dependency>
  38. <groupId>org.mybatis.spring.boot</groupId>
  39. <artifactId>mybatis-spring-boot-starter</artifactId>
  40. <scope>test</scope>
  41. </dependency>
  42. <dependency>
  43. <groupId>org.mybatis</groupId>
  44. <artifactId>mybatis-spring</artifactId>
  45. <scope>test</scope>
  46. </dependency>
  47. <dependency>
  48. <groupId>org.mybatis</groupId>
  49. <artifactId>mybatis</artifactId>
  50. <scope>test</scope>
  51. </dependency>
  52. <dependency>
  53. <groupId>org.springframework.boot</groupId>
  54. <artifactId>spring-boot-starter-web</artifactId>
  55. <scope>test</scope>
  56. </dependency>
  57. <dependency>
  58. <groupId>mysql</groupId>
  59. <artifactId>mysql-connector-java</artifactId>
  60. <scope>test</scope>
  61. </dependency>
  62. <dependency>
  63. <groupId>org.springframework</groupId>
  64. <artifactId>spring-jdbc</artifactId>
  65. <scope>test</scope>
  66. </dependency>
  67. </dependencies>
  68. <dependencyManagement>
  69. <dependencies>
  70. <dependency>
  71. <groupId>com.alibaba</groupId>
  72. <artifactId>druid</artifactId>
  73. <version>${druid.version}</version>
  74. </dependency>
  75. <dependency>
  76. <groupId>org.mybatis.spring.boot</groupId>
  77. <artifactId>mybatis-spring-boot-starter</artifactId>
  78. <version>${mybatis-spring-boot-starter.version}</version>
  79. </dependency>
  80. <dependency>
  81. <groupId>org.mybatis</groupId>
  82. <artifactId>mybatis-spring</artifactId>
  83. <version>${mybatis-spring.version}</version>
  84. </dependency>
  85. <dependency>
  86. <groupId>org.mybatis</groupId>
  87. <artifactId>mybatis</artifactId>
  88. <version>${mybatis.version}</version>
  89. </dependency>
  90. <dependency>
  91. <groupId>mysql</groupId>
  92. <artifactId>mysql-connector-java</artifactId>
  93. <version>${mysql-connector-java.version}</version>
  94. </dependency>
  95. </dependencies>
  96. </dependencyManagement>
  97. </project>

(3)配置类JtaAutoConfiguration

  1. package priv.whh.std.boot.jta.config;
  2. import org.springframework.context.annotation.Bean;
  3. import org.springframework.context.annotation.Configuration;
  4. import org.springframework.context.annotation.Primary;
  5. import org.springframework.data.mongodb.MongoDbFactory;
  6. import org.springframework.data.mongodb.MongoTransactionManager;
  7. import priv.whh.std.boot.jta.manager.JtaUserTransaction;
  8. import priv.whh.std.boot.jta.util.MongoUtils;
  9. /**
  10. * @author whh
  11. * @date 2020/7/22
  12. */
  13. @Configuration
  14. public class JtaAutoConfiguration {
  15. @Bean
  16. @Primary
  17. public JtaUserTransaction jtaUserTransaction(MongoDbFactory factory, MongoUtils mongoUtils) {
  18. return new JtaUserTransaction(new MongoTransactionManager(factory), mongoUtils);
  19. }
  20. }

(4)自定义事务JtaUserTransaction

  1. package priv.whh.std.boot.jta.manager;
  2. import com.atomikos.icatch.config.UserTransactionService;
  3. import com.atomikos.icatch.config.UserTransactionServiceImp;
  4. import com.atomikos.icatch.jta.TransactionManagerImp;
  5. import com.atomikos.util.SerializableObjectFactory;
  6. import com.mongodb.MongoException;
  7. import com.mongodb.TransactionOptions;
  8. import com.mongodb.client.ClientSession;
  9. import lombok.extern.slf4j.Slf4j;
  10. import org.springframework.data.mongodb.MongoDatabaseUtils;
  11. import org.springframework.data.mongodb.MongoDbFactory;
  12. import org.springframework.data.mongodb.MongoTransactionManager;
  13. import org.springframework.data.mongodb.SessionSynchronization;
  14. import org.springframework.lang.Nullable;
  15. import org.springframework.transaction.TransactionDefinition;
  16. import org.springframework.transaction.TransactionSystemException;
  17. import org.springframework.transaction.support.*;
  18. import org.springframework.util.Assert;
  19. import org.springframework.util.ClassUtils;
  20. import priv.whh.std.boot.jta.util.MongoUtils;
  21. import javax.naming.NamingException;
  22. import javax.naming.Reference;
  23. import javax.naming.Referenceable;
  24. import javax.transaction.NotSupportedException;
  25. import javax.transaction.SystemException;
  26. import javax.transaction.TransactionManager;
  27. import javax.transaction.UserTransaction;
  28. import java.io.Serializable;
  29. import java.lang.reflect.Method;
  30. import java.util.Objects;
  31. /**
  32. * @author whh
  33. * @date 2020/7/23
  34. */
  35. @Slf4j
  36. public class JtaUserTransaction implements UserTransaction, Serializable, Referenceable {
  37. private static final long serialVersionUID = -865418426269785202L;
  38. private transient TransactionManager transactionManager;
  39. private transient MongoTransactionManager mongoTransactionManager;
  40. private final transient MongoUtils mongoUtils;
  41. public JtaUserTransaction(MongoTransactionManager mongoTransactionManager, MongoUtils mongoUtils) {
  42. this.mongoTransactionManager = mongoTransactionManager;
  43. this.mongoUtils = mongoUtils;
  44. }
  45. /**
  46. * @see javax.transaction.UserTransaction
  47. */
  48. @Override
  49. public void begin() throws NotSupportedException, SystemException {
  50. checkSetup();
  51. transactionManager.begin();
  52. mongoUtils.setSessionSynchronizationForTransactionBegin();
  53. }
  54. /**
  55. * @see javax.transaction.UserTransaction
  56. */
  57. @Override
  58. public void commit() throws javax.transaction.RollbackException, javax.transaction.HeuristicMixedException,
  59. javax.transaction.HeuristicRollbackException, javax.transaction.SystemException {
  60. Assert.notNull(mongoTransactionManager.getDbFactory(), "DbFactory must not be null!");
  61. if (Objects.nonNull(TransactionSynchronizationManager.getResource(mongoTransactionManager.getDbFactory()))) {
  62. MongoTransactionObject mongoTransactionObject = extractMongoTransaction(getMongoTransaction());
  63. MongoResourceHolder resourceHolder = newResourceHolder(new DefaultTransactionDefinition());
  64. mongoTransactionObject.setResourceHolder(resourceHolder);
  65. try {
  66. mongoTransactionObject.commitTransaction();
  67. TransactionSynchronizationManager.unbindResource(mongoTransactionManager.getDbFactory());
  68. mongoTransactionObject.getRequiredResourceHolder().clear();
  69. mongoTransactionObject.closeSession();
  70. mongoUtils.setSessionSynchronizationForTransactionCompletion();
  71. } catch (Exception ex) {
  72. throw new TransactionSystemException(String.format("Could not commit Mongo transaction for session %s.",
  73. debugString(mongoTransactionObject.getSession())), ex);
  74. }
  75. }
  76. checkSetup();
  77. transactionManager.commit();
  78. }
  79. /**
  80. * @see javax.transaction.UserTransaction
  81. */
  82. @Override
  83. public void rollback() throws SystemException {
  84. Assert.notNull(mongoTransactionManager.getDbFactory(), "Db factory must not be null");
  85. if (Objects.nonNull(TransactionSynchronizationManager.getResource(mongoTransactionManager.getDbFactory()))) {
  86. MongoTransactionObject mongoTransactionObject = extractMongoTransaction(getMongoTransaction());
  87. MongoResourceHolder resourceHolder = newResourceHolder(new DefaultTransactionDefinition());
  88. mongoTransactionObject.setResourceHolder(resourceHolder);
  89. try {
  90. mongoTransactionObject.abortTransaction();
  91. TransactionSynchronizationManager.unbindResource(mongoTransactionManager.getDbFactory());
  92. mongoTransactionObject.getRequiredResourceHolder().clear();
  93. mongoTransactionObject.closeSession();
  94. mongoUtils.setSessionSynchronizationForTransactionCompletion();
  95. } catch (MongoException ex) {
  96. throw new TransactionSystemException(String.format("Could not abort Mongo transaction for session %s.",
  97. debugString(mongoTransactionObject.getSession())), ex);
  98. }
  99. }
  100. checkSetup();
  101. transactionManager.rollback();
  102. }
  103. /**
  104. * 不抛出异常进行回滚
  105. *
  106. * @see javax.transaction.UserTransaction
  107. */
  108. @Override
  109. public void setRollbackOnly() throws SystemException {
  110. checkSetup();
  111. transactionManager.setRollbackOnly();
  112. }
  113. /**
  114. * @see javax.transaction.UserTransaction
  115. */
  116. @Override
  117. public int getStatus() throws SystemException {
  118. checkSetup();
  119. return transactionManager.getStatus();
  120. }
  121. /**
  122. * @see javax.transaction.UserTransaction
  123. */
  124. @Override
  125. public void setTransactionTimeout(int seconds) throws SystemException {
  126. checkSetup();
  127. transactionManager.setTransactionTimeout(seconds);
  128. }
  129. /**
  130. * IMPLEMENTATION OF REFERENCEABLE
  131. */
  132. @Override
  133. public Reference getReference() throws NamingException {
  134. return SerializableObjectFactory.createReference(this);
  135. }
  136. protected int determineTimeout(TransactionDefinition definition) {
  137. if (definition.getTimeout() != TransactionDefinition.TIMEOUT_DEFAULT) {
  138. return definition.getTimeout();
  139. }
  140. return TransactionDefinition.TIMEOUT_DEFAULT;
  141. }
  142. /**
  143. * Referenceable mechanism requires later setup of transactionManager, otherwise binding
  144. * <p>
  145. * into JNDI already requires that TM is running.
  146. */
  147. private void checkSetup() {
  148. synchronized (TransactionManagerImp.class) {
  149. transactionManager = TransactionManagerImp.getTransactionManager();
  150. if (Objects.isNull(transactionManager)) {
  151. UserTransactionService uts = new UserTransactionServiceImp();
  152. uts.init();
  153. transactionManager = TransactionManagerImp.getTransactionManager();
  154. }
  155. }
  156. }
  157. private Object getMongoTransaction() {
  158. MongoDbFactory mongoDbFactory = mongoTransactionManager.getDbFactory();
  159. Assert.notNull(mongoDbFactory, "Db factory must not be null");
  160. MongoResourceHolder resourceHolder = (MongoResourceHolder) TransactionSynchronizationManager
  161. .getResource(mongoDbFactory);
  162. return new MongoTransactionObject(resourceHolder);
  163. }
  164. private static MongoTransactionObject extractMongoTransaction(Object transaction) {
  165. Assert.isInstanceOf(MongoTransactionObject.class, transaction,
  166. () -> String.format("Expected to find a %s but it turned out to be %s.", MongoTransactionObject.class,
  167. transaction.getClass()));
  168. return (MongoTransactionObject) transaction;
  169. }
  170. private MongoResourceHolder newResourceHolder(TransactionDefinition definition) {
  171. MongoDbFactory dbFactory = mongoTransactionManager.getDbFactory();
  172. Class<MongoDatabaseUtils> mongoDatabaseUtilsClazz = MongoDatabaseUtils.class;
  173. ClientSession session = null;
  174. try {
  175. Method doGetSession = mongoDatabaseUtilsClazz.getDeclaredMethod(
  176. "doGetSession", MongoDbFactory.class, SessionSynchronization.class);
  177. doGetSession.setAccessible(true);
  178. session = (ClientSession) doGetSession.invoke(
  179. mongoDatabaseUtilsClazz.newInstance(), dbFactory, SessionSynchronization.ALWAYS);
  180. } catch (Exception e) {
  181. log.error("getSession err;", e.getCause());
  182. }
  183. if (Objects.nonNull(session)) {
  184. MongoResourceHolder resourceHolder = new MongoResourceHolder(session, dbFactory);
  185. resourceHolder.setTimeoutIfNotDefaulted(determineTimeout(definition));
  186. return resourceHolder;
  187. }
  188. return null;
  189. }
  190. private static String debugString(@Nullable ClientSession session) {
  191. if (session == null) {
  192. return "null";
  193. }
  194. String debugString = String.format("[%s@%s ", ClassUtils.getShortName(session.getClass()),
  195. Integer.toHexString(session.hashCode()));
  196. try {
  197. if (session.getServerSession() != null) {
  198. debugString += String.format("id = %s, ", session.getServerSession().getIdentifier());
  199. debugString += String.format("causallyConsistent = %s, ", session.isCausallyConsistent());
  200. debugString += String.format("txActive = %s, ", session.hasActiveTransaction());
  201. debugString += String.format("txNumber = %d, ", session.getServerSession().getTransactionNumber());
  202. debugString += String.format("closed = %s, ", session.getServerSession().isClosed());
  203. debugString += String.format("clusterTime = %s", session.getClusterTime());
  204. } else {
  205. debugString += "id = n/a";
  206. debugString += String.format("causallyConsistent = %s, ", session.isCausallyConsistent());
  207. debugString += String.format("txActive = %s, ", session.hasActiveTransaction());
  208. debugString += String.format("clusterTime = %s", session.getClusterTime());
  209. }
  210. } catch (RuntimeException e) {
  211. debugString += String.format("error = %s", e.getMessage());
  212. }
  213. debugString += "]";
  214. return debugString;
  215. }
  216. /**
  217. * @see org.springframework.data.mongodb.MongoResourceHolder
  218. */
  219. protected static class MongoTransactionObject implements SmartTransactionObject {
  220. @Nullable
  221. private MongoResourceHolder resourceHolder;
  222. MongoTransactionObject(@Nullable MongoResourceHolder resourceHolder) {
  223. this.resourceHolder = resourceHolder;
  224. }
  225. /**
  226. * Set the {@link MongoResourceHolder}.
  227. *
  228. * @param resourceHolder can be {@literal null}.
  229. */
  230. void setResourceHolder(@Nullable MongoResourceHolder resourceHolder) {
  231. this.resourceHolder = resourceHolder;
  232. }
  233. /**
  234. * @return {@literal true} if a {@link MongoResourceHolder} is set.
  235. */
  236. final boolean hasResourceHolder() {
  237. return resourceHolder != null;
  238. }
  239. /**
  240. * Start a MongoDB transaction optionally given {@link TransactionOptions}.
  241. *
  242. * @param options can be {@literal null}
  243. */
  244. void startTransaction(@Nullable TransactionOptions options) {
  245. ClientSession session = getRequiredSession();
  246. if (options != null) {
  247. session.startTransaction(options);
  248. } else {
  249. session.startTransaction();
  250. }
  251. }
  252. /**
  253. * Commit the transaction.
  254. */
  255. public void commitTransaction() {
  256. getRequiredSession().commitTransaction();
  257. }
  258. /**
  259. * Rollback (abort) the transaction.
  260. */
  261. public void abortTransaction() {
  262. getRequiredSession().abortTransaction();
  263. }
  264. /**
  265. * Close a {@link ClientSession} without regard to its transactional state.
  266. */
  267. void closeSession() {
  268. ClientSession session = getRequiredSession();
  269. if (session.getServerSession() != null && !session.getServerSession().isClosed()) {
  270. session.close();
  271. }
  272. }
  273. @Nullable
  274. public ClientSession getSession() {
  275. return resourceHolder != null ? resourceHolder.getSession() : null;
  276. }
  277. private MongoResourceHolder getRequiredResourceHolder() {
  278. Assert.state(resourceHolder != null, "MongoResourceHolder is required but not present. o_O");
  279. return resourceHolder;
  280. }
  281. private ClientSession getRequiredSession() {
  282. ClientSession session = getSession();
  283. Assert.state(session != null, "A Session is required but it turned out to be null.");
  284. return session;
  285. }
  286. /**
  287. * (non-Javadoc)
  288. *
  289. * @see org.springframework.transaction.support.SmartTransactionObject#isRollbackOnly()
  290. */
  291. @Override
  292. public boolean isRollbackOnly() {
  293. return this.resourceHolder != null && this.resourceHolder.isRollbackOnly();
  294. }
  295. /**
  296. * (non-Javadoc)
  297. *
  298. * @see org.springframework.transaction.support.SmartTransactionObject#flush()
  299. */
  300. @Override
  301. public void flush() {
  302. TransactionSynchronizationUtils.triggerFlush();
  303. }
  304. }
  305. /**
  306. * @see org.springframework.data.mongodb.MongoResourceHolder
  307. */
  308. class MongoResourceHolder extends ResourceHolderSupport {
  309. private @Nullable
  310. ClientSession session;
  311. private MongoDbFactory dbFactory;
  312. /**
  313. * Create a new {@link com.shero.sport.web.conf.JtaTransactionImp.MongoResourceHolder} for a given {@link ClientSession session}.
  314. *
  315. * @param session the associated {@link ClientSession}. Can be {@literal null}.
  316. * @param dbFactory the associated {@link MongoDbFactory}. must not be {@literal null}.
  317. */
  318. MongoResourceHolder(@Nullable ClientSession session, MongoDbFactory dbFactory) {
  319. this.session = session;
  320. this.dbFactory = dbFactory;
  321. }
  322. /**
  323. * @return the associated {@link ClientSession}. Can be {@literal null}.
  324. */
  325. @Nullable
  326. ClientSession getSession() {
  327. return session;
  328. }
  329. /**
  330. * @return the required associated {@link ClientSession}.
  331. * @throws IllegalStateException if no {@link ClientSession} is associated with this {@link com.shero.sport.web.conf.JtaTransactionImp.MongoResourceHolder}.
  332. * @since 2.1.3
  333. */
  334. ClientSession getRequiredSession() {
  335. ClientSession session = getSession();
  336. if (session == null) {
  337. throw new IllegalStateException("No session available!");
  338. }
  339. return session;
  340. }
  341. /**
  342. * @return the associated {@link MongoDbFactory}.
  343. */
  344. public MongoDbFactory getDbFactory() {
  345. return dbFactory;
  346. }
  347. /**
  348. * Set the {@link ClientSession} to guard.
  349. *
  350. * @param session can be {@literal null}.
  351. */
  352. public void setSession(@Nullable ClientSession session) {
  353. this.session = session;
  354. }
  355. /**
  356. * Only set the timeout if it does not match the {@link TransactionDefinition#TIMEOUT_DEFAULT default timeout}.
  357. *
  358. * @param seconds
  359. */
  360. void setTimeoutIfNotDefaulted(int seconds) {
  361. if (seconds != TransactionDefinition.TIMEOUT_DEFAULT) {
  362. setTimeoutInSeconds(seconds);
  363. }
  364. }
  365. /**
  366. * @return {@literal true} if session is not {@literal null}.
  367. */
  368. boolean hasSession() {
  369. return session != null;
  370. }
  371. /**
  372. * @return {@literal true} if the session is active and has not been closed.
  373. */
  374. boolean hasActiveSession() {
  375. if (!hasSession()) {
  376. return false;
  377. }
  378. return hasServerSession() && !getRequiredSession().getServerSession().isClosed();
  379. }
  380. /**
  381. * @return {@literal true} if the session has an active transaction.
  382. * @see #hasActiveSession()
  383. * @since 2.1.3
  384. */
  385. boolean hasActiveTransaction() {
  386. if (!hasActiveSession()) {
  387. return false;
  388. }
  389. return getRequiredSession().hasActiveTransaction();
  390. }
  391. /**
  392. * @return {@literal true} if the {@link ClientSession} has a {@link com.mongodb.session.ServerSession} associated
  393. * <p>
  394. * that is accessible via {@link ClientSession#getServerSession()}.
  395. */
  396. boolean hasServerSession() {
  397. try {
  398. return getRequiredSession().getServerSession() != null;
  399. } catch (IllegalStateException serverSessionClosed) {
  400. // ignore
  401. }
  402. return false;
  403. }
  404. }
  405. }

(5)工具类MongoUtils

  1. package priv.whh.std.boot.jta.util;
  2. import lombok.RequiredArgsConstructor;
  3. import org.springframework.data.mongodb.SessionSynchronization;
  4. import org.springframework.data.mongodb.core.MongoTemplate;
  5. import org.springframework.stereotype.Component;
  6. /**
  7. * @author whh
  8. * @date 2020/7/22
  9. */
  10. @Component
  11. @RequiredArgsConstructor
  12. public class MongoUtils {
  13. private final MongoTemplate mongoTemplate;
  14. public void setSessionSynchronizationForTransactionBegin() {
  15. // 同步任何事务(即使是空事务)并在执行此操作时启动MongoDB事务
  16. mongoTemplate.setSessionSynchronization(SessionSynchronization.ALWAYS);
  17. }
  18. public void setSessionSynchronizationForTransactionCompletion() {
  19. mongoTemplate.setSessionSynchronization(SessionSynchronization.ON_ACTUAL_TRANSACTION);
  20. }
  21. }

(6)配置

  1. mybatis:
  2. configuration:
  3. map-underscore-to-camel-case: true
  4. mapper-locations: classpath*:mapper/*.xml
  5. type-aliases-package: com.tzc.whh.model
  6. spring:
  7. datasource:
  8. url: jdbc:mysql://127.0.0.1:3306/local?useUnicode=true&characterEncoding=UTF8&useSSL=false&serverTimezone=Asia/Shanghai
  9. username: root
  10. password: 123456
  11. # driver-class-name: com.mysql.jdbc.Driver
  12. driver-class-name: com.mysql.cj.jdbc.Driver
  13. xa:
  14. data-source-class-name: com.alibaba.druid.pool.xa.DruidXADataSource
  15. data:
  16. mongodb:
  17. uri: mongodb://127.0.0.1:27018,127.0.0.1:27019/coupon?authSource=coupon&slaveOk=true&replicaSet=rs0&write=1&readPreference=secondaryPreferred&connectTimeoutMS=300000
  18. server:
  19. port: 9090

(7)事务类Manager

  1. package priv.whh.std.boot.jta.manager;
  2. import org.springframework.beans.factory.annotation.Autowired;
  3. import org.springframework.data.mongodb.core.MongoTemplate;
  4. import org.springframework.stereotype.Component;
  5. import org.springframework.transaction.annotation.Transactional;
  6. import priv.whh.std.boot.jta.dao.UserMapper;
  7. import priv.whh.std.boot.jta.po.UserPo;
  8. import java.util.Objects;
  9. import java.util.Random;
  10. /**
  11. * @author whh
  12. * @date 2020/7/22
  13. */
  14. @Component
  15. public class Manager {
  16. @Autowired
  17. private UserMapper userMapper;
  18. @Autowired
  19. private MongoTemplate mongoTemplate;
  20. @Transactional(rollbackFor = Exception.class)
  21. public void test(Integer test) throws Exception {
  22. userMapper.insert(new UserPo(1L, "test"));
  23. mongoTemplate.save(new UserPo(new Random().nextLong(), "testA"), "t_account");
  24. userMapper.insert(new UserPo(1L, "testB"));
  25. mongoTemplate.save(new UserPo(new Random().nextLong(), "testB"), "t_account");
  26. if (Objects.equals(1, test)) {
  27. throw new Exception();
  28. }
  29. }
  30. }

当入参test为1时,抛出异常, 事务回滚。其它情况时,正常执行。

 

参考资料:

分布式事务(一)原理概览

springboot + jta + mysql + mongo 分布式(多种数据源)事务

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/weixin_40725706/article/detail/274871
推荐阅读
相关标签
  

闽ICP备14008679号