转载:https://blog.csdn.net/panjiapengfly/article/details/84758971
为何需要分布式锁?
在单进程的系统中,当遇到并发情况下,会出现一些数据异常的问题,但是如果这些数据是需要保证唯一性的话,那我们就希望在同一时刻,只能有一个线程在执行这块代码,通常我们一般都是通过简单的加锁或同步来实现并解决这个问题。
但是以上都是单进程多线程的情况,如果出现多进程多线程,显然会出现问题。因为多线程之间是可以共享内存的,但是多进程之间是不行的,所以这个时候需要用到分布式锁。
分布式锁常用实现方案
分布式锁通常是借助于一个第三方组件并利用它自身的排他性来达到多进程的互斥。如下:
- 基于数据库实现分布式锁
- 基于缓存,实现分布式锁,如redis
- 基于Zookeeper实现分布式锁
基于数据库:锁实现也有两种方式,一是基于数据库表(创建一张锁表),另一种是基于数据库排他锁。
基于zookeeper:锁的实现是依靠临时有序节点,每个客户端对某个方法加锁时,在zookeeper上的与该方法对应的指定节点的目录下,生成一个唯一的瞬时有序节点。
基于缓存:下面我们要重点讲的就是redis。基于 Redis 的 NX EX 参数。
基于redis的分布式锁实现
Redis有一系列的命令,其特点是以NX结尾,NX的意思可以理解为 NOT EXISTS(不存在),SETNX命令 (SET IF NOT EXISTS) 可以理解为如果不存在则插入,Redis分布式锁的实现主要就是使用SETNX命令
1、使用setnx() 设置锁
- $expire = 10; //有效期10秒
- $key = ‘lock‘; //key
- $value = time() + $expire; //锁的值 = Unix时间戳 + 锁的有效期
- $lock = $redis->setnx($key, $value);
- //判断是否上锁成功,成功则执行下步操作
- if(! empty($lock))
- {
- //下步操作…
- }
如果返回 1 ,则表示当前进程获得锁,并获得了当前插入/更新缓存的操作权限。
如果返回 0,表示锁已被其他进程获取,这是进程可以返回结果或者等待当前锁失效再请求。
2、存在死锁的问题
如果单单只用SETNX命令设置锁的话,如果当持有锁的进程崩溃或删除锁失败时,其他进程将无法获取到锁,问题就大了。
解决方法是在获取锁失败的同时获取锁的值,并将值与当前时间进行对比,如果值小于当前时间说明锁以过期失效,进程可运用Redis的DEL命令删除该锁
- $expire = 10; //有效期10秒
- $key = ‘lock‘; //key
- $value = time() + $expire; //锁的值 = Unix时间戳 + 锁的有效期
- $status = true;
- while($status)
- {
- $lock = $redis->setnx($key, $value);
- if( empty($lock))
- {
- $value = $redis->get($key);
- if($value < time())
- {
- $redis->del($key);
- }
- } else{
- $status = false;
- //下步操作….
- }
- }
但是,简单粗暴的用DEL命令删除锁再SETNX命令上锁也会出现问题。比如,进程1获得锁后崩溃或删除锁失败,这时进程2检测到锁存在当已过期,用DEL命令删除锁并用SETNX命令设置锁,进程3也检测到锁过期,也用DEL命令删除锁也用SETNX命令设置了锁,这时进程2和进程3同时获得了锁。问题大了
为了解决这个问题,这里用到了Redis的GETSET命令,GETSET命令在给锁设置新值的同时返回锁的旧值,这里利用了GETSET命令同时获取和赋值的特性,在此期间其他进程无法修改锁的值。
例如:
进程1获得锁后操作超时/崩溃/删除锁失败,
进程2检测到锁已存在,但获取锁的值对比当前时间发现锁已过期,
进程2通过GETSET命令重新给锁赋予新的值,并获取到的锁的旧值,再次对比锁的旧值与当前时间,如果锁的旧值依然小于当前时间的话,这时进程2就可以忽略进程1余留下的废锁进行下步操作了。
进程2完成下步操作后返回前应该删除锁,但在删除锁时可以先检测锁是否还未过期,未过期才做删除操作,已过期的就没必要在去删除锁了,因为很有可能其他进程检测到锁过期时已经去获取锁了。
这里要说明的是,如果有其他进程在进程2之前获取到锁,那么进程2将获取锁失败,但是进程2在用GETSET获取锁的旧值时也赋予了锁新的值,改写了其他进程赋予锁的超时值。看到这大家可能会有疑问了,进程2没获取到锁怎么能改变锁的值呢?是的,进程2改变了锁的原有值,但这一点小小的时间误差带来的影响是可以忽略。
3、以下是Redis实现分布式锁的完整PHP代码:
- <?php
- /**
- * 实现Redis分布锁
- */
- $key = ‘test‘; //要更新信息的缓存KEY
- $lockKey = ‘lock:‘.$key; //设置锁KEY
- $lockExpire = 10; //设置锁的有效期为10秒
- //获取缓存信息
- $result = $redis->get($key);
- //判断缓存中是否有数据
- if( empty($result))
- {
- $status = TRUE;
- while ($status)
- {
- //设置锁值为当前时间戳 + 有效期
- $lockValue = time() + $lockExpire;
- /**
- * 创建锁
- * 试图以$lockKey为key创建一个缓存,value值为当前时间戳
- * 由于setnx()函数只有在不存在当前key的缓存时才会创建成功
- * 所以,用此函数就可以判断当前执行的操作是否已经有其他进程在执行了
- * @var [type]
- */
- $lock = $redis->setnx($lockKey, $lockValue);
- /**
- * 满足两个条件中的一个即可进行操作
- * 1、上面一步创建锁成功;
- * 2、 1)判断锁的值(时间戳)是否小于当前时间 $redis->get()
- * 2)同时给锁设置新值成功 $redis->getset()
- */
- if(! empty($lock) || ($redis->get($lockKey) < time() && $redis->getSet($lockKey, $lockValue) < time() ))
- {
- //给锁设置生存时间
- $redis->expire($lockKey, $lockExpire);
- //******************************
- //此处执行插入、更新缓存操作…
- //******************************
- //以上程序走完删除锁
- //检测锁是否过期,过期锁没必要删除
- if($redis->ttl($lockKey))
- $redis->del($lockKey);
- $status = FALSE;
- } else{
- /**
- * 如果存在有效锁这里做相应处理
- * 等待当前操作完成再执行此次请求
- * 直接返回
- */
- sleep( 2); //等待2秒后再尝试执行操作
- }
- }
- }