首頁 > 資料庫 > Redis > 主體

使用Redis實現一個安全可靠的分散式鎖

青灯夜游
發布: 2021-04-19 10:20:32
轉載
2022 人瀏覽過

這篇文章跟大家介紹如何使用Redis實現一個安全可靠的分散式鎖,說明分散式鎖實作的主要要素,常見誤解。有一定的參考價值,有需要的朋友可以參考一下,希望對大家有幫助。

使用Redis實現一個安全可靠的分散式鎖

並發場景下多個行程或執行緒共享資源的讀寫,需要保證對資源的存取互斥。在單機系統中,我們可以使用Java並發包中的API、synchronized關鍵字等方式來解決;但是在分散式系統下,這些方式不再適用,我們需要自己實作分散式鎖定。

常見的分散式鎖定的實作方案有:基於資料庫、基於Redis、基於Zookeeper等。作為Redis專題的一部分,本文將基於Redis聊一聊分散式鎖的實作方案。 【相關推薦:Redis影片教學

#分析與實作


問題分析

分散式鎖定與JVM內建的鎖定有著共同的目的:讓應用程式以預期的順序存取或操作共享的資源,防止多個執行緒同時對相同資源操作,導致系統運作紊亂、不可控。常用於商品庫存扣減、優惠券扣減等場景。

理論上來講,為了確保鎖的安全性和有效性,分散式鎖至少需要滿足以下條件:

  • 互斥性:在同一時間內,僅有一個執行緒能夠獲得鎖定;
  • 無死鎖:執行緒取得鎖定後,必須保證能夠釋放,即使執行緒取得鎖定後應用程式宕機,也能在限定時間內釋放;
  • 加鎖與解鎖必須是同一個執行緒;

在實作方式上,分散式鎖大體分為三個步驟:

  • a-取得資源的操作權;
  • b-對資源執行操作;
  • c-釋放資源的操作權;

無論是Java內建的鎖,或是分散式鎖,也無論使用哪一種分散式實作方案,都是圍繞a、c兩個步驟。 Redis對於實現分散式鎖定天然友好,原因如下:

  • 命令處理階段Redis使用單執行緒處理,同一個key同時只有一個執行緒能夠處理,沒有多執行緒競態問題。
  • SET key value NX PX milliseconds指令在不存在key的情況下新增具有過期時間的key,為安全性加鎖提供支援。
  • Lua腳本和DEL指令為安全解鎖提供可靠支撐。

程式碼實作

  • Maven依賴
#
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-data-redis</artifactId>
  	<version>${your-spring-boot-version}</version>
</dependency>
登入後複製
  • 設定檔

在application.properties增加以下內容,單機版Redis實例。

spring.redis.database=0
spring.redis.host=localhost
spring.redis.port=6379
登入後複製
  • RedisConfig
@Configuration
public class RedisConfig {

    // 自己定义了一个 RedisTemplate
    @Bean
    @SuppressWarnings("all")
    public RedisTemplate<String, Object> redisTemplate(RedisConnectionFactory factory)
        throws UnknownHostException {
        // 我们为了自己开发方便,一般直接使用 <String, Object>
        RedisTemplate<String, Object> template = new RedisTemplate<String,
            Object>();
        template.setConnectionFactory(factory);
        // Json序列化配置
        Jackson2JsonRedisSerializer jackson2JsonRedisSerializer = new Jackson2JsonRedisSerializer(Object.class);

        ObjectMapper om = new ObjectMapper();
        om.setVisibility(PropertyAccessor.ALL, JsonAutoDetect.Visibility.ANY);
        om.enableDefaultTyping(ObjectMapper.DefaultTyping.NON_FINAL);
        jackson2JsonRedisSerializer.setObjectMapper(om);
        // String 的序列化
        StringRedisSerializer stringRedisSerializer = new StringRedisSerializer();
        // key采用String的序列化方式
        template.setKeySerializer(stringRedisSerializer);
        // hash的key也采用String的序列化方式
        template.setHashKeySerializer(stringRedisSerializer);
        // value序列化方式采用jackson
        template.setValueSerializer(jackson2JsonRedisSerializer);
        // hash的value序列化方式采用jackson
        template.setHashValueSerializer(jackson2JsonRedisSerializer);
        template.afterPropertiesSet();
        return template;
    }
}
登入後複製
  • #RedisLock
@Service
public class RedisLock {

    @Resource
    private RedisTemplate<String, Object> redisTemplate;

    /**
     * 加锁,最多等待maxWait毫秒
     *
     * @param lockKey   锁定key
     * @param lockValue 锁定value
     * @param timeout   锁定时长(毫秒)
     * @param maxWait   加锁等待时间(毫秒)
     * @return true-成功,false-失败
     */
    public boolean tryAcquire(String lockKey, String lockValue, int timeout, long maxWait) {
        long start = System.currentTimeMillis();

        while (true) {
            // 尝试加锁
            Boolean ret = redisTemplate.opsForValue().setIfAbsent(lockKey, lockValue, timeout, TimeUnit.MILLISECONDS);
            if (!ObjectUtils.isEmpty(ret) && ret) {
                return true;
            }

            // 计算已经等待的时间
            long now = System.currentTimeMillis();
            if (now - start > maxWait) {
                return false;
            }

            try {
                Thread.sleep(200);
            } catch (Exception ex) {
                return false;
            }
        }
    }

    /**
     * 释放锁
     *
     * @param lockKey   锁定key
     * @param lockValue 锁定value
     * @return true-成功,false-失败
     */
    public boolean releaseLock(String lockKey, String lockValue) {
        // lua脚本
        String script = "if redis.call(&#39;get&#39;,KEYS[1]) == ARGV[1] then return redis.call(&#39;del&#39;,KEYS[1]) else return 0 end";

        DefaultRedisScript<Long> redisScript = new DefaultRedisScript<>(script, Long.class);
        Long result = redisTemplate.opsForValue().getOperations().execute(redisScript, Collections.singletonList(lockKey), lockValue);
        return result != null && result > 0L;
    }
}
登入後複製
  • 測試案例
#
@SpringBootTest
class RedisDistLockDemoApplicationTests {

    @Resource
    private RedisLock redisLock;

    @Test
    public void testLock() {
        redisLock.tryAcquire("abcd", "abcd", 5 * 60 * 1000, 5 * 1000);
        redisLock.releaseLock("abcd", "abcd");
    }
}
登入後複製

安全隱憂

可能很多同學(也包括我)在日常工作中都是使用上面的實現方式,看似是穩健的:

  • 使用set指令NXPX選項加鎖,保證了加鎖互斥,避免了死鎖;
  • 使用lua腳本解鎖,防止解除其他執行緒的鎖;
  • 加鎖、解鎖指令都是原子操作;

#其實以上實現的穩健有個前提條件:單機版Redis、開啟AOF持久化方式並設定appendfsync=always

但是在哨兵模式和叢集模式下可能有問題,為什麼呢?

哨兵模式和叢集模式基於主從架構,主從之間透過命令傳播實現資料同步,而命令傳播是非同步的。

所以就存在主節點資料寫入成功,在還未通知從節點情況下,主節點就宕機的可能。

當從節點透過故障轉移提升為新的主節點後,其他執行緒就有機會重新加鎖成功,導致不滿足分散式鎖定的互斥條件。

官方RedLock


叢集模式下,若叢集所有節點穩定運行,不發生故障轉移的情況下,安全性是有保障的。但是,沒有任何系統能夠保證100%穩定,基於Redis的分散式鎖必須考慮容錯。

由於主從同步基於非同步複製原理,所以哨兵模式和叢集模式天生無法滿足此條件。為此,Redis作者特別提出了一個解決方案-RedLock(Redis Distribute Lock)。

設計想法

根據官方文件的說明,把RedLock的設計思路介紹。

先說環境需求,需要N(N>=3)獨立部署的Redis實例,彼此之間不需要主從複製、故障轉移等技術。

為了取得鎖,用戶端將依照下列流程進行:

  • 获取当前时间(毫秒)作为开始时间start;
  • 使用相同的key和随机value,按顺序向所有N个节点发起获取锁的请求。当向每个实例设置锁时,客户端会使用一个过期时间(小于锁的自动释放时间)。比如锁的自动释放时间是10秒,这个超时时间应该是5-50毫秒。这是为了防止客户端在一个已经宕机的实例浪费太多时间:如果Redis实例宕机,客户端尽快处理下一个实例。
  • 客户端计算加锁消耗的时间cost(cost=start-now)。只有客户端在半数以上实例加锁成功,并且整个耗时小于整个有效时间(ttl),才能认为当前客户端加锁成功。
  • 如果客户端加锁成功,那么整个锁的真正有效时间应该是:validTime=ttl-cost。
  • 如果客户端加锁失败(可能是获取锁成功实例数未过半,也可能是耗时超过ttl),那么客户端应该向所有实例尝试解锁(即使刚刚客户端认为加锁失败)。

RedLock的设计思路延续了Redis内部多种场景的投票方案,通过多个实例分别加锁解决竞态问题,虽然加锁消耗了时间,但是消除了主从机制下的安全问题。

代码实现

官方推荐Java实现为Redisson,它具备可重入特性,按照RedLock进行实现,支持独立实例模式、集群模式、主从模式、哨兵模式等;API比较简单,上手容易。示例如下(直接通过测试用例):

    @Test
    public void testRedLock() throws InterruptedException {

        Config config = new Config();
        config.useSingleServer().setAddress("redis://127.0.0.1:6379");
        final RedissonClient client = Redisson.create(config);

        // 获取锁实例
        final RLock lock = client.getLock("test-lock");

        // 加锁
        lock.lock(60 * 1000, TimeUnit.MILLISECONDS);
        try {
            // 假装做些什么事情
            Thread.sleep(50 * 1000);
        } catch (Exception ex) {
            ex.printStackTrace();
        } finally {
            //解锁
            lock.unlock();
        }
    }
登入後複製

Redisson封装的非常好,我们可以像使用Java内置的锁一样去使用,代码简洁的不能再少了。关于Redisson源码的分析,网上有很多文章大家可以找找看。

全文总结


分布式锁是我们研发过程中常用的的一种解决并发问题的方式,Redis是只是一种实现方式。

关键的是要弄清楚加锁、解锁背后的原理,以及实现分布式锁需要解决的核心问题,同时考虑我们所采用的中间件有什么特性可以支撑。了解这些后,实现起来就不是什么问题了。

学习了RedLock的思想,我们是不是也可以在自己的应用程序内实现了分布式锁呢?欢迎沟通!

更多编程相关知识,请访问:编程入门!!

以上是使用Redis實現一個安全可靠的分散式鎖的詳細內容。更多資訊請關注PHP中文網其他相關文章!

相關標籤:
來源:juejin.cn
本網站聲明
本文內容由網友自願投稿,版權歸原作者所有。本站不承擔相應的法律責任。如發現涉嫌抄襲或侵權的內容,請聯絡admin@php.cn
熱門教學
更多>
最新下載
更多>
網站特效
網站源碼
網站素材
前端模板
關於我們 免責聲明 Sitemap
PHP中文網:公益線上PHP培訓,幫助PHP學習者快速成長!