Redis设置如果不存在
的命令,我们可以通过这个命令来实现互斥锁功能,在Redis官方文档里面推荐的标准实现方式是SET resource_name my_random_value NX PX 30000
这串命令,其中:
resource_name
表示要锁定的资源
NX
表示如果不存在则设置
PX 30000
表示过期时间为30000毫秒,也就是30秒
my_random_value
这个值在所有的客户端必须是唯一的,所有同一key的锁竞争者这个值都不能一样。
值必须是随机数主要是为了更安全的释放锁,释放锁的时候使用脚本告诉Redis:只有key存在并且存储的值和我指定的值一样才能告诉我删除成功,避免错误释放别的竞争者的锁。
由于涉及到两个操作,因此我们需要通过Lua脚本保证操作的原子性:
if redis.call("get",KEYS[1]) == ARGV[1] then return redis.call("del",KEYS[1]) else return 0 end
举个不用Lua脚本的例子:客户端A取得资源锁,但是紧接着被一个其他操作阻塞了,当客户端A运行完毕其他操作后要释放锁时,原来的锁早已超时并且被Redis自动释放,并且在这期间资源锁又被客户端B再次获取到。
因为判断和删除是两个操作,所以有可能A刚判断完锁就过期自动释放了,然后B就获取到了锁,然后A又调用了Del,导致把B的锁给释放了。
TryLock
其实就是使用SET resource_name my_random_value NX PX 30000
加锁,这里使用UUID
作为随机值,并且在加锁成功时把随机值返回,这个随机值会在Unlock
时使用;
Unlock
解锁逻辑就是执行前面说到的lua脚本
。
func (l *Lock) TryLock(ctx context.Context) error { success, err := l.client.SetNX(ctx, l.resource, l.randomValue, ttl).Result() if err != nil { return err } // 加锁失败 if !success { return ErrLockFailed } // 加锁成功 l.randomValue = randomValue return nil } func (l *Lock) Unlock(ctx context.Context) error { return l.script.Run(ctx, l.client, []string{l.resource}, l.randomValue).Err() }
Lock
是阻塞的获取锁,因此在加锁失败的时候,需要重试。当然也可能出现其他异常情况(比如网络问题,请求超时等),这些情况则直接返回error
에는 하나가 있습니다.
단계는 다음과 같습니다.
잠금을 시도하고, 잠금에 성공하면 직접 반환합니다.
잠금에 실패하면 성공하거나 비정상적인 상황이 발생할 때까지 계속 루프를 돌면서 잠금을 시도합니다.
func (l *Lock) Lock(ctx context.Context) error { // 尝试加锁 err := l.TryLock(ctx) if err == nil { return nil } if !errors.Is(err, ErrLockFailed) { return err } // 加锁失败,不断尝试 ticker := time.NewTicker(l.tryLockInterval) defer ticker.Stop() for { select { case <-ctx.Done(): // 超时 return ErrTimeout case <-ticker.C: // 重新尝试加锁 err := l.TryLock(ctx) if err == nil { return nil } if !errors.Is(err, ErrLockFailed) { return err } } } }
We 이전 예에서 언급한 뮤텍스 잠금에 작은 문제가 있습니다. 즉, 잠금을 보유하고 있는 클라이언트 A가 차단되면 A의 잠금이 시간 초과 후 자동으로 해제될 수 있습니다. B 미리 잠금을 획득합니다.
이러한 상황의 발생을 줄이기 위해 A가 잠금을 보유하고 있는 동안 잠금의 만료 시간을 지속적으로 연장하고 클라이언트 B가 미리 잠금을 획득하는 상황을 줄일 수 있는 것이 감시 메커니즘입니다.
물론, 클라이언트 A가 잠금을 획득한 후 우연히 Redis와의 연결을 끊는 경우 타임아웃을 연장할 수 있는 방법이 없기 때문에 위의 상황을 완전히 피할 수 있는 방법은 없습니다.
잠금이 성공하면 스레드를 시작하고 잠금 만료 시간을 계속 연장합니다. 잠금 해제가 수행되면 Watchdog 스레드를 닫습니다.
워치독 프로세스는 다음과 같습니다.
잠금이 성공했습니다. 워치독을 시작하세요
워치독 스레드가 잠금 프로세스 시간을 계속 연장합니다
잠금 해제, 워치독 닫기
func (l *Lock) startWatchDog() { ticker := time.NewTicker(l.ttl / 3) defer ticker.Stop() for { select { case <-ticker.C: // 延长锁的过期时间 ctx, cancel := context.WithTimeout(context.Background(), l.ttl/3*2) ok, err := l.client.Expire(ctx, l.resource, l.ttl).Result() cancel() // 异常或锁已经不存在则不再续期 if err != nil || !ok { return } case <-l.watchDog: // 已经解锁 return } } }
TryLock : Watchdog 시작
func (l *Lock) TryLock(ctx context.Context) error { success, err := l.client.SetNX(ctx, l.resource, l.randomValue, l.ttl).Result() if err != nil { return err } // 加锁失败 if !success { return ErrLockFailed } // 加锁成功,启动看门狗 go l.startWatchDog() return nil }
Unlock: watchdog 끄기
func (l *Lock) Unlock(ctx context.Context) error { err := l.script.Run(ctx, l.client, []string{l.resource}, l.randomValue).Err() // 关闭看门狗 close(l.watchDog) return err }
위 구현은 단일 Redis 인스턴스를 기반으로 하기 때문에 이 인스턴스만 정지되면 잠금을 얻을 수 없기 때문에 모든 요청이 실패합니다. 내결함성을 향상시키기 위해 서로 다른 시스템에 분산된 여러 Redis 인스턴스를 사용할 수 있으며 대부분의 노드에 대한 잠금을 얻는 한 성공적으로 잠글 수 있습니다. 이것이 빨간색 잠금 알고리즘입니다. 동시에 여러 Redis 인스턴스에 대한 잠금을 획득해야 한다는 점을 제외하면 실제로 위의 단일 인스턴스 알고리즘을 기반으로 합니다.
在加锁逻辑里,我们主要是对每个Redis实例执行SET resource_name my_random_value NX PX 30000
获取锁,然后把成功获取锁的客户端放到一个channel
里(这里因为是多线程并发获取锁,使用slice可能有并发问题),同时使用sync.WaitGroup
等待所有获取锁操作结束。
然后判断成功获取到的锁的数量是否大于一半,如果没有得到一半以上的锁,说明加锁失败,释放已经获得的锁。
如果加锁成功,则启动看门狗延长锁的过期时间。
func (l *RedLock) TryLock(ctx context.Context) error { randomValue := gofakeit.UUID() var wg sync.WaitGroup wg.Add(len(l.clients)) // 成功获得锁的Redis实例的客户端 successClients := make(chan *redis.Client, len(l.clients)) for _, client := range l.clients { go func(client *redis.Client) { defer wg.Done() success, err := client.SetNX(ctx, l.resource, randomValue, ttl).Result() if err != nil { return } // 加锁失败 if !success { return } // 加锁成功,启动看门狗 go l.startWatchDog() successClients <- client }(client) } // 等待所有获取锁操作完成 wg.Wait() close(successClients) // 如果成功加锁得客户端少于客户端数量的一半+1,表示加锁失败 if len(successClients) < len(l.clients)/2+1 { // 就算加锁失败,也要把已经获得的锁给释放掉 for client := range successClients { go func(client *redis.Client) { ctx, cancel := context.WithTimeout(context.Background(), ttl) l.script.Run(ctx, client, []string{l.resource}, randomValue) cancel() }(client) } return ErrLockFailed } // 加锁成功,启动看门狗 l.randomValue = randomValue l.successClients = nil for successClient := range successClients { l.successClients = append(l.successClients, successClient) } return nil }
我们需要延长所有成功获取到的锁的过期时间。
func (l *RedLock) startWatchDog() { l.watchDog = make(chan struct{}) ticker := time.NewTicker(resetTTLInterval) defer ticker.Stop() for { select { case <-ticker.C: // 延长锁的过期时间 for _, client := range l.successClients { go func(client *redis.Client) { ctx, cancel := context.WithTimeout(context.Background(), ttl-resetTTLInterval) client.Expire(ctx, l.resource, ttl) cancel() }(client) } case <-l.watchDog: // 已经解锁 return } } }
我们需要解锁所有成功获取到的锁。
func (l *RedLock) Unlock(ctx context.Context) error { for _, client := range l.successClients { go func(client *redis.Client) { l.script.Run(ctx, client, []string{l.resource}, l.randomValue) }(client) } // 关闭看门狗 close(l.watchDog) return nil }
위 내용은 Go 및 Redis를 사용하여 분산 뮤텍스 잠금 및 빨간색 잠금을 구현하는 방법의 상세 내용입니다. 자세한 내용은 PHP 중국어 웹사이트의 기타 관련 기사를 참조하세요!