Rumah > pangkalan data > Redis > teks badan

Mari bercakap tentang kunci teragih berasaskan Redis dalam sistem teragih

青灯夜游
Lepaskan: 2021-10-29 10:52:11
ke hadapan
1561 orang telah melayarinya

Dikunci, adakah terdapat sebarang isu konkurensi? RedisAdakah anda benar-benar memahami kunci yang diedarkan? Artikel berikut akan bercakap dengan anda tentang kunci teragih berasaskan Redis dalam sistem yang diedarkan saya harap ia akan membantu anda!

Mari bercakap tentang kunci teragih berasaskan Redis dalam sistem teragih

Dalam projek yang baru diambil alih, kadangkala akan ada masalah dengan baki akaun. Penjelasan yang diberikan oleh bos teknikal sebelum ini adalah: Selepas siasatan, puncanya tidak dijumpai Selepas itu, saya terlalu sibuk dan tidak menyelesaikannya > Sekarang projek itu dihantar, masalah seperti itu adalah satu kemestian untuk diselesaikan. Selepas menyusun semua logik pemprosesan perakaunan, kami akhirnya menemui sebabnya: ia disebabkan oleh operasi pangkalan data serentak pada akaun panas. Mengenai isu ini, mari kita bincangkan tentang kunci teragih berdasarkan Redis dalam sistem teragih. Dengan cara ini, kami juga memecahkan punca dan penyelesaian kepada masalah tersebut. [Cadangan berkaitan:

Tutorial video Redis

]

Analisis sebab

Konkurensi sistem tidak tinggi, dan terdapat akaun hangat, tetapi ia adalah tidak begitu serius. Punca masalahnya terletak pada reka bentuk seni bina sistem, yang secara buatan mencipta konkurensi. Senario ini adalah seperti berikut: pedagang mengimport sekumpulan data dalam kelompok, dan sistem akan melaksanakan pra-pemprosesan dan menambah atau mengurangkan baki akaun.

Pada masa ini, satu lagi tugas berjadual juga akan mengimbas dan mengemas kini akaun. Selain itu, operasi pada akaun yang sama diedarkan di antara pelbagai sistem, dan akaun panas muncul.

Untuk menyelesaikan masalah ini, dari peringkat seni bina, kami boleh mempertimbangkan untuk memisahkan sistem perakaunan dan memusatkannya dalam satu sistem untuk pemprosesan Semua transaksi pangkalan data dan urutan pelaksanaan diselaraskan dan diproses oleh sistem perakaunan. Dari perspektif teknikal, akaun tempat liputan boleh dikunci melalui mekanisme kunci.

Artikel ini menerangkan secara terperinci pelaksanaan kunci yang diedarkan untuk akaun panas.

Analisis kunci

Dalam persekitaran berbilang benang Java, biasanya terdapat beberapa jenis kunci yang boleh digunakan:

JVM tahap model memori Kunci yang biasa digunakan termasuk: kunci disegerakkan, Kunci, dsb.;
  • kunci pangkalan data, seperti kunci optimistik, kunci pesimis, dll.;
  • Kunci Tahap memori JVM boleh memastikan keselamatan benang di bawah satu perkhidmatan, seperti apabila berbilang benang mengakses/mengubah suai pembolehubah global. Tetapi apabila sistem digunakan dalam kelompok, kunci tempatan peringkat JVM tidak berkuasa.
  • Penguncian pesimis dan penguncian optimistik

Seperti kes di atas, akaun tempat liputan ialah sumber kongsi dalam sistem yang diedarkan dan kami biasanya menggunakan

kunci pangkalan data

atau kunci yang diedarkan untuk menyelesaikan masalah.

Kunci pangkalan data terbahagi kepada kunci optimistik dan kunci pesimis.

Kunci pesimis dilaksanakan berdasarkan kunci eksklusif yang disediakan oleh pangkalan data (Mysql's InnoDB). Semasa menjalankan operasi transaksi, melalui pilihan...untuk penyata kemas kini, MySQL akan menambah kunci eksklusif pada setiap baris data dalam set hasil pertanyaan, dan urutan lain akan menyekat kemas kini dan memadam operasi rekod. Ini mencapai pelaksanaan berurutan (pengubahsuaian) sumber yang dikongsi;

Kunci optimistik

adalah relatif kepada kunci pesimis mengandaikan bahawa data secara amnya tidak akan menyebabkan konflik, jadi data diserahkan dan dikemas kini . Hanya selepas itu konflik data akan dikesan secara rasmi. Jika terdapat konflik, maklumat pengecualian dikembalikan kepada pengguna, membolehkan pengguna memutuskan perkara yang perlu dilakukan. Penguncian optimistik sesuai untuk senario di mana terdapat lebih banyak bacaan dan kurang penulisan, yang boleh meningkatkan daya pengeluaran program. Penguncian optimis biasanya dilaksanakan berdasarkan status rakaman atau menambah versi.

Senario kegagalan kunci pesimis

Kunci pesimis telah digunakan dalam projek, tetapi kunci pesimis gagal. Ini juga merupakan salah faham biasa apabila menggunakan penguncian yang pesimis. Mari analisanya di bawah.

Penggunaan biasa proses penguncian pesimis:

Kunci rekod melalui pilih... untuk kemas kini;

Kira baki baharu, ubah suai jumlah dan simpannya ;

Lepaskan kunci selepas pelaksanaan selesai;
  • Proses pemprosesan yang sering melakukan kesilapan:
  • Soal baki akaun dan kira baki baharu;
Melalui pilih .. . untuk rekod kunci kemas kini;

ubah suai jumlah dan simpannya;
    Dalam proses yang salah, seperti pertanyaan perkhidmatan A dan B Baki semuanya 100, A tolak 50, B tolak 40, kemudian A kunci rekod dan kemas kini pangkalan data kepada 50 selepas A keluarkan kunci, B kunci rekod dan kemas kini pangkalan data kepada 60. Jelas sekali, yang terakhir telah menimpa kemas kini yang pertama. Penyelesaiannya adalah untuk mengembangkan skop kunci dan memajukan kunci sebelum mengira baki baru.
  • Biasanya penguncian yang pesimis memberi banyak tekanan kepada pangkalan data Dalam amalan, ia biasanya dilaksanakan menggunakan penguncian optimistik atau penguncian teragih berdasarkan senario.
  • Mari kita ke inti dan bercakap tentang pelaksanaan kunci yang diedarkan berdasarkan Redis.
  • Latihan praktikal kunci edaran Redis

Di sini kami mengambil skrip Spring Boot, Redis dan Lua sebagai contoh untuk menunjukkan pelaksanaan kunci yang diedarkan. Untuk memudahkan pemprosesan, Redis dalam contoh menganggap kedua-dua fungsi kunci teragih dan fungsi pangkalan data.

Pembinaan senario

Dalam persekitaran kluster, kendalikan jumlah akaun yang sama ialah:

  • Baca jumlah pengguna daripada pangkalan data;
  • Atur cara mengubah suai amaun
  • dan kemudian menyimpan jumlah terkini dalam pangkalan data

Yang berikut tidak mengunci atau menyegerakkan proses daripada permulaan, dan secara beransur-ansur menyimpulkan Kunci pengedaran akhir.

Penyepaduan asas dan pembinaan kelas

Sediakan persekitaran perniagaan asas tanpa mengunci.

Mula-mula memperkenalkan kebergantungan yang berkaitan dalam projek Spring Boot:

<dependency>
  <groupId>org.springframework.boot</groupId>
  <artifactId>spring-boot-starter-data-redis</artifactId>
</dependency>
<dependency>
  <groupId>org.springframework.boot</groupId>
  <artifactId>spring-boot-starter-web</artifactId>
</dependency>
Salin selepas log masuk

Akaun kelas entiti yang sepadan UserAccount:

public class UserAccount {

  //用户ID
  private String userId;
  //账户内金额
  private int amount;

  //添加账户金额
  public void addAmount(int amount) {
    this.amount = this.amount + amount;
  }
  // 省略构造方法和getter/setter 
}
Salin selepas log masuk

Buat kelas pelaksanaan urutan AkaunOperationThread:

public class AccountOperationThread implements Runnable {

  private final static Logger logger = LoggerFactory.getLogger(AccountOperationThread.class);

  private static final Long RELEASE_SUCCESS = 1L;

  private String userId;

  private RedisTemplate<Object, Object> redisTemplate;

  public AccountOperationThread(String userId, RedisTemplate<Object, Object> redisTemplate) {
    this.userId = userId;
    this.redisTemplate = redisTemplate;
  }

  @Override
  public void run() {
    noLock();
  }

  /**
   * 不加锁
   */
  private void noLock() {
    try {
      Random random = new Random();
      // 模拟线程进行业务处理
      TimeUnit.MILLISECONDS.sleep(random.nextInt(100) + 1);
    } catch (InterruptedException e) {
      e.printStackTrace();
    }
    //模拟数据库中获取用户账号
    UserAccount userAccount = (UserAccount) redisTemplate.opsForValue().get(userId);
    // 金额+1
    userAccount.addAmount(1);
    logger.info(Thread.currentThread().getName() + " : user id : " + userId + " amount : " + userAccount.getAmount());
    //模拟存回数据库
    redisTemplate.opsForValue().set(userId, userAccount);
  }
}
Salin selepas log masuk

Instasiasi RedisTemplate diserahkan kepada Spring Boot:

@Configuration
public class RedisConfig {

  @Bean
  public RedisTemplate<Object, Object> redisTemplate(RedisConnectionFactory redisConnectionFactory) {
    RedisTemplate<Object, Object> redisTemplate = new RedisTemplate<>();
    redisTemplate.setConnectionFactory(redisConnectionFactory);
    Jackson2JsonRedisSerializer<Object> jackson2JsonRedisSerializer =
        new Jackson2JsonRedisSerializer<>(Object.class);
    ObjectMapper objectMapper = new ObjectMapper();
    objectMapper.setVisibility(PropertyAccessor.ALL, JsonAutoDetect.Visibility.ANY);
    objectMapper.enableDefaultTyping(ObjectMapper.DefaultTyping.NON_FINAL);
    jackson2JsonRedisSerializer.setObjectMapper(objectMapper);
    // 设置value的序列化规则和 key的序列化规则
    redisTemplate.setValueSerializer(jackson2JsonRedisSerializer);
    redisTemplate.setKeySerializer(new StringRedisSerializer());
    redisTemplate.afterPropertiesSet();
    return redisTemplate;
  }
}
Salin selepas log masuk

Akhir sekali, sediakan TestController untuk mencetuskan operasi berbilang benang:

@RestController
public class TestController {

  private final static Logger logger = LoggerFactory.getLogger(TestController.class);

  private static ExecutorService executorService = Executors.newFixedThreadPool(10);

  @Autowired
  private RedisTemplate<Object, Object> redisTemplate;

  @GetMapping("/test")
  public String test() throws InterruptedException {
    // 初始化用户user_001到Redis,账户金额为0
    redisTemplate.opsForValue().set("user_001", new UserAccount("user_001", 0));
    // 开启10个线程进行同步测试,每个线程为账户增加1元
    for (int i = 0; i < 10; i++) {
      logger.info("创建线程i=" + i);
      executorService.execute(new AccountOperationThread("user_001", redisTemplate));
    }

    // 主线程休眠1秒等待线程跑完
    TimeUnit.MILLISECONDS.sleep(1000);
    // 查询Redis中的user_001账户
    UserAccount userAccount = (UserAccount) redisTemplate.opsForValue().get("user_001");
    logger.info("user id : " + userAccount.getUserId() + " amount : " + userAccount.getAmount());
    return "success";
  }
}
Salin selepas log masuk

Pelaksanaan Dalam program di atas, biasanya terdapat 10 utas, setiap utas menambah 1, dan hasilnya harus 10. Tetapi jika anda melaksanakannya beberapa kali, anda akan mendapati bahawa hasilnya sangat berbeza, dan pada dasarnya lebih kecil daripada 10.

[pool-1-thread-5] c.s.redis.thread.AccountOperationThread  : pool-1-thread-5 : user id : user_001 amount : 1
[pool-1-thread-4] c.s.redis.thread.AccountOperationThread  : pool-1-thread-4 : user id : user_001 amount : 1
[pool-1-thread-3] c.s.redis.thread.AccountOperationThread  : pool-1-thread-3 : user id : user_001 amount : 1
[pool-1-thread-1] c.s.redis.thread.AccountOperationThread  : pool-1-thread-1 : user id : user_001 amount : 1
[pool-1-thread-1] c.s.redis.thread.AccountOperationThread  : pool-1-thread-1 : user id : user_001 amount : 2
[pool-1-thread-2] c.s.redis.thread.AccountOperationThread  : pool-1-thread-2 : user id : user_001 amount : 2
[pool-1-thread-5] c.s.redis.thread.AccountOperationThread  : pool-1-thread-5 : user id : user_001 amount : 2
[pool-1-thread-4] c.s.redis.thread.AccountOperationThread  : pool-1-thread-4 : user id : user_001 amount : 3
[pool-1-thread-1] c.s.redis.thread.AccountOperationThread  : pool-1-thread-1 : user id : user_001 amount : 4
[pool-1-thread-3] c.s.redis.thread.AccountOperationThread  : pool-1-thread-3 : user id : user_001 amount : 5
[nio-8080-exec-1] c.s.redis.controller.TestController      : user id : user_001 amount : 5
Salin selepas log masuk

Mengambil log di atas sebagai contoh, empat utas pertama semuanya menukar nilai kepada 1, yang bermaksud bahawa tiga utas seterusnya menimpa pengubahsuaian sebelumnya, menyebabkan hasil akhir bukan 10, tetapi hanya 5. Ini jelas bermasalah.

Pelaksanaan kunci penyegerakan Redis

Memandangkan situasi di atas, dalam JVM yang sama, kita boleh melengkapkannya melalui penguncian benang. Walau bagaimanapun, dalam persekitaran yang diedarkan, kunci tahap JVM tidak boleh dilaksanakan di sini.

Idea asas: Apabila urutan pertama masuk, rekodkannya dalam Redis Apabila urutan berikutnya datang untuk meminta, ia akan dinilai sama ada rekod itu wujud dalam Redis, ini bermakna ia berada dalam keadaan terkunci , tunggu atau kembali. Jika ia tidak wujud, pemprosesan perniagaan susulan akan dilakukan.

  /**
   * 1.抢占资源时判断是否被锁。
   * 2.如未锁则抢占成功且加锁,否则等待锁释放。
   * 3.业务完成后释放锁,让给其它线程。
   * <p>
   * 该方案并未解决同步问题,原因:线程获得锁和加锁的过程,并非原子性操作,可能会导致线程A获得锁,还未加锁时,线程B也获得了锁。
   */
  private void redisLock() {
    Random random = new Random();
    try {
      TimeUnit.MILLISECONDS.sleep(random.nextInt(1000) + 1);
    } catch (InterruptedException e) {
      e.printStackTrace();
    }
    while (true) {
      Object lock = redisTemplate.opsForValue().get(userId + ":syn");
      if (lock == null) {
        // 获得锁 -> 加锁 -> 跳出循环
        logger.info(Thread.currentThread().getName() + ":获得锁");
        redisTemplate.opsForValue().set(userId + ":syn", "lock");
        break;
      }
      try {
        // 等待500毫秒重试获得锁
        TimeUnit.MILLISECONDS.sleep(500);
      } catch (InterruptedException e) {
        e.printStackTrace();
      }
    }
    try {
      //模拟数据库中获取用户账号
      UserAccount userAccount = (UserAccount) redisTemplate.opsForValue().get(userId);
      if (userAccount != null) {
        //设置金额
        userAccount.addAmount(1);
        logger.info(Thread.currentThread().getName() + " : user id : " + userId + " amount : " + userAccount.getAmount());
        //模拟存回数据库
        redisTemplate.opsForValue().set(userId, userAccount);
      }
    } finally {
      //释放锁
      redisTemplate.delete(userId + ":syn");
      logger.info(Thread.currentThread().getName() + ":释放锁");
    }
  }
Salin selepas log masuk

Dalam blok kod while, tentukan dahulu sama ada ID pengguna yang sepadan wujud dalam Redis Jika ia tidak wujud, tetapkan dan kuncinya Jika wujud, lompat keluar dari gelung dan teruskan menunggu.

Kod di atas nampaknya melaksanakan fungsi mengunci, tetapi apabila program dilaksanakan, anda akan mendapati masih terdapat masalah konkurensi seolah-olah ia tidak dikunci. Sebabnya ialah: operasi memperoleh dan mengunci bukan atom. Sebagai contoh, dua utas mendapati bahawa kunci kedua-duanya adalah batal dan menguncinya Pada masa ini, masalah konkurensi masih wujud.

Redis Atomic Synchronization Lock

Untuk menangani masalah di atas, proses memperoleh dan mengunci boleh diatomkan. Ia boleh dilaksanakan berdasarkan API pengabusan yang disediakan oleh spring-boot-data-redis:

// 该方法使用了redis的指令:SETNX key value
// 1.key不存在,设置成功返回value,setIfAbsent返回true;
// 2.key存在,则设置失败返回null,setIfAbsent返回false;
// 3.原子性操作;
Boolean setIfAbsent(K var1, V var2);
Salin selepas log masuk

Operasi pengabusan kaedah di atas ialah enkapsulasi perintah setnx Redis Redis adalah seperti berikut:

redis> SETNX mykey "Hello"
(integer) 1
redis> SETNX mykey "World"
(integer) 0
redis> GET mykey
"Hello"
Salin selepas log masuk

Apabila menetapkan mykey buat kali pertama, ia tidak wujud, maka 1 dikembalikan, menunjukkan bahawa tetapan itu berjaya apabila menetapkan mykey untuk kali kedua, ia sudah wujud; , dan 0 dikembalikan, menunjukkan bahawa tetapan gagal. Tanya nilai yang sepadan dengan mykey sekali lagi dan anda akan mendapati bahawa ia masih nilai yang ditetapkan untuk kali pertama. Dalam erti kata lain, setnx redis memastikan bahawa kunci unik hanya boleh ditetapkan dengan jayanya oleh satu perkhidmatan.

Setelah memahami API di atas dan prinsip asas, mari kita lihat kod kaedah pelaksanaan dalam urutan seperti berikut:

  /**
   * 1.原子操作加锁
   * 2.竞争线程循环重试获得锁
   * 3.业务完成释放锁
   */
  private void atomicityRedisLock() {
    //Spring data redis 支持的原子性操作
    while (!redisTemplate.opsForValue().setIfAbsent(userId + ":syn", "lock")) {
      try {
        // 等待100毫秒重试获得锁
        TimeUnit.MILLISECONDS.sleep(100);
      } catch (InterruptedException e) {
        e.printStackTrace();
      }
    }
    logger.info(Thread.currentThread().getName() + ":获得锁");
    try {
      //模拟数据库中获取用户账号
      UserAccount userAccount = (UserAccount) redisTemplate.opsForValue().get(userId);
      if (userAccount != null) {
        //设置金额
        userAccount.addAmount(1);
        logger.info(Thread.currentThread().getName() + " : user id : " + userId + " amount : " + userAccount.getAmount());
        //模拟存回数据库
        redisTemplate.opsForValue().set(userId, userAccount);
      }
    } finally {
      //释放锁
      redisTemplate.delete(userId + ":syn");
      logger.info(Thread.currentThread().getName() + ":释放锁");
    }
  }
Salin selepas log masuk

Laksanakan kod itu sekali lagi dan anda akan mendapati bahawa hasilnya adalah betul, yang bermaksud ia boleh berjaya Utas yang diedarkan dikunci.

Kebuntuan kunci edaran Redis

Walaupun hasil pelaksanaan kod di atas adalah baik, jika aplikasi ranap secara tidak normal, tidak akan ada masa untuk melaksanakan kaedah melepaskan kunci masuk akhirnya. Kemudian benang lain tidak akan dapat memperoleh kunci ini.

Kaedah setIfAbsent yang terlebih beban boleh digunakan pada masa ini:

Boolean setIfAbsent(K var1, V var2, long var3, TimeUnit var5);
Salin selepas log masuk

Berdasarkan kaedah ini, masa tamat tempoh kunci boleh ditetapkan. Dengan cara ini, walaupun utas yang memperoleh kunci terputus, utas lain boleh memperoleh kunci seperti biasa selepas data dalam Redis tamat tempoh.

Kod sampel adalah seperti berikut:

private void atomicityAndExRedisLock() {
    try {
      //Spring data redis 支持的原子性操作,并设置5秒过期时间
      while (!redisTemplate.opsForValue().setIfAbsent(userId + ":syn",
          System.currentTimeMillis() + 5000, 5000, TimeUnit.MILLISECONDS)) {
        // 等待100毫秒重试获得锁
        logger.info(Thread.currentThread().getName() + ":尝试循环获取锁");
        TimeUnit.MILLISECONDS.sleep(1000);
      }
      logger.info(Thread.currentThread().getName() + ":获得锁--------");
      // 应用在这里宕机,进程退出,无法执行 finally;
      Thread.currentThread().interrupt();
      // 业务逻辑...
    } catch (InterruptedException e) {
      e.printStackTrace();
    } finally {
      //释放锁
      if (!Thread.currentThread().isInterrupted()) {
        redisTemplate.delete(userId + ":syn");
        logger.info(Thread.currentThread().getName() + ":释放锁");
      }
    }
  }
Salin selepas log masuk

Tamat masa perniagaan dan benang daemon

Tempoh tamat masa Redis ditambahkan di atas, yang nampaknya untuk menyelesaikan masalah, tetapi ia juga memperkenalkan masalah baru.

Sebagai contoh, dalam keadaan biasa, urutan A boleh menyelesaikan perniagaan dalam masa 5 saat, tetapi kadangkala ia mungkin mengambil masa lebih daripada 5 saat. Jika tamat masa ditetapkan kepada 5 saat, benang A memperoleh kunci, tetapi pemprosesan logik perniagaan mengambil masa 6 saat. Pada masa ini, utas A masih menjalankan logik perniagaan biasa, dan utas B telah memperoleh kuncinya. Apabila benang A selesai diproses, kunci benang B boleh dilepaskan.

Terdapat dua masalah dalam senario di atas:

  • Pertama, utas A dan utas B boleh dilaksanakan pada masa yang sama, menyebabkan isu konkurensi.
  • Kedua, benang A mungkin melepaskan kunci benang B, yang membawa kepada satu siri kitaran ganas.

Sudah tentu, anda boleh menentukan sama ada kunci kepunyaan benang A atau benang B dengan menetapkan nilai dalam Redis. Walau bagaimanapun, analisis yang teliti akan mendedahkan bahawa intipati masalah ini ialah benang A mengambil masa lebih lama untuk melaksanakan logik perniagaan daripada tamat masa kunci.

Kemudian terdapat dua penyelesaian:

  • Pertama, tetapkan tamat masa yang cukup lama untuk memastikan kod perniagaan boleh dilaksanakan sebelum kunci dilepaskan; , tambahkan benang daemon untuk kunci dan tambahkan masa untuk kunci yang hampir tamat tempoh tetapi belum dikeluarkan
  • Kaedah pertama memerlukan logik perniagaan yang memakan masa dalam kebanyakan kes seluruh bank; Tetapan tamat masa.

Cara kedua ialah meningkatkan masa tamat kunci secara dinamik melalui kaedah benang daemon berikut.

public class DaemonThread implements Runnable {
  private final static Logger logger = LoggerFactory.getLogger(DaemonThread.class);

  // 是否需要守护 主线程关闭则结束守护线程
  private volatile boolean daemon = true;
  // 守护锁
  private String lockKey;

  private RedisTemplate<Object, Object> redisTemplate;

  public DaemonThread(String lockKey, RedisTemplate<Object, Object> redisTemplate) {
    this.lockKey = lockKey;
    this.redisTemplate = redisTemplate;
  }

  @Override
  public void run() {
    try {
      while (daemon) {
        long time = redisTemplate.getExpire(lockKey, TimeUnit.MILLISECONDS);
        // 剩余有效期小于1秒则续命
        if (time < 1000) {
          logger.info("守护进程: " + Thread.currentThread().getName() + " 延长锁时间 5000 毫秒");
          redisTemplate.expire(lockKey, 5000, TimeUnit.MILLISECONDS);
        }
        TimeUnit.MILLISECONDS.sleep(300);
      }
      logger.info(" 守护进程: " + Thread.currentThread().getName() + "关闭 ");
    } catch (InterruptedException e) {
      e.printStackTrace();
    }
  }

  // 主线程主动调用结束
  public void stop() {
    daemon = false;
  }
}
Salin selepas log masuk

上述线程每隔300毫秒获取一下Redis中锁的超时时间,如果小于1秒,则延长5秒。当主线程调用关闭时,守护线程也随之关闭。

主线程中相关代码实现:

private void deamonRedisLock() {
    //守护线程
    DaemonThread daemonThread = null;
    //Spring data redis 支持的原子性操作,并设置5秒过期时间
    String uuid = UUID.randomUUID().toString();
    String value = Thread.currentThread().getId() + ":" + uuid;
    try {
      while (!redisTemplate.opsForValue().setIfAbsent(userId + ":syn", value, 5000, TimeUnit.MILLISECONDS)) {
        // 等待100毫秒重试获得锁
        logger.info(Thread.currentThread().getName() + ":尝试循环获取锁");
        TimeUnit.MILLISECONDS.sleep(1000);
      }
      logger.info(Thread.currentThread().getName() + ":获得锁----");
      // 开启守护线程
      daemonThread = new DaemonThread(userId + ":syn", redisTemplate);
      Thread thread = new Thread(daemonThread);
      thread.start();
      // 业务逻辑执行10秒...
      TimeUnit.MILLISECONDS.sleep(10000);
    } catch (InterruptedException e) {
      e.printStackTrace();
    } finally {
      //释放锁 这里也需要原子操作,今后通过 Redis + Lua 讲
      String result = (String) redisTemplate.opsForValue().get(userId + ":syn");
      if (value.equals(result)) {
        redisTemplate.delete(userId + ":syn");
        logger.info(Thread.currentThread().getName() + ":释放锁-----");
      }
      //关闭守护线程
      if (daemonThread != null) {
        daemonThread.stop();
      }
    }
  }
Salin selepas log masuk

其中在获得锁之后,开启守护线程,在finally中将守护线程关闭。

基于Lua脚本的实现

在上述逻辑中,我们是基于spring-boot-data-redis提供的原子化操作来保证锁判断和执行的原子化的。在非Spring Boot项目中,则可以基于Lua脚本来实现。

首先定义加锁和解锁的Lua脚本及对应的DefaultRedisScript对象,在RedisConfig配置类中添加如下实例化代码:

@Configuration
public class RedisConfig {

  //lock script
  private static final String LOCK_SCRIPT = " if redis.call(&#39;setnx&#39;,KEYS[1],ARGV[1]) == 1 " +
      " then redis.call(&#39;expire&#39;,KEYS[1],ARGV[2]) " +
      " return 1 " +
      " else return 0 end ";
  private static final String UNLOCK_SCRIPT = "if redis.call(&#39;get&#39;, KEYS[1]) == ARGV[1] then return redis.call" +
      "(&#39;del&#39;, KEYS[1]) else return 0 end";

  // ... 省略部分代码
  
  @Bean
  public DefaultRedisScript<Boolean> lockRedisScript() {
    DefaultRedisScript<Boolean> defaultRedisScript = new DefaultRedisScript<>();
    defaultRedisScript.setResultType(Boolean.class);
    defaultRedisScript.setScriptText(LOCK_SCRIPT);
    return defaultRedisScript;
  }

  @Bean
  public DefaultRedisScript<Long> unlockRedisScript() {
    DefaultRedisScript<Long> defaultRedisScript = new DefaultRedisScript<>();
    defaultRedisScript.setResultType(Long.class);
    defaultRedisScript.setScriptText(UNLOCK_SCRIPT);
    return defaultRedisScript;
  }
}
Salin selepas log masuk

再通过在AccountOperationThread类中新建构造方法,将上述两个对象传入类中(省略此部分演示)。然后,就可以基于RedisTemplate来调用了,改造之后的代码实现如下:

  private void deamonRedisLockWithLua() {
    //守护线程
    DaemonThread daemonThread = null;
    //Spring data redis 支持的原子性操作,并设置5秒过期时间
    String uuid = UUID.randomUUID().toString();
    String value = Thread.currentThread().getId() + ":" + uuid;
    try {
      while (!redisTemplate.execute(lockRedisScript, Collections.singletonList(userId + ":syn"), value, 5)) {
        // 等待1000毫秒重试获得锁
        logger.info(Thread.currentThread().getName() + ":尝试循环获取锁");
        TimeUnit.MILLISECONDS.sleep(1000);
      }
      logger.info(Thread.currentThread().getName() + ":获得锁----");
      // 开启守护线程
      daemonThread = new DaemonThread(userId + ":syn", redisTemplate);
      Thread thread = new Thread(daemonThread);
      thread.start();
      // 业务逻辑执行10秒...
      TimeUnit.MILLISECONDS.sleep(10000);
    } catch (InterruptedException e) {
      logger.error("异常", e);
    } finally {
      //使用Lua脚本:先判断是否是自己设置的锁,再执行删除
      // key存在,当前值=期望值时,删除key;key存在,当前值!=期望值时,返回0;
      Long result = redisTemplate.execute(unlockRedisScript, Collections.singletonList(userId + ":syn"), value);
      logger.info("redis解锁:{}", RELEASE_SUCCESS.equals(result));
      if (RELEASE_SUCCESS.equals(result)) {
        if (daemonThread != null) {
          //关闭守护线程
          daemonThread.stop();
          logger.info(Thread.currentThread().getName() + ":释放锁---");
        }
      }
    }
  }
Salin selepas log masuk

其中while循环中加锁和finally中的释放锁都是基于Lua脚本来实现了。

Redis锁的其他因素

除了上述实例,在使用Redis分布式锁时,还可以考虑以下情况及方案。

Redis锁的不可重入

当线程在持有锁的情况下再次请求加锁,如果一个锁支持一个线程多次加锁,那么这个锁就是可重入的。如果一个不可重入锁被再次加锁,由于该锁已经被持有,再次加锁会失败。Redis可通过对锁进行重入计数,加锁时加 1,解锁时减 1,当计数归 0时释放锁。

可重入锁虽然高效但会增加代码的复杂性,这里就不举例说明了。

等待锁释放

有的业务场景,发现被锁则直接返回。但有的场景下,客户端需要等待锁释放然后去抢锁。上述示例就属于后者。针对等待锁释放也有两种方案:

  • 客户端轮训:当未获得锁时,等待一段时间再重新获取,直到成功。上述示例就是基于这种方式实现的。这种方式的缺点也很明显,比较耗费服务器资源,当并发量大时会影响服务器的效率。
  • 使用Redis的订阅发布功能:当获取锁失败时,订阅锁释放消息,获取锁成功后释放时,发送释放消息。

集群中的主备切换和脑裂

在Redis包含主从同步的集群部署方式中,如果主节点挂掉,从节点提升为主节点。如果客户端A在主节点加锁成功,指令还未同步到从节点,此时主节点挂掉,从节点升为主节点,新的主节点中没有锁的数据。这种情况下,客户端B就可能加锁成功,从而出现并发的场景。

当集群发生脑裂时,Redis master节点跟slave 节点和 sentinel 集群处于不同的网络分区。sentinel集群无法感知到master的存在,会将 slave 节点提升为 master 节点,此时就会存在两个不同的 master 节点。从而也会导致并发问题的出现。Redis Cluster集群部署方式同理。

小结

通过生产环境中的一个问题,排查原因,寻找解决方案,到最终对基于Redis分布式的深入研究,这便是学习的过程。

同时,每当面试或被问题如何解决分布式共享资源时,我们会脱口而出”基于Redis实现分布式锁“,但通过本文的学习会发现,Redis分布式锁并不是万能的,而且在使用的过程中还需要注意超时、死锁、误解锁、集群选主/脑裂等问题。

Redis以高性能著称,但在实现分布式锁的过程中还是存在一些问题。因此,基于Redis的分布式锁可以极大的缓解并发问题,但要完全防止并发,还是得从数据库层面入手。

源码地址:https://github.com/secbr/springboot-all/tree/master/springboot-redis-lock

更多编程相关知识,请访问:编程入门!!

Atas ialah kandungan terperinci Mari bercakap tentang kunci teragih berasaskan Redis dalam sistem teragih. Untuk maklumat lanjut, sila ikut artikel berkaitan lain di laman web China PHP!

Label berkaitan:
sumber:juejin.cn
Kenyataan Laman Web ini
Kandungan artikel ini disumbangkan secara sukarela oleh netizen, dan hak cipta adalah milik pengarang asal. Laman web ini tidak memikul tanggungjawab undang-undang yang sepadan. Jika anda menemui sebarang kandungan yang disyaki plagiarisme atau pelanggaran, sila hubungi admin@php.cn
Tutorial Popular
Lagi>
Muat turun terkini
Lagi>
kesan web
Kod sumber laman web
Bahan laman web
Templat hujung hadapan