赞
踩
Seata 是阿里开源的分布式事务框架,属于二阶段提交模式,为用户提供了AT、TCC、SAGA 和 XA 事务模式。
在微服务架构中,以下3个模块会变为3个独立的微服务,各自有自己的数据源,Seata 调用逻辑就变为:
Business 是业务入口,在程序中会通过注解来说明他是一个全局事务,这时他的角色为 TM(事务管理者)。
(1)全局事务的回滚是如何实现的呢?
Seata 有一个重要的机制:回滚日志。每个分支事务对应的数据库中都需要有一个回滚日志表 UNDO_LOG,在真正修改数据库记录之前,都会先记录修改前的记录值,以便之后回滚。在收到回滚请求后,就会根据 UNDO_LOG 生成回滚操作的 SQL 语句来执行。如果收到的是提交请求,就把 UNDO_LOG 中的相应记录删除掉。
(2)RM 是怎么自动和 TC 交互的?
是通过监控拦截JDBC实现的,例如监控到开启本地事务了,就会自动向 TC 注册、生成回滚日志、向 TC 汇报执行结果。
(3)二阶段回滚失败怎么办?
例如 TC 命令各个 RM 回滚的时候,有一个微服务挂掉了,那么所有正常的微服务也都不会执行回滚,当这个微服务重新正常运行后,TC 会重新执行全局回滚。
(1)注意
TBD
从官网下载Seata服务端后,conf目录下会有两个配置文件(file.conf和registry.conf),file.conf应该是存储信息用的,registry.conf代表启动的时候向哪个注册中心注册,以便后续健康检查用的。默认都是file形式。
启动命令(sh seata-server.sh -p 8091 -h 127.0.0.1 -m file),fescar版本的服务端不能指定域名信息。
SeataAutoConfiguration作为配置入口,完成初始化工作。初始化的时候,会开启很多定时任务或者线程池,有的会隐藏在类实例化内(难找)。
- // io.seata.spring.annotation.GlobalTransactionScanner#initClient
- private void initClient() {
- // ...
- //TM初始化
- TMClient.init(applicationId, txServiceGroup, accessKey, secretKey);
- // ...
- //RM初始化
- RMClient.init(applicationId, txServiceGroup);
- // ...
- }
不论Seata配置的配置中心类型是什么,优先从系统配置获取值信息(System.getProperty)。也就是说如果项目本身就集成例如nacos的,理论上不会触发seata配置的nacos地址去读取需要的配置信息。
- // io.seata.config.FileConfiguration#getLatestConfig
- public String getLatestConfig(String dataId, String defaultValue, long timeoutMills) {
- // 获取系统配置(key为service.vgroupMapping.XXXX-seata-service-group)
- String value = getConfigFromSysPro(dataId);
- if (value != null) {
- return value;
- }
- // 异步获取本地文件配置
- ConfigFuture configFuture = new ConfigFuture(dataId, defaultValue, ConfigOperation.GET, timeoutMills);
- configOperateExecutor.submit(new ConfigOperateRunnable(configFuture));
- Object getValue = configFuture.get();
- return getValue == null ? null : String.valueOf(getValue);
- }
根据配置的注册中心类型,走不同的RegistryProvider实现。所有获取Seata服务端地址的时候,会从配置中拉取配置的地址信息,再根据注册中心类型进行健康检查,剔除一些不可用的Seata服务端,保证Seata服务端的高可用。
(1)ServiceLoader加载器加载(这个会加载指定接口的实现)
- // io.seata.discovery.registry.RegistryFactory#buildRegistryService
- private static RegistryService buildRegistryService() {
- RegistryType registryType;
- String registryTypeName = ConfigurationFactory.CURRENT_FILE_INSTANCE.getConfig(
- ConfigurationKeys.FILE_ROOT_REGISTRY + ConfigurationKeys.FILE_CONFIG_SPLIT_CHAR
- + ConfigurationKeys.FILE_ROOT_TYPE);
- try {
- registryType = RegistryType.getType(registryTypeName);
- } catch (Exception exx) {
- throw new NotSupportYetException("not support registry type: " + registryTypeName);
- }
- if (RegistryType.File == registryType) {
- // 文件类型直接返回实现
- return FileRegistryServiceImpl.getInstance();
- } else {
- return EnhancedServiceLoader.load(RegistryProvider.class, Objects.requireNonNull(registryType).name()).provide();
- }
- }
(2) 检测Seata服务是否可用
以Zk为例。
- // io.seata.discovery.registry.zk.ZookeeperRegisterServiceImpl#lookup
- public List<InetSocketAddress> lookup(String key) throws Exception {
- // 获取配置的Seata服务地址
- String clusterName = getServiceGroup(key);
-
- if (clusterName == null) {
- return null;
- }
- // 根据不同的注册中心类型检测服务是否可用
- return doLookup(clusterName);
- }
-
- // io.seata.discovery.registry.zk.ZookeeperRegisterServiceImpl#doLookup
- List<InetSocketAddress> doLookup(String clusterName) throws Exception {
- boolean exist = getClientInstance().exists(ROOT_PATH + clusterName);
- if (!exist) {
- return null;
- }
-
- if (!LISTENER_SERVICE_MAP.containsKey(clusterName)) {
- List<String> childClusterPath = getClientInstance().getChildren(ROOT_PATH + clusterName);
- refreshClusterAddressMap(clusterName, childClusterPath);
- subscribeCluster(clusterName);
- }
-
- return CLUSTER_ADDRESS_MAP.get(clusterName);
- }
- @GlobalTransactional(timeoutMills = 300000, name = "spring-cloud-demo-tx")
- @RequestMapping(value = "/order", method = RequestMethod.POST, produces = "application/json")
- public String order(String userId, String commodityCode, int orderCount) {
- // ...
- }
(1)GlobalTransactionScanner继承AbstractAutoProxyCreator,实现全局事务拦截器的加载(AOP原理)
- // io.seata.spring.annotation.GlobalTransactionScanner#wrapIfNecessary
- protected Object wrapIfNecessary(Object bean, String beanName, Object cacheKey) {
- try {
- synchronized (PROXYED_SET) {
- if (PROXYED_SET.contains(beanName)) {
- return bean;
- }
- interceptor = null;
- //check TCC proxy
- if (TCCBeanParserUtils.isTccAutoProxy(bean, beanName, applicationContext)) {
- //TCC interceptor, proxy bean of sofa:reference/dubbo:reference, and LocalTCC
- interceptor = new TccActionInterceptor(TCCBeanParserUtils.getRemotingDesc(beanName));
- ConfigurationCache.addConfigListener(ConfigurationKeys.DISABLE_GLOBAL_TRANSACTION,
- (ConfigurationChangeListener)interceptor);
- } else {
- Class<?> serviceInterface = SpringProxyUtils.findTargetClass(bean);
- Class<?>[] interfacesIfJdk = SpringProxyUtils.findInterfaces(bean);
-
- if (!existsAnnotation(new Class[]{serviceInterface})
- && !existsAnnotation(interfacesIfJdk)) {
- return bean;
- }
-
- if (interceptor == null) {
- if (globalTransactionalInterceptor == null) {
- globalTransactionalInterceptor = new GlobalTransactionalInterceptor(failureHandlerHook);
- ConfigurationCache.addConfigListener(
- ConfigurationKeys.DISABLE_GLOBAL_TRANSACTION,
- (ConfigurationChangeListener)globalTransactionalInterceptor);
- }
- interceptor = globalTransactionalInterceptor;
- }
- }
-
- LOGGER.info("Bean[{}] with name [{}] would use interceptor [{}]", bean.getClass().getName(), beanName, interceptor.getClass().getName());
- if (!AopUtils.isAopProxy(bean)) {
- bean = super.wrapIfNecessary(bean, beanName, cacheKey);
- } else {
- AdvisedSupport advised = SpringProxyUtils.getAdvisedSupport(bean);
- Advisor[] advisor = buildAdvisors(beanName, getAdvicesAndAdvisorsForBean(null, null, null));
- for (Advisor avr : advisor) {
- advised.addAdvisor(0, avr);
- }
- }
- PROXYED_SET.add(beanName);
- return bean;
- }
- } catch (Exception exx) {
- throw new RuntimeException(exx);
- }
- }
- // io.seata.spring.annotation.GlobalTransactionalInterceptor#invoke
- public Object invoke(final MethodInvocation methodInvocation) throws Throwable {
- Class<?> targetClass =
- methodInvocation.getThis() != null ? AopUtils.getTargetClass(methodInvocation.getThis()) : null;
- Method specificMethod = ClassUtils.getMostSpecificMethod(methodInvocation.getMethod(), targetClass);
- if (specificMethod != null && !specificMethod.getDeclaringClass().equals(Object.class)) {
- final Method method = BridgeMethodResolver.findBridgedMethod(specificMethod);
- final GlobalTransactional globalTransactionalAnnotation =
- getAnnotation(method, targetClass, GlobalTransactional.class);
- final GlobalLock globalLockAnnotation = getAnnotation(method, targetClass, GlobalLock.class);
- boolean localDisable = disable || (degradeCheck && degradeNum >= degradeCheckAllowTimes);
- if (!localDisable) {
- if (globalTransactionalAnnotation != null) {
- // 处理全局事务
- return handleGlobalTransaction(methodInvocation, globalTransactionalAnnotation);
- } else if (globalLockAnnotation != null) {
- return handleGlobalLock(methodInvocation, globalLockAnnotation);
- }
- }
- }
- return methodInvocation.proceed();
- }
XID是Seata分布式事务的一个全局上下文,不论各个项目通过何种方式调用(Dubbo、ACM、SringCloud、Http等等),都需要投传这个XID,例如Dubbo可以用拦截器拦截进行透传绑定;本地为了方便可以直接利用Http透传给下个业务系统进行绑定。
(1)TM申请XID
- // io.seata.core.rpc.netty.AbstractNettyRemotingClient#sendSyncRequest(java.lang.Object)
- public Object sendSyncRequest(Object msg) throws TimeoutException {
- // 获取可用的Seata服务端地址
- String serverAddress = loadBalance(getTransactionServiceGroup(), msg);
- int timeoutMillis = NettyClientConfig.getRpcRequestTimeout();
- RpcMessage rpcMessage = buildRequestMessage(msg, ProtocolConstants.MSGTYPE_RESQUEST_SYNC);
-
- // 发送异步请求,获取XID信息
- if (NettyClientConfig.isEnableClientBatchSendRequest()) {
-
- // send batch message is sync request, needs to create messageFuture and put it in futures.
- MessageFuture messageFuture = new MessageFuture();
- messageFuture.setRequestMessage(rpcMessage);
- messageFuture.setTimeout(timeoutMillis);
- futures.put(rpcMessage.getId(), messageFuture);
-
- // put message into basketMap
- BlockingQueue<RpcMessage> basket = CollectionUtils.computeIfAbsent(basketMap, serverAddress,
- key -> new LinkedBlockingQueue<>());
- if (!basket.offer(rpcMessage)) {
- LOGGER.error("put message into basketMap offer failed, serverAddress:{},rpcMessage:{}",
- serverAddress, rpcMessage);
- return null;
- }
- if (LOGGER.isDebugEnabled()) {
- LOGGER.debug("offer message: {}", rpcMessage.getBody());
- }
- if (!isSending) {
- synchronized (mergeLock) {
- mergeLock.notifyAll();
- }
- }
-
- try {
- return messageFuture.get(timeoutMillis, TimeUnit.MILLISECONDS);
- } catch (Exception exx) {
- LOGGER.error("wait response error:{},ip:{},request:{}",
- exx.getMessage(), serverAddress, rpcMessage.getBody());
- if (exx instanceof TimeoutException) {
- throw (TimeoutException) exx;
- } else {
- throw new RuntimeException(exx);
- }
- }
-
- } else {
- Channel channel = clientChannelManager.acquireChannel(serverAddress);
- return super.sendSync(channel, rpcMessage, timeoutMillis);
- }
-
- }
(2)下游系统绑定上游系统的XID信息
- @RequestMapping(value = "/account", method = RequestMethod.POST, produces = "application/json")
- public String account(String userId, int money, String xid) {
- // 拿到上游系统的xid后,需要进行绑定xid,不然谁知道你是啥
- RootContext.bind(xid);
- LOGGER.info("Account Service ... xid: " + RootContext.getXID());
-
- int result = jdbcTemplate.update(
- "update account_tbl set money = money - ? where user_id = ?",
- new Object[] { money, userId });
- LOGGER.info("Account Service End ... ");
- if (result == 1) {
- return SUCCESS;
- }
- return FAIL;
- }
- // io.seata.tm.api.TransactionalTemplate#execute
- public Object execute(TransactionalExecutor business) throws Throwable {
- // ...
- try {
- // 开启全局事务,主要就是绑定xid
- beginTransaction(txInfo, tx);
-
- Object rs;
- try {
- // 业务内容执行
- rs = business.execute();
- } catch (Throwable ex) {
- // 如果是指定错误类型(可扩展),就回滚,否则就提交
- completeTransactionAfterThrowing(txInfo, tx, ex);
- throw ex;
- }
-
- // 通知TC提交事务
- commitTransaction(tx);
-
- return rs;
- } finally {
- //5. clear
- resumeGlobalLockConfig(previousConfig);
- triggerAfterCompletion();
- cleanUp();
- }
- } finally {
- // If the transaction is suspended, resume it.
- if (suspendedResourcesHolder != null) {
- tx.resume(suspendedResourcesHolder);
- }
- }
- }
本地Mysql事务开启的时候需要用Seata数据库代理类进行拦截处理。
- public JdbcTemplate jdbcTemplate(DruidDataSource druidDataSource) {
- // 使用Seata数据库代理类
- DataSourceProxy dataSourceProxy = new DataSourceProxy(druidDataSource);
-
- JdbcTemplate jdbcTemplate = new JdbcTemplate(dataSourceProxy);
-
- // ...
-
- return jdbcTemplate;
- }
- // org.springframework.jdbc.core.JdbcTemplate#execute(org.springframework.jdbc.core.PreparedStatementCreator, org.springframework.jdbc.core.PreparedStatementCallback<T>)
- public <T> T execute(PreparedStatementCreator psc, PreparedStatementCallback<T> action) throws DataAccessException {
- // ...
- try {
- // 获取Sql预处理表达式,AbstractConnectionProxy会进行拦截处理
- ps = psc.createPreparedStatement(con);
- this.applyStatementSettings(ps);
- T result = action.doInPreparedStatement(ps);
- this.handleWarnings((Statement)ps);
- var13 = result;
- } catch (SQLException var10) {
- // ...
- } finally {
- // ...
- }
-
- return var13;
- }
这里似乎主要是针对AT模式下,Undo_log的插入语句进行拦截预处理。
- // io.seata.rm.datasource.AbstractConnectionProxy#prepareStatement(java.lang.String)
- public PreparedStatement prepareStatement(String sql) throws SQLException {
- String dbType = getDbType();
- // support oracle 10.2+
- PreparedStatement targetPreparedStatement = null;
- if (BranchType.AT == RootContext.getBranchType()) {
- List<SQLRecognizer> sqlRecognizers = SQLVisitorFactory.get(sql, dbType);
- if (sqlRecognizers != null && sqlRecognizers.size() == 1) {
- SQLRecognizer sqlRecognizer = sqlRecognizers.get(0);
- if (sqlRecognizer != null && sqlRecognizer.getSQLType() == SQLType.INSERT) {
- // 这里主要识别Undo_log的sql语句,但是其本身是插入语句的话没试过
- TableMeta tableMeta = TableMetaCacheFactory.getTableMetaCache(dbType).getTableMeta(getTargetConnection(),
- sqlRecognizer.getTableName(), getDataSourceProxy().getResourceId());
- String[] pkNameArray = new String[tableMeta.getPrimaryKeyOnlyName().size()];
- tableMeta.getPrimaryKeyOnlyName().toArray(pkNameArray);
- targetPreparedStatement = getTargetConnection().prepareStatement(sql,pkNameArray);
- }
- }
- }
- if (targetPreparedStatement == null) {
- targetPreparedStatement = getTargetConnection().prepareStatement(sql);
- }
- return new PreparedStatementProxy(this, targetPreparedStatement, sql);
- }
- // io.seata.rm.datasource.ConnectionProxy#doCommit
- private void doCommit() throws SQLException {
- if (context.inGlobalTransaction()) {
- // 如果上下文中能拿到xid,进来(如果没绑定xid,那么就不会当作分布式事务处理)
- processGlobalTransactionCommit();
- } else if (context.isGlobalLockRequire()) {
- processLocalCommitWithGlobalLocks();
- } else {
- targetConnection.commit();
- }
- }
- // io.seata.rm.datasource.ConnectionProxy#processGlobalTransactionCommit
- private void processGlobalTransactionCommit() throws SQLException {
- try {
- // 向TC注册分支ID(branchId)
- register();
- } catch (TransactionException e) {
- recognizeLockKeyConflictException(e, context.buildLockKeys());
- }
- try {
- UndoLogManagerFactory.getUndoLogManager(this.getDbType()).flushUndoLogs(this);
- targetConnection.commit();
- } catch (Throwable ex) {
- LOGGER.error("process connectionProxy commit error: {}", ex.getMessage(), ex);
- // 事务异常后向TC,也就是Seata服务端汇报失败
- report(false);
- throw new SQLException(ex);
- }
- if (IS_REPORT_SUCCESS_ENABLE) {
- // 事务异常后向TC,也就是Seata服务端汇报成功
- report(true);
- }
- context.reset();
- }
(1)插入undo_log的sql
- // io.seata.rm.datasource.undo.AbstractUndoLogManager#flushUndoLogs
- public void flushUndoLogs(ConnectionProxy cp) throws SQLException {
- // ...
- BranchUndoLog branchUndoLog = new BranchUndoLog();
- branchUndoLog.setXid(xid);
- branchUndoLog.setBranchId(branchId);
- branchUndoLog.setSqlUndoLogs(connectionContext.getUndoItems());
-
- UndoLogParser parser = UndoLogParserFactory.getInstance();
- // 这里将本次执行的sql信息转换成二进制
- byte[] undoLogContent = parser.encode(branchUndoLog);
-
- // ...
- insertUndoLogWithNormal(xid, branchId, buildContext(parser.getName()), undoLogContent,
- cp.getTargetConnection());
- }
(1)RM初始化的时候会启动clientBootstrap
NettyClientBootstrap内部包装了netty的Bootstrap,其中会向Bootstrap绑定一个ClientHandler用来处理seata server返回来的消息,最终通过Bootstrap建立与server端的连接。
- // io.seata.core.rpc.netty.AbstractNettyRemotingClient#init
- public void init() {
- // ...
- clientBootstrap.start();
- }
(2)RM客户端接收TC消息
- // io.seata.core.rpc.netty.AbstractNettyRemotingClient.ClientHandler#channelRead
- public void channelRead(final ChannelHandlerContext ctx, Object msg) throws Exception {
- if (!(msg instanceof RpcMessage)) {
- return;
- }
- processMessage(ctx, (RpcMessage) msg);
- }
(3)处理TC消息
根据TC返回的消息,例如分支提交、回滚、删除Undo_log信息等等。
- //
- protected void processMessage(ChannelHandlerContext ctx, RpcMessage rpcMessage) throws Exception {
- // ...
- Object body = rpcMessage.getBody();
- if (body instanceof MessageTypeAware) {
- MessageTypeAware messageTypeAware = (MessageTypeAware) body;
- final Pair<RemotingProcessor, ExecutorService> pair = this.processorTable.get((int) messageTypeAware.getTypeCode());
- if (pair != null) {
- if (pair.getSecond() != null) {
- try {
- pair.getSecond().execute(() -> {
- try {
- // 根据不同的消息类型执行不同的处理器
- pair.getFirst().process(ctx, rpcMessage);
- } catch (Throwable th) {
- LOGGER.error(FrameworkErrorCode.NetDispatch.getErrCode(), th.getMessage(), th);
- }
- });
- } catch (RejectedExecutionException e) {
- LOGGER.error(FrameworkErrorCode.ThreadPoolFull.getErrCode(),
- "thread pool is full, current max pool size is " + messageExecutor.getActiveCount());
- if (allowDumpStack) {
- String name = ManagementFactory.getRuntimeMXBean().getName();
- String pid = name.split("@")[0];
- int idx = new Random().nextInt(100);
- try {
- Runtime.getRuntime().exec("jstack " + pid + " >d:/" + idx + ".log");
- } catch (IOException exx) {
- LOGGER.error(exx.getMessage());
- }
- allowDumpStack = false;
- }
- }
- } else {
- try {
- pair.getFirst().process(ctx, rpcMessage);
- } catch (Throwable th) {
- LOGGER.error(FrameworkErrorCode.NetDispatch.getErrCode(), th.getMessage(), th);
- }
- }
- } else {
- LOGGER.error("This message type [{}] has no processor.", messageTypeAware.getTypeCode());
- }
- } else {
- LOGGER.error("This rpcMessage body[{}] is not MessageTypeAware type.", body);
- }
- }
4.1 为什么Seata在第一阶段就直接提交了分支事务
Seata能够在第一阶段直接提交事务,是因为Seata框架为每一个RM维护了一张UNDO_LOG表(这张表需要客户端自行创建),其中保存了每一次本地事务的回滚数据。因此,二阶段的回滚并不依赖于本地数据库事务的回滚,而是RM直接读取这张UNDO_LOG表,并将数据库中的数据更新为UNDO_LOG中存储的历史数据。如果第二阶段是提交命令,那么RM事实上并不会对数据进行提交(因为一阶段已经提交了),而实发起一个异步请求删除UNDO_LOG中关于本事务的记录。
4.2 使用fescar遇到can not register RM,err:can not connect to fescar-server
SpringBoot项目集成的是seata-spring-boot-starter,结果用的服务端是Fescar。这两个还是有区别的(Seata是Fescar改名后的产物),如果项目中集成seata-spring-boot-starter,就去官网下载seata服务端。网上说是因为内网IP的问题,涉及到改Fescar服务端源码。Fescar服务端不能指定IP地址,Seata可以指定IP地址(sh seata-server.sh -p 8091 -h 127.0.0.1 -m file)。
4.3 Netty相关操作
4.4 AT、TCC、SAGA 和 XA相关概念
4.5 分库分表情况下,分布式事务是如何操作
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。