Redis裡有一個設定如果不存在
的命令,我們可以透過這個指令來實現互斥鎖功能,在Redis官方文件裡面推薦的標準實現方式是SET resource_name my_random_value NX PX 30000
這串指令,其中:
##resource_name表示要鎖定的資源
表示要鎖定的資源
表示如果不存在則設定
表示過期時間為30000毫秒,也就是30秒
這個值在所有的客戶端必須是唯一的,而所有同一key的鎖定競爭者這個值都不能一樣。
值必須是隨機數主要是為了更安全的釋放鎖,釋放鎖的時候使用腳本告訴Redis:只有key存在並且儲存的值和我指定的值一樣才能告訴我刪除成功,避免錯誤釋放別的競爭者的鎖。
由於涉及兩個操作,因此我們需要透過Lua腳本保證操作的原子性:
if redis.call("get",KEYS[1]) == ARGV[1] then return redis.call("del",KEYS[1]) else return 0 end
因為判斷和刪除是兩個操作,所以有可能A剛判斷完鎖就過期自動釋放了,然後B就獲取到了鎖,然後A又調用了Del,導致把B的鎖給釋放了。 TryLock和Unlock實作
TryLock
其實就是用SET resource_name my_random_value NX PX 30000
加鎖,這裡使用
#UUID#UUID
作為隨機值,並且在加鎖成功時把隨機值返回,這個隨機值會在Unlock
時使用;
解鎖邏輯就是執行前面說到的lua腳本
。 <div class="code" style="position:relative; padding:0px; margin:0px;"><pre class="brush:php;toolbar:false">func (l *Lock) TryLock(ctx context.Context) error {
success, err := l.client.SetNX(ctx, l.resource, l.randomValue, ttl).Result()
if err != nil {
return err
}
// 加锁失败
if !success {
return ErrLockFailed
}
// 加锁成功
l.randomValue = randomValue
return nil
}
func (l *Lock) Unlock(ctx context.Context) error {
return l.script.Run(ctx, l.client, []string{l.resource}, l.randomValue).Err()
}</pre><div class="contentsignin">登入後複製</div></div>
Lock實作
。
加鎖失敗則不斷循環嘗試加鎖直到成功或出現異常情況
func (l *Lock) Lock(ctx context.Context) error { // 尝试加锁 err := l.TryLock(ctx) if err == nil { return nil } if !errors.Is(err, ErrLockFailed) { return err } // 加锁失败,不断尝试 ticker := time.NewTicker(l.tryLockInterval) defer ticker.Stop() for { select { case <-ctx.Done(): // 超时 return ErrTimeout case <-ticker.C: // 重新尝试加锁 err := l.TryLock(ctx) if err == nil { return nil } if !errors.Is(err, ErrLockFailed) { return err } } } }
實現看門狗機制
為了減少這種情況的發生,我們可以在A持有鎖期間,不斷地延長鎖的過期時間,減少客戶端B提前獲取到鎖的情況,這就是看門狗機制。
當然,這沒辦法完全避免上述情況的發生,因為如果客戶端A取得鎖定之後,剛好與Redis的連線關閉了,這時候也就沒辦法延長超時時間了。
加鎖成功時啟動一個線程,不斷地延長鎖地過期時間;在Unlock時關閉看門狗線程。
解鎖,看門狗
func (l *Lock) startWatchDog() { ticker := time.NewTicker(l.ttl / 3) defer ticker.Stop() for { select { case <-ticker.C: // 延长锁的过期时间 ctx, cancel := context.WithTimeout(context.Background(), l.ttl/3*2) ok, err := l.client.Expire(ctx, l.resource, l.ttl).Result() cancel() // 异常或锁已经不存在则不再续期 if err != nil || !ok { return } case <-l.watchDog: // 已经解锁 return } } }
TryLock:啟動看門狗
func (l *Lock) TryLock(ctx context.Context) error { success, err := l.client.SetNX(ctx, l.resource, l.randomValue, l.ttl).Result() if err != nil { return err } // 加锁失败 if !success { return ErrLockFailed } // 加锁成功,启动看门狗 go l.startWatchDog() return nil }
func (l *Lock) Unlock(ctx context.Context) error { err := l.script.Run(ctx, l.client, []string{l.resource}, l.randomValue).Err() // 关闭看门狗 close(l.watchDog) return err }
在加锁逻辑里,我们主要是对每个Redis实例执行SET resource_name my_random_value NX PX 30000
获取锁,然后把成功获取锁的客户端放到一个channel
里(这里因为是多线程并发获取锁,使用slice可能有并发问题),同时使用sync.WaitGroup
等待所有获取锁操作结束。
然后判断成功获取到的锁的数量是否大于一半,如果没有得到一半以上的锁,说明加锁失败,释放已经获得的锁。
如果加锁成功,则启动看门狗延长锁的过期时间。
func (l *RedLock) TryLock(ctx context.Context) error { randomValue := gofakeit.UUID() var wg sync.WaitGroup wg.Add(len(l.clients)) // 成功获得锁的Redis实例的客户端 successClients := make(chan *redis.Client, len(l.clients)) for _, client := range l.clients { go func(client *redis.Client) { defer wg.Done() success, err := client.SetNX(ctx, l.resource, randomValue, ttl).Result() if err != nil { return } // 加锁失败 if !success { return } // 加锁成功,启动看门狗 go l.startWatchDog() successClients <- client }(client) } // 等待所有获取锁操作完成 wg.Wait() close(successClients) // 如果成功加锁得客户端少于客户端数量的一半+1,表示加锁失败 if len(successClients) < len(l.clients)/2+1 { // 就算加锁失败,也要把已经获得的锁给释放掉 for client := range successClients { go func(client *redis.Client) { ctx, cancel := context.WithTimeout(context.Background(), ttl) l.script.Run(ctx, client, []string{l.resource}, randomValue) cancel() }(client) } return ErrLockFailed } // 加锁成功,启动看门狗 l.randomValue = randomValue l.successClients = nil for successClient := range successClients { l.successClients = append(l.successClients, successClient) } return nil }
我们需要延长所有成功获取到的锁的过期时间。
func (l *RedLock) startWatchDog() { l.watchDog = make(chan struct{}) ticker := time.NewTicker(resetTTLInterval) defer ticker.Stop() for { select { case <-ticker.C: // 延长锁的过期时间 for _, client := range l.successClients { go func(client *redis.Client) { ctx, cancel := context.WithTimeout(context.Background(), ttl-resetTTLInterval) client.Expire(ctx, l.resource, ttl) cancel() }(client) } case <-l.watchDog: // 已经解锁 return } } }
我们需要解锁所有成功获取到的锁。
func (l *RedLock) Unlock(ctx context.Context) error { for _, client := range l.successClients { go func(client *redis.Client) { l.script.Run(ctx, client, []string{l.resource}, l.randomValue) }(client) } // 关闭看门狗 close(l.watchDog) return nil }
以上是怎麼用Go和Redis實現分散式互斥鎖和紅鎖的詳細內容。更多資訊請關注PHP中文網其他相關文章!