다중 스레드 환경에서 동시에 하나의 스레드에서만 코드 블록에 액세스할 수 있도록 하기 위해 Java에서는 일반적으로 동기화된 구문과 ReetrantLock을 사용하여 이것이 실제로 이루어지도록 할 수 있습니다. 로컬 잠금 방법. 그러나 이제 기업은 분산 아키텍처를 채택하고 있습니다. 분산 환경에서 서로 다른 노드의 스레드가 동시에 실행되도록 하려면 어떻게 해야 할까요? 따라서 분산 시스템 간의 공유 리소스에 대한 상호 배타적인 액세스를 제어하는 방법인 분산 잠금이 도입되었습니다. 분산 시스템에서는 여러 서비스가 여러 컴퓨터에 배포됩니다. 클라이언트의 사용자가 데이터 삽입 요청을 시작할 때 분산 잠금 메커니즘이 보장되지 않으면 해당 여러 컴퓨터의 여러 서비스가 동시 삽입 작업으로 인해 반복될 수 있습니다. 중복 데이터를 허용하지 않는 일부 기업에 문제를 일으킬 수 있는 데이터 삽입. 분산 잠금 메커니즘은 이와 같은 문제를 해결하고 여러 서비스 간의 공유 리소스에 대한 상호 배타적 액세스를 보장하기 위한 것입니다. 한 서비스가 분산 잠금을 확보하고 다른 서비스가 잠금을 획득하지 못하면 후속 작업이 수행되지 않습니다. 일반적인 의미는 아래 그림과 같습니다.
분산 잠금의 특징
setnx+lua
(redis는 Lua 스크립트를 실행할 때 다른 작업이 수행되지 않도록 보장하여 작업의 원자성을 보장합니다) 또는 set를 알고 있습니다. 키 값 px 밀리초 nx
code>. 후자 방식의 핵심 구현 명령어는 다음과 같습니다. - 获取锁(unique_value可以是UUID等) SET resource_name unique_value NX PX 30000 - 释放锁(lua脚本中,一定要比较value,防止误解锁) if redis.call("get",KEYS[1]) == ARGV[1] then return redis.call("del",KEYS[1]) else return 0 end
이 구현 방식에는 크게 3가지 포인트가 있습니다. (이 역시 면접 확률이 매우 높다는 점이기도 합니다.) setnx+lua
(redis保证执行lua脚本时不执行其他操作,保证操作的原子性),或者知道set key value px milliseconds nx
。后一种方式的核心实现命令如下:
<!-- https://mvnrepository.com/artifact/org.redisson/redisson --> <dependency> <groupId>org.redisson</groupId> <artifactId>redisson</artifactId> <version>3.3.2</version> </dependency>
这种实现方式有3大要点(也是面试概率非常高的地方):
잠금을 해제할 때 값을 확인해야 하며 잠금을 오해할 수 없습니다.
set命令要用
set key value px milliseconds nx
set 명령어는
set 키 값을 사용합니다. px milliseconds nx< /code>;
값은 고유해야 합니다.
- 사실 가장 큰 단점은 이러한 유형의 잠금은 Redis 노드에서 잠겨 있을 때만 작동한다는 것입니다. Redis가 Sentinel을 통해 고가용성을 보장하더라도 마스터 노드가 어떤 이유로 마스터-슬레이브로 전환하면 잠금이 손실됩니다.
Redis 마스터 노드 잠금
그러나 잠긴 키가 슬레이브 노드에 동기화되지 않았습니다.
마스터 오류가 발생하고 슬레이브 노드가 마스터 노드로 업그레이드되었습니다. 자물쇠가 분실되었습니다.
Single Point of Failure 문제를 피하기 위해 Redis 작성자 antirez는 분산 환경을 기반으로 한 보다 발전된 분산 잠금 구현 방법인 Redlock을 제안했습니다. Redlock은 Redis의 모든 분산 잠금 구현 중에서 면접관을 최고조로 만들 수 있는 유일한 방법이기도 합니다.
antirez의 redlock 알고리즘은 대략 다음과 같습니다.
Redis 분산 환경에서는 N개의 Redis 마스터가 있다고 가정합니다. 이러한 노드는 서로 완전히 독립적이며 마스터-슬레이브 복제 또는 기타 클러스터 조정 메커니즘이 없습니다. Redis 단일 인스턴스에서와 동일한 방법을 사용하여 N 인스턴스에 대한 잠금을 획득하고 해제할 것입니다. 이제 우리는 5개의 Redis 마스터 노드가 있다고 가정하고, 이 Redis 인스턴스가 동시에 다운되지 않도록 5개의 서버에서 이러한 Redis 인스턴스를 실행해야 합니다. 잠금을 얻으려면 클라이언트는 다음 작업을 수행해야 합니다.
(예: UUID)을 사용하여 5개 인스턴스에서 순차적으로 잠금을 획득해 보세요. Redis에서 잠금을 요청할 때 클라이언트는 네트워크 연결 및 응답 시간 초과를 설정해야 하며, 이는 잠금 만료 시간보다 작아야 합니다. 예를 들어 잠금 자동 만료 시간 TTL이 10초인 경우 제한 시간은 5~50밀리초 사이여야 합니다. 이렇게 하면 서버 측 Redis가 중단되고 클라이언트가 여전히 응답 결과를 기다리고 있는 상황을 피할 수 있습니다. 서버가 지정된 시간 내에 응답하지 않으면 클라이언트는 가능한 한 빨리 다른 Redis 인스턴스에서 잠금을 얻으려고 시도해야 합니다.
客户端使用当前时间减去开始获取锁时间(步骤1记录的时间)就得到获取锁使用的时间。当且仅当从大多数(N/2+1,这里是3个节点)的Redis节点都取到锁,并且使用的时间小于锁失效时间时,锁才算获取成功。
如果取到了锁,key的真正有效时间等于有效时间减去获取锁所使用的时间(步骤3计算的结果)。
如果因为某些原因,获取锁失败(没有在至少N/2+1个Redis实例取到锁或者取锁时间已经超过了有效时间),客户端应该在所有的Redis实例上进行解锁(即便某些Redis实例根本就没有加锁成功,防止某些节点获取到锁但是客户端没有得到响应而导致接下来的一段时间不能被重新获取锁)。
此处不讨论时钟漂移
redisson已经有对redlock算法封装,接下来对其用法进行简单介绍,并对核心源码进行分析(假设5个redis实例)。
<!-- https://mvnrepository.com/artifact/org.redisson/redisson --> <dependency> <groupId>org.redisson</groupId> <artifactId>redisson</artifactId> <version>3.3.2</version> </dependency>
首先,我们来看一下redission封装的redlock算法实现的分布式锁用法,非常简单,跟重入锁(ReentrantLock)有点类似:
Config config = new Config(); config.useSentinelServers().addSentinelAddress("127.0.0.1:6369","127.0.0.1:6379", "127.0.0.1:6389") .setMasterName("masterName") .setPassword("password").setDatabase(0); RedissonClient redissonClient = Redisson.create(config); // 还可以getFairLock(), getReadWriteLock() RLock redLock = redissonClient.getLock("REDLOCK_KEY"); boolean isLock; try { isLock = redLock.tryLock(); // 500ms拿不到锁, 就认为获取锁失败。10000ms即10s是锁失效时间。 isLock = redLock.tryLock(500, 10000, TimeUnit.MILLISECONDS); if (isLock) { //TODO if get lock success, do something; } } catch (Exception e) { } finally { // 无论如何, 最后都要解锁 redLock.unlock(); }
实现分布式锁的一个非常重要的点就是set的value要具有唯一性,redisson的value是怎样保证value的唯一性呢?答案是UUID+threadId。入口在redissonClient.getLock(“REDLOCK_KEY”),源码在Redisson.java和RedissonLock.java中:
protected final UUID id = UUID.randomUUID(); String getLockName(long threadId) { return id + ":" + threadId; }
获取锁的代码为redLock.tryLock()或者redLock.tryLock(500, 10000, TimeUnit.MILLISECONDS),两者的最终核心源码都是下面这段代码,只不过前者获取锁的默认租约时间(leaseTime)是LOCK_EXPIRATION_INTERVAL_SECONDS,即30s:
<T> RFuture<T> tryLockInnerAsync(long leaseTime, TimeUnit unit, long threadId, RedisStrictCommand<T> command) { internalLockLeaseTime = unit.toMillis(leaseTime); // 获取锁时向5个redis实例发送的命令 return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, command, // 首先分布式锁的KEY不能存在,如果确实不存在,那么执行hset命令(hset REDLOCK_KEY uuid+threadId 1),并通过pexpire设置失效时间(也是锁的租约时间) "if (redis.call('exists', KEYS[1]) == 0) then " + "redis.call('hset', KEYS[1], ARGV[2], 1); " + "redis.call('pexpire', KEYS[1], ARGV[1]); " + "return nil; " + "end; " + // 如果分布式锁的KEY已经存在,并且value也匹配,表示是当前线程持有的锁,那么重入次数加1,并且设置失效时间 "if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " + "redis.call('hincrby', KEYS[1], ARGV[2], 1); " + "redis.call('pexpire', KEYS[1], ARGV[1]); " + "return nil; " + "end; " + // 获取分布式锁的KEY的失效时间毫秒数 "return redis.call('pttl', KEYS[1]);", // 这三个参数分别对应KEYS[1],ARGV[1]和ARGV[2] Collections.<Object>singletonList(getName()), internalLockLeaseTime, getLockName(threadId)); }
获取锁的命令中,
KEYS[1]就是Collections.singletonList(getName()),表示分布式锁的key,即REDLOCK_KEY;
ARGV[1]就是internalLockLeaseTime,即锁的租约时间,默认30s;
ARGV[2]就是getLockName(threadId),是获取锁时set的唯一值,即UUID+threadId:
释放锁的代码为redLock.unlock(),核心源码如下:
protected RFuture<Boolean> unlockInnerAsync(long threadId) { // 向5个redis实例都执行如下命令 return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN, // 如果分布式锁KEY不存在,那么向channel发布一条消息 "if (redis.call('exists', KEYS[1]) == 0) then " + "redis.call('publish', KEYS[2], ARGV[1]); " + "return 1; " + "end;" + // 如果分布式锁存在,但是value不匹配,表示锁已经被占用,那么直接返回 "if (redis.call('hexists', KEYS[1], ARGV[3]) == 0) then " + "return nil;" + "end; " + // 如果就是当前线程占有分布式锁,那么将重入次数减1 "local counter = redis.call('hincrby', KEYS[1], ARGV[3], -1); " + // 重入次数减1后的值如果大于0,表示分布式锁有重入过,那么只设置失效时间,还不能删除 "if (counter > 0) then " + "redis.call('pexpire', KEYS[1], ARGV[2]); " + "return 0; " + "else " + // 重入次数减1后的值如果为0,表示分布式锁只获取过1次,那么删除这个KEY,并发布解锁消息 "redis.call('del', KEYS[1]); " + "redis.call('publish', KEYS[2], ARGV[1]); " + "return 1; "+ "end; " + "return nil;", // 这5个参数分别对应KEYS[1],KEYS[2],ARGV[1],ARGV[2]和ARGV[3] Arrays.<Object>asList(getName(), getChannelName()), LockPubSub.unlockMessage, internalLockLeaseTime, getLockName(threadId)); }
下面利用SpringBoot + Jedis + AOP的组合来实现一个简易的分布式锁。
自定义一个注解,被注解的方法会执行获取分布式锁的逻辑
@Target(ElementType.METHOD) @Retention(RetentionPolicy.RUNTIME) @Documented @Inherited public @interface RedisLock { /** * 业务键 * * @return */ String key(); /** * 锁的过期秒数,默认是5秒 * * @return */ int expire() default 5; /** * 尝试加锁,最多等待时间 * * @return */ long waitTime() default Long.MIN_VALUE; /** * 锁的超时时间单位 * * @return */ TimeUnit timeUnit() default TimeUnit.SECONDS; }
在AOP中我们去执行获取分布式锁和释放分布式锁的逻辑,代码如下:
@Aspect @Component public class LockMethodAspect { @Autowired private RedisLockHelper redisLockHelper; @Autowired private JedisUtil jedisUtil; private Logger logger = LoggerFactory.getLogger(LockMethodAspect.class); @Around("@annotation(com.redis.lock.annotation.RedisLock)") public Object around(ProceedingJoinPoint joinPoint) { Jedis jedis = jedisUtil.getJedis(); MethodSignature signature = (MethodSignature) joinPoint.getSignature(); Method method = signature.getMethod(); RedisLock redisLock = method.getAnnotation(RedisLock.class); String value = UUID.randomUUID().toString(); String key = redisLock.key(); try { final boolean islock = redisLockHelper.lock(jedis,key, value, redisLock.expire(), redisLock.timeUnit()); logger.info("isLock : {}",islock); if (!islock) { logger.error("获取锁失败"); throw new RuntimeException("获取锁失败"); } try { return joinPoint.proceed(); } catch (Throwable throwable) { throw new RuntimeException("系统异常"); } } finally { logger.info("释放锁"); redisLockHelper.unlock(jedis,key, value); jedis.close(); } } }
@Component public class RedisLockHelper { private long sleepTime = 100; /** * 直接使用setnx + expire方式获取分布式锁 * 非原子性 * * @param key * @param value * @param timeout * @return */ public boolean lock_setnx(Jedis jedis,String key, String value, int timeout) { Long result = jedis.setnx(key, value); // result = 1时,设置成功,否则设置失败 if (result == 1L) { return jedis.expire(key, timeout) == 1L; } else { return false; } } /** * 使用Lua脚本,脚本中使用setnex+expire命令进行加锁操作 * * @param jedis * @param key * @param UniqueId * @param seconds * @return */ public boolean Lock_with_lua(Jedis jedis,String key, String UniqueId, int seconds) { String lua_scripts = "if redis.call('setnx',KEYS[1],ARGV[1]) == 1 then" + "redis.call('expire',KEYS[1],ARGV[2]) return 1 else return 0 end"; List<String> keys = new ArrayList<>(); List<String> values = new ArrayList<>(); keys.add(key); values.add(UniqueId); values.add(String.valueOf(seconds)); Object result = jedis.eval(lua_scripts, keys, values); //判断是否成功 return result.equals(1L); } /** * 在Redis的2.6.12及以后中,使用 set key value [NX] [EX] 命令 * * @param key * @param value * @param timeout * @return */ public boolean lock(Jedis jedis,String key, String value, int timeout, TimeUnit timeUnit) { long seconds = timeUnit.toSeconds(timeout); return "OK".equals(jedis.set(key, value, "NX", "EX", seconds)); } /** * 自定义获取锁的超时时间 * * @param jedis * @param key * @param value * @param timeout * @param waitTime * @param timeUnit * @return * @throws InterruptedException */ public boolean lock_with_waitTime(Jedis jedis,String key, String value, int timeout, long waitTime,TimeUnit timeUnit) throws InterruptedException { long seconds = timeUnit.toSeconds(timeout); while (waitTime >= 0) { String result = jedis.set(key, value, "nx", "ex", seconds); if ("OK".equals(result)) { return true; } waitTime -= sleepTime; Thread.sleep(sleepTime); } return false; } /** * 错误的解锁方法—直接删除key * * @param key */ public void unlock_with_del(Jedis jedis,String key) { jedis.del(key); } /** * 使用Lua脚本进行解锁操纵,解锁的时候验证value值 * * @param jedis * @param key * @param value * @return */ public boolean unlock(Jedis jedis,String key,String value) { String luaScript = "if redis.call('get',KEYS[1]) == ARGV[1] then " + "return redis.call('del',KEYS[1]) else return 0 end"; return jedis.eval(luaScript, Collections.singletonList(key), Collections.singletonList(value)).equals(1L); } }
定义一个TestController来测试我们实现的分布式锁
@RestController public class TestController { @RedisLock(key = "redis_lock") @GetMapping("/index") public String index() { return "index"; } }
위 내용은 Redis 분산 잠금 인스턴스 분석의 상세 내용입니다. 자세한 내용은 PHP 중국어 웹사이트의 기타 관련 기사를 참조하세요!