分布式锁和分布式限流器应该是算是比较常见的需求了,而Redis现在几乎是应用的标配了,于是很多人会倾向于选择基于Redis来实现,因为不需要引入额外的依赖。
分布式锁和分布式限流器在Java领域比较成熟和常用的开源实现是Redisson(中文官方介绍),下面从它的极小部分源码,分析下分布式锁和分布式限流器的实现逻辑。
分布式锁的实现
加锁
Redisson中加锁实现的核心源码如下,为了实现操作的原子性,不得不使用eval命令加Lua脚本的方式。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17
| <T> RFuture<T> tryLockInnerAsync(long leaseTime, TimeUnit unit, long threadId, RedisStrictCommand<T> command) { internalLockLeaseTime = unit.toMillis(leaseTime);
return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, command, "if (redis.call('exists', KEYS[1]) == 0) then " + "redis.call('hincrby', KEYS[1], ARGV[2], 1); " + "redis.call('pexpire', KEYS[1], ARGV[1]); " + "return nil; " + "end; " + "if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " + "redis.call('hincrby', KEYS[1], ARGV[2], 1); " + "redis.call('pexpire', KEYS[1], ARGV[1]); " + "return nil; " + "end; " + "return redis.call('pttl', KEYS[1]);", Collections.<Object>singletonList(getName()), internalLockLeaseTime, getLockName(threadId)); }
|
把上面的Lua代码的逻辑翻译成redis命令组成的伪代码:
1 2 3 4 5 6 7 8 9 10 11 12 13
| //获取锁 if exists $lockName == 0 hset $lockName $threadUid 1 pexpire $lockName $expireTime return nil //重入锁 if hexists $lockName $threadUid == 1 hincrby $lockName $threadUid 1 pexpire $lockName $expireTime return nil //获取失败 return pttl $lockName
|
现在逻辑很清晰了吧,注意这里的锁使用了hash结构而不是string,因为实现的是可重入锁,需要记录加锁的线程标识,并维持一个计数器。
有的需求是要线程一直等待直到获取到锁的,Redisson中通过subscribe命令订阅锁相关的一个channel,收到通知后会再次尝试获取锁。
关于可重入锁,我在Scala并发编程实战 2:Lock 锁中有解释过:
所谓可重入锁,也就是说一个线程可以在持有该锁的时候,再次获取该锁。可重入锁通常与一个计数器关联,第一次获取锁的时候,计数器从0变为1,再次获取锁,变为2,以此类推。释放锁的时候,计数器每次减1,直至减为0,该锁才真正释放给其他线程。
另外需要注意的是,这里实现的锁是有过期时间的,如果需要一直持有锁,redisson中的实现是,会用一个定时任务不断去刷新过期时间。(可以想想,为什么不用一个永不过期的key来实现呢?)
解锁
下面看解锁的源码。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18
| protected RFuture<Boolean> unlockInnerAsync(long threadId) { return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN, "if (redis.call('hexists', KEYS[1], ARGV[3]) == 0) then " + "return nil;" + "end; " + "local counter = redis.call('hincrby', KEYS[1], ARGV[3], -1); " + "if (counter > 0) then " + "redis.call('pexpire', KEYS[1], ARGV[2]); " + "return 0; " + "else " + "redis.call('del', KEYS[1]); " + "redis.call('publish', KEYS[2], ARGV[1]); " + "return 1; "+ "end; " + "return nil;", Arrays.<Object>asList(getName(), getChannelName()), LockPubSub.UNLOCK_MESSAGE, internalLockLeaseTime, getLockName(threadId));
}
|
核心也是Lua实现,翻译下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19
| //锁是否存在 if exists $lockName == 0 publish $channel 0 return 1 //是否持有锁 if hexists $lockName $threadUid ==0 return nil
//减少持有计数 counter = hincrby $lockName $threadUid -1 if counter > 0 //刷新锁 pexpire $lockName $expireTime return 0 else //释放锁 del $lockName publish $channel 0 return 1
|
逻辑应该容易看懂,其中publish是为了通知其他线程锁释放了,可以来抢啦。
分布式限流器的实现
如果一个服务有多个实例,做限流的话,通常有两种方式,一种是对每个实例单独限制,一种是对所有实例整体的流量做限制。
Redisson中对这两种都有实现,以限流器的类型来区分,第一种称作PER_CLIENT,第二种为OVERALL。
限流算法有若干种,Redisson采用的是令牌桶算法,核心实现同样是Lua:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28
| private <T> RFuture<T> tryAcquireAsync(RedisCommand<T> command, Long value) { return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, command, "local rate = redis.call('hget', KEYS[1], 'rate');" + "local interval = redis.call('hget', KEYS[1], 'interval');" + "local type = redis.call('hget', KEYS[1], 'type');" + "assert(rate ~= false and interval ~= false and type ~= false, 'RateLimiter is not initialized')" + "local valueName = KEYS[2];" + "if type == '1' then " + "valueName = KEYS[3];" + "end;" + "local currentValue = redis.call('get', valueName); " + "if currentValue ~= false then " + "if tonumber(currentValue) < tonumber(ARGV[1]) then " + "return redis.call('pttl', valueName); " + "else " + "redis.call('decrby', valueName, ARGV[1]); " + "return nil; " + "end; " + "else " + "assert(tonumber(rate) >= tonumber(ARGV[1]), 'Requested permits amount could not exceed defined rate'); " + "redis.call('set', valueName, rate, 'px', interval); " + "redis.call('decrby', valueName, ARGV[1]); " + "return nil; " + "end;", Arrays.asList(getName(), getValueName(), getClientValueName()), value);
|
简化为伪代码:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35
| //令牌桶容量 $rate = hget $configKey rate //间隔(比如rate=10,interval=1000,就代表1000毫秒中允许10次,即QPS为10) $interval = hget $configKey interval //类型 $type = hget $configKey type //检查限流器是否已经初始化 assert($rate,$interval,$type都存在)
//$valueName是存储令牌桶的key $valueName = $valueKey //如果是PER_CLIENT类型 if type == '1' $valueName = $clientValueKey
//$currentValue是当前令牌数,$acquireValue是申请的令牌数 $currentValue = get $valueName if $currentValue != false //令牌桶已经存在
if $currentValue < $acquireValue //令牌小于申请数,申请令牌失败 return pttl $valueName else //令牌 减去 申请数 decrby $valueName $acquireValue else //令牌桶不存在
//检查申请数不能大于令牌桶容量 assert($rate>$acquireValue) //创建令牌桶 set $valueName $rate px $interval decrby $valueName $acquireValue return nil
|
值得一提的是,令牌桶的配置信息也保存到了redis中,可以看到rate,interval,type信息都是从一个hash结构中获取的,从获取令牌桶配置到更新令牌桶整个过程是原子化的。这是考虑到多个应用实例的情况下,每个实例拿到的限流器的配置可能出现不一致的情况,特别是在修改限流器配置的时候。
参考文献
Redisson github
转载请注明原文地址:Liam’s Blog