当前位置:   article > 正文

分布式事务保证服务间的数据一致性:使用MQ消息通知机制实现“实时性要求不高”的最大努力通知_mq如何保证数据一致性

mq如何保证数据一致性

一、前言

标题比较长,这样理解,
分布式事务保证服务间的数据一致性
事务是保证单体架构的一个服务对应一个mysql数据库,的数据一致性。
分布式事务是保证微服务架构的多个微服务、服务之间的,对应的多个mysql数据库(mysql集群)的数据一致性。
后端从单体架构到微服务架构,mysql从单体架构到分布式mysql集群,所以,数据一致性从事务到分布式事务。

使用MQ消息通知机制实现“实时性要求不高”的最大努力通知
对于最大努力通知,注意三点:
1、它勉强可以看做是一种分布式事务的实现方式(理由:这里使用了“勉强”,因为它本质上是一种消息通知机制,但是消息可以用来通知数据一致性);
2、它是基于MQ消息队列来实现的,理由:自己看下面就知道,它是基于MQ实现的;
3、它是一种实时性不高的机制,理由:基于MQ的异步消息通知都是实时性不高的。

二、定义:什么是最大努力通知(重点001 消息重复+消息校对)

最大努力通知也是一种解决分布式事务的方案,下边是一个是充值的例子:

在这里插入图片描述

对于上图的解释,整个流程:
1、账户系统调用充值系统接口
2、被动:充值系统完成支付处理向账户系统发起充值结果通知,若通知失败,则充值系统按策略进行重复通知
3、主动-消息队列:账户系统接收到充值结果通知修改充值状态。
4、账户系统未接收到通知会主动调用充值系统的接口查询充值结果。

通过上边的例子我们总结最大努力通知方案的目标:
最大努力通知的目标:顾名思义,发起通知方通过一定的机制最大努力将业务处理结果通知到接收方。

最大努力通知,为达到目标的两步实现:
1、消息重复机制。因为接收通知方可能没有接收到通知,此时要有一定的机制对消息重复通知。
2、消息校对机制。如果尽最大努力也没有通知到接收方,或者接收方消费消息后要再次消费,此时可由接收方主动向通知方查询消息信息来满足需求。

两种分布式事务方案,最大努力通知与可靠消息一致性有什么不同?
1、解决方案思想不同:消息可靠性的保证方不同
可靠消息一致性,发起通知方需要保证将消息发出去,并且将消息发到接收通知方,消息的可靠性关键由发起通知方来保证
最大努力通知,发起通知方尽最大的努力将业务处理结果通知为接收通知方,但是可能消息接收不到,此时需要接收通知方主动调用发起通知方的接口查询业务处理结果,通知的可靠性关键在接收通知方
2、两者的业务应用场景不同
可靠消息一致性关注的是交易过程的事务一致,以异步的方式完成交易。
最大努力通知关注的是交易后的通知事务,即将交易结果可靠的通知出去。
3、技术解决方向不同
可靠消息一致性要解决消息从发出到接收的一致性,即消息发出并且被接收到。
最大努力通知无法保证消息从发出到接收的一致性,只提供消息接收的可靠性机制。可靠机制是,最大努力的将消息通知给接收方,当消息无法被接收方接收时,由接收方主动查询消息(业务处理结果)。

三、理论:最大努力通知的两种解决方案(重点002)

通过对最大努力通知的理解,采用MQ的ack机制就可以实现最大努力通知。

3.1 方案一 producer-MQ-consumer(producer与内部应用consumer之间的通知)

在这里插入图片描述

本方案是利用MQ的ack机制由MQ向接收通知方发送通知,流程如下:
1、发起通知方将通知发给MQ。使用普通消息机制将通知发给MQ。
注意:如果消息没有发出去可由接收通知方主动请求发起通知方查询业务执行结果。(后边会讲)
2、接收通知方监听 MQ。
3、接收通知方接收消息,业务处理完成回应ack。
4、消息重复机制(consumer-mq)的应用:接收通知方若没有回应ack则MQ会重复通知。
MQ会按照间隔1min、5min、10min、30min、1h、2h、5h、10h的方式,逐步拉大通知间隔 (如果MQ采用rocketMq,在broker中可进行配置),直到达到通知要求的时间窗口上限。
5、消息校对机制(consumer-producer)的应用:接收通知方可通过消息校对接口来校对消息的一致性。

注意1:消息重复机制 mq-consumer:通知发起方将消息生产到MQ中,通知接收方来取出消息,如果没有来取,MQ按时间间隔通知。
注意2:消息重复机制 mq-consumer:去做通知的是MQ,而不是通知发起方。

3.2 方案二 producer-MQ-application-consumer(producer与外部应用application之间的通知)

本方案也是利用MQ的ack机制,与方案1不同的是应用程序向接收通知方发送通知,如下图:

在这里插入图片描述

对于上图的解释,交互流程如下:
1、发起通知方将通知发给MQ。
使用可靠消息一致方案中的事务消息保证本地事务与消息的原子性,最终将通知先发给MQ。
2、通知程序监听 MQ,接收MQ的消息。
区别:方案1中接收通知方直接监听MQ,方案2中由通知程序监听MQ。
消息重复机制(mq-application):通知程序若没有回应ack则MQ会重复通知。
3、通知程序收到通知后,通过互联网接口协议(如http、webservice) 调用 接收通知方案接口,完成通知。
通知程序 调用 接收通知方案接口成功就表示通知成功,即消费MQ消息成功,MQ将不再向通知程序投递通知消息。
4、消息校对机制(consumer-producer):接收通知方可通过消息校对接口来校对消息的一致性。

3.3 方案一和方案二区别(消息重复机制不同,消息校对机制是相同的,应用不同)

方案1和方案2的不同点:消息重复机制不同,消息校对机制是相同的
1、消息重复机制:方案1中接收通知方与MQ接口(mq-consumer),即接收通知方案监听 MQ,此方案主要应用producer与内部应用consumer之间的通知
2、消息重复机制:方案2中由通知程序与MQ接口(mq-application),通知程序监听MQ,收到MQ的消息后由通知程序通过互联网接口协议调用接收通知方。此方案主要应用producer与外部应用application之间的通知,例如支付宝、微信的支付结果通知。

四、实践:RocketMQ实现最大努力通知型事务

4.1 业务说明(重点003)

本实例通过RocketMq中间件实现最大努力通知型分布式事务,模拟充值过程。

本案例有账户系统和充值系统两个微服务,其中,
账户系统的数据库是bank1数据库,记录的是张三账户信息;
充值系统的数据库是bank1_pay数据库,记录的是账户的充值记录。

业务流程如下图:

在这里插入图片描述

对于上图解释,交互流程如下:充值系统是消息发送方,账户系统是消息接收方,修改账户信息
1、用户请求充值系统进行充值。
2、充值系统完成充值将充值结果发给MQ。
3、消息重复机制的实现:账户系统监听MQ,接收充值结果通知账户系统修改账户余额,如果接收不到ack,MQ会重复发送通知。
4、消息校对机制的实现:账户系统也可以主动查询充值系统的充值结果查询接口,增加金额。

4.2 程序组成部分

本示例程序组成部分如下:

数据库:MySQL-5.7.25

包括bank1和bank1_pay两个数据库。

JDK:64位 jdk1.8.0_201

rocketmq 服务端:RocketMQ-4.5.0

rocketmq 客户端:RocketMQ-Spring-Boot-starter.2.0.2-RELEASE

微服务框架:spring-boot-2.1.3、spring-cloud-Greenwich.RELEASE

微服务及数据库的关系 :

dtx/dtx-notifymsg-demo/dtx-notifymsg-demo-bank1 银行1,操作张三账户, 连接数据库bank1

dtx/dtx-notifymsg-demo/dtx-notifymsg-demo-pay 银行2,操作充值记录,连接数据库bank1_pay

在这里插入图片描述

交互流程如下:
1、用户请求充值系统进行充值。
2、充值系统完成充值将充值结果发给MQ。
3、账户系统监听MQ,接收充值结果通知,如果接收不到消息,MQ会重复发送通知。接收到充值结果通知账户系统增加充值金额。
4、账户系统也可以主动查询充值系统的充值结果查询接口,增加金额。

4.3 创建数据库

导入数据库脚本:资料\sql\bank1.sql、资料\sql\bank1_pay.sql,已经导过不用重复导入。创建bank1库,并导入以下表结构和数据(包含张三账户)

CREATE DATABASE `bank1` CHARACTER SET 'utf8' COLLATE 'utf8_general_ci';
DROP TABLE IF EXISTS `account_info`;
CREATE TABLE `account_info`    (
    `id` bigint(20) NOT NULL AUTO_INCREMENT,
    `account_name` varchar(100) CHARACTER SET utf8 COLLATE utf8_bin NULL DEFAULT NULL COMMENT '户主姓名',
    `account_no` varchar(100) CHARACTER SET utf8 COLLATE utf8_bin NULL DEFAULT NULL COMMENT '银行卡号',
    `account_password` varchar(100) CHARACTER SET utf8 COLLATE utf8_bin NULL DEFAULT NULL COMMENT '帐户密码',
    `account_balance` double NULL DEFAULT NULL COMMENT '帐户余额', PRIMARY KEY (`id`) USING BTREE
)    ENGINE = InnoDB AUTO_INCREMENT = 5 CHARACTER SET = utf8 COLLATE = utf8_bin ROW_FORMAT = Dynamic;

INSERT INTO `account_info` VALUES (2, '张三的账户', '1', '', 10000);

DROP TABLE IF EXISTS `de_duplication`;
CREATE TABLE `de_duplication`    (
    `tx_no`    varchar(64) COLLATE utf8_bin NOT NULL,
    `create_time` datetime(0) NULL DEFAULT NULL,
PRIMARY KEY (`tx_no`) USING BTREE
) ENGINE = InnoDB CHARACTER SET = utf8 COLLATE = utf8_bin ROW_FORMAT = Dynamic;
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18

创建bank1_pay库,并导入以下表结构:

CREATE DATABASE `bank1_pay` CHARACTER SET 'utf8' COLLATE 'utf8_general_ci'; CREATE TABLE `account_pay` (
    `id` varchar(64) COLLATE utf8_bin NOT NULL,
    `account_no` varchar(100) CHARACTER SET utf8 COLLATE utf8_bin NULL DEFAULT NULL COMMENT '账号', 
    `pay_amount` double NULL DEFAULT NULL COMMENT '充值余额',
    `result` varchar(20) COLLATE utf8_bin DEFAULT NULL COMMENT '充值结果:success,fail', 
    PRIMARY KEY (`id`) USING BTREE
)ENGINE = InnoDB AUTO_INCREMENT = 5 CHARACTER SET = utf8 COLLATE = utf8_bin ROW_FORMAT = Dynamic;
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7

4.4 启动RocketMQ

rocketmq启动方式与RocketMQ实现可靠消息最终一致性事务中完全一致

4.5 EurekaServer:discover-server

discover-server是服务注册中心,测试工程将自己注册至discover-server。

导入:资料\基础代码\dtx 父工程,此工程自带了discover-server,discover-server基于Eureka实现。

已经导过不用重复导入。

4.6 导入dtx-notifymsg-demo

dtx-notifymsg-demo是本方案的测试工程,根据业务需求需要创建两个dtx-notifymsg-demo工程。

4.6.1 导入dtx-notifymsg-demo

导入:资料\基础代码\dtx-notifymsg-demo到父工程dtx下。

两个测试工程如下:

dtx/dtx-notifymsg-demo/dtx-notifymsg-demo-bank1 ,操作张三账户,连接数据库bank1

dtx/dtx-notifymsg-demo/dtx-notifymsg-demo-pay,操作李四账户,连接数据库bank1_pay

4.6.2 父工程maven依赖说明

在dtx父工程中指定了SpringBoot和SpringCloud版本

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring‐boot‐dependencies</artifactId>
    <version>2.1.3.RELEASE</version>
    <type>pom</type>
    <scope>import</scope>
</dependency>

<dependency>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring‐cloud‐dependencies</artifactId>
    <version>Greenwich.RELEASE</version>
    <type>pom</type>
    <scope>import</scope>
</dependency>
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15

在dtx-notifymsg-demo父工程中指定了rocketmq-spring-boot-starter的版本。

<dependency>
    <groupId>org.apache.rocketmq</groupId>
    <artifactId>rocketmq‐spring‐boot‐starter</artifactId>
    <version>2.0.2</version>
</dependency>
  • 1
  • 2
  • 3
  • 4
  • 5

4.6.3 配置rocketMQ

在application-local.propertis中配置rocketMQ nameServer地址及生产组:

rocketmq.producer.group = producer_bank2
rocketmq.name‐server = 127.0.0.1:9876

其它详细配置见导入的基础工程。

4.7 dtx-notifydemo-pay

4.7.1 dtx-notifydemo-pay要实现的功能

dtx-notifydemo-pay实现如下功能:

1、充值接口

2、充值完成要通知

3、充值结果查询接口

4.7.2 Dao

@Mapper
@Component
public interface AccountPayDao {

    @Insert("insert into account_pay(id,account_no,pay_amount,result) values(#{id},# {accountNo},#{payAmount},#{result})")
    int insertAccountPay(@Param("id") String id,@Param("accountNo") String accountNo, @Param("payAmount") Double pay_amount,@Param("result") String result);
    
    @Select("select id,account_no accountNo,pay_amount payAmount,result from account_pay where id=#{txNo}")
    AccountPay findByIdTxNo(@Param("txNo") String txNo);
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10

4.7.3 Service

@Service
@Slf4j
public class AccountPayServiceImpl implements AccountPayService{

    @Autowired
    RocketMQTemplate rocketMQTemplate;

    @Autowired
    AccountPayDao accountPayDao;

    @Transactional
    @Override
    public AccountPay insertAccountPay(AccountPay accountPay) {
        int result = accountPayDao.insertAccountPay(accountPay.getId(), accountPay.getAccountNo(), accountPay.getPayAmount(), "success");
        if(result>0){
            //发送通知
            rocketMQTemplate.convertAndSend("topic_notifymsg",accountPay); 
            return accountPay;
        }
        return null;
    }

    @Override
    public AccountPay getAccountPay(String txNo) {
        AccountPay accountPay = accountPayDao.findByIdTxNo(txNo);
        return accountPay;
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28

4.7.4 Controller

@RestController
public class AccountPayController {

    @Autowired
    AccountPayService accountPayService;

    //充值
    @GetMapping(value = "/paydo")
    public AccountPay pay(AccountPay accountPay){
        //事务号
        String txNo = UUID.randomUUID().toString();
        accountPay.setId(txNo);
        return accountPayService.insertAccountPay(accountPay);
    }

    //查询充值结果
    @GetMapping(value = "/payresult/{txNo}")
    public AccountPay payresult(@PathVariable("txNo") String txNo){ 
        return accountPayService.getAccountPay(txNo);
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21

4.8 dtx-notifydemo-bank1

dtx-notifydemo-bank1实现如下功能:
1、监听MQ,接收充值结果,根据充值结果完成账户金额修改。
2、主动查询充值系统,根据充值结果完成账户金额修改。

4.8.1 Dao

@Mapper
@Component
public interface AccountInfoDao {

    //修改账户金额
    @Update("update account_info set account_balance=account_balance+#{amount} where account_no=#{accountNo}")
    int updateAccountBalance(@Param("accountNo") String accountNo, @Param("amount") Double amount);

    //查询幂等记录,用于幂等控制
    @Select("select count(1) from de_duplication where tx_no = #{txNo}")
    int isExistTx(String txNo);
     
    //添加事务记录,用于幂等控制
    @Insert("insert into de_duplication values(#{txNo},now());")
    int addTx(String txNo);
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16

4.8.2 AccountInfoService

@Service
@Slf4j
public class AccountInfoServiceImpl implements AccountInfoService {

    @Autowired
    AccountInfoDao accountInfoDao;

    @Autowired
    PayClient payClient;

    /**
    *    更新帐号余额,并发送消息
    *    
    *    @param accountChange */
    @Transactional
    @Override
    public void updateAccountBalance(AccountChangeEvent accountChange) { 
        //幂等校验
        int existTx = accountInfoDao.isExistTx(accountChange.getTxNo()); 
        if(existTx >0){
            log.info("已处理消息:{}", JSONObject.toJSONString(accountChange)); 
            return ;
        }
        //添加事务记录
        accountInfoDao.addTx(accountChange.getTxNo());
        //更新账户金额
        accountInfoDao.updateAccountBalance(accountChange.getAccountNo(),accountChange.getAmount());
    }

    /**
    *    主动查询充值结果
    *    
    *    @param tx_no */
    @Override
    public AccountPay queryPayResult(String tx_no) {
        //主动请求充值系统查询充值结果
        AccountPay accountPay = payClient.queryPayResult(tx_no); 
        //充值结果
        String result = accountPay.getResult();
        log.info("主动查询充值结果:{}", JSON.toJSONString(accountPay)); 
        if("success".equals(result)){
            AccountChangeEvent accountChangeEvent = new AccountChangeEvent();
            accountChangeEvent.setAccountNo(accountPay.getAccountNo()); 
            accountChangeEvent.setAmount(accountPay.getPayAmount()); 
            accountChangeEvent.setTxNo(accountPay.getId()); 
            updateAccountBalance(accountChangeEvent);
        }
        return accountPay;
        }
    }
}


@FeignClient(value = "dtx‐notifymsg‐demo‐pay", fallback = PayFallback.class)
public interface PayClient {
    @GetMapping("/pay/payresult/{txNo}")
    AccountPay queryPayResult(@PathVariable("txNo") String txNo);
}

@Component
public class PayFallback implements PayClient {

    @Override
    public AccountPay queryPayResult(String txNo) {
        AccountPay accountPay = new AccountPay();
        accountPay.setResult("fail");
        return accountPay;
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60
  • 61
  • 62
  • 63
  • 64
  • 65
  • 66
  • 67
  • 68
  • 69

4.8.3 监听MQ

@Component
@Slf4j
@RocketMQMessageListener(topic="topic_notifymsg",consumerGroup="consumer_group_notifymsg_bank1") 
public class NotifyMsgListener implements RocketMQListener<AccountPay> {

    @Autowired
    AccountInfoService accountInfoService;

    @Override
    public void onMessage(AccountPay accountPay) {
        log.info("接收到消息:{}", JSON.toJSONString(accountPay));
        AccountChangeEvent accountChangeEvent = new AccountChangeEvent();
        accountChangeEvent.setAmount(accountPay.getPayAmount());
        accountChangeEvent.setAccountNo(accountPay.getAccountNo());
        accountChangeEvent.setTxNo(accountPay.getId());
        accountInfoService.updateAccountBalance(accountChangeEvent);
        log.info("处理消息完成:{}", JSON.toJSONString(accountChangeEvent));
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19

4.8.4 Controller

@RestController
@Slf4j
public class AccountInfoController {

    @Autowired
    private AccountInfoService accountInfoService;

    //主动查询充值结果
    @GetMapping(value = "/payresult/{txNo}")
    public AccountPay result(@PathVariable("txNo") String txNo){ 
        AccountPay accountPay = accountInfoService.queryPayResult(txNo); 
        return accountPay;
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14

4.9 运行结果:两个

最终是通过AccountInfoServiceImpl类的updateAccountBalance()方法,实现查询的,注意,这个方面上面加上@Transactional注解

 /**
    *    更新帐号余额,并发送消息
    *    
    *    @param accountChange */
    @Transactional
    @Override
    public void updateAccountBalance(AccountChangeEvent accountChange) { 
        //幂等校验
        int existTx = accountInfoDao.isExistTx(accountChange.getTxNo()); 
        if(existTx >0){
            log.info("已处理消息:{}", JSONObject.toJSONString(accountChange)); 
            return ;
        }
        //添加事务记录
        accountInfoDao.addTx(accountChange.getTxNo());
        //更新账户金额
        accountInfoDao.updateAccountBalance(accountChange.getAccountNo(),accountChange.getAmount());
    }
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18

调用这个方法的唯二的两个地方:AccountInfoServiceImpl 类 queryPayResult() 和 NotifyMsgListener 类的onMessage()方法,前者表示,消息接收方主动查询,后者表示,消息接收方监听到RocketMQ的变化而调用
所以,最大努力通知方案是分布式事务中对一致性要求最低的一种,适用于一些最终一致性时间敏感度低的业务;最大努力通知方案需要实现如下功能:
1、消息重复通知机制。充值系统充值成功,发送消息,账户系统接收消息,修改账户金额。也要注意,账户系统修改账户金额幂等测试。
2、消息校对机制。充值系统充值成功,账户系统主动查询充值结果,修改账户金额。注意,账户系统修改账户金额幂等测试。

注意1,账户系统修改账户金额幂等测试。
public void updateAccountBalance(AccountChangeEvent accountChange) {
//幂等校验
int existTx = accountInfoDao.isExistTx(accountChange.getTxNo());
if(existTx >0){
log.info(“已处理消息:{}”, JSONObject.toJSONString(accountChange));
return ;
}
//添加事务记录
accountInfoDao.addTx(accountChange.getTxNo());
//更新账户金额
accountInfoDao.updateAccountBalance(accountChange.getAccountNo(),accountChange.getAmount());
}
如何实现幂等性?
第一,数据库层面,在创建账户表的时候,额外创建一张重复表,重复表记录账户流水号和流水号时间,账户表和重复表都是通过账户流水号来确定唯一性的;
第二,后端代码service层面,插入和修改账户记录,先查重复表里是否有这个账户流水号,如果有了,说明请求来了,不再插入,如果重复表中没有,重复表插入流水记录,账户表完成操作(更新或插入)
注意1:更新或插入或删除要这么做,因为会改变数据,防止更新、删除、插入多次,特别是防止插入多次,但是查询不需要重复表,因为不改变数据。
注意2:一定要先完成,重复表插入流水记录,然后账户表相关操作(插入、更新或删除),否则,重复表判断isExistTx()没有任何意义,其实,在重复表插入流水记录,然后账户表相关操作(插入、更新或删除)中间插一个Thread.sleep(1000),可以让问题在高并发下暴露的更加明显。

注意2,如何实现消息通信
(1)在application.properties中配置好RocketMQ的地址,
(2)生产者,直接将消息放到topic_notifymsg中,
消费者,直接从topic_notifymsg中取出消息。

五、面试金手指

5.1 分布式事务:最大努力通知

标准答案: 最大努力通知方案是分布式事务中对一致性要求最低的一种,适用于一些最终一致性时间敏感度低的业务;最大努力通知方案需要实现如下功能:
1、消息重复通知机制。
2、消息校对机制。

最大努力通知方案主要也是借助MQ消息系统来进行事务控制,这一点与可靠消息最终一致方案一样。看来MQ中间件确实在一个分布式系统架构中,扮演者重要的角色。最大努力通知方案是比较简单的分布式事务方案,它本质上就是通过定期校对,实现数据一致性。
一.最大努力通知方案的实现
1.业务活动的主动方,在完成业务处理之后,向业务活动的被动方发送消息,允许消息丢失。
2.主动方可以设置时间阶梯型通知规则,在通知失败后按规则重复通知,直到通知N次后不再通知。
3.主动方提供校对查询接口给被动方按需校对查询,用于恢复丢失的业务消息。
4.业务活动的被动方如果正常接收了数据,就正常返回响应,并结束事务。
5.如果被动方没有正常接收,根据定时策略,向业务活动主动方查询,恢复丢失的业务消息。
二.最大努力通知方案的特点
1.用到的服务模式:可查询操作、幂等操作。
2.被动方的处理结果不影响主动方的处理结果;
3.适用于对业务最终一致性的时间敏感度低的系统;
4.适合跨企业的系统间的操作,或者企业内部比较独立的系统间的操作,比如银行通知、商户通知等;
三.最大努力通知方案的设计
相比于可靠消息最终一致方案,最大努力通知方案设计上比较简单,主要是由两部分构成。
1.实时消息服务(MQ):接收主动方发送的MQ消息。
2.通知服务子系统:监听MQ消息,当收到消息后,向被动方发送通知(一般是URL方式),同时生成通知记录。如果没有接收到被动方的返回消息,就根据通知记录进行重复通知。
仅适用于实时性不太高的场合:最大努力通知方案实现方式比较简单,本质上就是通过定期校对,适用于数据一致性时间要求不太高的场合,其实不把它看作是分布式事务方案,只认为是一种跨平台的数据处理方案也是可以的

5.2 最大努力通知两种方式,最重点

调用这个方法的唯二的两个地方:AccountInfoServiceImpl 类 queryPayResult() 和 NotifyMsgListener 类的onMessage()方法,前者表示,消息接收方主动查询,后者表示,消息接收方监听到RocketMQ的变化而调用
所以,最大努力通知方案是分布式事务中对一致性要求最低的一种,适用于一些最终一致性时间敏感度低的业务;最大努力通知方案需要实现如下功能:
1、消息重复通知机制。充值系统充值成功,发送消息,账户系统接收消息,修改账户金额。也要注意,账户系统修改账户金额幂等测试。
2、消息校对机制。充值系统充值成功,账户系统主动查询充值结果,修改账户金额。注意,账户系统修改账户金额幂等测试。

5.2 设计一个接口幂等性,加分项

注意1,账户系统修改账户金额幂等测试。
public void updateAccountBalance(AccountChangeEvent accountChange) {
//幂等校验
int existTx = accountInfoDao.isExistTx(accountChange.getTxNo());
if(existTx >0){
log.info(“已处理消息:{}”, JSONObject.toJSONString(accountChange));
return ;
}
//添加事务记录
accountInfoDao.addTx(accountChange.getTxNo());
//更新账户金额
accountInfoDao.updateAccountBalance(accountChange.getAccountNo(),accountChange.getAmount());
}
如何实现幂等性?
第一,数据库层面,在创建账户表的时候,额外创建一张重复表,重复表记录账户流水号和流水号时间,账户表和重复表都是通过账户流水号来确定唯一性的;
第二,后端代码service层面,插入和修改账户记录,先查重复表里是否有这个账户流水号,如果有了,说明请求来了,不再插入,如果重复表中没有,重复表插入流水记录,账户表完成操作(更新或插入)
注意1:更新或插入或删除要这么做,因为会改变数据,防止更新、删除、插入多次,特别是防止插入多次,但是查询不需要重复表,因为不改变数据。
注意2:一定要先完成,重复表插入流水记录,然后账户表相关操作(插入、更新或删除),否则,重复表判断isExistTx()没有任何意义,其实,在重复表插入流水记录,然后账户表相关操作(插入、更新或删除)中间插一个Thread.sleep(1000),可以让问题在高并发下暴露的更加明显。

5.4 如何实现消息通信

注意2,如何实现消息通信
(1)在application.properties中配置好RocketMQ的地址,
(2)生产者,直接将消息放到topic_notifymsg中,
消费者,直接从topic_notifymsg中取出消息。

5.5 如何实现服务间调用,Fegin,调用远程服务就像调用本地服务,同时加上错误处理

@FeignClient(value = "dtx‐notifymsg‐demo‐pay", fallback = PayFallback.class)
public interface PayClient {
    @GetMapping("/pay/payresult/{txNo}")
    AccountPay queryPayResult(@PathVariable("txNo") String txNo);
}

@Component
public class PayFallback implements PayClient {

    @Override
    public AccountPay queryPayResult(String txNo) {
        AccountPay accountPay = new AccountPay();
        accountPay.setResult("fail");
        return accountPay;
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16

六、小结

分布式事务保证服务间的数据一致性:使用MQ消息通知机制实现“实时性要求不高”的最大努力通知,完成了。

天天打码,天天进步!!!

本文内容由网友自发贡献,转载请注明出处:【wpsshop博客】
推荐阅读
相关标签
  

闽ICP备14008679号