赞
踩
mongodb分片集群想要使用事务,需要对应分片没有仲裁节点
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-mongodb</artifactId>
<version>2.1.0.RELEASE</version>
</dependency>
import org.springframework.context.annotation.Bean; import org.springframework.data.mongodb.MongoDbFactory; import org.springframework.data.mongodb.MongoTransactionManager; import org.springframework.stereotype.Component; /** * @author kittlen * @date 2024-04-09 17:20 * @description */ @Component public class MongodbConfig { @Bean public MongoTransactionManager transactionManager(MongoDbFactory factory) { return new MongoTransactionManager(factory); } }
@Autowired private MongoTemplate mongoTemplate; @Autowired private MongoTransactionManager mongoTransactionManager; public int dbFunc(){ TransactionTemplate transactionTemplate = new TransactionTemplate(mongoTransactionManager); return transactionTemplate.execute(status -> { try { UpdateResult updateResult = mongoTemplate.updateFirst(query, update, collection1); long l = updateResult.getUpsertedId() == null ? updateResult.getModifiedCount() : 1; if (l > 0) { mongoTemplate.insert(saveEntity, collection2); } return 1; } catch (Exception e) { // 如果发生异常,事务将在此处回滚,通过status.setRollbackOnly();或者抛出异常都可回滚 status.setRollbackOnly(); return 0; } }); }
多mongos时使用的是随机获取的方式获取mongosClient,通过记录第一次调用的client使后续事务内的请求都通过同一个client请求,防止出现不同mongos导致事务失败情况
import com.mongodb.connection.Server; import java.util.function.Supplier; /** * @author kittlen * @date 2024-04-29 12:08 * @description */ public class MultiServiceTransactionConfig { /** * mongodb多实例事务使用 */ private static ThreadLocal<Server> mongoMultiServerTransactionUserService = new ThreadLocal<>(); /** * 是否开启多实例事务 */ private static ThreadLocal<Boolean> mongoMultiServerTransactionCanUser = new ThreadLocal<>(); /** * 获取service * * @param supplier 如果该service不存在,则获取新service的方法 * @return */ public static Server getService(Supplier<Server> supplier) { Server server = mongoMultiServerTransactionUserService.get(); if (server != null) { return server; } else { Server saveServer = supplier.get(); mongoMultiServerTransactionUserService.set(saveServer); return saveServer; } } /** * 开启事务记录 */ public static void openMultiServerTransaction() { mongoMultiServerTransactionCanUser.set(true); } /** * 是否开启多实例事务 * * @return */ public static boolean canOpenMultiServerTransaction() { Boolean b = mongoMultiServerTransactionCanUser.get(); return Boolean.TRUE.equals(b); } /** * 清除事务配置信息 */ public static void clean() { mongoMultiServerTransactionCanUser.remove(); mongoMultiServerTransactionUserService.remove(); } }
@Override public Server selectServer(final ServerSelector serverSelector) { isTrue("open", !isClosed()); try { CountDownLatch currentPhase = phase.get(); ClusterDescription curDescription = description; ServerSelector compositeServerSelector = getCompositeServerSelector(serverSelector); Server server; if (this instanceof MultiServerCluster) { server = MultiServiceTransactionConfig.canOpenMultiServerTransaction() ? MultiServiceTransactionConfig.getService(() -> selectRandomServer(compositeServerSelector, description)) : selectRandomServer(compositeServerSelector, curDescription); } else { server = selectRandomServer(compositeServerSelector, curDescription); } boolean selectionFailureLogged = false; long startTimeNanos = System.nanoTime(); long curTimeNanos = startTimeNanos; long maxWaitTimeNanos = getMaxWaitTimeNanos(); while (true) { throwIfIncompatible(curDescription); if (server != null) { return server; } if (curTimeNanos - startTimeNanos > maxWaitTimeNanos) { throw createTimeoutException(serverSelector, curDescription); } if (!selectionFailureLogged) { logServerSelectionFailure(serverSelector, curDescription); selectionFailureLogged = true; } connect(); currentPhase.await(Math.min(maxWaitTimeNanos - (curTimeNanos - startTimeNanos), getMinWaitTimeNanos()), NANOSECONDS); curTimeNanos = System.nanoTime(); currentPhase = phase.get(); curDescription = description; server = selectRandomServer(compositeServerSelector, curDescription); } } catch (InterruptedException e) { throw new MongoInterruptedException(format("Interrupted while waiting for a server that matches %s", serverSelector), e); } }
重点为:
Server server;
if (this instanceof MultiServerCluster) {
server = MultiServiceTransactionConfig.canOpenMultiServerTransaction() ? MultiServiceTransactionConfig.getService(() -> selectRandomServer(compositeServerSelector, description)) : selectRandomServer(compositeServerSelector, curDescription);
} else {
server = selectRandomServer(compositeServerSelector, curDescription);
}
try { TransactionTemplate transactionTemplate = new TransactionTemplate(mongoTransactionManager); MultiServiceTransactionConfig.openMultiServerTransaction(); return transactionTemplate.execute(status -> { try { UpdateResult updateResult = mongoTemplate.updateFirst(query, update,ollection1); long l = updateResult.getUpsertedId() == null ? updateResult.getModifiedCount() : 1; if (l > 0) { mongoTemplate.insert(historyDetailsEntity, collection2); } return 1; } catch (Exception e) { // 如果发生异常,事务将在此处回滚,通过status.setRollbackOnly();或者抛出异常都可回滚 status.setRollbackOnly(); return 0; } }); } finally { MultiServiceTransactionConfig.clean(); }
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。