首页> 预测未来>正文

程序员如何 Get 分布式锁的正确姿势?| 技术头条

2018/11/19 8:02:26 来源:互联网

原标题:程序员如何 Get 分布式锁的正确姿势?| 技术头条


作者 | 刘春龙

责编 | 郭芮

在很多互联网产品应用中,有些场景需要加锁处理,比如秒杀、全局递增ID、楼层生成等等,大部分的解决方案是基于DB实现的,Redis也是较为常见的方案之一。

Redis为单进程单线程模式,采用队列模式将并发访问变成串行访问,且多客户端对Redis的连接并不存在竞争关系。其次Redis提供一些命令SETNX,GETSET,可以方便实现分布式锁机制。

Redis命令介绍

使用Redis实现分布式锁,有两个重要函数需要介绍。

SETNX命令(SET if Not Exists)

语法:

SETNX key value

功能:

当且仅当 key 不存在,将 key 的值设为 value ,并返回1; 若给定的 key 已经存在,则 SETNX 不做任何动作,并返回0。

GETSET命令

语法:

GETSET key value

功能:

将给定 key 的值设为 value ,并返回 key 的旧值 (old value),当 key 存在但不是字符串类型时,返回一个错误,当key不存在时,返回nil。

GET命令

语法:

GETkey

功能:

返回 key 所关联的字符串值,如果 key 不存在那么返回特殊值 nil 。

DEL命令

语法:

DEL key[ KEY…]

功能:

删除给定的一个或多个 key,不存在的 key 会被忽略。

兵贵精,不在多。分布式锁,我们就依靠这四个命令。但在具体实现,还有很多细节,需要仔细斟酌,因为在分布式并发多进程中,任何一点出现差错,都会导致死锁,hold住所有进程。

加锁实现

SETNX 可以直接加锁操作,比如说对某个关键词foo加锁,客户端可以尝试 SETNX foo.lock <current unix time>。

理想是美好的,现实是残酷的。仅仅使用SETNX加锁带有竞争条件的,在某些特定的情况会造成死锁错误。

处理死锁

在上面的处理方式中,如果获取锁的客户端执行时间过长,进程被kill掉,或者因为其他异常崩溃,导致无法释放锁,就会造成死锁。所以,需要对加锁要做时效性检测。

因此,我们在加锁时,把当前时间戳作为value存入此锁中,通过当前时间戳和redis中的时间戳进行对比,如果超过一定差值,认为锁已经时效,防止锁无限期的锁下去。

但是,在大并发情况,如果同时检测锁失效,并简单粗暴地删除死锁,再通过SETNX上锁,可能会导致竞争条件的产生,即多个客户端同时获取锁。

情景描述如下:

此时C2和C3都获取了锁,产生竞争条件,如果在更高并发的情况,可能会有更多客户端获取锁。

所以,DEL锁的操作,不能直接使用在锁超时的情况下,幸好我们有GETSET方法,假设我们现在有另外一个客户端C4,看看如何使用GETSET方式,避免这种情况产生。

时间戳问题

我们看到foo.lock的value值为时间戳,所以要在多客户端情况下,保证锁有效,一定要同步各服务器的时间。

如果各服务器间,时间有差异,时间不一致的客户端,在判断锁超时,就会出现偏差,从而产生竞争条件。锁的超时与否,严格依赖时间戳。

锁覆盖问题

现在唯一的问题是,C4设置foo.lock的新时间戳,是否会对C5获取得锁产生影响?

其实我们可以看到C4和C5只有在调用GET命令获得foo.lock的时间戳,通过比对时间戳,发现锁超时后,几乎同时调用GETSET方式获取锁,执行的时间差值极小,并且写入foo.lock中的都是有效时间戳,所以对锁并没有影响。

为了让这个锁更加强壮,获取锁的客户端应该在调用关键业务时,再次调用GET方法获取T1,和写入的T0时间戳进行对比,以免锁因其他情况被执行DEL意外解开而不知。但是如果遇到上面描述得问题,则T0则会与T1不一致,当然差别一般会很小。这就是锁覆盖问题。

锁覆盖会导致什么问题呢?

当客户端的锁过期时间被覆盖,会造成锁不具有标识性,会造成客户端无法释放锁(客户端只能释放明确自己持有的锁)。

nil 问题

GET返回nil时应该走哪种逻辑?

1、第一种走循环走setnx逻辑

2、第二种走超时逻辑

两种逻辑貌似都是OK,但是从逻辑处理上来说,当GET返回nil,表示锁是被删除的,而不是超时,应该走SETNX逻辑加锁。

对于"第二种走超时逻辑"是否会造成死锁,尚不清楚,不过推荐采用第一种方式。

GETSET返回nil时应该怎么处理?

前提:假设C4客户端获取锁后由于异常退出等原因未正常释放锁,导致锁超时。此时,C1、C2和C3客户端同时请求获取锁。C1、C2和C3客户端调用GET接口,C1返回T1,此时C3网络情况更好,快速进入获取锁,并执行DEL删除锁,C2返回T2(nil)。C1进入超时处理逻辑。C2面临上面提到「GET返回nil时应该走哪种逻辑?」的两种选择:1. 也进入超时处理逻辑;2. 继续循环走setnx逻辑(推荐)。

至于为什么会出现这种情况?就如上面设想的场景那样,多客户端时,每个客户端连接Redis后,发出的命令并不是连续的,导致从单客户端看到的好像连续的命令,到rRedis Server后,这两条命令之间可能已经插入大量的其他客户端发出的命令,比如DEL、SETNX等。

正确的处理方式就是GETSET返回nil时,获取锁成功。

总结

代码实现

代码库

https://github.com/HuTu92/distributed-lock

源码

packagecom.github.hutu92.concurrent.locks;

importcom.alibaba.fastjson.JSON;

importredis.clients.jedis.Jedis;

importredis.clients.jedis.JedisPool;

/**

* Created by liuchunlong on 2018/8/31.

* <p>

* 基于redis的分布式锁 v1

*

* 需要客户端时间同步

*/

publicclassDistributedLock{

privatestaticfinallongRETRY_BARRIER = 3* 1000; // 请求锁重试屏障,单位毫秒

privatefinalJedisPool jedisPool; // redis连接池

privatefinalString lockKey; // lock Key

privatefinallonglockExpiryInNanos; // 锁的过期时长,单位纳秒

privatestaticfinalThreadLocal<Lock> lockThreadLocal = newThreadLocal<Lock>();

/**

* 构造方法

*

* @paramjedisPool redis连接池

* @paramlockKey 锁的Key

* @paramlockExpiryInMillis 锁的过期时长,单位毫秒

*/

publicDistributedLock(JedisPool jedisPool, String lockKey, longlockExpiryInMillis){

this.jedisPool = jedisPool;

this.lockKey = lockKey;

this.lockExpiryInNanos = lockExpiryInMillis * 1000;

}

/**

* 构造方法

* <p>

* 使用锁默认的过期时长Integer.MAX_VALUE,即锁永远不会过期

*

* @paramjedisPool redis连接池

* @paramlockKey 锁的Key

*/

publicDistributedLock(JedisPool jedisPool, String lockKey){

this(jedisPool, lockKey, Integer.MAX_VALUE);

}

/**

* 获取锁在redis中的Key标记

*

* @returnlocks key

*/

publicString getLockKey(){

returnthis.lockKey;

}

/**

* 锁的过期时长

*

* @return

*/

publiclonggetLockExpiryInNanos(){

returnlockExpiryInNanos;

}

/**

* 请求分布式锁,不会阻塞,直接返回

*

* @paramjedis redis 连接

* @return成功获取锁返回true, 否则返回false

*/

privatebooleantryAcquire(Jedis jedis){

finalLock newLock = newLock(System.nanoTime() + this.lockExpiryInNanos);

/**

* 将新锁(newLock)写入redis中。如果成功写入,redis中不存在锁,获取锁成功;否则,redis中已存在锁,获取锁失败;

*/

if(jedis.setnx( this.lockKey, newLock.toString()) == 1) {

lockThreadLocal.set(newLock);

returntrue;

}

/**

* 至此,说明redis中已存在锁,获取锁失败,则需要进行如下操作:

* 1. 判断redis中已存在的锁是否过期,如果过期则直接获取锁;

* 2. 否则,获取锁失败;

*/

finalString currentLockValue = jedis.get(lockKey);

// 特别的,当jedis.get()获取已存在的锁currentLockValue为空时,应该重新SETNX

if(currentLockValue == null|| currentLockValue.length() == 0) {

tryAcquire(jedis);

}

finalLock currentLock = Lock.fromJson(currentLockValue); // redis中已存在的锁

// 如果redis中已存在的锁已超时,则重新获取锁

if(isExpired(currentLock)) {

String originLockValue = jedis.getSet(lockKey, newLock.toString());

/**

* 这里还有个前置条件:

* 会对已存在的锁进行校验,jedis.get()和jedis.getSet()获取的锁必须是同一锁,重新获取锁才成功

*/

// 特别的,当jedis.getSet()获取已存在的锁originLockValue为空时,则认定获取锁成功

if(originLockValue == null|| originLockValue.length() == 0) {

lockThreadLocal.set(newLock);

returntrue;

}

if(originLockValue.equals(currentLockValue)) {

lockThreadLocal.set(newLock);

returntrue;

}

}

returnfalse;

}

/**

* 请求分布式锁,不会阻塞,直接返回

*

* @return成功获取锁返回true, 否则返回false

*/

publicbooleantryAcquire(){

Jedis jedis = null;

try{

jedis = jedisPool.getResource();

returntryAcquire(jedis);

} finally{

if(jedis != null) {

jedis.close();

}

}

}

/**

* 超时请求分布式锁,会阻塞

*

* 采用"自旋获取锁"的方式,直至获取锁成功或者请求锁超时

*

* @paramacquireTimeoutInMillis 锁的请求超时时长

* @return

*/

publicbooleanacquire(longacquireTimeoutInMillis){

Jedis jedis = null;

try{

jedis = jedisPool.getResource();

longacquireTime = System.currentTimeMillis();

// 锁的请求到期时间

longexpiryTime = System.currentTimeMillis() + acquireTimeoutInMillis;

while(expiryTime >= System.currentTimeMillis()) {

booleanresult = tryAcquire(jedis);

if(result) { // 获取锁成功直接返回,否则循环重试

returntrue;

}

if((System.currentTimeMillis() - acquireTime) > RETRY_BARRIER) {

Thread.yield();

}

}

} finally{

if(jedis != null) {

jedis.close();

}

}

returnfalse;

}

/**

* 释放锁

*/

publicvoidrelease(){

Jedis jedis = null;

try{

jedis = jedisPool.getResource();

release(jedis);

} finally{

if(jedis != null) {

jedis.close();

}

}

}

/**

* 释放锁

*

* @paramjedis

*/

privatevoidrelease(Jedis jedis){

Lock currlock = lockThreadLocal.get();

if(currlock != null) {

finalString currentLockValue = jedis.get(lockKey);

if(currentLockValue != null&& currentLockValue.length() != 0) {

finalLock currentLock = Lock.fromJson(currentLockValue); // redis中已存在的锁

if(currlock.equals(currentLock)) {

lockThreadLocal.remove();

jedis.del(lockKey);

}

}

}

}

/**

* 判断当前线程是否持有锁

*

* 未持有锁或者锁超时,返回false

*

* @return

*/

publicbooleanisLocked(){

Lock currlock = lockThreadLocal.get();

// 如果当前线程保存的lock不为null,并且未超时,则当前线程必然持有锁,锁未被意外释放

returncurrlock != null&& !currlock.isExpired();

}

/**

* 判断指定的lock是否是当前线程持有的锁

*

* @return

*/

booleanisMine(finalLock lock){

Lock currlock = lockThreadLocal.get();

returncurrlock != null&& currlock.equals(lock);

}

/**

* 判断锁是否超时

*

* @paramlock

* @return

*/

booleanisExpired(finalLock lock){

returnlock.isExpired();

}

/**

* 锁

*/

protectedstaticclassLock{

privatelongexpiryTime; // 锁的过期时间,注意,不是过期时长,单位纳秒

Lock( longexpiryTime) {

this.expiryTime = expiryTime;

}

/**

* 解析字符串,根据解析出的过期时间构造Lock

*

* @paramjson

* @return

*/

staticLock fromJson(String json){

returnJSON.parseObject(json, Lock.class);

}

@Override

publicString toString(){

returnJSON.toJSONString( this, false);

}

publiclonggetExpiryTime(){

returnexpiryTime;

}

/**

* 判断锁是否超时,如果锁的过期时间小于当前系统时间,则判定锁超时

*

* @return

*/

booleanisExpired(){

returnthis.expiryTime < System.nanoTime();

}

@Override

publicbooleanequals(Object obj){

returnobj != null

&& obj instanceofLock

&& this.expiryTime == ((Lock) obj).getExpiryTime();

}

}

}

优化

上面存在的锁覆盖问题是不可避免的,还有就是要求客户端时间同步。下面我们进一步优化这一问题。

Redis命令介绍:SET

1、语法:

SETkeyvalue[EX seconds][PX milliseconds][NX|XX]

2、功能:

3、可选参数:

从 Redis 2.6.12 版本开始,SET 命令的行为可以通过一系列参数来修改:

因为 SET 命令可以通过参数来实现和 SETNX 、SETEX 和 PSETEX 三个命令的效果,所以将来的 Redis 版本可能会废弃并最终移除 SETNX 、 SETEX 和 PSETEX 这三个命令。

4、返回值:

5、使用模式:

命令 SET resource-name anystring NX EXlock-time 是一种在 Redis 中实现锁的简单方法。客户端执行以上的命令:

设置的过期时间到达之后,锁将自动释放。可以通过以下修改,让这个锁实现更健壮:

这两个改动可以防止持有过期锁的客户端误删现有锁的情况出现。

以下是一个简单的解锁脚本示例:

ifredis. call( "get",KEYS[ 1]) == ARGV[ 1]

then

returnredis. call( "del",KEYS[ 1])

else

return0

end

6、源码:

packagecom.github.hutu92;

importcom.alibaba.fastjson.JSON;

importredis.clients.jedis.Jedis;

importredis.clients.jedis.JedisPool;

importjava.util.Collections;

importjava.util.UUID;

importjava.util.concurrent.locks.ReentrantLock;

/**

* Created by liuchunlong on 2018/9/4.

* <p>

* 基于redis的分布式锁 v2

* <p>

* 不需要客户端时间同步

*/

publicclassDistributedLock{

privatestaticfinallongRETRY_BARRIER = 600; // 重试屏障,单位毫秒

privatestaticfinallongINTERVAL_TIMES = 200; // 下一次重试等待,单位毫秒

privatefinalJedisPool jedisPool; // redis连接池

privatefinalString lockKey; // lock Key

privatefinallonglockExpiryInMillis; // 锁的过期时长,单位纳秒

privatefinalThreadLocal<Lock> lockThreadLocal = newThreadLocal<Lock>();

/**

* 构造方法

*

* @paramjedisPool redis连接池

* @paramlockKey 锁的Key

* @paramlockExpiryInMillis 锁的过期时长,单位毫秒

*/

publicDistributedLock(JedisPool jedisPool, String lockKey, longlockExpiryInMillis){

this.jedisPool = jedisPool;

this.lockKey = lockKey;

this.lockExpiryInMillis = lockExpiryInMillis;

}

/**

* 构造方法

* <p>

* 使用锁默认的过期时长Integer.MAX_VALUE,即锁永远不会过期

*

* @paramjedisPool redis连接池

* @paramlockKey 锁的Key

*/

publicDistributedLock(JedisPool jedisPool, String lockKey){

this(jedisPool, lockKey, Integer.MAX_VALUE);

}

/**

* 获取锁在redis中的Key标记

*

* @returnlocks key

*/

publicString getLockKey(){

returnthis.lockKey;

}

/**

* 锁的过期时长

*

* @return

*/

publiclonggetLockExpiryInMillis(){

returnlockExpiryInMillis;

}

/**

* can override

*

* @paramjedis

* @return

*/

privateString nextUid(Jedis jedis){

// 可以考虑雪花算法..

returnUUID.randomUUID().toString();

}

privatesynchronizedJedis getClient(){

returnjedisPool.getResource();

}

privatesynchronizedvoidcloseClient(Jedis jedis){

jedis.close();

}

/**

* 请求分布式锁,不会阻塞,直接返回

*

* @paramjedis redis 连接

* @return成功获取锁返回true, 否则返回false

*/

privatebooleantryAcquire(Jedis jedis){

finalLock nLock = newLock(nextUid(jedis));

String result = jedis.set( this.lockKey, nLock.toString(), "NX", "PX", this.lockExpiryInMillis);

if( "OK".equals(result)) {

lockThreadLocal.set(nLock);

returntrue;

}

returnfalse;

}

/**

* 请求分布式锁,不会阻塞,直接返回

*

* @return成功获取锁返回true, 否则返回false

*/

publicbooleantryAcquire(){

Jedis jedis = null;

try{

jedis = getClient();

returntryAcquire(jedis);

} finally{

if(jedis != null) {

closeClient(jedis);

}

}

}

/**

* 超时请求分布式锁,会阻塞

*

* 采用"自旋获取锁"的方式,直至获取锁成功或者请求锁超时

*

* @paramacquireTimeoutInMillis 锁的请求超时时长

* @return

*/

publicbooleanacquire(longacquireTimeoutInMillis)throwsInterruptedException {

Jedis jedis = null;

try{

jedis = getClient();

longacquireTime = System.currentTimeMillis();

longexpiryTime = System.currentTimeMillis() + acquireTimeoutInMillis; // 锁的请求到期时间

while(expiryTime >= System.currentTimeMillis()) {

booleanresult = tryAcquire(jedis);

if(result) { // 获取锁成功直接返回,否则循环重试

returntrue;

}

Thread.sleep(INTERVAL_TIMES);

}

} finally{

if(jedis != null) {

closeClient(jedis);

}

}

returnfalse;

}

/**

* 释放锁

*

* @return

*/

publicbooleanrelease()throwsInterruptedException {

returnrelease(Integer.MAX_VALUE);

}

/**

* 释放锁

*

* @return

*/

publicbooleanrelease(longreleaseTimeoutInMillis)throwsInterruptedException {

Jedis jedis = null;

try{

jedis = getClient();

returnrelease(jedis, releaseTimeoutInMillis);

} finally{

if(jedis != null) {

closeClient(jedis);

}

}

}

/**

* 释放锁

*

* @paramjedis

* @paramreleaseTimeoutInMillis

* @return

*/

privatebooleanrelease(Jedis jedis, longreleaseTimeoutInMillis)throwsInterruptedException {

Lock cLock = lockThreadLocal.get();

if(cLock == null) {

System.out.println( "lock is null!");

}

if(cLock != null) {

String lua = "if redis.call("get", KEYS[1]) == ARGV[1] then return redis.call("del", KEYS[1]) else return 0 end";

longreleaseTime = System.currentTimeMillis();

longexpiryTime = System.currentTimeMillis() + releaseTimeoutInMillis; // 锁的释放到期时间

while(expiryTime >= System.currentTimeMillis()) {

Object result = jedis.eval(lua, Collections.singletonList( this.lockKey),

Collections.singletonList(cLock.toString()));

if(((Long) result) == 1L) {

lockThreadLocal.remove();

returntrue;

}

Thread.sleep(INTERVAL_TIMES);

}

}

returnfalse;

}

/**

* 锁

*/

protectedstaticclassLock{

privateString uid; // lock 唯一标识

Lock(String uid) {

this.uid = uid;

}

publicString getUid(){

returnuid;

}

@Override

publicString toString(){

returnJSON.toJSONString( this, false);

}

}

}

性能调优

这里我们使用ab性能测试工具来模拟测试。

由于没有使用队列,对高并发请求进行削峰,所以所有的压力都会被打到redis上。为了测试方便我这里只是本地启动了单机redis,没有做其它的调优配置。

我们并发测试场景是1000个并发请求,总共2000个请求。

ab-n 2000-c 1000"localhost:8080/lock/v2/seckill"

上述的地址是一个接口,接口代码如下:

@RestController

@RequestMapping( "/lock")

publicclassLockController{

privatestaticLongAdder longAdder = newLongAdder();

privatestaticLong ACQUIRE_TIMEOUT_IN_MILLIS = ( long) Integer.MAX_VALUE;

privatestaticLong stock = 100000L;

privatestaticDistributedLock lock;

static{

longAdder.add(stock);

}

privatefinalJedisPool jedisPool;

@Autowired

publicLockController(JedisPool jedisPool){

this.jedisPool = jedisPool;

lock = newDistributedLock(jedisPool, "seckillV2_"+ UUID.randomUUID().toString());

}

@GetMapping( "/v2/seckill")

publicString seckillV2()throwsInterruptedException {

booleanacquireResult = false;

try{

acquireResult = lock.acquire(ACQUIRE_TIMEOUT_IN_MILLIS);

if(!acquireResult) {

return"人太多了,换个姿势操作一下!";

}

if(longAdder.longValue() == 0L) {

return"已抢光!";

}

doSomeThing(jedisPool);

longAdder.decrement();

System.out.println( "已抢: "+ (stock - longAdder.longValue()) + ", 还剩下: "+ longAdder.longValue());

} finally{

if(acquireResult) {

booleanreleaseResult = lock.release();

if(!releaseResult) {

System.out.println( "释放锁失败!");

}

}

}

return"OK";

}

privatevoiddoSomeThing(JedisPool jedisPool){

Jedis jedis = null;

try{

jedis = jedisPool.getResource();

jedis.incr( "already_bought");

} finally{

if(jedis != null) {

jedis.close();

}

}

}

}

那么我们这里说的性能调优指的是什么呢?

仔细分析上面的源码你会发现,获取锁的逻辑是循环获取的,再每次循环之间,应该怎么去处理?如果不做任何处理,直接继续下一个循环,表面上看能够及时的获取锁,但这会给Redis更大的压力,如果Redis扛不住,到最后只会适得其反;而如果sleep等待,那么等待多久呢?等待久了,锁的获取和释放就会不及时;使用yield如何?等等......

1、

if((System.currentTimeMillis() - acquireTime) > RETRY_BARRIER) {

Thread.yield();

}

请求获取锁的前600毫秒内直接循环重试,如果超过600毫秒还未获取到锁则每次循环都将线程推迟到下一个时间片执行。


主要参数说明:

2、

if((System.currentTimeMillis() - acquireTime) > RETRY_BARRIER) {

Thread.sleep(INTERVAL_TIMES);

} else{

Thread.yield();

}

请求获取锁的前600毫秒内每次循环重试都先将线程推迟到下一个时间片,如果超过600毫秒还未获取到锁则每次循环都将线程休眠200毫秒。


很明显,出错率降低了很多,每个请求的耗时也减少了一半。这是因为No1中在600毫秒内的直接循环重试,会产生很多意义的请求,给Redis造成了巨大的压力,无法响应请求。

3、

Thread.sleep(INTERVAL_TIMES);

请求获取锁的每次循环重试都将线程休眠200毫秒。


4、

Thread.sleep(INTERVAL_TIMES * 10);

请求获取锁的每次循环重试都将线程休眠2秒。


很明显,休眠时间过长,会使部分线程请求锁的时间变长,不能够及时获取到锁。

5、

Thread.yield();

请求获取锁的每次循环重试都将线程推迟到下一个时间片执行。


总结

总的来说,No2与No3表现得都还可以。但是No2使用了Thread.yield();也会给Redis造成压力,我们对比下两者的 Percentage of the requests served within a certain time (ms) 数据。可以看到No3的90%以下请求的用户平均时间要明显低于No2的。所以最终我们选择No3策略。

当然你也可以根据你Redis的QPS自行调整策略。

作者:刘春龙,曾就职国美、优信,现就职于金山,担任消息队列服务开发。

声明:本文为作者投稿,版权归其个人所有。



该文章来源互联网,如有侵权请联系删除

热门推荐

  • 大话社区
  • 未解之谜
  • UFO
  • 离奇事件
  • 离奇事件
  • UFO
  • 考古发现
本站内容来自互联网,不提供任何保证,亦不承担任何法律责任.如有侵权请联系删除,QQ:759281825.
COPYRIGHT © 2014-2024 xiaoqiweb.com INC. ALL RIGHTS RESERVED. 版权所有 笑奇网粤ICP备17087216号