当前位置:   article > 正文

高并发下金融账号控制案例_账户操作高并发

账户操作高并发

目录
什么是高并发,高并发下为什么会有数据一致性的问题
基于资金账号体系数据库设计及开发相关的代码,测试发现问题
解决方案有哪些,不同数据一致性解决方案的特点比较,不同应用场景使用不同解决方案
总结不同的解决方案对应不同的业务场景
详解
什么是高并发,高并发下会出现什么问题,如何确保数据一致性
例子:往缸里面倒水,一个人一瓢,如果成千上百的就是并发。这个时候有一个人计数,如果所有人同时向缸里面倒水,计数的人就无法控制准确记录,这就会导致缸里面的具体有多少瓢水无法准确确定。

资金账号体系的介绍,及数据库设计
涉及到支付就会有一套完整的资金账号体系,保证用户在产品中的一个资金变动的记录。
数据脚本:

CREATE TABLE `account` (
  `id` bigint(20) NOT NULL AUTO_INCREMENT COMMENT '主键id',
  `account_type` int(2) DEFAULT NULL COMMENT '账户类型 1余额账户  2冻结账户  3消费账户 4返佣账户 5信用账户',
  `user_id` bigint(20) DEFAULT NULL COMMENT '用户id',
  `begin_money` decimal(12,2) DEFAULT NULL COMMENT '期初余额',
  `cur_money` decimal(12,2) DEFAULT NULL COMMENT '当期发生额',
  `final_money` decimal(12,2) DEFAULT NULL COMMENT '期末余额',
  `accoun_status` int(2) DEFAULT NULL COMMENT '1正常 2冻结状态',
  `create_time` datetime DEFAULT NULL,
  `update_time` datetime DEFAULT NULL,
  `flag` int(1) DEFAULT NULL COMMENT '逻辑删除字段',
  PRIMARY KEY (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=124 DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_bin;

CREATE TABLE `account_flew` (
  `id` bigint(20) NOT NULL,
  `flew_type` int(1) DEFAULT NULL COMMENT '1充值 2消费 3分账',
  `business_type` int(2) DEFAULT NULL COMMENT '二级业务类型 1下单 2微信充值 3支付宝充值 ',
  `business_type_msg` varchar(255) COLLATE utf8mb4_bin DEFAULT NULL COMMENT '二级业务类型的注解',
  `pre_order_id` varchar(50) COLLATE utf8mb4_bin DEFAULT NULL COMMENT '业务关联id',
  `account_id` bigint(20) DEFAULT NULL COMMENT '账户表的主键id',
  `account_type` int(2) DEFAULT NULL COMMENT '账户类型表',
  `user_id` bigint(20) DEFAULT NULL COMMENT '用户id',
  `begin_money` decimal(12,2) DEFAULT NULL COMMENT '期初余额',
  `cur_money` decimal(12,2) DEFAULT NULL COMMENT '当期发生额',
  `final_money` decimal(12,2) DEFAULT NULL COMMENT '期末余额',
  `pay_channel` int(2) DEFAULT NULL COMMENT '支付渠道',
  `create_time` datetime DEFAULT NULL,
  `update_time` datetime DEFAULT NULL,
  `flag` int(2) DEFAULT NULL,
  PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_bin;
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32

操作账号相关代码编写
entity对象

@Data
public class Account {
    private Long id;
    /** 账户类型 1余额账户  2冻结账户  3消费账户 4返佣账户 5信用账户 */
    private Integer accountType;
        /** 用户id */
    private Long userId;
        /** 期初余额 */
    private Double beginMoney;
        /** 当期发生额 */
    private Double curMoney;
        /** 期末余额 */
    private Double finalMoney;
        /** 1正常 2冻结状态 */
    private Integer accounStatus;
        /**  */
    private Date createTime;
        /**  */
    private Date updateTime;
        /** 逻辑删除字段 */
    private Integer flag;
}

@Data
public class AccountFlew  {
    private Long id;

    /** 1充值 2消费 3分账 */
    private Integer flewType;
        /** 二级业务类型 1下单 2微信充值 3支付宝充值  */
    private Integer businessType;
        /** 二级业务类型的注解 */
    private String businessTypeMsg;
        /** 业务关联id */
    private String preOrderId;
        /** 账户表的主键id */
    private Long accountId;
        /** 账户类型表 */
    private Integer accountType;
        /** 用户id */
    private Long userId;
        /** 期初余额 */
    private Double beginMoney;
        /** 当期发生额 */
    private Double curMoney;
        /** 期末余额 */
    private Double finalMoney;
        /** 支付渠道 */
    private Integer payChannel;
        /**  */
    private Date createTime;
        /**  */
    private Date updateTime;
        /**  */
    private Integer flag;
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56

pom文件相关的依赖
application.properties配置
基于java自带的锁机制解决数据一致性问题
Lock锁

private final static Lock lock = new ReentrantLock();
lock.lock();
lock.unlock();

synchronized
private Integer lock = new Integer(0);
public void locktest(){
    synchronized (lock){
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10

基于java自带的锁,控制并发的问题
多节点下无法控制
-Dfile.encoding=UTF-8 -Dserver.port=201

基于mysql自带的锁进行控制
mysql排它锁的介绍
Inodb行级锁,通过锁定行数据来保证数据一致性。登录linux的mysql,设置事务部自动提交set autocommit = 0,开启事务 begin,执行for update的sql,commit提交。select
id
from account
where id = #{id,jdbcType=BIGINT} for update

xml配置

<?xml version="1.0" encoding="UTF-8" ?>
<!DOCTYPE mapper PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN" "http://mybatis.org/dtd/mybatis-3-mapper.dtd" >
<mapper namespace="com.course.account.infrastructure.mapper.AccountMapper">

    <!-- 所有字段 -->
    <sql id="Base_Column_List">
   id
   ,account_type
   ,user_id
   ,begin_money
   ,cur_money
   ,final_money
   ,account_status
   ,create_time
   ,update_time
   ,del_status
   </sql>

    <!-- 字段映射 -->
    <resultMap id="BaseResultMap" type="com.course.account.infrastructure.entity.Account">
        <id column="id" property="id" jdbcType="BIGINT"/>
        <result column="account_type" jdbcType="VARCHAR" property="accountType"/>
        <result column="user_id" jdbcType="VARCHAR" property="userId"/>
        <result column="begin_money" jdbcType="DECIMAL" property="beginMoney"/>
        <result column="cur_money" jdbcType="DECIMAL" property="curMoney"/>
        <result column="final_money" jdbcType="DECIMAL" property="finalMoney"/>
        <result column="account_status" jdbcType="INTEGER" property="accountStatus"/>
        <result column="create_time" jdbcType="TIMESTAMP" property="createTime"/>
        <result column="update_time" jdbcType="TIMESTAMP" property="updateTime"/>
        <result column="del_status" jdbcType="INTEGER" property="delStatus"/>
    </resultMap>

    <!-- 主键查询 -->
    <select id="lockAccountById" resultMap="BaseResultMap" parameterType="java.lang.Long">
        select
        <include refid="Base_Column_List"/>
        from account
        where id = #{id,jdbcType=BIGINT} for update
    </select>
</mapper>
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40

死锁
client1:
select * from account where id = 1 for update;
select * from account where id = 2 for update;
client2:
select * from account where id = 2 for update;
select * from account where id = 1 for update;

mysql返回异常:
ERROR 1213 (40001): Deadlock found when trying to get lock; try restarting transaction

基于redis做并发控制
redis的分布式锁原理
reids是基于单线程的队列实现的,可以保证并发的安全;
基于redis命令:setnx命令来实现
java代码
@Autowired
private StringRedisTemplate stringRedisTemplate;

stringRedisTemplate.opsForValue().setIfAbsent();
stringRedisTemplate.expire(key,5, TimeUnit.SECONDS);
stringRedisTemplate.delete(key);

String key = “account_lock_”;
Boolean b = stringRedisTemplate.opsForValue().setIfAbsent(key,“1”);
if(!b){
log.warn(“当前处于锁状态不能操作”);
return;
}

redis分布式锁存在的问题
问题:1,长生key问题;2,原子性问题;3,key超时问题;4,主从切换key失效问题
解决:
1:通过给key设置合理失效时间;
2:通过使用lua脚本;
3:(1)通过业务层中校验key是否有效决定是否提交,此方法可以大大降低key超时带来的问题,不能全部解决;(2)redission中的狗链会定期检查key是否持有,持有则延长key失效时间;
4:redisson中的redlock可以解决主从key失效问题
基于redission解决redis的分布式锁的原子性问题
加锁
waitTime和leaseTime分别是等待时间和过期时间
redissonService.getRLock(“1”).tryLock(waitTime,leaseTime,TimeUnit.SECONDS);

源码

/**
* 尝试获取锁
*
* @return
*/
public boolean tryLock() {
return (Boolean)this.get(this.tryLockAsync());
}

/**
 * 一看就是为了获取异步执行的结果,所以重点应该看tryLockAsync()
 *
 * @return
 */
protected final <V> V get(RFuture<V> future) {
    return this.commandExecutor.get(future);
}

/**
 * 异步获取锁
 *
 * @return
 */
public RFuture<Boolean> tryLockAsync() {
    return this.tryLockAsync(Thread.currentThread().getId());
}

/**
 * 尝试获取锁
 *
 * @param threadId
 * @return
 */
public RFuture<Boolean> tryLockAsync(long threadId) {
    return this.tryAcquireOnceAsync(-1L, (TimeUnit) null, threadId);
}


/**
 * 尝试获取锁
 *
 * @param leaseTime
 * @param unit
 * @param threadId
 * @return
 */
private RFuture<Boolean> tryAcquireOnceAsync(long leaseTime, TimeUnit unit, long threadId) {
    /**
     * 如果自定义过期时间
     */
    if (leaseTime != -1L) {
        return this.tryLockInnerAsync(leaseTime, unit, threadId, RedisCommands.EVAL_NULL_BOOLEAN);
    }
    /**
     * 如果是默认的过期时间
     */
    else {
        RFuture<Boolean> ttlRemainingFuture = this.tryLockInnerAsync(this.commandExecutor.getConnectionManager().getCfg().getLockWatchdogTimeout(), TimeUnit.MILLISECONDS, threadId, RedisCommands.EVAL_NULL_BOOLEAN);
        ttlRemainingFuture.onComplete((ttlRemaining, e) -> {
            //没有异常
            if (e == null) {
                //成功获取锁
                if (ttlRemaining) {
                    // 更新过期时间
                    this.scheduleExpirationRenewal(threadId);
                }

            }
        });
        return ttlRemainingFuture;
    }
}

/**
 * 真正的获取锁的代码
 * @param leaseTime
 * @param unit
 * @param threadId
 * @param command
 * @param <T>
 * @return
 */
<T> RFuture<T> tryLockInnerAsync(long leaseTime, TimeUnit unit, long threadId, RedisStrictCommand<T> command) {
    //这个字段后面用作续过期时间
    this.internalLockLeaseTime = unit.toMillis(leaseTime);
    /**
     * 利用lua脚本执行相关逻辑
     */
    return this.commandExecutor.evalWriteAsync(this.getName(),
            LongCodec.INSTANCE,
            command,
            "if (redis.call('exists', KEYS[1]) == 0) then redis.call('hset', KEYS[1], ARGV[2], 1); 
			redis.call('pexpire', KEYS[1], ARGV[1]); return nil; end; 
			if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then redis.call('hincrby', KEYS[1], ARGV[2], 1); 
			redis.call('pexpire', KEYS[1], ARGV[1]); 
			return nil; end; 
			return redis.call('pttl', KEYS[1]);",
            Collections.singletonList(this.getName()),
            new Object[]{this.internalLockLeaseTime, this.getLockName(threadId)});
}

//如果key不存在
if (redis.call('exists', KEYS[1]) == 0)
    //设置这个key,并且设置超时时间,获取锁成功
   then redis.call('hset', KEYS[1], ARGV[2], 1);
     redis.call('pexpire', KEYS[1], ARGV[1]);
     return nil; end;
//如果key存在
if (redis.call('hexists', KEYS[1], ARGV[2]) == 1)
    //就对key自增,并且重置过期时间(重入锁)
   then redis.call('hincrby', KEYS[1], ARGV[2], 1);
     redis.call('pexpire', KEYS[1], ARGV[1]);
     return nil; end;
//获取key剩下的时间
return redis.call('pttl', KEYS[1]);


/**
 * 重置过期时间
 * @param threadId
 */
private void scheduleExpirationRenewal(long threadId) {
    RedissonLock.ExpirationEntry entry = new RedissonLock.ExpirationEntry();
    //检查是否存在指定的定时任务
    RedissonLock.ExpirationEntry oldEntry = (RedissonLock.ExpirationEntry)EXPIRATION_RENEWAL_MAP.putIfAbsent(this.getEntryName(), entry);
    //如果已经存在指定的定时任务
    if (oldEntry != null) {
        oldEntry.addThreadId(threadId);
    }
    //如果是第一次创建这个定时任务
    else {
        entry.addThreadId(threadId);
        this.renewExpiration();
    }

}

/**
 * 定时任务重置过期时间
 */
private void renewExpiration() {
    RedissonLock.ExpirationEntry ee = (RedissonLock.ExpirationEntry)EXPIRATION_RENEWAL_MAP.get(this.getEntryName());
    if (ee != null) {
        //每三分之一的过期时间续一次,直至解锁
        Timeout task = this.commandExecutor.getConnectionManager().newTimeout(new TimerTask() {
            public void run(Timeout timeout) throws Exception {
                RedissonLock.ExpirationEntry ent = (RedissonLock.ExpirationEntry)RedissonLock.EXPIRATION_RENEWAL_MAP.get(RedissonLock.this.getEntryName());
                if (ent != null) {
                    //拿到第一个线程id
                    Long threadId = ent.getFirstThreadId();
                    if (threadId != null) {
                        //续时间
                        RFuture<Boolean> future = RedissonLock.this.renewExpirationAsync(threadId);
                        future.onComplete((res, e) -> {
                            // 如果有异常,打印日志
                            if (e != null) {
                                RedissonLock.log.error("Can't update lock " + RedissonLock.this.getName() + " expiration", e);
                            }
                            // 没有异常就继续续时间
                            else {
                                RedissonLock.this.renewExpiration();
                            }
                        });
                    }
                }
            }
        }, this.internalLockLeaseTime / 3L, TimeUnit.MILLISECONDS);
        ee.setTimeout(task);
    }
}

/**
 * 重置指定线程id的过期时间
 * @param threadId
 * @return
 */
protected RFuture<Boolean> renewExpirationAsync(long threadId) {
    return this.commandExecutor.evalWriteAsync(this.getName(),
            LongCodec.INSTANCE,
            RedisCommands.EVAL_BOOLEAN,
            "if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then redis.call('pexpire', KEYS[1], ARGV[1]); return 1; end; return 0;",
            Collections.singletonList(this.getName()),
            new Object[]{this.internalLockLeaseTime, this.getLockName(threadId)});
}

//如果已经存在这个key,那就给它续时间
if (redis.call('hexists', KEYS[1], ARGV[2]) == 1)
  then redis.call('pexpire', KEYS[1], ARGV[1]);
    return 1; end;
return 0;
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60
  • 61
  • 62
  • 63
  • 64
  • 65
  • 66
  • 67
  • 68
  • 69
  • 70
  • 71
  • 72
  • 73
  • 74
  • 75
  • 76
  • 77
  • 78
  • 79
  • 80
  • 81
  • 82
  • 83
  • 84
  • 85
  • 86
  • 87
  • 88
  • 89
  • 90
  • 91
  • 92
  • 93
  • 94
  • 95
  • 96
  • 97
  • 98
  • 99
  • 100
  • 101
  • 102
  • 103
  • 104
  • 105
  • 106
  • 107
  • 108
  • 109
  • 110
  • 111
  • 112
  • 113
  • 114
  • 115
  • 116
  • 117
  • 118
  • 119
  • 120
  • 121
  • 122
  • 123
  • 124
  • 125
  • 126
  • 127
  • 128
  • 129
  • 130
  • 131
  • 132
  • 133
  • 134
  • 135
  • 136
  • 137
  • 138
  • 139
  • 140
  • 141
  • 142
  • 143
  • 144
  • 145
  • 146
  • 147
  • 148
  • 149
  • 150
  • 151
  • 152
  • 153
  • 154
  • 155
  • 156
  • 157
  • 158
  • 159
  • 160
  • 161
  • 162
  • 163
  • 164
  • 165
  • 166
  • 167
  • 168
  • 169
  • 170
  • 171
  • 172
  • 173
  • 174
  • 175
  • 176
  • 177
  • 178
  • 179
  • 180
  • 181

图解

基于redis实现调用次数及并发的控制
redis是单线程的本身就可以保证并发控制,基于命令incrby命令来实现。
Double account_money = stringRedisTemplate.opsForValue().increment(“account_money”, -Double.valueOf(updateAccountDto.getMoney()));

Buchixihongshi123
redis中如果在操作数据的时候出现了异常情况下,如何保证数据和mysql一致?通过incrby做加法。
stringRedisTemplate.opsForValue().increment(“account_money”, Double.valueOf(updateAccountDto.getMoney()));

总结
1,单节点多节点下的解决方案:
单节点:可以通过java自带的锁机制,多节点不可以;
多节点:通过mysql锁redis分布式锁;
2,业务场景:
每次都需要成功的业务可以通过mysql锁;redis的incrby命令;
不要求每次都成功可以通过redis的分布式锁;

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/Cpp五条/article/detail/404505
推荐阅读
相关标签
  

闽ICP备14008679号