下面由Redis教學專欄來介紹3種Redis分散式鎖定的對比,希望對需要的朋友有幫助!
我們通常使用的synchronized或Lock都是執行緒鎖,對同一個JVM進程內的多個執行緒有效。因為鎖的本質 是記憶體存放一個標記,記錄取得鎖的執行緒是誰,這個標記對每個執行緒都可見。然而我們啟動的多個訂單服務,就是多個JVM,記憶體中的鎖顯然是不共享的,每個JVM進程都有自己的鎖,自然無法保證執行緒的互斥了,這個時候我們就需要使用到分散式鎖了。常用的有三種解決方案:1.基於資料庫實作 2.基於zookeeper的臨時序列化節點實作 3.redis實作。本文我們介紹的就是redis的實作方式。
實現分散式鎖定要滿足3點:多進程可見,互斥,可重入。
1) 多進程可見
redis本身就是基於JVM之外的,因此滿足多進程可見的要求。
2) 互斥
# 即同一時間只能有一個行程取得鎖定標記,我們可以透過redis的setnx實現,只有第一次執行的才會成功並返回1,其它情況返回0。
釋放鎖定
釋放鎖定其實只需要把鎖的key刪除即可,使用del xxx指令。不過,如果在我們執行del之前,服務突然宕機,那麼鎖就永遠無法刪除了。所以我們可以透過setex 指令設定過期時間即可。
import java.util.UUID;import org.slf4j.Logger;import org.slf4j.LoggerFactory;import org.springframework.beans.factory.annotation.Autowired;import org.springframework.stereotype.Component;import redis.clients.jedis.Jedis;import redis.clients.jedis.JedisPool;/** * 第一种分布式锁 */@Componentpublic class RedisService {private final Logger log = LoggerFactory.getLogger(this.getClass()); @Autowired JedisPool jedisPool; // 获取锁之前的超时时间(获取锁的等待重试时间) private long acquireTimeout = 5000; // 获取锁之后的超时时间(防止死锁) private int timeOut = 10000; /** * 获取分布式锁 * @return 锁标识 */ public boolean getRedisLock(String lockName,String val) { Jedis jedis = null; try { jedis = jedisPool.getResource(); // 1.计算获取锁的时间 Long endTime = System.currentTimeMillis() + acquireTimeout; // 2.尝试获取锁 while (System.currentTimeMillis() < endTime) { // 3. 获取锁成功就设置过期时间 if (jedis.setnx(lockName, val) == 1) { jedis.expire(lockName, timeOut/1000); return true; } } } catch (Exception e) { log.error(e.getMessage()); } finally { returnResource(jedis); } return false; } /** * 释放分布式锁 * @param lockName 锁名称 */ public void unRedisLock(String lockName) { Jedis jedis = null; try { jedis = jedisPool.getResource(); // 释放锁 jedis.del(lockName); } catch (Exception e) { log.error(e.getMessage()); } finally { returnResource(jedis); } }// =============================================== public String get(String key) { Jedis jedis = null; String value = null; try { jedis = jedisPool.getResource(); value = jedis.get(key); log.info(value); } catch (Exception e) { log.error(e.getMessage()); } finally { returnResource(jedis); } return value; } public void set(String key, String value) { Jedis jedis = null; try { jedis = jedisPool.getResource(); jedis.set(key, value); } catch (Exception e) { log.error(e.getMessage()); } finally { returnResource(jedis); } } /** * 关闭连接 */ public void returnResource(Jedis jedis) { try { if(jedis!=null) jedis.close(); } catch (Exception e) { } } }
上面的分散式鎖定實現了,但是這時候還可能出現另外2個問題:
一:取得鎖定時
setnx取得鎖定成功了,還沒來得及setex服務就宕機了,由於這種非原子性的操作,死鎖又發生了。其實redis提供了 nx 與 ex連用的指令。
二:釋放鎖時
1. 3個進程:A和B和C,在執行任務,並爭搶鎖,此時A獲取了鎖,並設置自動過期時間為10s
2. A開始執行業務,因為某種原因,業務阻塞,耗時超過了10秒,此時鎖自動釋放了
3. B恰好此時開始嘗試獲取鎖,因為鎖已經自動釋放,成功取得鎖定
4. A此時業務執行完畢,執行釋放鎖定邏輯(刪除key),於是B的鎖被釋放了,而B其實還在執行業務
5.此時進程C嘗試取得鎖,也成功了,因為A把B的鎖刪除了。
問題出現了:B和C同時取得了鎖,違反了互斥性!如何解決這個問題呢?我們應該在刪除鎖之前,判斷這個鎖是否是自己設定的鎖,如果不是(例如自己 的鎖已經超時釋放),那麼就不要刪除了。所以我們可以在set 鎖時,存入當前執行緒的唯一識別!刪除鎖前,判斷下裡面的值是不是與自己標識釋放一 致,如果不一致,表示不是自己的鎖,就不要刪除了。
/** * 第二种分布式锁 */public class RedisTool { private static final String LOCK_SUCCESS = "OK"; private static final Long RELEASE_SUCCESS = 1L; /** * 尝试获取分布式锁 * @param jedis Redis客户端 * @param lockKey 锁 * @param requestId 请求标识 * @param expireTime 超期时间 * @return 是否获取成功 */ public static boolean tryGetDistributedLock(Jedis jedis, String lockKey, String requestId, int expireTime) { String result = jedis.set(lockKey, requestId, "NX", "PX", expireTime); if (LOCK_SUCCESS.equals(result)) { return true; } return false; } /** * 释放分布式锁 * @param jedis Redis客户端 * @param lockKey 锁 * @param requestId 请求标识 * @return 是否释放成功 */ public static boolean releaseDistributedLock(Jedis jedis, String lockKey, String requestId) { if (jedis.get(lockKey).equals(requestId)) { System.out.println("释放锁..." + Thread.currentThread().getName() + ",identifierValue:" + requestId); jedis.del(lockKey); return true; } return false; } }
依照上面方式實現分散式鎖定之後,就可以輕鬆解決大部分問題了。網路上很多部落格也都是這麼實現的,但是仍然有些場景是不滿足的,例如一個方法獲取到鎖之後,可能在方法內調這個方法此時就獲取不到鎖了。這時候我們就需要把鎖改進成可重入式鎖上了。
3) 重入鎖定:
##################################################################################### 也叫做递归锁,指的是在同一线程内,外层函数获得锁之后,内层递归函数仍然可以获取到该锁。换一种说法:同一个线程再次进入同步代码时,可以使用自己已获取到的锁。可重入锁可以避免因同一线程中多次获取锁而导致死锁发生。像synchronized就是一个重入锁,它是通过moniter函数记录当前线程信息来实现的。实现可重入锁需要考虑两点:
获取锁:首先尝试获取锁,如果获取失败,判断这个锁是否是自己的,如果是则允许再次获取, 而且必须记录重复获取锁的次数。
释放锁:释放锁不能直接删除了,因为锁是可重入的,如果锁进入了多次,在内层直接删除锁, 导致外部的业务在没有锁的情况下执行,会有安全问题。因此必须获取锁时累计重入的次数,释放时则减去重入次数,如果减到0,则可以删除锁。
下面我们假设锁的key为“ lock ”,hashKey是当前线程的id:“ threadId ”,锁自动释放时间假设为20 获取锁的步骤: 1、判断lock是否存在 EXISTS lock 2、不存在,则自己获取锁,记录重入层数为1. 2、存在,说明有人获取锁了,下面判断是不是自己的锁,即判断当前线程id作为hashKey是否存在:HEXISTS lock threadId 3、不存在,说明锁已经有了,且不是自己获取的,锁获取失败. 3、存在,说明是自己获取的锁,重入次数+1: HINCRBY lock threadId 1 ,最后更新锁自动释放时间, EXPIRE lock 20 释放锁的步骤: 1、判断当前线程id作为hashKey是否存在: HEXISTS lock threadId 2、不存在,说明锁已经失效,不用管了 2、存在,说明锁还在,重入次数减1: HINCRBY lock threadId -1 , 3、获取新的重入次数,判断重入次数是否为0,为0说明锁全部释放,删除key: DEL lock
因此,存储在锁中的信息就必须包含:key、线程标识、重入次数。不能再使用简单的key-value结构, 这里推荐使用hash结构。
获取锁的脚本(注释删掉,不然运行报错)
local key = KEYS[1]; -- 第1个参数,锁的keylocal threadId = ARGV[1]; -- 第2个参数,线程唯一标识local releaseTime = ARGV[2]; -- 第3个参数,锁的自动释放时间if(redis.call('exists', key) == 0) then -- 判断锁是否已存在 redis.call('hset', key, threadId, '1'); -- 不存在, 则获取锁 redis.call('expire', key, releaseTime); -- 设置有效期 return 1; -- 返回结果end;if(redis.call('hexists', key, threadId) == 1) then -- 锁已经存在,判断threadId是否是自己 redis.call('hincrby', key, threadId, '1'); -- 如果是自己,则重入次数+1 redis.call('expire', key, releaseTime); -- 设置有效期 return 1; -- 返回结果end;return 0; -- 代码走到这里,说明获取锁的不是自己,获取锁失败
释放锁的脚本(注释删掉,不然运行报错)
local key = KEYS[1]; -- 第1个参数,锁的keylocal threadId = ARGV[1]; -- 第2个参数,线程唯一标识if (redis.call('HEXISTS', key, threadId) == 0) then -- 判断当前锁是否还是被自己持有 return nil; -- 如果已经不是自己,则直接返回end;local count = redis.call('HINCRBY', key, threadId, -1); -- 是自己的锁,则重入次数-1if (count == 0) then -- 判断是否重入次数是否已经为0 redis.call('DEL', key); -- 等于0说明可以释放锁,直接删除 return nil; end;
完整代码
import java.util.Collections;import java.util.UUID;import org.springframework.core.io.ClassPathResource;import org.springframework.data.redis.core.StringRedisTemplate;import org.springframework.data.redis.core.script.DefaultRedisScript;import org.springframework.scripting.support.ResourceScriptSource;/** * Redis可重入锁 */public class RedisLock { private static final StringRedisTemplate redisTemplate = SpringUtil.getBean(StringRedisTemplate.class); private static final DefaultRedisScript<Long> LOCK_SCRIPT; private static final DefaultRedisScript<Object> UNLOCK_SCRIPT; static { // 加载释放锁的脚本 LOCK_SCRIPT = new DefaultRedisScript<>(); LOCK_SCRIPT.setScriptSource(new ResourceScriptSource(new ClassPathResource("lock.lua"))); LOCK_SCRIPT.setResultType(Long.class); // 加载释放锁的脚本 UNLOCK_SCRIPT = new DefaultRedisScript<>(); UNLOCK_SCRIPT.setScriptSource(new ResourceScriptSource(new ClassPathResource("unlock.lua"))); } /** * 获取锁 * @param lockName 锁名称 * @param releaseTime 超时时间(单位:秒) * @return key 解锁标识 */ public static String tryLock(String lockName,String releaseTime) { // 存入的线程信息的前缀,防止与其它JVM中线程信息冲突 String key = UUID.randomUUID().toString(); // 执行脚本 Long result = redisTemplate.execute( LOCK_SCRIPT, Collections.singletonList(lockName), key + Thread.currentThread().getId(), releaseTime); // 判断结果 if(result != null && result.intValue() == 1) { return key; }else { return null; } } /** * 释放锁 * @param lockName 锁名称 * @param key 解锁标识 */ public static void unlock(String lockName,String key) { // 执行脚本 redisTemplate.execute( UNLOCK_SCRIPT, Collections.singletonList(lockName), key + Thread.currentThread().getId(), null); } }
至此,一个比较完善的redis锁就开发完成了。
以上是關於3種Redis分散式鎖的對比的詳細內容。更多資訊請關注PHP中文網其他相關文章!