Fei Chao 氏の理解によれば、多くの学生が分散ロックを使用する場合、Baidu で直接検索して分散ロックを見つけます。 Redis 分散ロック ツール クラスは直接使用されます。重要なのは、このツール クラスには多くの System.out.println(); やその他のステートメントも含まれていることです。実際、Redis 分散ロックへのより正しいアプローチは、redisson クライアントを使用することです。ツール. 具体的には、検索できる最大のゲイ出会い系サイト github を紹介します.
まず、これまでに Redis の分散ロックを正しく使用したことがあり、対応する公式ドキュメントを読んだことがあれば、 this question So easy. Let’s take a look
正直に言うと、英語が得意なら英語のドキュメントを読んだほうが理解できるかもしれません
デフォルトでは、ロック ウォッチドッグ タイムアウトは 30 秒で、Config.lockWatchdogTimeout 設定を通じて変更できます。
ただし、中国語のドキュメントを見ている場合は、
デフォルトウォッチドッグ チェック ロックのタイムアウトは 30 秒です
この文フェイチャオは中国の観点からすると曖昧な文です。これには 2 つの意味があります
#これを見た後は、誰もがそうしないことを願っていますhack me. 小学校の体育教師です。中国語の先生と同一人物ですが、中国語が苦手ならソースコードを作りましょう!ソースコード解析公式ドキュメントのシンプルなデモに示されている例に基づいて最終的な例を作成しました。この例は、次のような上のスクリーンショットの Ctr C および Ctr V 波操作に基づいています。1。ウォッチドッグのデフォルトは次のとおりです。 30 秒ごとにチェックしますロック タイムアウト時間
2. ウォッチドッグはロック タイムアウト時間をチェックします。デフォルトのロック タイムアウト時間は 30 秒です
public class DemoMain { public static void main(String[] args) throws Exception { Config config = new Config(); config.useSingleServer().setAddress("redis://127.0.0.1:6379"); RedissonClient redisson = Redisson.create(config); RLock lock = redisson.getLock("anyLock"); lock.lock(); //lock.unlock(); } }
create
ここから、internalLockLeaseTime と lockWatchdogTimeout の 2 つのパラメーターが等しいことがわかります。 lockWatchdogTimeout のデフォルト値は次のとおりですpublic class Config { private long lockWatchdogTimeout = 30 * 1000; public long getLockWatchdogTimeout() { return lockWatchdogTimeout; } //省略无关代码 }
lock
##写真のフレームから、ロックの取得に成功すると、スケジュールされたタスク、つまりウォッチドッグが開始されることがわかります。スケジュールされたタスクは、renewExpirationAsync(threadId) の更新を定期的にチェックします。
ここでは、タイミングに netty-common パッケージの HashedWheelTimer が使用されています。Feichao 公式アカウントは、主要な検索エンジンと緊密な協力関係を確立しています。これを検索するだけで済みます。 .画像から、スケジュールされたスケジュールの各呼び出し間の時間差は、internalLockLeaseTime / 3 であることがわかります。つまり、10 秒です。
真相が判明
解決策も非常に簡単です。注意している限り、ロックを取得したスレッドは、ビジネスの処理後に適時にロックを解放します。再入可能なロックであってもロックを取得できない場合は、スレッドは現在の接続を解放し、一定期間スリープすることができます。
public void lock() { while (true) { boolean flag = this.getLock(key); if (flag) { TODO ......... } else { // 释放当前redis连接 redis.close(); // 休眠1000毫秒 sleep(1000); } } }
2. B のロックは A によって解放されます。Redis のロック実装の原理は SETNX コマンドにあることがわかっています。キーが存在しない場合、キーの値は value に設定され、戻り値は 1 になります。指定されたキーがすでに存在する場合、SETNX は何も実行せず、戻り値は 0 になります。
SETNX key value
次のシナリオを想像してみましょう: 2 つのスレッド A と B がキー myLock をロックしようとします。スレッド A が最初にロックを取得し (ロックが 3 秒後に期限切れになる場合)、スレッド B はロックの取得を待機しています。ロック、現時点では何も問題はありません。
現時点でビジネス ロジックに時間がかかり、実行時間が redis ロックの有効期限を超えている場合、スレッド A のロックは自動的に解放され (キーが削除され)、スレッド B は、キー myLock が存在しないため、SETNX を実行します。 コマンドもロックを取得しました。
上記の状況を回避するには、通常、ロック時に各スレッドを識別するために独自の一意の値を用意し、指定された値でのみキーを解放する必要があります。そうしないと、ロック解放の混乱したシーンが発生します。 。
emm~ 聊redis锁咋还扯到数据库事务上来了?别着急往下看,看下边这段代码:
@Transaction public void lock() { while (true) { boolean flag = this.getLock(key); if (flag) { insert(); } } }
给这个方法添加一个@Transaction注解开启事务,如代码中抛出异常进行回滚,要知道数据库事务可是有超时时间限制的,并不会无条件的一直等一个耗时的数据库操作。
比如:我们解析一个大文件,再将数据存入到数据库,如果执行时间太长,就会导致事务超时自动回滚。
一旦你的key长时间获取不到锁,获取锁等待的时间远超过数据库事务超时时间,程序就会报异常。
一般为解决这种问题,我们就需要将数据库事务改为手动提交、回滚事务。
@Autowired DataSourceTransactionManager dataSourceTransactionManager; @Transaction public void lock() { //手动开启事务 TransactionStatus transactionStatus = dataSourceTransactionManager.getTransaction(transactionDefinition); try { while (true) { boolean flag = this.getLock(key); if (flag) { insert(); //手动提交事务 dataSourceTransactionManager.commit(transactionStatus); } } } catch (Exception e) { //手动回滚事务 dataSourceTransactionManager.rollback(transactionStatus); } }
这种情况和我们上边提到的第二种比较类似,但解决思路上略有不同。
同样是redis分布式锁过期,而业务逻辑没执行完的场景,不过,这里换一种思路想问题,把redis锁的过期时间再弄长点不就解决了吗?
那还是有问题,我们可以在加锁的时候,手动调长redis锁的过期时间,可这个时间多长合适?业务逻辑的执行时间是不可控的,调的过长又会影响操作性能。
要是redis锁的过期时间能够自动续期就好了。
为了解决这个问题我们使用redis客户端redisson,redisson很好的解决了redis在分布式环境下的一些棘手问题,它的宗旨就是让使用者减少对Redis的关注,将更多精力用在处理业务逻辑上。
redisson对分布式锁做了很好封装,只需调用API即可。
RLock lock = redissonClient.getLock("stockLock");
redisson在加锁成功后,会注册一个定时任务监听这个锁,每隔10秒就去查看这个锁,如果还持有锁,就对过期时间进行续期。默认过期时间30秒。这个机制也被叫做:“看门狗”,这名字。。。
举例子:假如加锁的时间是30秒,过10秒检查一次,一旦加锁的业务没有执行完,就会进行一次续期,把锁的过期时间再次重置成30秒。
通过分析下边redisson的源码实现可以发现,不管是加锁、解锁、续约都是客户端把一些复杂的业务逻辑,通过封装在Lua脚本中发送给redis,保证这段复杂业务逻辑执行的原子性。
@Slf4j @Service public class RedisDistributionLockPlus { /** * 加锁超时时间,单位毫秒, 即:加锁时间内执行完操作,如果未完成会有并发现象 */ private static final long DEFAULT_LOCK_TIMEOUT = 30; private static final long TIME_SECONDS_FIVE = 5 ; /** * 每个key的过期时间 {@link LockContent} */ private Map<String, LockContent> lockContentMap = new ConcurrentHashMap<>(512); /** * redis执行成功的返回 */ private static final Long EXEC_SUCCESS = 1L; /** * 获取锁lua脚本, k1:获锁key, k2:续约耗时key, arg1:requestId,arg2:超时时间 */ private static final String LOCK_SCRIPT = "if redis.call('exists', KEYS[2]) == 1 then ARGV[2] = math.floor(redis.call('get', KEYS[2]) + 10) end " + "if redis.call('exists', KEYS[1]) == 0 then " + "local t = redis.call('set', KEYS[1], ARGV[1], 'EX', ARGV[2]) " + "for k, v in pairs(t) do " + "if v == 'OK' then return tonumber(ARGV[2]) end " + "end " + "return 0 end"; /** * 释放锁lua脚本, k1:获锁key, k2:续约耗时key, arg1:requestId,arg2:业务耗时 arg3: 业务开始设置的timeout */ private static final String UNLOCK_SCRIPT = "if redis.call('get', KEYS[1]) == ARGV[1] then " + "local ctime = tonumber(ARGV[2]) " + "local biz_timeout = tonumber(ARGV[3]) " + "if ctime > 0 then " + "if redis.call('exists', KEYS[2]) == 1 then " + "local avg_time = redis.call('get', KEYS[2]) " + "avg_time = (tonumber(avg_time) * 8 + ctime * 2)/10 " + "if avg_time >= biz_timeout - 5 then redis.call('set', KEYS[2], avg_time, 'EX', 24*60*60) " + "else redis.call('del', KEYS[2]) end " + "elseif ctime > biz_timeout -5 then redis.call('set', KEYS[2], ARGV[2], 'EX', 24*60*60) end " + "end " + "return redis.call('del', KEYS[1]) " + "else return 0 end"; /** * 续约lua脚本 */ private static final String RENEW_SCRIPT = "if redis.call('get', KEYS[1]) == ARGV[1] then return redis.call('expire', KEYS[1], ARGV[2]) else return 0 end"; private final StringRedisTemplate redisTemplate; public RedisDistributionLockPlus(StringRedisTemplate redisTemplate) { this.redisTemplate = redisTemplate; ScheduleTask task = new ScheduleTask(this, lockContentMap); // 启动定时任务 ScheduleExecutor.schedule(task, 1, 1, TimeUnit.SECONDS); } /** * 加锁 * 取到锁加锁,取不到锁一直等待知道获得锁 * * @param lockKey * @param requestId 全局唯一 * @param expire 锁过期时间, 单位秒 * @return */ public boolean lock(String lockKey, String requestId, long expire) { log.info("开始执行加锁, lockKey ={}, requestId={}", lockKey, requestId); for (; ; ) { // 判断是否已经有线程持有锁,减少redis的压力 LockContent lockContentOld = lockContentMap.get(lockKey); boolean unLocked = null == lockContentOld; // 如果没有被锁,就获取锁 if (unLocked) { long startTime = System.currentTimeMillis(); // 计算超时时间 long bizExpire = expire == 0L ? DEFAULT_LOCK_TIMEOUT : expire; String lockKeyRenew = lockKey + "_renew"; RedisScript<Long> script = RedisScript.of(LOCK_SCRIPT, Long.class); List<String> keys = new ArrayList<>(); keys.add(lockKey); keys.add(lockKeyRenew); Long lockExpire = redisTemplate.execute(script, keys, requestId, Long.toString(bizExpire)); if (null != lockExpire && lockExpire > 0) { // 将锁放入map LockContent lockContent = new LockContent(); lockContent.setStartTime(startTime); lockContent.setLockExpire(lockExpire); lockContent.setExpireTime(startTime + lockExpire * 1000); lockContent.setRequestId(requestId); lockContent.setThread(Thread.currentThread()); lockContent.setBizExpire(bizExpire); lockContent.setLockCount(1); lockContentMap.put(lockKey, lockContent); log.info("加锁成功, lockKey ={}, requestId={}", lockKey, requestId); return true; } } // 重复获取锁,在线程池中由于线程复用,线程相等并不能确定是该线程的锁 if (Thread.currentThread() == lockContentOld.getThread() && requestId.equals(lockContentOld.getRequestId())){ // 计数 +1 lockContentOld.setLockCount(lockContentOld.getLockCount()+1); return true; } // 如果被锁或获取锁失败,则等待100毫秒 try { TimeUnit.MILLISECONDS.sleep(100); } catch (InterruptedException e) { // 这里用lombok 有问题 log.error("获取redis 锁失败, lockKey ={}, requestId={}", lockKey, requestId, e); return false; } } } /** * 解锁 * * @param lockKey * @param lockValue */ public boolean unlock(String lockKey, String lockValue) { String lockKeyRenew = lockKey + "_renew"; LockContent lockContent = lockContentMap.get(lockKey); long consumeTime; if (null == lockContent) { consumeTime = 0L; } else if (lockValue.equals(lockContent.getRequestId())) { int lockCount = lockContent.getLockCount(); // 每次释放锁, 计数 -1,减到0时删除redis上的key if (--lockCount > 0) { lockContent.setLockCount(lockCount); return false; } consumeTime = (System.currentTimeMillis() - lockContent.getStartTime()) / 1000; } else { log.info("释放锁失败,不是自己的锁。"); return false; } // 删除已完成key,先删除本地缓存,减少redis压力, 分布式锁,只有一个,所以这里不加锁 lockContentMap.remove(lockKey); RedisScript<Long> script = RedisScript.of(UNLOCK_SCRIPT, Long.class); List<String> keys = new ArrayList<>(); keys.add(lockKey); keys.add(lockKeyRenew); Long result = redisTemplate.execute(script, keys, lockValue, Long.toString(consumeTime), Long.toString(lockContent.getBizExpire())); return EXEC_SUCCESS.equals(result); } /** * 续约 * * @param lockKey * @param lockContent * @return true:续约成功,false:续约失败(1、续约期间执行完成,锁被释放 2、不是自己的锁,3、续约期间锁过期了(未解决)) */ public boolean renew(String lockKey, LockContent lockContent) { // 检测执行业务线程的状态 Thread.State state = lockContent.getThread().getState(); if (Thread.State.TERMINATED == state) { log.info("执行业务的线程已终止,不再续约 lockKey ={}, lockContent={}", lockKey, lockContent); return false; } String requestId = lockContent.getRequestId(); long timeOut = (lockContent.getExpireTime() - lockContent.getStartTime()) / 1000; RedisScript<Long> script = RedisScript.of(RENEW_SCRIPT, Long.class); List<String> keys = new ArrayList<>(); keys.add(lockKey); Long result = redisTemplate.execute(script, keys, requestId, Long.toString(timeOut)); log.info("续约结果,True成功,False失败 lockKey ={}, result={}", lockKey, EXEC_SUCCESS.equals(result)); return EXEC_SUCCESS.equals(result); } static class ScheduleExecutor { public static void schedule(ScheduleTask task, long initialDelay, long period, TimeUnit unit) { long delay = unit.toMillis(initialDelay); long period_ = unit.toMillis(period); // 定时执行 new Timer("Lock-Renew-Task").schedule(task, delay, period_); } } static class ScheduleTask extends TimerTask { private final RedisDistributionLockPlus redisDistributionLock; private final Map<String, LockContent> lockContentMap; public ScheduleTask(RedisDistributionLockPlus redisDistributionLock, Map<String, LockContent> lockContentMap) { this.redisDistributionLock = redisDistributionLock; this.lockContentMap = lockContentMap; } @Override public void run() { if (lockContentMap.isEmpty()) { return; } Set<Map.Entry<String, LockContent>> entries = lockContentMap.entrySet(); for (Map.Entry<String, LockContent> entry : entries) { String lockKey = entry.getKey(); LockContent lockContent = entry.getValue(); long expireTime = lockContent.getExpireTime(); // 减少线程池中任务数量 if ((expireTime - System.currentTimeMillis())/ 1000 < TIME_SECONDS_FIVE) { //线程池异步续约 ThreadPool.submit(() -> { boolean renew = redisDistributionLock.renew(lockKey, lockContent); if (renew) { long expireTimeNew = lockContent.getStartTime() + (expireTime - lockContent.getStartTime()) * 2 - TIME_SECONDS_FIVE * 1000; lockContent.setExpireTime(expireTimeNew); } else { // 续约失败,说明已经执行完 OR redis 出现问题 lockContentMap.remove(lockKey); } }); } } } } }
redis高可用最常见的方案就是主从复制(master-slave),这种模式也给redis分布式锁挖了一坑。
redis cluster集群环境下,假如现在A客户端想要加锁,它会根据路由规则选择一台master节点写入key mylock,在加锁成功后,master节点会把key异步复制给对应的slave节点。
如果此时redis master节点宕机,为保证集群可用性,会进行主备切换,slave变为了redis master。A客户端错误地认为它在旧的master节点上成功加锁,但实际上锁已经被B客户端在新的master节点上加上了。
此时就会导致同一时间内多个客户端对一个分布式锁完成了加锁,导致各种脏数据的产生。
至于解决办法嘛,目前看还没有什么根治的方法,只能尽量保证机器的稳定性,减少发生此事件的概率。
小结一下:上面就是我在使用Redis 分布式锁时遇到的一些坑,有点小感慨,经常用一个方法填上这个坑,没多久就发现另一个坑又出来了,其实根本没有什么十全十美的解决方案,哪有什么银弹,只不过是在权衡利弊后,选一个在接受范围内的折中方案而已。
以上がRedis 分散ロックを更新する方法の詳細内容です。詳細については、PHP 中国語 Web サイトの他の関連記事を参照してください。