분산 잠금에 대해 이야기하기 전에 분산 잠금
이 필요한 이유를 설명할 필요가 있습니다. 分布式锁
。
与分布式锁相对就的是单机锁,我们在写多线程程序时,避免同时操作一个共享变量产生数据问题,通常会使用一把锁来互斥以保证共享变量的正确性,其使用范围是在同一个进程中。如果换做是多个进程,需要同时操作一个共享资源,如何互斥呢?现在的业务应用通常是微服务架构,这也意味着一个应用会部署多个进程,多个进程如果需要修改MySQL中的同一行记录,为了避免操作乱序导致脏数据,此时就需要引入分布式锁了。
想要实现分布式锁,必须借助一个外部系统,所有进程都去这个系统上申请加锁。这个外部系统必须具有互斥能力,也就是说,如果两个请求同时到达,系统只会成功地为一个进程加锁,而另一个进程会失败。这个外部系统可以是数据库,也可以是Redis或Zookeeper,但为了追求性能,我们通常会选择使用Redis或Zookeeper来做。
Redis可以作为一个共享存储系统,多个客户端可以共享访问,因此可以被用来保存分布式锁。而且 Redis 的读写性能高,可以应对高并发的锁操作场景。这篇文章的重点在于介绍如何使用Redis实现分布式锁,并探讨在实现过程中可能会遇到的问题。
作为分布式锁实现过程中的共享存储系统,Redis可以使用键值对来保存锁变量,在接收和处理不同客户端发送的加锁和释放锁的操作请求。那么,键值对的键和值具体是怎么定的呢?我们要赋予锁变量一个变量名,把这个变量名作为键值对的键,而锁变量的值,则是键值对的值,这样一来,Redis就能保存锁变量了,客户端也就可以通过Redis的命令操作来实现锁操作。
想要实现分布式锁,必须要求Redis有互斥的能力。可以使用SETNX命令,其含义是SET IF NOT EXIST,即如果key不存在,才会设置它的值,否则什么也不做。实现一种分布式锁的方法是,两个客户端进程互斥地执行该命令。
以下展示了Redis使用key/value对保存锁变量,以及两个客户端同时请求加锁的操作过程。
加锁操作完成后,加锁成功的客户端,就可以去操作共享资源,例如,修改MySQL的某一行数据。操作完成后,还要及时释放锁,给后来者让出操作共享资源的机会。如何释放锁呢?直接使用DEL命令删除这个key即可。这个逻辑非常简单,整体的流程写成伪代码就是下面这样。
// 加锁 SETNX lock_key 1 // 业务逻辑 DO THINGS // 释放锁 DEL lock_key
但是,以上实现存在一个很大的问题,当客户端1拿到锁后,如果发生下面的场景,就会造成死锁。
程序处理业务逻辑异常,没及时释放锁进程挂了,没机会释放锁
以上情况会导致已经获得锁的客户端一直占用锁,其他客户端永远无法获取到锁。
为了解决以上死锁问题,最容易想到的方案是在申请锁时,在Redis中实现时,给锁设置一个过期时间,假设操作共享资源的时间不会超过10s,那么加锁时,给这个key设置10s过期即可。
但以上操作还是有问题,加锁、设置过期时间是2条命令,有可能只执行了第一条,第二条却执行失败
,例如:
1.SETNX执行成功,执行EXPIRE时由于网络问题,执行失败
2.SETNX执行成功,Redis异常宕机,EXPIRE没有机会执行
3.SETNX执行成功,客户端异常崩溃,EXPIRE没有机会执行
总之这两条命令如果不能保证是原子操作,就有潜在的风险导致过期时间设置失败,依旧有可能发生死锁问题
Think 분산 잠금을 구현하려면 외부 시스템을 사용해야 하며 모든 프로세스가 이 시스템으로 이동하여 잠금을 적용합니다. 이 외부 시스템은 상호 배타적이어야 합니다. 즉, 두 요청이 동시에 도착하면 시스템은 한 프로세스만 성공적으로 잠그고 다른 프로세스는 실패합니다. 이러한 외부 시스템은 데이터베이스, Redis 또는 Zookeeper가 될 수 있지만, 성능을 추구하기 위해 일반적으로 Redis 또는 Zookeeper를 사용합니다. Redis는 공유 스토리지 시스템으로 사용할 수 있으며, 여러 클라이언트가 액세스를 공유할 수 있으므로 분산 잠금을 저장하는 데 사용할 수 있습니다. 또한 Redis는 읽기 및 쓰기 성능이 뛰어나며 동시성이 높은 잠금 작업 시나리오를 처리할 수 있습니다. 이 기사의 초점은 Redis를 사용하여 분산 잠금을 구현하는 방법을 소개하고 구현 프로세스 중에 발생할 수 있는 문제를 논의하는 것입니다. 분산 잠금 구현 방법
분산 잠금 구현의 공유 스토리지 시스템인 Redis는 키-값 쌍을 사용하여 잠금 변수를 저장하고 다른 클라이언트에서 보낸 잠금 및 해제 작업을 수신하고 처리할 수 있습니다. 그렇다면 키-값 쌍의 키와 값은 어떻게 결정되나요? 잠금 변수에 변수 이름을 지정하고 이 변수 이름을 키-값 쌍의 키로 사용해야 하며, 잠금 변수의 값은 키-값 쌍의 값이 됩니다. 이런 식으로 Redis는 lock 변수 및 클라이언트는 Redis 명령 작업을 통해 잠금 작업을 구현할 수 있습니다.
🎜분산 잠금을 구현하려면 Redis에 상호 배제 기능이 있어야 합니다. SET IF NOT EXIST를 의미하는 SETNX 명령을 사용할 수 있습니다. 즉, 키가 없으면 해당 값이 설정되고 그렇지 않으면 아무 작업도 수행되지 않습니다. 분산 잠금은 두 클라이언트 프로세스가 상호 배타적인 명령을 실행하도록 하여 구현됩니다. 🎜🎜다음은 Redis가 키/값 쌍을 사용하여 잠금 변수를 저장하는 방법과 동시에 잠금을 요청하는 두 클라이언트의 작업 프로세스를 보여줍니다. 🎜🎜🎜🎜Add 잠금 작업이 완료된 후 성공적으로 잠긴 클라이언트는 공유 리소스를 작업할 수 있습니다. 예를 들어 MySQL의 특정 데이터 행을 수정할 수 있습니다. 작업이 완료된 후, 후발자에게 공유 리소스를 운영할 수 있는 기회를 제공하기 위해 잠금을 제때 해제해야 합니다. 잠금을 해제하는 방법은 무엇입니까? 이 키를 삭제하려면 DEL 명령을 사용하세요. 로직은 매우 간단합니다. 의사 코드로 작성된 전체 프로세스는 다음과 같습니다. 🎜//释放锁 比较unique_value是否相等,避免误释放 if redis.get("key") == unique_value then return redis.del("key")
잠금 및 만료 시간 설정은 두 가지 명령입니다. 첫 번째 명령만 실행되고 두 번째 명령은 실패할 수 있습니다
. 예: 🎜🎜 🎜1. SETNX가 성공적으로 실행되었으나 네트워크 문제로 인해 EXPIRE가 실패함客户端1操作共享资源耗时太久,超过了锁的过期时间,锁失效(锁被自动释放)
客户端2加锁成功,开始操作共享资源
客户端1操作共享资源完成,在finally块中手动释放锁,但此时它释放的是客户端2的锁。
这里存在两个严重的问题:
锁过期
释放了别人的锁
第1个问题是评估操作共享资源的时间不准确导致的,如果只是一味增大过期时间,只能缓解问题降低出现问题的概率,依旧无法彻底解决问题。原因在于客户端在拿到锁之后,在操作共享资源时,遇到的场景是很复杂的,既然是预估的时间,也只能是大致的计算,不可能覆盖所有导致耗时变长的场景
。
第二个问题在于解锁操作是不够严谨的,因为它是一种不加区分地释放锁的操作,没有对锁的所有权进行检查。如何解决呢?
解决办法是,客户端在加锁时,设置一个只有自己知道的唯一标识进去,例如可以是自己的线程ID
,如果是redis实现,就是SET key unique_value EX 10 NX。之后在释放锁时,要先判断这把锁是否归自己持有,只有是自己的才能释放它。
//释放锁 比较unique_value是否相等,避免误释放 if redis.get("key") == unique_value then return redis.del("key")
这里释放锁使用的是GET + DEL两条命令,这时又会遇到原子性
问题了。
客户端1执行GET,判断锁是自己的
客户端2执行了SET命令,强制获取到锁(虽然发生概念很低,但要严谨考虑锁的安全性)
客户端1执行DEL,却释放了客户端2的锁
由此可见,以上GET + DEL两个命令还是必须原子的执行才行。怎样原子执行两条命令呢?答案是Lua脚本,可以把以上逻辑写成Lua脚本,让Redis执行。因为Redis处理每个请求是单线程执行的,在执行一个Lua脚本时其它请求必须等待,直到这个Lua脚本处理完成
,这样一来GET+DEL之间就不会有其他命令执行了。
以下是使用Lua脚本(unlock.script)实现的释放锁操作的伪代码,其中,KEYS[1]表示lock_key,ARGV[1]是当前客户端的唯一标识,这两个值都是我们在执行 Lua脚本时作为参数传入的。
//Lua脚本语言,释放锁 比较unique_value是否相等,避免误释放 if redis.call("get",KEYS[1]) == ARGV[1] then return redis.call("del",KEYS[1]) else return 0 end
最后我们执行以下命令,即可
redis-cli --eval unlock.script lock_key , unique_value
这样一路优先下来,整个加锁、解锁流程就更严谨了,先小结一下,基于Redis实现的分布式锁,一个严谨的流程如下:
加锁时要设置过期时间SET lock_key unique_value EX expire_time NX
操作共享资源
释放锁:Lua脚本,先GET判断锁是否归属自己,再DEL释放锁
有了这个严谨的锁模型,我们还需要重新思考之前的那个问题,锁的过期时间不好评估怎么办。
前面提到过,过期时间如果评估得不好,这个锁就会有提前过期的风险,一种妥协的解决方案是,尽量冗余过期时间,降低锁提前过期的概率,但这个方案并不能完美解决问题。是否可以设置这样的方案,加锁时,先设置一个预估的过期时间,然后开启一个守护线程,定时去检测这个锁的失效时间,如果锁快要过期了,操作共享资源还未完成,那么就自动对锁进行续期,重新设置过期时间
。
Redisson是一个已封装好这些工作的库,可以说是一种非常优秀的解决方案。Redisson是一个Java语言实现的Redis SDK客户端,在使用分布式锁时,它就采用了自动续期的方案来避免锁过期,这个守护线程我们一般叫它看门狗线程。这个SDK提供的API非常友好,它可以像操作本地锁一样操作分布式锁。客户端一旦加锁成功,就会启动一个watch dog看门狗线程,它是一个后台线程,会每隔一段时间(这段时间的长度与设置的锁的过期时间有关)检查一下,如果检查时客户端还持有锁key(也就是说还在操作共享资源),那么就会延长锁key的生存时间。
那如果客户端在加锁成功后就宕机了呢?宕机了那么看门狗任务就不存在了,也就无法为锁续期了,锁到期自动失效。
上面讨论的情况,都是锁在单个Redis 实例中可能产生的问题,并没有涉及到Redis的部署架构细节。
Redis发展到现在,几种常见的部署架构有:
단일 머신 모드,
마스터-슬레이브 모드,
클러스터 모드,
일반적으로 마스터-슬레이브 모드를 사용합니다. e 클러스터+ 센티넬 모드 배포에서 센티널의 역할은 Redis 노드의 실행 상태를 모니터링하는 것입니다. 일반적인 마스터-슬레이브 모드에서는 마스터가 충돌할 때 슬레이브를 마스터로 만들기 위해 수동으로 전환해야 합니다. 마스터-슬레이브 + 센트리 조합을 사용하면 마스터가 비정상적으로 충돌할 때 센티널이 자동 장애 조치를 구현할 수 있다는 것입니다. 슬레이브를 새로운 마스터로 승격시키고 가용성을 보장하기 위해 서비스를 계속 제공합니다. 그렇다면 마스터-슬레이브 전환이 발생해도 분산 잠금은 여전히 안전할까요?
Imagine 이러한 시나리오: 一般会采用主从集群+哨兵的模式部署,哨兵的作用就是监测redis节点的运行状态。普通的主从模式,当master崩溃时,需要手动切换让slave成为master,使用主从+哨兵结合的好处在于,当master异常宕机时,哨兵可以实现故障自动切换,把slave提升为新的master,继续提供服务,以此保证可用性
。那么当主从发生切换时,分布式锁依旧安全吗?
想像这样的场景:
客户端1在master上执行SET命令,加锁成功
此时,master异常宕机,SET命令还未同步到slave上(主从复制是异步的)
哨兵将slave提升为新的master,但这个锁在新的master上丢失了,导致客户端2来加锁成功了,两个客户端共同操作共享资源
可见,当引入Redis副本后,分布式锁还是可能受到影响。即使Redis通过sentinel保证高可用,如果这个master节点由于某些原因发生了主从切换,那么就会出现锁丢失的情况。
集群模式+Redlock实现高可靠的分布式锁
为了避免Redis实例故障而导致的锁无法工作的问题,Redis的开发者 Antirez提出了分布式锁算法Redlock。Redlock算法的基本思路,是让客户端和多个独立的Redis实例依次请求加锁,如果客户端能够和半数以上的实例成功地完成加锁操作,那么我们就认为,客户端成功地获得分布式锁了,否则加锁失败
。这样一来,即使有单个Redis实例发生故障,因为锁变量在其它实例上也有保存,所以,客户端仍然可以正常地进行锁操作,锁变量并不会丢失。
来具体看下Redlock算法的执行步骤。Redlock算法的实现要求Redis采用集群部署模式,无哨兵节点,需要有N个独立的Redis实例(官方推荐至少5个实例)。接下来,我们可以分成3步来完成加锁操作。
第一步是,客户端获取当前时间。
第二步是,客户端按顺序依次向N个Redis实例执行加锁操作。
这里的加锁操作和在单实例上执行的加锁操作一样,使用SET命令,带上NX、EX/PX选项,以及带上客户端的唯一标识。当然,如果某个Redis实例发生故障了,为了保证在这种情况下,Redlock算法能够继续运行,我们需要给加锁操作设置一个超时时间。如果客户端在和一个Redis实例请求加锁时,一直到超时都没有成功,那么此时,客户端会和下一个Redis实例继续请求加锁。一般需要将加锁操作的超时时间设置为锁的有效时间的一小部分,通常约为几十毫秒。
第三步是,一旦客户端完成了和所有Redis实例的加锁操作,客户端就要计算整个加锁过程的总耗时。
客户端只有在满足两个条件时,才能认为是加锁成功,条件一是客户端从超过半数(大于等于 N/2+1)的Redis实例上成功获取到了锁;条件二是客户端获取锁的总耗时没有超过锁的有效时间。
为何只有在大多数实例加锁成功时才能算操作成功?事实上,多个Redis实例一起使用组成了一个分布式系统。在分布式系统中总会出现异常节点,所以在谈论分布式系统时,需要考虑异常节点达到多少个,也依旧不影响整个系统的正确运行。这是一个分布式系统的容错问题,这个问题的结论是:如果只存在故障节点,只要大多数节点正常,那么整个系统依旧可以提供正确服务。
在满足了这两个条件后,我们需要重新计算这把锁的有效时间,计算的结果是锁的最初有效时间减去客户端为获取锁的总耗时。如果锁的有效时间已经来不及完成共享数据的操作了,我们可以释放锁,以免出现还没完成共享资源操作,锁就过期了的情况
클라이언트는 두 가지 조건이 충족되는 경우에만 잠금이 성공한 것으로 간주할 수 있습니다. 첫 번째 조건은 클라이언트가 절반 이상(N/2+1 이상)에서 잠금을 성공적으로 획득했다는 것입니다. Redis 인스턴스 두 번째 조건은 클라이언트가 잠금을 획득하는 데 걸리는 총 시간이 잠금의 유효 시간을 초과하지 않음을 의미합니다.
🎜🎜대부분의 인스턴스가 성공적으로 잠긴 경우에만 작업이 성공한 것으로 간주될 수 있는 이유는 무엇입니까? 실제로 여러 Redis 인스턴스가 함께 사용되어 분산 시스템을 구성합니다. 분산 시스템에는 항상 비정상 노드가 있기 때문에 분산 시스템에 대해 이야기할 때 전체 시스템의 올바른 작동에 영향을 주지 않고 비정상 노드가 몇 개 있는지 고려해야 합니다. 이는 분산 시스템의 내결함성 문제입니다. 이 문제의 결론은 다음과 같습니다. 결함이 있는 노드만 있으면 대부분의 노드가 정상인 한 전체 시스템은 여전히 올바른 서비스를 제공할 수 있습니다. 🎜🎜이 두 가지 조건을 충족한 후 잠금 유효 시간을 다시 계산해야 합니다. 계산 결과는 잠금의 초기 유효 시간에서 클라이언트가 잠금을 획득하는 데 소요한 총 시간을 뺀 값입니다. 잠금의 유효 시간이 너무 늦어서 공유 데이터 작업을 완료할 수 없는 경우 공유 리소스 작업이 완료되기 전에 잠금이 만료되는 것을 방지하기 위해 잠금을 해제할 수 있습니다
. 🎜当然,如果客户端在和所有实例执行完加锁操作后,没能同时满足这两个条件,那么,客户端就要向所有Redis节点发起释放锁的操作
。为什么释放锁,要操作所有的节点呢,不能只操作那些加锁成功的节点吗?因为在某一个Redis节点加锁时,可能因为网络原因导致加锁失败,例如一个客户端在一个Redis实例上加锁成功,但在读取响应结果时由于网络问题导致读取失败,那这把锁其实已经在Redis上加锁成功了。所以释放锁时,不管之前有没有加锁成功,需要释放所有节点上的锁以保证清理节点上的残留的锁
。
在Redlock算法中,释放锁的操作和在单实例上释放锁的操作一样,只要执行释放锁的 Lua脚本就可以了。如果N个Redis实例中超过一半的实例正常工作,就能确保分布式锁正常运作。为了提高分布式锁的可靠性,您可以在实际业务应用中使用Redlock算法。
<!-- springboot整合redis --> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-data-redis</artifactId> </dependency>
package com.example.redisdemo.config; import com.fasterxml.jackson.annotation.JsonAutoDetect; import com.fasterxml.jackson.annotation.PropertyAccessor; import com.fasterxml.jackson.databind.DeserializationFeature; import com.fasterxml.jackson.databind.ObjectMapper; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.data.redis.connection.lettuce.LettuceConnectionFactory; import org.springframework.data.redis.core.RedisTemplate; import org.springframework.data.redis.serializer.Jackson2JsonRedisSerializer; import org.springframework.data.redis.serializer.RedisSerializer; import org.springframework.data.redis.serializer.StringRedisSerializer; /** * @description: Redis配置类 * @author Keson * @date 21:20 2022/11/14 * @Param * @return * @version 1.0 */ @Configuration public class RedisConfig { @Bean public RedisTemplate<String, Object> redisTemplate(LettuceConnectionFactory lettuceConnectionFactory) { // 设置序列化 Jackson2JsonRedisSerializer<Object> jackson2JsonRedisSerializer = new Jackson2JsonRedisSerializer<Object>(Object.class); ObjectMapper om = new ObjectMapper(); om.setVisibility(PropertyAccessor.ALL, JsonAutoDetect.Visibility.ANY); om.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false); jackson2JsonRedisSerializer.setObjectMapper(om); // 配置redisTemplate RedisTemplate<String, Object> redisTemplate = new RedisTemplate<String, Object>(); redisTemplate.setConnectionFactory(lettuceConnectionFactory); RedisSerializer<?> stringSerializer = new StringRedisSerializer(); redisTemplate.setKeySerializer(stringSerializer);// key序列化 redisTemplate.setValueSerializer(jackson2JsonRedisSerializer);// value序列化 redisTemplate.setHashKeySerializer(stringSerializer);// Hash key序列化 redisTemplate.setHashValueSerializer(jackson2JsonRedisSerializer);// Hash value序列化 redisTemplate.afterPropertiesSet(); return redisTemplate; } }
package com.example.redisdemo.service; import com.example.redisdemo.entity.CustomerBalance; import java.util.concurrent.Callable; /** * @author Keson * @version 1.0 * @description: TODO * @date 2022/11/14 15:12 */ public interface RedisService { <T> T callWithLock(CustomerBalance customerBalance, Callable<T> callable) throws Exception; }
package com.example.redisdemo.service.impl; import com.example.redisdemo.entity.CustomerBalance; import com.example.redisdemo.service.RedisService; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.data.redis.connection.RedisStringCommands; import org.springframework.data.redis.connection.ReturnType; import org.springframework.data.redis.core.RedisCallback; import org.springframework.data.redis.core.RedisTemplate; import org.springframework.data.redis.core.types.Expiration; import org.springframework.stereotype.Service; import java.nio.charset.StandardCharsets; import java.util.UUID; import java.util.concurrent.Callable; import java.util.concurrent.TimeUnit; /** * @author Keson * @version 1.0 * @description: TODO Redis实现分布式锁 * @date 2022/11/14 15:13 */ @Service @Slf4j public class RedisServiceImpl implements RedisService { //设置默认过期时间 private final static int DEFAULT_LOCK_EXPIRY_TIME = 20; //自定义lock key前缀 private final static String LOCK_PREFIX = "LOCK:CUSTOMER_BALANCE"; @Autowired private RedisTemplate redisTemplate; @Override public <T> T callWithLock(CustomerBalance customerBalance, Callable<T> callable) throws Exception{ //自定义lock key String lockKey = getLockKey(customerBalance.getCustomerNumber(), customerBalance.getSubAccountNumber(), customerBalance.getCurrencyCode()); //将UUID当做value,确保唯一性 String lockReference = UUID.randomUUID().toString(); try { if (!lock(lockKey, lockReference, DEFAULT_LOCK_EXPIRY_TIME, TimeUnit.SECONDS)) { throw new Exception("lock加锁失败"); } return callable.call(); } finally { unlock(lockKey, lockReference); } } //定义lock key String getLockKey(String customerNumber, String subAccountNumber, String currencyCode) { return String.format("%s:%s:%s:%s", LOCK_PREFIX, customerNumber, subAccountNumber, currencyCode); } //redis加锁 private boolean lock(String key, String value, long timeout, TimeUnit timeUnit) { Boolean locked; try { //SET_IF_ABSENT --> NX: Only set the key if it does not already exist. //SET_IF_PRESENT --> XX: Only set the key if it already exist. locked = (Boolean) redisTemplate.execute((RedisCallback<Boolean>) connection -> connection.set(key.getBytes(StandardCharsets.UTF_8), value.getBytes(StandardCharsets.UTF_8), Expiration.from(timeout, timeUnit), RedisStringCommands.SetOption.SET_IF_ABSENT)); } catch (Exception e) { log.error("Lock failed for redis key: {}, value: {}", key, value); locked = false; } return locked != null && locked; } //redis解锁 private boolean unlock(String key, String value) { try { //使用lua脚本保证删除的原子性,确保解锁 String script = "if redis.call('get', KEYS[1]) == ARGV[1] " + "then return redis.call('del', KEYS[1]) " + "else return 0 end"; Boolean unlockState = (Boolean) redisTemplate.execute((RedisCallback<Boolean>) connection -> connection.eval(script.getBytes(), ReturnType.BOOLEAN, 1, key.getBytes(StandardCharsets.UTF_8), value.getBytes(StandardCharsets.UTF_8))); return unlockState == null || !unlockState; } catch (Exception e) { log.error("unLock failed for redis key: {}, value: {}", key, value); return false; } } }
@Override public int updateById(CustomerBalance customerBalance) throws Exception { return redisService.callWithLock(customerBalance, ()-> customerBalanceMapper.updateById(customerBalance)); }
위 내용은 Redis를 사용하여 SpringBoot에서 분산 잠금을 구현하는 방법의 상세 내용입니다. 자세한 내용은 PHP 중국어 웹사이트의 기타 관련 기사를 참조하세요!