Es gibt eine in Redis设置如果不存在
的命令,我们可以通过这个命令来实现互斥锁功能,在Redis官方文档里面推荐的标准实现方式是SET resource_name my_random_value NX PX 30000
这串命令,其中:
resource_name
表示要锁定的资源
NX
表示如果不存在则设置
PX 30000
表示过期时间为30000毫秒,也就是30秒
my_random_value
这个值在所有的客户端必须是唯一的,所有同一key的锁竞争者这个值都不能一样。
值必须是随机数主要是为了更安全的释放锁,释放锁的时候使用脚本告诉Redis:只有key存在并且存储的值和我指定的值一样才能告诉我删除成功,避免错误释放别的竞争者的锁。
由于涉及到两个操作,因此我们需要通过Lua脚本保证操作的原子性:
if redis.call("get",KEYS[1]) == ARGV[1] then return redis.call("del",KEYS[1]) else return 0 end
举个不用Lua脚本的例子:客户端A取得资源锁,但是紧接着被一个其他操作阻塞了,当客户端A运行完毕其他操作后要释放锁时,原来的锁早已超时并且被Redis自动释放,并且在这期间资源锁又被客户端B再次获取到。
因为判断和删除是两个操作,所以有可能A刚判断完锁就过期自动释放了,然后B就获取到了锁,然后A又调用了Del,导致把B的锁给释放了。
TryLock
其实就是使用SET resource_name my_random_value NX PX 30000
加锁,这里使用UUID
作为随机值,并且在加锁成功时把随机值返回,这个随机值会在Unlock
时使用;
Unlock
解锁逻辑就是执行前面说到的lua脚本
。
func (l *Lock) TryLock(ctx context.Context) error { success, err := l.client.SetNX(ctx, l.resource, l.randomValue, ttl).Result() if err != nil { return err } // 加锁失败 if !success { return ErrLockFailed } // 加锁成功 l.randomValue = randomValue return nil } func (l *Lock) Unlock(ctx context.Context) error { return l.script.Run(ctx, l.client, []string{l.resource}, l.randomValue).Err() }
Lock
是阻塞的获取锁,因此在加锁失败的时候,需要重试。当然也可能出现其他异常情况(比如网络问题,请求超时等),这些情况则直接返回error
.
Die Schritte sind wie folgt:
Versuchen Sie zu sperren und kehren Sie direkt zurück, wenn die Sperre erfolgreich ist.
Wenn die Sperre fehlschlägt, wird die Schleife fortgesetzt und versucht zu sperren, bis sie erfolgreich ist oder eine ungewöhnliche Situation auftritt
func (l *Lock) Lock(ctx context.Context) error { // 尝试加锁 err := l.TryLock(ctx) if err == nil { return nil } if !errors.Is(err, ErrLockFailed) { return err } // 加锁失败,不断尝试 ticker := time.NewTicker(l.tryLockInterval) defer ticker.Stop() for { select { case <-ctx.Done(): // 超时 return ErrTimeout case <-ticker.C: // 重新尝试加锁 err := l.TryLock(ctx) if err == nil { return nil } if !errors.Is(err, ErrLockFailed) { return err } } } }
Wir haben ein kleines Problem mit der Mutex-Sperre, die im vorherigen Beispiel erwähnt wurde. Das heißt, wenn Client A, der die Sperre hält, blockiert ist, kann die Sperre von A nach einer Zeitüberschreitung automatisch freigegeben werden, was zu Client-Fehlern führt B, um das Schloss im Voraus zu erwerben.
Um das Auftreten dieser Situation zu reduzieren, können wir die Ablaufzeit der Sperre kontinuierlich verlängern, während A die Sperre hält, und die Situation reduzieren, in der Client B die Sperre im Voraus erhält. Dies ist der Watchdog-Mechanismus.
Natürlich gibt es keine Möglichkeit, die obige Situation vollständig zu vermeiden, denn wenn Client A nach dem Erwerb der Sperre zufällig die Verbindung mit Redis schließt, gibt es keine Möglichkeit, das Timeout zu verlängern.
Starten Sie einen Thread, wenn die Sperre erfolgreich ist, und verlängern Sie kontinuierlich die Ablaufzeit der Sperre. Schließen Sie den Watchdog-Thread, wenn die Sperre ausgeführt wird.
Der Watchdog-Prozess ist wie folgt:
Die Sperre ist erfolgreich, starten Sie den Watchdog
Der Watchdog-Thread verlängert weiterhin die Sperrprozesszeit
Entsperren, schließen Sie den Watchdog
func (l *Lock) startWatchDog() { ticker := time.NewTicker(l.ttl / 3) defer ticker.Stop() for { select { case <-ticker.C: // 延长锁的过期时间 ctx, cancel := context.WithTimeout(context.Background(), l.ttl/3*2) ok, err := l.client.Expire(ctx, l.resource, l.ttl).Result() cancel() // 异常或锁已经不存在则不再续期 if err != nil || !ok { return } case <-l.watchDog: // 已经解锁 return } } }
TryLock : Watchdog starten
func (l *Lock) TryLock(ctx context.Context) error { success, err := l.client.SetNX(ctx, l.resource, l.randomValue, l.ttl).Result() if err != nil { return err } // 加锁失败 if !success { return ErrLockFailed } // 加锁成功,启动看门狗 go l.startWatchDog() return nil }
Unlock: Watchdog ausschalten
func (l *Lock) Unlock(ctx context.Context) error { err := l.script.Run(ctx, l.client, []string{l.resource}, l.randomValue).Err() // 关闭看门狗 close(l.watchDog) return err }
Da die obige Implementierung auf einer einzelnen Redis-Instanz basiert, schlagen alle Anforderungen fehl, da die Sperre nicht erhalten werden kann Um die Fehlertoleranz zu verbessern, können wir mehrere Redis-Instanzen verwenden, die auf verschiedenen Maschinen verteilt sind, und die Sperre kann erfolgreich sein, solange wir die Sperren der meisten Knoten erhalten. Dies ist der Red-Lock-Algorithmus. Es basiert tatsächlich auf dem oben genannten Einzelinstanzalgorithmus, mit der Ausnahme, dass wir Sperren für mehrere Redis-Instanzen gleichzeitig erwerben müssen.
在加锁逻辑里,我们主要是对每个Redis实例执行SET resource_name my_random_value NX PX 30000
获取锁,然后把成功获取锁的客户端放到一个channel
里(这里因为是多线程并发获取锁,使用slice可能有并发问题),同时使用sync.WaitGroup
等待所有获取锁操作结束。
然后判断成功获取到的锁的数量是否大于一半,如果没有得到一半以上的锁,说明加锁失败,释放已经获得的锁。
如果加锁成功,则启动看门狗延长锁的过期时间。
func (l *RedLock) TryLock(ctx context.Context) error { randomValue := gofakeit.UUID() var wg sync.WaitGroup wg.Add(len(l.clients)) // 成功获得锁的Redis实例的客户端 successClients := make(chan *redis.Client, len(l.clients)) for _, client := range l.clients { go func(client *redis.Client) { defer wg.Done() success, err := client.SetNX(ctx, l.resource, randomValue, ttl).Result() if err != nil { return } // 加锁失败 if !success { return } // 加锁成功,启动看门狗 go l.startWatchDog() successClients <- client }(client) } // 等待所有获取锁操作完成 wg.Wait() close(successClients) // 如果成功加锁得客户端少于客户端数量的一半+1,表示加锁失败 if len(successClients) < len(l.clients)/2+1 { // 就算加锁失败,也要把已经获得的锁给释放掉 for client := range successClients { go func(client *redis.Client) { ctx, cancel := context.WithTimeout(context.Background(), ttl) l.script.Run(ctx, client, []string{l.resource}, randomValue) cancel() }(client) } return ErrLockFailed } // 加锁成功,启动看门狗 l.randomValue = randomValue l.successClients = nil for successClient := range successClients { l.successClients = append(l.successClients, successClient) } return nil }
我们需要延长所有成功获取到的锁的过期时间。
func (l *RedLock) startWatchDog() { l.watchDog = make(chan struct{}) ticker := time.NewTicker(resetTTLInterval) defer ticker.Stop() for { select { case <-ticker.C: // 延长锁的过期时间 for _, client := range l.successClients { go func(client *redis.Client) { ctx, cancel := context.WithTimeout(context.Background(), ttl-resetTTLInterval) client.Expire(ctx, l.resource, ttl) cancel() }(client) } case <-l.watchDog: // 已经解锁 return } } }
我们需要解锁所有成功获取到的锁。
func (l *RedLock) Unlock(ctx context.Context) error { for _, client := range l.successClients { go func(client *redis.Client) { l.script.Run(ctx, client, []string{l.resource}, l.randomValue) }(client) } // 关闭看门狗 close(l.watchDog) return nil }
Das obige ist der detaillierte Inhalt vonSo implementieren Sie verteilte Mutex-Sperren und rote Sperren mit Go und Redis. Für weitere Informationen folgen Sie bitte anderen verwandten Artikeln auf der PHP chinesischen Website!