首頁 > 資料庫 > Redis > 怎麼在SpringBoot中使用Redis實現分散式鎖

怎麼在SpringBoot中使用Redis實現分散式鎖

WBOY
發布: 2023-06-03 08:16:32
轉載
1530 人瀏覽過

一、Redis實作分散式鎖定原理

為什麼需要分散式鎖定

在聊天分散式鎖定之前,有必要先解釋一下,為什麼需要分散式鎖定

與分散式鎖相對就的是單機鎖,我們在寫多執行緒程式時,避免同時操作一個共享變數產生資料問題,通常會使用一把鎖來互斥以保證共享變數的正確性,其使用範圍在同一個進程中。如果換做是多個進程,需要同時操作一個共享資源,如何互斥?現在的業務應用通常是微服務架構,這也意味著一個應用會部署多個進程,多個進程如果需要修改MySQL中的同一行記錄,為了避免操作亂序導致髒數據,此時就需要引入分佈式鎖了。

怎麼在SpringBoot中使用Redis實現分散式鎖

想要實作分散式鎖,必須藉助一個外部系統,所有行程都去這個系統上申請加鎖。這個外部系統必須具有互斥能力,也就是說,如果兩個請求同時到達,系統只會成功地為一個進程加鎖,而另一個進程會失敗。這個外部系統可以是資料庫,也可以是Redis或Zookeeper,但為了追求效能,我們通常會選擇使用Redis或Zookeeper來做。

Redis可以作為一個共享儲存系統,多個客戶端可以共享訪問,因此可以被用來保存分散式鎖定。而且 Redis 的讀寫效能高,可以應付高並發的鎖定操作場景。這篇文章的重點在於介紹如何使用Redis實現分散式鎖定,並探討在實作過程中可能會遇到的問題。

分散式鎖定如何實現

作為分散式鎖定實作過程中的共用儲存系統,Redis可以使用鍵值對來保存鎖定變量,在接收和處理不同客戶端發送的加鎖和釋放鎖的操作請求。那麼,鍵值對的鍵和值具體是怎麼定的呢?我們要賦予鎖變數一個變數名,把這個變數名當作鍵值對的鍵,而鎖變數的值,則是鍵值對的值,這樣一來,Redis就能保存鎖變數了,客戶端也就可以透過Redis的命令操作來實現鎖定操作。

想要實作分散式鎖定,必須要求Redis有互斥的能力。可以使用SETNX指令,其意義是SET IF NOT EXIST,也就是如果key不存在,才會設定它的值,否則什麼都不做。實作一種分散式鎖的方法是,兩個客戶端進程互斥地執行該命令。

以下展示了Redis使用key/value對保存鎖定變量,以及兩個客戶端同時請求加鎖的操作過程。

怎麼在SpringBoot中使用Redis實現分散式鎖

加上鎖定作業完成後,加上鎖定成功的客戶端,就可以去操作共享資源,例如,修改MySQL的某一行資料。操作完成後,也要及時釋放鎖,給後來者讓出操作共享資源的機會。如何釋放鎖呢?直接使用DEL指令刪除這個key即可。這個邏輯非常簡單,整體的流程寫成偽程式碼就是下面這樣。

// 加锁
SETNX lock_key 1
// 业务逻辑
DO THINGS
// 释放锁
DEL lock_key
登入後複製

但是,以上實作有一個很大的問題,當客戶端1拿到鎖後,如果發生下面的場景,就會造成死鎖。

程式處理業務邏輯異常,沒及時釋放鎖定進程掛了,沒機會釋放鎖定

以上情況會導致已經取得鎖定的用戶端一直佔用鎖,其他用戶端永遠無法取得到鎖。

如何避免死鎖

為了解決以上死鎖問題,最容易想到的方案是在申請鎖定時,在Redis中實作時,給鎖定設定一個過期時間,假設操作共享資源的時間不會超過10s,那麼加鎖時,給這個key設定10s過期即可。

但以上操作還是有問題,加鎖、設定過期時間是2條指令,有可能只執行了第一條,第二條卻執行失敗,例如:

1.SETNX執行成功,執行EXPIRE時由於網路問題,執行失敗
2.SETNX執行成功,Redis異常宕機,EXPIRE沒有機會執行
3.SETNX執行成功,客戶端異常崩潰,EXPIRE沒有機會執行

總之這兩條指令如果不能保證是原子操作,就有潛在的風險導致過期時間設定失敗,依舊有可能發生死鎖問題。幸好在Redis 2.6.12之後,Redis擴展了SET指令的參數,可以在SET的同時指定EXPIRE時間,這條操作是原子的,例如以下指令是設定鎖的過期時間為10秒。

SET lock_key 1 EX 10 NX

#至此,解決了死鎖問題,但還是有其他問題。想像下面這個這樣一個場景:

怎麼在SpringBoot中使用Redis實現分散式鎖

  1. 客戶端1加鎖定成功,開始操作共享資源

  2. 客户端1操作共享资源耗时太久,超过了锁的过期时间,锁失效(锁被自动释放)

  3. 客户端2加锁成功,开始操作共享资源

  4. 客户端1操作共享资源完成,在finally块中手动释放锁,但此时它释放的是客户端2的锁。

这里存在两个严重的问题:

  • 锁过期

  • 释放了别人的锁

第1个问题是评估操作共享资源的时间不准确导致的,如果只是一味增大过期时间,只能缓解问题降低出现问题的概率,依旧无法彻底解决问题。原因在于客户端在拿到锁之后,在操作共享资源时,遇到的场景是很复杂的,既然是预估的时间,也只能是大致的计算,不可能覆盖所有导致耗时变长的场景

第二个问题在于解锁操作是不够严谨的,因为它是一种不加区分地释放锁的操作,没有对锁的所有权进行检查。如何解决呢?

锁被别人给释放了

解决办法是,客户端在加锁时,设置一个只有自己知道的唯一标识进去,例如可以是自己的线程ID,如果是redis实现,就是SET key unique_value EX 10 NX。之后在释放锁时,要先判断这把锁是否归自己持有,只有是自己的才能释放它。

//释放锁 比较unique_value是否相等,避免误释放
if redis.get("key") == unique_value then
    return redis.del("key")
登入後複製

这里释放锁使用的是GET + DEL两条命令,这时又会遇到原子性问题了。

  1. 客户端1执行GET,判断锁是自己的

  2. 客户端2执行了SET命令,强制获取到锁(虽然发生概念很低,但要严谨考虑锁的安全性)

  3. 客户端1执行DEL,却释放了客户端2的锁

由此可见,以上GET + DEL两个命令还是必须原子的执行才行。怎样原子执行两条命令呢?答案是Lua脚本,可以把以上逻辑写成Lua脚本,让Redis执行。因为Redis处理每个请求是单线程执行的,在执行一个Lua脚本时其它请求必须等待,直到这个Lua脚本处理完成,这样一来GET+DEL之间就不会有其他命令执行了。

以下是使用Lua脚本(unlock.script)实现的释放锁操作的伪代码,其中,KEYS[1]表示lock_key,ARGV[1]是当前客户端的唯一标识,这两个值都是我们在执行 Lua脚本时作为参数传入的。

//Lua脚本语言,释放锁 比较unique_value是否相等,避免误释放
if redis.call("get",KEYS[1]) == ARGV[1] then
    return redis.call("del",KEYS[1])
else
    return 0
end
登入後複製

最后我们执行以下命令,即可

redis-cli  --eval  unlock.script lock_key , unique_value
登入後複製

这样一路优先下来,整个加锁、解锁流程就更严谨了,先小结一下,基于Redis实现的分布式锁,一个严谨的流程如下:

  1. 加锁时要设置过期时间SET lock_key unique_value EX expire_time NX

  2. 操作共享资源

  3. 释放锁:Lua脚本,先GET判断锁是否归属自己,再DEL释放锁

有了这个严谨的锁模型,我们还需要重新思考之前的那个问题,锁的过期时间不好评估怎么办。

如何确定锁的过期时间

前面提到过,过期时间如果评估得不好,这个锁就会有提前过期的风险,一种妥协的解决方案是,尽量冗余过期时间,降低锁提前过期的概率,但这个方案并不能完美解决问题。是否可以设置这样的方案,加锁时,先设置一个预估的过期时间,然后开启一个守护线程,定时去检测这个锁的失效时间,如果锁快要过期了,操作共享资源还未完成,那么就自动对锁进行续期,重新设置过期时间

Redisson是一个已封装好这些工作的库,可以说是一种非常优秀的解决方案。Redisson是一个Java语言实现的Redis SDK客户端,在使用分布式锁时,它就采用了自动续期的方案来避免锁过期,这个守护线程我们一般叫它看门狗线程。这个SDK提供的API非常友好,它可以像操作本地锁一样操作分布式锁。客户端一旦加锁成功,就会启动一个watch dog看门狗线程,它是一个后台线程,会每隔一段时间(这段时间的长度与设置的锁的过期时间有关)检查一下,如果检查时客户端还持有锁key(也就是说还在操作共享资源),那么就会延长锁key的生存时间。

怎麼在SpringBoot中使用Redis實現分散式鎖

那如果客户端在加锁成功后就宕机了呢?宕机了那么看门狗任务就不存在了,也就无法为锁续期了,锁到期自动失效。

Redis的部署方式对锁的影响

上面讨论的情况,都是锁在单个Redis 实例中可能产生的问题,并没有涉及到Redis的部署架构细节。

Redis发展到现在,几种常见的部署架构有:

  • 單機模式;

  • 主從模式;

  • 哨兵(sentinel)模式;

  • 叢集模式;

我們使用Redis時,一般會採用主從叢集哨兵的模式部署,哨兵的功能就是監測redis節點的運行狀態。普通的主從模式,當master崩潰時,需要手動切換讓slave成為master,使用主從哨兵結合的好處在於,當master異常宕機時,哨兵可以實現故障自動切換,把slave提升為新的master,繼續提供服務,以此保證可用性。那麼當主從發生切換時,分散式鎖依舊安全嗎?

怎麼在SpringBoot中使用Redis實現分散式鎖

想像這樣的場景:

  1. #客戶端1在master上執行SET指令,加鎖成功

  2. #此時,master異常宕機,SET指令還未同步到slave上(主從複製是異步的)

  3. 哨兵將slave提升為新的master,但這個鎖在新的master上丟失了,導致客戶端2來加鎖成功了,兩個客戶端共同操作共享資源

可見,當引入Redis副本後,分散式鎖還是可能受到影響。即使Redis透過sentinel保證高可用,如果這個master節點因為某些原因發生了主從切換,那麼就會出現鎖丟失的情況。

叢集模式Redlock實作高可靠的分散式鎖定

為了避免Redis實例故障而導致的鎖定無法運作的問題,Redis的開發者Antirez提出了分佈式鎖演算法Redlock。 Redlock演算法的基本思路,是讓客戶端和多個獨立的Redis實例依序請求加鎖,如果客戶端能夠和半數以上的實例成功地完成加鎖操作,那麼我們就認為,客戶端成功地獲得分散式鎖定了,否則加鎖失敗。這樣一來,即使有單一Redis實例發生故障,因為鎖定變數在其它實例上也有保存,所以,客戶端仍然可以正常地進行鎖定操作,鎖定變數並不會遺失。

來具體看下Redlock演算法的執行步驟。 Redlock演算法的實作要求Redis採用叢集部署模式,無哨兵節點,需要有N個獨立的Redis實例(官方建議至少5個實例)。接下來,我們可以分成3步驟來完成加鎖操作。

怎麼在SpringBoot中使用Redis實現分散式鎖

第一步是,客戶端取得目前時間。

第二步是,客戶端依序向N個Redis實例執行加鎖操作。

這裡的加鎖操作和在單一實例上執行的加鎖操作一樣,使用SET指令,帶上NX、EX/PX選項,以及帶上客戶端的唯一識別。當然,如果某個Redis實例發生故障了,為了確保在這種情況下,Redlock演算法能夠繼續運行,我們需要為加鎖操作設定一個逾時時間。如果客戶端在和一個Redis實例請求加鎖時,一直到逾時都沒有成功,那麼此時,客戶端會和下一個Redis實例繼續請求加鎖。一般需要將加鎖操作的超時時間設定為鎖的有效時間的一小部分,通常約為幾十毫秒。

第三步是,一旦客戶端完成了和所有Redis實例的加鎖操作,客戶端就要計算整個加鎖過程的總耗時。

客戶端只有在滿足兩個條件時,才能認為是加鎖成功,條件一是客戶端從超過半數(大於等於N/2 1)的Redis實例上成功獲取到了鎖;條件二是客戶端取得鎖的總耗時沒有超過鎖的有效時間。

為何只有在大多數實例加鎖成功時才能算操作成功?事實上,多個Redis實例一起使用組成了一個分散式系統。在分散式系統中總是會出現異常節點,所以在談論分散式系統時,需要考慮異常節點達到多少個,也依舊不影響整個系統的正確運作。這是一個分散式系統的容錯問題,這個問題的結論是:如果只存在故障節點,只要大多數節點正常,那麼整個系統依舊可以提供正確服務。

在滿足了這兩個條件後,我們需要重新計算這把鎖的有效時間,計算的結果是鎖的最初有效時間減去客戶端為取得鎖的總耗時。如果鎖的有效時間已經來不及完成共享資料的操作了,我們可以釋放鎖,以免出現還沒完成共享資源操作,鎖就過期了的情況

当然,如果客户端在和所有实例执行完加锁操作后,没能同时满足这两个条件,那么,客户端就要向所有Redis节点发起释放锁的操作。为什么释放锁,要操作所有的节点呢,不能只操作那些加锁成功的节点吗?因为在某一个Redis节点加锁时,可能因为网络原因导致加锁失败,例如一个客户端在一个Redis实例上加锁成功,但在读取响应结果时由于网络问题导致读取失败,那这把锁其实已经在Redis上加锁成功了。所以释放锁时,不管之前有没有加锁成功,需要释放所有节点上的锁以保证清理节点上的残留的锁

在Redlock算法中,释放锁的操作和在单实例上释放锁的操作一样,只要执行释放锁的 Lua脚本就可以了。如果N个Redis实例中超过一半的实例正常工作,就能确保分布式锁正常运作。为了提高分布式锁的可靠性,您可以在实际业务应用中使用Redlock算法。

二、代码实现Redis分布式锁

1.SpringBoot整合redis用到最多的当然属于我们的老朋友RedisTemplate,pom依赖如下:

<!-- springboot整合redis -->
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-data-redis</artifactId>
</dependency>
登入後複製

2.Redis配置类:

package com.example.redisdemo.config;

import com.fasterxml.jackson.annotation.JsonAutoDetect;
import com.fasterxml.jackson.annotation.PropertyAccessor;
import com.fasterxml.jackson.databind.DeserializationFeature;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.redis.connection.lettuce.LettuceConnectionFactory;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.serializer.Jackson2JsonRedisSerializer;
import org.springframework.data.redis.serializer.RedisSerializer;
import org.springframework.data.redis.serializer.StringRedisSerializer;

/**
 * @description: Redis配置类
 * @author Keson
 * @date 21:20 2022/11/14
 * @Param
 * @return
 * @version 1.0
 */
@Configuration
public class RedisConfig {

    @Bean
    public RedisTemplate<String, Object> redisTemplate(LettuceConnectionFactory lettuceConnectionFactory) {
        // 设置序列化
        Jackson2JsonRedisSerializer<Object> jackson2JsonRedisSerializer = new Jackson2JsonRedisSerializer<Object>(Object.class);
        ObjectMapper om = new ObjectMapper();
        om.setVisibility(PropertyAccessor.ALL, JsonAutoDetect.Visibility.ANY);
        om.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);
        jackson2JsonRedisSerializer.setObjectMapper(om);
        // 配置redisTemplate
        RedisTemplate<String, Object> redisTemplate = new RedisTemplate<String, Object>();
        redisTemplate.setConnectionFactory(lettuceConnectionFactory);
        RedisSerializer<?> stringSerializer = new StringRedisSerializer();
        redisTemplate.setKeySerializer(stringSerializer);// key序列化
        redisTemplate.setValueSerializer(jackson2JsonRedisSerializer);// value序列化
        redisTemplate.setHashKeySerializer(stringSerializer);// Hash key序列化
        redisTemplate.setHashValueSerializer(jackson2JsonRedisSerializer);// Hash value序列化
        redisTemplate.afterPropertiesSet();
        return redisTemplate;
    }
}
登入後複製

3.Service层面

package com.example.redisdemo.service;

import com.example.redisdemo.entity.CustomerBalance;
import java.util.concurrent.Callable;

/**
 * @author Keson
 * @version 1.0
 * @description: TODO
 * @date 2022/11/14 15:12
 */
public interface RedisService {

    <T> T callWithLock(CustomerBalance customerBalance, Callable<T> callable) throws Exception;
}
登入後複製
package com.example.redisdemo.service.impl;

import com.example.redisdemo.entity.CustomerBalance;
import com.example.redisdemo.service.RedisService;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.connection.RedisStringCommands;
import org.springframework.data.redis.connection.ReturnType;
import org.springframework.data.redis.core.RedisCallback;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.core.types.Expiration;
import org.springframework.stereotype.Service;
import java.nio.charset.StandardCharsets;
import java.util.UUID;
import java.util.concurrent.Callable;
import java.util.concurrent.TimeUnit;

/**
 * @author Keson
 * @version 1.0
 * @description: TODO Redis实现分布式锁
 * @date 2022/11/14 15:13
 */
@Service
@Slf4j
public class RedisServiceImpl implements RedisService {

    //设置默认过期时间
    private final static int DEFAULT_LOCK_EXPIRY_TIME = 20;
    //自定义lock key前缀
    private final static String LOCK_PREFIX = "LOCK:CUSTOMER_BALANCE";

    @Autowired
    private RedisTemplate redisTemplate;

    @Override
    public <T> T callWithLock(CustomerBalance customerBalance, Callable<T> callable) throws Exception{
        //自定义lock key
        String lockKey = getLockKey(customerBalance.getCustomerNumber(), customerBalance.getSubAccountNumber(), customerBalance.getCurrencyCode());
        //将UUID当做value,确保唯一性
        String lockReference = UUID.randomUUID().toString();

        try {
            if (!lock(lockKey, lockReference, DEFAULT_LOCK_EXPIRY_TIME, TimeUnit.SECONDS)) {
                throw new Exception("lock加锁失败");
            }
            return callable.call();
        } finally {
            unlock(lockKey, lockReference);
        }
    }

    //定义lock key
    String getLockKey(String customerNumber, String subAccountNumber, String currencyCode) {
        return String.format("%s:%s:%s:%s", LOCK_PREFIX, customerNumber, subAccountNumber, currencyCode);
    }

    //redis加锁
    private boolean lock(String key, String value, long timeout, TimeUnit timeUnit) {
        Boolean locked;
        try {
            //SET_IF_ABSENT --> NX: Only set the key if it does not already exist.
            //SET_IF_PRESENT --> XX: Only set the key if it already exist.
            locked = (Boolean) redisTemplate.execute((RedisCallback<Boolean>) connection ->
                    connection.set(key.getBytes(StandardCharsets.UTF_8), value.getBytes(StandardCharsets.UTF_8),
                            Expiration.from(timeout, timeUnit), RedisStringCommands.SetOption.SET_IF_ABSENT));
        } catch (Exception e) {
            log.error("Lock failed for redis key: {}, value: {}", key, value);
            locked = false;
        }
        return locked != null && locked;
    }

    //redis解锁
    private boolean unlock(String key, String value) {
        try {
            //使用lua脚本保证删除的原子性,确保解锁
            String script = "if redis.call(&#39;get&#39;, KEYS[1]) == ARGV[1] " +
                            "then return redis.call(&#39;del&#39;, KEYS[1]) " +
                            "else return 0 end";
            Boolean unlockState = (Boolean) redisTemplate.execute((RedisCallback<Boolean>) connection ->
                    connection.eval(script.getBytes(), ReturnType.BOOLEAN, 1,
                            key.getBytes(StandardCharsets.UTF_8), value.getBytes(StandardCharsets.UTF_8)));
            return unlockState == null || !unlockState;
        } catch (Exception e) {
            log.error("unLock failed for redis key: {}, value: {}", key, value);
            return false;
        }
    }
}
登入後複製

4.业务调用实现分布式锁示例:

    @Override
    public int updateById(CustomerBalance customerBalance) throws Exception {
        return redisService.callWithLock(customerBalance, ()-> customerBalanceMapper.updateById(customerBalance));
    }
登入後複製

以上是怎麼在SpringBoot中使用Redis實現分散式鎖的詳細內容。更多資訊請關注PHP中文網其他相關文章!

相關標籤:
來源:yisu.com
本網站聲明
本文內容由網友自願投稿,版權歸原作者所有。本站不承擔相應的法律責任。如發現涉嫌抄襲或侵權的內容,請聯絡admin@php.cn
熱門教學
更多>
最新下載
更多>
網站特效
網站源碼
網站素材
前端模板