Dikunci, adakah terdapat sebarang isu konkurensi? RedisAdakah anda benar-benar memahami kunci yang diedarkan? Artikel berikut akan bercakap dengan anda tentang kunci teragih berasaskan Redis dalam sistem yang diedarkan saya harap ia akan membantu anda!
Dalam projek yang baru diambil alih, kadangkala akan ada masalah dengan baki akaun. Penjelasan yang diberikan oleh bos teknikal sebelum ini adalah: Selepas siasatan, puncanya tidak dijumpai Selepas itu, saya terlalu sibuk dan tidak menyelesaikannya > Sekarang projek itu dihantar, masalah seperti itu adalah satu kemestian untuk diselesaikan. Selepas menyusun semua logik pemprosesan perakaunan, kami akhirnya menemui sebabnya: ia disebabkan oleh operasi pangkalan data serentak pada akaun panas. Mengenai isu ini, mari kita bincangkan tentang kunci teragih berdasarkan Redis dalam sistem teragih. Dengan cara ini, kami juga memecahkan punca dan penyelesaian kepada masalah tersebut. [Cadangan berkaitan:
Tutorial video RedisAnalisis sebabPada masa ini, satu lagi tugas berjadual juga akan mengimbas dan mengemas kini akaun. Selain itu, operasi pada akaun yang sama diedarkan di antara pelbagai sistem, dan akaun panas muncul.
Untuk menyelesaikan masalah ini, dari peringkat seni bina, kami boleh mempertimbangkan untuk memisahkan sistem perakaunan dan memusatkannya dalam satu sistem untuk pemprosesan Semua transaksi pangkalan data dan urutan pelaksanaan diselaraskan dan diproses oleh sistem perakaunan. Dari perspektif teknikal, akaun tempat liputan boleh dikunci melalui mekanisme kunci.
Artikel ini menerangkan secara terperinci pelaksanaan kunci yang diedarkan untuk akaun panas.
Analisis kunciSeperti kes di atas, akaun tempat liputan ialah sumber kongsi dalam sistem yang diedarkan dan kami biasanya menggunakan
kunci pangkalan dataKunci pangkalan data terbahagi kepada kunci optimistik dan kunci pesimis.
Kunci pesimis dilaksanakan berdasarkan kunci eksklusif yang disediakan oleh pangkalan data (Mysql's InnoDB). Semasa menjalankan operasi transaksi, melalui pilihan...untuk penyata kemas kini, MySQL akan menambah kunci eksklusif pada setiap baris data dalam set hasil pertanyaan, dan urutan lain akan menyekat kemas kini dan memadam operasi rekod. Ini mencapai pelaksanaan berurutan (pengubahsuaian) sumber yang dikongsi;
Kunci optimistikadalah relatif kepada kunci pesimis mengandaikan bahawa data secara amnya tidak akan menyebabkan konflik, jadi data diserahkan dan dikemas kini . Hanya selepas itu konflik data akan dikesan secara rasmi. Jika terdapat konflik, maklumat pengecualian dikembalikan kepada pengguna, membolehkan pengguna memutuskan perkara yang perlu dilakukan. Penguncian optimistik sesuai untuk senario di mana terdapat lebih banyak bacaan dan kurang penulisan, yang boleh meningkatkan daya pengeluaran program. Penguncian optimis biasanya dilaksanakan berdasarkan status rakaman atau menambah versi.
Senario kegagalan kunci pesimisKunci pesimis telah digunakan dalam projek, tetapi kunci pesimis gagal. Ini juga merupakan salah faham biasa apabila menggunakan penguncian yang pesimis. Mari analisanya di bawah.
Lepaskan kunci selepas pelaksanaan selesai;
Di sini kami mengambil skrip Spring Boot, Redis dan Lua sebagai contoh untuk menunjukkan pelaksanaan kunci yang diedarkan. Untuk memudahkan pemprosesan, Redis dalam contoh menganggap kedua-dua fungsi kunci teragih dan fungsi pangkalan data.
Dalam persekitaran kluster, kendalikan jumlah akaun yang sama ialah:
Yang berikut tidak mengunci atau menyegerakkan proses daripada permulaan, dan secara beransur-ansur menyimpulkan Kunci pengedaran akhir.
Sediakan persekitaran perniagaan asas tanpa mengunci.
Mula-mula memperkenalkan kebergantungan yang berkaitan dalam projek Spring Boot:
<dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-data-redis</artifactId> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-web</artifactId> </dependency>
Akaun kelas entiti yang sepadan UserAccount:
public class UserAccount { //用户ID private String userId; //账户内金额 private int amount; //添加账户金额 public void addAmount(int amount) { this.amount = this.amount + amount; } // 省略构造方法和getter/setter }
Buat kelas pelaksanaan urutan AkaunOperationThread:
public class AccountOperationThread implements Runnable { private final static Logger logger = LoggerFactory.getLogger(AccountOperationThread.class); private static final Long RELEASE_SUCCESS = 1L; private String userId; private RedisTemplate<Object, Object> redisTemplate; public AccountOperationThread(String userId, RedisTemplate<Object, Object> redisTemplate) { this.userId = userId; this.redisTemplate = redisTemplate; } @Override public void run() { noLock(); } /** * 不加锁 */ private void noLock() { try { Random random = new Random(); // 模拟线程进行业务处理 TimeUnit.MILLISECONDS.sleep(random.nextInt(100) + 1); } catch (InterruptedException e) { e.printStackTrace(); } //模拟数据库中获取用户账号 UserAccount userAccount = (UserAccount) redisTemplate.opsForValue().get(userId); // 金额+1 userAccount.addAmount(1); logger.info(Thread.currentThread().getName() + " : user id : " + userId + " amount : " + userAccount.getAmount()); //模拟存回数据库 redisTemplate.opsForValue().set(userId, userAccount); } }
Instasiasi RedisTemplate diserahkan kepada Spring Boot:
@Configuration public class RedisConfig { @Bean public RedisTemplate<Object, Object> redisTemplate(RedisConnectionFactory redisConnectionFactory) { RedisTemplate<Object, Object> redisTemplate = new RedisTemplate<>(); redisTemplate.setConnectionFactory(redisConnectionFactory); Jackson2JsonRedisSerializer<Object> jackson2JsonRedisSerializer = new Jackson2JsonRedisSerializer<>(Object.class); ObjectMapper objectMapper = new ObjectMapper(); objectMapper.setVisibility(PropertyAccessor.ALL, JsonAutoDetect.Visibility.ANY); objectMapper.enableDefaultTyping(ObjectMapper.DefaultTyping.NON_FINAL); jackson2JsonRedisSerializer.setObjectMapper(objectMapper); // 设置value的序列化规则和 key的序列化规则 redisTemplate.setValueSerializer(jackson2JsonRedisSerializer); redisTemplate.setKeySerializer(new StringRedisSerializer()); redisTemplate.afterPropertiesSet(); return redisTemplate; } }
Akhir sekali, sediakan TestController untuk mencetuskan operasi berbilang benang:
@RestController public class TestController { private final static Logger logger = LoggerFactory.getLogger(TestController.class); private static ExecutorService executorService = Executors.newFixedThreadPool(10); @Autowired private RedisTemplate<Object, Object> redisTemplate; @GetMapping("/test") public String test() throws InterruptedException { // 初始化用户user_001到Redis,账户金额为0 redisTemplate.opsForValue().set("user_001", new UserAccount("user_001", 0)); // 开启10个线程进行同步测试,每个线程为账户增加1元 for (int i = 0; i < 10; i++) { logger.info("创建线程i=" + i); executorService.execute(new AccountOperationThread("user_001", redisTemplate)); } // 主线程休眠1秒等待线程跑完 TimeUnit.MILLISECONDS.sleep(1000); // 查询Redis中的user_001账户 UserAccount userAccount = (UserAccount) redisTemplate.opsForValue().get("user_001"); logger.info("user id : " + userAccount.getUserId() + " amount : " + userAccount.getAmount()); return "success"; } }
Pelaksanaan Dalam program di atas, biasanya terdapat 10 utas, setiap utas menambah 1, dan hasilnya harus 10. Tetapi jika anda melaksanakannya beberapa kali, anda akan mendapati bahawa hasilnya sangat berbeza, dan pada dasarnya lebih kecil daripada 10.
[pool-1-thread-5] c.s.redis.thread.AccountOperationThread : pool-1-thread-5 : user id : user_001 amount : 1 [pool-1-thread-4] c.s.redis.thread.AccountOperationThread : pool-1-thread-4 : user id : user_001 amount : 1 [pool-1-thread-3] c.s.redis.thread.AccountOperationThread : pool-1-thread-3 : user id : user_001 amount : 1 [pool-1-thread-1] c.s.redis.thread.AccountOperationThread : pool-1-thread-1 : user id : user_001 amount : 1 [pool-1-thread-1] c.s.redis.thread.AccountOperationThread : pool-1-thread-1 : user id : user_001 amount : 2 [pool-1-thread-2] c.s.redis.thread.AccountOperationThread : pool-1-thread-2 : user id : user_001 amount : 2 [pool-1-thread-5] c.s.redis.thread.AccountOperationThread : pool-1-thread-5 : user id : user_001 amount : 2 [pool-1-thread-4] c.s.redis.thread.AccountOperationThread : pool-1-thread-4 : user id : user_001 amount : 3 [pool-1-thread-1] c.s.redis.thread.AccountOperationThread : pool-1-thread-1 : user id : user_001 amount : 4 [pool-1-thread-3] c.s.redis.thread.AccountOperationThread : pool-1-thread-3 : user id : user_001 amount : 5 [nio-8080-exec-1] c.s.redis.controller.TestController : user id : user_001 amount : 5
Mengambil log di atas sebagai contoh, empat utas pertama semuanya menukar nilai kepada 1, yang bermaksud bahawa tiga utas seterusnya menimpa pengubahsuaian sebelumnya, menyebabkan hasil akhir bukan 10, tetapi hanya 5. Ini jelas bermasalah.
Memandangkan situasi di atas, dalam JVM yang sama, kita boleh melengkapkannya melalui penguncian benang. Walau bagaimanapun, dalam persekitaran yang diedarkan, kunci tahap JVM tidak boleh dilaksanakan di sini.
Idea asas: Apabila urutan pertama masuk, rekodkannya dalam Redis Apabila urutan berikutnya datang untuk meminta, ia akan dinilai sama ada rekod itu wujud dalam Redis, ini bermakna ia berada dalam keadaan terkunci , tunggu atau kembali. Jika ia tidak wujud, pemprosesan perniagaan susulan akan dilakukan.
/** * 1.抢占资源时判断是否被锁。 * 2.如未锁则抢占成功且加锁,否则等待锁释放。 * 3.业务完成后释放锁,让给其它线程。 * <p> * 该方案并未解决同步问题,原因:线程获得锁和加锁的过程,并非原子性操作,可能会导致线程A获得锁,还未加锁时,线程B也获得了锁。 */ private void redisLock() { Random random = new Random(); try { TimeUnit.MILLISECONDS.sleep(random.nextInt(1000) + 1); } catch (InterruptedException e) { e.printStackTrace(); } while (true) { Object lock = redisTemplate.opsForValue().get(userId + ":syn"); if (lock == null) { // 获得锁 -> 加锁 -> 跳出循环 logger.info(Thread.currentThread().getName() + ":获得锁"); redisTemplate.opsForValue().set(userId + ":syn", "lock"); break; } try { // 等待500毫秒重试获得锁 TimeUnit.MILLISECONDS.sleep(500); } catch (InterruptedException e) { e.printStackTrace(); } } try { //模拟数据库中获取用户账号 UserAccount userAccount = (UserAccount) redisTemplate.opsForValue().get(userId); if (userAccount != null) { //设置金额 userAccount.addAmount(1); logger.info(Thread.currentThread().getName() + " : user id : " + userId + " amount : " + userAccount.getAmount()); //模拟存回数据库 redisTemplate.opsForValue().set(userId, userAccount); } } finally { //释放锁 redisTemplate.delete(userId + ":syn"); logger.info(Thread.currentThread().getName() + ":释放锁"); } }
Dalam blok kod while, tentukan dahulu sama ada ID pengguna yang sepadan wujud dalam Redis Jika ia tidak wujud, tetapkan dan kuncinya Jika wujud, lompat keluar dari gelung dan teruskan menunggu.
Kod di atas nampaknya melaksanakan fungsi mengunci, tetapi apabila program dilaksanakan, anda akan mendapati masih terdapat masalah konkurensi seolah-olah ia tidak dikunci. Sebabnya ialah: operasi memperoleh dan mengunci bukan atom. Sebagai contoh, dua utas mendapati bahawa kunci kedua-duanya adalah batal dan menguncinya Pada masa ini, masalah konkurensi masih wujud.
Untuk menangani masalah di atas, proses memperoleh dan mengunci boleh diatomkan. Ia boleh dilaksanakan berdasarkan API pengabusan yang disediakan oleh spring-boot-data-redis:
// 该方法使用了redis的指令:SETNX key value // 1.key不存在,设置成功返回value,setIfAbsent返回true; // 2.key存在,则设置失败返回null,setIfAbsent返回false; // 3.原子性操作; Boolean setIfAbsent(K var1, V var2);
Operasi pengabusan kaedah di atas ialah enkapsulasi perintah setnx Redis Redis adalah seperti berikut:
redis> SETNX mykey "Hello" (integer) 1 redis> SETNX mykey "World" (integer) 0 redis> GET mykey "Hello"
Apabila menetapkan mykey buat kali pertama, ia tidak wujud, maka 1 dikembalikan, menunjukkan bahawa tetapan itu berjaya apabila menetapkan mykey untuk kali kedua, ia sudah wujud; , dan 0 dikembalikan, menunjukkan bahawa tetapan gagal. Tanya nilai yang sepadan dengan mykey sekali lagi dan anda akan mendapati bahawa ia masih nilai yang ditetapkan untuk kali pertama. Dalam erti kata lain, setnx redis memastikan bahawa kunci unik hanya boleh ditetapkan dengan jayanya oleh satu perkhidmatan.
Setelah memahami API di atas dan prinsip asas, mari kita lihat kod kaedah pelaksanaan dalam urutan seperti berikut:
/** * 1.原子操作加锁 * 2.竞争线程循环重试获得锁 * 3.业务完成释放锁 */ private void atomicityRedisLock() { //Spring data redis 支持的原子性操作 while (!redisTemplate.opsForValue().setIfAbsent(userId + ":syn", "lock")) { try { // 等待100毫秒重试获得锁 TimeUnit.MILLISECONDS.sleep(100); } catch (InterruptedException e) { e.printStackTrace(); } } logger.info(Thread.currentThread().getName() + ":获得锁"); try { //模拟数据库中获取用户账号 UserAccount userAccount = (UserAccount) redisTemplate.opsForValue().get(userId); if (userAccount != null) { //设置金额 userAccount.addAmount(1); logger.info(Thread.currentThread().getName() + " : user id : " + userId + " amount : " + userAccount.getAmount()); //模拟存回数据库 redisTemplate.opsForValue().set(userId, userAccount); } } finally { //释放锁 redisTemplate.delete(userId + ":syn"); logger.info(Thread.currentThread().getName() + ":释放锁"); } }
Laksanakan kod itu sekali lagi dan anda akan mendapati bahawa hasilnya adalah betul, yang bermaksud ia boleh berjaya Utas yang diedarkan dikunci.
Walaupun hasil pelaksanaan kod di atas adalah baik, jika aplikasi ranap secara tidak normal, tidak akan ada masa untuk melaksanakan kaedah melepaskan kunci masuk akhirnya. Kemudian benang lain tidak akan dapat memperoleh kunci ini.
Kaedah setIfAbsent yang terlebih beban boleh digunakan pada masa ini:
Boolean setIfAbsent(K var1, V var2, long var3, TimeUnit var5);
Berdasarkan kaedah ini, masa tamat tempoh kunci boleh ditetapkan. Dengan cara ini, walaupun utas yang memperoleh kunci terputus, utas lain boleh memperoleh kunci seperti biasa selepas data dalam Redis tamat tempoh.
Kod sampel adalah seperti berikut:
private void atomicityAndExRedisLock() { try { //Spring data redis 支持的原子性操作,并设置5秒过期时间 while (!redisTemplate.opsForValue().setIfAbsent(userId + ":syn", System.currentTimeMillis() + 5000, 5000, TimeUnit.MILLISECONDS)) { // 等待100毫秒重试获得锁 logger.info(Thread.currentThread().getName() + ":尝试循环获取锁"); TimeUnit.MILLISECONDS.sleep(1000); } logger.info(Thread.currentThread().getName() + ":获得锁--------"); // 应用在这里宕机,进程退出,无法执行 finally; Thread.currentThread().interrupt(); // 业务逻辑... } catch (InterruptedException e) { e.printStackTrace(); } finally { //释放锁 if (!Thread.currentThread().isInterrupted()) { redisTemplate.delete(userId + ":syn"); logger.info(Thread.currentThread().getName() + ":释放锁"); } } }
Tempoh tamat masa Redis ditambahkan di atas, yang nampaknya untuk menyelesaikan masalah, tetapi ia juga memperkenalkan masalah baru.
Sebagai contoh, dalam keadaan biasa, urutan A boleh menyelesaikan perniagaan dalam masa 5 saat, tetapi kadangkala ia mungkin mengambil masa lebih daripada 5 saat. Jika tamat masa ditetapkan kepada 5 saat, benang A memperoleh kunci, tetapi pemprosesan logik perniagaan mengambil masa 6 saat. Pada masa ini, utas A masih menjalankan logik perniagaan biasa, dan utas B telah memperoleh kuncinya. Apabila benang A selesai diproses, kunci benang B boleh dilepaskan.
Terdapat dua masalah dalam senario di atas:
Sudah tentu, anda boleh menentukan sama ada kunci kepunyaan benang A atau benang B dengan menetapkan nilai dalam Redis. Walau bagaimanapun, analisis yang teliti akan mendedahkan bahawa intipati masalah ini ialah benang A mengambil masa lebih lama untuk melaksanakan logik perniagaan daripada tamat masa kunci.
Kemudian terdapat dua penyelesaian:
Cara kedua ialah meningkatkan masa tamat kunci secara dinamik melalui kaedah benang daemon berikut.
public class DaemonThread implements Runnable { private final static Logger logger = LoggerFactory.getLogger(DaemonThread.class); // 是否需要守护 主线程关闭则结束守护线程 private volatile boolean daemon = true; // 守护锁 private String lockKey; private RedisTemplate<Object, Object> redisTemplate; public DaemonThread(String lockKey, RedisTemplate<Object, Object> redisTemplate) { this.lockKey = lockKey; this.redisTemplate = redisTemplate; } @Override public void run() { try { while (daemon) { long time = redisTemplate.getExpire(lockKey, TimeUnit.MILLISECONDS); // 剩余有效期小于1秒则续命 if (time < 1000) { logger.info("守护进程: " + Thread.currentThread().getName() + " 延长锁时间 5000 毫秒"); redisTemplate.expire(lockKey, 5000, TimeUnit.MILLISECONDS); } TimeUnit.MILLISECONDS.sleep(300); } logger.info(" 守护进程: " + Thread.currentThread().getName() + "关闭 "); } catch (InterruptedException e) { e.printStackTrace(); } } // 主线程主动调用结束 public void stop() { daemon = false; } }
上述线程每隔300毫秒获取一下Redis中锁的超时时间,如果小于1秒,则延长5秒。当主线程调用关闭时,守护线程也随之关闭。
主线程中相关代码实现:
private void deamonRedisLock() { //守护线程 DaemonThread daemonThread = null; //Spring data redis 支持的原子性操作,并设置5秒过期时间 String uuid = UUID.randomUUID().toString(); String value = Thread.currentThread().getId() + ":" + uuid; try { while (!redisTemplate.opsForValue().setIfAbsent(userId + ":syn", value, 5000, TimeUnit.MILLISECONDS)) { // 等待100毫秒重试获得锁 logger.info(Thread.currentThread().getName() + ":尝试循环获取锁"); TimeUnit.MILLISECONDS.sleep(1000); } logger.info(Thread.currentThread().getName() + ":获得锁----"); // 开启守护线程 daemonThread = new DaemonThread(userId + ":syn", redisTemplate); Thread thread = new Thread(daemonThread); thread.start(); // 业务逻辑执行10秒... TimeUnit.MILLISECONDS.sleep(10000); } catch (InterruptedException e) { e.printStackTrace(); } finally { //释放锁 这里也需要原子操作,今后通过 Redis + Lua 讲 String result = (String) redisTemplate.opsForValue().get(userId + ":syn"); if (value.equals(result)) { redisTemplate.delete(userId + ":syn"); logger.info(Thread.currentThread().getName() + ":释放锁-----"); } //关闭守护线程 if (daemonThread != null) { daemonThread.stop(); } } }
其中在获得锁之后,开启守护线程,在finally中将守护线程关闭。
在上述逻辑中,我们是基于spring-boot-data-redis提供的原子化操作来保证锁判断和执行的原子化的。在非Spring Boot项目中,则可以基于Lua脚本来实现。
首先定义加锁和解锁的Lua脚本及对应的DefaultRedisScript
对象,在RedisConfig
配置类中添加如下实例化代码:
@Configuration public class RedisConfig { //lock script private static final String LOCK_SCRIPT = " if redis.call('setnx',KEYS[1],ARGV[1]) == 1 " + " then redis.call('expire',KEYS[1],ARGV[2]) " + " return 1 " + " else return 0 end "; private static final String UNLOCK_SCRIPT = "if redis.call('get', KEYS[1]) == ARGV[1] then return redis.call" + "('del', KEYS[1]) else return 0 end"; // ... 省略部分代码 @Bean public DefaultRedisScript<Boolean> lockRedisScript() { DefaultRedisScript<Boolean> defaultRedisScript = new DefaultRedisScript<>(); defaultRedisScript.setResultType(Boolean.class); defaultRedisScript.setScriptText(LOCK_SCRIPT); return defaultRedisScript; } @Bean public DefaultRedisScript<Long> unlockRedisScript() { DefaultRedisScript<Long> defaultRedisScript = new DefaultRedisScript<>(); defaultRedisScript.setResultType(Long.class); defaultRedisScript.setScriptText(UNLOCK_SCRIPT); return defaultRedisScript; } }
再通过在AccountOperationThread
类中新建构造方法,将上述两个对象传入类中(省略此部分演示)。然后,就可以基于RedisTemplate
来调用了,改造之后的代码实现如下:
private void deamonRedisLockWithLua() { //守护线程 DaemonThread daemonThread = null; //Spring data redis 支持的原子性操作,并设置5秒过期时间 String uuid = UUID.randomUUID().toString(); String value = Thread.currentThread().getId() + ":" + uuid; try { while (!redisTemplate.execute(lockRedisScript, Collections.singletonList(userId + ":syn"), value, 5)) { // 等待1000毫秒重试获得锁 logger.info(Thread.currentThread().getName() + ":尝试循环获取锁"); TimeUnit.MILLISECONDS.sleep(1000); } logger.info(Thread.currentThread().getName() + ":获得锁----"); // 开启守护线程 daemonThread = new DaemonThread(userId + ":syn", redisTemplate); Thread thread = new Thread(daemonThread); thread.start(); // 业务逻辑执行10秒... TimeUnit.MILLISECONDS.sleep(10000); } catch (InterruptedException e) { logger.error("异常", e); } finally { //使用Lua脚本:先判断是否是自己设置的锁,再执行删除 // key存在,当前值=期望值时,删除key;key存在,当前值!=期望值时,返回0; Long result = redisTemplate.execute(unlockRedisScript, Collections.singletonList(userId + ":syn"), value); logger.info("redis解锁:{}", RELEASE_SUCCESS.equals(result)); if (RELEASE_SUCCESS.equals(result)) { if (daemonThread != null) { //关闭守护线程 daemonThread.stop(); logger.info(Thread.currentThread().getName() + ":释放锁---"); } } } }
其中while循环中加锁和finally中的释放锁都是基于Lua脚本来实现了。
除了上述实例,在使用Redis分布式锁时,还可以考虑以下情况及方案。
当线程在持有锁的情况下再次请求加锁,如果一个锁支持一个线程多次加锁,那么这个锁就是可重入的。如果一个不可重入锁被再次加锁,由于该锁已经被持有,再次加锁会失败。Redis可通过对锁进行重入计数,加锁时加 1,解锁时减 1,当计数归 0时释放锁。
可重入锁虽然高效但会增加代码的复杂性,这里就不举例说明了。
有的业务场景,发现被锁则直接返回。但有的场景下,客户端需要等待锁释放然后去抢锁。上述示例就属于后者。针对等待锁释放也有两种方案:
在Redis包含主从同步的集群部署方式中,如果主节点挂掉,从节点提升为主节点。如果客户端A在主节点加锁成功,指令还未同步到从节点,此时主节点挂掉,从节点升为主节点,新的主节点中没有锁的数据。这种情况下,客户端B就可能加锁成功,从而出现并发的场景。
当集群发生脑裂时,Redis master节点跟slave 节点和 sentinel 集群处于不同的网络分区。sentinel集群无法感知到master的存在,会将 slave 节点提升为 master 节点,此时就会存在两个不同的 master 节点。从而也会导致并发问题的出现。Redis Cluster集群部署方式同理。
通过生产环境中的一个问题,排查原因,寻找解决方案,到最终对基于Redis分布式的深入研究,这便是学习的过程。
同时,每当面试或被问题如何解决分布式共享资源时,我们会脱口而出”基于Redis实现分布式锁“,但通过本文的学习会发现,Redis分布式锁并不是万能的,而且在使用的过程中还需要注意超时、死锁、误解锁、集群选主/脑裂等问题。
Redis以高性能著称,但在实现分布式锁的过程中还是存在一些问题。因此,基于Redis的分布式锁可以极大的缓解并发问题,但要完全防止并发,还是得从数据库层面入手。
源码地址:https://github.com/secbr/springboot-all/tree/master/springboot-redis-lock
更多编程相关知识,请访问:编程入门!!
Atas ialah kandungan terperinci Mari bercakap tentang kunci teragih berasaskan Redis dalam sistem teragih. Untuk maklumat lanjut, sila ikut artikel berkaitan lain di laman web China PHP!