基于Redis实现分布式锁和分布式限流器

分布式锁和分布式限流器应该是算是比较常见的需求了,而Redis现在几乎是应用的标配了,于是很多人会倾向于选择基于Redis来实现,因为不需要引入额外的依赖。

分布式锁和分布式限流器在Java领域比较成熟和常用的开源实现是Redisson(中文官方介绍),下面从它的极小部分源码,分析下分布式锁和分布式限流器的实现逻辑。

分布式锁的实现

加锁

Redisson中加锁实现的核心源码如下,为了实现操作的原子性,不得不使用eval命令加Lua脚本的方式。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
<T> RFuture<T> tryLockInnerAsync(long leaseTime, TimeUnit unit, long threadId, RedisStrictCommand<T> command) {
internalLockLeaseTime = unit.toMillis(leaseTime);

return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, command,
"if (redis.call('exists', KEYS[1]) == 0) then " +
"redis.call('hincrby', KEYS[1], ARGV[2], 1); " +
"redis.call('pexpire', KEYS[1], ARGV[1]); " +
"return nil; " +
"end; " +
"if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +
"redis.call('hincrby', KEYS[1], ARGV[2], 1); " +
"redis.call('pexpire', KEYS[1], ARGV[1]); " +
"return nil; " +
"end; " +
"return redis.call('pttl', KEYS[1]);",
Collections.<Object>singletonList(getName()), internalLockLeaseTime, getLockName(threadId));
}

把上面的Lua代码的逻辑翻译成redis命令组成的伪代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
//获取锁
if exists $lockName == 0
hset $lockName $threadUid 1
pexpire $lockName $expireTime
return nil
//重入锁
if hexists $lockName $threadUid == 1
hincrby $lockName $threadUid 1
pexpire $lockName $expireTime
return nil
//获取失败
return pttl $lockName

现在逻辑很清晰了吧,注意这里的锁使用了hash结构而不是string,因为实现的是可重入锁,需要记录加锁的线程标识,并维持一个计数器。

有的需求是要线程一直等待直到获取到锁的,Redisson中通过subscribe命令订阅锁相关的一个channel,收到通知后会再次尝试获取锁。

关于可重入锁,我在Scala并发编程实战 2:Lock 锁中有解释过:

所谓可重入锁,也就是说一个线程可以在持有该锁的时候,再次获取该锁。可重入锁通常与一个计数器关联,第一次获取锁的时候,计数器从0变为1,再次获取锁,变为2,以此类推。释放锁的时候,计数器每次减1,直至减为0,该锁才真正释放给其他线程。

另外需要注意的是,这里实现的锁是有过期时间的,如果需要一直持有锁,redisson中的实现是,会用一个定时任务不断去刷新过期时间。(可以想想,为什么不用一个永不过期的key来实现呢?)

解锁

下面看解锁的源码。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
protected RFuture<Boolean> unlockInnerAsync(long threadId) {
return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
"if (redis.call('hexists', KEYS[1], ARGV[3]) == 0) then " +
"return nil;" +
"end; " +
"local counter = redis.call('hincrby', KEYS[1], ARGV[3], -1); " +
"if (counter > 0) then " +
"redis.call('pexpire', KEYS[1], ARGV[2]); " +
"return 0; " +
"else " +
"redis.call('del', KEYS[1]); " +
"redis.call('publish', KEYS[2], ARGV[1]); " +
"return 1; "+
"end; " +
"return nil;",
Arrays.<Object>asList(getName(), getChannelName()), LockPubSub.UNLOCK_MESSAGE, internalLockLeaseTime, getLockName(threadId));

}

核心也是Lua实现,翻译下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
//锁是否存在
if exists $lockName == 0
publish $channel 0
return 1
//是否持有锁
if hexists $lockName $threadUid ==0
return nil

//减少持有计数
counter = hincrby $lockName $threadUid -1
if counter > 0
//刷新锁
pexpire $lockName $expireTime
return 0
else
//释放锁
del $lockName
publish $channel 0
return 1

逻辑应该容易看懂,其中publish是为了通知其他线程锁释放了,可以来抢啦。

分布式限流器的实现

如果一个服务有多个实例,做限流的话,通常有两种方式,一种是对每个实例单独限制,一种是对所有实例整体的流量做限制。
Redisson中对这两种都有实现,以限流器的类型来区分,第一种称作PER_CLIENT,第二种为OVERALL。

限流算法有若干种,Redisson采用的是令牌桶算法,核心实现同样是Lua:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
private <T> RFuture<T> tryAcquireAsync(RedisCommand<T> command, Long value) {
return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, command,
"local rate = redis.call('hget', KEYS[1], 'rate');"
+ "local interval = redis.call('hget', KEYS[1], 'interval');"
+ "local type = redis.call('hget', KEYS[1], 'type');"
+ "assert(rate ~= false and interval ~= false and type ~= false, 'RateLimiter is not initialized')"

+ "local valueName = KEYS[2];"
+ "if type == '1' then "
+ "valueName = KEYS[3];"
+ "end;"

+ "local currentValue = redis.call('get', valueName); "
+ "if currentValue ~= false then "
+ "if tonumber(currentValue) < tonumber(ARGV[1]) then "
+ "return redis.call('pttl', valueName); "
+ "else "
+ "redis.call('decrby', valueName, ARGV[1]); "
+ "return nil; "
+ "end; "
+ "else "
+ "assert(tonumber(rate) >= tonumber(ARGV[1]), 'Requested permits amount could not exceed defined rate'); "
+ "redis.call('set', valueName, rate, 'px', interval); "
+ "redis.call('decrby', valueName, ARGV[1]); "
+ "return nil; "
+ "end;",
Arrays.asList(getName(), getValueName(), getClientValueName()),
value);

简化为伪代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
//令牌桶容量
$rate = hget $configKey rate
//间隔(比如rate=10,interval=1000,就代表1000毫秒中允许10次,即QPS为10)
$interval = hget $configKey interval
//类型
$type = hget $configKey type
//检查限流器是否已经初始化
assert($rate,$interval,$type都存在)

//$valueName是存储令牌桶的key
$valueName = $valueKey
//如果是PER_CLIENT类型
if type == '1'
$valueName = $clientValueKey

//$currentValue是当前令牌数,$acquireValue是申请的令牌数
$currentValue = get $valueName
if $currentValue != false
//令牌桶已经存在

if $currentValue < $acquireValue
//令牌小于申请数,申请令牌失败
return pttl $valueName
else
//令牌 减去 申请数
decrby $valueName $acquireValue
else
//令牌桶不存在

//检查申请数不能大于令牌桶容量
assert($rate>$acquireValue)
//创建令牌桶
set $valueName $rate px $interval
decrby $valueName $acquireValue
return nil

值得一提的是,令牌桶的配置信息也保存到了redis中,可以看到rate,interval,type信息都是从一个hash结构中获取的,从获取令牌桶配置到更新令牌桶整个过程是原子化的。这是考虑到多个应用实例的情况下,每个实例拿到的限流器的配置可能出现不一致的情况,特别是在修改限流器配置的时候。

参考文献

Redisson github
转载请注明原文地址:Liam’s Blog