赞
踩
目录
什么是高并发,高并发下为什么会有数据一致性的问题
基于资金账号体系数据库设计及开发相关的代码,测试发现问题
解决方案有哪些,不同数据一致性解决方案的特点比较,不同应用场景使用不同解决方案
总结不同的解决方案对应不同的业务场景
详解
什么是高并发,高并发下会出现什么问题,如何确保数据一致性
例子:往缸里面倒水,一个人一瓢,如果成千上百的就是并发。这个时候有一个人计数,如果所有人同时向缸里面倒水,计数的人就无法控制准确记录,这就会导致缸里面的具体有多少瓢水无法准确确定。
资金账号体系的介绍,及数据库设计
涉及到支付就会有一套完整的资金账号体系,保证用户在产品中的一个资金变动的记录。
数据脚本:
CREATE TABLE `account` ( `id` bigint(20) NOT NULL AUTO_INCREMENT COMMENT '主键id', `account_type` int(2) DEFAULT NULL COMMENT '账户类型 1余额账户 2冻结账户 3消费账户 4返佣账户 5信用账户', `user_id` bigint(20) DEFAULT NULL COMMENT '用户id', `begin_money` decimal(12,2) DEFAULT NULL COMMENT '期初余额', `cur_money` decimal(12,2) DEFAULT NULL COMMENT '当期发生额', `final_money` decimal(12,2) DEFAULT NULL COMMENT '期末余额', `accoun_status` int(2) DEFAULT NULL COMMENT '1正常 2冻结状态', `create_time` datetime DEFAULT NULL, `update_time` datetime DEFAULT NULL, `flag` int(1) DEFAULT NULL COMMENT '逻辑删除字段', PRIMARY KEY (`id`) ) ENGINE=InnoDB AUTO_INCREMENT=124 DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_bin; CREATE TABLE `account_flew` ( `id` bigint(20) NOT NULL, `flew_type` int(1) DEFAULT NULL COMMENT '1充值 2消费 3分账', `business_type` int(2) DEFAULT NULL COMMENT '二级业务类型 1下单 2微信充值 3支付宝充值 ', `business_type_msg` varchar(255) COLLATE utf8mb4_bin DEFAULT NULL COMMENT '二级业务类型的注解', `pre_order_id` varchar(50) COLLATE utf8mb4_bin DEFAULT NULL COMMENT '业务关联id', `account_id` bigint(20) DEFAULT NULL COMMENT '账户表的主键id', `account_type` int(2) DEFAULT NULL COMMENT '账户类型表', `user_id` bigint(20) DEFAULT NULL COMMENT '用户id', `begin_money` decimal(12,2) DEFAULT NULL COMMENT '期初余额', `cur_money` decimal(12,2) DEFAULT NULL COMMENT '当期发生额', `final_money` decimal(12,2) DEFAULT NULL COMMENT '期末余额', `pay_channel` int(2) DEFAULT NULL COMMENT '支付渠道', `create_time` datetime DEFAULT NULL, `update_time` datetime DEFAULT NULL, `flag` int(2) DEFAULT NULL, PRIMARY KEY (`id`) ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_bin;
操作账号相关代码编写
entity对象
@Data public class Account { private Long id; /** 账户类型 1余额账户 2冻结账户 3消费账户 4返佣账户 5信用账户 */ private Integer accountType; /** 用户id */ private Long userId; /** 期初余额 */ private Double beginMoney; /** 当期发生额 */ private Double curMoney; /** 期末余额 */ private Double finalMoney; /** 1正常 2冻结状态 */ private Integer accounStatus; /** */ private Date createTime; /** */ private Date updateTime; /** 逻辑删除字段 */ private Integer flag; } @Data public class AccountFlew { private Long id; /** 1充值 2消费 3分账 */ private Integer flewType; /** 二级业务类型 1下单 2微信充值 3支付宝充值 */ private Integer businessType; /** 二级业务类型的注解 */ private String businessTypeMsg; /** 业务关联id */ private String preOrderId; /** 账户表的主键id */ private Long accountId; /** 账户类型表 */ private Integer accountType; /** 用户id */ private Long userId; /** 期初余额 */ private Double beginMoney; /** 当期发生额 */ private Double curMoney; /** 期末余额 */ private Double finalMoney; /** 支付渠道 */ private Integer payChannel; /** */ private Date createTime; /** */ private Date updateTime; /** */ private Integer flag; }
pom文件相关的依赖
application.properties配置
基于java自带的锁机制解决数据一致性问题
Lock锁
private final static Lock lock = new ReentrantLock();
lock.lock();
lock.unlock();
synchronized
private Integer lock = new Integer(0);
public void locktest(){
synchronized (lock){
}
}
基于java自带的锁,控制并发的问题
多节点下无法控制
-Dfile.encoding=UTF-8 -Dserver.port=201
基于mysql自带的锁进行控制
mysql排它锁的介绍
Inodb行级锁,通过锁定行数据来保证数据一致性。登录linux的mysql,设置事务部自动提交set autocommit = 0,开启事务 begin,执行for update的sql,commit提交。select
id
from account
where id = #{id,jdbcType=BIGINT} for update
xml配置
<?xml version="1.0" encoding="UTF-8" ?> <!DOCTYPE mapper PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN" "http://mybatis.org/dtd/mybatis-3-mapper.dtd" > <mapper namespace="com.course.account.infrastructure.mapper.AccountMapper"> <!-- 所有字段 --> <sql id="Base_Column_List"> id ,account_type ,user_id ,begin_money ,cur_money ,final_money ,account_status ,create_time ,update_time ,del_status </sql> <!-- 字段映射 --> <resultMap id="BaseResultMap" type="com.course.account.infrastructure.entity.Account"> <id column="id" property="id" jdbcType="BIGINT"/> <result column="account_type" jdbcType="VARCHAR" property="accountType"/> <result column="user_id" jdbcType="VARCHAR" property="userId"/> <result column="begin_money" jdbcType="DECIMAL" property="beginMoney"/> <result column="cur_money" jdbcType="DECIMAL" property="curMoney"/> <result column="final_money" jdbcType="DECIMAL" property="finalMoney"/> <result column="account_status" jdbcType="INTEGER" property="accountStatus"/> <result column="create_time" jdbcType="TIMESTAMP" property="createTime"/> <result column="update_time" jdbcType="TIMESTAMP" property="updateTime"/> <result column="del_status" jdbcType="INTEGER" property="delStatus"/> </resultMap> <!-- 主键查询 --> <select id="lockAccountById" resultMap="BaseResultMap" parameterType="java.lang.Long"> select <include refid="Base_Column_List"/> from account where id = #{id,jdbcType=BIGINT} for update </select> </mapper>
死锁
client1:
select * from account where id = 1 for update;
select * from account where id = 2 for update;
client2:
select * from account where id = 2 for update;
select * from account where id = 1 for update;
mysql返回异常:
ERROR 1213 (40001): Deadlock found when trying to get lock; try restarting transaction
基于redis做并发控制
redis的分布式锁原理
reids是基于单线程的队列实现的,可以保证并发的安全;
基于redis命令:setnx命令来实现
java代码
@Autowired
private StringRedisTemplate stringRedisTemplate;
stringRedisTemplate.opsForValue().setIfAbsent();
stringRedisTemplate.expire(key,5, TimeUnit.SECONDS);
stringRedisTemplate.delete(key);
String key = “account_lock_”;
Boolean b = stringRedisTemplate.opsForValue().setIfAbsent(key,“1”);
if(!b){
log.warn(“当前处于锁状态不能操作”);
return;
}
redis分布式锁存在的问题
问题:1,长生key问题;2,原子性问题;3,key超时问题;4,主从切换key失效问题
解决:
1:通过给key设置合理失效时间;
2:通过使用lua脚本;
3:(1)通过业务层中校验key是否有效决定是否提交,此方法可以大大降低key超时带来的问题,不能全部解决;(2)redission中的狗链会定期检查key是否持有,持有则延长key失效时间;
4:redisson中的redlock可以解决主从key失效问题
基于redission解决redis的分布式锁的原子性问题
加锁
waitTime和leaseTime分别是等待时间和过期时间
redissonService.getRLock(“1”).tryLock(waitTime,leaseTime,TimeUnit.SECONDS);
源码
/**
* 尝试获取锁
*
* @return
*/
public boolean tryLock() {
return (Boolean)this.get(this.tryLockAsync());
}
/** * 一看就是为了获取异步执行的结果,所以重点应该看tryLockAsync() * * @return */ protected final <V> V get(RFuture<V> future) { return this.commandExecutor.get(future); } /** * 异步获取锁 * * @return */ public RFuture<Boolean> tryLockAsync() { return this.tryLockAsync(Thread.currentThread().getId()); } /** * 尝试获取锁 * * @param threadId * @return */ public RFuture<Boolean> tryLockAsync(long threadId) { return this.tryAcquireOnceAsync(-1L, (TimeUnit) null, threadId); } /** * 尝试获取锁 * * @param leaseTime * @param unit * @param threadId * @return */ private RFuture<Boolean> tryAcquireOnceAsync(long leaseTime, TimeUnit unit, long threadId) { /** * 如果自定义过期时间 */ if (leaseTime != -1L) { return this.tryLockInnerAsync(leaseTime, unit, threadId, RedisCommands.EVAL_NULL_BOOLEAN); } /** * 如果是默认的过期时间 */ else { RFuture<Boolean> ttlRemainingFuture = this.tryLockInnerAsync(this.commandExecutor.getConnectionManager().getCfg().getLockWatchdogTimeout(), TimeUnit.MILLISECONDS, threadId, RedisCommands.EVAL_NULL_BOOLEAN); ttlRemainingFuture.onComplete((ttlRemaining, e) -> { //没有异常 if (e == null) { //成功获取锁 if (ttlRemaining) { // 更新过期时间 this.scheduleExpirationRenewal(threadId); } } }); return ttlRemainingFuture; } } /** * 真正的获取锁的代码 * @param leaseTime * @param unit * @param threadId * @param command * @param <T> * @return */ <T> RFuture<T> tryLockInnerAsync(long leaseTime, TimeUnit unit, long threadId, RedisStrictCommand<T> command) { //这个字段后面用作续过期时间 this.internalLockLeaseTime = unit.toMillis(leaseTime); /** * 利用lua脚本执行相关逻辑 */ return this.commandExecutor.evalWriteAsync(this.getName(), LongCodec.INSTANCE, command, "if (redis.call('exists', KEYS[1]) == 0) then redis.call('hset', KEYS[1], ARGV[2], 1); redis.call('pexpire', KEYS[1], ARGV[1]); return nil; end; if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then redis.call('hincrby', KEYS[1], ARGV[2], 1); redis.call('pexpire', KEYS[1], ARGV[1]); return nil; end; return redis.call('pttl', KEYS[1]);", Collections.singletonList(this.getName()), new Object[]{this.internalLockLeaseTime, this.getLockName(threadId)}); } //如果key不存在 if (redis.call('exists', KEYS[1]) == 0) //设置这个key,并且设置超时时间,获取锁成功 then redis.call('hset', KEYS[1], ARGV[2], 1); redis.call('pexpire', KEYS[1], ARGV[1]); return nil; end; //如果key存在 if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) //就对key自增,并且重置过期时间(重入锁) then redis.call('hincrby', KEYS[1], ARGV[2], 1); redis.call('pexpire', KEYS[1], ARGV[1]); return nil; end; //获取key剩下的时间 return redis.call('pttl', KEYS[1]); /** * 重置过期时间 * @param threadId */ private void scheduleExpirationRenewal(long threadId) { RedissonLock.ExpirationEntry entry = new RedissonLock.ExpirationEntry(); //检查是否存在指定的定时任务 RedissonLock.ExpirationEntry oldEntry = (RedissonLock.ExpirationEntry)EXPIRATION_RENEWAL_MAP.putIfAbsent(this.getEntryName(), entry); //如果已经存在指定的定时任务 if (oldEntry != null) { oldEntry.addThreadId(threadId); } //如果是第一次创建这个定时任务 else { entry.addThreadId(threadId); this.renewExpiration(); } } /** * 定时任务重置过期时间 */ private void renewExpiration() { RedissonLock.ExpirationEntry ee = (RedissonLock.ExpirationEntry)EXPIRATION_RENEWAL_MAP.get(this.getEntryName()); if (ee != null) { //每三分之一的过期时间续一次,直至解锁 Timeout task = this.commandExecutor.getConnectionManager().newTimeout(new TimerTask() { public void run(Timeout timeout) throws Exception { RedissonLock.ExpirationEntry ent = (RedissonLock.ExpirationEntry)RedissonLock.EXPIRATION_RENEWAL_MAP.get(RedissonLock.this.getEntryName()); if (ent != null) { //拿到第一个线程id Long threadId = ent.getFirstThreadId(); if (threadId != null) { //续时间 RFuture<Boolean> future = RedissonLock.this.renewExpirationAsync(threadId); future.onComplete((res, e) -> { // 如果有异常,打印日志 if (e != null) { RedissonLock.log.error("Can't update lock " + RedissonLock.this.getName() + " expiration", e); } // 没有异常就继续续时间 else { RedissonLock.this.renewExpiration(); } }); } } } }, this.internalLockLeaseTime / 3L, TimeUnit.MILLISECONDS); ee.setTimeout(task); } } /** * 重置指定线程id的过期时间 * @param threadId * @return */ protected RFuture<Boolean> renewExpirationAsync(long threadId) { return this.commandExecutor.evalWriteAsync(this.getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN, "if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then redis.call('pexpire', KEYS[1], ARGV[1]); return 1; end; return 0;", Collections.singletonList(this.getName()), new Object[]{this.internalLockLeaseTime, this.getLockName(threadId)}); } //如果已经存在这个key,那就给它续时间 if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then redis.call('pexpire', KEYS[1], ARGV[1]); return 1; end; return 0;
图解
基于redis实现调用次数及并发的控制
redis是单线程的本身就可以保证并发控制,基于命令incrby命令来实现。
Double account_money = stringRedisTemplate.opsForValue().increment(“account_money”, -Double.valueOf(updateAccountDto.getMoney()));
Buchixihongshi123
redis中如果在操作数据的时候出现了异常情况下,如何保证数据和mysql一致?通过incrby做加法。
stringRedisTemplate.opsForValue().increment(“account_money”, Double.valueOf(updateAccountDto.getMoney()));
总结
1,单节点多节点下的解决方案:
单节点:可以通过java自带的锁机制,多节点不可以;
多节点:通过mysql锁redis分布式锁;
2,业务场景:
每次都需要成功的业务可以通过mysql锁;redis的incrby命令;
不要求每次都成功可以通过redis的分布式锁;
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。