赞
踩
对于阿里开源分布式事务框架seata的详细了解可以参考官网,这里不会详细介绍。本章只会介绍seata中AT模式的源码分析(对阿seata有一定了解或者成功完成过demo)。
seata中一个事务的开启是由TM角色来完成,在整体事务发起方我们可以通过在执行方法中包含@GlobalTransactional来标示启用全局事务,并包含该全局事务的一定自定义设置。如下所示:
- public @interface GlobalTransactional {
-
- /**
- * 设置该全局事务下执行的超时时间 默认毫秒
- *
- * @return timeoutMills in MILLISECONDS.
- */
- int timeoutMills() default TransactionInfo.DEFAULT_TIME_OUT;
-
- /**
- * 全局事务实例的的名称
- *
- * @return Given name.
- */
- String name() default "";
-
- /**
- * 设置哪些异常类发生需要进行rollback
- * @return
- */
- Class<? extends Throwable>[] rollbackFor() default {};
-
- /**
- * 设置哪些异常类名发生需要进行rollback
- * @return
- */
- String[] rollbackForClassName() default {};
-
- /**
- * 设置哪些异常类发生不需要进行rollback
- * @return
- */
- Class<? extends Throwable>[] noRollbackFor() default {};
-
- /**
- * 设置哪些异常类名发生不需要进行rollback
- * @return
- */
- String[] noRollbackForClassName() default {};
-
- /**
- * 事务传播级别 默认REQUIRED
- *
- * @return
- */
- Propagation propagation() default Propagation.REQUIRED;
- }

使用过spring的@Transactional事务的实现知道,它通过动态代理的方式,将事务的创建,提交或回滚这些公干的动作封装到一套执行模版中。这种方式在很多开源框架都是如此构建的例如mybtis中的各执行注解(@Update,@Select等),Springcloud中的Feign调用啊等。通过idea我们可以查看@GlobalTransactional该注解在什么地方被使用到,如下所示:
如果对于springboot的一些开源start(例如mybatis中MapperScannerRegistrar等)项目有过源码走读的经验,从GlobalTransactionScanner名字可以看出该类负责扫描GlobalTransaction注解并构建其代理方法(比较@GlobalTransactionScanner作用在方法中)。继续通过ieda的Find Usages功能寻找GobalTransactionScanner的引用,发现在seata的spring starter项目中的SeataAutoConfiguration对其进行初始化。
我们镜头回到GobalTransactionScanner中(对于SeataAutoConfiguration的其它作用后续描述)。GobalTransactionScanner实现了InitializingBean(bean初始化完成后执行),AbstractAutoProxyCreator,ApplicationContextAware(获取ApplicationContext对象),DisposableBean(bean被消耗时执行),分别对应的spring中bean不同的生命周期。如下所示是spring bean初始化完成后执行
这里会存在一个疑问,为何要开启RM与TM两个client,如果对于某一个服务它在分布式事务链路中只是作为一个分支即RM的角色而非TM,那么对于这TM的启动是否没有存在必要,毕竟需要开启TM与TC之间的连接通道,也是一个资源的浪费。
在1.2与1.3对于TM与TC之间连接的有关的管理类有着不同的命名
1.2的时候命名为TmRpcClient
对于1.3的时候改命名为TmNettyRemotingClient如下所示:
其实不论上述两个版本核心都是通过Netty作为服务之间远程网络通信基础架构,所以1.3的改为TmNettyRemotingClient更简单表达底层实现原理。后续都以1.3最新版作为讲解
- public final class TmNettyRemotingClient extends AbstractNettyRemotingClient {
- private static final Logger LOGGER = LoggerFactory.getLogger(TmNettyRemotingClient.class);
- private static volatile TmNettyRemotingClient instance;
- //长链接 keep-alive时间
- private static final long KEEP_ALIVE_TIME = Integer.MAX_VALUE;
- //常量 线程 等待队列长度
- private static final int MAX_QUEUE_SIZE = 2000;
- //是否初始化标示
- private final AtomicBoolean initialized = new AtomicBoolean(false);
- //配置applicationId唯一id
- private String applicationId;
- private String transactionServiceGroup;
-
-
- @Override
- public void init() {
- // 注册返回response 消息处理器
- registerProcessor();
- //初始化
- if (initialized.compareAndSet(false, true)) {
- super.init();
- }
- }
-
- private TmNettyRemotingClient(NettyClientConfig nettyClientConfig,
- EventExecutorGroup eventExecutorGroup,
- ThreadPoolExecutor messageExecutor) {
- super(nettyClientConfig, eventExecutorGroup, messageExecutor, NettyPoolKey.TransactionRole.TMROLE);
- }
-
- /**
- * 获取一个TmNettyRemotingClient
- *
- * @param applicationId the application id
- * @param transactionServiceGroup the transaction service group
- * @return the instance
- */
- public static TmNettyRemotingClient getInstance(String applicationId, String transactionServiceGroup) {
- //作为一个单列的形式获取TmNettyRemotingClient
- TmNettyRemotingClient tmNettyRemotingClient = getInstance();
- tmNettyRemotingClient.setApplicationId(applicationId);
- tmNettyRemotingClient.setTransactionServiceGroup(transactionServiceGroup);
- return tmNettyRemotingClient;
- }
-
- /**
- * 单例获取 懒汉式获取
- * @return the instance
- */
- public static TmNettyRemotingClient getInstance() {
- if (instance == null) {
- synchronized (TmNettyRemotingClient.class) {
- if (instance == null) {
- NettyClientConfig nettyClientConfig = new NettyClientConfig();
- //定义线程pool
- final ThreadPoolExecutor messageExecutor = new ThreadPoolExecutor(
- nettyClientConfig.getClientWorkerThreads(), nettyClientConfig.getClientWorkerThreads(),
- KEEP_ALIVE_TIME, TimeUnit.SECONDS,
- new LinkedBlockingQueue<>(MAX_QUEUE_SIZE),
- new NamedThreadFactory(nettyClientConfig.getTmDispatchThreadPrefix(),
- nettyClientConfig.getClientWorkerThreads()),
- RejectedPolicies.runsOldestTaskPolicy());
- instance = new TmNettyRemotingClient(nettyClientConfig, null, messageExecutor);
- }
- }
- }
- return instance;
- }
-
- /**
- * Sets application id.
- *
- * @param applicationId the application id
- */
- public void setApplicationId(String applicationId) {
- this.applicationId = applicationId;
- }
-
- /**
- * Sets transaction service group.
- *
- * @param transactionServiceGroup the transaction service group
- */
- public void setTransactionServiceGroup(String transactionServiceGroup) {
- this.transactionServiceGroup = transactionServiceGroup;
- }
-
- @Override
- public String getTransactionServiceGroup() {
- return transactionServiceGroup;
- }
-
- /**
- *注册成功回调
- */
- @Override
- public void onRegisterMsgSuccess(String serverAddress, Channel channel, Object response,
- AbstractMessage requestMessage) {
- RegisterTMRequest registerTMRequest = (RegisterTMRequest)requestMessage;
- RegisterTMResponse registerTMResponse = (RegisterTMResponse)response;
- if (LOGGER.isInfoEnabled()) {
- LOGGER.info("register TM success. client version:{}, server version:{},channel:{}", registerTMRequest.getVersion(), registerTMResponse.getVersion(), channel);
- }
- getClientChannelManager().registerChannel(serverAddress, channel);
- }
-
- @Override
- public void onRegisterMsgFail(String serverAddress, Channel channel, Object response,
- AbstractMessage requestMessage) {
- RegisterTMRequest registerTMRequest = (RegisterTMRequest)requestMessage;
- RegisterTMResponse registerTMResponse = (RegisterTMResponse)response;
- String errMsg = String.format(
- "register TM failed. client version: %s,server version: %s, errorMsg: %s, " + "channel: %s", registerTMRequest.getVersion(), registerTMResponse.getVersion(), registerTMResponse.getMsg(), channel);
- throw new FrameworkException(errMsg);
- }
-
- /**
- * bean被销毁
- */
- @Override
- public void destroy() {
- super.destroy();
- initialized.getAndSet(false);
- instance = null;
- }
-
- @Override
- protected Function<String, NettyPoolKey> getPoolKeyFunction() {
- return (severAddress) -> {
- RegisterTMRequest message = new RegisterTMRequest(applicationId, transactionServiceGroup);
- return new NettyPoolKey(NettyPoolKey.TransactionRole.TMROLE, severAddress, message);
- };
- }
-
- /**
- * 注册 TC response 有关处理器
- */
- private void registerProcessor() {
- //注册 TC response netty返回信息解析器
-
- ClientOnResponseProcessor onResponseProcessor =
- new ClientOnResponseProcessor(mergeMsgMap, super.getFutures(), getTransactionMessageHandler());
- super.registerProcessor(MessageType.TYPE_SEATA_MERGE_RESULT, onResponseProcessor, null);
- super.registerProcessor(MessageType.TYPE_GLOBAL_BEGIN_RESULT, onResponseProcessor, null);
- super.registerProcessor(MessageType.TYPE_GLOBAL_COMMIT_RESULT, onResponseProcessor, null);
- super.registerProcessor(MessageType.TYPE_GLOBAL_REPORT_RESULT, onResponseProcessor, null);
- super.registerProcessor(MessageType.TYPE_GLOBAL_ROLLBACK_RESULT, onResponseProcessor, null);
- super.registerProcessor(MessageType.TYPE_GLOBAL_STATUS_RESULT, onResponseProcessor, null);
- super.registerProcessor(MessageType.TYPE_REG_CLT_RESULT, onResponseProcessor, null);
- // 2.注册 heartbeat netty返回信息解析器
- ClientHeartbeatProcessor clientHeartbeatProcessor = new ClientHeartbeatProcessor();
- super.registerProcessor(MessageType.TYPE_HEARTBEAT_MSG, clientHeartbeatProcessor, null);
- }
- }
-
- //父类中init
- public void init() {
- //定义周期延时任务默认10s 该任务用于TM与TC的channel的连接检测 对于断的Channel进行重连
- timerExecutor.scheduleAtFixedRate(new Runnable() {
- @Override
- public void run() {
- clientChannelManager.reconnect(getTransactionServiceGroup());
- }
- }, SCHEDULE_DELAY_MILLS, SCHEDULE_INTERVAL_MILLS, TimeUnit.MILLISECONDS);
- //是否配置开启transport.enableClientBatchSendRequest即客户端事务消息请求是否批量合并发送 默认为true
- if (NettyClientConfig.isEnableClientBatchSendRequest()) {
- //定义线程pool
- mergeSendExecutorService = new ThreadPoolExecutor(MAX_MERGE_SEND_THREAD,
- MAX_MERGE_SEND_THREAD,
- KEEP_ALIVE_TIME, TimeUnit.MILLISECONDS,
- new LinkedBlockingQueue<>(),
- new NamedThreadFactory(getThreadPrefix(), MAX_MERGE_SEND_THREAD));
- //运行MergedSendRunnable任务即合并发送请求
- mergeSendExecutorService.submit(new MergedSendRunnable());
- }
- super.init();
- //
- clientBootstrap.start();
- }
-

- public class NettyClientBootstrap implements RemotingBootstrap {
-
- private static final Logger LOGGER = LoggerFactory.getLogger(NettyClientBootstrap.class);
- //client有关配置
- private final NettyClientConfig nettyClientConfig;
- //netty 启动类
- private final Bootstrap bootstrap = new Bootstrap();
- //netty worker
- private final EventLoopGroup eventLoopGroupWorker;
- //事件调度
- private EventExecutorGroup defaultEventExecutorGroup;
- //是否初始化标示
- private final AtomicBoolean initialized = new AtomicBoolean(false);
- private static final String THREAD_PREFIX_SPLIT_CHAR = "_";
- private final NettyPoolKey.TransactionRole transactionRole;
- //netty handler事件
- private ChannelHandler[] channelHandlers;
-
- public NettyClientBootstrap(NettyClientConfig nettyClientConfig, final EventExecutorGroup eventExecutorGroup,
- NettyPoolKey.TransactionRole transactionRole) {
- if (nettyClientConfig == null) {
- nettyClientConfig = new NettyClientConfig();
- if (LOGGER.isInfoEnabled()) {
- LOGGER.info("use default netty client config.");
- }
- }
- this.nettyClientConfig = nettyClientConfig;
- int selectorThreadSizeThreadSize = this.nettyClientConfig.getClientSelectorThreadSize();
- this.transactionRole = transactionRole;
- //nio event group
- this.eventLoopGroupWorker = new NioEventLoopGroup(selectorThreadSizeThreadSize,
- new NamedThreadFactory(getThreadPrefix(this.nettyClientConfig.getClientSelectorThreadPrefix()),
- selectorThreadSizeThreadSize));
- this.defaultEventExecutorGroup = eventExecutorGroup;
- }
-
- /**
- * Sets channel handlers.
- *
- * @param handlers the handlers
- */
- protected void setChannelHandlers(final ChannelHandler... handlers) {
- if (handlers != null) {
- channelHandlers = handlers;
- }
- }
-
- /**
- * Add channel pipeline last.
- *
- * @param channel the channel
- * @param handlers the handlers
- */
- private void addChannelPipelineLast(Channel channel, ChannelHandler... handlers) {
- if (channel != null && handlers != null) {
- channel.pipeline().addLast(handlers);
- }
- }
-
- @Override
- public void start() {
-
- if (this.defaultEventExecutorGroup == null) {
- this.defaultEventExecutorGroup = new DefaultEventExecutorGroup(nettyClientConfig.getClientWorkerThreads(),
- new NamedThreadFactory(getThreadPrefix(nettyClientConfig.getClientWorkerThreadPrefix()),
- nettyClientConfig.getClientWorkerThreads()));
- }
- //初始化 netty client 并设置option属性
- this.bootstrap.group(this.eventLoopGroupWorker).channel(
- nettyClientConfig.getClientChannelClazz()).option(
- ChannelOption.TCP_NODELAY, true).option(ChannelOption.SO_KEEPALIVE, true).option(
- ChannelOption.CONNECT_TIMEOUT_MILLIS, nettyClientConfig.getConnectTimeoutMillis()).option(
- ChannelOption.SO_SNDBUF, nettyClientConfig.getClientSocketSndBufSize()).option(ChannelOption.SO_RCVBUF,
- nettyClientConfig.getClientSocketRcvBufSize());
-
- if (nettyClientConfig.enableNative()) {
- if (PlatformDependent.isOsx()) {
- if (LOGGER.isInfoEnabled()) {
- LOGGER.info("client run on macOS");
- }
- } else {
- bootstrap.option(EpollChannelOption.EPOLL_MODE, EpollMode.EDGE_TRIGGERED)
- .option(EpollChannelOption.TCP_QUICKACK, true);
- }
- }
-
- //通过pipeline 绑定默认Handler 以及自定义handler
- bootstrap.handler(
- new ChannelInitializer<SocketChannel>() {
- @Override
- public void initChannel(SocketChannel ch) {
- ChannelPipeline pipeline = ch.pipeline();
- pipeline.addLast(
- /**
- * 设置channel 空闲状态处理器,是用来检测当前Handler的ChannelRead()的空闲时间
- * int readerIdleTimeSeconds 为读超时时间(即多长时间没有接受到客户端发送数据)
- * int writerIdleTimeSeconds, 为写超时时间(即多长时间没有向客户端发送数据)
- * int allIdleTimeSeconds 所有类型(读或写)的超时时间
- * 根据个参数IdleStateHandler会启动不同的定时任务,根据设定的时长去检测ChannelRead()方法是否被调用,
- * 如果没有被调用。之后则会调用后续handler的userEventTriggered方法去执行一些事情(比如断开链接)
- */
- new IdleStateHandler(nettyClientConfig.getChannelMaxReadIdleSeconds(),
- nettyClientConfig.getChannelMaxWriteIdleSeconds(),
- nettyClientConfig.getChannelMaxAllIdleSeconds()))
- //设置编码解码器
- .addLast(new ProtocolV1Decoder())
- .addLast(new ProtocolV1Encoder());
- if (channelHandlers != null) {
- addChannelPipelineLast(ch, channelHandlers);
- }
- }
- });
-
- if (initialized.compareAndSet(false, true) && LOGGER.isInfoEnabled()) {
- LOGGER.info("NettyClientBootstrap has started");
- }
- }
-
- @Override
- public void shutdown() {
- try {
- //关闭netty网络资源
- this.eventLoopGroupWorker.shutdownGracefully();
- if (this.defaultEventExecutorGroup != null) {
- this.defaultEventExecutorGroup.shutdownGracefully();
- }
- } catch (Exception exx) {
- LOGGER.error("Failed to shutdown: {}", exx.getMessage());
- }
- }
-
- /**
- *
- * 获取一个新的channel channel为与TC之间网络通道
- * @param address the address 网络地址
- * @return the new channel
- */
- public Channel getNewChannel(InetSocketAddress address) {
- Channel channel;
- //连接TC
- ChannelFuture f = this.bootstrap.connect(address);
- try {
- //等待超时时间内与Server端进行 若无法连接抛出异常
- f.await(this.nettyClientConfig.getConnectTimeoutMillis(), TimeUnit.MILLISECONDS);
- if (f.isCancelled()) {
- throw new FrameworkException(f.cause(), "connect cancelled, can not connect to services-server.");
- } else if (!f.isSuccess()) {
- throw new FrameworkException(f.cause(), "connect failed, can not connect to services-server.");
- } else {
- channel = f.channel();
- }
- } catch (Exception e) {
- throw new FrameworkException(e, "can not connect to services-server.");
- }
- return channel;
- }
-
- /**
- * Gets thread prefix.
- *
- * @param threadPrefix the thread prefix
- * @return the thread prefix
- */
- private String getThreadPrefix(String threadPrefix) {
- return threadPrefix + THREAD_PREFIX_SPLIT_CHAR + transactionRole.name();
- }
- }

以上为TM大体的初始化过程,详细可自行研读源码
GlobalTransactionScanner中afterPropertiesSet解析完成之后,会执行AbstractAutoProxyCreator(该类用于为Bean生成代理对象,)中的wrapIfNecessary()方法,(AbstractAutoProxyCreator实际上实现了BeanPostProcessor接口,而wrapIfNecessary在postProcessAfterInitialization方法中被调用,因此它在afterPropertiesSet之后执行
wrapIfNecessary源码分析:
- protected Object wrapIfNecessary(Object bean, String beanName, Object cacheKey) {
- //是否开启GlobalTransaction
- if (disableGlobalTransaction) {
- return bean;
- }
- try {
- //PROXYED_SET 记录已被代理
- synchronized (PROXYED_SET) {
- //该bean 是否已被代理 若已被无需重复代理
- if (PROXYED_SET.contains(beanName)) {
- return bean;
- }
- //MethodInterceptor 定义方法拦截器
- interceptor = null;
- //检测是否是TCC 模式下代理
- if (TCCBeanParserUtils.isTccAutoProxy(bean, beanName, applicationContext)) {
- //TCC interceptor, proxy bean of sofa:reference/dubbo:reference, and LocalTCC
- interceptor = new TccActionInterceptor(TCCBeanParserUtils.getRemotingDesc(beanName));
- } else {
- //非jdk代理 基于class方式
- Class<?> serviceInterface = SpringProxyUtils.findTargetClass(bean);
- //如果bean是jdk代理(基于接口) 获取元Class
- Class<?>[] interfacesIfJdk = SpringProxyUtils.findInterfaces(bean);
-
- //是否包含Annotation
- if (!existsAnnotation(new Class[]{serviceInterface})
- && !existsAnnotation(interfacesIfJdk)) {
- return bean;
- }
-
- if (interceptor == null) {
- if (globalTransactionalInterceptor == null) {
- //构建globalTransactionalInterceptor
- globalTransactionalInterceptor = new GlobalTransactionalInterceptor(failureHandlerHook);
-
- ConfigurationCache.addConfigListener(
- ConfigurationKeys.DISABLE_GLOBAL_TRANSACTION,
- (ConfigurationChangeListener)globalTransactionalInterceptor);
- }
- interceptor = globalTransactionalInterceptor;
- }
- }
-
- LOGGER.info("Bean[{}] with name [{}] would use interceptor [{}]", bean.getClass().getName(), beanName, interceptor.getClass().getName());
- if (!AopUtils.isAopProxy(bean)) {
- //如果gaibean不是aop代理类
- bean = super.wrapIfNecessary(bean, beanName, cacheKey);
- } else {
- // 执行包装目标对象到代理对象
-
- AdvisedSupport advised = SpringProxyUtils.getAdvisedSupport(bean);
- Advisor[] advisor = buildAdvisors(beanName, getAdvicesAndAdvisorsForBean(null, null, null));
- for (Advisor avr : advisor) {
- advised.addAdvisor(0, avr);
- }
- }
-
- PROXYED_SET.add(beanName);
- return bean;
- }
- } catch (Exception exx) {
- throw new RuntimeException(exx);
- }
- }
-
- /**
- * 目标 classes的方法中是否包含GlobalTransactional 或GlobalLock 注解
- * @param classes
- * @return
- */
- private boolean existsAnnotation(Class<?>[] classes) {
- if (CollectionUtils.isNotEmpty(classes)) {
- for (Class<?> clazz : classes) {
- if (clazz == null) {
- continue;
- }
- GlobalTransactional trxAnno = clazz.getAnnotation(GlobalTransactional.class);
- if (trxAnno != null) {
- return true;
- }
- Method[] methods = clazz.getMethods();
-
- for (Method method : methods) {
- //是否包含GlobalTransactional注解
- trxAnno = method.getAnnotation(GlobalTransactional.class);
- if (trxAnno != null) {
- return true;
- }
- //GlobalLock
- GlobalLock lockAnno = method.getAnnotation(GlobalLock.class);
- if (lockAnno != null) {
- return true;
- }
- }
- }
- }
- return false;
- }

从上述代码中可看出,用GlobalTransactionalInterceptor 代替了GlobalTransactional 和 GlobalLock 注解的方法
该类用于代理处理@GlobalTransactional被执行,如下源码所示:
- public class GlobalTransactionalInterceptor implements ConfigurationChangeListener, MethodInterceptor {
-
- private static final Logger LOGGER = LoggerFactory.getLogger(GlobalTransactionalInterceptor.class);
- private static final FailureHandler DEFAULT_FAIL_HANDLER = new DefaultFailureHandlerImpl();
- //事务模版类
- private final TransactionalTemplate transactionalTemplate = new TransactionalTemplate();
- private final GlobalLockTemplate<Object> globalLockTemplate = new GlobalLockTemplate<>();
- //失败处理器
- private final FailureHandler failureHandler;
- private volatile boolean disable;
- //服务自检周期 默认2000,单位ms.每2秒进行一次服务自检,来决定
- private static int degradeCheckPeriod;
- //降级检测开关 降级开关 默认false。业务侧根据连续错误数自动降级不走seata事务
- private static volatile boolean degradeCheck;
- //升降级达标阈值 默认10
- private static int degradeCheckAllowTimes;
- private static volatile Integer degradeNum = 0;
- private static volatile Integer reachNum = 0;
- private static final EventBus EVENT_BUS = new GuavaEventBus("degradeCheckEventBus", true);
-
- //用于周期检测是否降级 执行器 应该在degradeCheck =true时被初始化
- private static ScheduledThreadPoolExecutor executor =
- new ScheduledThreadPoolExecutor(1, new NamedThreadFactory("degradeCheckWorker", 1, true));
-
- /**
- * Instantiates a new Global transactional interceptor.
- *
- * @param failureHandler
- * the failure handler
- */
- public GlobalTransactionalInterceptor(FailureHandler failureHandler) {
- //初始化动作
- this.failureHandler = failureHandler == null ? DEFAULT_FAIL_HANDLER : failureHandler;
- this.disable = ConfigurationFactory.getInstance().getBoolean(ConfigurationKeys.DISABLE_GLOBAL_TRANSACTION,
- DEFAULT_DISABLE_GLOBAL_TRANSACTION);
- degradeCheck = ConfigurationFactory.getInstance().getBoolean(ConfigurationKeys.CLIENT_DEGRADE_CHECK,
- DEFAULT_TM_DEGRADE_CHECK);
- //开启降级设置
- if (degradeCheck) {
- ConfigurationCache.addConfigListener(ConfigurationKeys.CLIENT_DEGRADE_CHECK, this);
- degradeCheckPeriod = ConfigurationFactory.getInstance().getInt(
- ConfigurationKeys.CLIENT_DEGRADE_CHECK_PERIOD, DEFAULT_TM_DEGRADE_CHECK_PERIOD);
- degradeCheckAllowTimes = ConfigurationFactory.getInstance().getInt(
- ConfigurationKeys.CLIENT_DEGRADE_CHECK_ALLOW_TIMES, DEFAULT_TM_DEGRADE_CHECK_ALLOW_TIMES);
- EVENT_BUS.register(this);
- if (degradeCheckPeriod > 0 && degradeCheckAllowTimes > 0) {
- startDegradeCheck();
- }
- }
- }
-
- /**
- * 代理方法调用逻辑
- * @param methodInvocation 被代理的原方法
- * @return
- * @throws Throwable
- */
- @Override
- public Object invoke(final MethodInvocation methodInvocation) throws Throwable {
-
- //获取方法所属类
- Class<?> targetClass =
- methodInvocation.getThis() != null ? AopUtils.getTargetClass(methodInvocation.getThis()) : null;
- //获取执行具体的Method对象
- Method specificMethod = ClassUtils.getMostSpecificMethod(methodInvocation.getMethod(), targetClass);
- if (specificMethod != null && !specificMethod.getDeclaringClass().equals(Object.class)) {
- final Method method = BridgeMethodResolver.findBridgedMethod(specificMethod);
- //获取GlobalTransactional注解对象 获取定义属性
- final GlobalTransactional globalTransactionalAnnotation =
- getAnnotation(method, targetClass, GlobalTransactional.class);
- //获取GlobalLock对象
- final GlobalLock globalLockAnnotation = getAnnotation(method, targetClass, GlobalLock.class);
- //是否被降级或者开启全局事务
- boolean localDisable = disable || (degradeCheck && degradeNum >= degradeCheckAllowTimes);
- if (!localDisable) {
- //判定globalTransactional注解还是globalLock全局锁对象
- if (globalTransactionalAnnotation != null) {

Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。