互斥锁
Redis里有一个设置如果不存在
的命令,我们可以通过这个命令来实现互斥锁功能,在Redis官方文档里面推荐的标准实现方式是SET resource_name my_random_value NX PX 30000
这串命令,其中:
值必须是随机数主要是为了更安全的释放锁,释放锁的时候使用脚本告诉Redis:只有key存在并且存储的值和我指定的值一样才能告诉我删除成功,避免错误释放别的竞争者的锁。
由于涉及到两个操作,因此我们需要通过Lua脚本保证操作的原子性:
1 2 3 4 5 | if redis.call( "get" ,KEYS[1]) == ARGV[1] then
return redis.call( "del" ,KEYS[1])
else
return 0
end
|
登录后复制
举个不用Lua脚本的例子:客户端A取得资源锁,但是紧接着被一个其他操作阻塞了,当客户端A运行完毕其他操作后要释放锁时,原来的锁早已超时并且被Redis自动释放,并且在这期间资源锁又被客户端B再次获取到。
因为判断和删除是两个操作,所以有可能A刚判断完锁就过期自动释放了,然后B就获取到了锁,然后A又调用了Del,导致把B的锁给释放了。
TryLock和Unlock实现
TryLock
其实就是使用SET resource_name my_random_value NX PX 30000
加锁,这里使用UUID
作为随机值,并且在加锁成功时把随机值返回,这个随机值会在Unlock
时使用;
Unlock
解锁逻辑就是执行前面说到的lua脚本
。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 | func (l *Lock) TryLock(ctx context.Context) error {
success, err := l.client.SetNX(ctx, l.resource, l.randomValue, ttl).Result()
if err != nil {
return err
}
if !success {
return ErrLockFailed
}
l.randomValue = randomValue
return nil
}
func (l *Lock) Unlock(ctx context.Context) error {
return l.script.Run(ctx, l.client, []string{l.resource}, l.randomValue).Err()
}
|
登录后复制
Lock实现
Lock
是阻塞的获取锁,因此在加锁失败的时候,需要重试。当然也可能出现其他异常情况(比如网络问题,请求超时等),这些情况则直接返回error
。
步骤如下:
尝试加锁,加锁成功直接返回
加锁失败则不断循环尝试加锁直到成功或出现异常情况
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 | func (l *Lock) Lock(ctx context.Context) error {
err := l.TryLock(ctx)
if err == nil {
return nil
}
if !errors.Is(err, ErrLockFailed) {
return err
}
ticker := time.NewTicker(l.tryLockInterval)
defer ticker.Stop()
for {
select {
case <-ctx.Done():
return ErrTimeout
case <-ticker.C:
err := l.TryLock(ctx)
if err == nil {
return nil
}
if !errors.Is(err, ErrLockFailed) {
return err
}
}
}
}
|
登录后复制
实现看门狗机制
我们前面的例子中提到的互斥锁有一个小问题,就是如果持有锁客户端A被阻塞,那么A的锁可能会超时被自动释放,导致客户端B提前获取到锁。
为了减少这种情况的发生,我们可以在A持有锁期间,不断地延长锁的过期时间,减少客户端B提前获取到锁的情况,这就是看门狗机制。
当然,这没办法完全避免上述情况的发生,因为如果客户端A获取锁之后,刚好与Redis的连接关闭了,这时候也就没办法延长超时时间了。
看门狗实现
加锁成功时启动一个线程,不断地延长锁地过期时间;在Unlock时关闭看门狗线程。
看门狗流程如下:
加锁成功,启动看门狗
看门狗线程不断延长锁的过程时间
解锁,关闭看门狗
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 | func (l *Lock) startWatchDog() {
ticker := time.NewTicker(l.ttl / 3)
defer ticker.Stop()
for {
select {
case <-ticker.C:
ctx, cancel := context.WithTimeout(context.Background(), l.ttl/3*2)
ok, err := l.client.Expire(ctx, l.resource, l.ttl).Result()
cancel()
if err != nil || !ok {
return
}
case <-l.watchDog:
return
}
}
}
|
登录后复制
TryLock:启动看门狗
1 2 3 4 5 6 7 8 9 10 11 12 13 | func (l *Lock) TryLock(ctx context.Context) error {
success, err := l.client.SetNX(ctx, l.resource, l.randomValue, l.ttl).Result()
if err != nil {
return err
}
if !success {
return ErrLockFailed
}
go l.startWatchDog()
return nil
}
|
登录后复制
Unlock:关闭看门狗
1 2 3 4 5 6 | func (l *Lock) Unlock(ctx context.Context) error {
err := l.script.Run(ctx, l.client, []string{l.resource}, l.randomValue).Err()
close(l.watchDog)
return err
}
|
登录后复制
红锁
由于上面的实现是基于单Redis实例,如果这个唯一的实例挂了,那么所有请求都会因为拿不到锁而失败,为了提高容错性,我们可以使用多个分布在不同机器上的Redis实例,并且只要拿到其中大多数节点的锁就能加锁成功,这就是红锁算法。它其实也是基于上面的单实例算法的,只是我们需要同时对多个Redis实例获取锁。
加锁实现
在加锁逻辑里,我们主要是对每个Redis实例执行SET resource_name my_random_value NX PX 30000
获取锁,然后把成功获取锁的客户端放到一个channel
里(这里因为是多线程并发获取锁,使用slice可能有并发问题),同时使用sync.WaitGroup
等待所有获取锁操作结束。
然后判断成功获取到的锁的数量是否大于一半,如果没有得到一半以上的锁,说明加锁失败,释放已经获得的锁。
如果加锁成功,则启动看门狗延长锁的过期时间。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 | func (l *RedLock) TryLock(ctx context.Context) error {
randomValue := gofakeit.UUID()
var wg sync.WaitGroup
wg.Add(len(l.clients))
successClients := make(chan *redis.Client, len(l.clients))
for _, client := range l.clients {
go func(client *redis.Client) {
defer wg.Done()
success, err := client.SetNX(ctx, l.resource, randomValue, ttl).Result()
if err != nil {
return
}
if !success {
return
}
go l.startWatchDog()
successClients <- client
}(client)
}
wg.Wait()
close(successClients)
if len(successClients) < len(l.clients)/2+1 {
for client := range successClients {
go func(client *redis.Client) {
ctx, cancel := context.WithTimeout(context.Background(), ttl)
l.script.Run(ctx, client, []string{l.resource}, randomValue)
cancel()
}(client)
}
return ErrLockFailed
}
l.randomValue = randomValue
l.successClients = nil
for successClient := range successClients {
l.successClients = append(l.successClients, successClient)
}
return nil
}
|
登录后复制
看门狗实现
我们需要延长所有成功获取到的锁的过期时间。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 | func (l *RedLock) startWatchDog() {
l.watchDog = make(chan struct{})
ticker := time.NewTicker(resetTTLInterval)
defer ticker.Stop()
for {
select {
case <-ticker.C:
for _, client := range l.successClients {
go func(client *redis.Client) {
ctx, cancel := context.WithTimeout(context.Background(), ttl-resetTTLInterval)
client.Expire(ctx, l.resource, ttl)
cancel()
}(client)
}
case <-l.watchDog:
return
}
}
}
|
登录后复制
解锁实现
我们需要解锁所有成功获取到的锁。
1 2 3 4 5 6 7 8 9 10 | func (l *RedLock) Unlock(ctx context.Context) error {
for _, client := range l.successClients {
go func(client *redis.Client) {
l.script.Run(ctx, client, []string{l.resource}, l.randomValue)
}(client)
}
close(l.watchDog)
return nil
}
|
登录后复制
以上是怎么用Go和Redis实现分布式互斥锁和红锁的详细内容。更多信息请关注PHP中文网其他相关文章!