이 글에서는 Redisson이 구현한 분산 잠금의 원리를 자세히 소개하겠습니다. 매우 좋은 참조값을 가지고 있습니다. 아래 에디터로 살펴보겠습니다
Redisson 분산 잠금
이전 주석 기반 잠금에는 일종의 잠금 기능이 있습니다. 기본 redis 분산 잠금, redisson 구성 요소에서 제공하는 RLock을 기반으로 잠금을 구현합니다. 이 기사에서는 redisson이 잠금을 구현하는 방법을 살펴보겠습니다.
잠금을 구현하는 메커니즘은 버전마다 다릅니다.
최근 출시된 redisson 버전 3.2.3이 인용되었으며, 다른 버전에서 잠금을 구현할 수 있습니다. 메커니즘이 다릅니다. 초기 버전에서는 간단한 setnx, getset 및 기타 기존 명령을 사용하여 구성을 완료한 것으로 보입니다. 이후에는 redis가 스크립트 Lua를 지원하므로 구현 원칙이 변경되었습니다.
<dependency> <groupId>org.redisson</groupId> <artifactId>redisson</artifactId> <version>3.2.3</version> </dependency>
setnx는 교착 상태 문제를 더 잘 피하기 위해 getset 및 트랜잭션으로 완료되어야 하며 새 버전에서는 다음을 지원하므로 트랜잭션 사용을 피할 수 있습니다. lua 스크립트 여러 redis 명령을 실행할 뿐만 아니라 의미 표현도 더 명확해졌습니다.
RLock 인터페이스 기능
표준 인터페이스 Lock 상속
잠금, 잠금 해제, trylock 등과 같은 표준 잠금 인터페이스의 모든 기능을 갖추고 있습니다.
표준 인터페이스 잠금 확장
일반적으로 사용되는 방법은 강제 잠금 해제, 유효 기간이 있는 잠금 및 비동기식 메서드 집합입니다. 처음 두 가지 방법은 주로 표준 잠금으로 인해 발생할 수 있는 교착 상태 문제를 해결하는 것입니다. 예를 들어 스레드가 잠금을 획득한 후 스레드가 위치한 시스템이 충돌하면 잠금을 획득한 스레드가 정상적으로 잠금을 해제할 수 없어 잠금을 기다리는 나머지 스레드가 대기하게 됩니다.
재진입 메커니즘
각 버전의 구현에 차이가 있습니다. 재진입의 주요 고려 사항은 동일한 스레드가 그렇지 않을 때입니다. 잠금 해제 잠금 리소스를 다시 신청하는 경우에는 신청 과정을 거칠 필요 없이 계속해서 획득한 잠금을 반환하고 재진입 횟수만 기록하면 되는데, 이는 jdk의 ReentrantLock 기능과 유사합니다. 재진입 횟수는 hincrby 명령과 함께 사용됩니다. 자세한 매개변수는 아래 코드에 나와 있습니다.
같은 스레드인지 어떻게 확인하나요?
redisson의 솔루션은 RedissonLock 인스턴스의 GUID를 현재 스레드의 ID에 추가하고
<🎜을 반환하는 것입니다. > getLockNamepublic class RedissonLock extends RedissonExpirable implements RLock { final UUID id; protected RedissonLock(CommandExecutor commandExecutor, String name, UUID id) { super(commandExecutor, name); this.internalLockLeaseTime = TimeUnit.SECONDS.toMillis(30L); this.commandExecutor = commandExecutor; this.id = id; } String getLockName(long threadId) { return this.id + ":" + threadId; }
RLock이 잠금을 획득하는 두 가지 시나리오
다음은 tryLock의 소스 코드입니다. tryAcquire 메소드는 Lock을 적용하고 잠금 유효 기간의 남은 시간을 반환하는 것입니다. 비어 있으면 다른 스레드에서 잠금을 직접 획득하여 반환하지 않았음을 의미합니다. 시간을 획득하면 대기 경쟁 로직이 실행됩니다. 입력됩니다.rreee
경쟁 없음, 직접 잠금 획득
첫 번째 단계를 살펴보세요 잠금 획득 및 잠금 해제 뒤에 Redis는 무엇을 하고 있나요? Redis 모니터를 사용하여 백그라운드에서 Redis 실행을 모니터링할 수 있습니다. @RequestLockable을 메서드에 추가하면 실제로 잠금 및 잠금 해제를 호출하는 redis 명령은 다음과 같습니다.Lock
high redis 버전은 lua 스크립트를 지원하므로 redisson도 이를 지원하고 lua 스크립트에 익숙하지 않은 분들도 찾아보실 수 있습니다. lua 명령 실행 논리는 다음과 같습니다.public boolean tryLock(long waitTime, long leaseTime, TimeUnit unit) throws InterruptedException { long time = unit.toMillis(waitTime); long current = System.currentTimeMillis(); final long threadId = Thread.currentThread().getId(); Long ttl = this.tryAcquire(leaseTime, unit); if(ttl == null) { //直接获取到锁 return true; } else { //有竞争的后续看 } }
잠금 프로세스:
<T> RFuture<T> tryLockInnerAsync(long leaseTime, TimeUnit unit, long threadId, RedisStrictCommand<T> command) { this.internalLockLeaseTime = unit.toMillis(leaseTime); return this.commandExecutor.evalWriteAsync(this.getName(), LongCodec.INSTANCE, command, "if (redis.call(\'exists\', KEYS[1]) == 0) then redis.call(\'hset\', KEYS[1], ARGV[2], 1); redis.call(\'pexpire\', KEYS[1], ARGV[1]); return nil; end; if (redis.call(\'hexists\', KEYS[1], ARGV[2]) == 1) then redis.call(\'hincrby\', KEYS[1], ARGV[2], 1); redis.call(\'pexpire\', KEYS[1], ARGV[1]); return nil; end; return redis.call(\'pttl\', KEYS[1]);", Collections.singletonList(this.getName()), new Object[]{Long.valueOf(this.internalLockLeaseTime), this.getLockName(threadId)}); }
아아앙
잠금 해제
잠금 해제 과정 더 복잡한 것 같습니다.
"EVAL" "if (redis.call('exists', KEYS[1]) == 0) then redis.call('hset', KEYS[1], ARGV[2], 1); redis.call('pexpire', KEYS[1], ARGV[1]); return nil; end; if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then redis.call('hincrby', KEYS[1], ARGV[2], 1); redis.call('pexpire', KEYS[1], ARGV[1]); return nil; end; return redis.call('pttl', KEYS[1]);" "1" "lock.com.csp.product.api.service.ProductAppService.searchProductByPage#0" "1000" "346e1eb8-5bfd-4d49-9870-042df402f248:21"
아아앙
有竞争,等待
有竞争的情况在redis端的lua脚本是相同的,只是不同的条件执行不同的redis命令,复杂的在redisson的源码上。当通过tryAcquire发现锁被其它线程申请时,需要进入等待竞争逻辑中。
this.await返回false,说明等待时间已经超出获取锁最大等待时间,取消订阅并返回获取锁失败
this.await返回true,进入循环尝试获取锁。
public boolean tryLock(long waitTime, long leaseTime, TimeUnit unit) throws InterruptedException { long time = unit.toMillis(waitTime); long current = System.currentTimeMillis(); final long threadId = Thread.currentThread().getId(); Long ttl = this.tryAcquire(leaseTime, unit); if(ttl == null) { return true; } else { //重点是这段 time -= System.currentTimeMillis() - current; if(time <= 0L) { return false; } else { current = System.currentTimeMillis(); final RFuture subscribeFuture = this.subscribe(threadId); if(!this.await(subscribeFuture, time, TimeUnit.MILLISECONDS)) { if(!subscribeFuture.cancel(false)) { subscribeFuture.addListener(new FutureListener() { public void operationComplete(Future<RedissonLockEntry> future) throws Exception { if(subscribeFuture.isSuccess()) { RedissonLock.this.unsubscribe(subscribeFuture, threadId); } } }); } return false; } else { boolean var16; try { time -= System.currentTimeMillis() - current; if(time <= 0L) { boolean currentTime1 = false; return currentTime1; } do { long currentTime = System.currentTimeMillis(); ttl = this.tryAcquire(leaseTime, unit); if(ttl == null) { var16 = true; return var16; } time -= System.currentTimeMillis() - currentTime; if(time <= 0L) { var16 = false; return var16; } currentTime = System.currentTimeMillis(); if(ttl.longValue() >= 0L && ttl.longValue() < time) { this.getEntry(threadId).getLatch().tryAcquire(ttl.longValue(), TimeUnit.MILLISECONDS); } else { this.getEntry(threadId).getLatch().tryAcquire(time, TimeUnit.MILLISECONDS); } time -= System.currentTimeMillis() - currentTime; } while(time > 0L); var16 = false; } finally { this.unsubscribe(subscribeFuture, threadId); } return var16; } } } }
循环尝试一般有如下几种方法:
while循环,一次接着一次的尝试,这个方法的缺点是会造成大量无效的锁申请。
Thread.sleep,在上面的while方案中增加睡眠时间以降低锁申请次数,缺点是这个睡眠的时间设置比较难控制。
基于信息量,当锁被其它资源占用时,当前线程订阅锁的释放事件,一旦锁释放会发消息通知待等待的锁进行竞争,有效的解决了无效的锁申请情况。核心逻辑是this.getEntry(threadId).getLatch().tryAcquire,this.getEntry(threadId).getLatch()返回的是一个信号量,有兴趣可以再研究研究。
redisson依赖
由于redisson不光是针对锁,提供了很多客户端操作redis的方法,所以会依赖一些其它的框架,比如netty,如果只是简单的使用锁也可以自己去实现。
以上就是redisson实现分布式锁原理详解的内容,更多相关内容请关注PHP中文网(www.php.cn)!