Redis 분산 잠금을 갱신하는 방법
Redis 분산 잠금 갱신 방법
Redis 분산 잠금의 올바른 자세
Fei Chao에 따르면 많은 학생들이 분산 잠금을 사용할 때 Baidu에서 직접 검색하여 Redis 분산 잠금 도구 클래스를 찾아 직접 사용하는 것이 핵심입니다. 이 도구 클래스는 또한 많은 System.out.println(); 및 기타 명령문으로 채워져 있습니다. 실제로 Redis 분산 잠금에 대한 더 올바른 접근 방식은 클라이언트 도구 redisson을 사용하는 것입니다. -섹스 데이트 웹사이트 github.
답변
먼저 Redis의 분산 잠금 장치를 올바르게 사용하고 해당 공식 문서를 읽었다면 이 질문은 매우 쉽습니다. 영어가 훌륭하다면 영어 문서를 읽어보시면 이해가 더 쉬울 것입니다
기본적으로 잠금 감시 시간 제한은 30초이며 Config.lockWatchdogTimeout 설정을 통해 변경할 수 있습니다.
Watchdog이 잠금을 확인합니다. 기본 타임아웃 시간은 30초입니다페이차오라는 문장은 언어학적으로 두 가지 의미를 갖는 문장입니다.
1. Watchdog은 타임아웃 시간을 확인하기 위해 기본적으로 30초를 설정합니다. a lock
2. Watchdog은 lock timeout을 확인합니다. 기본 lock 시간은 30초입니다
공식문서에 나와 있는 예제를 바탕으로 가장 간단한 데모를 작성했습니다. 예제는 Ctr+C와 Ctr+를 기반으로 합니다. 위 스크린샷의 V wave 작업은 다음과 같습니다이것을 보고 우리 초등학교 체육 선생님과 중국어 선생님이 같은 사람이지만 모두가 비난하지 않기를 바랍니다. .중국어가 잘 안되면 소스를 만들어도 됩니다!
소스코드 분석
public class DemoMain { public static void main(String[] args) throws Exception { Config config = new Config(); config.useSingleServer().setAddress("redis://127.0.0.1:6379"); RedissonClient redisson = Redisson.create(config); RLock lock = redisson.getLock("anyLock"); lock.lock(); //lock.unlock(); } }
create
여기서 우리는 두 매개 변수 InternalLockLeaseTime과 lockWatchdogTimeout이 동일하다는 것을 알 수 있습니다.
lockWatchdogTimeout의 기본값은 다음과 같습니다
public class Config { private long lockWatchdogTimeout = 30 * 1000; public long getLockWatchdogTimeout() { return lockWatchdogTimeout; } //省略无关代码 }
소스 코드 분석을 통해 기본적으로 잠금 시간은 30초임을 알 수 있습니다. 잠긴 업무가 완료되지 않은 경우 30-10 = 20초가 지나면 갱신이 수행되고 잠금이 30초로 재설정됩니다. 이때 일부 학생들은 다시 업무 머신이 다운되면 어떻게 되는지 묻습니다. ? 다운되면 예약된 작업을 실행할 수 없고 기간을 갱신할 수 없으며 당연히 30초 후에 잠금이 해제됩니다.
Redis 분산 잠금의 5가지 함정 1. 잠금이 해제되지 않습니다.
이 상황은 제가 위에서 했던 실수인 저수준 실수입니다. 현재 스레드로 인해 Redis 잠금을 획득한 후 비즈니스 처리 후 제때에 잠금이 해제되지 않아 다른 스레드가 계속 잠금을 획득하려고 시도하게 됩니다. 예를 들어 Jedis 클라이언트를 사용하면 다음 오류 메시지가 보고됩니다.
Redis 스레드 풀에는 처리할 여유 스레드가 없습니다. 클라이언트 명령.
해결 방법도 매우 간단합니다. 잠금을 얻은 스레드는 비즈니스를 처리한 후 시간 내에 잠금을 해제하지만 잠금을 얻은 후에는 잠금을 해제할 수 있습니다. 현재 연결을 끊고 일정 시간 동안 절전 모드로 전환합니다.
public void lock() { while (true) { boolean flag = this.getLock(key); if (flag) { TODO ......... } else { // 释放当前redis连接 redis.close(); // 休眠1000毫秒 sleep(1000); } } }
2. B의 잠금이 A에 의해 해제되었습니다
우리는 Redis의 잠금 구현 원리가 SETNX 명령에 있다는 것을 알고 있습니다. 키가 없으면 키 값이 value로 설정되고 반환 값은 1입니다. 지정된 키가 이미 있으면 SETNX는 어떤 작업도 수행하지 않고 반환 값은 0입니다.
SETNX key value
두 스레드 A와 B가 myLock 키를 잠그려고 시도합니다. 스레드 A가 먼저 잠금을 획득하고(잠금이 3초 후에 만료되는 경우) 지금까지 스레드 B가 잠금을 획득하려고 기다리고 있습니다. 전혀 문제가 없습니다. 이때 비즈니스 로직에 시간이 많이 걸리고 실행 시간이 redis 잠금 만료 시간을 초과한 경우 스레드 A의 잠금이 자동으로 해제되고(키가 삭제됨) 스레드 B는 myLock 키가 수행하는 것을 감지합니다. 존재하지 않으며 SETNX 명령을 실행하여 잠금을 얻습니다.
그러나 스레드 A가 비즈니스 로직을 완료하더라도 잠금은 여전히 해제되므로(즉, 키가 삭제됨) 스레드 B의 잠금도 스레드 A에 의해 해제됩니다.
위 상황을 방지하려면 일반적으로 잠금 시 각 스레드를 식별하기 위해 고유한 값을 가져와 지정된 값으로만 키를 놓아야 합니다. 그렇지 않으면 혼란스러운 잠금 해제 장면이 발생하게 됩니다.
三、数据库事务超时
emm~ 聊redis锁咋还扯到数据库事务上来了?别着急往下看,看下边这段代码:
@Transaction public void lock() { while (true) { boolean flag = this.getLock(key); if (flag) { insert(); } } }
给这个方法添加一个@Transaction注解开启事务,如代码中抛出异常进行回滚,要知道数据库事务可是有超时时间限制的,并不会无条件的一直等一个耗时的数据库操作。
比如:我们解析一个大文件,再将数据存入到数据库,如果执行时间太长,就会导致事务超时自动回滚。
一旦你的key长时间获取不到锁,获取锁等待的时间远超过数据库事务超时时间,程序就会报异常。
一般为解决这种问题,我们就需要将数据库事务改为手动提交、回滚事务。
@Autowired DataSourceTransactionManager dataSourceTransactionManager; @Transaction public void lock() { //手动开启事务 TransactionStatus transactionStatus = dataSourceTransactionManager.getTransaction(transactionDefinition); try { while (true) { boolean flag = this.getLock(key); if (flag) { insert(); //手动提交事务 dataSourceTransactionManager.commit(transactionStatus); } } } catch (Exception e) { //手动回滚事务 dataSourceTransactionManager.rollback(transactionStatus); } }
四、锁过期了,业务还没执行完
这种情况和我们上边提到的第二种比较类似,但解决思路上略有不同。
同样是redis分布式锁过期,而业务逻辑没执行完的场景,不过,这里换一种思路想问题,把redis锁的过期时间再弄长点不就解决了吗?
那还是有问题,我们可以在加锁的时候,手动调长redis锁的过期时间,可这个时间多长合适?业务逻辑的执行时间是不可控的,调的过长又会影响操作性能。
要是redis锁的过期时间能够自动续期就好了。
为了解决这个问题我们使用redis客户端redisson,redisson很好的解决了redis在分布式环境下的一些棘手问题,它的宗旨就是让使用者减少对Redis的关注,将更多精力用在处理业务逻辑上。
redisson对分布式锁做了很好封装,只需调用API即可。
RLock lock = redissonClient.getLock("stockLock");
redisson在加锁成功后,会注册一个定时任务监听这个锁,每隔10秒就去查看这个锁,如果还持有锁,就对过期时间进行续期。默认过期时间30秒。这个机制也被叫做:“看门狗”,这名字。。。
举例子:假如加锁的时间是30秒,过10秒检查一次,一旦加锁的业务没有执行完,就会进行一次续期,把锁的过期时间再次重置成30秒。
通过分析下边redisson的源码实现可以发现,不管是加锁、解锁、续约都是客户端把一些复杂的业务逻辑,通过封装在Lua脚本中发送给redis,保证这段复杂业务逻辑执行的原子性。
@Slf4j @Service public class RedisDistributionLockPlus { /** * 加锁超时时间,单位毫秒, 即:加锁时间内执行完操作,如果未完成会有并发现象 */ private static final long DEFAULT_LOCK_TIMEOUT = 30; private static final long TIME_SECONDS_FIVE = 5 ; /** * 每个key的过期时间 {@link LockContent} */ private Map<String, LockContent> lockContentMap = new ConcurrentHashMap<>(512); /** * redis执行成功的返回 */ private static final Long EXEC_SUCCESS = 1L; /** * 获取锁lua脚本, k1:获锁key, k2:续约耗时key, arg1:requestId,arg2:超时时间 */ private static final String LOCK_SCRIPT = "if redis.call('exists', KEYS[2]) == 1 then ARGV[2] = math.floor(redis.call('get', KEYS[2]) + 10) end " + "if redis.call('exists', KEYS[1]) == 0 then " + "local t = redis.call('set', KEYS[1], ARGV[1], 'EX', ARGV[2]) " + "for k, v in pairs(t) do " + "if v == 'OK' then return tonumber(ARGV[2]) end " + "end " + "return 0 end"; /** * 释放锁lua脚本, k1:获锁key, k2:续约耗时key, arg1:requestId,arg2:业务耗时 arg3: 业务开始设置的timeout */ private static final String UNLOCK_SCRIPT = "if redis.call('get', KEYS[1]) == ARGV[1] then " + "local ctime = tonumber(ARGV[2]) " + "local biz_timeout = tonumber(ARGV[3]) " + "if ctime > 0 then " + "if redis.call('exists', KEYS[2]) == 1 then " + "local avg_time = redis.call('get', KEYS[2]) " + "avg_time = (tonumber(avg_time) * 8 + ctime * 2)/10 " + "if avg_time >= biz_timeout - 5 then redis.call('set', KEYS[2], avg_time, 'EX', 24*60*60) " + "else redis.call('del', KEYS[2]) end " + "elseif ctime > biz_timeout -5 then redis.call('set', KEYS[2], ARGV[2], 'EX', 24*60*60) end " + "end " + "return redis.call('del', KEYS[1]) " + "else return 0 end"; /** * 续约lua脚本 */ private static final String RENEW_SCRIPT = "if redis.call('get', KEYS[1]) == ARGV[1] then return redis.call('expire', KEYS[1], ARGV[2]) else return 0 end"; private final StringRedisTemplate redisTemplate; public RedisDistributionLockPlus(StringRedisTemplate redisTemplate) { this.redisTemplate = redisTemplate; ScheduleTask task = new ScheduleTask(this, lockContentMap); // 启动定时任务 ScheduleExecutor.schedule(task, 1, 1, TimeUnit.SECONDS); } /** * 加锁 * 取到锁加锁,取不到锁一直等待知道获得锁 * * @param lockKey * @param requestId 全局唯一 * @param expire 锁过期时间, 单位秒 * @return */ public boolean lock(String lockKey, String requestId, long expire) { log.info("开始执行加锁, lockKey ={}, requestId={}", lockKey, requestId); for (; ; ) { // 判断是否已经有线程持有锁,减少redis的压力 LockContent lockContentOld = lockContentMap.get(lockKey); boolean unLocked = null == lockContentOld; // 如果没有被锁,就获取锁 if (unLocked) { long startTime = System.currentTimeMillis(); // 计算超时时间 long bizExpire = expire == 0L ? DEFAULT_LOCK_TIMEOUT : expire; String lockKeyRenew = lockKey + "_renew"; RedisScript<Long> script = RedisScript.of(LOCK_SCRIPT, Long.class); List<String> keys = new ArrayList<>(); keys.add(lockKey); keys.add(lockKeyRenew); Long lockExpire = redisTemplate.execute(script, keys, requestId, Long.toString(bizExpire)); if (null != lockExpire && lockExpire > 0) { // 将锁放入map LockContent lockContent = new LockContent(); lockContent.setStartTime(startTime); lockContent.setLockExpire(lockExpire); lockContent.setExpireTime(startTime + lockExpire * 1000); lockContent.setRequestId(requestId); lockContent.setThread(Thread.currentThread()); lockContent.setBizExpire(bizExpire); lockContent.setLockCount(1); lockContentMap.put(lockKey, lockContent); log.info("加锁成功, lockKey ={}, requestId={}", lockKey, requestId); return true; } } // 重复获取锁,在线程池中由于线程复用,线程相等并不能确定是该线程的锁 if (Thread.currentThread() == lockContentOld.getThread() && requestId.equals(lockContentOld.getRequestId())){ // 计数 +1 lockContentOld.setLockCount(lockContentOld.getLockCount()+1); return true; } // 如果被锁或获取锁失败,则等待100毫秒 try { TimeUnit.MILLISECONDS.sleep(100); } catch (InterruptedException e) { // 这里用lombok 有问题 log.error("获取redis 锁失败, lockKey ={}, requestId={}", lockKey, requestId, e); return false; } } } /** * 解锁 * * @param lockKey * @param lockValue */ public boolean unlock(String lockKey, String lockValue) { String lockKeyRenew = lockKey + "_renew"; LockContent lockContent = lockContentMap.get(lockKey); long consumeTime; if (null == lockContent) { consumeTime = 0L; } else if (lockValue.equals(lockContent.getRequestId())) { int lockCount = lockContent.getLockCount(); // 每次释放锁, 计数 -1,减到0时删除redis上的key if (--lockCount > 0) { lockContent.setLockCount(lockCount); return false; } consumeTime = (System.currentTimeMillis() - lockContent.getStartTime()) / 1000; } else { log.info("释放锁失败,不是自己的锁。"); return false; } // 删除已完成key,先删除本地缓存,减少redis压力, 分布式锁,只有一个,所以这里不加锁 lockContentMap.remove(lockKey); RedisScript<Long> script = RedisScript.of(UNLOCK_SCRIPT, Long.class); List<String> keys = new ArrayList<>(); keys.add(lockKey); keys.add(lockKeyRenew); Long result = redisTemplate.execute(script, keys, lockValue, Long.toString(consumeTime), Long.toString(lockContent.getBizExpire())); return EXEC_SUCCESS.equals(result); } /** * 续约 * * @param lockKey * @param lockContent * @return true:续约成功,false:续约失败(1、续约期间执行完成,锁被释放 2、不是自己的锁,3、续约期间锁过期了(未解决)) */ public boolean renew(String lockKey, LockContent lockContent) { // 检测执行业务线程的状态 Thread.State state = lockContent.getThread().getState(); if (Thread.State.TERMINATED == state) { log.info("执行业务的线程已终止,不再续约 lockKey ={}, lockContent={}", lockKey, lockContent); return false; } String requestId = lockContent.getRequestId(); long timeOut = (lockContent.getExpireTime() - lockContent.getStartTime()) / 1000; RedisScript<Long> script = RedisScript.of(RENEW_SCRIPT, Long.class); List<String> keys = new ArrayList<>(); keys.add(lockKey); Long result = redisTemplate.execute(script, keys, requestId, Long.toString(timeOut)); log.info("续约结果,True成功,False失败 lockKey ={}, result={}", lockKey, EXEC_SUCCESS.equals(result)); return EXEC_SUCCESS.equals(result); } static class ScheduleExecutor { public static void schedule(ScheduleTask task, long initialDelay, long period, TimeUnit unit) { long delay = unit.toMillis(initialDelay); long period_ = unit.toMillis(period); // 定时执行 new Timer("Lock-Renew-Task").schedule(task, delay, period_); } } static class ScheduleTask extends TimerTask { private final RedisDistributionLockPlus redisDistributionLock; private final Map<String, LockContent> lockContentMap; public ScheduleTask(RedisDistributionLockPlus redisDistributionLock, Map<String, LockContent> lockContentMap) { this.redisDistributionLock = redisDistributionLock; this.lockContentMap = lockContentMap; } @Override public void run() { if (lockContentMap.isEmpty()) { return; } Set<Map.Entry<String, LockContent>> entries = lockContentMap.entrySet(); for (Map.Entry<String, LockContent> entry : entries) { String lockKey = entry.getKey(); LockContent lockContent = entry.getValue(); long expireTime = lockContent.getExpireTime(); // 减少线程池中任务数量 if ((expireTime - System.currentTimeMillis())/ 1000 < TIME_SECONDS_FIVE) { //线程池异步续约 ThreadPool.submit(() -> { boolean renew = redisDistributionLock.renew(lockKey, lockContent); if (renew) { long expireTimeNew = lockContent.getStartTime() + (expireTime - lockContent.getStartTime()) * 2 - TIME_SECONDS_FIVE * 1000; lockContent.setExpireTime(expireTimeNew); } else { // 续约失败,说明已经执行完 OR redis 出现问题 lockContentMap.remove(lockKey); } }); } } } } }
五、redis主从复制的坑
redis高可用最常见的方案就是主从复制(master-slave),这种模式也给redis分布式锁挖了一坑。
redis cluster集群环境下,假如现在A客户端想要加锁,它会根据路由规则选择一台master节点写入key mylock,在加锁成功后,master节点会把key异步复制给对应的slave节点。
如果此时redis master节点宕机,为保证集群可用性,会进行主备切换,slave变为了redis master。A客户端错误地认为它在旧的master节点上成功加锁,但实际上锁已经被B客户端在新的master节点上加上了。
此时就会导致同一时间内多个客户端对一个分布式锁完成了加锁,导致各种脏数据的产生。
至于解决办法嘛,目前看还没有什么根治的方法,只能尽量保证机器的稳定性,减少发生此事件的概率。
小结一下:上面就是我在使用Redis 分布式锁时遇到的一些坑,有点小感慨,经常用一个方法填上这个坑,没多久就发现另一个坑又出来了,其实根本没有什么十全十美的解决方案,哪有什么银弹,只不过是在权衡利弊后,选一个在接受范围内的折中方案而已。
위 내용은 Redis 분산 잠금을 갱신하는 방법의 상세 내용입니다. 자세한 내용은 PHP 중국어 웹사이트의 기타 관련 기사를 참조하세요!

핫 AI 도구

Undresser.AI Undress
사실적인 누드 사진을 만들기 위한 AI 기반 앱

AI Clothes Remover
사진에서 옷을 제거하는 온라인 AI 도구입니다.

Undress AI Tool
무료로 이미지를 벗다

Clothoff.io
AI 옷 제거제

AI Hentai Generator
AI Hentai를 무료로 생성하십시오.

인기 기사

뜨거운 도구

메모장++7.3.1
사용하기 쉬운 무료 코드 편집기

SublimeText3 중국어 버전
중국어 버전, 사용하기 매우 쉽습니다.

스튜디오 13.0.1 보내기
강력한 PHP 통합 개발 환경

드림위버 CS6
시각적 웹 개발 도구

SublimeText3 Mac 버전
신 수준의 코드 편집 소프트웨어(SublimeText3)

뜨거운 주제











Redis Cluster Mode는 Sharding을 통해 Redis 인스턴스를 여러 서버에 배포하여 확장 성 및 가용성을 향상시킵니다. 시공 단계는 다음과 같습니다. 포트가 다른 홀수 redis 인스턴스를 만듭니다. 3 개의 센티넬 인스턴스를 만들고, Redis 인스턴스 및 장애 조치를 모니터링합니다. Sentinel 구성 파일 구성, Redis 인스턴스 정보 및 장애 조치 설정 모니터링 추가; Redis 인스턴스 구성 파일 구성, 클러스터 모드 활성화 및 클러스터 정보 파일 경로를 지정합니다. 각 redis 인스턴스의 정보를 포함하는 Nodes.conf 파일을 작성합니다. 클러스터를 시작하고 Create 명령을 실행하여 클러스터를 작성하고 복제본 수를 지정하십시오. 클러스터에 로그인하여 클러스터 정보 명령을 실행하여 클러스터 상태를 확인하십시오. 만들다

Redis는 해시 테이블을 사용하여 데이터를 저장하고 문자열, 목록, 해시 테이블, 컬렉션 및 주문한 컬렉션과 같은 데이터 구조를 지원합니다. Redis는 Snapshots (RDB)를 통해 데이터를 유지하고 WRITE 전용 (AOF) 메커니즘을 추가합니다. Redis는 마스터 슬레이브 복제를 사용하여 데이터 가용성을 향상시킵니다. Redis는 단일 스레드 이벤트 루프를 사용하여 연결 및 명령을 처리하여 데이터 원자력과 일관성을 보장합니다. Redis는 키의 만료 시간을 설정하고 게으른 삭제 메커니즘을 사용하여 만료 키를 삭제합니다.

Redis-Server가 찾을 수없는 문제를 해결하기위한 단계 : Redis가 올바르게 설치되어 있는지 확인하십시오. 환경 변수를 설정 redis_host 및 redis_port; Redis Server Redis-Server를 시작하십시오. 서버가 Redis-Cli Ping을 실행 중인지 확인하십시오.

Redis에서 모든 키를 보려면 세 가지 방법이 있습니다. 키 명령을 사용하여 지정된 패턴과 일치하는 모든 키를 반환하십시오. 스캔 명령을 사용하여 키를 반복하고 키 세트를 반환하십시오. 정보 명령을 사용하여 총 키 수를 얻으십시오.

Redis 버전 번호를 보려면 다음 세 가지 방법을 사용할 수 있습니다. (1) info 명령을 입력하고 (2) -version 옵션으로 서버를 시작하고 (3) 구성 파일을 봅니다.

Redis 소스 코드를 이해하는 가장 좋은 방법은 단계별로 이동하는 것입니다. Redis의 기본 사항에 익숙해집니다. 특정 모듈을 선택하거나 시작점으로 기능합니다. 모듈 또는 함수의 진입 점으로 시작하여 코드를 한 줄씩 봅니다. 함수 호출 체인을 통해 코드를 봅니다. Redis가 사용하는 기본 데이터 구조에 익숙해 지십시오. Redis가 사용하는 알고리즘을 식별하십시오.

Redis 지시 사항을 사용하려면 다음 단계가 필요합니다. Redis 클라이언트를 엽니 다. 명령 (동사 키 값)을 입력하십시오. 필요한 매개 변수를 제공합니다 (명령어마다 다름). 명령을 실행하려면 Enter를 누르십시오. Redis는 작업 결과를 나타내는 응답을 반환합니다 (일반적으로 OK 또는 -err).

Redis 순서 세트 (ZSETS)는 순서가있는 요소를 저장하고 관련 점수별로 정렬하는 데 사용됩니다. ZSET을 사용하는 단계에는 다음이 포함됩니다. 1. ZSET을 만듭니다. 2. 회원 추가; 3. 회원 점수를 얻으십시오. 4. 순위를 얻으십시오. 5. 순위 범위에서 멤버를 받으십시오. 6. 회원 삭제; 7. 요소 수를 얻으십시오. 8. 점수 범위에서 멤버 수를 얻으십시오.
