聊一聊分散式系統下基於Redis的分散式鎖
加鎖了,還有並發問題? Redis分散式鎖定你真的了解?以下這篇文章就來跟大家聊聊分散式系統下基於Redis的分散式鎖,希望對大家有幫助!
新接手的項目,偶爾會出現帳不平的問題。之前的技術老大臨走時給的解釋是:排查了,沒找到原因,之後太忙就沒再解決,可能是框架的原因……
#既然項目交付到手中,這樣的問題是必須要解決的。整理了所有帳務處理邏輯,最終找到了原因:資料庫並發作業熱點帳戶導致。就這這個問題,來聊聊分散式系統下基於Redis的分散式鎖定。順便也分解一下問題形成原因及解決方案。 【相關推薦:Redis影片教學】
原因分析
系統並發量不高,有熱點帳戶,但也不至於那麼嚴重。問題的根源在於系統架構設計,人為的製造了並發。場景是這樣的:商家批量匯入一批數據,系統會進行前置處理,並對帳戶餘額進行增減。
此時,另外一個定時任務,也會對帳戶進行掃描更新。而且同一帳戶的操作分散到各個系統當中,熱點帳戶也就出現了。
針對此問題的解決方案,從架構層面可以考慮將帳務系統進行抽離,集中在一個系統中進行處理,所有的資料庫事務及執行順序由帳務系統來統籌處理。從技術方面來講,則可以透過鎖定機制來對熱點帳戶進行加鎖。
本篇文章就針對熱點帳戶基於分散式鎖定的實作方式進行詳細的講解。
鎖定的分析
在Java的多執行緒環境下,通常有幾類鎖定可以使用:
- JVM記憶體模型級別的鎖,常用的有:synchronized、Lock等;
- 資料庫鎖,例如樂觀鎖,悲觀鎖等;
- 分散式鎖定;
悲觀鎖定與樂觀鎖定
在像上述案例中,熱點帳戶就屬於分散式系統中的共享資源,我們通常會採用資料庫鎖定或分散式鎖定來進行解決。
資料庫鎖定,又分為樂觀鎖定和悲觀鎖定。
悲觀鎖定是基於資料庫(Mysql的InnoDB)提供的排他鎖定來實現的。在進行事務操作時,透過select ... for update語句,MySQL會對查詢結果集中每行資料都新增排他鎖,其他執行緒對該記錄的更新與刪除操作都會阻塞。從而達到共享資源的順序執行(修改);
樂觀鎖是相對悲觀鎖而言的,樂觀鎖定假設資料一般情況不會造成衝突,所以在資料提交更新的時候,才會正式對資料的衝突與否進行檢測。如果衝突則傳回給使用者異常訊息,讓使用者決定如何做。樂觀鎖適用於讀取多寫少的場景,這樣可以提高程式的吞吐量。在樂觀鎖實作時通常會基於記錄狀態或新增version版本來實作。
悲觀鎖定失效場景
專案中使用了悲觀鎖,但悲觀鎖卻失效了。這也是使用悲觀鎖時,常見的誤區,以下來分析一下。 正常使用悲觀鎖的流程:- 透過select ... for update鎖定記錄;
- 計算新餘額,修改金額並儲存;
- 執行完成釋放鎖定;
- 查詢帳戶餘額,計算新餘額;
- 透過select .. . for update鎖定記錄;
- 修改金額並儲存;
- 執行完成釋放鎖定;
Redis分散式鎖定戰演習
這裡以Spring Boot、Redis、Lua腳本為例來示範分散式鎖定的實作。為了簡化處理,範例中Redis既承擔了分散式鎖的功能,也承擔了資料庫的功能。場景建構
叢集環境下,對同一個帳戶的金額進行操作,基本步驟:
- 從資料庫讀取使用者金額;
- 程式修改金額;
- 再將最新金額儲存到資料庫;
#下面從最初不加鎖,不同步處理,逐步推演出最終的分散式鎖。
基礎整合及類別建置
準備一個不加鎖處理的基礎業務環境。
首先在Spring Boot專案中引入相關依賴:
<dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-data-redis</artifactId> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-web</artifactId> </dependency>
帳戶對應實體類別UserAccount:
public class UserAccount { //用户ID private String userId; //账户内金额 private int amount; //添加账户金额 public void addAmount(int amount) { this.amount = this.amount + amount; } // 省略构造方法和getter/setter }
建立一個執行緒實作類別AccountOperationThread:
public class AccountOperationThread implements Runnable { private final static Logger logger = LoggerFactory.getLogger(AccountOperationThread.class); private static final Long RELEASE_SUCCESS = 1L; private String userId; private RedisTemplate<Object, Object> redisTemplate; public AccountOperationThread(String userId, RedisTemplate<Object, Object> redisTemplate) { this.userId = userId; this.redisTemplate = redisTemplate; } @Override public void run() { noLock(); } /** * 不加锁 */ private void noLock() { try { Random random = new Random(); // 模拟线程进行业务处理 TimeUnit.MILLISECONDS.sleep(random.nextInt(100) + 1); } catch (InterruptedException e) { e.printStackTrace(); } //模拟数据库中获取用户账号 UserAccount userAccount = (UserAccount) redisTemplate.opsForValue().get(userId); // 金额+1 userAccount.addAmount(1); logger.info(Thread.currentThread().getName() + " : user id : " + userId + " amount : " + userAccount.getAmount()); //模拟存回数据库 redisTemplate.opsForValue().set(userId, userAccount); } }
其中RedisTemplate的實例化交給了Spring Boot:
@Configuration public class RedisConfig { @Bean public RedisTemplate<Object, Object> redisTemplate(RedisConnectionFactory redisConnectionFactory) { RedisTemplate<Object, Object> redisTemplate = new RedisTemplate<>(); redisTemplate.setConnectionFactory(redisConnectionFactory); Jackson2JsonRedisSerializer<Object> jackson2JsonRedisSerializer = new Jackson2JsonRedisSerializer<>(Object.class); ObjectMapper objectMapper = new ObjectMapper(); objectMapper.setVisibility(PropertyAccessor.ALL, JsonAutoDetect.Visibility.ANY); objectMapper.enableDefaultTyping(ObjectMapper.DefaultTyping.NON_FINAL); jackson2JsonRedisSerializer.setObjectMapper(objectMapper); // 设置value的序列化规则和 key的序列化规则 redisTemplate.setValueSerializer(jackson2JsonRedisSerializer); redisTemplate.setKeySerializer(new StringRedisSerializer()); redisTemplate.afterPropertiesSet(); return redisTemplate; } }
最後,再準備一個TestController來進行觸發多線程的運行:
@RestController public class TestController { private final static Logger logger = LoggerFactory.getLogger(TestController.class); private static ExecutorService executorService = Executors.newFixedThreadPool(10); @Autowired private RedisTemplate<Object, Object> redisTemplate; @GetMapping("/test") public String test() throws InterruptedException { // 初始化用户user_001到Redis,账户金额为0 redisTemplate.opsForValue().set("user_001", new UserAccount("user_001", 0)); // 开启10个线程进行同步测试,每个线程为账户增加1元 for (int i = 0; i < 10; i++) { logger.info("创建线程i=" + i); executorService.execute(new AccountOperationThread("user_001", redisTemplate)); } // 主线程休眠1秒等待线程跑完 TimeUnit.MILLISECONDS.sleep(1000); // 查询Redis中的user_001账户 UserAccount userAccount = (UserAccount) redisTemplate.opsForValue().get("user_001"); logger.info("user id : " + userAccount.getUserId() + " amount : " + userAccount.getAmount()); return "success"; } }
執行上述程序,正常來說10個線程,每個線程加1,結果應該是10。但多執行幾次,會發現,結果變化很大,基本上都要比10小。
[pool-1-thread-5] c.s.redis.thread.AccountOperationThread : pool-1-thread-5 : user id : user_001 amount : 1 [pool-1-thread-4] c.s.redis.thread.AccountOperationThread : pool-1-thread-4 : user id : user_001 amount : 1 [pool-1-thread-3] c.s.redis.thread.AccountOperationThread : pool-1-thread-3 : user id : user_001 amount : 1 [pool-1-thread-1] c.s.redis.thread.AccountOperationThread : pool-1-thread-1 : user id : user_001 amount : 1 [pool-1-thread-1] c.s.redis.thread.AccountOperationThread : pool-1-thread-1 : user id : user_001 amount : 2 [pool-1-thread-2] c.s.redis.thread.AccountOperationThread : pool-1-thread-2 : user id : user_001 amount : 2 [pool-1-thread-5] c.s.redis.thread.AccountOperationThread : pool-1-thread-5 : user id : user_001 amount : 2 [pool-1-thread-4] c.s.redis.thread.AccountOperationThread : pool-1-thread-4 : user id : user_001 amount : 3 [pool-1-thread-1] c.s.redis.thread.AccountOperationThread : pool-1-thread-1 : user id : user_001 amount : 4 [pool-1-thread-3] c.s.redis.thread.AccountOperationThread : pool-1-thread-3 : user id : user_001 amount : 5 [nio-8080-exec-1] c.s.redis.controller.TestController : user id : user_001 amount : 5
以上述日誌為例,前四個執行緒都將值改為1,也就是後面三個執行緒都將前面的修改進行了覆蓋,導致最終結果不是10,只有5。這顯然是有問題的。
Redis同步鎖定實作
針對上面的情況,在同一個JVM當中,我們可以透過執行緒加鎖來完成。但在分散式環境下,JVM層級的鎖是沒辦法實現的,這裡可以採用Redis同步鎖實作。
基本想法:第一個執行緒進入時,在Redis中進記錄,當後續執行緒過來請求時,判斷Redis是否存在該記錄,如果存在則說明處於鎖定狀態,進行等待或傳回。如果不存在,則進行後續業務處理。
/** * 1.抢占资源时判断是否被锁。 * 2.如未锁则抢占成功且加锁,否则等待锁释放。 * 3.业务完成后释放锁,让给其它线程。 * <p> * 该方案并未解决同步问题,原因:线程获得锁和加锁的过程,并非原子性操作,可能会导致线程A获得锁,还未加锁时,线程B也获得了锁。 */ private void redisLock() { Random random = new Random(); try { TimeUnit.MILLISECONDS.sleep(random.nextInt(1000) + 1); } catch (InterruptedException e) { e.printStackTrace(); } while (true) { Object lock = redisTemplate.opsForValue().get(userId + ":syn"); if (lock == null) { // 获得锁 -> 加锁 -> 跳出循环 logger.info(Thread.currentThread().getName() + ":获得锁"); redisTemplate.opsForValue().set(userId + ":syn", "lock"); break; } try { // 等待500毫秒重试获得锁 TimeUnit.MILLISECONDS.sleep(500); } catch (InterruptedException e) { e.printStackTrace(); } } try { //模拟数据库中获取用户账号 UserAccount userAccount = (UserAccount) redisTemplate.opsForValue().get(userId); if (userAccount != null) { //设置金额 userAccount.addAmount(1); logger.info(Thread.currentThread().getName() + " : user id : " + userId + " amount : " + userAccount.getAmount()); //模拟存回数据库 redisTemplate.opsForValue().set(userId, userAccount); } } finally { //释放锁 redisTemplate.delete(userId + ":syn"); logger.info(Thread.currentThread().getName() + ":释放锁"); } }
在while程式碼區塊中,先判斷對應使用者ID是否在Redis中存在,如果不存在,則進行set加鎖,如果存在,則跳出迴圈繼續等待。
上述程式碼,看起來實現了加鎖的功能,但當執行程式時,會發現與未加鎖一樣,依舊存在並發問題。原因是:取得鎖和加鎖的操作並不是原子的。例如兩個執行緒發現lock都是null,都進行了加鎖,此時並發問題依舊存在。
Redis原子性同步鎖定
針對上述問題,可將取得鎖定和加鎖的過程原子化處理。基於spring-boot-data-redis提供的原子化API可以實現:
// 该方法使用了redis的指令:SETNX key value // 1.key不存在,设置成功返回value,setIfAbsent返回true; // 2.key存在,则设置失败返回null,setIfAbsent返回false; // 3.原子性操作; Boolean setIfAbsent(K var1, V var2);
上述方法的原子化操作是對Redis的setnx指令的封裝,在Redis中setnx的使用如下實例:
redis> SETNX mykey "Hello" (integer) 1 redis> SETNX mykey "World" (integer) 0 redis> GET mykey "Hello"
第一次,設定mykey時,不存在,則回傳1,表示設定成功;第二次設定mykey時,已經存在,則回傳0,表示設定失敗。再次查詢mykey對應的值,會發現依舊是第一次設定的值。也就是說redis的setnx保證了唯一的key只能被一個服務設定成功。
了解上述API及底層原理,來看看執行緒中的實作方法程式碼如下:
/** * 1.原子操作加锁 * 2.竞争线程循环重试获得锁 * 3.业务完成释放锁 */ private void atomicityRedisLock() { //Spring data redis 支持的原子性操作 while (!redisTemplate.opsForValue().setIfAbsent(userId + ":syn", "lock")) { try { // 等待100毫秒重试获得锁 TimeUnit.MILLISECONDS.sleep(100); } catch (InterruptedException e) { e.printStackTrace(); } } logger.info(Thread.currentThread().getName() + ":获得锁"); try { //模拟数据库中获取用户账号 UserAccount userAccount = (UserAccount) redisTemplate.opsForValue().get(userId); if (userAccount != null) { //设置金额 userAccount.addAmount(1); logger.info(Thread.currentThread().getName() + " : user id : " + userId + " amount : " + userAccount.getAmount()); //模拟存回数据库 redisTemplate.opsForValue().set(userId, userAccount); } } finally { //释放锁 redisTemplate.delete(userId + ":syn"); logger.info(Thread.currentThread().getName() + ":释放锁"); } }
再次執行程式碼,會發現結果正確了,也就是說可以成功的對分佈式線程進行了加鎖。
Redis分散式鎖定的死鎖
雖然上述程式碼執行結果沒問題,但如果應用異常宕機,沒來得及執行finally中釋放鎖的方法,那麼其他線程就永遠無法獲得這個鎖。
此時可採用setIfAbsent的重載方法:
Boolean setIfAbsent(K var1, V var2, long var3, TimeUnit var5);
基於此方法,可設定鎖定的過期時間。這樣即便取得鎖的執行緒宕機,在Redis中資料過期之後,其他執行緒可正常取得該鎖。
範例程式碼如下:
private void atomicityAndExRedisLock() { try { //Spring data redis 支持的原子性操作,并设置5秒过期时间 while (!redisTemplate.opsForValue().setIfAbsent(userId + ":syn", System.currentTimeMillis() + 5000, 5000, TimeUnit.MILLISECONDS)) { // 等待100毫秒重试获得锁 logger.info(Thread.currentThread().getName() + ":尝试循环获取锁"); TimeUnit.MILLISECONDS.sleep(1000); } logger.info(Thread.currentThread().getName() + ":获得锁--------"); // 应用在这里宕机,进程退出,无法执行 finally; Thread.currentThread().interrupt(); // 业务逻辑... } catch (InterruptedException e) { e.printStackTrace(); } finally { //释放锁 if (!Thread.currentThread().isInterrupted()) { redisTemplate.delete(userId + ":syn"); logger.info(Thread.currentThread().getName() + ":释放锁"); } } }
業務超時及守護執行緒
上面新增了Redis所的超時時間,看似解決了問題,但又引入了新的問題。
例如,正常情況下執行緒A在5秒內可正常處理完業務,但偶發會出現超過5秒的狀況。如果將超時時間設定為5秒,則線程A獲得了鎖,但業務邏輯處理需要6秒。此時,線程A還在正常業務邏輯,線程B已經獲得了鎖。當線程A處理完時,有可能將線程B的鎖給釋放掉。
在上述場景中有兩個問題點:
- 第一,執行緒A和執行緒B可能會同時在執行,存在並發問題。
- 第二,線程A可能會把線程B的鎖釋放掉,導致一系列的惡性循環。
#當然,可以透過在Redis中設定value值來判斷鎖定是屬於執行緒A還是執行緒B。但仔細分析會發現,這個問題的本質是因為執行緒A執行業務邏輯耗時超出了鎖定逾時的時間。
那麼就有兩個解決方案了:
- 第一,將超時時間設定的足夠長,確保業務程式碼能夠在鎖定釋放之前執行完成;
- 第二,為鎖添加守護線程,為將要過期釋放但未釋放的鎖增加時間;
第一種方式需要全行大多數情況下業務邏輯的耗時,進行超時時間的設定。
第二種方式,可透過以下守護執行緒的方式來動態增加鎖定逾時時間。
public class DaemonThread implements Runnable { private final static Logger logger = LoggerFactory.getLogger(DaemonThread.class); // 是否需要守护 主线程关闭则结束守护线程 private volatile boolean daemon = true; // 守护锁 private String lockKey; private RedisTemplate<Object, Object> redisTemplate; public DaemonThread(String lockKey, RedisTemplate<Object, Object> redisTemplate) { this.lockKey = lockKey; this.redisTemplate = redisTemplate; } @Override public void run() { try { while (daemon) { long time = redisTemplate.getExpire(lockKey, TimeUnit.MILLISECONDS); // 剩余有效期小于1秒则续命 if (time < 1000) { logger.info("守护进程: " + Thread.currentThread().getName() + " 延长锁时间 5000 毫秒"); redisTemplate.expire(lockKey, 5000, TimeUnit.MILLISECONDS); } TimeUnit.MILLISECONDS.sleep(300); } logger.info(" 守护进程: " + Thread.currentThread().getName() + "关闭 "); } catch (InterruptedException e) { e.printStackTrace(); } } // 主线程主动调用结束 public void stop() { daemon = false; } }
上述线程每隔300毫秒获取一下Redis中锁的超时时间,如果小于1秒,则延长5秒。当主线程调用关闭时,守护线程也随之关闭。
主线程中相关代码实现:
private void deamonRedisLock() { //守护线程 DaemonThread daemonThread = null; //Spring data redis 支持的原子性操作,并设置5秒过期时间 String uuid = UUID.randomUUID().toString(); String value = Thread.currentThread().getId() + ":" + uuid; try { while (!redisTemplate.opsForValue().setIfAbsent(userId + ":syn", value, 5000, TimeUnit.MILLISECONDS)) { // 等待100毫秒重试获得锁 logger.info(Thread.currentThread().getName() + ":尝试循环获取锁"); TimeUnit.MILLISECONDS.sleep(1000); } logger.info(Thread.currentThread().getName() + ":获得锁----"); // 开启守护线程 daemonThread = new DaemonThread(userId + ":syn", redisTemplate); Thread thread = new Thread(daemonThread); thread.start(); // 业务逻辑执行10秒... TimeUnit.MILLISECONDS.sleep(10000); } catch (InterruptedException e) { e.printStackTrace(); } finally { //释放锁 这里也需要原子操作,今后通过 Redis + Lua 讲 String result = (String) redisTemplate.opsForValue().get(userId + ":syn"); if (value.equals(result)) { redisTemplate.delete(userId + ":syn"); logger.info(Thread.currentThread().getName() + ":释放锁-----"); } //关闭守护线程 if (daemonThread != null) { daemonThread.stop(); } } }
其中在获得锁之后,开启守护线程,在finally中将守护线程关闭。
基于Lua脚本的实现
在上述逻辑中,我们是基于spring-boot-data-redis提供的原子化操作来保证锁判断和执行的原子化的。在非Spring Boot项目中,则可以基于Lua脚本来实现。
首先定义加锁和解锁的Lua脚本及对应的DefaultRedisScript
对象,在RedisConfig
配置类中添加如下实例化代码:
@Configuration public class RedisConfig { //lock script private static final String LOCK_SCRIPT = " if redis.call('setnx',KEYS[1],ARGV[1]) == 1 " + " then redis.call('expire',KEYS[1],ARGV[2]) " + " return 1 " + " else return 0 end "; private static final String UNLOCK_SCRIPT = "if redis.call('get', KEYS[1]) == ARGV[1] then return redis.call" + "('del', KEYS[1]) else return 0 end"; // ... 省略部分代码 @Bean public DefaultRedisScript<Boolean> lockRedisScript() { DefaultRedisScript<Boolean> defaultRedisScript = new DefaultRedisScript<>(); defaultRedisScript.setResultType(Boolean.class); defaultRedisScript.setScriptText(LOCK_SCRIPT); return defaultRedisScript; } @Bean public DefaultRedisScript<Long> unlockRedisScript() { DefaultRedisScript<Long> defaultRedisScript = new DefaultRedisScript<>(); defaultRedisScript.setResultType(Long.class); defaultRedisScript.setScriptText(UNLOCK_SCRIPT); return defaultRedisScript; } }
再通过在AccountOperationThread
类中新建构造方法,将上述两个对象传入类中(省略此部分演示)。然后,就可以基于RedisTemplate
来调用了,改造之后的代码实现如下:
private void deamonRedisLockWithLua() { //守护线程 DaemonThread daemonThread = null; //Spring data redis 支持的原子性操作,并设置5秒过期时间 String uuid = UUID.randomUUID().toString(); String value = Thread.currentThread().getId() + ":" + uuid; try { while (!redisTemplate.execute(lockRedisScript, Collections.singletonList(userId + ":syn"), value, 5)) { // 等待1000毫秒重试获得锁 logger.info(Thread.currentThread().getName() + ":尝试循环获取锁"); TimeUnit.MILLISECONDS.sleep(1000); } logger.info(Thread.currentThread().getName() + ":获得锁----"); // 开启守护线程 daemonThread = new DaemonThread(userId + ":syn", redisTemplate); Thread thread = new Thread(daemonThread); thread.start(); // 业务逻辑执行10秒... TimeUnit.MILLISECONDS.sleep(10000); } catch (InterruptedException e) { logger.error("异常", e); } finally { //使用Lua脚本:先判断是否是自己设置的锁,再执行删除 // key存在,当前值=期望值时,删除key;key存在,当前值!=期望值时,返回0; Long result = redisTemplate.execute(unlockRedisScript, Collections.singletonList(userId + ":syn"), value); logger.info("redis解锁:{}", RELEASE_SUCCESS.equals(result)); if (RELEASE_SUCCESS.equals(result)) { if (daemonThread != null) { //关闭守护线程 daemonThread.stop(); logger.info(Thread.currentThread().getName() + ":释放锁---"); } } } }
其中while循环中加锁和finally中的释放锁都是基于Lua脚本来实现了。
Redis锁的其他因素
除了上述实例,在使用Redis分布式锁时,还可以考虑以下情况及方案。
Redis锁的不可重入
当线程在持有锁的情况下再次请求加锁,如果一个锁支持一个线程多次加锁,那么这个锁就是可重入的。如果一个不可重入锁被再次加锁,由于该锁已经被持有,再次加锁会失败。Redis可通过对锁进行重入计数,加锁时加 1,解锁时减 1,当计数归 0时释放锁。
可重入锁虽然高效但会增加代码的复杂性,这里就不举例说明了。
等待锁释放
有的业务场景,发现被锁则直接返回。但有的场景下,客户端需要等待锁释放然后去抢锁。上述示例就属于后者。针对等待锁释放也有两种方案:
- 客户端轮训:当未获得锁时,等待一段时间再重新获取,直到成功。上述示例就是基于这种方式实现的。这种方式的缺点也很明显,比较耗费服务器资源,当并发量大时会影响服务器的效率。
- 使用Redis的订阅发布功能:当获取锁失败时,订阅锁释放消息,获取锁成功后释放时,发送释放消息。
集群中的主备切换和脑裂
在Redis包含主从同步的集群部署方式中,如果主节点挂掉,从节点提升为主节点。如果客户端A在主节点加锁成功,指令还未同步到从节点,此时主节点挂掉,从节点升为主节点,新的主节点中没有锁的数据。这种情况下,客户端B就可能加锁成功,从而出现并发的场景。
当集群发生脑裂时,Redis master节点跟slave 节点和 sentinel 集群处于不同的网络分区。sentinel集群无法感知到master的存在,会将 slave 节点提升为 master 节点,此时就会存在两个不同的 master 节点。从而也会导致并发问题的出现。Redis Cluster集群部署方式同理。
小结
通过生产环境中的一个问题,排查原因,寻找解决方案,到最终对基于Redis分布式的深入研究,这便是学习的过程。
同时,每当面试或被问题如何解决分布式共享资源时,我们会脱口而出”基于Redis实现分布式锁“,但通过本文的学习会发现,Redis分布式锁并不是万能的,而且在使用的过程中还需要注意超时、死锁、误解锁、集群选主/脑裂等问题。
Redis以高性能著称,但在实现分布式锁的过程中还是存在一些问题。因此,基于Redis的分布式锁可以极大的缓解并发问题,但要完全防止并发,还是得从数据库层面入手。
源码地址:https://github.com/secbr/springboot-all/tree/master/springboot-redis-lock
更多编程相关知识,请访问:编程入门!!
以上是聊一聊分散式系統下基於Redis的分散式鎖的詳細內容。更多資訊請關注PHP中文網其他相關文章!

熱AI工具

Undresser.AI Undress
人工智慧驅動的應用程序,用於創建逼真的裸體照片

AI Clothes Remover
用於從照片中去除衣服的線上人工智慧工具。

Undress AI Tool
免費脫衣圖片

Clothoff.io
AI脫衣器

AI Hentai Generator
免費產生 AI 無盡。

熱門文章

熱工具

記事本++7.3.1
好用且免費的程式碼編輯器

SublimeText3漢化版
中文版,非常好用

禪工作室 13.0.1
強大的PHP整合開發環境

Dreamweaver CS6
視覺化網頁開發工具

SublimeText3 Mac版
神級程式碼編輯軟體(SublimeText3)

熱門話題

Redis集群模式通過分片將Redis實例部署到多個服務器,提高可擴展性和可用性。搭建步驟如下:創建奇數個Redis實例,端口不同;創建3個sentinel實例,監控Redis實例並進行故障轉移;配置sentinel配置文件,添加監控Redis實例信息和故障轉移設置;配置Redis實例配置文件,啟用集群模式並指定集群信息文件路徑;創建nodes.conf文件,包含各Redis實例的信息;啟動集群,執行create命令創建集群並指定副本數量;登錄集群執行CLUSTER INFO命令驗證集群狀態;使

Redis 使用哈希表存儲數據,支持字符串、列表、哈希表、集合和有序集合等數據結構。 Redis 通過快照 (RDB) 和追加只寫 (AOF) 機制持久化數據。 Redis 使用主從復制來提高數據可用性。 Redis 使用單線程事件循環處理連接和命令,保證數據原子性和一致性。 Redis 為鍵設置過期時間,並使用 lazy 刪除機制刪除過期鍵。

解決redis-server找不到問題的步驟:檢查安裝,確保已正確安裝Redis;設置環境變量REDIS_HOST和REDIS_PORT;啟動Redis服務器redis-server;檢查服務器是否運行redis-cli ping。

要查看 Redis 中的所有鍵,共有三種方法:使用 KEYS 命令返回所有匹配指定模式的鍵;使用 SCAN 命令迭代鍵並返回一組鍵;使用 INFO 命令獲取鍵的總數。

理解 Redis 源碼的最佳方法是逐步進行:熟悉 Redis 基礎知識。選擇一個特定的模塊或功能作為起點。從模塊或功能的入口點開始,逐行查看代碼。通過函數調用鏈查看代碼。熟悉 Redis 使用的底層數據結構。識別 Redis 使用的算法。

要從 Redis 讀取隊列,需要獲取隊列名稱、使用 LPOP 命令讀取元素,並處理空隊列。具體步驟如下:獲取隊列名稱:以 "queue:" 前綴命名,如 "queue:my-queue"。使用 LPOP 命令:從隊列頭部彈出元素並返回其值,如 LPOP queue:my-queue。處理空隊列:如果隊列為空,LPOP 返回 nil,可先檢查隊列是否存在再讀取元素。

Redis計數器是一種使用Redis鍵值對存儲來實現計數操作的機制,包含以下步驟:創建計數器鍵、增加計數、減少計數、重置計數和獲取計數。 Redis計數器的優勢包括速度快、高並發、持久性和簡單易用。它可用於用戶訪問計數、實時指標跟踪、遊戲分數和排名以及訂單處理計數等場景。

使用 Redis 指令需要以下步驟:打開 Redis 客戶端。輸入指令(動詞 鍵 值)。提供所需參數(因指令而異)。按 Enter 執行指令。 Redis 返迴響應,指示操作結果(通常為 OK 或 -ERR)。
