


Mari bercakap tentang kunci teragih berasaskan Redis dalam sistem teragih
Dikunci, adakah terdapat sebarang isu konkurensi? RedisAdakah anda benar-benar memahami kunci yang diedarkan? Artikel berikut akan bercakap dengan anda tentang kunci teragih berasaskan Redis dalam sistem yang diedarkan saya harap ia akan membantu anda!
Dalam projek yang baru diambil alih, kadangkala akan ada masalah dengan baki akaun. Penjelasan yang diberikan oleh bos teknikal sebelum ini adalah: Selepas siasatan, puncanya tidak dijumpai Selepas itu, saya terlalu sibuk dan tidak menyelesaikannya > Sekarang projek itu dihantar, masalah seperti itu adalah satu kemestian untuk diselesaikan. Selepas menyusun semua logik pemprosesan perakaunan, kami akhirnya menemui sebabnya: ia disebabkan oleh operasi pangkalan data serentak pada akaun panas. Mengenai isu ini, mari kita bincangkan tentang kunci teragih berdasarkan Redis dalam sistem teragih. Dengan cara ini, kami juga memecahkan punca dan penyelesaian kepada masalah tersebut. [Cadangan berkaitan:
Tutorial video RedisAnalisis sebabKonkurensi sistem tidak tinggi, dan terdapat akaun hangat, tetapi ia adalah tidak begitu serius. Punca masalahnya terletak pada reka bentuk seni bina sistem, yang secara buatan mencipta konkurensi. Senario ini adalah seperti berikut: pedagang mengimport sekumpulan data dalam kelompok, dan sistem akan melaksanakan pra-pemprosesan dan menambah atau mengurangkan baki akaun.
Pada masa ini, satu lagi tugas berjadual juga akan mengimbas dan mengemas kini akaun. Selain itu, operasi pada akaun yang sama diedarkan di antara pelbagai sistem, dan akaun panas muncul.
Untuk menyelesaikan masalah ini, dari peringkat seni bina, kami boleh mempertimbangkan untuk memisahkan sistem perakaunan dan memusatkannya dalam satu sistem untuk pemprosesan Semua transaksi pangkalan data dan urutan pelaksanaan diselaraskan dan diproses oleh sistem perakaunan. Dari perspektif teknikal, akaun tempat liputan boleh dikunci melalui mekanisme kunci.
Artikel ini menerangkan secara terperinci pelaksanaan kunci yang diedarkan untuk akaun panas.
Analisis kunciDalam persekitaran berbilang benang Java, biasanya terdapat beberapa jenis kunci yang boleh digunakan:
JVM tahap model memori Kunci yang biasa digunakan termasuk: kunci disegerakkan, Kunci, dsb.;- kunci pangkalan data, seperti kunci optimistik, kunci pesimis, dll.; Kunci Tahap memori JVM boleh memastikan keselamatan benang di bawah satu perkhidmatan, seperti apabila berbilang benang mengakses/mengubah suai pembolehubah global. Tetapi apabila sistem digunakan dalam kelompok, kunci tempatan peringkat JVM tidak berkuasa.
- Penguncian pesimis dan penguncian optimistik
Seperti kes di atas, akaun tempat liputan ialah sumber kongsi dalam sistem yang diedarkan dan kami biasanya menggunakan
kunci pangkalan dataatau kunci yang diedarkan untuk menyelesaikan masalah.
Kunci pangkalan data terbahagi kepada kunci optimistik dan kunci pesimis.
Kunci pesimis dilaksanakan berdasarkan kunci eksklusif yang disediakan oleh pangkalan data (Mysql's InnoDB). Semasa menjalankan operasi transaksi, melalui pilihan...untuk penyata kemas kini, MySQL akan menambah kunci eksklusif pada setiap baris data dalam set hasil pertanyaan, dan urutan lain akan menyekat kemas kini dan memadam operasi rekod. Ini mencapai pelaksanaan berurutan (pengubahsuaian) sumber yang dikongsi;
Kunci optimistikadalah relatif kepada kunci pesimis mengandaikan bahawa data secara amnya tidak akan menyebabkan konflik, jadi data diserahkan dan dikemas kini . Hanya selepas itu konflik data akan dikesan secara rasmi. Jika terdapat konflik, maklumat pengecualian dikembalikan kepada pengguna, membolehkan pengguna memutuskan perkara yang perlu dilakukan. Penguncian optimistik sesuai untuk senario di mana terdapat lebih banyak bacaan dan kurang penulisan, yang boleh meningkatkan daya pengeluaran program. Penguncian optimis biasanya dilaksanakan berdasarkan status rakaman atau menambah versi.
Senario kegagalan kunci pesimisKunci pesimis telah digunakan dalam projek, tetapi kunci pesimis gagal. Ini juga merupakan salah faham biasa apabila menggunakan penguncian yang pesimis. Mari analisanya di bawah.
Penggunaan biasa proses penguncian pesimis:
Kunci rekod melalui pilih... untuk kemas kini; Kira baki baharu, ubah suai jumlah dan simpannya ;Lepaskan kunci selepas pelaksanaan selesai;
- Proses pemprosesan yang sering melakukan kesilapan:
- Soal baki akaun dan kira baki baharu;
- Dalam proses yang salah, seperti pertanyaan perkhidmatan A dan B Baki semuanya 100, A tolak 50, B tolak 40, kemudian A kunci rekod dan kemas kini pangkalan data kepada 50 selepas A keluarkan kunci, B kunci rekod dan kemas kini pangkalan data kepada 60. Jelas sekali, yang terakhir telah menimpa kemas kini yang pertama. Penyelesaiannya adalah untuk mengembangkan skop kunci dan memajukan kunci sebelum mengira baki baru.
- Biasanya penguncian yang pesimis memberi banyak tekanan kepada pangkalan data Dalam amalan, ia biasanya dilaksanakan menggunakan penguncian optimistik atau penguncian teragih berdasarkan senario.
- Mari kita ke inti dan bercakap tentang pelaksanaan kunci yang diedarkan berdasarkan Redis.
- Latihan praktikal kunci edaran Redis
Di sini kami mengambil skrip Spring Boot, Redis dan Lua sebagai contoh untuk menunjukkan pelaksanaan kunci yang diedarkan. Untuk memudahkan pemprosesan, Redis dalam contoh menganggap kedua-dua fungsi kunci teragih dan fungsi pangkalan data.
Pembinaan senario
Dalam persekitaran kluster, kendalikan jumlah akaun yang sama ialah:
- Baca jumlah pengguna daripada pangkalan data;
- Atur cara mengubah suai amaun
- dan kemudian menyimpan jumlah terkini dalam pangkalan data
Yang berikut tidak mengunci atau menyegerakkan proses daripada permulaan, dan secara beransur-ansur menyimpulkan Kunci pengedaran akhir.
Penyepaduan asas dan pembinaan kelas
Sediakan persekitaran perniagaan asas tanpa mengunci.
Mula-mula memperkenalkan kebergantungan yang berkaitan dalam projek Spring Boot:
<dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-data-redis</artifactId> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-web</artifactId> </dependency>
Akaun kelas entiti yang sepadan UserAccount:
public class UserAccount { //用户ID private String userId; //账户内金额 private int amount; //添加账户金额 public void addAmount(int amount) { this.amount = this.amount + amount; } // 省略构造方法和getter/setter }
Buat kelas pelaksanaan urutan AkaunOperationThread:
public class AccountOperationThread implements Runnable { private final static Logger logger = LoggerFactory.getLogger(AccountOperationThread.class); private static final Long RELEASE_SUCCESS = 1L; private String userId; private RedisTemplate<Object, Object> redisTemplate; public AccountOperationThread(String userId, RedisTemplate<Object, Object> redisTemplate) { this.userId = userId; this.redisTemplate = redisTemplate; } @Override public void run() { noLock(); } /** * 不加锁 */ private void noLock() { try { Random random = new Random(); // 模拟线程进行业务处理 TimeUnit.MILLISECONDS.sleep(random.nextInt(100) + 1); } catch (InterruptedException e) { e.printStackTrace(); } //模拟数据库中获取用户账号 UserAccount userAccount = (UserAccount) redisTemplate.opsForValue().get(userId); // 金额+1 userAccount.addAmount(1); logger.info(Thread.currentThread().getName() + " : user id : " + userId + " amount : " + userAccount.getAmount()); //模拟存回数据库 redisTemplate.opsForValue().set(userId, userAccount); } }
Instasiasi RedisTemplate diserahkan kepada Spring Boot:
@Configuration public class RedisConfig { @Bean public RedisTemplate<Object, Object> redisTemplate(RedisConnectionFactory redisConnectionFactory) { RedisTemplate<Object, Object> redisTemplate = new RedisTemplate<>(); redisTemplate.setConnectionFactory(redisConnectionFactory); Jackson2JsonRedisSerializer<Object> jackson2JsonRedisSerializer = new Jackson2JsonRedisSerializer<>(Object.class); ObjectMapper objectMapper = new ObjectMapper(); objectMapper.setVisibility(PropertyAccessor.ALL, JsonAutoDetect.Visibility.ANY); objectMapper.enableDefaultTyping(ObjectMapper.DefaultTyping.NON_FINAL); jackson2JsonRedisSerializer.setObjectMapper(objectMapper); // 设置value的序列化规则和 key的序列化规则 redisTemplate.setValueSerializer(jackson2JsonRedisSerializer); redisTemplate.setKeySerializer(new StringRedisSerializer()); redisTemplate.afterPropertiesSet(); return redisTemplate; } }
Akhir sekali, sediakan TestController untuk mencetuskan operasi berbilang benang:
@RestController public class TestController { private final static Logger logger = LoggerFactory.getLogger(TestController.class); private static ExecutorService executorService = Executors.newFixedThreadPool(10); @Autowired private RedisTemplate<Object, Object> redisTemplate; @GetMapping("/test") public String test() throws InterruptedException { // 初始化用户user_001到Redis,账户金额为0 redisTemplate.opsForValue().set("user_001", new UserAccount("user_001", 0)); // 开启10个线程进行同步测试,每个线程为账户增加1元 for (int i = 0; i < 10; i++) { logger.info("创建线程i=" + i); executorService.execute(new AccountOperationThread("user_001", redisTemplate)); } // 主线程休眠1秒等待线程跑完 TimeUnit.MILLISECONDS.sleep(1000); // 查询Redis中的user_001账户 UserAccount userAccount = (UserAccount) redisTemplate.opsForValue().get("user_001"); logger.info("user id : " + userAccount.getUserId() + " amount : " + userAccount.getAmount()); return "success"; } }
Pelaksanaan Dalam program di atas, biasanya terdapat 10 utas, setiap utas menambah 1, dan hasilnya harus 10. Tetapi jika anda melaksanakannya beberapa kali, anda akan mendapati bahawa hasilnya sangat berbeza, dan pada dasarnya lebih kecil daripada 10.
[pool-1-thread-5] c.s.redis.thread.AccountOperationThread : pool-1-thread-5 : user id : user_001 amount : 1 [pool-1-thread-4] c.s.redis.thread.AccountOperationThread : pool-1-thread-4 : user id : user_001 amount : 1 [pool-1-thread-3] c.s.redis.thread.AccountOperationThread : pool-1-thread-3 : user id : user_001 amount : 1 [pool-1-thread-1] c.s.redis.thread.AccountOperationThread : pool-1-thread-1 : user id : user_001 amount : 1 [pool-1-thread-1] c.s.redis.thread.AccountOperationThread : pool-1-thread-1 : user id : user_001 amount : 2 [pool-1-thread-2] c.s.redis.thread.AccountOperationThread : pool-1-thread-2 : user id : user_001 amount : 2 [pool-1-thread-5] c.s.redis.thread.AccountOperationThread : pool-1-thread-5 : user id : user_001 amount : 2 [pool-1-thread-4] c.s.redis.thread.AccountOperationThread : pool-1-thread-4 : user id : user_001 amount : 3 [pool-1-thread-1] c.s.redis.thread.AccountOperationThread : pool-1-thread-1 : user id : user_001 amount : 4 [pool-1-thread-3] c.s.redis.thread.AccountOperationThread : pool-1-thread-3 : user id : user_001 amount : 5 [nio-8080-exec-1] c.s.redis.controller.TestController : user id : user_001 amount : 5
Mengambil log di atas sebagai contoh, empat utas pertama semuanya menukar nilai kepada 1, yang bermaksud bahawa tiga utas seterusnya menimpa pengubahsuaian sebelumnya, menyebabkan hasil akhir bukan 10, tetapi hanya 5. Ini jelas bermasalah.
Pelaksanaan kunci penyegerakan Redis
Memandangkan situasi di atas, dalam JVM yang sama, kita boleh melengkapkannya melalui penguncian benang. Walau bagaimanapun, dalam persekitaran yang diedarkan, kunci tahap JVM tidak boleh dilaksanakan di sini.
Idea asas: Apabila urutan pertama masuk, rekodkannya dalam Redis Apabila urutan berikutnya datang untuk meminta, ia akan dinilai sama ada rekod itu wujud dalam Redis, ini bermakna ia berada dalam keadaan terkunci , tunggu atau kembali. Jika ia tidak wujud, pemprosesan perniagaan susulan akan dilakukan.
/** * 1.抢占资源时判断是否被锁。 * 2.如未锁则抢占成功且加锁,否则等待锁释放。 * 3.业务完成后释放锁,让给其它线程。 * <p> * 该方案并未解决同步问题,原因:线程获得锁和加锁的过程,并非原子性操作,可能会导致线程A获得锁,还未加锁时,线程B也获得了锁。 */ private void redisLock() { Random random = new Random(); try { TimeUnit.MILLISECONDS.sleep(random.nextInt(1000) + 1); } catch (InterruptedException e) { e.printStackTrace(); } while (true) { Object lock = redisTemplate.opsForValue().get(userId + ":syn"); if (lock == null) { // 获得锁 -> 加锁 -> 跳出循环 logger.info(Thread.currentThread().getName() + ":获得锁"); redisTemplate.opsForValue().set(userId + ":syn", "lock"); break; } try { // 等待500毫秒重试获得锁 TimeUnit.MILLISECONDS.sleep(500); } catch (InterruptedException e) { e.printStackTrace(); } } try { //模拟数据库中获取用户账号 UserAccount userAccount = (UserAccount) redisTemplate.opsForValue().get(userId); if (userAccount != null) { //设置金额 userAccount.addAmount(1); logger.info(Thread.currentThread().getName() + " : user id : " + userId + " amount : " + userAccount.getAmount()); //模拟存回数据库 redisTemplate.opsForValue().set(userId, userAccount); } } finally { //释放锁 redisTemplate.delete(userId + ":syn"); logger.info(Thread.currentThread().getName() + ":释放锁"); } }
Dalam blok kod while, tentukan dahulu sama ada ID pengguna yang sepadan wujud dalam Redis Jika ia tidak wujud, tetapkan dan kuncinya Jika wujud, lompat keluar dari gelung dan teruskan menunggu.
Kod di atas nampaknya melaksanakan fungsi mengunci, tetapi apabila program dilaksanakan, anda akan mendapati masih terdapat masalah konkurensi seolah-olah ia tidak dikunci. Sebabnya ialah: operasi memperoleh dan mengunci bukan atom. Sebagai contoh, dua utas mendapati bahawa kunci kedua-duanya adalah batal dan menguncinya Pada masa ini, masalah konkurensi masih wujud.
Redis Atomic Synchronization Lock
Untuk menangani masalah di atas, proses memperoleh dan mengunci boleh diatomkan. Ia boleh dilaksanakan berdasarkan API pengabusan yang disediakan oleh spring-boot-data-redis:
// 该方法使用了redis的指令:SETNX key value // 1.key不存在,设置成功返回value,setIfAbsent返回true; // 2.key存在,则设置失败返回null,setIfAbsent返回false; // 3.原子性操作; Boolean setIfAbsent(K var1, V var2);
Operasi pengabusan kaedah di atas ialah enkapsulasi perintah setnx Redis Redis adalah seperti berikut:
redis> SETNX mykey "Hello" (integer) 1 redis> SETNX mykey "World" (integer) 0 redis> GET mykey "Hello"
Apabila menetapkan mykey buat kali pertama, ia tidak wujud, maka 1 dikembalikan, menunjukkan bahawa tetapan itu berjaya apabila menetapkan mykey untuk kali kedua, ia sudah wujud; , dan 0 dikembalikan, menunjukkan bahawa tetapan gagal. Tanya nilai yang sepadan dengan mykey sekali lagi dan anda akan mendapati bahawa ia masih nilai yang ditetapkan untuk kali pertama. Dalam erti kata lain, setnx redis memastikan bahawa kunci unik hanya boleh ditetapkan dengan jayanya oleh satu perkhidmatan.
Setelah memahami API di atas dan prinsip asas, mari kita lihat kod kaedah pelaksanaan dalam urutan seperti berikut:
/** * 1.原子操作加锁 * 2.竞争线程循环重试获得锁 * 3.业务完成释放锁 */ private void atomicityRedisLock() { //Spring data redis 支持的原子性操作 while (!redisTemplate.opsForValue().setIfAbsent(userId + ":syn", "lock")) { try { // 等待100毫秒重试获得锁 TimeUnit.MILLISECONDS.sleep(100); } catch (InterruptedException e) { e.printStackTrace(); } } logger.info(Thread.currentThread().getName() + ":获得锁"); try { //模拟数据库中获取用户账号 UserAccount userAccount = (UserAccount) redisTemplate.opsForValue().get(userId); if (userAccount != null) { //设置金额 userAccount.addAmount(1); logger.info(Thread.currentThread().getName() + " : user id : " + userId + " amount : " + userAccount.getAmount()); //模拟存回数据库 redisTemplate.opsForValue().set(userId, userAccount); } } finally { //释放锁 redisTemplate.delete(userId + ":syn"); logger.info(Thread.currentThread().getName() + ":释放锁"); } }
Laksanakan kod itu sekali lagi dan anda akan mendapati bahawa hasilnya adalah betul, yang bermaksud ia boleh berjaya Utas yang diedarkan dikunci.
Kebuntuan kunci edaran Redis
Walaupun hasil pelaksanaan kod di atas adalah baik, jika aplikasi ranap secara tidak normal, tidak akan ada masa untuk melaksanakan kaedah melepaskan kunci masuk akhirnya. Kemudian benang lain tidak akan dapat memperoleh kunci ini.
Kaedah setIfAbsent yang terlebih beban boleh digunakan pada masa ini:
Boolean setIfAbsent(K var1, V var2, long var3, TimeUnit var5);
Berdasarkan kaedah ini, masa tamat tempoh kunci boleh ditetapkan. Dengan cara ini, walaupun utas yang memperoleh kunci terputus, utas lain boleh memperoleh kunci seperti biasa selepas data dalam Redis tamat tempoh.
Kod sampel adalah seperti berikut:
private void atomicityAndExRedisLock() { try { //Spring data redis 支持的原子性操作,并设置5秒过期时间 while (!redisTemplate.opsForValue().setIfAbsent(userId + ":syn", System.currentTimeMillis() + 5000, 5000, TimeUnit.MILLISECONDS)) { // 等待100毫秒重试获得锁 logger.info(Thread.currentThread().getName() + ":尝试循环获取锁"); TimeUnit.MILLISECONDS.sleep(1000); } logger.info(Thread.currentThread().getName() + ":获得锁--------"); // 应用在这里宕机,进程退出,无法执行 finally; Thread.currentThread().interrupt(); // 业务逻辑... } catch (InterruptedException e) { e.printStackTrace(); } finally { //释放锁 if (!Thread.currentThread().isInterrupted()) { redisTemplate.delete(userId + ":syn"); logger.info(Thread.currentThread().getName() + ":释放锁"); } } }
Tamat masa perniagaan dan benang daemon
Tempoh tamat masa Redis ditambahkan di atas, yang nampaknya untuk menyelesaikan masalah, tetapi ia juga memperkenalkan masalah baru.
Sebagai contoh, dalam keadaan biasa, urutan A boleh menyelesaikan perniagaan dalam masa 5 saat, tetapi kadangkala ia mungkin mengambil masa lebih daripada 5 saat. Jika tamat masa ditetapkan kepada 5 saat, benang A memperoleh kunci, tetapi pemprosesan logik perniagaan mengambil masa 6 saat. Pada masa ini, utas A masih menjalankan logik perniagaan biasa, dan utas B telah memperoleh kuncinya. Apabila benang A selesai diproses, kunci benang B boleh dilepaskan.
Terdapat dua masalah dalam senario di atas:
- Pertama, utas A dan utas B boleh dilaksanakan pada masa yang sama, menyebabkan isu konkurensi.
- Kedua, benang A mungkin melepaskan kunci benang B, yang membawa kepada satu siri kitaran ganas.
Sudah tentu, anda boleh menentukan sama ada kunci kepunyaan benang A atau benang B dengan menetapkan nilai dalam Redis. Walau bagaimanapun, analisis yang teliti akan mendedahkan bahawa intipati masalah ini ialah benang A mengambil masa lebih lama untuk melaksanakan logik perniagaan daripada tamat masa kunci.
Kemudian terdapat dua penyelesaian:
- Pertama, tetapkan tamat masa yang cukup lama untuk memastikan kod perniagaan boleh dilaksanakan sebelum kunci dilepaskan; , tambahkan benang daemon untuk kunci dan tambahkan masa untuk kunci yang hampir tamat tempoh tetapi belum dikeluarkan Kaedah pertama memerlukan logik perniagaan yang memakan masa dalam kebanyakan kes seluruh bank; Tetapan tamat masa.
Cara kedua ialah meningkatkan masa tamat kunci secara dinamik melalui kaedah benang daemon berikut.
public class DaemonThread implements Runnable { private final static Logger logger = LoggerFactory.getLogger(DaemonThread.class); // 是否需要守护 主线程关闭则结束守护线程 private volatile boolean daemon = true; // 守护锁 private String lockKey; private RedisTemplate<Object, Object> redisTemplate; public DaemonThread(String lockKey, RedisTemplate<Object, Object> redisTemplate) { this.lockKey = lockKey; this.redisTemplate = redisTemplate; } @Override public void run() { try { while (daemon) { long time = redisTemplate.getExpire(lockKey, TimeUnit.MILLISECONDS); // 剩余有效期小于1秒则续命 if (time < 1000) { logger.info("守护进程: " + Thread.currentThread().getName() + " 延长锁时间 5000 毫秒"); redisTemplate.expire(lockKey, 5000, TimeUnit.MILLISECONDS); } TimeUnit.MILLISECONDS.sleep(300); } logger.info(" 守护进程: " + Thread.currentThread().getName() + "关闭 "); } catch (InterruptedException e) { e.printStackTrace(); } } // 主线程主动调用结束 public void stop() { daemon = false; } }
上述线程每隔300毫秒获取一下Redis中锁的超时时间,如果小于1秒,则延长5秒。当主线程调用关闭时,守护线程也随之关闭。
主线程中相关代码实现:
private void deamonRedisLock() { //守护线程 DaemonThread daemonThread = null; //Spring data redis 支持的原子性操作,并设置5秒过期时间 String uuid = UUID.randomUUID().toString(); String value = Thread.currentThread().getId() + ":" + uuid; try { while (!redisTemplate.opsForValue().setIfAbsent(userId + ":syn", value, 5000, TimeUnit.MILLISECONDS)) { // 等待100毫秒重试获得锁 logger.info(Thread.currentThread().getName() + ":尝试循环获取锁"); TimeUnit.MILLISECONDS.sleep(1000); } logger.info(Thread.currentThread().getName() + ":获得锁----"); // 开启守护线程 daemonThread = new DaemonThread(userId + ":syn", redisTemplate); Thread thread = new Thread(daemonThread); thread.start(); // 业务逻辑执行10秒... TimeUnit.MILLISECONDS.sleep(10000); } catch (InterruptedException e) { e.printStackTrace(); } finally { //释放锁 这里也需要原子操作,今后通过 Redis + Lua 讲 String result = (String) redisTemplate.opsForValue().get(userId + ":syn"); if (value.equals(result)) { redisTemplate.delete(userId + ":syn"); logger.info(Thread.currentThread().getName() + ":释放锁-----"); } //关闭守护线程 if (daemonThread != null) { daemonThread.stop(); } } }
其中在获得锁之后,开启守护线程,在finally中将守护线程关闭。
基于Lua脚本的实现
在上述逻辑中,我们是基于spring-boot-data-redis提供的原子化操作来保证锁判断和执行的原子化的。在非Spring Boot项目中,则可以基于Lua脚本来实现。
首先定义加锁和解锁的Lua脚本及对应的DefaultRedisScript
对象,在RedisConfig
配置类中添加如下实例化代码:
@Configuration public class RedisConfig { //lock script private static final String LOCK_SCRIPT = " if redis.call('setnx',KEYS[1],ARGV[1]) == 1 " + " then redis.call('expire',KEYS[1],ARGV[2]) " + " return 1 " + " else return 0 end "; private static final String UNLOCK_SCRIPT = "if redis.call('get', KEYS[1]) == ARGV[1] then return redis.call" + "('del', KEYS[1]) else return 0 end"; // ... 省略部分代码 @Bean public DefaultRedisScript<Boolean> lockRedisScript() { DefaultRedisScript<Boolean> defaultRedisScript = new DefaultRedisScript<>(); defaultRedisScript.setResultType(Boolean.class); defaultRedisScript.setScriptText(LOCK_SCRIPT); return defaultRedisScript; } @Bean public DefaultRedisScript<Long> unlockRedisScript() { DefaultRedisScript<Long> defaultRedisScript = new DefaultRedisScript<>(); defaultRedisScript.setResultType(Long.class); defaultRedisScript.setScriptText(UNLOCK_SCRIPT); return defaultRedisScript; } }
再通过在AccountOperationThread
类中新建构造方法,将上述两个对象传入类中(省略此部分演示)。然后,就可以基于RedisTemplate
来调用了,改造之后的代码实现如下:
private void deamonRedisLockWithLua() { //守护线程 DaemonThread daemonThread = null; //Spring data redis 支持的原子性操作,并设置5秒过期时间 String uuid = UUID.randomUUID().toString(); String value = Thread.currentThread().getId() + ":" + uuid; try { while (!redisTemplate.execute(lockRedisScript, Collections.singletonList(userId + ":syn"), value, 5)) { // 等待1000毫秒重试获得锁 logger.info(Thread.currentThread().getName() + ":尝试循环获取锁"); TimeUnit.MILLISECONDS.sleep(1000); } logger.info(Thread.currentThread().getName() + ":获得锁----"); // 开启守护线程 daemonThread = new DaemonThread(userId + ":syn", redisTemplate); Thread thread = new Thread(daemonThread); thread.start(); // 业务逻辑执行10秒... TimeUnit.MILLISECONDS.sleep(10000); } catch (InterruptedException e) { logger.error("异常", e); } finally { //使用Lua脚本:先判断是否是自己设置的锁,再执行删除 // key存在,当前值=期望值时,删除key;key存在,当前值!=期望值时,返回0; Long result = redisTemplate.execute(unlockRedisScript, Collections.singletonList(userId + ":syn"), value); logger.info("redis解锁:{}", RELEASE_SUCCESS.equals(result)); if (RELEASE_SUCCESS.equals(result)) { if (daemonThread != null) { //关闭守护线程 daemonThread.stop(); logger.info(Thread.currentThread().getName() + ":释放锁---"); } } } }
其中while循环中加锁和finally中的释放锁都是基于Lua脚本来实现了。
Redis锁的其他因素
除了上述实例,在使用Redis分布式锁时,还可以考虑以下情况及方案。
Redis锁的不可重入
当线程在持有锁的情况下再次请求加锁,如果一个锁支持一个线程多次加锁,那么这个锁就是可重入的。如果一个不可重入锁被再次加锁,由于该锁已经被持有,再次加锁会失败。Redis可通过对锁进行重入计数,加锁时加 1,解锁时减 1,当计数归 0时释放锁。
可重入锁虽然高效但会增加代码的复杂性,这里就不举例说明了。
等待锁释放
有的业务场景,发现被锁则直接返回。但有的场景下,客户端需要等待锁释放然后去抢锁。上述示例就属于后者。针对等待锁释放也有两种方案:
- 客户端轮训:当未获得锁时,等待一段时间再重新获取,直到成功。上述示例就是基于这种方式实现的。这种方式的缺点也很明显,比较耗费服务器资源,当并发量大时会影响服务器的效率。
- 使用Redis的订阅发布功能:当获取锁失败时,订阅锁释放消息,获取锁成功后释放时,发送释放消息。
集群中的主备切换和脑裂
在Redis包含主从同步的集群部署方式中,如果主节点挂掉,从节点提升为主节点。如果客户端A在主节点加锁成功,指令还未同步到从节点,此时主节点挂掉,从节点升为主节点,新的主节点中没有锁的数据。这种情况下,客户端B就可能加锁成功,从而出现并发的场景。
当集群发生脑裂时,Redis master节点跟slave 节点和 sentinel 集群处于不同的网络分区。sentinel集群无法感知到master的存在,会将 slave 节点提升为 master 节点,此时就会存在两个不同的 master 节点。从而也会导致并发问题的出现。Redis Cluster集群部署方式同理。
小结
通过生产环境中的一个问题,排查原因,寻找解决方案,到最终对基于Redis分布式的深入研究,这便是学习的过程。
同时,每当面试或被问题如何解决分布式共享资源时,我们会脱口而出”基于Redis实现分布式锁“,但通过本文的学习会发现,Redis分布式锁并不是万能的,而且在使用的过程中还需要注意超时、死锁、误解锁、集群选主/脑裂等问题。
Redis以高性能著称,但在实现分布式锁的过程中还是存在一些问题。因此,基于Redis的分布式锁可以极大的缓解并发问题,但要完全防止并发,还是得从数据库层面入手。
源码地址:https://github.com/secbr/springboot-all/tree/master/springboot-redis-lock
更多编程相关知识,请访问:编程入门!!
Atas ialah kandungan terperinci Mari bercakap tentang kunci teragih berasaskan Redis dalam sistem teragih. Untuk maklumat lanjut, sila ikut artikel berkaitan lain di laman web China PHP!

Alat AI Hot

Undresser.AI Undress
Apl berkuasa AI untuk mencipta foto bogel yang realistik

AI Clothes Remover
Alat AI dalam talian untuk mengeluarkan pakaian daripada foto.

Undress AI Tool
Gambar buka pakaian secara percuma

Clothoff.io
Penyingkiran pakaian AI

AI Hentai Generator
Menjana ai hentai secara percuma.

Artikel Panas

Alat panas

Notepad++7.3.1
Editor kod yang mudah digunakan dan percuma

SublimeText3 versi Cina
Versi Cina, sangat mudah digunakan

Hantar Studio 13.0.1
Persekitaran pembangunan bersepadu PHP yang berkuasa

Dreamweaver CS6
Alat pembangunan web visual

SublimeText3 versi Mac
Perisian penyuntingan kod peringkat Tuhan (SublimeText3)

Topik panas



Mod Redis cluster menyebarkan contoh Redis ke pelbagai pelayan melalui sharding, meningkatkan skalabilitas dan ketersediaan. Langkah -langkah pembinaan adalah seperti berikut: Buat contoh Redis ganjil dengan pelabuhan yang berbeza; Buat 3 contoh sentinel, memantau contoh redis dan failover; Konfigurasi fail konfigurasi sentinel, tambahkan pemantauan maklumat contoh dan tetapan failover; Konfigurasi fail konfigurasi contoh Redis, aktifkan mod kluster dan tentukan laluan fail maklumat kluster; Buat fail nodes.conf, yang mengandungi maklumat setiap contoh Redis; Mulakan kluster, laksanakan perintah Buat untuk membuat kluster dan tentukan bilangan replika; Log masuk ke kluster untuk melaksanakan perintah maklumat kluster untuk mengesahkan status kluster; buat

Menggunakan Arahan Redis memerlukan langkah -langkah berikut: Buka klien Redis. Masukkan arahan (nilai kunci kata kerja). Menyediakan parameter yang diperlukan (berbeza dari arahan ke arahan). Tekan Enter untuk melaksanakan arahan. Redis mengembalikan tindak balas yang menunjukkan hasil operasi (biasanya OK atau -r).

Redis menggunakan satu seni bina berulir untuk memberikan prestasi tinggi, kesederhanaan, dan konsistensi. Ia menggunakan I/O multiplexing, gelung acara, I/O yang tidak menyekat, dan memori bersama untuk meningkatkan keserasian, tetapi dengan batasan batasan konkurensi, satu titik kegagalan, dan tidak sesuai untuk beban kerja yang berintensifkan.

Untuk melihat semua kunci di Redis, terdapat tiga cara: Gunakan perintah kunci untuk mengembalikan semua kunci yang sepadan dengan corak yang ditentukan; Gunakan perintah imbasan untuk melangkah ke atas kunci dan kembalikan satu set kunci; Gunakan arahan maklumat untuk mendapatkan jumlah kunci.

Langkah -langkah untuk memulakan pelayan Redis termasuk: Pasang Redis mengikut sistem operasi. Mulakan perkhidmatan Redis melalui Redis-server (Linux/macOS) atau redis-server.exe (Windows). Gunakan redis-cli ping (linux/macOS) atau redis-cli.exe ping (windows) perintah untuk memeriksa status perkhidmatan. Gunakan klien Redis, seperti redis-cli, python, atau node.js untuk mengakses pelayan.

Menggunakan REDIS untuk mengunci operasi memerlukan mendapatkan kunci melalui arahan SETNX, dan kemudian menggunakan perintah luput untuk menetapkan masa tamat tempoh. Langkah-langkah khusus adalah: (1) Gunakan arahan SETNX untuk cuba menetapkan pasangan nilai utama; (2) Gunakan perintah luput untuk menetapkan masa tamat tempoh untuk kunci; (3) Gunakan perintah DEL untuk memadam kunci apabila kunci tidak lagi diperlukan.

Cara Mengosongkan Data Redis: Gunakan perintah Flushall untuk membersihkan semua nilai utama. Gunakan perintah flushdb untuk membersihkan nilai utama pangkalan data yang dipilih sekarang. Gunakan Pilih untuk menukar pangkalan data, dan kemudian gunakan FlushDB untuk membersihkan pelbagai pangkalan data. Gunakan perintah DEL untuk memadam kunci tertentu. Gunakan alat REDIS-CLI untuk membersihkan data.

Redis menggunakan jadual hash untuk menyimpan data dan menyokong struktur data seperti rentetan, senarai, jadual hash, koleksi dan koleksi yang diperintahkan. Redis berterusan data melalui snapshots (RDB) dan menambah mekanisme tulis sahaja (AOF). Redis menggunakan replikasi master-hamba untuk meningkatkan ketersediaan data. Redis menggunakan gelung acara tunggal untuk mengendalikan sambungan dan arahan untuk memastikan atom dan konsistensi data. Redis menetapkan masa tamat tempoh untuk kunci dan menggunakan mekanisme memadam malas untuk memadamkan kunci tamat tempoh.
