Eine verteilte Sperre ist eine Sperre, die es ermöglicht, dass mehrere Prozesse in einem verteilten System oder Clustermodus sichtbar sind und sich gegenseitig ausschließen.
Verteilte Sperren basierend auf Redis implementieren:
Gegenseitiger Ausschluss: Stellen Sie sicher, dass nur ein Thread die Sperre erwerben kann;
Nicht blockierend: Versuchen Sie, die Sperre zu erwerben, und geben Sie bei Erfolg „true“ zurück , false, wenn fehlgeschlagen;
Ablaufzeit der Sperre hinzufügen, um einen durch Dienstausfallzeiten verursachten Deadlock zu vermeiden.
SET lock thread1 NX EX 10
SET lock thread1 NX EX 10
手动释放;DEL key1
超时释放,获取锁时添加一个超时锁;
package com.guor.utils; import org.springframework.data.redis.core.StringRedisTemplate; import java.util.concurrent.TimeUnit; public class RedisLock implements ILock{ private String name; private StringRedisTemplate stringRedisTemplate; public RedisLock(String name, StringRedisTemplate stringRedisTemplate) { this.name = name; this.stringRedisTemplate = stringRedisTemplate; } private static final String KEY_PREFIX = "lock:"; @Override public boolean tryLock(long timeout) { // 获取线程唯一标识 long threadId = Thread.currentThread().getId(); // 获取锁 Boolean success = stringRedisTemplate.opsForValue() .setIfAbsent(KEY_PREFIX + name, threadId+"", timeout, TimeUnit.SECONDS); // 防止拆箱的空指针异常 return Boolean.TRUE.equals(success); } @Override public void unlock() { stringRedisTemplate.delete(KEY_PREFIX + name); } }
如果线程1获取锁,但线程1发生了阻塞,导致Redis超时释放锁;
此时,线程2尝试获取锁,成功,并执行业务;
此时,线程1重新开始执行任务,并执行完毕,执行释放锁(即删除锁);
但是,线程1删除的锁,和线程2的锁是同一把锁,这就是分布式锁误删问题
;
在释放锁时,释放线程自己的分布式锁,就可以解决这个问题。
package com.guor.utils; import cn.hutool.core.lang.UUID; import org.springframework.data.redis.core.StringRedisTemplate; import java.util.concurrent.TimeUnit; public class RedisLock implements ILock{ private String name; private StringRedisTemplate stringRedisTemplate; public RedisLock(String name, StringRedisTemplate stringRedisTemplate) { this.name = name; this.stringRedisTemplate = stringRedisTemplate; } private static final String KEY_PREFIX = "lock:"; private static final String UUID_PREFIX = UUID.randomUUID().toString(true) + "-"; @Override public boolean tryLock(long timeout) { // 获取线程唯一标识 String threadId = UUID_PREFIX + Thread.currentThread().getId(); // 获取锁 Boolean success = stringRedisTemplate.opsForValue() .setIfAbsent(KEY_PREFIX + name, threadId, timeout, TimeUnit.SECONDS); // 防止拆箱的空指针异常 return Boolean.TRUE.equals(success); } @Override public void unlock() { // 获取线程唯一标识 String threadId = UUID_PREFIX + Thread.currentThread().getId(); // 获取锁中的标识 String id = stringRedisTemplate.opsForValue().get(KEY_PREFIX + name); // 判断标示是否一致 if(threadId.equals(id)) { // 释放锁 stringRedisTemplate.delete(KEY_PREFIX + name); } } }
SETNX
DEL key1
timeout release, beim Erfassen eine Zeitüberschreitung hinzufügen lock Lock; 2. Codebeispiel <!--redisson--> <dependency> <groupId>org.redisson</groupId> <artifactId>redisson</artifactId> <version>3.13.6</version> </dependency>
Problem mit versehentlichem Löschen der Sperre
;package com.guor.config; import org.redisson.Redisson; import org.redisson.api.RedissonClient; import org.redisson.config.Config; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; @Configuration public class RedissonConfig { @Bean public RedissonClient redissonClient(){ // 配置 Config config = new Config(); /** * 单点地址useSingleServer,集群地址useClusterServers */ config.useSingleServer().setAddress("redis://127.0.0.1:6379").setPassword("123456"); // 创建RedissonClient对象 return Redisson.create(config); } }
SETNX
implementierte verteilte Sperre weist die folgenden Probleme auf: 1 Es ist nicht wiedereintrittsfähig: Derselbe Thread kann nicht mehrmals dieselbe Sperre erwerben. 2. Kein Wiederholungsversuch. Der Erwerb der Sperre erfolgt nur einmal und gibt „False“ zurück. Es gibt keinen Wiederholungsmechanismus. 3. Timeout-FreigabeObwohl Timeout-Freigabe von Sperren Deadlocks vermeiden kann, führt dies bei längerer Geschäftsausführung auch zur Sperrenfreigabe, was ein Sicherheitsrisiko darstellt. 4. Master-Slave-Konsistenz Wenn Redis in einem Cluster bereitgestellt wird, kommt es zu einer Verzögerung bei der Master-Slave-Synchronisierung. Wenn der Host ausfällt, wird ein Slave als Host ausgewählt, der Slave verfügt jedoch nicht über eine Die Sperrkennung kann zu diesem Zeitpunkt von anderen Threads übernommen werden, was zu Sicherheitsproblemen führen kann. 4. Redisson implementiert verteilte SperrenRedisson ist ein Java-Speicherdatengitter, das auf der Redis-Implementierung basiert. Neben der Bereitstellung häufig verwendeter verteilter Java-Objekte werden auch viele verteilte Dienste bereitgestellt, einschließlich der Implementierung verschiedener verteilter Sperren. 1. pom🎜package com.guor; import lombok.extern.slf4j.Slf4j; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import org.redisson.api.RLock; import org.redisson.api.RedissonClient; import org.springframework.boot.test.context.SpringBootTest; import javax.annotation.Resource; import java.util.concurrent.TimeUnit; @Slf4j @SpringBootTest class RedissonTest { @Resource private RedissonClient redissonClient; private RLock lock; @BeforeEach void setUp() { // 获取指定名称的锁 lock = redissonClient.getLock("nezha"); } @Test void test() throws InterruptedException { // 尝试获取锁 boolean isLock = lock.tryLock(1L, TimeUnit.SECONDS); if (!isLock) { log.error("获取锁失败"); return; } try { log.info("哪吒最帅,哈哈哈"); } finally { // 释放锁 lock.unlock(); } } }
public boolean tryLock(long waitTime, long leaseTime, TimeUnit unit) throws InterruptedException { // 最大等待时间 long time = unit.toMillis(waitTime); long current = System.currentTimeMillis(); long threadId = Thread.currentThread().getId(); Long ttl = this.tryAcquire(waitTime, leaseTime, unit, threadId); if (ttl == null) { return true; } else { // 剩余等待时间 = 最大等待时间 - 获取锁失败消耗的时间 time -= System.currentTimeMillis() - current; if (time <= 0L) {// 获取锁失败 this.acquireFailed(waitTime, unit, threadId); return false; } else { // 再次尝试获取锁 current = System.currentTimeMillis(); // subscribe订阅其它释放锁的信号 RFuture<RedissonLockEntry> subscribeFuture = this.subscribe(threadId); // 当Future在等待指定时间time内完成时,返回true if (!subscribeFuture.await(time, TimeUnit.MILLISECONDS)) { if (!subscribeFuture.cancel(false)) { subscribeFuture.onComplete((res, e) -> { if (e == null) { // 取消订阅 this.unsubscribe(subscribeFuture, threadId); } }); } this.acquireFailed(waitTime, unit, threadId); return false;// 获取锁失败 } else { try { // 剩余等待时间 = 剩余等待时间 - 获取锁失败消耗的时间 time -= System.currentTimeMillis() - current; if (time <= 0L) { this.acquireFailed(waitTime, unit, threadId); boolean var20 = false; return var20; } else { boolean var16; do { long currentTime = System.currentTimeMillis(); // 重试获取锁 ttl = this.tryAcquire(waitTime, leaseTime, unit, threadId); if (ttl == null) { var16 = true; return var16; } // 再次失败了,再看一下剩余时间 time -= System.currentTimeMillis() - currentTime; if (time <= 0L) { this.acquireFailed(waitTime, unit, threadId); var16 = false; return var16; } // 再重试获取锁 currentTime = System.currentTimeMillis(); if (ttl >= 0L && ttl < time) { // 通过信号量的方式尝试获取信号,如果等待时间内,依然没有结果,会返回false ((RedissonLockEntry)subscribeFuture.getNow()).getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS); } else { ((RedissonLockEntry)subscribeFuture.getNow()).getLatch().tryAcquire(time, TimeUnit.MILLISECONDS); } time -= System.currentTimeMillis() - currentTime; } while(time > 0L); this.acquireFailed(waitTime, unit, threadId); var16 = false; return var16; } } finally { this.unsubscribe(subscribeFuture, threadId); } } } } }
private void scheduleExpirationRenewal(long threadId) { RedissonLock.ExpirationEntry entry = new RedissonLock.ExpirationEntry(); // this.getEntryName():锁的名字,一个锁对应一个entry // putIfAbsent:如果不存在,将锁和entry放到map里 RedissonLock.ExpirationEntry oldEntry = (RedissonLock.ExpirationEntry)EXPIRATION_RENEWAL_MAP.putIfAbsent(this.getEntryName(), entry); if (oldEntry != null) { // 同一个线程多次获取锁,相当于重入 oldEntry.addThreadId(threadId); } else { // 如果是第一次 entry.addThreadId(threadId); // 更新有效期 this.renewExpiration(); } }
private void renewExpiration() { // 从map中得到当前锁的entry RedissonLock.ExpirationEntry ee = (RedissonLock.ExpirationEntry)EXPIRATION_RENEWAL_MAP.get(this.getEntryName()); if (ee != null) { // 开启延时任务 Timeout task = this.commandExecutor.getConnectionManager().newTimeout(new TimerTask() { public void run(Timeout timeout) throws Exception { RedissonLock.ExpirationEntry ent = (RedissonLock.ExpirationEntry)RedissonLock.EXPIRATION_RENEWAL_MAP.get(RedissonLock.this.getEntryName()); if (ent != null) { // 取出线程id Long threadId = ent.getFirstThreadId(); if (threadId != null) { // 刷新有效期 RFuture<Boolean> future = RedissonLock.this.renewExpirationAsync(threadId); future.onComplete((res, e) -> { if (e != null) { RedissonLock.log.error("Can't update lock " + RedissonLock.this.getName() + " expiration", e); } else { if (res) { // 递归调用更新有效期,永不过期 RedissonLock.this.renewExpiration(); } } }); } } } }, this.internalLockLeaseTime / 3L, TimeUnit.MILLISECONDS);// 10S ee.setTimeout(task); } }
protected RFuture<Boolean> renewExpirationAsync(long threadId) { return this.evalWriteAsync(this.getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN, // 判断当前线程的锁是否是当前线程 "if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then // 更新有效期 redis.call('pexpire', KEYS[1], ARGV[1]); return 1; end; return 0;", Collections.singletonList(this.getName()), this.internalLockLeaseTime, this.getLockName(threadId)); }
<T> RFuture<T> tryLockInnerAsync(long waitTime, long leaseTime, TimeUnit unit, long threadId, RedisStrictCommand<T> command) { // 锁释放时间 this.internalLockLeaseTime = unit.toMillis(leaseTime); return this.evalWriteAsync(this.getName(), LongCodec.INSTANCE, command, // 判断锁成功 "if (redis.call('exists', KEYS[1]) == 0) then redis.call('hincrby', KEYS[1], ARGV[2], 1); // 如果不存在,记录锁标识,次数+1 redis.call('pexpire', KEYS[1], ARGV[1]); // 设置锁有效期 return nil; // 相当于Java的null end; if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then redis.call('hincrby', KEYS[1], ARGV[2], 1); // 如果存在,判断锁标识是否是自己的,次数+1 redis.call('pexpire', KEYS[1], ARGV[1]); // 设置锁有效期 return nil; end; // 判断锁失败,pttl:指定锁剩余有效期,单位毫秒,KEYS[1]:锁的名称 return redis.call('pttl', KEYS[1]);", Collections.singletonList(this.getName()), this.internalLockLeaseTime, this.getLockName(threadId)); }
public RFuture<Void> unlockAsync(long threadId) { RPromise<Void> result = new RedissonPromise(); RFuture<Boolean> future = this.unlockInnerAsync(threadId); future.onComplete((opStatus, e) -> { // 取消更新任务 this.cancelExpirationRenewal(threadId); if (e != null) { result.tryFailure(e); } else if (opStatus == null) { IllegalMonitorStateException cause = new IllegalMonitorStateException("attempt to unlock lock, not locked by current thread by node id: " + this.id + " thread-id: " + threadId); result.tryFailure(cause); } else { result.trySuccess((Object)null); } }); return result; }
Das obige ist der detaillierte Inhalt vonWas ist die Methode zur Implementierung der verteilten Redis-Sperre?. Für weitere Informationen folgen Sie bitte anderen verwandten Artikeln auf der PHP chinesischen Website!