
Source Code Analysis of Seata, Alibaba's Open-Source One-Stop Distributed Transaction Framework (TM and RM in AT Mode)


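Preface:

For a detailed introduction to Seata, Alibaba's open-source distributed transaction framework, see the official website; it is not repeated here. This chapter covers only the source code of Seata's AT mode and assumes that you already know Seata to some degree or have run a demo successfully.

In Seata, a global transaction is started by the TM role. On the side that initiates the overall transaction, we annotate the executing method with @GlobalTransactional to enable a global transaction and to customize some of its settings. The annotation is defined as follows: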

    public @interface GlobalTransactional {

        /**
         * Timeout for the global transaction, in milliseconds.
         *
         * @return timeoutMills in MILLISECONDS.
         */
        int timeoutMills() default TransactionInfo.DEFAULT_TIME_OUT;

        /**
         * Name of the global transaction instance.
         *
         * @return Given name.
         */
        String name() default "";

        /**
         * Exception classes that trigger a rollback.
         * @return
         */
        Class<? extends Throwable>[] rollbackFor() default {};

        /**
         * Exception class names that trigger a rollback.
         * @return
         */
        String[] rollbackForClassName() default {};

        /**
         * Exception classes that do not trigger a rollback.
         * @return
         */
        Class<? extends Throwable>[] noRollbackFor() default {};

        /**
         * Exception class names that do not trigger a rollback.
         * @return
         */
        String[] noRollbackForClassName() default {};

        /**
         * Transaction propagation level, REQUIRED by default.
         *
         * @return
         */
        Propagation propagation() default Propagation.REQUIRED;
    }
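To make the role of these attributes concrete, here is a minimal sketch of how an interceptor might read them through reflection. It does not use Seata itself: `DemoGlobalTransactional`, `OrderService` and `describe` are hypothetical stand-ins invented for this illustration, and the default of 60000 ms is an assumption of the sketch.

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.lang.reflect.Method;

public class AnnotationUsageSketch {

    // Hypothetical stand-in for @GlobalTransactional, reduced to three attributes.
    // The default timeout below is an assumption made for this sketch.
    @Retention(RetentionPolicy.RUNTIME)
    @Target(ElementType.METHOD)
    public @interface DemoGlobalTransactional {
        int timeoutMills() default 60000;
        String name() default "";
        Class<? extends Throwable>[] rollbackFor() default {};
    }

    // Hypothetical business service that initiates the transaction.
    public static class OrderService {
        @DemoGlobalTransactional(timeoutMills = 30000, name = "create-order",
                rollbackFor = IllegalStateException.class)
        public void createOrder() {
            // business logic would go here
        }
    }

    // Looks up the named public method and summarizes its transaction settings.
    public static String describe(Class<?> type, String methodName) {
        for (Method method : type.getMethods()) {
            if (!method.getName().equals(methodName)) {
                continue;
            }
            DemoGlobalTransactional anno = method.getAnnotation(DemoGlobalTransactional.class);
            if (anno == null) {
                return "no global transaction";
            }
            return anno.name() + " timeout=" + anno.timeoutMills()
                    + "ms rollbackFor=" + anno.rollbackFor().length;
        }
        return "method not found";
    }

    public static void main(String[] args) {
        System.out.println(describe(OrderService.class, "createOrder"));
    }
}
```

The annotation only carries settings; nothing happens until some component reads it at runtime, which is the job of the scanner and interceptor discussed in the following sections.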

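1: Seata client - TM (analysis based on a Spring Cloud project)

1.0: GlobalTransactionScanner

Anyone who has looked at how Spring's @Transactional works knows that it relies on dynamic proxies to wrap the common actions of creating, committing, or rolling back a transaction into one execution template. Many open-source frameworks are built the same way, for example the mapper annotations in MyBatis (@Update, @Select, and so on) and Feign calls in Spring Cloud. IDEA's Find Usages shows where the @GlobalTransactional annotation is used, and one of the places is GlobalTransactionScanner.

If you have read the source of Spring Boot starter projects (for example MapperScannerRegistrar in MyBatis), the name GlobalTransactionScanner already suggests that this class scans for the @GlobalTransactional annotation and builds proxies for the annotated methods (after all, @GlobalTransactional is applied to methods). Running Find Usages again, this time on GlobalTransactionScanner, shows that it is instantiated by SeataAutoConfiguration in Seata's Spring starter project.

Let us return to GlobalTransactionScanner (the other responsibilities of SeataAutoConfiguration are described later). GlobalTransactionScanner extends AbstractAutoProxyCreator and implements InitializingBean (called once the bean has been initialized), ApplicationContextAware (to obtain the ApplicationContext object), and DisposableBean (called when the bean is destroyed). Each of these corresponds to a different phase of the Spring bean lifecycle. The code that runs once the bean has been initialized, afterPropertiesSet, starts both the TM client and the RM client.

This raises a question: why start both the RM and the TM client? If a service only ever acts as a branch in a distributed transaction, that is, as an RM and never as a TM, then starting the TM seems unnecessary. It requires opening a connection channel between the TM and the TC, which is a waste of resources.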

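1.1: Client-side TM client

Versions 1.2 and 1.3 use different names for the class that manages the connection between the TM and the TC.

In 1.2 it is named TmRpcClient.

In 1.3 it was renamed TmNettyRemotingClient.

In both versions the core is the same: Netty is the infrastructure for remote network communication between services. The 1.3 name, TmNettyRemotingClient, simply states the underlying implementation more directly. The rest of this article is based on 1.3, the latest version at the time of writing.

1.1.1: TmNettyRemotingClient (core class, the TM's remote-call client)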

    public final class TmNettyRemotingClient extends AbstractNettyRemotingClient {
        private static final Logger LOGGER = LoggerFactory.getLogger(TmNettyRemotingClient.class);
        private static volatile TmNettyRemotingClient instance;
        // Keep-alive time passed to the message thread pool
        private static final long KEEP_ALIVE_TIME = Integer.MAX_VALUE;
        // Capacity of the thread pool's waiting queue
        private static final int MAX_QUEUE_SIZE = 2000;
        // Whether this client has been initialized
        private final AtomicBoolean initialized = new AtomicBoolean(false);
        // Configured applicationId (unique id of the application)
        private String applicationId;
        private String transactionServiceGroup;

        @Override
        public void init() {
            // Register the processors for response messages
            registerProcessor();
            // Initialize only once
            if (initialized.compareAndSet(false, true)) {
                super.init();
            }
        }

        private TmNettyRemotingClient(NettyClientConfig nettyClientConfig,
                                      EventExecutorGroup eventExecutorGroup,
                                      ThreadPoolExecutor messageExecutor) {
            super(nettyClientConfig, eventExecutorGroup, messageExecutor, NettyPoolKey.TransactionRole.TMROLE);
        }

        /**
         * Gets a TmNettyRemotingClient.
         *
         * @param applicationId           the application id
         * @param transactionServiceGroup the transaction service group
         * @return the instance
         */
        public static TmNettyRemotingClient getInstance(String applicationId, String transactionServiceGroup) {
            // TmNettyRemotingClient is obtained as a singleton
            TmNettyRemotingClient tmNettyRemotingClient = getInstance();
            tmNettyRemotingClient.setApplicationId(applicationId);
            tmNettyRemotingClient.setTransactionServiceGroup(transactionServiceGroup);
            return tmNettyRemotingClient;
        }

        /**
         * Lazily creates the singleton using double-checked locking.
         * @return the instance
         */
        public static TmNettyRemotingClient getInstance() {
            if (instance == null) {
                synchronized (TmNettyRemotingClient.class) {
                    if (instance == null) {
                        NettyClientConfig nettyClientConfig = new NettyClientConfig();
                        // Thread pool that processes messages
                        final ThreadPoolExecutor messageExecutor = new ThreadPoolExecutor(
                            nettyClientConfig.getClientWorkerThreads(), nettyClientConfig.getClientWorkerThreads(),
                            KEEP_ALIVE_TIME, TimeUnit.SECONDS,
                            new LinkedBlockingQueue<>(MAX_QUEUE_SIZE),
                            new NamedThreadFactory(nettyClientConfig.getTmDispatchThreadPrefix(),
                                nettyClientConfig.getClientWorkerThreads()),
                            RejectedPolicies.runsOldestTaskPolicy());
                        instance = new TmNettyRemotingClient(nettyClientConfig, null, messageExecutor);
                    }
                }
            }
            return instance;
        }

        /**
         * Sets application id.
         *
         * @param applicationId the application id
         */
        public void setApplicationId(String applicationId) {
            this.applicationId = applicationId;
        }

        /**
         * Sets transaction service group.
         *
         * @param transactionServiceGroup the transaction service group
         */
        public void setTransactionServiceGroup(String transactionServiceGroup) {
            this.transactionServiceGroup = transactionServiceGroup;
        }

        @Override
        public String getTransactionServiceGroup() {
            return transactionServiceGroup;
        }

        /**
         * Callback invoked when registration succeeds.
         */
        @Override
        public void onRegisterMsgSuccess(String serverAddress, Channel channel, Object response,
                                         AbstractMessage requestMessage) {
            RegisterTMRequest registerTMRequest = (RegisterTMRequest)requestMessage;
            RegisterTMResponse registerTMResponse = (RegisterTMResponse)response;
            if (LOGGER.isInfoEnabled()) {
                LOGGER.info("register TM success. client version:{}, server version:{},channel:{}", registerTMRequest.getVersion(), registerTMResponse.getVersion(), channel);
            }
            getClientChannelManager().registerChannel(serverAddress, channel);
        }

        @Override
        public void onRegisterMsgFail(String serverAddress, Channel channel, Object response,
                                      AbstractMessage requestMessage) {
            RegisterTMRequest registerTMRequest = (RegisterTMRequest)requestMessage;
            RegisterTMResponse registerTMResponse = (RegisterTMResponse)response;
            String errMsg = String.format(
                "register TM failed. client version: %s,server version: %s, errorMsg: %s, " + "channel: %s", registerTMRequest.getVersion(), registerTMResponse.getVersion(), registerTMResponse.getMsg(), channel);
            throw new FrameworkException(errMsg);
        }

        /**
         * Called when the bean is destroyed.
         */
        @Override
        public void destroy() {
            super.destroy();
            initialized.getAndSet(false);
            instance = null;
        }

        @Override
        protected Function<String, NettyPoolKey> getPoolKeyFunction() {
            return (severAddress) -> {
                RegisterTMRequest message = new RegisterTMRequest(applicationId, transactionServiceGroup);
                return new NettyPoolKey(NettyPoolKey.TransactionRole.TMROLE, severAddress, message);
            };
        }

        /**
         * Registers the processors for TC responses.
         */
        private void registerProcessor() {
            // 1. Register the processor that handles response messages from the TC
            ClientOnResponseProcessor onResponseProcessor =
                new ClientOnResponseProcessor(mergeMsgMap, super.getFutures(), getTransactionMessageHandler());
            super.registerProcessor(MessageType.TYPE_SEATA_MERGE_RESULT, onResponseProcessor, null);
            super.registerProcessor(MessageType.TYPE_GLOBAL_BEGIN_RESULT, onResponseProcessor, null);
            super.registerProcessor(MessageType.TYPE_GLOBAL_COMMIT_RESULT, onResponseProcessor, null);
            super.registerProcessor(MessageType.TYPE_GLOBAL_REPORT_RESULT, onResponseProcessor, null);
            super.registerProcessor(MessageType.TYPE_GLOBAL_ROLLBACK_RESULT, onResponseProcessor, null);
            super.registerProcessor(MessageType.TYPE_GLOBAL_STATUS_RESULT, onResponseProcessor, null);
            super.registerProcessor(MessageType.TYPE_REG_CLT_RESULT, onResponseProcessor, null);
            // 2. Register the processor that handles heartbeat messages
            ClientHeartbeatProcessor clientHeartbeatProcessor = new ClientHeartbeatProcessor();
            super.registerProcessor(MessageType.TYPE_HEARTBEAT_MSG, clientHeartbeatProcessor, null);
        }
    }

    // init() in the parent class
    public void init() {
        // Periodic task (10 s period by default) that checks the channels between TM and TC
        // and reconnects any channel that has been dropped
        timerExecutor.scheduleAtFixedRate(new Runnable() {
            @Override
            public void run() {
                clientChannelManager.reconnect(getTransactionServiceGroup());
            }
        }, SCHEDULE_DELAY_MILLS, SCHEDULE_INTERVAL_MILLS, TimeUnit.MILLISECONDS);
        // transport.enableClientBatchSendRequest controls whether the client merges
        // transaction requests and sends them in batches; it defaults to true
        if (NettyClientConfig.isEnableClientBatchSendRequest()) {
            // Thread pool for merged sending
            mergeSendExecutorService = new ThreadPoolExecutor(MAX_MERGE_SEND_THREAD,
                MAX_MERGE_SEND_THREAD,
                KEEP_ALIVE_TIME, TimeUnit.MILLISECONDS,
                new LinkedBlockingQueue<>(),
                new NamedThreadFactory(getThreadPrefix(), MAX_MERGE_SEND_THREAD));
            // Run MergedSendRunnable, which merges requests and sends them
            mergeSendExecutorService.submit(new MergedSendRunnable());
        }
        super.init();
        // Start the Netty client bootstrap
        clientBootstrap.start();
    }
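Two idioms in the listing above are easy to miss: the lazily created singleton guarded by double-checked locking on a volatile field, and the AtomicBoolean that makes init() run its expensive part only once. The following is a minimal, self-contained sketch of both. `LazyClientSketch` and `startCount` are hypothetical names made up for this illustration, and the counter merely stands in for the call to super.init().

```java
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

public class LazyClientSketch {

    // volatile ensures other threads never observe a partially constructed instance
    private static volatile LazyClientSketch instance;

    // Guards the one-time initialization
    private final AtomicBoolean initialized = new AtomicBoolean(false);

    // Hypothetical counter standing in for the real start-up work
    private final AtomicInteger startCount = new AtomicInteger();

    private LazyClientSketch() {
    }

    // Double-checked locking: the lock is taken only while the instance is still null
    public static LazyClientSketch getInstance() {
        if (instance == null) {
            synchronized (LazyClientSketch.class) {
                if (instance == null) {
                    instance = new LazyClientSketch();
                }
            }
        }
        return instance;
    }

    // compareAndSet lets exactly one caller perform the start-up work
    public void init() {
        if (initialized.compareAndSet(false, true)) {
            startCount.incrementAndGet();
        }
    }

    public int startCount() {
        return startCount.get();
    }

    public static void main(String[] args) {
        LazyClientSketch first = getInstance();
        LazyClientSketch second = getInstance();
        first.init();
        second.init();
        System.out.println(first == second);
        System.out.println(first.startCount());
    }
}
```

Note that in the Seata listing registerProcessor() sits outside the compareAndSet guard, so it runs on every call to init(), while super.init() runs only once.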

NettyClientBootstrap wraps the Netty client. Its source is analyzed below:

 

    public class NettyClientBootstrap implements RemotingBootstrap {
        private static final Logger LOGGER = LoggerFactory.getLogger(NettyClientBootstrap.class);
        // Client configuration
        private final NettyClientConfig nettyClientConfig;
        // Netty bootstrap
        private final Bootstrap bootstrap = new Bootstrap();
        // Netty worker group
        private final EventLoopGroup eventLoopGroupWorker;
        // Event executor group
        private EventExecutorGroup defaultEventExecutorGroup;
        // Whether the bootstrap has been initialized
        private final AtomicBoolean initialized = new AtomicBoolean(false);
        private static final String THREAD_PREFIX_SPLIT_CHAR = "_";
        private final NettyPoolKey.TransactionRole transactionRole;
        // Netty channel handlers
        private ChannelHandler[] channelHandlers;

        public NettyClientBootstrap(NettyClientConfig nettyClientConfig, final EventExecutorGroup eventExecutorGroup,
                                    NettyPoolKey.TransactionRole transactionRole) {
            if (nettyClientConfig == null) {
                nettyClientConfig = new NettyClientConfig();
                if (LOGGER.isInfoEnabled()) {
                    LOGGER.info("use default netty client config.");
                }
            }
            this.nettyClientConfig = nettyClientConfig;
            int selectorThreadSizeThreadSize = this.nettyClientConfig.getClientSelectorThreadSize();
            this.transactionRole = transactionRole;
            // NIO event loop group
            this.eventLoopGroupWorker = new NioEventLoopGroup(selectorThreadSizeThreadSize,
                new NamedThreadFactory(getThreadPrefix(this.nettyClientConfig.getClientSelectorThreadPrefix()),
                    selectorThreadSizeThreadSize));
            this.defaultEventExecutorGroup = eventExecutorGroup;
        }

        /**
         * Sets channel handlers.
         *
         * @param handlers the handlers
         */
        protected void setChannelHandlers(final ChannelHandler... handlers) {
            if (handlers != null) {
                channelHandlers = handlers;
            }
        }

        /**
         * Add channel pipeline last.
         *
         * @param channel  the channel
         * @param handlers the handlers
         */
        private void addChannelPipelineLast(Channel channel, ChannelHandler... handlers) {
            if (channel != null && handlers != null) {
                channel.pipeline().addLast(handlers);
            }
        }

        @Override
        public void start() {
            if (this.defaultEventExecutorGroup == null) {
                this.defaultEventExecutorGroup = new DefaultEventExecutorGroup(nettyClientConfig.getClientWorkerThreads(),
                    new NamedThreadFactory(getThreadPrefix(nettyClientConfig.getClientWorkerThreadPrefix()),
                        nettyClientConfig.getClientWorkerThreads()));
            }
            // Initialize the Netty client and set its options
            this.bootstrap.group(this.eventLoopGroupWorker).channel(
                nettyClientConfig.getClientChannelClazz()).option(
                ChannelOption.TCP_NODELAY, true).option(ChannelOption.SO_KEEPALIVE, true).option(
                ChannelOption.CONNECT_TIMEOUT_MILLIS, nettyClientConfig.getConnectTimeoutMillis()).option(
                ChannelOption.SO_SNDBUF, nettyClientConfig.getClientSocketSndBufSize()).option(ChannelOption.SO_RCVBUF,
                nettyClientConfig.getClientSocketRcvBufSize());
            if (nettyClientConfig.enableNative()) {
                if (PlatformDependent.isOsx()) {
                    if (LOGGER.isInfoEnabled()) {
                        LOGGER.info("client run on macOS");
                    }
                } else {
                    bootstrap.option(EpollChannelOption.EPOLL_MODE, EpollMode.EDGE_TRIGGERED)
                        .option(EpollChannelOption.TCP_QUICKACK, true);
                }
            }
            // Bind the default handlers and the custom handlers through the pipeline
            bootstrap.handler(
                new ChannelInitializer<SocketChannel>() {
                    @Override
                    public void initChannel(SocketChannel ch) {
                        ChannelPipeline pipeline = ch.pipeline();
                        pipeline.addLast(
                            /**
                             * Idle-state handler for the channel; it detects how long the channel has been idle.
                             * readerIdleTimeSeconds: read idle time (how long no data has been read from the peer)
                             * writerIdleTimeSeconds: write idle time (how long no data has been written to the peer)
                             * allIdleTimeSeconds: idle time for either direction (no read and no write)
                             * Based on these parameters IdleStateHandler schedules timers that check whether a
                             * read or write happened within the configured period. If not, it fires an
                             * IdleStateEvent, which reaches userEventTriggered of the later handlers so that
                             * they can react (for example, by closing the connection).
                             */
                            new IdleStateHandler(nettyClientConfig.getChannelMaxReadIdleSeconds(),
                                nettyClientConfig.getChannelMaxWriteIdleSeconds(),
                                nettyClientConfig.getChannelMaxAllIdleSeconds()))
                            // Decoder and encoder
                            .addLast(new ProtocolV1Decoder())
                            .addLast(new ProtocolV1Encoder());
                        if (channelHandlers != null) {
                            addChannelPipelineLast(ch, channelHandlers);
                        }
                    }
                });
            if (initialized.compareAndSet(false, true) && LOGGER.isInfoEnabled()) {
                LOGGER.info("NettyClientBootstrap has started");
            }
        }

        @Override
        public void shutdown() {
            try {
                // Release the Netty network resources
                this.eventLoopGroupWorker.shutdownGracefully();
                if (this.defaultEventExecutorGroup != null) {
                    this.defaultEventExecutorGroup.shutdownGracefully();
                }
            } catch (Exception exx) {
                LOGGER.error("Failed to shutdown: {}", exx.getMessage());
            }
        }

        /**
         * Gets a new channel, that is, a network channel to the TC.
         *
         * @param address the network address
         * @return the new channel
         */
        public Channel getNewChannel(InetSocketAddress address) {
            Channel channel;
            // Connect to the TC
            ChannelFuture f = this.bootstrap.connect(address);
            try {
                // Wait up to the connect timeout for the connection to the server;
                // throw an exception if it cannot be established
                f.await(this.nettyClientConfig.getConnectTimeoutMillis(), TimeUnit.MILLISECONDS);
                if (f.isCancelled()) {
                    throw new FrameworkException(f.cause(), "connect cancelled, can not connect to services-server.");
                } else if (!f.isSuccess()) {
                    throw new FrameworkException(f.cause(), "connect failed, can not connect to services-server.");
                } else {
                    channel = f.channel();
                }
            } catch (Exception e) {
                throw new FrameworkException(e, "can not connect to services-server.");
            }
            return channel;
        }

        /**
         * Gets thread prefix.
         *
         * @param threadPrefix the thread prefix
         * @return the thread prefix
         */
        private String getThreadPrefix(String threadPrefix) {
            return threadPrefix + THREAD_PREFIX_SPLIT_CHAR + transactionRole.name();
        }
    }

That covers the overall TM initialization process; read the source for the details.
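The pattern in getNewChannel, waiting for an asynchronous connect for a bounded time and turning every non-success outcome into an exception, can be illustrated without Netty. The sketch below is only a JDK analogue: it uses CompletableFuture instead of Netty's ChannelFuture, a String instead of a Channel, and IllegalStateException instead of FrameworkException. `ConnectWaitSketch` and `awaitChannel` are hypothetical names.

```java
import java.util.concurrent.CancellationException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

public class ConnectWaitSketch {

    // Waits up to timeoutMillis for the pending connect and maps each failure mode
    // (timeout, cancellation, error, interruption) to an unchecked exception.
    public static String awaitChannel(CompletableFuture<String> connectFuture, long timeoutMillis) {
        try {
            return connectFuture.get(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (TimeoutException e) {
            throw new IllegalStateException("connect timed out", e);
        } catch (CancellationException e) {
            throw new IllegalStateException("connect cancelled", e);
        } catch (ExecutionException e) {
            throw new IllegalStateException("connect failed", e.getCause());
        } catch (InterruptedException e) {
            // Restore the interrupt flag before reporting the failure
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while connecting", e);
        }
    }

    public static void main(String[] args) {
        // A connect that has already succeeded
        System.out.println(awaitChannel(CompletableFuture.completedFuture("channel-1"), 100));
        // A connect that never completes within the timeout
        try {
            awaitChannel(new CompletableFuture<>(), 50);
        } catch (IllegalStateException e) {
            System.out.println(e.getMessage());
        }
    }
}
```

One difference is worth keeping in mind: Netty's await with a timeout returns without throwing when the time runs out, which is why the Seata code inspects isCancelled() and isSuccess() afterwards, whereas CompletableFuture.get signals a timeout with an exception.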


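Once afterPropertiesSet in GlobalTransactionScanner has finished, the wrapIfNecessary() method of AbstractAutoProxyCreator (the class Spring uses to generate proxy objects for beans) comes into play. AbstractAutoProxyCreator implements the BeanPostProcessor interface, and wrapIfNecessary is called from postProcessAfterInitialization, so it runs after afterPropertiesSet.

Source analysis of wrapIfNecessary: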

    protected Object wrapIfNecessary(Object bean, String beanName, Object cacheKey) {
        // Skip proxying when global transactions are disabled
        if (disableGlobalTransaction) {
            return bean;
        }
        try {
            // PROXYED_SET records the beans that have already been proxied
            synchronized (PROXYED_SET) {
                // A bean that is already proxied is not proxied again
                if (PROXYED_SET.contains(beanName)) {
                    return bean;
                }
                // The MethodInterceptor to apply
                interceptor = null;
                // Check whether the bean needs a TCC-mode proxy
                if (TCCBeanParserUtils.isTccAutoProxy(bean, beanName, applicationContext)) {
                    //TCC interceptor, proxy bean of sofa:reference/dubbo:reference, and LocalTCC
                    interceptor = new TccActionInterceptor(TCCBeanParserUtils.getRemotingDesc(beanName));
                } else {
                    // Target class (for class-based, non-JDK proxies)
                    Class<?> serviceInterface = SpringProxyUtils.findTargetClass(bean);
                    // Interfaces of the bean if it is a JDK (interface-based) proxy
                    Class<?>[] interfacesIfJdk = SpringProxyUtils.findInterfaces(bean);
                    // Return the bean unchanged if neither carries the annotations
                    if (!existsAnnotation(new Class[]{serviceInterface})
                        && !existsAnnotation(interfacesIfJdk)) {
                        return bean;
                    }
                    if (interceptor == null) {
                        if (globalTransactionalInterceptor == null) {
                            // Build the globalTransactionalInterceptor
                            globalTransactionalInterceptor = new GlobalTransactionalInterceptor(failureHandlerHook);
                            ConfigurationCache.addConfigListener(
                                ConfigurationKeys.DISABLE_GLOBAL_TRANSACTION,
                                (ConfigurationChangeListener)globalTransactionalInterceptor);
                        }
                        interceptor = globalTransactionalInterceptor;
                    }
                }
                LOGGER.info("Bean[{}] with name [{}] would use interceptor [{}]", bean.getClass().getName(), beanName, interceptor.getClass().getName());
                if (!AopUtils.isAopProxy(bean)) {
                    // The bean is not an AOP proxy yet: let the parent class create the proxy
                    bean = super.wrapIfNecessary(bean, beanName, cacheKey);
                } else {
                    // The bean is already an AOP proxy: add the advisors to the existing proxy
                    AdvisedSupport advised = SpringProxyUtils.getAdvisedSupport(bean);
                    Advisor[] advisor = buildAdvisors(beanName, getAdvicesAndAdvisorsForBean(null, null, null));
                    for (Advisor avr : advisor) {
                        advised.addAdvisor(0, avr);
                    }
                }
                PROXYED_SET.add(beanName);
                return bean;
            }
        } catch (Exception exx) {
            throw new RuntimeException(exx);
        }
    }

    /**
     * Checks whether the given classes, or any of their methods, carry the
     * GlobalTransactional or GlobalLock annotation.
     * @param classes
     * @return
     */
    private boolean existsAnnotation(Class<?>[] classes) {
        if (CollectionUtils.isNotEmpty(classes)) {
            for (Class<?> clazz : classes) {
                if (clazz == null) {
                    continue;
                }
                GlobalTransactional trxAnno = clazz.getAnnotation(GlobalTransactional.class);
                if (trxAnno != null) {
                    return true;
                }
                Method[] methods = clazz.getMethods();
                for (Method method : methods) {
                    // Does the method carry @GlobalTransactional?
                    trxAnno = method.getAnnotation(GlobalTransactional.class);
                    if (trxAnno != null) {
                        return true;
                    }
                    // Does the method carry @GlobalLock?
                    GlobalLock lockAnno = method.getAnnotation(GlobalLock.class);
                    if (lockAnno != null) {
                        return true;
                    }
                }
            }
        }
        return false;
    }

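As the code above shows, a bean whose methods carry @GlobalTransactional or @GlobalLock is proxied, and calls to those methods are handled by GlobalTransactionalInterceptor instead of going straight to the target.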
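The scan-then-wrap idea can be reproduced in a few lines with a JDK dynamic proxy. The sketch below is not how Spring AOP or Seata build their proxies; it only shows the same two steps, checking for a marker annotation and routing calls through an interceptor. `DemoTx`, `AccountService`, `AccountServiceImpl` and the `EVENTS` list are hypothetical names invented for this illustration.

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.lang.reflect.Proxy;
import java.util.ArrayList;
import java.util.List;

public class ScanAndProxySketch {

    // Hypothetical marker annotation playing the role of @GlobalTransactional
    @Retention(RetentionPolicy.RUNTIME)
    @Target(ElementType.METHOD)
    public @interface DemoTx {
    }

    public interface AccountService {
        @DemoTx
        String transfer(String from, String to);

        String query(String account);
    }

    public static class AccountServiceImpl implements AccountService {
        public String transfer(String from, String to) {
            return from + "->" + to;
        }

        public String query(String account) {
            return "balance:" + account;
        }
    }

    // Records what the interceptor did, in place of real begin/commit/rollback calls
    public static final List<String> EVENTS = new ArrayList<>();

    // Returns true if any public method of the given classes carries @DemoTx
    public static boolean existsAnnotation(Class<?>... classes) {
        for (Class<?> clazz : classes) {
            if (clazz == null) {
                continue;
            }
            for (Method method : clazz.getMethods()) {
                if (method.isAnnotationPresent(DemoTx.class)) {
                    return true;
                }
            }
        }
        return false;
    }

    // Wraps the bean in a proxy only when its interface declares an annotated method
    @SuppressWarnings("unchecked")
    public static <T> T wrapIfNecessary(T bean, Class<T> iface) {
        if (!existsAnnotation(iface)) {
            return bean;
        }
        InvocationHandler handler = (proxy, method, args) -> {
            boolean transactional = method.isAnnotationPresent(DemoTx.class);
            if (transactional) {
                EVENTS.add("begin " + method.getName());
            }
            try {
                Object result = method.invoke(bean, args);
                if (transactional) {
                    EVENTS.add("commit " + method.getName());
                }
                return result;
            } catch (InvocationTargetException e) {
                if (transactional) {
                    EVENTS.add("rollback " + method.getName());
                }
                // Rethrow the exception raised by the target method itself
                throw e.getCause();
            }
        };
        return (T) Proxy.newProxyInstance(iface.getClassLoader(), new Class<?>[]{iface}, handler);
    }

    public static void main(String[] args) {
        AccountService service = wrapIfNecessary(new AccountServiceImpl(), AccountService.class);
        service.transfer("a", "b");
        service.query("a");
        System.out.println(EVENTS);
    }
}
```

Only the annotated method leaves a trace in `EVENTS`; the unannotated `query` passes through untouched, which mirrors how the interceptor only acts when it finds one of the two annotations on the invoked method.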

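1.3: GlobalTransactionalInterceptor (global transaction interceptor)

This class is the proxy logic that runs when a method annotated with @GlobalTransactional is invoked, as the following source shows: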

    public class GlobalTransactionalInterceptor implements ConfigurationChangeListener, MethodInterceptor {
        private static final Logger LOGGER = LoggerFactory.getLogger(GlobalTransactionalInterceptor.class);
        private static final FailureHandler DEFAULT_FAIL_HANDLER = new DefaultFailureHandlerImpl();
        // Transaction template
        private final TransactionalTemplate transactionalTemplate = new TransactionalTemplate();
        private final GlobalLockTemplate<Object> globalLockTemplate = new GlobalLockTemplate<>();
        // Failure handler
        private final FailureHandler failureHandler;
        private volatile boolean disable;
        // Self-check period, 2000 ms by default: a check runs every 2 seconds
        // and is used to decide whether to degrade or recover
        private static int degradeCheckPeriod;
        // Degrade-check switch, false by default. When enabled, the business side
        // degrades automatically (bypasses Seata transactions) after consecutive errors
        private static volatile boolean degradeCheck;
        // Threshold for degrading and recovering, 10 by default
        private static int degradeCheckAllowTimes;
        private static volatile Integer degradeNum = 0;
        private static volatile Integer reachNum = 0;
        private static final EventBus EVENT_BUS = new GuavaEventBus("degradeCheckEventBus", true);
        // Executor for the periodic degrade check; arguably it should only be
        // created when degradeCheck is true
        private static ScheduledThreadPoolExecutor executor =
            new ScheduledThreadPoolExecutor(1, new NamedThreadFactory("degradeCheckWorker", 1, true));

        /**
         * Instantiates a new Global transactional interceptor.
         *
         * @param failureHandler
         *            the failure handler
         */
        public GlobalTransactionalInterceptor(FailureHandler failureHandler) {
            // Initialization
            this.failureHandler = failureHandler == null ? DEFAULT_FAIL_HANDLER : failureHandler;
            this.disable = ConfigurationFactory.getInstance().getBoolean(ConfigurationKeys.DISABLE_GLOBAL_TRANSACTION,
                DEFAULT_DISABLE_GLOBAL_TRANSACTION);
            degradeCheck = ConfigurationFactory.getInstance().getBoolean(ConfigurationKeys.CLIENT_DEGRADE_CHECK,
                DEFAULT_TM_DEGRADE_CHECK);
            // Set up degrading if it is enabled
            if (degradeCheck) {
                ConfigurationCache.addConfigListener(ConfigurationKeys.CLIENT_DEGRADE_CHECK, this);
                degradeCheckPeriod = ConfigurationFactory.getInstance().getInt(
                    ConfigurationKeys.CLIENT_DEGRADE_CHECK_PERIOD, DEFAULT_TM_DEGRADE_CHECK_PERIOD);
                degradeCheckAllowTimes = ConfigurationFactory.getInstance().getInt(
                    ConfigurationKeys.CLIENT_DEGRADE_CHECK_ALLOW_TIMES, DEFAULT_TM_DEGRADE_CHECK_ALLOW_TIMES);
                EVENT_BUS.register(this);
                if (degradeCheckPeriod > 0 && degradeCheckAllowTimes > 0) {
                    startDegradeCheck();
                }
            }
        }

        /**
         * Invocation logic of the proxied method.
         * @param methodInvocation the original method being proxied
         * @return
         * @throws Throwable
         */
        @Override
        public Object invoke(final MethodInvocation methodInvocation) throws Throwable {
            // Class that the method belongs to
            Class<?> targetClass =
                methodInvocation.getThis() != null ? AopUtils.getTargetClass(methodInvocation.getThis()) : null;
            // The concrete Method object being executed
            Method specificMethod = ClassUtils.getMostSpecificMethod(methodInvocation.getMethod(), targetClass);
            if (specificMethod != null && !specificMethod.getDeclaringClass().equals(Object.class)) {
                final Method method = BridgeMethodResolver.findBridgedMethod(specificMethod);
                // Get the GlobalTransactional annotation and the attributes defined on it
                final GlobalTransactional globalTransactionalAnnotation =
                    getAnnotation(method, targetClass, GlobalTransactional.class);
                // Get the GlobalLock annotation
                final GlobalLock globalLockAnnotation = getAnnotation(method, targetClass, GlobalLock.class);
                // Whether global transactions are disabled or currently degraded
                boolean localDisable = disable || (degradeCheck && degradeNum >= degradeCheckAllowTimes);
                if (!localDisable) {
                    // Decide between the globalTransactional annotation and the globalLock annotation
                    if (globalTransactionalAnnotation != null) {
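The localDisable expression at the start of invoke can be studied in isolation. The sketch below keeps that expression exactly as it appears in the listing, but the rules for updating the counters are an assumption made for this illustration, not a copy of Seata's logic: a failed self-check increments degradeNum up to the threshold, and once degraded, the same number of consecutive successful checks (counted in reachNum) restores normal operation. `DegradeCheckSketch` and `onCheck` are hypothetical names.

```java
public class DegradeCheckSketch {

    private final boolean disable;
    private final boolean degradeCheck;
    private final int degradeCheckAllowTimes;

    // Consecutive failed checks, capped at the threshold
    private int degradeNum = 0;
    // Consecutive successful checks observed while degraded
    private int reachNum = 0;

    public DegradeCheckSketch(boolean disable, boolean degradeCheck, int degradeCheckAllowTimes) {
        this.disable = disable;
        this.degradeCheck = degradeCheck;
        this.degradeCheckAllowTimes = degradeCheckAllowTimes;
    }

    // Assumed update rule for one periodic self-check result
    public void onCheck(boolean success) {
        if (success) {
            if (degradeNum >= degradeCheckAllowTimes) {
                reachNum++;
                if (reachNum >= degradeCheckAllowTimes) {
                    // Enough healthy checks in a row: leave the degraded state
                    reachNum = 0;
                    degradeNum = 0;
                }
            } else {
                degradeNum = 0;
            }
        } else {
            reachNum = 0;
            if (degradeNum < degradeCheckAllowTimes) {
                degradeNum++;
            }
        }
    }

    // Same condition as in the interceptor's invoke method
    public boolean localDisable() {
        return disable || (degradeCheck && degradeNum >= degradeCheckAllowTimes);
    }

    public static void main(String[] args) {
        DegradeCheckSketch check = new DegradeCheckSketch(false, true, 3);
        for (int i = 0; i < 3; i++) {
            check.onCheck(false);
        }
        System.out.println(check.localDisable());
        for (int i = 0; i < 3; i++) {
            check.onCheck(true);
        }
        System.out.println(check.localDisable());
    }
}
```

When localDisable is true, the annotated method runs without a global transaction, so a failing TC degrades the service to plain local execution rather than blocking every call.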