Maison > base de données > Redis > le corps du texte

Parlons des verrous distribués basés sur Redis dans les systèmes distribués

青灯夜游
Libérer: 2021-10-29 10:52:11
avant
1560 Les gens l'ont consulté

Verrouillé, y a-t-il encore des problèmes de concurrence ? RedisComprenez-vous vraiment les verrous distribués ? L'article suivant vous parlera des verrous distribués basés sur Redis dans les systèmes distribués. J'espère qu'il vous sera utile !

Parlons des verrous distribués basés sur Redis dans les systèmes distribués

Dans les projets nouvellement repris, il y aura parfois des problèmes avec des comptes inégaux. L'explication donnée par le précédent patron technique avant de partir était : Après dépannage, la cause n'a pas été trouvée. Après ça, j'étais trop occupé et je ne l'ai pas résolu c'est peut-être à cause du framework...

Maintenant que le projet. est livré, ces problèmes doivent être résolus. Après avoir trié toute la logique de traitement comptable, nous avons finalement trouvé la raison : cela était dû à des opérations simultanées de base de données sur des comptes chauds. Concernant ce problème, parlons des verrous distribués basés sur Redis dans les systèmes distribués. À propos, nous analysons également les causes et les solutions au problème. [Recommandations associées : Tutoriel vidéo Redis]

Analyse des causes

La concurrence du système n'est pas élevée et il y a des comptes chauds, mais ce n'est pas si grave. La racine du problème réside dans la conception de l’architecture du système, qui crée artificiellement la concurrence. Le scénario est le suivant : le commerçant importe un lot de données par lots, et le système effectuera un prétraitement et augmentera ou diminuera le solde du compte.

À ce moment, une autre tâche planifiée analysera et mettra également à jour le compte. De plus, les opérations sur un même compte sont réparties entre différents systèmes et des comptes chauds apparaissent.

Pour résoudre ce problème, du niveau architectural, nous pouvons envisager de détacher le système comptable et de le centraliser dans un seul système de traitement. Toutes les transactions de la base de données et les séquences d'exécution seront coordonnées et traitées par le système comptable. D'un point de vue technique, les comptes hotspot peuvent être verrouillés via un mécanisme de verrouillage.

Cet article explique en détail la mise en œuvre des verrous distribués pour les comptes chauds.

Analyse des verrous

Dans l'environnement multithread de Java, il existe généralement plusieurs types de verrous qui peuvent être utilisés :

  • Les verrous de niveau de modèle de mémoire JVM, les plus couramment utilisés sont : synchronisé, verrou, etc.
  • Les verrous de base de données, tels que les verrous optimistes, les verrous pessimistes, etc. ;
  • les verrous distribués ;
Les verrous au niveau de la mémoire JVM peuvent garantir la sécurité des threads sous un seul service, comme plusieurs threads accédant/modifiant une variable globale. Mais lorsque le système est déployé dans un cluster, les verrous locaux au niveau de la JVM sont impuissants.

Verrouillage pessimiste et verrouillage optimiste

Comme dans le cas ci-dessus, le compte hotspot est une ressource partagée dans le système distribué. Nous utilisons généralement le

Verrouillage de la base de données ou le Verrouillage distribué pour le résoudre.

Les verrous de base de données sont divisés en

verrous optimistes et verrous pessimistes.

Le verrouillage pessimiste est implémenté sur la base du verrouillage exclusif fourni par la base de données (InnoDB de MySQL). Lors de l'exécution d'opérations de transaction, via l'instruction select...for update, MySQL ajoutera un verrou exclusif à chaque ligne de données dans l'ensemble de résultats de la requête, et d'autres threads bloqueront les opérations de mise à jour et de suppression de l'enregistrement. Afin de réaliser l'exécution séquentielle (modification) des ressources partagées ;

Le verrouillage optimiste est relatif au verrouillage pessimiste. Le verrouillage optimiste suppose que les données ne provoqueront généralement pas de conflits, donc les données seront officiellement mises à jour lorsque les données seront soumises et mis à jour. Vérifiez si le conflit existe. En cas de conflit, les informations d'exception sont renvoyées à l'utilisateur, lui permettant de décider quoi faire. Le verrouillage optimiste convient aux scénarios dans lesquels il y a plus de lecture et moins d'écriture, ce qui peut améliorer le débit du programme. Le verrouillage optimiste est généralement mis en œuvre en fonction de l'état d'enregistrement ou de l'ajout de versions.

Scénario d'échec du verrouillage pessimiste

Un verrouillage pessimiste a été utilisé dans le projet, mais le verrouillage pessimiste a échoué. Il s’agit également d’un malentendu courant lors de l’utilisation du verrouillage pessimiste. Analysons-le ci-dessous.

Utilisation normale du processus de verrouillage pessimiste :

    Verrouillez l'enregistrement via la sélection... pour la mise à jour ;
  • Calculez le nouveau solde, modifiez le montant et stockez-le
  • Relâchez le verrou une fois l'exécution terminée ; Processus de traitement des erreurs fréquentes :
Interrogez le solde du compte et calculez le nouveau solde ;

Verrouillez l'enregistrement en sélectionnant... pour la mise à jour
  • Modifiez le montant et stockez-le
  • Relâchez le verrou une fois l'exécution terminée ; ;
  • Dans les mauvais processus, tels que les demandes de service A et B, les soldes obtenus sont tous de 100, A déduit 50, B déduit 40, puis A verrouille l'enregistrement et met à jour la base de données à 50 après que A a libéré le verrou, B verrouille ; l'enregistrement et met à jour la base de données à 60. Evidemment, ce dernier a écrasé les mises à jour du premier. La solution consiste à élargir la portée du verrou et à avancer le verrou avant de calculer le nouveau solde.
  • Les verrous généralement pessimistes exercent beaucoup de pression sur la base de données. En pratique, des verrous optimistes ou des verrous distribués sont généralement utilisés selon le scénario.

Entrons dans le vif du sujet et parlons de la mise en œuvre du verrouillage distribué basé sur Redis.

Exercice pratique sur le verrouillage distribué Redis

Ici, les scripts Spring Boot, Redis et Lua sont utilisés comme exemples pour démontrer la mise en œuvre de verrous distribués. Afin de simplifier le traitement, Redis dans l'exemple assume à la fois la fonction de verrouillage distribué et la fonction de base de données.

Construction de scénarios

Dans un environnement de cluster, les étapes de base sont d'exploiter le montant du même compte :

  • Lire le montant de l'utilisateur dans la base de données
  • Le programme modifie le montant
  • Puis stocker le dernier ; montant dans la base de données ;

Ce qui suit commence sans verrouillage ni traitement asynchrone, et en déduit progressivement le verrou distribué final.

Intégration de base et construction de classe

Préparez un environnement commercial de base sans verrouillage.

Introduisez d'abord les dépendances pertinentes dans le projet Spring Boot :

<dependency>
  <groupId>org.springframework.boot</groupId>
  <artifactId>spring-boot-starter-data-redis</artifactId>
</dependency>
<dependency>
  <groupId>org.springframework.boot</groupId>
  <artifactId>spring-boot-starter-web</artifactId>
</dependency>
Copier après la connexion

La classe d'entité correspondante au compte UserAccount :

public class UserAccount {

  //用户ID
  private String userId;
  //账户内金额
  private int amount;

  //添加账户金额
  public void addAmount(int amount) {
    this.amount = this.amount + amount;
  }
  // 省略构造方法和getter/setter 
}
Copier après la connexion

Créez une classe d'implémentation de thread AccountOperationThread :

public class AccountOperationThread implements Runnable {

  private final static Logger logger = LoggerFactory.getLogger(AccountOperationThread.class);

  private static final Long RELEASE_SUCCESS = 1L;

  private String userId;

  private RedisTemplate<Object, Object> redisTemplate;

  public AccountOperationThread(String userId, RedisTemplate<Object, Object> redisTemplate) {
    this.userId = userId;
    this.redisTemplate = redisTemplate;
  }

  @Override
  public void run() {
    noLock();
  }

  /**
   * 不加锁
   */
  private void noLock() {
    try {
      Random random = new Random();
      // 模拟线程进行业务处理
      TimeUnit.MILLISECONDS.sleep(random.nextInt(100) + 1);
    } catch (InterruptedException e) {
      e.printStackTrace();
    }
    //模拟数据库中获取用户账号
    UserAccount userAccount = (UserAccount) redisTemplate.opsForValue().get(userId);
    // 金额+1
    userAccount.addAmount(1);
    logger.info(Thread.currentThread().getName() + " : user id : " + userId + " amount : " + userAccount.getAmount());
    //模拟存回数据库
    redisTemplate.opsForValue().set(userId, userAccount);
  }
}
Copier après la connexion

L'instanciation de RedisTemplate est transmise à Spring Boot :

@Configuration
public class RedisConfig {

  @Bean
  public RedisTemplate<Object, Object> redisTemplate(RedisConnectionFactory redisConnectionFactory) {
    RedisTemplate<Object, Object> redisTemplate = new RedisTemplate<>();
    redisTemplate.setConnectionFactory(redisConnectionFactory);
    Jackson2JsonRedisSerializer<Object> jackson2JsonRedisSerializer =
        new Jackson2JsonRedisSerializer<>(Object.class);
    ObjectMapper objectMapper = new ObjectMapper();
    objectMapper.setVisibility(PropertyAccessor.ALL, JsonAutoDetect.Visibility.ANY);
    objectMapper.enableDefaultTyping(ObjectMapper.DefaultTyping.NON_FINAL);
    jackson2JsonRedisSerializer.setObjectMapper(objectMapper);
    // 设置value的序列化规则和 key的序列化规则
    redisTemplate.setValueSerializer(jackson2JsonRedisSerializer);
    redisTemplate.setKeySerializer(new StringRedisSerializer());
    redisTemplate.afterPropertiesSet();
    return redisTemplate;
  }
}
Copier après la connexion

Enfin , Préparez un TestController pour déclencher l'exécution multi-thread :

@RestController
public class TestController {

  private final static Logger logger = LoggerFactory.getLogger(TestController.class);

  private static ExecutorService executorService = Executors.newFixedThreadPool(10);

  @Autowired
  private RedisTemplate<Object, Object> redisTemplate;

  @GetMapping("/test")
  public String test() throws InterruptedException {
    // 初始化用户user_001到Redis,账户金额为0
    redisTemplate.opsForValue().set("user_001", new UserAccount("user_001", 0));
    // 开启10个线程进行同步测试,每个线程为账户增加1元
    for (int i = 0; i < 10; i++) {
      logger.info("创建线程i=" + i);
      executorService.execute(new AccountOperationThread("user_001", redisTemplate));
    }

    // 主线程休眠1秒等待线程跑完
    TimeUnit.MILLISECONDS.sleep(1000);
    // 查询Redis中的user_001账户
    UserAccount userAccount = (UserAccount) redisTemplate.opsForValue().get("user_001");
    logger.info("user id : " + userAccount.getUserId() + " amount : " + userAccount.getAmount());
    return "success";
  }
}
Copier après la connexion

Exécutez le programme ci-dessus Normalement, il y a 10 threads, chaque thread en ajoute 1 et le résultat devrait être 10. Mais si vous l’exécutez plusieurs fois, vous constaterez que les résultats varient considérablement et sont fondamentalement inférieurs à 10.

[pool-1-thread-5] c.s.redis.thread.AccountOperationThread  : pool-1-thread-5 : user id : user_001 amount : 1
[pool-1-thread-4] c.s.redis.thread.AccountOperationThread  : pool-1-thread-4 : user id : user_001 amount : 1
[pool-1-thread-3] c.s.redis.thread.AccountOperationThread  : pool-1-thread-3 : user id : user_001 amount : 1
[pool-1-thread-1] c.s.redis.thread.AccountOperationThread  : pool-1-thread-1 : user id : user_001 amount : 1
[pool-1-thread-1] c.s.redis.thread.AccountOperationThread  : pool-1-thread-1 : user id : user_001 amount : 2
[pool-1-thread-2] c.s.redis.thread.AccountOperationThread  : pool-1-thread-2 : user id : user_001 amount : 2
[pool-1-thread-5] c.s.redis.thread.AccountOperationThread  : pool-1-thread-5 : user id : user_001 amount : 2
[pool-1-thread-4] c.s.redis.thread.AccountOperationThread  : pool-1-thread-4 : user id : user_001 amount : 3
[pool-1-thread-1] c.s.redis.thread.AccountOperationThread  : pool-1-thread-1 : user id : user_001 amount : 4
[pool-1-thread-3] c.s.redis.thread.AccountOperationThread  : pool-1-thread-3 : user id : user_001 amount : 5
[nio-8080-exec-1] c.s.redis.controller.TestController      : user id : user_001 amount : 5
Copier après la connexion

En prenant le journal ci-dessus comme exemple, les quatre premiers threads ont tous changé la valeur à 1, ce qui signifie que les trois threads suivants ont écrasé les modifications précédentes, ce qui fait que le résultat final n'est pas 10, mais seulement 5. C'est évidemment problématique.

Implémentation du verrouillage de synchronisation Redis

Compte tenu de la situation ci-dessus, dans la même JVM, nous pouvons le compléter via le verrouillage des threads. Cependant, dans un environnement distribué, les verrous de niveau JVM ne peuvent pas être implémentés ici.

Idée de base : lorsque le premier thread entre, un enregistrement est entré dans Redis. Lorsque les threads suivants viennent demander, il est jugé si l'enregistrement existe dans Redis. S'il existe, cela signifie qu'il est dans un état verrouillé et attend. ou revient. S'il n'existe pas, un traitement commercial ultérieur sera effectué.

  /**
   * 1.抢占资源时判断是否被锁。
   * 2.如未锁则抢占成功且加锁,否则等待锁释放。
   * 3.业务完成后释放锁,让给其它线程。
   * <p>
   * 该方案并未解决同步问题,原因:线程获得锁和加锁的过程,并非原子性操作,可能会导致线程A获得锁,还未加锁时,线程B也获得了锁。
   */
  private void redisLock() {
    Random random = new Random();
    try {
      TimeUnit.MILLISECONDS.sleep(random.nextInt(1000) + 1);
    } catch (InterruptedException e) {
      e.printStackTrace();
    }
    while (true) {
      Object lock = redisTemplate.opsForValue().get(userId + ":syn");
      if (lock == null) {
        // 获得锁 -> 加锁 -> 跳出循环
        logger.info(Thread.currentThread().getName() + ":获得锁");
        redisTemplate.opsForValue().set(userId + ":syn", "lock");
        break;
      }
      try {
        // 等待500毫秒重试获得锁
        TimeUnit.MILLISECONDS.sleep(500);
      } catch (InterruptedException e) {
        e.printStackTrace();
      }
    }
    try {
      //模拟数据库中获取用户账号
      UserAccount userAccount = (UserAccount) redisTemplate.opsForValue().get(userId);
      if (userAccount != null) {
        //设置金额
        userAccount.addAmount(1);
        logger.info(Thread.currentThread().getName() + " : user id : " + userId + " amount : " + userAccount.getAmount());
        //模拟存回数据库
        redisTemplate.opsForValue().set(userId, userAccount);
      }
    } finally {
      //释放锁
      redisTemplate.delete(userId + ":syn");
      logger.info(Thread.currentThread().getName() + ":释放锁");
    }
  }
Copier après la connexion

Dans le bloc de code while, déterminez d'abord si l'ID utilisateur correspondant existe dans Redis. S'il n'existe pas, définissez le verrou. S'il existe, sortez de la boucle et continuez d'attendre.

Le code ci-dessus semble implémenter la fonction de verrouillage, mais lorsque le programme est exécuté, vous constaterez qu'il y a toujours des problèmes de concurrence comme s'il n'était pas verrouillé. La raison est la suivante : les opérations d’acquisition et de verrouillage ne sont pas atomiques. Par exemple, deux threads constatent que les verrous sont tous deux nuls et les verrouillent. À ce stade, le problème de concurrence existe toujours.

Redis Atomic Synchronization Lock

Compte tenu des problèmes ci-dessus, le processus d'acquisition et de verrouillage peut être atomisé. Sur la base de l'API d'atomisation fournie par spring-boot-data-redis, cela peut être réalisé :

// 该方法使用了redis的指令:SETNX key value
// 1.key不存在,设置成功返回value,setIfAbsent返回true;
// 2.key存在,则设置失败返回null,setIfAbsent返回false;
// 3.原子性操作;
Boolean setIfAbsent(K var1, V var2);
Copier après la connexion

L'opération atomique de la méthode ci-dessus est une encapsulation de la commande setnx de Redis. L'utilisation de setnx dans Redis est la suivante :

redis> SETNX mykey "Hello"
(integer) 1
redis> SETNX mykey "World"
(integer) 0
redis> GET mykey
"Hello"
Copier après la connexion

Pour la première fois, lors de la définition de ma clé, si elle n'existe pas, 1 sera renvoyé, indiquant que la configuration est réussie ; lorsque ma clé est définie pour la deuxième fois, elle existe déjà, et 0 sera renvoyé, indiquant que le réglage a échoué. Recherchez à nouveau la valeur correspondant à mykey et vous constaterez qu'il s'agit toujours de la valeur définie pour la première fois. En d'autres termes, setnx de Redis garantit qu'une clé unique ne peut être définie avec succès que par un seul service.

Après avoir compris l'API ci-dessus et les principes sous-jacents, examinons le code de la méthode d'implémentation dans le fil comme suit :

  /**
   * 1.原子操作加锁
   * 2.竞争线程循环重试获得锁
   * 3.业务完成释放锁
   */
  private void atomicityRedisLock() {
    //Spring data redis 支持的原子性操作
    while (!redisTemplate.opsForValue().setIfAbsent(userId + ":syn", "lock")) {
      try {
        // 等待100毫秒重试获得锁
        TimeUnit.MILLISECONDS.sleep(100);
      } catch (InterruptedException e) {
        e.printStackTrace();
      }
    }
    logger.info(Thread.currentThread().getName() + ":获得锁");
    try {
      //模拟数据库中获取用户账号
      UserAccount userAccount = (UserAccount) redisTemplate.opsForValue().get(userId);
      if (userAccount != null) {
        //设置金额
        userAccount.addAmount(1);
        logger.info(Thread.currentThread().getName() + " : user id : " + userId + " amount : " + userAccount.getAmount());
        //模拟存回数据库
        redisTemplate.opsForValue().set(userId, userAccount);
      }
    } finally {
      //释放锁
      redisTemplate.delete(userId + ":syn");
      logger.info(Thread.currentThread().getName() + ":释放锁");
    }
  }
Copier après la connexion

Exécutez à nouveau le code et vous constaterez que le résultat est correct, ce qui signifie que le thread distribué peut être verrouillé avec succès.

Interblocage du verrouillage distribué Redis

Bien que le résultat de l'exécution du code ci-dessus soit correct, si l'application plante anormalement et n'a pas le temps d'exécuter finalement la méthode de libération du verrou, alors les autres threads ne pourront jamais pour obtenir la serrure.

La méthode surchargée de setIfAbsent peut être utilisée à ce moment :

Boolean setIfAbsent(K var1, V var2, long var3, TimeUnit var5);
Copier après la connexion

Sur la base de cette méthode, le délai d'expiration du verrou peut être défini. De cette façon, même si le thread qui a obtenu le verrou tombe en panne, d'autres threads peuvent obtenir le verrou normalement après l'expiration des données dans Redis.

L'exemple de code est le suivant :

private void atomicityAndExRedisLock() {
    try {
      //Spring data redis 支持的原子性操作,并设置5秒过期时间
      while (!redisTemplate.opsForValue().setIfAbsent(userId + ":syn",
          System.currentTimeMillis() + 5000, 5000, TimeUnit.MILLISECONDS)) {
        // 等待100毫秒重试获得锁
        logger.info(Thread.currentThread().getName() + ":尝试循环获取锁");
        TimeUnit.MILLISECONDS.sleep(1000);
      }
      logger.info(Thread.currentThread().getName() + ":获得锁--------");
      // 应用在这里宕机,进程退出,无法执行 finally;
      Thread.currentThread().interrupt();
      // 业务逻辑...
    } catch (InterruptedException e) {
      e.printStackTrace();
    } finally {
      //释放锁
      if (!Thread.currentThread().isInterrupted()) {
        redisTemplate.delete(userId + ":syn");
        logger.info(Thread.currentThread().getName() + ":释放锁");
      }
    }
  }
Copier après la connexion

Business timeout et démon thread

Le délai d'attente de Redis est ajouté ci-dessus, ce qui semble résoudre le problème, mais introduit également de nouveaux problèmes.

Par exemple, dans des circonstances normales, le fil A peut terminer l'affaire en 5 secondes, mais cela peut parfois prendre plus de 5 secondes. Si le délai d'attente est défini sur 5 secondes, le thread A obtient le verrou, mais le traitement de la logique métier prend 6 secondes. À l'heure actuelle, le thread A exécute toujours une logique métier normale et le thread B a obtenu le verrou. Lorsque le thread A termine le traitement, il est possible de libérer le verrou du thread B.

Il y a deux problèmes dans le scénario ci-dessus :

  • Premièrement, le thread A et le thread B peuvent être exécutés en même temps, provoquant des problèmes de concurrence.
  • Deuxièmement, le thread A peut libérer le verrou du thread B, conduisant à une série de cercles vicieux.

Bien sûr, vous pouvez déterminer si le verrou appartient au thread A ou au thread B en définissant la valeur dans Redis. Mais une analyse minutieuse révélera que l'essence de ce problème est que le thread A met plus de temps à exécuter la logique métier que le délai d'expiration du verrouillage.

Ensuite, il existe deux solutions :

  • Premièrement, définissez le délai d'attente suffisamment long pour garantir que le code métier peut être exécuté avant que le verrou ne soit libéré. ​​
  • Deuxièmement, ajoutez un thread démon pour le verrou afin de l'empêcher d'expirer. les verrous qui sont libérés mais non libérés augmentent le temps ;

La première méthode nécessite dans la plupart des cas la logique métier fastidieuse de l'ensemble de la banque, et le délai d'attente est défini.

La deuxième méthode consiste à augmenter dynamiquement le délai d'expiration du verrouillage via la méthode de thread démon suivante.

public class DaemonThread implements Runnable {
  private final static Logger logger = LoggerFactory.getLogger(DaemonThread.class);

  // 是否需要守护 主线程关闭则结束守护线程
  private volatile boolean daemon = true;
  // 守护锁
  private String lockKey;

  private RedisTemplate<Object, Object> redisTemplate;

  public DaemonThread(String lockKey, RedisTemplate<Object, Object> redisTemplate) {
    this.lockKey = lockKey;
    this.redisTemplate = redisTemplate;
  }

  @Override
  public void run() {
    try {
      while (daemon) {
        long time = redisTemplate.getExpire(lockKey, TimeUnit.MILLISECONDS);
        // 剩余有效期小于1秒则续命
        if (time < 1000) {
          logger.info("守护进程: " + Thread.currentThread().getName() + " 延长锁时间 5000 毫秒");
          redisTemplate.expire(lockKey, 5000, TimeUnit.MILLISECONDS);
        }
        TimeUnit.MILLISECONDS.sleep(300);
      }
      logger.info(" 守护进程: " + Thread.currentThread().getName() + "关闭 ");
    } catch (InterruptedException e) {
      e.printStackTrace();
    }
  }

  // 主线程主动调用结束
  public void stop() {
    daemon = false;
  }
}
Copier après la connexion

上述线程每隔300毫秒获取一下Redis中锁的超时时间,如果小于1秒,则延长5秒。当主线程调用关闭时,守护线程也随之关闭。

主线程中相关代码实现:

private void deamonRedisLock() {
    //守护线程
    DaemonThread daemonThread = null;
    //Spring data redis 支持的原子性操作,并设置5秒过期时间
    String uuid = UUID.randomUUID().toString();
    String value = Thread.currentThread().getId() + ":" + uuid;
    try {
      while (!redisTemplate.opsForValue().setIfAbsent(userId + ":syn", value, 5000, TimeUnit.MILLISECONDS)) {
        // 等待100毫秒重试获得锁
        logger.info(Thread.currentThread().getName() + ":尝试循环获取锁");
        TimeUnit.MILLISECONDS.sleep(1000);
      }
      logger.info(Thread.currentThread().getName() + ":获得锁----");
      // 开启守护线程
      daemonThread = new DaemonThread(userId + ":syn", redisTemplate);
      Thread thread = new Thread(daemonThread);
      thread.start();
      // 业务逻辑执行10秒...
      TimeUnit.MILLISECONDS.sleep(10000);
    } catch (InterruptedException e) {
      e.printStackTrace();
    } finally {
      //释放锁 这里也需要原子操作,今后通过 Redis + Lua 讲
      String result = (String) redisTemplate.opsForValue().get(userId + ":syn");
      if (value.equals(result)) {
        redisTemplate.delete(userId + ":syn");
        logger.info(Thread.currentThread().getName() + ":释放锁-----");
      }
      //关闭守护线程
      if (daemonThread != null) {
        daemonThread.stop();
      }
    }
  }
Copier après la connexion

其中在获得锁之后,开启守护线程,在finally中将守护线程关闭。

基于Lua脚本的实现

在上述逻辑中,我们是基于spring-boot-data-redis提供的原子化操作来保证锁判断和执行的原子化的。在非Spring Boot项目中,则可以基于Lua脚本来实现。

首先定义加锁和解锁的Lua脚本及对应的DefaultRedisScript对象,在RedisConfig配置类中添加如下实例化代码:

@Configuration
public class RedisConfig {

  //lock script
  private static final String LOCK_SCRIPT = " if redis.call(&#39;setnx&#39;,KEYS[1],ARGV[1]) == 1 " +
      " then redis.call(&#39;expire&#39;,KEYS[1],ARGV[2]) " +
      " return 1 " +
      " else return 0 end ";
  private static final String UNLOCK_SCRIPT = "if redis.call(&#39;get&#39;, KEYS[1]) == ARGV[1] then return redis.call" +
      "(&#39;del&#39;, KEYS[1]) else return 0 end";

  // ... 省略部分代码
  
  @Bean
  public DefaultRedisScript<Boolean> lockRedisScript() {
    DefaultRedisScript<Boolean> defaultRedisScript = new DefaultRedisScript<>();
    defaultRedisScript.setResultType(Boolean.class);
    defaultRedisScript.setScriptText(LOCK_SCRIPT);
    return defaultRedisScript;
  }

  @Bean
  public DefaultRedisScript<Long> unlockRedisScript() {
    DefaultRedisScript<Long> defaultRedisScript = new DefaultRedisScript<>();
    defaultRedisScript.setResultType(Long.class);
    defaultRedisScript.setScriptText(UNLOCK_SCRIPT);
    return defaultRedisScript;
  }
}
Copier après la connexion

再通过在AccountOperationThread类中新建构造方法,将上述两个对象传入类中(省略此部分演示)。然后,就可以基于RedisTemplate来调用了,改造之后的代码实现如下:

  private void deamonRedisLockWithLua() {
    //守护线程
    DaemonThread daemonThread = null;
    //Spring data redis 支持的原子性操作,并设置5秒过期时间
    String uuid = UUID.randomUUID().toString();
    String value = Thread.currentThread().getId() + ":" + uuid;
    try {
      while (!redisTemplate.execute(lockRedisScript, Collections.singletonList(userId + ":syn"), value, 5)) {
        // 等待1000毫秒重试获得锁
        logger.info(Thread.currentThread().getName() + ":尝试循环获取锁");
        TimeUnit.MILLISECONDS.sleep(1000);
      }
      logger.info(Thread.currentThread().getName() + ":获得锁----");
      // 开启守护线程
      daemonThread = new DaemonThread(userId + ":syn", redisTemplate);
      Thread thread = new Thread(daemonThread);
      thread.start();
      // 业务逻辑执行10秒...
      TimeUnit.MILLISECONDS.sleep(10000);
    } catch (InterruptedException e) {
      logger.error("异常", e);
    } finally {
      //使用Lua脚本:先判断是否是自己设置的锁,再执行删除
      // key存在,当前值=期望值时,删除key;key存在,当前值!=期望值时,返回0;
      Long result = redisTemplate.execute(unlockRedisScript, Collections.singletonList(userId + ":syn"), value);
      logger.info("redis解锁:{}", RELEASE_SUCCESS.equals(result));
      if (RELEASE_SUCCESS.equals(result)) {
        if (daemonThread != null) {
          //关闭守护线程
          daemonThread.stop();
          logger.info(Thread.currentThread().getName() + ":释放锁---");
        }
      }
    }
  }
Copier après la connexion

其中while循环中加锁和finally中的释放锁都是基于Lua脚本来实现了。

Redis锁的其他因素

除了上述实例,在使用Redis分布式锁时,还可以考虑以下情况及方案。

Redis锁的不可重入

当线程在持有锁的情况下再次请求加锁,如果一个锁支持一个线程多次加锁,那么这个锁就是可重入的。如果一个不可重入锁被再次加锁,由于该锁已经被持有,再次加锁会失败。Redis可通过对锁进行重入计数,加锁时加 1,解锁时减 1,当计数归 0时释放锁。

可重入锁虽然高效但会增加代码的复杂性,这里就不举例说明了。

等待锁释放

有的业务场景,发现被锁则直接返回。但有的场景下,客户端需要等待锁释放然后去抢锁。上述示例就属于后者。针对等待锁释放也有两种方案:

  • 客户端轮训:当未获得锁时,等待一段时间再重新获取,直到成功。上述示例就是基于这种方式实现的。这种方式的缺点也很明显,比较耗费服务器资源,当并发量大时会影响服务器的效率。
  • 使用Redis的订阅发布功能:当获取锁失败时,订阅锁释放消息,获取锁成功后释放时,发送释放消息。

集群中的主备切换和脑裂

在Redis包含主从同步的集群部署方式中,如果主节点挂掉,从节点提升为主节点。如果客户端A在主节点加锁成功,指令还未同步到从节点,此时主节点挂掉,从节点升为主节点,新的主节点中没有锁的数据。这种情况下,客户端B就可能加锁成功,从而出现并发的场景。

当集群发生脑裂时,Redis master节点跟slave 节点和 sentinel 集群处于不同的网络分区。sentinel集群无法感知到master的存在,会将 slave 节点提升为 master 节点,此时就会存在两个不同的 master 节点。从而也会导致并发问题的出现。Redis Cluster集群部署方式同理。

小结

通过生产环境中的一个问题,排查原因,寻找解决方案,到最终对基于Redis分布式的深入研究,这便是学习的过程。

同时,每当面试或被问题如何解决分布式共享资源时,我们会脱口而出”基于Redis实现分布式锁“,但通过本文的学习会发现,Redis分布式锁并不是万能的,而且在使用的过程中还需要注意超时、死锁、误解锁、集群选主/脑裂等问题。

Redis以高性能著称,但在实现分布式锁的过程中还是存在一些问题。因此,基于Redis的分布式锁可以极大的缓解并发问题,但要完全防止并发,还是得从数据库层面入手。

源码地址:https://github.com/secbr/springboot-all/tree/master/springboot-redis-lock

更多编程相关知识,请访问:编程入门!!

Ce qui précède est le contenu détaillé de. pour plus d'informations, suivez d'autres articles connexes sur le site Web de PHP en chinois!

Étiquettes associées:
source:juejin.cn
Déclaration de ce site Web
Le contenu de cet article est volontairement contribué par les internautes et les droits d'auteur appartiennent à l'auteur original. Ce site n'assume aucune responsabilité légale correspondante. Si vous trouvez un contenu suspecté de plagiat ou de contrefaçon, veuillez contacter admin@php.cn
Tutoriels populaires
Plus>
Derniers téléchargements
Plus>
effets Web
Code source du site Web
Matériel du site Web
Modèle frontal