この記事では、redisson による分散ロック実装の原理を詳しく紹介します。非常に優れた参考値です。以下のエディターで見てみましょう
Redisson 分散ロック
先ほどのアノテーションベースのロックには、基本的な Redis 分散ロックであるロックが実装されています。この記事では、コンポーネントによって提供される redisson RLock に基づいて、redisson がロックを実装する方法について説明します。
バージョンが異なれば、ロック機構も異なります。
は、最近リリースされた redisson のバージョン 3.2.3 を指します。初期のバージョンでは、単純な setnx、getset が使用されるようです。通常のコマンドが実行されるまで待ちます。その後、redis がスクリプト Lua をサポートしたため、実装原則が変更されました。
<dependency> <groupId>org.redisson</groupId> <artifactId>redisson</artifactId> <version>3.2.3</version> </dependency>
setnx は、デッドロックの問題を回避するために getset とトランザクションで完了する必要があります。新しいバージョンでは、トランザクションの使用や複数の Redis コマンドの操作を回避できる Lua スクリプトがサポートされており、セマンティック表現がより明確になっています。 。
RLockインターフェースの特徴
標準インターフェースLockを継承
ロック、ロック解除、トライロックなど、標準ロックインターフェースのすべての機能を備えています。
標準インターフェイス Lock を拡張しました
多くのメソッドを拡張しました。最も一般的に使用されるものは、強制ロック解放、有効期間付きロック、および一連の非同期メソッドです。最初の 2 つの方法は主に、標準のロックによって発生する可能性のあるデッドロックの問題を解決するためのものです。たとえば、スレッドがロックを取得した後、そのスレッドが存在するマシンがクラッシュします。このとき、ロックを取得したスレッドはロックを正常に解放できず、ロックを待っている残りのスレッドが待機することになります。
再入可能メカニズム
各バージョンの実装には違いがあります。再入可能性の主な考慮事項は、ロックを解放する前に同じスレッドが再度ロック リソースを適用する場合、そのプロセスを通過する必要はありません。アプリケーション プロセスは、ロック リソースを取得するだけで済みます。ロックは、jdk の ReentrantLock 関数と同様に、再エントリ数を返して記録し続けることができます。再エントリ数は、hincrby コマンドと組み合わせて使用されます。詳細なパラメーターは次のコードにあります。
同じスレッドかどうかを確認するにはどうすればよいですか?
redissonの解決策は、RedissonLockインスタンスのguidと現在のスレッドのIDを追加し、getLockNameを通してそれを返すことです
public class RedissonLock extends RedissonExpirable implements RLock { final UUID id; protected RedissonLock(CommandExecutor commandExecutor, String name, UUID id) { super(commandExecutor, name); this.internalLockLeaseTime = TimeUnit.SECONDS.toMillis(30L); this.commandExecutor = commandExecutor; this.id = id; } String getLockName(long threadId) { return this.id + ":" + threadId; }
RLockがロックを取得するための2つのシナリオ
ここから入手してください tryLock のソース コードを見ると、tryAcquire メソッドはロックに適用され、ロック有効期間の残り時間を返します。それが空の場合、ロックが直接取得されておらず、返されていないことを意味します。他のスレッドで時間を取得すると、待ちの競争ロジックに入ります。
public boolean tryLock(long waitTime, long leaseTime, TimeUnit unit) throws InterruptedException { long time = unit.toMillis(waitTime); long current = System.currentTimeMillis(); final long threadId = Thread.currentThread().getId(); Long ttl = this.tryAcquire(leaseTime, unit); if(ttl == null) { //直接获取到锁 return true; } else { //有竞争的后续看 } }
競合なし、ロックを直接取得します
まず、ロックの取得とロックの解放の背後で Redis が何をしているかを見てみましょう。 Redis モニターを使用して、Redis の実行を監視できます。背景。 @RequestLockable をメソッドに追加すると、実際に lock と lock が呼び出されます。 以下は redis コマンドです:
lock
redis の上位バージョンは lua スクリプトをサポートするため、redisson もそれをサポートします。 Lua スクリプトに詳しくない場合は、調べてください。 lua コマンドを実行するロジックは次のとおりです。<T> RFuture<T> tryLockInnerAsync(long leaseTime, TimeUnit unit, long threadId, RedisStrictCommand<T> command) { this.internalLockLeaseTime = unit.toMillis(leaseTime); return this.commandExecutor.evalWriteAsync(this.getName(), LongCodec.INSTANCE, command, "if (redis.call(\'exists\', KEYS[1]) == 0) then redis.call(\'hset\', KEYS[1], ARGV[2], 1); redis.call(\'pexpire\', KEYS[1], ARGV[1]); return nil; end; if (redis.call(\'hexists\', KEYS[1], ARGV[2]) == 1) then redis.call(\'hincrby\', KEYS[1], ARGV[2], 1); redis.call(\'pexpire\', KEYS[1], ARGV[1]); return nil; end; return redis.call(\'pttl\', KEYS[1]);", Collections.singletonList(this.getName()), new Object[]{Long.valueOf(this.internalLockLeaseTime), this.getLockName(threadId)}); }
ロック プロセス:
"EVAL" "if (redis.call('exists', KEYS[1]) == 0) then redis.call('hset', KEYS[1], ARGV[2], 1); redis.call('pexpire', KEYS[1], ARGV[1]); return nil; end; if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then redis.call('hincrby', KEYS[1], ARGV[2], 1); redis.call('pexpire', KEYS[1], ARGV[1]); return nil; end; return redis.call('pttl', KEYS[1]);" "1" "lock.com.csp.product.api.service.ProductAppService.searchProductByPage#0" "1000" "346e1eb8-5bfd-4d49-9870-042df402f248:21"
1486642677.053488 [0 lua] "exists" "lock.com.csp.product.api.service.ProductAppService.searchProductByPage#0" 1486642677.053515 [0 lua] "hset" "lock.com.csp.product.api.service.ProductAppService.searchProductByPage#0" "346e1eb8-5bfd-4d49-9870-042df402f248:21" "1" 1486642677.053540 [0 lua] "pexpire" "lock.com.csp.product.api.service.ProductAppService.searchProductByPage#0" "1000"
ロック解除
ロック解除のプロセスは少し複雑に見えます:
"EVAL" "if (redis.call('exists', KEYS[1]) == 0) then redis.call('publish', KEYS[2], ARGV[1]); return 1; end; if (redis.call('hexists', KEYS[1], ARGV[3]) == 0) then return nil;end; local counter = redis.call('hincrby', KEYS[1], ARGV[3], -1); if (counter > 0) then redis.call('pexpire', KEYS[1], ARGV[2]); return 0; else redis.call('del', KEYS[1]); redis.call('publish', KEYS[2], ARGV[1]); return 1; end; return nil;" "2" "lock.com.csp.product.api.service.ProductAppService.searchProductByPage#0" "redisson_lock__channel:{lock.com.csp.product.api.service.ProductAppService.searchProductByPage#0}" "0" "1000" "346e1eb8-5bfd-4d49-9870-042df402f248:21"
主にロック解除メッセージを送信してウェイクアップします。待機キュー内のスレッドが再度ロックを獲得するために競合します。
りー
有竞争,等待
有竞争的情况在redis端的lua脚本是相同的,只是不同的条件执行不同的redis命令,复杂的在redisson的源码上。当通过tryAcquire发现锁被其它线程申请时,需要进入等待竞争逻辑中。
this.await返回false,说明等待时间已经超出获取锁最大等待时间,取消订阅并返回获取锁失败
this.await返回true,进入循环尝试获取锁。
public boolean tryLock(long waitTime, long leaseTime, TimeUnit unit) throws InterruptedException { long time = unit.toMillis(waitTime); long current = System.currentTimeMillis(); final long threadId = Thread.currentThread().getId(); Long ttl = this.tryAcquire(leaseTime, unit); if(ttl == null) { return true; } else { //重点是这段 time -= System.currentTimeMillis() - current; if(time <= 0L) { return false; } else { current = System.currentTimeMillis(); final RFuture subscribeFuture = this.subscribe(threadId); if(!this.await(subscribeFuture, time, TimeUnit.MILLISECONDS)) { if(!subscribeFuture.cancel(false)) { subscribeFuture.addListener(new FutureListener() { public void operationComplete(Future<RedissonLockEntry> future) throws Exception { if(subscribeFuture.isSuccess()) { RedissonLock.this.unsubscribe(subscribeFuture, threadId); } } }); } return false; } else { boolean var16; try { time -= System.currentTimeMillis() - current; if(time <= 0L) { boolean currentTime1 = false; return currentTime1; } do { long currentTime = System.currentTimeMillis(); ttl = this.tryAcquire(leaseTime, unit); if(ttl == null) { var16 = true; return var16; } time -= System.currentTimeMillis() - currentTime; if(time <= 0L) { var16 = false; return var16; } currentTime = System.currentTimeMillis(); if(ttl.longValue() >= 0L && ttl.longValue() < time) { this.getEntry(threadId).getLatch().tryAcquire(ttl.longValue(), TimeUnit.MILLISECONDS); } else { this.getEntry(threadId).getLatch().tryAcquire(time, TimeUnit.MILLISECONDS); } time -= System.currentTimeMillis() - currentTime; } while(time > 0L); var16 = false; } finally { this.unsubscribe(subscribeFuture, threadId); } return var16; } } } }
循环尝试一般有如下几种方法:
while循环,一次接着一次的尝试,这个方法的缺点是会造成大量无效的锁申请。
Thread.sleep,在上面的while方案中增加睡眠时间以降低锁申请次数,缺点是这个睡眠的时间设置比较难控制。
基于信息量,当锁被其它资源占用时,当前线程订阅锁的释放事件,一旦锁释放会发消息通知待等待的锁进行竞争,有效的解决了无效的锁申请情况。核心逻辑是this.getEntry(threadId).getLatch().tryAcquire,this.getEntry(threadId).getLatch()返回的是一个信号量,有兴趣可以再研究研究。
redisson依赖
由于redisson不光是针对锁,提供了很多客户端操作redis的方法,所以会依赖一些其它的框架,比如netty,如果只是简单的使用锁也可以自己去实现。
以上就是redisson实现分布式锁原理详解的内容,更多相关内容请关注PHP中文网(www.php.cn)!