マルチスレッド環境では、コード ブロックに同時に 1 つのスレッドのみがアクセスできるようにするために、Java では通常、同期構文と ReetrantLock を使用して、これが実際にローカル ロック メソッドであることを確認してください。しかし現在、企業は分散アーキテクチャを採用しており、分散環境では、異なるノード上のスレッドが確実に同時に実行されるようにするにはどうすればよいでしょうか?したがって、分散システム間で共有リソースへの相互排他的アクセスを制御する方法である分散ロックが導入されています。 分散システムでは、複数のサービスが複数のマシンにデプロイされます。クライアント上のユーザーがデータ挿入リクエストを開始するときに、分散ロック メカニズムの保証がない場合、複数のサービスが同時挿入操作を実行する可能性があります。その結果、データが繰り返し挿入されることになり、冗長データを許可しない一部の企業では問題が発生する可能性があります。分散ロック機構は、このような問題を解決し、複数のサービス間で共有リソースへの相互排他的なアクセスを保証するものであり、あるサービスが分散ロックを獲得し、他のサービスがロックを獲得しない場合、それ以降の操作は実行されません。一般的な意味は、次の図に示すとおりです。
分散ロックの特徴
set を知っているかもしれません。キー値 px ミリ秒 nx。後者の方法の核となる実装コマンドは次のとおりです。
- 获取锁(unique_value可以是UUID等) SET resource_name unique_value NX PX 30000 - 释放锁(lua脚本中,一定要比较value,防止误解锁) if redis.call("get",KEYS[1]) == ARGV[1] then return redis.call("del",KEYS[1]) else return 0 end
この実装方法には 3 つの主要なポイントがあります (これは、面接の可能性が非常に高い場所でもあります):
set コマンドではset key value px ミリ秒 nx
- を使用する必要があります;
値は一意である必要があります;
- ロックを解放するとき 値を確認するために、ロックを誤解することはできません;
- 実際、このタイプのロックの最大の欠点は、1 つの Redis にのみ作用することです。 Redis がセンチネルの高可用性を通じてそれを保証している場合でも、ノードをロックするとき、マスター ノードが何らかの理由でマスターとスレーブを切り替えると、ロックは失われます:
ロックを取得しましたRedis マスター ノード上 ;。 Redlock は、Redis のすべての分散ロック実装の中で、面接官を興奮させることができる唯一の方法でもあります。Redlock
- しかし、ロックされたキーはまだスレーブ ノードに同期されていません;
- マスターに障害が発生し、フェイルオーバーが発生し、スレーブ ノードがマスター ノードにアップグレードされると、
- によりロックが失われます。
- 単一障害点の問題を回避するために、Redis 作成者 antirez は、分散環境に基づいたより高度な分散ロック実装方法を提案しました。
Redis 高度な分散ロック: Redlock
antirez によって提案されたレッドロック アルゴリズムはおおよそ次のとおりです: Redis 分散環境では、N 個の Redis マスターがあると仮定します。これらのノードは互いに完全に独立しており、マスター/スレーブ レプリケーションやその他のクラスター調整メカニズム
はありません。 Redis 単一インスタンスの場合と同じ方法を使用して、N 個のインスタンスのロックを取得および解放するようにします。ここでは、5 つの Redis マスター ノードがあると仮定します。これらの Redis インスタンスを 5 つのサーバーで実行して、すべてが同時にダウンしないようにする必要があります。ロックを取得するには、クライアントは次の操作を実行する必要があります:
(UUID など) を使用して、5 つのインスタンスから順番にロックを取得してみます。 Redis からのロックを要求する場合、クライアントはネットワーク接続と応答タイムアウトを設定する必要があります。これは、ロックの有効期限よりも短くする必要があります。たとえば、ロックの自動有効期限 TTL が 10 秒の場合、タイムアウトは 5 ~ 50 ミリ秒にする必要があります。これにより、サーバー側 Redis がハングアップし、クライアントがまだ応答結果を待っているという状況を回避できます。指定された時間内にサーバーが応答しない場合、クライアントはできるだけ早く別の Redis インスタンスからロックを取得しようとする必要があります。
客户端使用当前时间减去开始获取锁时间(步骤1记录的时间)就得到获取锁使用的时间。当且仅当从大多数(N/2+1,这里是3个节点)的Redis节点都取到锁,并且使用的时间小于锁失效时间时,锁才算获取成功。
如果取到了锁,key的真正有效时间等于有效时间减去获取锁所使用的时间(步骤3计算的结果)。
如果因为某些原因,获取锁失败(没有在至少N/2+1个Redis实例取到锁或者取锁时间已经超过了有效时间),客户端应该在所有的Redis实例上进行解锁(即便某些Redis实例根本就没有加锁成功,防止某些节点获取到锁但是客户端没有得到响应而导致接下来的一段时间不能被重新获取锁)。
此处不讨论时钟漂移
redisson已经有对redlock算法封装,接下来对其用法进行简单介绍,并对核心源码进行分析(假设5个redis实例)。
<!-- https://mvnrepository.com/artifact/org.redisson/redisson --> <dependency> <groupId>org.redisson</groupId> <artifactId>redisson</artifactId> <version>3.3.2</version> </dependency>
首先,我们来看一下redission封装的redlock算法实现的分布式锁用法,非常简单,跟重入锁(ReentrantLock)有点类似:
Config config = new Config(); config.useSentinelServers().addSentinelAddress("127.0.0.1:6369","127.0.0.1:6379", "127.0.0.1:6389") .setMasterName("masterName") .setPassword("password").setDatabase(0); RedissonClient redissonClient = Redisson.create(config); // 还可以getFairLock(), getReadWriteLock() RLock redLock = redissonClient.getLock("REDLOCK_KEY"); boolean isLock; try { isLock = redLock.tryLock(); // 500ms拿不到锁, 就认为获取锁失败。10000ms即10s是锁失效时间。 isLock = redLock.tryLock(500, 10000, TimeUnit.MILLISECONDS); if (isLock) { //TODO if get lock success, do something; } } catch (Exception e) { } finally { // 无论如何, 最后都要解锁 redLock.unlock(); }
实现分布式锁的一个非常重要的点就是set的value要具有唯一性,redisson的value是怎样保证value的唯一性呢?答案是UUID+threadId。入口在redissonClient.getLock(“REDLOCK_KEY”),源码在Redisson.java和RedissonLock.java中:
protected final UUID id = UUID.randomUUID(); String getLockName(long threadId) { return id + ":" + threadId; }
获取锁的代码为redLock.tryLock()或者redLock.tryLock(500, 10000, TimeUnit.MILLISECONDS),两者的最终核心源码都是下面这段代码,只不过前者获取锁的默认租约时间(leaseTime)是LOCK_EXPIRATION_INTERVAL_SECONDS,即30s:
<T> RFuture<T> tryLockInnerAsync(long leaseTime, TimeUnit unit, long threadId, RedisStrictCommand<T> command) { internalLockLeaseTime = unit.toMillis(leaseTime); // 获取锁时向5个redis实例发送的命令 return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, command, // 首先分布式锁的KEY不能存在,如果确实不存在,那么执行hset命令(hset REDLOCK_KEY uuid+threadId 1),并通过pexpire设置失效时间(也是锁的租约时间) "if (redis.call('exists', KEYS[1]) == 0) then " + "redis.call('hset', KEYS[1], ARGV[2], 1); " + "redis.call('pexpire', KEYS[1], ARGV[1]); " + "return nil; " + "end; " + // 如果分布式锁的KEY已经存在,并且value也匹配,表示是当前线程持有的锁,那么重入次数加1,并且设置失效时间 "if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " + "redis.call('hincrby', KEYS[1], ARGV[2], 1); " + "redis.call('pexpire', KEYS[1], ARGV[1]); " + "return nil; " + "end; " + // 获取分布式锁的KEY的失效时间毫秒数 "return redis.call('pttl', KEYS[1]);", // 这三个参数分别对应KEYS[1],ARGV[1]和ARGV[2] Collections.<Object>singletonList(getName()), internalLockLeaseTime, getLockName(threadId)); }
获取锁的命令中,
KEYS[1]就是Collections.singletonList(getName()),表示分布式锁的key,即REDLOCK_KEY;
ARGV[1]就是internalLockLeaseTime,即锁的租约时间,默认30s;
ARGV[2]就是getLockName(threadId),是获取锁时set的唯一值,即UUID+threadId:
释放锁的代码为redLock.unlock(),核心源码如下:
protected RFuture<Boolean> unlockInnerAsync(long threadId) { // 向5个redis实例都执行如下命令 return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN, // 如果分布式锁KEY不存在,那么向channel发布一条消息 "if (redis.call('exists', KEYS[1]) == 0) then " + "redis.call('publish', KEYS[2], ARGV[1]); " + "return 1; " + "end;" + // 如果分布式锁存在,但是value不匹配,表示锁已经被占用,那么直接返回 "if (redis.call('hexists', KEYS[1], ARGV[3]) == 0) then " + "return nil;" + "end; " + // 如果就是当前线程占有分布式锁,那么将重入次数减1 "local counter = redis.call('hincrby', KEYS[1], ARGV[3], -1); " + // 重入次数减1后的值如果大于0,表示分布式锁有重入过,那么只设置失效时间,还不能删除 "if (counter > 0) then " + "redis.call('pexpire', KEYS[1], ARGV[2]); " + "return 0; " + "else " + // 重入次数减1后的值如果为0,表示分布式锁只获取过1次,那么删除这个KEY,并发布解锁消息 "redis.call('del', KEYS[1]); " + "redis.call('publish', KEYS[2], ARGV[1]); " + "return 1; "+ "end; " + "return nil;", // 这5个参数分别对应KEYS[1],KEYS[2],ARGV[1],ARGV[2]和ARGV[3] Arrays.<Object>asList(getName(), getChannelName()), LockPubSub.unlockMessage, internalLockLeaseTime, getLockName(threadId)); }
下面利用SpringBoot + Jedis + AOP的组合来实现一个简易的分布式锁。
自定义一个注解,被注解的方法会执行获取分布式锁的逻辑
@Target(ElementType.METHOD) @Retention(RetentionPolicy.RUNTIME) @Documented @Inherited public @interface RedisLock { /** * 业务键 * * @return */ String key(); /** * 锁的过期秒数,默认是5秒 * * @return */ int expire() default 5; /** * 尝试加锁,最多等待时间 * * @return */ long waitTime() default Long.MIN_VALUE; /** * 锁的超时时间单位 * * @return */ TimeUnit timeUnit() default TimeUnit.SECONDS; }
在AOP中我们去执行获取分布式锁和释放分布式锁的逻辑,代码如下:
@Aspect @Component public class LockMethodAspect { @Autowired private RedisLockHelper redisLockHelper; @Autowired private JedisUtil jedisUtil; private Logger logger = LoggerFactory.getLogger(LockMethodAspect.class); @Around("@annotation(com.redis.lock.annotation.RedisLock)") public Object around(ProceedingJoinPoint joinPoint) { Jedis jedis = jedisUtil.getJedis(); MethodSignature signature = (MethodSignature) joinPoint.getSignature(); Method method = signature.getMethod(); RedisLock redisLock = method.getAnnotation(RedisLock.class); String value = UUID.randomUUID().toString(); String key = redisLock.key(); try { final boolean islock = redisLockHelper.lock(jedis,key, value, redisLock.expire(), redisLock.timeUnit()); logger.info("isLock : {}",islock); if (!islock) { logger.error("获取锁失败"); throw new RuntimeException("获取锁失败"); } try { return joinPoint.proceed(); } catch (Throwable throwable) { throw new RuntimeException("系统异常"); } } finally { logger.info("释放锁"); redisLockHelper.unlock(jedis,key, value); jedis.close(); } } }
@Component public class RedisLockHelper { private long sleepTime = 100; /** * 直接使用setnx + expire方式获取分布式锁 * 非原子性 * * @param key * @param value * @param timeout * @return */ public boolean lock_setnx(Jedis jedis,String key, String value, int timeout) { Long result = jedis.setnx(key, value); // result = 1时,设置成功,否则设置失败 if (result == 1L) { return jedis.expire(key, timeout) == 1L; } else { return false; } } /** * 使用Lua脚本,脚本中使用setnex+expire命令进行加锁操作 * * @param jedis * @param key * @param UniqueId * @param seconds * @return */ public boolean Lock_with_lua(Jedis jedis,String key, String UniqueId, int seconds) { String lua_scripts = "if redis.call('setnx',KEYS[1],ARGV[1]) == 1 then" + "redis.call('expire',KEYS[1],ARGV[2]) return 1 else return 0 end"; List<String> keys = new ArrayList<>(); List<String> values = new ArrayList<>(); keys.add(key); values.add(UniqueId); values.add(String.valueOf(seconds)); Object result = jedis.eval(lua_scripts, keys, values); //判断是否成功 return result.equals(1L); } /** * 在Redis的2.6.12及以后中,使用 set key value [NX] [EX] 命令 * * @param key * @param value * @param timeout * @return */ public boolean lock(Jedis jedis,String key, String value, int timeout, TimeUnit timeUnit) { long seconds = timeUnit.toSeconds(timeout); return "OK".equals(jedis.set(key, value, "NX", "EX", seconds)); } /** * 自定义获取锁的超时时间 * * @param jedis * @param key * @param value * @param timeout * @param waitTime * @param timeUnit * @return * @throws InterruptedException */ public boolean lock_with_waitTime(Jedis jedis,String key, String value, int timeout, long waitTime,TimeUnit timeUnit) throws InterruptedException { long seconds = timeUnit.toSeconds(timeout); while (waitTime >= 0) { String result = jedis.set(key, value, "nx", "ex", seconds); if ("OK".equals(result)) { return true; } waitTime -= sleepTime; Thread.sleep(sleepTime); } return false; } /** * 错误的解锁方法—直接删除key * * @param key */ public void unlock_with_del(Jedis jedis,String key) { jedis.del(key); } /** * 使用Lua脚本进行解锁操纵,解锁的时候验证value值 * * @param jedis * @param key * @param value * @return */ public boolean unlock(Jedis jedis,String key,String value) { String luaScript = "if redis.call('get',KEYS[1]) == ARGV[1] then " + "return redis.call('del',KEYS[1]) else return 0 end"; return jedis.eval(luaScript, Collections.singletonList(key), Collections.singletonList(value)).equals(1L); } }
定义一个TestController来测试我们实现的分布式锁
@RestController public class TestController { @RedisLock(key = "redis_lock") @GetMapping("/index") public String index() { return "index"; } }
以上がRedis 分散ロック インスタンスの分析の詳細内容です。詳細については、PHP 中国語 Web サイトの他の関連記事を参照してください。