赞
踩
在传统的基于数据库的架构中,对于数据的抢占问题往往是通过数据库事务(ACID)来保证的。在分布式环境中,出于对性能以及一致性敏感度的要求,使得分布式锁成为了一种比较常见而高效的解决方案。
场景1:
场景2:
某服务提供一组任务,A请求随机从任务组中获取一个任务;B请求随机从任务组中获取一个任务。
在理想的情况下,A从任务组中挑选一个任务,任务组删除该任务,B从剩下的的任务中再挑一个,任务组删除该任务。
同样的,在真实情况下,如果不做任何处理,可能会出现A和B挑中了同一个任务的情况。
可以保证在分布式部署的应用集群中,同一个方法在同一操作只能被一台机器上的一个线程执行。
这把锁要是一把可重入锁(避免死锁)
这把锁最好是一把阻塞锁(根据业务需求考虑要不要这条)
这把锁有高可用的获取锁和释放锁功能
这把锁获取锁和释放锁的性能要好…
use test; CREATE TABLE `DistributedLock` ( `id` int(11) NOT NULL AUTO_INCREMENT COMMENT '主键', `name` varchar(64) NOT NULL DEFAULT '' COMMENT '锁名', `desc` varchar(1024) NOT NULL DEFAULT '备注信息', `update_time` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '保存数据时间,自动生成', PRIMARY KEY (`id`), UNIQUE KEY `uidx_name` (`name`) ) ENGINE=InnoDB DEFAULT CHARSET=utf8 COMMENT='锁定中的方法'; //数据库中的每一条记录就是一把锁,利用的mysql唯一索引的排他性 lock(name,desc){ insert into DistributedLock(`name`,`desc`) values (#{name},#{desc}); } unlock(name){ delete from DistributedLock where name = #{name} }
锁重入:可增加可重入功能(避免再次获取锁导致死锁)
增加字段进程识别信息(ip、服务名称、线程id) 与 重入计数count,如果是同一个进程同一个线程则允许重入。
获取:再次获取锁的同时更新count(+1).
释放:更新count-1,当count==0删除记录。
可靠性
主从mysql:mysql宕机,立刻切换。
锁的持有者挂掉:定时任务清楚持有一定时间的锁。
性能
db操作都有一定性能损耗
阻塞锁
有此需求的业务线需要使用自旋多次尝试获取锁的实现。
boolean lock(){ connection.setAutoCommit(false) while(true){ try{ result = select ... from DistributedLock where name=lock for update; if(result==null){ return true; } }catch(Exception e){ connection.commit(); } sleep(*); } return false; } void unlock(){ connection.commit(); }
其他附加功能与实现一基本一致,这里需要注意的是“where name=lock ”,name字段必须要走索引,否则会锁表。有些情况下,比如表不大,mysql优化器会不走这个索引,导致锁表问题。
所谓乐观锁与前边最大区别在于基于CAS思想,是不具有互斥性,不会产生锁等待而消耗资源,操作过程中认为不存在并发冲突,只有update version失败后才能觉察到。我们的抢购、秒杀就是用了这种实现以防止超卖。
通过增加递增的版本号字段实现乐观锁
select ...,version
update table set version+1 where version=xx
当然有人说可以在更新的时候这样写,通过比较拿到的account是否发生了变化来处理。如果还是除次拿到的值则允许成功更新。
update personal_bank set account=200
where id="xxx" and account=oldAccount
但是实现会有什么问题吗?留给大家思考
Redis为单进程单线程模式,采用队列模式将并发访问变成串行访问,且多客户端对redis的连接并不存在竞争关系。其次Redis提供一些命令SETNX,GETSET,可以方便实现分布式锁机制
要保证的高可用(1一个业务节点宕机,不产生死锁;2不会被其他线程释放,谁家的锁只能由谁释放;3.保证redis加锁的原子性 4.可重入性等等),目前来看网上大部分的redis锁实现都非常不严谨,漏洞很常见,谨慎使用也许业务量小并不容易发现bug!!!
SETNX命令(推荐使用set(arg1,arg2,arg3,arg4,arg5))
语法:ETNX key value
功能:当且仅当 key 不存在,将 key 的值设为 value ,并返回1;若给定的 key 已经存在,则 SETNX 不做任何动作,并返回0。
GETSET命令
语法:GETSET key value
功能:将给定 key 的值设为 value ,并返回 key 的旧值 (old value),当 key 存在但不是字符串类型时,返回一个错误,当key不存在时,返回nil。
GET命令
语法:GET key
功能:返回 key 所关联的字符串值,如果 key 不存在那么返回特殊值 nil 。
DEL命令语法:
DEL key [KEY …]
功能:删除给定的一个或多个 key ,不存在的 key 会被忽略。
EVAL命令语法:
EVAL script numkeys key [key …] arg [arg …]
从 Redis 2.6.0 版本开始,通过内置的 Lua 解释器,可以使用 EVAL 命令对 Lua 脚本进行求值。
以下为代码摘抄,后期会作为一个功能点开源。(以下代码并不严谨,只做说明示例!!)
/** * 加锁代码摘录 */ @Override public boolean tryLock(String lockName, long timeout, TimeUnit unit) throws InterruptedException { LockInfo lockInfo = new LockInfo(Thread.currentThread(), lockName); // 先判断重入锁 if (reentrantIfNeed(lockInfo)) { return true; } else { return tryAcquire(lockInfo, timeout, unit); } } /** * @param lockInfo * @param timeout * @param unit * @return * @throws InterruptedException */ private boolean tryAcquire(LockInfo lockInfo, long timeout, TimeUnit unit) throws InterruptedException { // lock first time Long loopTimeout = (unit != null) ? unit.toMillis(timeout) : timeout; long startMillis = System.currentTimeMillis(); boolean isAcqired = false; do { String result = getCache().set(lockInfo.getName(), lockInfo.getValue(), SET_IF_NOT_EXIST, SET_EXPIRE_MILLISECONDS, LOCK_EXPIRE_MILLSECOND); if ("OK".equals(result)) { isAcqired = true; break; } TimeUnit.NANOSECONDS.sleep(RETRY_INTERVAL); } while (System.currentTimeMillis() - startMillis > loopTimeout); if (isAcqired) { allLockInfo.add(lockInfo); } return isAcqired; } /** * 解锁代码摘录 */ protected static final String UNLOCK_LUA_SCRIPT = "if redis.call('get', KEYS[1]) == ARGV[1] then return redis.call('del', KEYS[1]) else return -1 end"; @Override public void unlock(String lockName) { Thread currentThread = Thread.currentThread(); LockInfo lockInfo = new LockInfo(currentThread, lockName); LockInfo locked = getExistedLock(lockInfo); if (null == locked) { throw new IllegalMonitorStateException("currentThread : [ " + currentThread + " ] unlock : [ " + lockName + " ] fail because of not owner."); } int reentrantTimes = locked.reentrantMinusOne(); if (reentrantTimes > 0) { return; } else if (reentrantTimes < 0) { throw new IllegalMonitorStateException("currentThread : [ " + currentThread + " ] unlock : [ " + lockName + " ] fail because of reentrant can't be negative."); } else { Object result = getCache().eval(UNLOCK_LUA_SCRIPT, Collections.singletonList(lockName), Collections.singletonList(locked.getValue())); if (!UNLOCK_FAIL.equals(result)) { allLockInfo.remove(locked); } } }
/** * @author zhangsh */ public class RedisWatchLock { private static final String redisHost = "127.0.0.1"; private static final int port = 6379; private static JedisPoolConfig config; private static JedisPool pool; private static ExecutorService service; private static int count = 10; private static CountDownLatch latch; private static AtomicInteger Countor = new AtomicInteger(0); static { config = new JedisPoolConfig(); config.setMaxIdle(10); config.setMaxWaitMillis(1000); config.setMaxTotal(30); pool = new JedisPool(config, redisHost, port); service = Executors.newFixedThreadPool(10); latch = new CountDownLatch(count); } public static void main(String args[]) { int count = 10; String ThreadNamePrefix = "thread-"; Jedis cli = pool.getResource(); cli.del("redis_inc_key");// 先删除既定的key cli.set("redis_inc_key", String.valueOf(1));// 设定默认值 for (int i = 0; i < count; i++) { Thread th = new Thread(new TestThread(pool)); th.setName(ThreadNamePrefix + i); System.out.println(th.getName() + "inited..."); service.submit(th); } service.shutdown(); try { latch.await(); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println("all sub thread sucess"); System.out.println("countor is " + Countor.get()); String countStr = cli.get("redis_inc_key"); System.out.println(countStr); } public static class TestThread implements Runnable { private String incKeyStr = "redis_inc_key"; private Jedis cli; private JedisPool pool; public TestThread(JedisPool pool) { cli = pool.getResource(); this.pool = pool; } public void run() { try { for (int i = 0; i < 100; i++) { actomicAdd();//生产环境中批量操作尽量使用redisPipeLine!! } } catch (Exception e) { pool.returnBrokenResource(cli); } finally { pool.returnResource(cli); latch.countDown(); } } /** * 0 watch key * 1 multi * 2 set key value(queued) * 3 exec * * return null:fail * reurn "ok": succeed * * watch每次都需要执行(注册) */ public void actomicAdd() { cli.watch(incKeyStr);// 0.watch key boolean flag = true; while (flag) { String countStr = cli.get("redis_inc_key"); int countInt = Integer.parseInt(countStr); int expect = countInt + 1; Transaction tx = cli.multi(); // 1.multi tx.set(incKeyStr, String.valueOf(expect));// 2.set key value // (queued) List<Object> list = tx.exec();// 3.exec if (list == null) { System.out.println("fail"); continue; } else { flag = false; System.out.println("succeed"); } System.out.println("my expect num is " + expect); System.out.println("seting...."); } Countor.incrementAndGet(); } } }
缺点:1.db操作性能较差,并且有锁表的风险
2.非阻塞操作失败后,需要轮询,占用cpu资源;
3.长时间不commit或者长时间轮询,可能会占用较多连接资源
缺点:1.锁删除失败 过期时间不好控制
2.非阻塞,操作失败后,需要轮询,占用cpu资源;
缺点:性能不如redis实现,主要原因是写操作(获取锁释放锁)都需要在Leader上执行,然后同步到follower。
总之:ZooKeeper有较好的性能和可靠性。
从理解的难易程度角度(从低到高)数据库 > 缓存 > Zookeeper
从实现的复杂性角度(从低到高)Zookeeper >= 缓存 > 数据库
从性能角度(从高到低)缓存 > Zookeeper >= 数据库
从可靠性角度(从高到低)Zookeeper > 缓存 > 数据库
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。