Heim > Datenbank > Redis > Lassen Sie uns über Redis-basierte verteilte Sperren in verteilten Systemen sprechen

Lassen Sie uns über Redis-basierte verteilte Sperren in verteilten Systemen sprechen

青灯夜游
Freigeben: 2021-10-29 10:52:11
nach vorne
1620 Leute haben es durchsucht

Gesperrt, gibt es immer noch Probleme mit der Parallelität? RedisVerstehen Sie verteilte Sperren wirklich? Der folgende Artikel wird Sie über Redis-basierte verteilte Sperren in verteilten Systemen informieren. Ich hoffe, er wird Ihnen hilfreich sein!

Lassen Sie uns über Redis-basierte verteilte Sperren in verteilten Systemen sprechen

Bei neu übernommenen Projekten kommt es gelegentlich zu Problemen mit ungleichen Abrechnungen. Die Erklärung des vorherigen technischen Chefs vor der Abreise lautete: „Nach der Fehlerbehebung wurde die Ursache nicht gefunden. Danach war ich zu beschäftigt und habe es nicht gelöst. Es kann am Framework liegen ...

Jetzt ist das Projekt.“ geliefert wird, müssen solche Probleme gelöst werden. Nachdem wir die gesamte Buchhaltungsverarbeitungslogik geklärt hatten, fanden wir schließlich den Grund: Es wurde durch gleichzeitige Datenbankoperationen auf Hot-Konten verursacht. Lassen Sie uns zu diesem Thema über verteilte Sperren auf Basis von Redis in verteilten Systemen sprechen. Übrigens schlüsseln wir auch die Ursachen und Lösungen des Problems auf. [Verwandte Empfehlungen: Redis-Video-Tutorial]

Ursachenanalyse

Die Systemparallelität ist nicht hoch und es gibt heiße Konten, aber das ist nicht so ernst. Die Wurzel des Problems liegt im Design der Systemarchitektur, das künstlich Parallelität erzeugt. Das Szenario sieht wie folgt aus: Der Händler importiert stapelweise einen Datenstapel und das System führt eine Vorverarbeitung durch und erhöht oder verringert den Kontostand.

Zu diesem Zeitpunkt wird auch eine andere geplante Aufgabe das Konto scannen und aktualisieren. Darüber hinaus werden Vorgänge für dasselbe Konto auf verschiedene Systeme verteilt und es entstehen Hot-Accounts.

Um dieses Problem auf architektonischer Ebene zu lösen, können wir erwägen, das Buchhaltungssystem zu trennen und es zur Verarbeitung in einem System zu zentralisieren. Alle Datenbanktransaktionen und Ausführungssequenzen werden vom Buchhaltungssystem koordiniert und verarbeitet. Aus technischer Sicht können Hotspot-Konten durch einen Sperrmechanismus gesperrt werden.

In diesem Artikel wird die Implementierung verteilter Sperren für Hot-Accounts ausführlich erläutert.

Analyse von Sperren

In der Multithread-Umgebung von Java gibt es normalerweise mehrere Arten von Sperren, die verwendet werden können:

  • Sperren auf JVM-Speichermodellebene, häufig verwendete sind: synchronisiert, Sperre usw.;
  • Datenbanksperren wie optimistische Sperren, pessimistische Sperren usw.;
  • verteilte Sperren;

JVM-Sperren auf Speicherebene können die Sicherheit von Threads unter einem einzelnen Dienst gewährleisten, z. B. wenn mehrere Threads auf eine globale Variable zugreifen/diese ändern. Wenn das System jedoch in einem Cluster bereitgestellt wird, sind lokale Sperren auf JVM-Ebene machtlos.

Pessimistische Sperre und optimistische Sperre

Wie im obigen Fall ist das Hotspot-Konto eine gemeinsam genutzte Ressource im verteilten System. Wir verwenden normalerweise Datenbanksperre oder Verteilte Sperre, um das Problem zu lösen.

Datenbanksperren werden in optimistische Sperren und pessimistische Sperren unterteilt.

Pessimistische Sperre wird basierend auf der exklusiven Sperre implementiert, die von der Datenbank (InnoDB von MySQL) bereitgestellt wird. Bei der Durchführung von Transaktionsvorgängen fügt MySQL über die Anweisung „select...for update“ eine exklusive Sperre für jede Datenzeile im Abfrageergebnissatz hinzu, und andere Threads blockieren die Aktualisierungs- und Löschvorgänge des Datensatzes. Um die sequentielle Ausführung (Änderung) gemeinsam genutzter Ressourcen zu erreichen, wird bei der optimistischen Sperre davon ausgegangen, dass Daten im Allgemeinen keine Konflikte verursachen. Daher werden die Daten offiziell aktualisiert, wenn die Daten übermittelt werden aktualisiert. Überprüfen Sie, ob der Konflikt besteht. Bei einem Konflikt werden Ausnahmeinformationen an den Benutzer zurückgegeben, sodass dieser entscheiden kann, was zu tun ist. Optimistisches Sperren eignet sich für Szenarien, in denen mehr gelesen und weniger geschrieben wird, was den Durchsatz des Programms verbessern kann. Optimistisches Sperren wird normalerweise basierend auf dem Aufzeichnungsstatus oder dem Hinzufügen von Versionen implementiert.

Pessimistisches Sperrfehlerszenario

Im Projekt wurde eine pessimistische Sperre verwendet, aber die pessimistische Sperre schlug fehl. Dies ist auch ein häufiges Missverständnis bei der Verwendung pessimistischer Sperren. Lassen Sie uns es unten analysieren. Normale Verwendung des pessimistischen Sperrvorgangs:

Sperren Sie den Datensatz über „Auswählen“ zur Aktualisierung.

Berechnen Sie den neuen Kontostand, ändern Sie den Betrag und speichern Sie ihn.
  • Freigeben Sie die Sperre, nachdem die Ausführung abgeschlossen ist Verarbeitungsprozess bei häufigen Fehlern:
  • Kontostand abfragen und neuen Kontostand berechnen;
  • Datensatz durch Auswählen sperren;

Betrag ändern und speichern;

    Sperre freigeben, nachdem die Ausführung abgeschlossen ist; ;
  • Bei falschen Prozessen wie A- und B-Serviceanfragen beträgt der erreichte Saldo 100, A zieht 50 ab, B zieht 40 ab, dann sperrt A den Datensatz und aktualisiert die Datenbank auf 50, nachdem A die Sperre aufgehoben hat den Datensatz und aktualisiert die Datenbank auf 60. Offensichtlich hat Letzteres die Aktualisierungen von Ersterem überschrieben. Die Lösung besteht darin, den Umfang der Sperre zu erweitern und die Sperre voranzutreiben, bevor der neue Saldo berechnet wird.
  • Normalerweise übt pessimistisches Sperren großen Druck auf die Datenbank aus. In der Praxis wird es je nach Szenario normalerweise mit optimistischem Sperren oder verteiltem Sperren implementiert.
  • Kommen wir zum Punkt und sprechen wir über die Implementierung verteilter Sperren auf Basis von Redis.

Praktische Übung für verteilte Redis-Sperren

Hier werden Spring Boot-, Redis- und Lua-Skripte als Beispiele verwendet, um die Implementierung verteilter Sperren zu demonstrieren. Um die Verarbeitung zu vereinfachen, übernimmt Redis im Beispiel sowohl die Funktion einer verteilten Sperre als auch die Funktion einer Datenbank.

Szenariokonstruktion

In einer Clusterumgebung bestehen die grundlegenden Schritte darin, den Betrag desselben Kontos zu verwalten:

  • Lesen Sie den Benutzerbetrag aus der Datenbank;
  • Das Programm ändert den Betrag;
  • Dann speichern Sie den neuesten Betrag Menge in der Datenbank;

Das Folgende beginnt ohne Sperre und asynchrone Verarbeitung und leitet schrittweise die endgültige verteilte Sperre ab.

Grundlegende Integration und Klassenkonstruktion

Bereiten Sie eine grundlegende Geschäftsumgebung ohne Sperren vor.

Führen Sie zunächst relevante Abhängigkeiten im Spring Boot-Projekt ein:

<dependency>
  <groupId>org.springframework.boot</groupId>
  <artifactId>spring-boot-starter-data-redis</artifactId>
</dependency>
<dependency>
  <groupId>org.springframework.boot</groupId>
  <artifactId>spring-boot-starter-web</artifactId>
</dependency>
Nach dem Login kopieren

Die dem Konto entsprechende Entitätsklasse UserAccount:

public class UserAccount {

  //用户ID
  private String userId;
  //账户内金额
  private int amount;

  //添加账户金额
  public void addAmount(int amount) {
    this.amount = this.amount + amount;
  }
  // 省略构造方法和getter/setter 
}
Nach dem Login kopieren

Erstellen Sie eine Thread-Implementierungsklasse AccountOperationThread:

public class AccountOperationThread implements Runnable {

  private final static Logger logger = LoggerFactory.getLogger(AccountOperationThread.class);

  private static final Long RELEASE_SUCCESS = 1L;

  private String userId;

  private RedisTemplate<Object, Object> redisTemplate;

  public AccountOperationThread(String userId, RedisTemplate<Object, Object> redisTemplate) {
    this.userId = userId;
    this.redisTemplate = redisTemplate;
  }

  @Override
  public void run() {
    noLock();
  }

  /**
   * 不加锁
   */
  private void noLock() {
    try {
      Random random = new Random();
      // 模拟线程进行业务处理
      TimeUnit.MILLISECONDS.sleep(random.nextInt(100) + 1);
    } catch (InterruptedException e) {
      e.printStackTrace();
    }
    //模拟数据库中获取用户账号
    UserAccount userAccount = (UserAccount) redisTemplate.opsForValue().get(userId);
    // 金额+1
    userAccount.addAmount(1);
    logger.info(Thread.currentThread().getName() + " : user id : " + userId + " amount : " + userAccount.getAmount());
    //模拟存回数据库
    redisTemplate.opsForValue().set(userId, userAccount);
  }
}
Nach dem Login kopieren

Die Instanziierung von RedisTemplate wird an Spring Boot übergeben:

@Configuration
public class RedisConfig {

  @Bean
  public RedisTemplate<Object, Object> redisTemplate(RedisConnectionFactory redisConnectionFactory) {
    RedisTemplate<Object, Object> redisTemplate = new RedisTemplate<>();
    redisTemplate.setConnectionFactory(redisConnectionFactory);
    Jackson2JsonRedisSerializer<Object> jackson2JsonRedisSerializer =
        new Jackson2JsonRedisSerializer<>(Object.class);
    ObjectMapper objectMapper = new ObjectMapper();
    objectMapper.setVisibility(PropertyAccessor.ALL, JsonAutoDetect.Visibility.ANY);
    objectMapper.enableDefaultTyping(ObjectMapper.DefaultTyping.NON_FINAL);
    jackson2JsonRedisSerializer.setObjectMapper(objectMapper);
    // 设置value的序列化规则和 key的序列化规则
    redisTemplate.setValueSerializer(jackson2JsonRedisSerializer);
    redisTemplate.setKeySerializer(new StringRedisSerializer());
    redisTemplate.afterPropertiesSet();
    return redisTemplate;
  }
}
Nach dem Login kopieren

Endlich , Bereiten Sie einen TestController vor, um die Multithread-Ausführung auszulösen:

@RestController
public class TestController {

  private final static Logger logger = LoggerFactory.getLogger(TestController.class);

  private static ExecutorService executorService = Executors.newFixedThreadPool(10);

  @Autowired
  private RedisTemplate<Object, Object> redisTemplate;

  @GetMapping("/test")
  public String test() throws InterruptedException {
    // 初始化用户user_001到Redis,账户金额为0
    redisTemplate.opsForValue().set("user_001", new UserAccount("user_001", 0));
    // 开启10个线程进行同步测试,每个线程为账户增加1元
    for (int i = 0; i < 10; i++) {
      logger.info("创建线程i=" + i);
      executorService.execute(new AccountOperationThread("user_001", redisTemplate));
    }

    // 主线程休眠1秒等待线程跑完
    TimeUnit.MILLISECONDS.sleep(1000);
    // 查询Redis中的user_001账户
    UserAccount userAccount = (UserAccount) redisTemplate.opsForValue().get("user_001");
    logger.info("user id : " + userAccount.getUserId() + " amount : " + userAccount.getAmount());
    return "success";
  }
}
Nach dem Login kopieren

Führen Sie das obige Programm aus. Normalerweise gibt es 10 Threads, jeder Thread fügt 1 hinzu und das Ergebnis sollte 10 sein. Wenn Sie es jedoch mehrmals ausführen, werden Sie feststellen, dass die Ergebnisse stark variieren und grundsätzlich kleiner als 10 sind.

[pool-1-thread-5] c.s.redis.thread.AccountOperationThread  : pool-1-thread-5 : user id : user_001 amount : 1
[pool-1-thread-4] c.s.redis.thread.AccountOperationThread  : pool-1-thread-4 : user id : user_001 amount : 1
[pool-1-thread-3] c.s.redis.thread.AccountOperationThread  : pool-1-thread-3 : user id : user_001 amount : 1
[pool-1-thread-1] c.s.redis.thread.AccountOperationThread  : pool-1-thread-1 : user id : user_001 amount : 1
[pool-1-thread-1] c.s.redis.thread.AccountOperationThread  : pool-1-thread-1 : user id : user_001 amount : 2
[pool-1-thread-2] c.s.redis.thread.AccountOperationThread  : pool-1-thread-2 : user id : user_001 amount : 2
[pool-1-thread-5] c.s.redis.thread.AccountOperationThread  : pool-1-thread-5 : user id : user_001 amount : 2
[pool-1-thread-4] c.s.redis.thread.AccountOperationThread  : pool-1-thread-4 : user id : user_001 amount : 3
[pool-1-thread-1] c.s.redis.thread.AccountOperationThread  : pool-1-thread-1 : user id : user_001 amount : 4
[pool-1-thread-3] c.s.redis.thread.AccountOperationThread  : pool-1-thread-3 : user id : user_001 amount : 5
[nio-8080-exec-1] c.s.redis.controller.TestController      : user id : user_001 amount : 5
Nach dem Login kopieren

Am Beispiel des obigen Protokolls haben die ersten vier Threads den Wert alle auf 1 geändert, was bedeutet, dass die nächsten drei Threads die vorherigen Änderungen überschrieben haben, was dazu führte, dass das Endergebnis nicht 10, sondern nur 5 war. Das ist offensichtlich problematisch.

Implementierung der Redis-Synchronisationssperre

Angesichts der oben genannten Situation können wir sie in derselben JVM durch Thread-Sperre abschließen. In einer verteilten Umgebung können hier jedoch keine Redis-Synchronisationssperren implementiert werden.

Grundlegende Idee: Wenn der erste Thread eintritt, wird ein Datensatz in Redis eingegeben. Wenn nachfolgende Threads eine Anfrage stellen, wird beurteilt, ob der Datensatz in Redis vorhanden ist oder retourniert. Ist dies nicht der Fall, wird eine geschäftsmäßige Folgeverarbeitung durchgeführt.

  /**
   * 1.抢占资源时判断是否被锁。
   * 2.如未锁则抢占成功且加锁,否则等待锁释放。
   * 3.业务完成后释放锁,让给其它线程。
   * <p>
   * 该方案并未解决同步问题,原因:线程获得锁和加锁的过程,并非原子性操作,可能会导致线程A获得锁,还未加锁时,线程B也获得了锁。
   */
  private void redisLock() {
    Random random = new Random();
    try {
      TimeUnit.MILLISECONDS.sleep(random.nextInt(1000) + 1);
    } catch (InterruptedException e) {
      e.printStackTrace();
    }
    while (true) {
      Object lock = redisTemplate.opsForValue().get(userId + ":syn");
      if (lock == null) {
        // 获得锁 -> 加锁 -> 跳出循环
        logger.info(Thread.currentThread().getName() + ":获得锁");
        redisTemplate.opsForValue().set(userId + ":syn", "lock");
        break;
      }
      try {
        // 等待500毫秒重试获得锁
        TimeUnit.MILLISECONDS.sleep(500);
      } catch (InterruptedException e) {
        e.printStackTrace();
      }
    }
    try {
      //模拟数据库中获取用户账号
      UserAccount userAccount = (UserAccount) redisTemplate.opsForValue().get(userId);
      if (userAccount != null) {
        //设置金额
        userAccount.addAmount(1);
        logger.info(Thread.currentThread().getName() + " : user id : " + userId + " amount : " + userAccount.getAmount());
        //模拟存回数据库
        redisTemplate.opsForValue().set(userId, userAccount);
      }
    } finally {
      //释放锁
      redisTemplate.delete(userId + ":syn");
      logger.info(Thread.currentThread().getName() + ":释放锁");
    }
  }
Nach dem Login kopieren

Bestimmen Sie im while-Codeblock zunächst, ob die entsprechende Benutzer-ID in Redis vorhanden ist. Wenn sie nicht vorhanden ist, verlassen Sie die Schleife und warten Sie weiter.

Der obige Code scheint die Sperrfunktion zu implementieren, aber wenn das Programm ausgeführt wird, werden Sie feststellen, dass es immer noch Probleme mit der Parallelität gibt, als ob es nicht gesperrt wäre. Der Grund dafür ist, dass die Vorgänge des Erfassens und Sperrens nicht atomar sind. Beispielsweise stellen zwei Threads fest, dass die Sperren beide null sind, und sperren sie. Zu diesem Zeitpunkt besteht immer noch das Problem der Parallelität.

Redis Atomic Synchronization Lock

Angesichts der oben genannten Probleme kann der Prozess des Erwerbs und Sperrens atomisiert werden. Basierend auf der von spring-boot-data-redis bereitgestellten Atomisierungs-API kann Folgendes realisiert werden:

// 该方法使用了redis的指令:SETNX key value
// 1.key不存在,设置成功返回value,setIfAbsent返回true;
// 2.key存在,则设置失败返回null,setIfAbsent返回false;
// 3.原子性操作;
Boolean setIfAbsent(K var1, V var2);
Nach dem Login kopieren

Die atomare Operation der obigen Methode ist eine Kapselung des setnx-Befehls von Redis. Die Verwendung von setnx in Redis ist wie folgt:

redis> SETNX mykey "Hello"
(integer) 1
redis> SETNX mykey "World"
(integer) 0
redis> GET mykey
"Hello"
Nach dem Login kopieren

Wenn mykey zum ersten Mal festgelegt wird und nicht vorhanden ist, wird 1 zurückgegeben, was darauf hinweist, dass die Einstellung erfolgreich ist. Wenn mykey zum zweiten Mal festgelegt wird, ist er bereits vorhanden, und 0 wird zurückgegeben, was anzeigt dass die Einstellung fehlgeschlagen ist. Fragen Sie den Wert, der mykey entspricht, erneut ab und Sie werden feststellen, dass es sich immer noch um den beim ersten Mal festgelegten Wert handelt. Mit anderen Worten: setnx von redis stellt sicher, dass ein eindeutiger Schlüssel nur von einem Dienst erfolgreich festgelegt werden kann.

Nachdem wir die obige API und die zugrunde liegenden Prinzipien verstanden haben, werfen wir einen Blick auf den Implementierungsmethodencode im Thread wie folgt:

  /**
   * 1.原子操作加锁
   * 2.竞争线程循环重试获得锁
   * 3.业务完成释放锁
   */
  private void atomicityRedisLock() {
    //Spring data redis 支持的原子性操作
    while (!redisTemplate.opsForValue().setIfAbsent(userId + ":syn", "lock")) {
      try {
        // 等待100毫秒重试获得锁
        TimeUnit.MILLISECONDS.sleep(100);
      } catch (InterruptedException e) {
        e.printStackTrace();
      }
    }
    logger.info(Thread.currentThread().getName() + ":获得锁");
    try {
      //模拟数据库中获取用户账号
      UserAccount userAccount = (UserAccount) redisTemplate.opsForValue().get(userId);
      if (userAccount != null) {
        //设置金额
        userAccount.addAmount(1);
        logger.info(Thread.currentThread().getName() + " : user id : " + userId + " amount : " + userAccount.getAmount());
        //模拟存回数据库
        redisTemplate.opsForValue().set(userId, userAccount);
      }
    } finally {
      //释放锁
      redisTemplate.delete(userId + ":syn");
      logger.info(Thread.currentThread().getName() + ":释放锁");
    }
  }
Nach dem Login kopieren

Führen Sie den Code erneut aus und Sie werden feststellen, dass das Ergebnis korrekt ist, was bedeutet, dass der verteilte Thread dies kann erfolgreich gesperrt werden.

Deadlock der verteilten Redis-Sperre

Obwohl das Ausführungsergebnis des obigen Codes in Ordnung ist, können andere Threads niemals dazu in der Lage sein, wenn die Anwendung ungewöhnlich abstürzt und keine Zeit hat, die Methode zum Aufheben der Sperre endgültig auszuführen um das Schloss zu erhalten.

Die überladene Methode von setIfAbsent kann zu diesem Zeitpunkt verwendet werden:

Boolean setIfAbsent(K var1, V var2, long var3, TimeUnit var5);
Nach dem Login kopieren

Basierend auf dieser Methode kann die Ablaufzeit der Sperre festgelegt werden. Selbst wenn der Thread, der die Sperre erhalten hat, ausfällt, können auf diese Weise andere Threads die Sperre normal erhalten, nachdem die Daten in Redis abgelaufen sind.

Der Beispielcode lautet wie folgt:

private void atomicityAndExRedisLock() {
    try {
      //Spring data redis 支持的原子性操作,并设置5秒过期时间
      while (!redisTemplate.opsForValue().setIfAbsent(userId + ":syn",
          System.currentTimeMillis() + 5000, 5000, TimeUnit.MILLISECONDS)) {
        // 等待100毫秒重试获得锁
        logger.info(Thread.currentThread().getName() + ":尝试循环获取锁");
        TimeUnit.MILLISECONDS.sleep(1000);
      }
      logger.info(Thread.currentThread().getName() + ":获得锁--------");
      // 应用在这里宕机,进程退出,无法执行 finally;
      Thread.currentThread().interrupt();
      // 业务逻辑...
    } catch (InterruptedException e) {
      e.printStackTrace();
    } finally {
      //释放锁
      if (!Thread.currentThread().isInterrupted()) {
        redisTemplate.delete(userId + ":syn");
        logger.info(Thread.currentThread().getName() + ":释放锁");
      }
    }
  }
Nach dem Login kopieren

Business-Timeout und Daemon-Thread

Der Timeout-Zeitraum von Redis wurde oben hinzugefügt, was das Problem zu lösen scheint, aber auch neue Probleme mit sich bringt.

Unter normalen Umständen kann Thread A beispielsweise das Geschäft innerhalb von 5 Sekunden abschließen, gelegentlich kann es jedoch auch länger als 5 Sekunden dauern. Wenn das Zeitlimit auf 5 Sekunden eingestellt ist, erhält Thread A die Sperre, die Verarbeitung der Geschäftslogik dauert jedoch 6 Sekunden. Zu diesem Zeitpunkt führt Thread A noch die normale Geschäftslogik aus und Thread B hat die Sperre erhalten. Wenn Thread A die Verarbeitung beendet, ist es möglich, die Sperre von Thread B aufzuheben.

Im obigen Szenario gibt es zwei Probleme:

  • Erstens können Thread A und Thread B gleichzeitig ausgeführt werden, was zu Parallelitätsproblemen führt.
  • Zweitens kann Thread A die Sperre von Thread B aufheben, was eine Reihe von Teufelskreisen verursacht.

Natürlich können Sie feststellen, ob die Sperre zu Thread A oder Thread B gehört, indem Sie den Wert in Redis festlegen. Eine sorgfältige Analyse zeigt jedoch, dass der Kern dieses Problems darin besteht, dass Thread A länger für die Ausführung der Geschäftslogik benötigt als das Sperrzeitlimit.

Dann gibt es zwei Lösungen:

  • Stellen Sie zunächst das Timeout so lange ein, dass der Geschäftscode ausgeführt werden kann, bevor die Sperre aufgehoben wird.
  • Zweitens fügen Sie einen Daemon-Thread für die Sperre hinzu, um zu verhindern, dass sie abläuft Sperren, die freigegeben, aber nicht freigegeben werden, verlängern die Zeit.

Die erste Methode erfordert in den meisten Fällen die zeitaufwändige Geschäftslogik der gesamten Bank, und das Zeitlimit ist festgelegt.

Die zweite Möglichkeit besteht darin, das Sperrzeitlimit mithilfe der folgenden Daemon-Thread-Methode dynamisch zu erhöhen.

public class DaemonThread implements Runnable {
  private final static Logger logger = LoggerFactory.getLogger(DaemonThread.class);

  // 是否需要守护 主线程关闭则结束守护线程
  private volatile boolean daemon = true;
  // 守护锁
  private String lockKey;

  private RedisTemplate<Object, Object> redisTemplate;

  public DaemonThread(String lockKey, RedisTemplate<Object, Object> redisTemplate) {
    this.lockKey = lockKey;
    this.redisTemplate = redisTemplate;
  }

  @Override
  public void run() {
    try {
      while (daemon) {
        long time = redisTemplate.getExpire(lockKey, TimeUnit.MILLISECONDS);
        // 剩余有效期小于1秒则续命
        if (time < 1000) {
          logger.info("守护进程: " + Thread.currentThread().getName() + " 延长锁时间 5000 毫秒");
          redisTemplate.expire(lockKey, 5000, TimeUnit.MILLISECONDS);
        }
        TimeUnit.MILLISECONDS.sleep(300);
      }
      logger.info(" 守护进程: " + Thread.currentThread().getName() + "关闭 ");
    } catch (InterruptedException e) {
      e.printStackTrace();
    }
  }

  // 主线程主动调用结束
  public void stop() {
    daemon = false;
  }
}
Nach dem Login kopieren

上述线程每隔300毫秒获取一下Redis中锁的超时时间,如果小于1秒,则延长5秒。当主线程调用关闭时,守护线程也随之关闭。

主线程中相关代码实现:

private void deamonRedisLock() {
    //守护线程
    DaemonThread daemonThread = null;
    //Spring data redis 支持的原子性操作,并设置5秒过期时间
    String uuid = UUID.randomUUID().toString();
    String value = Thread.currentThread().getId() + ":" + uuid;
    try {
      while (!redisTemplate.opsForValue().setIfAbsent(userId + ":syn", value, 5000, TimeUnit.MILLISECONDS)) {
        // 等待100毫秒重试获得锁
        logger.info(Thread.currentThread().getName() + ":尝试循环获取锁");
        TimeUnit.MILLISECONDS.sleep(1000);
      }
      logger.info(Thread.currentThread().getName() + ":获得锁----");
      // 开启守护线程
      daemonThread = new DaemonThread(userId + ":syn", redisTemplate);
      Thread thread = new Thread(daemonThread);
      thread.start();
      // 业务逻辑执行10秒...
      TimeUnit.MILLISECONDS.sleep(10000);
    } catch (InterruptedException e) {
      e.printStackTrace();
    } finally {
      //释放锁 这里也需要原子操作,今后通过 Redis + Lua 讲
      String result = (String) redisTemplate.opsForValue().get(userId + ":syn");
      if (value.equals(result)) {
        redisTemplate.delete(userId + ":syn");
        logger.info(Thread.currentThread().getName() + ":释放锁-----");
      }
      //关闭守护线程
      if (daemonThread != null) {
        daemonThread.stop();
      }
    }
  }
Nach dem Login kopieren

其中在获得锁之后,开启守护线程,在finally中将守护线程关闭。

基于Lua脚本的实现

在上述逻辑中,我们是基于spring-boot-data-redis提供的原子化操作来保证锁判断和执行的原子化的。在非Spring Boot项目中,则可以基于Lua脚本来实现。

首先定义加锁和解锁的Lua脚本及对应的DefaultRedisScript对象,在RedisConfig配置类中添加如下实例化代码:

@Configuration
public class RedisConfig {

  //lock script
  private static final String LOCK_SCRIPT = " if redis.call(&#39;setnx&#39;,KEYS[1],ARGV[1]) == 1 " +
      " then redis.call(&#39;expire&#39;,KEYS[1],ARGV[2]) " +
      " return 1 " +
      " else return 0 end ";
  private static final String UNLOCK_SCRIPT = "if redis.call(&#39;get&#39;, KEYS[1]) == ARGV[1] then return redis.call" +
      "(&#39;del&#39;, KEYS[1]) else return 0 end";

  // ... 省略部分代码
  
  @Bean
  public DefaultRedisScript<Boolean> lockRedisScript() {
    DefaultRedisScript<Boolean> defaultRedisScript = new DefaultRedisScript<>();
    defaultRedisScript.setResultType(Boolean.class);
    defaultRedisScript.setScriptText(LOCK_SCRIPT);
    return defaultRedisScript;
  }

  @Bean
  public DefaultRedisScript<Long> unlockRedisScript() {
    DefaultRedisScript<Long> defaultRedisScript = new DefaultRedisScript<>();
    defaultRedisScript.setResultType(Long.class);
    defaultRedisScript.setScriptText(UNLOCK_SCRIPT);
    return defaultRedisScript;
  }
}
Nach dem Login kopieren

再通过在AccountOperationThread类中新建构造方法,将上述两个对象传入类中(省略此部分演示)。然后,就可以基于RedisTemplate来调用了,改造之后的代码实现如下:

  private void deamonRedisLockWithLua() {
    //守护线程
    DaemonThread daemonThread = null;
    //Spring data redis 支持的原子性操作,并设置5秒过期时间
    String uuid = UUID.randomUUID().toString();
    String value = Thread.currentThread().getId() + ":" + uuid;
    try {
      while (!redisTemplate.execute(lockRedisScript, Collections.singletonList(userId + ":syn"), value, 5)) {
        // 等待1000毫秒重试获得锁
        logger.info(Thread.currentThread().getName() + ":尝试循环获取锁");
        TimeUnit.MILLISECONDS.sleep(1000);
      }
      logger.info(Thread.currentThread().getName() + ":获得锁----");
      // 开启守护线程
      daemonThread = new DaemonThread(userId + ":syn", redisTemplate);
      Thread thread = new Thread(daemonThread);
      thread.start();
      // 业务逻辑执行10秒...
      TimeUnit.MILLISECONDS.sleep(10000);
    } catch (InterruptedException e) {
      logger.error("异常", e);
    } finally {
      //使用Lua脚本:先判断是否是自己设置的锁,再执行删除
      // key存在,当前值=期望值时,删除key;key存在,当前值!=期望值时,返回0;
      Long result = redisTemplate.execute(unlockRedisScript, Collections.singletonList(userId + ":syn"), value);
      logger.info("redis解锁:{}", RELEASE_SUCCESS.equals(result));
      if (RELEASE_SUCCESS.equals(result)) {
        if (daemonThread != null) {
          //关闭守护线程
          daemonThread.stop();
          logger.info(Thread.currentThread().getName() + ":释放锁---");
        }
      }
    }
  }
Nach dem Login kopieren

其中while循环中加锁和finally中的释放锁都是基于Lua脚本来实现了。

Redis锁的其他因素

除了上述实例,在使用Redis分布式锁时,还可以考虑以下情况及方案。

Redis锁的不可重入

当线程在持有锁的情况下再次请求加锁,如果一个锁支持一个线程多次加锁,那么这个锁就是可重入的。如果一个不可重入锁被再次加锁,由于该锁已经被持有,再次加锁会失败。Redis可通过对锁进行重入计数,加锁时加 1,解锁时减 1,当计数归 0时释放锁。

可重入锁虽然高效但会增加代码的复杂性,这里就不举例说明了。

等待锁释放

有的业务场景,发现被锁则直接返回。但有的场景下,客户端需要等待锁释放然后去抢锁。上述示例就属于后者。针对等待锁释放也有两种方案:

  • 客户端轮训:当未获得锁时,等待一段时间再重新获取,直到成功。上述示例就是基于这种方式实现的。这种方式的缺点也很明显,比较耗费服务器资源,当并发量大时会影响服务器的效率。
  • 使用Redis的订阅发布功能:当获取锁失败时,订阅锁释放消息,获取锁成功后释放时,发送释放消息。

集群中的主备切换和脑裂

在Redis包含主从同步的集群部署方式中,如果主节点挂掉,从节点提升为主节点。如果客户端A在主节点加锁成功,指令还未同步到从节点,此时主节点挂掉,从节点升为主节点,新的主节点中没有锁的数据。这种情况下,客户端B就可能加锁成功,从而出现并发的场景。

当集群发生脑裂时,Redis master节点跟slave 节点和 sentinel 集群处于不同的网络分区。sentinel集群无法感知到master的存在,会将 slave 节点提升为 master 节点,此时就会存在两个不同的 master 节点。从而也会导致并发问题的出现。Redis Cluster集群部署方式同理。

小结

通过生产环境中的一个问题,排查原因,寻找解决方案,到最终对基于Redis分布式的深入研究,这便是学习的过程。

同时,每当面试或被问题如何解决分布式共享资源时,我们会脱口而出”基于Redis实现分布式锁“,但通过本文的学习会发现,Redis分布式锁并不是万能的,而且在使用的过程中还需要注意超时、死锁、误解锁、集群选主/脑裂等问题。

Redis以高性能著称,但在实现分布式锁的过程中还是存在一些问题。因此,基于Redis的分布式锁可以极大的缓解并发问题,但要完全防止并发,还是得从数据库层面入手。

源码地址:https://github.com/secbr/springboot-all/tree/master/springboot-redis-lock

更多编程相关知识,请访问:编程入门!!

Das obige ist der detaillierte Inhalt vonLassen Sie uns über Redis-basierte verteilte Sperren in verteilten Systemen sprechen. Für weitere Informationen folgen Sie bitte anderen verwandten Artikeln auf der PHP chinesischen Website!

Verwandte Etiketten:
Quelle:juejin.cn
Erklärung dieser Website
Der Inhalt dieses Artikels wird freiwillig von Internetnutzern beigesteuert und das Urheberrecht liegt beim ursprünglichen Autor. Diese Website übernimmt keine entsprechende rechtliche Verantwortung. Wenn Sie Inhalte finden, bei denen der Verdacht eines Plagiats oder einer Rechtsverletzung besteht, wenden Sie sich bitte an admin@php.cn
Beliebte Tutorials
Mehr>
Neueste Downloads
Mehr>
Web-Effekte
Quellcode der Website
Website-Materialien
Frontend-Vorlage