다음 칼럼에서는 Redis Tutorial 칼럼에서 다룬 Redis 분산 잠금 세 가지 유형을 비교하여 소개하겠습니다. 도움이 필요한 친구들에게 도움이 되길 바랍니다!
우리가 일반적으로 사용하는 동기화 또는 잠금은 스레드 잠금으로, 동일한 JVM 프로세스의 여러 스레드에 효과적입니다. 잠금의 본질은 메모리에 표시를 저장하고 잠금을 획득한 스레드가 누구인지 기록하는 것이므로 이 표시는 모든 스레드에 표시됩니다. 그러나 우리가 시작한 다중 주문 서비스는 메모리의 잠금이 분명히 공유되지 않습니다. 각 JVM 프로세스는 당연히 스레드의 상호 배제를 보장할 수 없습니다. 잠그다. 일반적으로 사용되는 세 가지 솔루션이 있습니다. 1. 데이터베이스 기반 구현 2. Zookeeper 기반 임시 직렬화 노드 구현 3. redis 구현. 이번 포스팅에서는 Redis 구현에 대해 소개하겠습니다.
분산 잠금을 구현하려면 세 가지 사항을 충족해야 합니다. 즉, 여러 프로세스에 표시되고, 상호 배타적이며, 재진입이 가능합니다.
1) 여러 프로세스 표시
Redis 자체는 JVM 외부에 기반을 두고 있으므로 다중 프로세스 가시성 요구 사항을 충족합니다.
2) 상호 배제
즉, redis의 setnx를 통해 동시에 하나의 프로세스만 얻을 수 있으며, 첫 번째 실행만 성공하고 1을 반환합니다. 다른 경우에는 0을 반환합니다.
잠금 해제
잠금을 해제하려면 잠금의 키를 삭제하고 del xxx 명령을 사용하면 됩니다. 그러나 del을 실행하기 전에 서비스가 갑자기 다운되면 잠금은 절대 삭제되지 않습니다. 따라서 setex 명령을 통해 만료 시간을 설정할 수 있습니다.
import java.util.UUID;import org.slf4j.Logger;import org.slf4j.LoggerFactory;import org.springframework.beans.factory.annotation.Autowired;import org.springframework.stereotype.Component;import redis.clients.jedis.Jedis;import redis.clients.jedis.JedisPool;/** * 第一种分布式锁 */@Componentpublic class RedisService {private final Logger log = LoggerFactory.getLogger(this.getClass()); @Autowired JedisPool jedisPool; // 获取锁之前的超时时间(获取锁的等待重试时间) private long acquireTimeout = 5000; // 获取锁之后的超时时间(防止死锁) private int timeOut = 10000; /** * 获取分布式锁 * @return 锁标识 */ public boolean getRedisLock(String lockName,String val) { Jedis jedis = null; try { jedis = jedisPool.getResource(); // 1.计算获取锁的时间 Long endTime = System.currentTimeMillis() + acquireTimeout; // 2.尝试获取锁 while (System.currentTimeMillis() < endTime) { // 3. 获取锁成功就设置过期时间 if (jedis.setnx(lockName, val) == 1) { jedis.expire(lockName, timeOut/1000); return true; } } } catch (Exception e) { log.error(e.getMessage()); } finally { returnResource(jedis); } return false; } /** * 释放分布式锁 * @param lockName 锁名称 */ public void unRedisLock(String lockName) { Jedis jedis = null; try { jedis = jedisPool.getResource(); // 释放锁 jedis.del(lockName); } catch (Exception e) { log.error(e.getMessage()); } finally { returnResource(jedis); } }// =============================================== public String get(String key) { Jedis jedis = null; String value = null; try { jedis = jedisPool.getResource(); value = jedis.get(key); log.info(value); } catch (Exception e) { log.error(e.getMessage()); } finally { returnResource(jedis); } return value; } public void set(String key, String value) { Jedis jedis = null; try { jedis = jedisPool.getResource(); jedis.set(key, value); } catch (Exception e) { log.error(e.getMessage()); } finally { returnResource(jedis); } } /** * 关闭连接 */ public void returnResource(Jedis jedis) { try { if(jedis!=null) jedis.close(); } catch (Exception e) { } } }
위의 분산 잠금이 구현되었지만 현재 발생할 수 있는 다른 두 가지 문제가 있습니다.
하나: 잠금을 획득할 때
Setnx가 잠금을 성공적으로 획득했지만, 그 전에 setex 서비스가 다운되었습니다. 이 비원자적인 성행위, 교착상태가 다시 발생했습니다. 실제로 redis는 nx 및 ex 명령을 제공합니다.
2: 잠금을 해제할 때
1. 세 프로세스 A, B, C가 작업을 실행하고 잠금을 놓고 경쟁하고 있습니다. 이때 A는 잠금을 획득하고 자동 만료 시간을 10초로 설정합니다
2. A가 업무를 실행하는데 어떤 이유에서인지 업무가 차단되어 10초 이상 걸렸는데 이때 잠금이 자동으로 해제되었습니다
3. B는 이때 잠금이 자동으로 걸려있어서 잠금을 획득하려고 시작했습니다. 4. A 이 업무가 실행되면 잠금 해제 로직(키 삭제)이 실행되어 B의 잠금이 해제되고 B는 실제로 계속 업무를 수행하고 있는 것입니다
5. 이때 이때 프로세스 C는 잠금을 획득하려고 시도하고 성공합니다. A가 B의 잠금을 삭제했기 때문입니다.
문제 발생: B와 C가 동시에 잠금을 획득하여 상호 배타성을 위반했습니다! 이 문제를 해결하는 방법? 잠금을 삭제하기 전에 해당 잠금이 우리가 직접 설정한 잠금인지 확인해야 합니다. 그렇지 않은 경우(예를 들어 시간이 지남에 따라 자체 잠금이 해제된 경우) 삭제하지 마세요. 따라서 잠금을 설정할 때 현재 스레드의 고유 식별자를 저장할 수 있습니다! 자물쇠를 삭제하기 전, 안에 있는 값이 본인의 식별 해제와 일치하는지 확인하세요. 일치하지 않는 경우에는 본인의 자물쇠가 아니라는 의미이므로 삭제하지 마세요.
/** * 第二种分布式锁 */public class RedisTool { private static final String LOCK_SUCCESS = "OK"; private static final Long RELEASE_SUCCESS = 1L; /** * 尝试获取分布式锁 * @param jedis Redis客户端 * @param lockKey 锁 * @param requestId 请求标识 * @param expireTime 超期时间 * @return 是否获取成功 */ public static boolean tryGetDistributedLock(Jedis jedis, String lockKey, String requestId, int expireTime) { String result = jedis.set(lockKey, requestId, "NX", "PX", expireTime); if (LOCK_SUCCESS.equals(result)) { return true; } return false; } /** * 释放分布式锁 * @param jedis Redis客户端 * @param lockKey 锁 * @param requestId 请求标识 * @return 是否释放成功 */ public static boolean releaseDistributedLock(Jedis jedis, String lockKey, String requestId) { if (jedis.get(lockKey).equals(requestId)) { System.out.println("释放锁..." + Thread.currentThread().getName() + ",identifierValue:" + requestId); jedis.del(lockKey); return true; } return false; } }
3)재입장 잠금:
也叫做递归锁,指的是在同一线程内,外层函数获得锁之后,内层递归函数仍然可以获取到该锁。换一种说法:同一个线程再次进入同步代码时,可以使用自己已获取到的锁。可重入锁可以避免因同一线程中多次获取锁而导致死锁发生。像synchronized就是一个重入锁,它是通过moniter函数记录当前线程信息来实现的。实现可重入锁需要考虑两点:
获取锁:首先尝试获取锁,如果获取失败,判断这个锁是否是自己的,如果是则允许再次获取, 而且必须记录重复获取锁的次数。
释放锁:释放锁不能直接删除了,因为锁是可重入的,如果锁进入了多次,在内层直接删除锁, 导致外部的业务在没有锁的情况下执行,会有安全问题。因此必须获取锁时累计重入的次数,释放时则减去重入次数,如果减到0,则可以删除锁。
下面我们假设锁的key为“ lock ”,hashKey是当前线程的id:“ threadId ”,锁自动释放时间假设为20 获取锁的步骤: 1、判断lock是否存在 EXISTS lock 2、不存在,则自己获取锁,记录重入层数为1. 2、存在,说明有人获取锁了,下面判断是不是自己的锁,即判断当前线程id作为hashKey是否存在:HEXISTS lock threadId 3、不存在,说明锁已经有了,且不是自己获取的,锁获取失败. 3、存在,说明是自己获取的锁,重入次数+1: HINCRBY lock threadId 1 ,最后更新锁自动释放时间, EXPIRE lock 20 释放锁的步骤: 1、判断当前线程id作为hashKey是否存在: HEXISTS lock threadId 2、不存在,说明锁已经失效,不用管了 2、存在,说明锁还在,重入次数减1: HINCRBY lock threadId -1 , 3、获取新的重入次数,判断重入次数是否为0,为0说明锁全部释放,删除key: DEL lock
因此,存储在锁中的信息就必须包含:key、线程标识、重入次数。不能再使用简单的key-value结构, 这里推荐使用hash结构。
获取锁的脚本(注释删掉,不然运行报错)
local key = KEYS[1]; -- 第1个参数,锁的keylocal threadId = ARGV[1]; -- 第2个参数,线程唯一标识local releaseTime = ARGV[2]; -- 第3个参数,锁的自动释放时间if(redis.call('exists', key) == 0) then -- 判断锁是否已存在 redis.call('hset', key, threadId, '1'); -- 不存在, 则获取锁 redis.call('expire', key, releaseTime); -- 设置有效期 return 1; -- 返回结果end;if(redis.call('hexists', key, threadId) == 1) then -- 锁已经存在,判断threadId是否是自己 redis.call('hincrby', key, threadId, '1'); -- 如果是自己,则重入次数+1 redis.call('expire', key, releaseTime); -- 设置有效期 return 1; -- 返回结果end;return 0; -- 代码走到这里,说明获取锁的不是自己,获取锁失败
释放锁的脚本(注释删掉,不然运行报错)
local key = KEYS[1]; -- 第1个参数,锁的keylocal threadId = ARGV[1]; -- 第2个参数,线程唯一标识if (redis.call('HEXISTS', key, threadId) == 0) then -- 判断当前锁是否还是被自己持有 return nil; -- 如果已经不是自己,则直接返回end;local count = redis.call('HINCRBY', key, threadId, -1); -- 是自己的锁,则重入次数-1if (count == 0) then -- 判断是否重入次数是否已经为0 redis.call('DEL', key); -- 等于0说明可以释放锁,直接删除 return nil; end;
完整代码
import java.util.Collections;import java.util.UUID;import org.springframework.core.io.ClassPathResource;import org.springframework.data.redis.core.StringRedisTemplate;import org.springframework.data.redis.core.script.DefaultRedisScript;import org.springframework.scripting.support.ResourceScriptSource;/** * Redis可重入锁 */public class RedisLock { private static final StringRedisTemplate redisTemplate = SpringUtil.getBean(StringRedisTemplate.class); private static final DefaultRedisScript<Long> LOCK_SCRIPT; private static final DefaultRedisScript<Object> UNLOCK_SCRIPT; static { // 加载释放锁的脚本 LOCK_SCRIPT = new DefaultRedisScript<>(); LOCK_SCRIPT.setScriptSource(new ResourceScriptSource(new ClassPathResource("lock.lua"))); LOCK_SCRIPT.setResultType(Long.class); // 加载释放锁的脚本 UNLOCK_SCRIPT = new DefaultRedisScript<>(); UNLOCK_SCRIPT.setScriptSource(new ResourceScriptSource(new ClassPathResource("unlock.lua"))); } /** * 获取锁 * @param lockName 锁名称 * @param releaseTime 超时时间(单位:秒) * @return key 解锁标识 */ public static String tryLock(String lockName,String releaseTime) { // 存入的线程信息的前缀,防止与其它JVM中线程信息冲突 String key = UUID.randomUUID().toString(); // 执行脚本 Long result = redisTemplate.execute( LOCK_SCRIPT, Collections.singletonList(lockName), key + Thread.currentThread().getId(), releaseTime); // 判断结果 if(result != null && result.intValue() == 1) { return key; }else { return null; } } /** * 释放锁 * @param lockName 锁名称 * @param key 解锁标识 */ public static void unlock(String lockName,String key) { // 执行脚本 redisTemplate.execute( UNLOCK_SCRIPT, Collections.singletonList(lockName), key + Thread.currentThread().getId(), null); } }
至此,一个比较完善的redis锁就开发完成了。
위 내용은 세 가지 유형의 Redis 분산 잠금 비교의 상세 내용입니다. 자세한 내용은 PHP 중국어 웹사이트의 기타 관련 기사를 참조하세요!