当前位置:   article > 正文

springboot mongodb分片集群事务

springboot mongodb分片集群事务

前置

mongodb分片集群想要使用事务,需要对应分片没有仲裁节点

在这里插入图片描述

代码


        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-data-mongodb</artifactId>
              <version>2.1.0.RELEASE</version>
        </dependency>
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6

如果是单个mongos

import org.springframework.context.annotation.Bean;
import org.springframework.data.mongodb.MongoDbFactory;
import org.springframework.data.mongodb.MongoTransactionManager;
import org.springframework.stereotype.Component;

/**
 * @author kittlen
 * @date 2024-04-09 17:20
 * @description
 */

@Component
public class MongodbConfig {

    

    @Bean
    public MongoTransactionManager transactionManager(MongoDbFactory factory) {
        return new MongoTransactionManager(factory);
    }
}

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22

使用


	@Autowired
    private MongoTemplate mongoTemplate;
    @Autowired
    private MongoTransactionManager mongoTransactionManager;

	public int dbFunc(){
        TransactionTemplate transactionTemplate = new TransactionTemplate(mongoTransactionManager);
		return transactionTemplate.execute(status -> {
                    try {
                        UpdateResult updateResult = mongoTemplate.updateFirst(query, update, collection1);
                        long l = updateResult.getUpsertedId() == null ? updateResult.getModifiedCount() : 1;
                        if (l > 0) {
                            mongoTemplate.insert(saveEntity, collection2);
                        }
                        return 1;
                    } catch (Exception e) {
                        // 如果发生异常,事务将在此处回滚,通过status.setRollbackOnly();或者抛出异常都可回滚
                       status.setRollbackOnly();
                       return 0;
                    }
                });
}

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24

如果连接是多mongos,则需要重写BaseCluster类

多mongos时使用的是随机获取的方式获取mongosClient,通过记录第一次调用的client使后续事务内的请求都通过同一个client请求,防止出现不同mongos导致事务失败情况

事务记录类


import com.mongodb.connection.Server;

import java.util.function.Supplier;

/**
 * @author kittlen
 * @date 2024-04-29 12:08
 * @description
 */

public class MultiServiceTransactionConfig {

    /**
     * mongodb多实例事务使用
     */
    private static ThreadLocal<Server> mongoMultiServerTransactionUserService = new ThreadLocal<>();

    /**
     * 是否开启多实例事务
     */
    private static ThreadLocal<Boolean> mongoMultiServerTransactionCanUser = new ThreadLocal<>();

    /**
     * 获取service
     *
     * @param supplier 如果该service不存在,则获取新service的方法
     * @return
     */
    public static Server getService(Supplier<Server> supplier) {
        Server server = mongoMultiServerTransactionUserService.get();
        if (server != null) {
            return server;
        } else {
            Server saveServer = supplier.get();
            mongoMultiServerTransactionUserService.set(saveServer);
            return saveServer;
        }
    }

    /**
     * 开启事务记录
     */

    public static void openMultiServerTransaction() {
        mongoMultiServerTransactionCanUser.set(true);
    }

    /**
     * 是否开启多实例事务
     *
     * @return
     */
    public static boolean canOpenMultiServerTransaction() {
        Boolean b = mongoMultiServerTransactionCanUser.get();
        return Boolean.TRUE.equals(b);
    }

    /**
     * 清除事务配置信息
     */
    public static void clean() {
        mongoMultiServerTransactionCanUser.remove();
        mongoMultiServerTransactionUserService.remove();
    }
}

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60
  • 61
  • 62
  • 63
  • 64
  • 65
  • 66
  • 67

重写mongodb的类com.mongodb.internal.connection.BaseCluster的selectServer方法


	@Override
    public Server selectServer(final ServerSelector serverSelector) {
        isTrue("open", !isClosed());

        try {
            CountDownLatch currentPhase = phase.get();
            ClusterDescription curDescription = description;
            ServerSelector compositeServerSelector = getCompositeServerSelector(serverSelector);
            Server server;
            if (this instanceof MultiServerCluster) {
                server = MultiServiceTransactionConfig.canOpenMultiServerTransaction() ? MultiServiceTransactionConfig.getService(() -> selectRandomServer(compositeServerSelector, description)) : selectRandomServer(compositeServerSelector, curDescription);
            } else {
                server = selectRandomServer(compositeServerSelector, curDescription);
            }
            boolean selectionFailureLogged = false;

            long startTimeNanos = System.nanoTime();
            long curTimeNanos = startTimeNanos;
            long maxWaitTimeNanos = getMaxWaitTimeNanos();

            while (true) {
                throwIfIncompatible(curDescription);

                if (server != null) {
                    return server;
                }

                if (curTimeNanos - startTimeNanos > maxWaitTimeNanos) {
                    throw createTimeoutException(serverSelector, curDescription);
                }

                if (!selectionFailureLogged) {
                    logServerSelectionFailure(serverSelector, curDescription);
                    selectionFailureLogged = true;
                }

                connect();

                currentPhase.await(Math.min(maxWaitTimeNanos - (curTimeNanos - startTimeNanos), getMinWaitTimeNanos()), NANOSECONDS);

                curTimeNanos = System.nanoTime();

                currentPhase = phase.get();
                curDescription = description;
                server = selectRandomServer(compositeServerSelector, curDescription);
            }

        } catch (InterruptedException e) {
            throw new MongoInterruptedException(format("Interrupted while waiting for a server that matches %s", serverSelector), e);
        }
    }
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52

重点为:

			Server server;
            if (this instanceof MultiServerCluster) {
                server = MultiServiceTransactionConfig.canOpenMultiServerTransaction() ? MultiServiceTransactionConfig.getService(() -> selectRandomServer(compositeServerSelector, description)) : selectRandomServer(compositeServerSelector, curDescription);
            } else {
                server = selectRandomServer(compositeServerSelector, curDescription);
            }
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6

使用

			try {
                TransactionTemplate transactionTemplate = new TransactionTemplate(mongoTransactionManager);
                MultiServiceTransactionConfig.openMultiServerTransaction();
                return transactionTemplate.execute(status -> {
                    try {
                        UpdateResult updateResult = mongoTemplate.updateFirst(query, update,ollection1);
                        long l = updateResult.getUpsertedId() == null ? updateResult.getModifiedCount() : 1;
                        if (l > 0) {
                            mongoTemplate.insert(historyDetailsEntity, collection2);
                        }
                        return 1;
                    } catch (Exception e) {
                        // 如果发生异常,事务将在此处回滚,通过status.setRollbackOnly();或者抛出异常都可回滚
                       status.setRollbackOnly();
                       return 0;
                    }
                });
            } finally {
                MultiServiceTransactionConfig.clean();
            }
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/我家小花儿/article/detail/533568
推荐阅读
相关标签
  

闽ICP备14008679号