Bevor wir über verteilte Sperren sprechen, ist es notwendig Zuerst Erklären Sie, warum verteilte Sperre
erforderlich ist. 分布式锁
。
与分布式锁相对就的是单机锁,我们在写多线程程序时,避免同时操作一个共享变量产生数据问题,通常会使用一把锁来互斥以保证共享变量的正确性,其使用范围是在同一个进程中。如果换做是多个进程,需要同时操作一个共享资源,如何互斥呢?现在的业务应用通常是微服务架构,这也意味着一个应用会部署多个进程,多个进程如果需要修改MySQL中的同一行记录,为了避免操作乱序导致脏数据,此时就需要引入分布式锁了。
想要实现分布式锁,必须借助一个外部系统,所有进程都去这个系统上申请加锁。这个外部系统必须具有互斥能力,也就是说,如果两个请求同时到达,系统只会成功地为一个进程加锁,而另一个进程会失败。这个外部系统可以是数据库,也可以是Redis或Zookeeper,但为了追求性能,我们通常会选择使用Redis或Zookeeper来做。
Redis可以作为一个共享存储系统,多个客户端可以共享访问,因此可以被用来保存分布式锁。而且 Redis 的读写性能高,可以应对高并发的锁操作场景。这篇文章的重点在于介绍如何使用Redis实现分布式锁,并探讨在实现过程中可能会遇到的问题。
作为分布式锁实现过程中的共享存储系统,Redis可以使用键值对来保存锁变量,在接收和处理不同客户端发送的加锁和释放锁的操作请求。那么,键值对的键和值具体是怎么定的呢?我们要赋予锁变量一个变量名,把这个变量名作为键值对的键,而锁变量的值,则是键值对的值,这样一来,Redis就能保存锁变量了,客户端也就可以通过Redis的命令操作来实现锁操作。
想要实现分布式锁,必须要求Redis有互斥的能力。可以使用SETNX命令,其含义是SET IF NOT EXIST,即如果key不存在,才会设置它的值,否则什么也不做。实现一种分布式锁的方法是,两个客户端进程互斥地执行该命令。
以下展示了Redis使用key/value对保存锁变量,以及两个客户端同时请求加锁的操作过程。
加锁操作完成后,加锁成功的客户端,就可以去操作共享资源,例如,修改MySQL的某一行数据。操作完成后,还要及时释放锁,给后来者让出操作共享资源的机会。如何释放锁呢?直接使用DEL命令删除这个key即可。这个逻辑非常简单,整体的流程写成伪代码就是下面这样。
// 加锁 SETNX lock_key 1 // 业务逻辑 DO THINGS // 释放锁 DEL lock_key
但是,以上实现存在一个很大的问题,当客户端1拿到锁后,如果发生下面的场景,就会造成死锁。
程序处理业务逻辑异常,没及时释放锁进程挂了,没机会释放锁
以上情况会导致已经获得锁的客户端一直占用锁,其他客户端永远无法获取到锁。
为了解决以上死锁问题,最容易想到的方案是在申请锁时,在Redis中实现时,给锁设置一个过期时间,假设操作共享资源的时间不会超过10s,那么加锁时,给这个key设置10s过期即可。
但以上操作还是有问题,加锁、设置过期时间是2条命令,有可能只执行了第一条,第二条却执行失败
,例如:
1.SETNX执行成功,执行EXPIRE时由于网络问题,执行失败
2.SETNX执行成功,Redis异常宕机,EXPIRE没有机会执行
3.SETNX执行成功,客户端异常崩溃,EXPIRE没有机会执行
总之这两条命令如果不能保证是原子操作,就有潜在的风险导致过期时间设置失败,依旧有可能发生死锁问题
Wenn Sie verteilte Sperren implementieren möchten, müssen Sie ein externes System verwenden. Alle Prozesse gehen zu diesem System, um Sperren zu beantragen. Dieses externe System muss sich gegenseitig ausschließen. Das heißt, wenn zwei Anforderungen gleichzeitig eingehen, sperrt das System nur einen Prozess erfolgreich und der andere Prozess schlägt fehl. Dieses externe System kann eine Datenbank, Redis oder Zookeeper sein, aber um die Leistung zu steigern, entscheiden wir uns normalerweise für die Verwendung von Redis oder Zookeeper. Redis kann als gemeinsames Speichersystem verwendet werden, und mehrere Clients können den Zugriff teilen, sodass es zum Speichern verteilter Sperren verwendet werden kann. Darüber hinaus verfügt Redis über eine hohe Lese- und Schreibleistung und kann Sperrbetriebsszenarien mit hoher Parallelität verarbeiten. Der Schwerpunkt dieses Artikels liegt auf der Einführung in die Verwendung von Redis zur Implementierung verteilter Sperren und der Erörterung der Probleme, die während des Implementierungsprozesses auftreten können. So implementieren Sie verteilte Sperren
Als gemeinsam genutztes Speichersystem bei der Implementierung verteilter Sperren kann Redis Schlüssel-Wert-Paare zum Speichern verwenden Locks-Variablen werden zum Empfangen und Verarbeiten von Sperr- und Sperrfreigabeoperationsanforderungen verwendet, die von verschiedenen Clients gesendet werden. Wie werden also Schlüssel und Wert des Schlüssel-Wert-Paares bestimmt? Wir müssen der Sperrvariablen einen Variablennamen geben und diesen Variablennamen als Schlüssel des Schlüssel-Wert-Paares verwenden. Der Wert der Sperrvariablen ist der Wert des Schlüssel-Wert-Paares. Auf diese Weise kann Redis das speichern Lock-Variable, und der Client kann Lock-Operationen über Redis-Befehlsoperationen implementieren.
#🎜🎜#Um verteilte Sperren zu implementieren, muss Redis über gegenseitige Ausschlussfunktionen verfügen. Sie können den Befehl SETNX verwenden, was SET IF NOT EXIST bedeutet, d. h. wenn der Schlüssel nicht existiert, wird sein Wert gesetzt, andernfalls wird nichts unternommen. Eine verteilte Sperre wird implementiert, indem zwei Clientprozesse den Befehl gegenseitig ausschließend ausführen. #🎜🎜##🎜🎜#Das Folgende zeigt den Betriebsprozess von Redis unter Verwendung von Schlüssel/Wert-Paaren zum Speichern von Sperrvariablen und zwei Clients, die gleichzeitig Sperren anfordern. #🎜🎜##🎜🎜##🎜🎜##🎜🎜#Nachdem der Sperrvorgang abgeschlossen ist, kann der Client mit erfolgreicher Sperrung die gemeinsam genutzten Ressourcen bedienen, beispielsweise eine bestimmte Datenzeile in MySQL ändern. Nach Abschluss des Vorgangs muss die Sperre rechtzeitig aufgehoben werden, um Nachzüglern die Möglichkeit zu geben, die gemeinsam genutzten Ressourcen zu bedienen. Wie löse ich die Sperre? Verwenden Sie einfach den Befehl DEL, um diesen Schlüssel zu löschen. Die Logik ist sehr einfach. Der in Pseudocode geschriebene Gesamtprozess ist wie folgt. #🎜🎜#//释放锁 比较unique_value是否相等,避免误释放 if redis.get("key") == unique_value then return redis.del("key")
Sperren und Ablaufzeit festlegen sind zwei Befehle. Es ist möglich, dass nur der erste ausgeführt wird, der zweite jedoch fehlschlägt , zum Beispiel:#🎜🎜##🎜🎜##🎜🎜#1. SETNX wurde erfolgreich ausgeführt, aber EXPIRE schlug aufgrund von Netzwerkproblemen fehl<br/> 2. SETNX wurde erfolgreich ausgeführt, aber Redis stürzte abnormal ab und EXPIRE hatte keine Chance zur Ausführung<br/ >3.SETNX wird erfolgreich ausgeführt, aber der Client stürzt ungewöhnlich ab und EXPIRE hat keine Chance zur Ausführung #🎜🎜##🎜🎜##🎜🎜# Kurz gesagt, wenn diese beiden Befehle nicht garantiert werden können Bei atomaren Vorgängen besteht ein potenzielles Ablaufrisiko. Wenn die Zeiteinstellung fehlschlägt, können weiterhin Deadlock-Probleme auftreten
. Glücklicherweise hat Redis nach Redis 2.6.12 die Parameter des SET-Befehls erweitert. Sie können die EXPIRE-Zeit gleichzeitig mit SET angeben. Der folgende Befehl setzt beispielsweise die Ablaufzeit der Sperre auf 10 Sekunden . #🎜🎜##🎜🎜##🎜🎜#SET lock_key 1 EX 10 NX#🎜🎜##🎜🎜##🎜🎜#Bisher wurde das Deadlock-Problem gelöst, aber es gibt noch andere Probleme. Stellen Sie sich das folgende Szenario vor: #🎜🎜##🎜🎜##🎜🎜##🎜🎜##🎜🎜##🎜🎜##🎜🎜#Client 1 wurde erfolgreich gesperrt und startete den Betrieb gemeinsam genutzter Ressourcen#🎜🎜 #客户端1操作共享资源耗时太久,超过了锁的过期时间,锁失效(锁被自动释放)
客户端2加锁成功,开始操作共享资源
客户端1操作共享资源完成,在finally块中手动释放锁,但此时它释放的是客户端2的锁。
这里存在两个严重的问题:
锁过期
释放了别人的锁
第1个问题是评估操作共享资源的时间不准确导致的,如果只是一味增大过期时间,只能缓解问题降低出现问题的概率,依旧无法彻底解决问题。原因在于客户端在拿到锁之后,在操作共享资源时,遇到的场景是很复杂的,既然是预估的时间,也只能是大致的计算,不可能覆盖所有导致耗时变长的场景
。
第二个问题在于解锁操作是不够严谨的,因为它是一种不加区分地释放锁的操作,没有对锁的所有权进行检查。如何解决呢?
解决办法是,客户端在加锁时,设置一个只有自己知道的唯一标识进去,例如可以是自己的线程ID
,如果是redis实现,就是SET key unique_value EX 10 NX。之后在释放锁时,要先判断这把锁是否归自己持有,只有是自己的才能释放它。
//释放锁 比较unique_value是否相等,避免误释放 if redis.get("key") == unique_value then return redis.del("key")
这里释放锁使用的是GET + DEL两条命令,这时又会遇到原子性
问题了。
客户端1执行GET,判断锁是自己的
客户端2执行了SET命令,强制获取到锁(虽然发生概念很低,但要严谨考虑锁的安全性)
客户端1执行DEL,却释放了客户端2的锁
由此可见,以上GET + DEL两个命令还是必须原子的执行才行。怎样原子执行两条命令呢?答案是Lua脚本,可以把以上逻辑写成Lua脚本,让Redis执行。因为Redis处理每个请求是单线程执行的,在执行一个Lua脚本时其它请求必须等待,直到这个Lua脚本处理完成
,这样一来GET+DEL之间就不会有其他命令执行了。
以下是使用Lua脚本(unlock.script)实现的释放锁操作的伪代码,其中,KEYS[1]表示lock_key,ARGV[1]是当前客户端的唯一标识,这两个值都是我们在执行 Lua脚本时作为参数传入的。
//Lua脚本语言,释放锁 比较unique_value是否相等,避免误释放 if redis.call("get",KEYS[1]) == ARGV[1] then return redis.call("del",KEYS[1]) else return 0 end
最后我们执行以下命令,即可
redis-cli --eval unlock.script lock_key , unique_value
这样一路优先下来,整个加锁、解锁流程就更严谨了,先小结一下,基于Redis实现的分布式锁,一个严谨的流程如下:
加锁时要设置过期时间SET lock_key unique_value EX expire_time NX
操作共享资源
释放锁:Lua脚本,先GET判断锁是否归属自己,再DEL释放锁
有了这个严谨的锁模型,我们还需要重新思考之前的那个问题,锁的过期时间不好评估怎么办。
前面提到过,过期时间如果评估得不好,这个锁就会有提前过期的风险,一种妥协的解决方案是,尽量冗余过期时间,降低锁提前过期的概率,但这个方案并不能完美解决问题。是否可以设置这样的方案,加锁时,先设置一个预估的过期时间,然后开启一个守护线程,定时去检测这个锁的失效时间,如果锁快要过期了,操作共享资源还未完成,那么就自动对锁进行续期,重新设置过期时间
。
Redisson是一个已封装好这些工作的库,可以说是一种非常优秀的解决方案。Redisson是一个Java语言实现的Redis SDK客户端,在使用分布式锁时,它就采用了自动续期的方案来避免锁过期,这个守护线程我们一般叫它看门狗线程。这个SDK提供的API非常友好,它可以像操作本地锁一样操作分布式锁。客户端一旦加锁成功,就会启动一个watch dog看门狗线程,它是一个后台线程,会每隔一段时间(这段时间的长度与设置的锁的过期时间有关)检查一下,如果检查时客户端还持有锁key(也就是说还在操作共享资源),那么就会延长锁key的生存时间。
那如果客户端在加锁成功后就宕机了呢?宕机了那么看门狗任务就不存在了,也就无法为锁续期了,锁到期自动失效。
上面讨论的情况,都是锁在单个Redis 实例中可能产生的问题,并没有涉及到Redis的部署架构细节。
Redis发展到现在,几种常见的部署架构有:
Einzelmaschinenmodus;
Sentinel-Modus;
Wenn wir Redis verwenden, verwendet im Allgemeinen Master-s Lavacluster+ Sentinel Bei der Bereitstellung im Modus besteht die Rolle von Sentinel darin, den Betriebsstatus von Redis-Knoten zu überwachen. Im normalen Master-Slave-Modus müssen Sie bei einem Master-Absturz manuell umschalten, um den Slave zum Master zu machen. Der Vorteil der Verwendung der Master-Slave-+Sentry-Kombination besteht darin, dass der Sentinel bei einem abnormalen Master-Absturz ein automatisches Failover implementieren kann und befördern Sie den Slave zum neuen Master, stellen Sie weiterhin Dienste bereit, um die Verfügbarkeit sicherzustellen
. Ist die verteilte Sperre also noch sicher, wenn der Master-Slave-Wechsel erfolgt?
Stellen Sie sich vor Ein solches Szenario:
一般会采用主从集群+哨兵的模式部署,哨兵的作用就是监测redis节点的运行状态。普通的主从模式,当master崩溃时,需要手动切换让slave成为master,使用主从+哨兵结合的好处在于,当master异常宕机时,哨兵可以实现故障自动切换,把slave提升为新的master,继续提供服务,以此保证可用性
。那么当主从发生切换时,分布式锁依旧安全吗?
想像这样的场景:
客户端1在master上执行SET命令,加锁成功
此时,master异常宕机,SET命令还未同步到slave上(主从复制是异步的)
哨兵将slave提升为新的master,但这个锁在新的master上丢失了,导致客户端2来加锁成功了,两个客户端共同操作共享资源
可见,当引入Redis副本后,分布式锁还是可能受到影响。即使Redis通过sentinel保证高可用,如果这个master节点由于某些原因发生了主从切换,那么就会出现锁丢失的情况。
集群模式+Redlock实现高可靠的分布式锁
为了避免Redis实例故障而导致的锁无法工作的问题,Redis的开发者 Antirez提出了分布式锁算法Redlock。Redlock算法的基本思路,是让客户端和多个独立的Redis实例依次请求加锁,如果客户端能够和半数以上的实例成功地完成加锁操作,那么我们就认为,客户端成功地获得分布式锁了,否则加锁失败
。这样一来,即使有单个Redis实例发生故障,因为锁变量在其它实例上也有保存,所以,客户端仍然可以正常地进行锁操作,锁变量并不会丢失。
来具体看下Redlock算法的执行步骤。Redlock算法的实现要求Redis采用集群部署模式,无哨兵节点,需要有N个独立的Redis实例(官方推荐至少5个实例)。接下来,我们可以分成3步来完成加锁操作。
第一步是,客户端获取当前时间。
第二步是,客户端按顺序依次向N个Redis实例执行加锁操作。
这里的加锁操作和在单实例上执行的加锁操作一样,使用SET命令,带上NX、EX/PX选项,以及带上客户端的唯一标识。当然,如果某个Redis实例发生故障了,为了保证在这种情况下,Redlock算法能够继续运行,我们需要给加锁操作设置一个超时时间。如果客户端在和一个Redis实例请求加锁时,一直到超时都没有成功,那么此时,客户端会和下一个Redis实例继续请求加锁。一般需要将加锁操作的超时时间设置为锁的有效时间的一小部分,通常约为几十毫秒。
第三步是,一旦客户端完成了和所有Redis实例的加锁操作,客户端就要计算整个加锁过程的总耗时。
客户端只有在满足两个条件时,才能认为是加锁成功,条件一是客户端从超过半数(大于等于 N/2+1)的Redis实例上成功获取到了锁;条件二是客户端获取锁的总耗时没有超过锁的有效时间。
为何只有在大多数实例加锁成功时才能算操作成功?事实上,多个Redis实例一起使用组成了一个分布式系统。在分布式系统中总会出现异常节点,所以在谈论分布式系统时,需要考虑异常节点达到多少个,也依旧不影响整个系统的正确运行。这是一个分布式系统的容错问题,这个问题的结论是:如果只存在故障节点,只要大多数节点正常,那么整个系统依旧可以提供正确服务。
在满足了这两个条件后,我们需要重新计算这把锁的有效时间,计算的结果是锁的最初有效时间减去客户端为获取锁的总耗时。如果锁的有效时间已经来不及完成共享数据的操作了,我们可以释放锁,以免出现还没完成共享资源操作,锁就过期了的情况
Der Client kann die Sperre nur dann als erfolgreich betrachten, wenn zwei Bedingungen erfüllt sind. Die erste Bedingung ist, dass der Client die Sperre von mehr als der Hälfte (größer oder gleich N/2+1) erfolgreich erhalten hat Redis-Instanzen; die zweite Bedingung: Die Gesamtzeit, die der Client benötigt, um die Sperre zu erhalten, überschreitet nicht die effektive Zeit der Sperre.
🎜🎜Warum kann der Vorgang nur dann als erfolgreich angesehen werden, wenn die meisten Instanzen erfolgreich gesperrt wurden? Tatsächlich werden mehrere Redis-Instanzen zusammen verwendet, um ein verteiltes System zu bilden. In einem verteilten System wird es immer abnormale Knoten geben. Wenn Sie also über ein verteiltes System sprechen, müssen Sie berücksichtigen, wie viele abnormale Knoten es gibt, ohne den ordnungsgemäßen Betrieb des gesamten Systems zu beeinträchtigen. Dies ist ein Fehlertoleranzproblem in einem verteilten System. Die Schlussfolgerung dieses Problems lautet: Wenn nur fehlerhafte Knoten vorhanden sind, kann das gesamte System immer noch korrekte Dienste bereitstellen, solange die meisten Knoten normal sind. 🎜🎜Nachdem wir diese beiden Bedingungen erfüllt haben, müssen wir die effektive Zeit der Sperre neu berechnen. Das Ergebnis der Berechnung ist die anfängliche effektive Zeit der Sperre abzüglich der Gesamtzeit, die der Client für den Erhalt der Sperre aufgewendet hat. Wenn die Gültigkeitsdauer der Sperre zu spät ist, um den Vorgang mit gemeinsam genutzten Daten abzuschließen, können wir die Sperre aufheben, um zu verhindern, dass die Sperre abläuft, bevor der Vorgang mit gemeinsam genutzten Ressourcen abgeschlossen ist. 🎜当然,如果客户端在和所有实例执行完加锁操作后,没能同时满足这两个条件,那么,客户端就要向所有Redis节点发起释放锁的操作
。为什么释放锁,要操作所有的节点呢,不能只操作那些加锁成功的节点吗?因为在某一个Redis节点加锁时,可能因为网络原因导致加锁失败,例如一个客户端在一个Redis实例上加锁成功,但在读取响应结果时由于网络问题导致读取失败,那这把锁其实已经在Redis上加锁成功了。所以释放锁时,不管之前有没有加锁成功,需要释放所有节点上的锁以保证清理节点上的残留的锁
。
在Redlock算法中,释放锁的操作和在单实例上释放锁的操作一样,只要执行释放锁的 Lua脚本就可以了。如果N个Redis实例中超过一半的实例正常工作,就能确保分布式锁正常运作。为了提高分布式锁的可靠性,您可以在实际业务应用中使用Redlock算法。
<!-- springboot整合redis --> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-data-redis</artifactId> </dependency>
package com.example.redisdemo.config; import com.fasterxml.jackson.annotation.JsonAutoDetect; import com.fasterxml.jackson.annotation.PropertyAccessor; import com.fasterxml.jackson.databind.DeserializationFeature; import com.fasterxml.jackson.databind.ObjectMapper; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.data.redis.connection.lettuce.LettuceConnectionFactory; import org.springframework.data.redis.core.RedisTemplate; import org.springframework.data.redis.serializer.Jackson2JsonRedisSerializer; import org.springframework.data.redis.serializer.RedisSerializer; import org.springframework.data.redis.serializer.StringRedisSerializer; /** * @description: Redis配置类 * @author Keson * @date 21:20 2022/11/14 * @Param * @return * @version 1.0 */ @Configuration public class RedisConfig { @Bean public RedisTemplate<String, Object> redisTemplate(LettuceConnectionFactory lettuceConnectionFactory) { // 设置序列化 Jackson2JsonRedisSerializer<Object> jackson2JsonRedisSerializer = new Jackson2JsonRedisSerializer<Object>(Object.class); ObjectMapper om = new ObjectMapper(); om.setVisibility(PropertyAccessor.ALL, JsonAutoDetect.Visibility.ANY); om.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false); jackson2JsonRedisSerializer.setObjectMapper(om); // 配置redisTemplate RedisTemplate<String, Object> redisTemplate = new RedisTemplate<String, Object>(); redisTemplate.setConnectionFactory(lettuceConnectionFactory); RedisSerializer<?> stringSerializer = new StringRedisSerializer(); redisTemplate.setKeySerializer(stringSerializer);// key序列化 redisTemplate.setValueSerializer(jackson2JsonRedisSerializer);// value序列化 redisTemplate.setHashKeySerializer(stringSerializer);// Hash key序列化 redisTemplate.setHashValueSerializer(jackson2JsonRedisSerializer);// Hash value序列化 redisTemplate.afterPropertiesSet(); return redisTemplate; } }
package com.example.redisdemo.service; import com.example.redisdemo.entity.CustomerBalance; import java.util.concurrent.Callable; /** * @author Keson * @version 1.0 * @description: TODO * @date 2022/11/14 15:12 */ public interface RedisService { <T> T callWithLock(CustomerBalance customerBalance, Callable<T> callable) throws Exception; }
package com.example.redisdemo.service.impl; import com.example.redisdemo.entity.CustomerBalance; import com.example.redisdemo.service.RedisService; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.data.redis.connection.RedisStringCommands; import org.springframework.data.redis.connection.ReturnType; import org.springframework.data.redis.core.RedisCallback; import org.springframework.data.redis.core.RedisTemplate; import org.springframework.data.redis.core.types.Expiration; import org.springframework.stereotype.Service; import java.nio.charset.StandardCharsets; import java.util.UUID; import java.util.concurrent.Callable; import java.util.concurrent.TimeUnit; /** * @author Keson * @version 1.0 * @description: TODO Redis实现分布式锁 * @date 2022/11/14 15:13 */ @Service @Slf4j public class RedisServiceImpl implements RedisService { //设置默认过期时间 private final static int DEFAULT_LOCK_EXPIRY_TIME = 20; //自定义lock key前缀 private final static String LOCK_PREFIX = "LOCK:CUSTOMER_BALANCE"; @Autowired private RedisTemplate redisTemplate; @Override public <T> T callWithLock(CustomerBalance customerBalance, Callable<T> callable) throws Exception{ //自定义lock key String lockKey = getLockKey(customerBalance.getCustomerNumber(), customerBalance.getSubAccountNumber(), customerBalance.getCurrencyCode()); //将UUID当做value,确保唯一性 String lockReference = UUID.randomUUID().toString(); try { if (!lock(lockKey, lockReference, DEFAULT_LOCK_EXPIRY_TIME, TimeUnit.SECONDS)) { throw new Exception("lock加锁失败"); } return callable.call(); } finally { unlock(lockKey, lockReference); } } //定义lock key String getLockKey(String customerNumber, String subAccountNumber, String currencyCode) { return String.format("%s:%s:%s:%s", LOCK_PREFIX, customerNumber, subAccountNumber, currencyCode); } //redis加锁 private boolean lock(String key, String value, long timeout, TimeUnit timeUnit) { Boolean locked; try { //SET_IF_ABSENT --> NX: Only set the key if it does not already exist. //SET_IF_PRESENT --> XX: Only set the key if it already exist. locked = (Boolean) redisTemplate.execute((RedisCallback<Boolean>) connection -> connection.set(key.getBytes(StandardCharsets.UTF_8), value.getBytes(StandardCharsets.UTF_8), Expiration.from(timeout, timeUnit), RedisStringCommands.SetOption.SET_IF_ABSENT)); } catch (Exception e) { log.error("Lock failed for redis key: {}, value: {}", key, value); locked = false; } return locked != null && locked; } //redis解锁 private boolean unlock(String key, String value) { try { //使用lua脚本保证删除的原子性,确保解锁 String script = "if redis.call('get', KEYS[1]) == ARGV[1] " + "then return redis.call('del', KEYS[1]) " + "else return 0 end"; Boolean unlockState = (Boolean) redisTemplate.execute((RedisCallback<Boolean>) connection -> connection.eval(script.getBytes(), ReturnType.BOOLEAN, 1, key.getBytes(StandardCharsets.UTF_8), value.getBytes(StandardCharsets.UTF_8))); return unlockState == null || !unlockState; } catch (Exception e) { log.error("unLock failed for redis key: {}, value: {}", key, value); return false; } } }
@Override public int updateById(CustomerBalance customerBalance) throws Exception { return redisService.callWithLock(customerBalance, ()-> customerBalanceMapper.updateById(customerBalance)); }
Das obige ist der detaillierte Inhalt vonSo implementieren Sie verteilte Sperren mit Redis in SpringBoot. Für weitere Informationen folgen Sie bitte anderen verwandten Artikeln auf der PHP chinesischen Website!