分散システムにおける Redis ベースの分散ロックについて話しましょう
ロックされていますが、同時実行の問題はありますか? Redis分散ロックを本当に理解していますか?次の記事では、分散システムにおける Redis ベースの分散ロックについて説明します。
#新しく引き継いだプロジェクトでは、アカウントが不均等であるという問題が発生することがあります。退職前の前任技術上司の説明は、「調査した結果、原因が見つからなかった。その後、忙しくて解決できなかった。フレームワークのせいかもしれない…」でした。 #プロジェクトが納品された今、このような問題を解決する必要があります。すべての会計処理ロジックを整理した結果、最終的にその原因がわかりました。ホット アカウントでのデータベースの同時操作が原因でした。この問題に関して、分散システムにおける Redis に基づく分散ロックについて話しましょう。ちなみに、問題の原因と解決策も詳しく説明します。 [関連する推奨事項:
Redis ビデオ チュートリアル 原因分析システムの同時実行性は高くなく、ホット アカウントが存在しますが、そこまで深刻ではありません。問題の根本は、人為的に同時実行性を生み出すシステム アーキテクチャ設計にあります。シナリオは次のとおりです。販売者はデータのバッチをバッチでインポートし、システムは前処理を実行してアカウント残高を増減します。
この時点で、スケジュールされた別のタスクもアカウントをスキャンして更新します。さらに、同じアカウントに対する操作がさまざまなシステムに分散され、ホットアカウントが出現します。
この問題をアーキテクチャ レベルで解決するには、会計システムを切り離し、1 つのシステムに集中して処理することを検討できます。すべてのデータベース トランザクションと実行シーケンスは会計システムによって調整され、処理されます。技術的な観点から見ると、ホットスポット アカウントはロック メカニズムを通じてロックできます。
この記事では、ホット アカウントの分散ロックの実装について詳しく説明します。
ロックの分析Java のマルチスレッド環境では、通常、使用できるロックのタイプがいくつかあります。
JVMメモリ モデル レベルのロック。一般的に使用されるロックには、同期ロックなど;- データベース ロック (楽観的ロック、悲観的ロックなど);
- 分散ロック;
- JVM メモリ レベル ロックは、複数のスレッドがグローバル変数にアクセス/変更する場合など、単一サービスの下でスレッド セキュリティを確保できます。ただし、システムがクラスターにデプロイされている場合、JVM レベルのローカル ロックは無力です。
上記の場合と同様、ホット アカウントは分散システム内の共有リソースであり、通常は
データベース ロック#を使用します。 ## または分散ロック を使用して問題を解決します。 データベース ロックは、楽観的ロック
と悲観的ロックに分割されます。 悲観的ロック
は、データベース (Mysql の InnoDB) によって提供される排他的ロックに基づいて実装されます。 select ... for update ステートメントを通じてトランザクション操作を実行するとき、MySQL はクエリ結果セット内のデータの各行に排他的ロックを追加し、他のスレッドはレコードの更新および削除操作をブロックします。共有リソースの順次実行 (変更) を実現するために、オプティミスティック ロック
は悲観的ロックに対して相対的なものです。オプティミスティック ロックは、通常、データが競合を引き起こさないことを前提としているため、データは送信および更新されて初めて、データの競合が正式に検出されます。競合がある場合、例外情報がユーザーに返され、ユーザーが何をすべきかを決定できるようになります。オプティミスティック ロックは、読み取りが多く書き込みが少ないシナリオに適しており、プログラムのスループットを向上させることができます。楽観的ロックは通常、記録ステータスまたはバージョンの追加に基づいて実装されます。悲観的ロックの失敗シナリオ
プロジェクトで悲観的ロックが使用されましたが、悲観的ロックは失敗しました。これは悲観的ロックを使用するときによくある誤解でもあるので、以下で分析してみましょう。
悲観的ロック プロセスの通常の使用:更新のために選択...を通じてレコードをロックします;
- 新しい残高を計算し、金額を変更して保存します。 ;
- 実行完了後にロックを解除する;
- よくあるミスの処理手順:
口座残高を照会し、新しい残高を計算する;
- 更新ロック レコードの選択 .. . を通じて;
- 量を変更して保存します;
- 実行完了後にロックを解放します;
- #間違ったプロセス (A サービスと B サービスによってクエリされる内容など) 残高はすべて 100、A は 50 を差し引き、B は 40 を差し引き、A はレコードをロックしてデータベースを 50 に更新します。A がロックを解放した後、Bレコードをロックし、データベースを 60 に更新します。明らかに、後者は前者の更新を上書きしています。解決策は、ロックの範囲を拡張し、新しい残高を計算する前にロックを進めることです。
- 通常、悲観的ロックはデータベースに多大な負荷をかけますが、実際にはシナリオに応じて楽観的ロックまたは分散ロックが使用されます。
本題に入り、Redis に基づく分散ロックの実装について話しましょう。
Redis 分散ロックの実践演習ここでは、Spring Boot、Redis、Lua スクリプトを例として、分散ロックの実装を示します。処理を簡略化するために、例の Redis は分散ロックの機能とデータベースの機能の両方を想定しています。 シナリオ構築
クラスタ環境で同一アカウントの金額を操作する基本手順:
- データベースからユーザー金額を読み込み、同一アカウントの金額を操作します。
- プログラムは金額を変更します;
- その後、最新の金額がデータベースに保存されます;
以下はロックせず、最初から同期処理します。徐々に最終的な分布ロックを推測します。
基本的な統合とクラス構築
ロックのない基本的なビジネス環境を準備します。
まず、関連する依存関係を Spring Boot プロジェクトに導入します:
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-redis</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
ログイン後にコピーアカウントに対応するエンティティ クラス UserAccount:
public class UserAccount {
//用户ID
private String userId;
//账户内金额
private int amount;
//添加账户金额
public void addAmount(int amount) {
this.amount = this.amount + amount;
}
// 省略构造方法和getter/setter
}
ログイン後にコピースレッド実装クラス AccountOperationThread を作成します:
public class AccountOperationThread implements Runnable {
private final static Logger logger = LoggerFactory.getLogger(AccountOperationThread.class);
private static final Long RELEASE_SUCCESS = 1L;
private String userId;
private RedisTemplate<Object, Object> redisTemplate;
public AccountOperationThread(String userId, RedisTemplate<Object, Object> redisTemplate) {
this.userId = userId;
this.redisTemplate = redisTemplate;
}
@Override
public void run() {
noLock();
}
/**
* 不加锁
*/
private void noLock() {
try {
Random random = new Random();
// 模拟线程进行业务处理
TimeUnit.MILLISECONDS.sleep(random.nextInt(100) + 1);
} catch (InterruptedException e) {
e.printStackTrace();
}
//模拟数据库中获取用户账号
UserAccount userAccount = (UserAccount) redisTemplate.opsForValue().get(userId);
// 金额+1
userAccount.addAmount(1);
logger.info(Thread.currentThread().getName() + " : user id : " + userId + " amount : " + userAccount.getAmount());
//模拟存回数据库
redisTemplate.opsForValue().set(userId, userAccount);
}
}
ログイン後にコピー RedisTemplate のインスタンス化が Spring Boot に渡されます:
@Configuration
public class RedisConfig {
@Bean
public RedisTemplate<Object, Object> redisTemplate(RedisConnectionFactory redisConnectionFactory) {
RedisTemplate<Object, Object> redisTemplate = new RedisTemplate<>();
redisTemplate.setConnectionFactory(redisConnectionFactory);
Jackson2JsonRedisSerializer<Object> jackson2JsonRedisSerializer =
new Jackson2JsonRedisSerializer<>(Object.class);
ObjectMapper objectMapper = new ObjectMapper();
objectMapper.setVisibility(PropertyAccessor.ALL, JsonAutoDetect.Visibility.ANY);
objectMapper.enableDefaultTyping(ObjectMapper.DefaultTyping.NON_FINAL);
jackson2JsonRedisSerializer.setObjectMapper(objectMapper);
// 设置value的序列化规则和 key的序列化规则
redisTemplate.setValueSerializer(jackson2JsonRedisSerializer);
redisTemplate.setKeySerializer(new StringRedisSerializer());
redisTemplate.afterPropertiesSet();
return redisTemplate;
}
}
ログイン後にコピー最後に、マルチスレッド操作をトリガーする TestController を準備します:
@RestController
public class TestController {
private final static Logger logger = LoggerFactory.getLogger(TestController.class);
private static ExecutorService executorService = Executors.newFixedThreadPool(10);
@Autowired
private RedisTemplate<Object, Object> redisTemplate;
@GetMapping("/test")
public String test() throws InterruptedException {
// 初始化用户user_001到Redis,账户金额为0
redisTemplate.opsForValue().set("user_001", new UserAccount("user_001", 0));
// 开启10个线程进行同步测试,每个线程为账户增加1元
for (int i = 0; i < 10; i++) {
logger.info("创建线程i=" + i);
executorService.execute(new AccountOperationThread("user_001", redisTemplate));
}
// 主线程休眠1秒等待线程跑完
TimeUnit.MILLISECONDS.sleep(1000);
// 查询Redis中的user_001账户
UserAccount userAccount = (UserAccount) redisTemplate.opsForValue().get("user_001");
logger.info("user id : " + userAccount.getUserId() + " amount : " + userAccount.getAmount());
return "success";
}
}
ログイン後にコピー上記のプログラムを実行します (通常は 10 スレッド)。各スレッドは追加します。 1 の場合、結果は 10 になるはずです。しかし、何度か実行すると、結果は大きく異なり、基本的には 10 未満であることがわかります。
[pool-1-thread-5] c.s.redis.thread.AccountOperationThread : pool-1-thread-5 : user id : user_001 amount : 1
[pool-1-thread-4] c.s.redis.thread.AccountOperationThread : pool-1-thread-4 : user id : user_001 amount : 1
[pool-1-thread-3] c.s.redis.thread.AccountOperationThread : pool-1-thread-3 : user id : user_001 amount : 1
[pool-1-thread-1] c.s.redis.thread.AccountOperationThread : pool-1-thread-1 : user id : user_001 amount : 1
[pool-1-thread-1] c.s.redis.thread.AccountOperationThread : pool-1-thread-1 : user id : user_001 amount : 2
[pool-1-thread-2] c.s.redis.thread.AccountOperationThread : pool-1-thread-2 : user id : user_001 amount : 2
[pool-1-thread-5] c.s.redis.thread.AccountOperationThread : pool-1-thread-5 : user id : user_001 amount : 2
[pool-1-thread-4] c.s.redis.thread.AccountOperationThread : pool-1-thread-4 : user id : user_001 amount : 3
[pool-1-thread-1] c.s.redis.thread.AccountOperationThread : pool-1-thread-1 : user id : user_001 amount : 4
[pool-1-thread-3] c.s.redis.thread.AccountOperationThread : pool-1-thread-3 : user id : user_001 amount : 5
[nio-8080-exec-1] c.s.redis.controller.TestController : user id : user_001 amount : 5
ログイン後にコピー 上記のログを例にとると、最初の 4 つのスレッドはすべて値を 1 に変更しました。これは、次の 3 つのスレッドが以前の変更を上書きし、最終結果が 10 ではなく 5 だけになったことを意味します。 。これには明らかに問題があります。
Redis 同期ロックの実装
上記の状況を考慮すると、同じ JVM 内でスレッド ロックを通じて完了できます。ただし、分散環境では JVM レベルのロックを実装できませんが、ここでは Redis 同期ロックを使用できます。
基本的な考え方: 最初のスレッドが入ったとき、Redis にレコードが入ります。後続のスレッドがリクエストしてきたときに、そのレコードが Redis に存在するかどうかを判断します。存在する場合は、そのレコードが存在することを意味します。ロック状態になり、待機または戻ります。存在しない場合は、後続の業務処理が行われます。
/**
* 1.抢占资源时判断是否被锁。
* 2.如未锁则抢占成功且加锁,否则等待锁释放。
* 3.业务完成后释放锁,让给其它线程。
* <p>
* 该方案并未解决同步问题,原因:线程获得锁和加锁的过程,并非原子性操作,可能会导致线程A获得锁,还未加锁时,线程B也获得了锁。
*/
private void redisLock() {
Random random = new Random();
try {
TimeUnit.MILLISECONDS.sleep(random.nextInt(1000) + 1);
} catch (InterruptedException e) {
e.printStackTrace();
}
while (true) {
Object lock = redisTemplate.opsForValue().get(userId + ":syn");
if (lock == null) {
// 获得锁 -> 加锁 -> 跳出循环
logger.info(Thread.currentThread().getName() + ":获得锁");
redisTemplate.opsForValue().set(userId + ":syn", "lock");
break;
}
try {
// 等待500毫秒重试获得锁
TimeUnit.MILLISECONDS.sleep(500);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
try {
//模拟数据库中获取用户账号
UserAccount userAccount = (UserAccount) redisTemplate.opsForValue().get(userId);
if (userAccount != null) {
//设置金额
userAccount.addAmount(1);
logger.info(Thread.currentThread().getName() + " : user id : " + userId + " amount : " + userAccount.getAmount());
//模拟存回数据库
redisTemplate.opsForValue().set(userId, userAccount);
}
} finally {
//释放锁
redisTemplate.delete(userId + ":syn");
logger.info(Thread.currentThread().getName() + ":释放锁");
}
}
ログイン後にコピーwhile コード ブロックでは、まず対応するユーザー ID が Redis に存在するかどうかを確認し、存在しない場合はセット ロックを実行し、存在する場合はループを抜けて待機を続けます。
上記のコードはロック機能を実装しているように見えますが、プログラムを実行すると、ロックされていない場合と同様に同時実行の問題が依然として存在することがわかります。その理由は、取得とロックの操作がアトミックではないためです。たとえば、2 つのスレッドがロックが両方とも null であることを検出してロックするとしますが、この時点でも同時実行の問題は依然として存在します。
Redis アトミック同期ロック
上記の問題に対処するために、取得とロックのプロセスをアトミック化できます。これは、spring-boot-data-redis によって提供されるアトマイゼーション API に基づいて実装できます:
// 该方法使用了redis的指令:SETNX key value
// 1.key不存在,设置成功返回value,setIfAbsent返回true;
// 2.key存在,则设置失败返回null,setIfAbsent返回false;
// 3.原子性操作;
Boolean setIfAbsent(K var1, V var2);
ログイン後にコピー上記のメソッドのアトマイゼーション操作は、Redis の setnx コマンドのカプセル化です。Redis での setnx の使用は次のとおりです。次のように:
redis> SETNX mykey "Hello"
(integer) 1
redis> SETNX mykey "World"
(integer) 0
redis> GET mykey
"Hello"
ログイン後にコピーmykey を初めて設定するときは、mykey が存在しないため、設定が成功したことを示す 1 が返されます。2 回目に mykey が設定されると、mykey はすでに存在し、0 が返されます。が返され、設定が失敗したことが示されます。 mykey に対応する値を再度クエリすると、それがまだ最初に設定された値であることがわかります。言い換えれば、redis の setnx は、一意のキーが 1 つのサービスによってのみ正常に設定されることを保証します。
上記の API と基礎となる原則を理解した後、次のようにスレッド内の実装メソッド コードを見てみましょう:
/**
* 1.原子操作加锁
* 2.竞争线程循环重试获得锁
* 3.业务完成释放锁
*/
private void atomicityRedisLock() {
//Spring data redis 支持的原子性操作
while (!redisTemplate.opsForValue().setIfAbsent(userId + ":syn", "lock")) {
try {
// 等待100毫秒重试获得锁
TimeUnit.MILLISECONDS.sleep(100);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
logger.info(Thread.currentThread().getName() + ":获得锁");
try {
//模拟数据库中获取用户账号
UserAccount userAccount = (UserAccount) redisTemplate.opsForValue().get(userId);
if (userAccount != null) {
//设置金额
userAccount.addAmount(1);
logger.info(Thread.currentThread().getName() + " : user id : " + userId + " amount : " + userAccount.getAmount());
//模拟存回数据库
redisTemplate.opsForValue().set(userId, userAccount);
}
} finally {
//释放锁
redisTemplate.delete(userId + ":syn");
logger.info(Thread.currentThread().getName() + ":释放锁");
}
}
ログイン後にコピーコードを再度実行すると、結果が正しいことがわかります。これは、配布が正常に完了できることを意味します。スレッドはロックされています。
Redis 分散ロックのデッドロック
上記のコードの実行結果は問題ありませんが、アプリケーションが異常終了すると、メソッドを実行する時間がなくなります。最終的にロックを解放すると、他のスレッドはこのロックを取得できなくなります。
この時点で、オーバーロードされた setIfAbsent メソッドを使用できます:
Boolean setIfAbsent(K var1, V var2, long var3, TimeUnit var5);
ログイン後にコピーこのメソッドに基づいて、ロックの有効期限を設定できます。これにより、ロックを取得したスレッドがダウンした場合でも、Redis 内のデータの有効期限が切れた後は、他のスレッドが正常にロックを取得できます。
サンプルコードは以下の通りです。
private void atomicityAndExRedisLock() {
try {
//Spring data redis 支持的原子性操作,并设置5秒过期时间
while (!redisTemplate.opsForValue().setIfAbsent(userId + ":syn",
System.currentTimeMillis() + 5000, 5000, TimeUnit.MILLISECONDS)) {
// 等待100毫秒重试获得锁
logger.info(Thread.currentThread().getName() + ":尝试循环获取锁");
TimeUnit.MILLISECONDS.sleep(1000);
}
logger.info(Thread.currentThread().getName() + ":获得锁--------");
// 应用在这里宕机,进程退出,无法执行 finally;
Thread.currentThread().interrupt();
// 业务逻辑...
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
//释放锁
if (!Thread.currentThread().isInterrupted()) {
redisTemplate.delete(userId + ":syn");
logger.info(Thread.currentThread().getName() + ":释放锁");
}
}
}
ログイン後にコピービジネスタイムアウトとデーモンスレッド
上にRedisのタイムアウト時間を追加することで解決するようです問題は解決しましたが、新たな問題が発生しました。
たとえば、通常の状況では、スレッド A は 5 秒以内にビジネスを完了できますが、場合によっては 5 秒以上かかることがあります。タイムアウトが 5 秒に設定されている場合、スレッド A がロックを取得しますが、ビジネス ロジックの処理には 6 秒かかります。この時点では、スレッド A はまだ通常のビジネス ロジックを実行しており、スレッド B はロックを取得しています。スレッド A の処理が終了すると、スレッド B のロックを解放できます。
上記のシナリオには 2 つの問題があります。
- まず、スレッド A とスレッド B が同時に実行される可能性があり、同時実行の問題が発生します。
- 2 番目に、スレッド A がスレッド B のロックを解放し、一連の悪循環を引き起こす可能性があります。
もちろん、Redis に値を設定することで、ロックがスレッド A に属するかスレッド B に属するかを判断できます。ただし、注意深く分析すると、この問題の本質は、スレッド A がビジネス ロジックの実行にロック タイムアウトよりも長い時間がかかることであることがわかります。
その場合、解決策は 2 つあります。
- まず、ロックが解放される前にビジネス コードが実行できるように、タイムアウトを十分な長さに設定します。
- 2 番目、ロックのデーモン スレッドを追加し、有効期限が近づいているが解放されていないロックの時間を追加します。
最初の方法では、ほとんどの場合、銀行全体の時間のかかるビジネス ロジックが必要です。タイムアウト設定。
2 番目の方法は、次のデーモン スレッド メソッドを通じてロック タイムアウトを動的に増加することです。
public class DaemonThread implements Runnable {
private final static Logger logger = LoggerFactory.getLogger(DaemonThread.class);
// 是否需要守护 主线程关闭则结束守护线程
private volatile boolean daemon = true;
// 守护锁
private String lockKey;
private RedisTemplate<Object, Object> redisTemplate;
public DaemonThread(String lockKey, RedisTemplate<Object, Object> redisTemplate) {
this.lockKey = lockKey;
this.redisTemplate = redisTemplate;
}
@Override
public void run() {
try {
while (daemon) {
long time = redisTemplate.getExpire(lockKey, TimeUnit.MILLISECONDS);
// 剩余有效期小于1秒则续命
if (time < 1000) {
logger.info("守护进程: " + Thread.currentThread().getName() + " 延长锁时间 5000 毫秒");
redisTemplate.expire(lockKey, 5000, TimeUnit.MILLISECONDS);
}
TimeUnit.MILLISECONDS.sleep(300);
}
logger.info(" 守护进程: " + Thread.currentThread().getName() + "关闭 ");
} catch (InterruptedException e) {
e.printStackTrace();
}
}
// 主线程主动调用结束
public void stop() {
daemon = false;
}
}
ログイン後にコピー上述线程每隔300毫秒获取一下Redis中锁的超时时间,如果小于1秒,则延长5秒。当主线程调用关闭时,守护线程也随之关闭。
主线程中相关代码实现:
private void deamonRedisLock() {
//守护线程
DaemonThread daemonThread = null;
//Spring data redis 支持的原子性操作,并设置5秒过期时间
String uuid = UUID.randomUUID().toString();
String value = Thread.currentThread().getId() + ":" + uuid;
try {
while (!redisTemplate.opsForValue().setIfAbsent(userId + ":syn", value, 5000, TimeUnit.MILLISECONDS)) {
// 等待100毫秒重试获得锁
logger.info(Thread.currentThread().getName() + ":尝试循环获取锁");
TimeUnit.MILLISECONDS.sleep(1000);
}
logger.info(Thread.currentThread().getName() + ":获得锁----");
// 开启守护线程
daemonThread = new DaemonThread(userId + ":syn", redisTemplate);
Thread thread = new Thread(daemonThread);
thread.start();
// 业务逻辑执行10秒...
TimeUnit.MILLISECONDS.sleep(10000);
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
//释放锁 这里也需要原子操作,今后通过 Redis + Lua 讲
String result = (String) redisTemplate.opsForValue().get(userId + ":syn");
if (value.equals(result)) {
redisTemplate.delete(userId + ":syn");
logger.info(Thread.currentThread().getName() + ":释放锁-----");
}
//关闭守护线程
if (daemonThread != null) {
daemonThread.stop();
}
}
}
ログイン後にコピー其中在获得锁之后,开启守护线程,在finally中将守护线程关闭。
基于Lua脚本的实现
在上述逻辑中,我们是基于spring-boot-data-redis提供的原子化操作来保证锁判断和执行的原子化的。在非Spring Boot项目中,则可以基于Lua脚本来实现。
首先定义加锁和解锁的Lua脚本及对应的DefaultRedisScript
对象,在RedisConfig
配置类中添加如下实例化代码:
@Configuration
public class RedisConfig {
//lock script
private static final String LOCK_SCRIPT = " if redis.call('setnx',KEYS[1],ARGV[1]) == 1 " +
" then redis.call('expire',KEYS[1],ARGV[2]) " +
" return 1 " +
" else return 0 end ";
private static final String UNLOCK_SCRIPT = "if redis.call('get', KEYS[1]) == ARGV[1] then return redis.call" +
"('del', KEYS[1]) else return 0 end";
// ... 省略部分代码
@Bean
public DefaultRedisScript<Boolean> lockRedisScript() {
DefaultRedisScript<Boolean> defaultRedisScript = new DefaultRedisScript<>();
defaultRedisScript.setResultType(Boolean.class);
defaultRedisScript.setScriptText(LOCK_SCRIPT);
return defaultRedisScript;
}
@Bean
public DefaultRedisScript<Long> unlockRedisScript() {
DefaultRedisScript<Long> defaultRedisScript = new DefaultRedisScript<>();
defaultRedisScript.setResultType(Long.class);
defaultRedisScript.setScriptText(UNLOCK_SCRIPT);
return defaultRedisScript;
}
}
ログイン後にコピー再通过在AccountOperationThread
类中新建构造方法,将上述两个对象传入类中(省略此部分演示)。然后,就可以基于RedisTemplate
来调用了,改造之后的代码实现如下:
private void deamonRedisLockWithLua() {
//守护线程
DaemonThread daemonThread = null;
//Spring data redis 支持的原子性操作,并设置5秒过期时间
String uuid = UUID.randomUUID().toString();
String value = Thread.currentThread().getId() + ":" + uuid;
try {
while (!redisTemplate.execute(lockRedisScript, Collections.singletonList(userId + ":syn"), value, 5)) {
// 等待1000毫秒重试获得锁
logger.info(Thread.currentThread().getName() + ":尝试循环获取锁");
TimeUnit.MILLISECONDS.sleep(1000);
}
logger.info(Thread.currentThread().getName() + ":获得锁----");
// 开启守护线程
daemonThread = new DaemonThread(userId + ":syn", redisTemplate);
Thread thread = new Thread(daemonThread);
thread.start();
// 业务逻辑执行10秒...
TimeUnit.MILLISECONDS.sleep(10000);
} catch (InterruptedException e) {
logger.error("异常", e);
} finally {
//使用Lua脚本:先判断是否是自己设置的锁,再执行删除
// key存在,当前值=期望值时,删除key;key存在,当前值!=期望值时,返回0;
Long result = redisTemplate.execute(unlockRedisScript, Collections.singletonList(userId + ":syn"), value);
logger.info("redis解锁:{}", RELEASE_SUCCESS.equals(result));
if (RELEASE_SUCCESS.equals(result)) {
if (daemonThread != null) {
//关闭守护线程
daemonThread.stop();
logger.info(Thread.currentThread().getName() + ":释放锁---");
}
}
}
}
ログイン後にコピー其中while循环中加锁和finally中的释放锁都是基于Lua脚本来实现了。
Redis锁的其他因素
除了上述实例,在使用Redis分布式锁时,还可以考虑以下情况及方案。
Redis锁的不可重入
当线程在持有锁的情况下再次请求加锁,如果一个锁支持一个线程多次加锁,那么这个锁就是可重入的。如果一个不可重入锁被再次加锁,由于该锁已经被持有,再次加锁会失败。Redis可通过对锁进行重入计数,加锁时加 1,解锁时减 1,当计数归 0时释放锁。
可重入锁虽然高效但会增加代码的复杂性,这里就不举例说明了。
等待锁释放
有的业务场景,发现被锁则直接返回。但有的场景下,客户端需要等待锁释放然后去抢锁。上述示例就属于后者。针对等待锁释放也有两种方案:
- 客户端轮训:当未获得锁时,等待一段时间再重新获取,直到成功。上述示例就是基于这种方式实现的。这种方式的缺点也很明显,比较耗费服务器资源,当并发量大时会影响服务器的效率。
- 使用Redis的订阅发布功能:当获取锁失败时,订阅锁释放消息,获取锁成功后释放时,发送释放消息。
集群中的主备切换和脑裂
在Redis包含主从同步的集群部署方式中,如果主节点挂掉,从节点提升为主节点。如果客户端A在主节点加锁成功,指令还未同步到从节点,此时主节点挂掉,从节点升为主节点,新的主节点中没有锁的数据。这种情况下,客户端B就可能加锁成功,从而出现并发的场景。
当集群发生脑裂时,Redis master节点跟slave 节点和 sentinel 集群处于不同的网络分区。sentinel集群无法感知到master的存在,会将 slave 节点提升为 master 节点,此时就会存在两个不同的 master 节点。从而也会导致并发问题的出现。Redis Cluster集群部署方式同理。
小结
通过生产环境中的一个问题,排查原因,寻找解决方案,到最终对基于Redis分布式的深入研究,这便是学习的过程。
同时,每当面试或被问题如何解决分布式共享资源时,我们会脱口而出”基于Redis实现分布式锁“,但通过本文的学习会发现,Redis分布式锁并不是万能的,而且在使用的过程中还需要注意超时、死锁、误解锁、集群选主/脑裂等问题。
Redis以高性能著称,但在实现分布式锁的过程中还是存在一些问题。因此,基于Redis的分布式锁可以极大的缓解并发问题,但要完全防止并发,还是得从数据库层面入手。
源码地址:https://github.com/secbr/springboot-all/tree/master/springboot-redis-lock
更多编程相关知识,请访问:编程入门!!
<dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-data-redis</artifactId> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-web</artifactId> </dependency>
public class UserAccount { //用户ID private String userId; //账户内金额 private int amount; //添加账户金额 public void addAmount(int amount) { this.amount = this.amount + amount; } // 省略构造方法和getter/setter }
public class AccountOperationThread implements Runnable { private final static Logger logger = LoggerFactory.getLogger(AccountOperationThread.class); private static final Long RELEASE_SUCCESS = 1L; private String userId; private RedisTemplate<Object, Object> redisTemplate; public AccountOperationThread(String userId, RedisTemplate<Object, Object> redisTemplate) { this.userId = userId; this.redisTemplate = redisTemplate; } @Override public void run() { noLock(); } /** * 不加锁 */ private void noLock() { try { Random random = new Random(); // 模拟线程进行业务处理 TimeUnit.MILLISECONDS.sleep(random.nextInt(100) + 1); } catch (InterruptedException e) { e.printStackTrace(); } //模拟数据库中获取用户账号 UserAccount userAccount = (UserAccount) redisTemplate.opsForValue().get(userId); // 金额+1 userAccount.addAmount(1); logger.info(Thread.currentThread().getName() + " : user id : " + userId + " amount : " + userAccount.getAmount()); //模拟存回数据库 redisTemplate.opsForValue().set(userId, userAccount); } }
@Configuration public class RedisConfig { @Bean public RedisTemplate<Object, Object> redisTemplate(RedisConnectionFactory redisConnectionFactory) { RedisTemplate<Object, Object> redisTemplate = new RedisTemplate<>(); redisTemplate.setConnectionFactory(redisConnectionFactory); Jackson2JsonRedisSerializer<Object> jackson2JsonRedisSerializer = new Jackson2JsonRedisSerializer<>(Object.class); ObjectMapper objectMapper = new ObjectMapper(); objectMapper.setVisibility(PropertyAccessor.ALL, JsonAutoDetect.Visibility.ANY); objectMapper.enableDefaultTyping(ObjectMapper.DefaultTyping.NON_FINAL); jackson2JsonRedisSerializer.setObjectMapper(objectMapper); // 设置value的序列化规则和 key的序列化规则 redisTemplate.setValueSerializer(jackson2JsonRedisSerializer); redisTemplate.setKeySerializer(new StringRedisSerializer()); redisTemplate.afterPropertiesSet(); return redisTemplate; } }
@RestController public class TestController { private final static Logger logger = LoggerFactory.getLogger(TestController.class); private static ExecutorService executorService = Executors.newFixedThreadPool(10); @Autowired private RedisTemplate<Object, Object> redisTemplate; @GetMapping("/test") public String test() throws InterruptedException { // 初始化用户user_001到Redis,账户金额为0 redisTemplate.opsForValue().set("user_001", new UserAccount("user_001", 0)); // 开启10个线程进行同步测试,每个线程为账户增加1元 for (int i = 0; i < 10; i++) { logger.info("创建线程i=" + i); executorService.execute(new AccountOperationThread("user_001", redisTemplate)); } // 主线程休眠1秒等待线程跑完 TimeUnit.MILLISECONDS.sleep(1000); // 查询Redis中的user_001账户 UserAccount userAccount = (UserAccount) redisTemplate.opsForValue().get("user_001"); logger.info("user id : " + userAccount.getUserId() + " amount : " + userAccount.getAmount()); return "success"; } }
[pool-1-thread-5] c.s.redis.thread.AccountOperationThread : pool-1-thread-5 : user id : user_001 amount : 1 [pool-1-thread-4] c.s.redis.thread.AccountOperationThread : pool-1-thread-4 : user id : user_001 amount : 1 [pool-1-thread-3] c.s.redis.thread.AccountOperationThread : pool-1-thread-3 : user id : user_001 amount : 1 [pool-1-thread-1] c.s.redis.thread.AccountOperationThread : pool-1-thread-1 : user id : user_001 amount : 1 [pool-1-thread-1] c.s.redis.thread.AccountOperationThread : pool-1-thread-1 : user id : user_001 amount : 2 [pool-1-thread-2] c.s.redis.thread.AccountOperationThread : pool-1-thread-2 : user id : user_001 amount : 2 [pool-1-thread-5] c.s.redis.thread.AccountOperationThread : pool-1-thread-5 : user id : user_001 amount : 2 [pool-1-thread-4] c.s.redis.thread.AccountOperationThread : pool-1-thread-4 : user id : user_001 amount : 3 [pool-1-thread-1] c.s.redis.thread.AccountOperationThread : pool-1-thread-1 : user id : user_001 amount : 4 [pool-1-thread-3] c.s.redis.thread.AccountOperationThread : pool-1-thread-3 : user id : user_001 amount : 5 [nio-8080-exec-1] c.s.redis.controller.TestController : user id : user_001 amount : 5
/** * 1.抢占资源时判断是否被锁。 * 2.如未锁则抢占成功且加锁,否则等待锁释放。 * 3.业务完成后释放锁,让给其它线程。 * <p> * 该方案并未解决同步问题,原因:线程获得锁和加锁的过程,并非原子性操作,可能会导致线程A获得锁,还未加锁时,线程B也获得了锁。 */ private void redisLock() { Random random = new Random(); try { TimeUnit.MILLISECONDS.sleep(random.nextInt(1000) + 1); } catch (InterruptedException e) { e.printStackTrace(); } while (true) { Object lock = redisTemplate.opsForValue().get(userId + ":syn"); if (lock == null) { // 获得锁 -> 加锁 -> 跳出循环 logger.info(Thread.currentThread().getName() + ":获得锁"); redisTemplate.opsForValue().set(userId + ":syn", "lock"); break; } try { // 等待500毫秒重试获得锁 TimeUnit.MILLISECONDS.sleep(500); } catch (InterruptedException e) { e.printStackTrace(); } } try { //模拟数据库中获取用户账号 UserAccount userAccount = (UserAccount) redisTemplate.opsForValue().get(userId); if (userAccount != null) { //设置金额 userAccount.addAmount(1); logger.info(Thread.currentThread().getName() + " : user id : " + userId + " amount : " + userAccount.getAmount()); //模拟存回数据库 redisTemplate.opsForValue().set(userId, userAccount); } } finally { //释放锁 redisTemplate.delete(userId + ":syn"); logger.info(Thread.currentThread().getName() + ":释放锁"); } }
// 该方法使用了redis的指令:SETNX key value // 1.key不存在,设置成功返回value,setIfAbsent返回true; // 2.key存在,则设置失败返回null,setIfAbsent返回false; // 3.原子性操作; Boolean setIfAbsent(K var1, V var2);
redis> SETNX mykey "Hello" (integer) 1 redis> SETNX mykey "World" (integer) 0 redis> GET mykey "Hello"
/** * 1.原子操作加锁 * 2.竞争线程循环重试获得锁 * 3.业务完成释放锁 */ private void atomicityRedisLock() { //Spring data redis 支持的原子性操作 while (!redisTemplate.opsForValue().setIfAbsent(userId + ":syn", "lock")) { try { // 等待100毫秒重试获得锁 TimeUnit.MILLISECONDS.sleep(100); } catch (InterruptedException e) { e.printStackTrace(); } } logger.info(Thread.currentThread().getName() + ":获得锁"); try { //模拟数据库中获取用户账号 UserAccount userAccount = (UserAccount) redisTemplate.opsForValue().get(userId); if (userAccount != null) { //设置金额 userAccount.addAmount(1); logger.info(Thread.currentThread().getName() + " : user id : " + userId + " amount : " + userAccount.getAmount()); //模拟存回数据库 redisTemplate.opsForValue().set(userId, userAccount); } } finally { //释放锁 redisTemplate.delete(userId + ":syn"); logger.info(Thread.currentThread().getName() + ":释放锁"); } }
Boolean setIfAbsent(K var1, V var2, long var3, TimeUnit var5);
private void atomicityAndExRedisLock() { try { //Spring data redis 支持的原子性操作,并设置5秒过期时间 while (!redisTemplate.opsForValue().setIfAbsent(userId + ":syn", System.currentTimeMillis() + 5000, 5000, TimeUnit.MILLISECONDS)) { // 等待100毫秒重试获得锁 logger.info(Thread.currentThread().getName() + ":尝试循环获取锁"); TimeUnit.MILLISECONDS.sleep(1000); } logger.info(Thread.currentThread().getName() + ":获得锁--------"); // 应用在这里宕机,进程退出,无法执行 finally; Thread.currentThread().interrupt(); // 业务逻辑... } catch (InterruptedException e) { e.printStackTrace(); } finally { //释放锁 if (!Thread.currentThread().isInterrupted()) { redisTemplate.delete(userId + ":syn"); logger.info(Thread.currentThread().getName() + ":释放锁"); } } }
public class DaemonThread implements Runnable { private final static Logger logger = LoggerFactory.getLogger(DaemonThread.class); // 是否需要守护 主线程关闭则结束守护线程 private volatile boolean daemon = true; // 守护锁 private String lockKey; private RedisTemplate<Object, Object> redisTemplate; public DaemonThread(String lockKey, RedisTemplate<Object, Object> redisTemplate) { this.lockKey = lockKey; this.redisTemplate = redisTemplate; } @Override public void run() { try { while (daemon) { long time = redisTemplate.getExpire(lockKey, TimeUnit.MILLISECONDS); // 剩余有效期小于1秒则续命 if (time < 1000) { logger.info("守护进程: " + Thread.currentThread().getName() + " 延长锁时间 5000 毫秒"); redisTemplate.expire(lockKey, 5000, TimeUnit.MILLISECONDS); } TimeUnit.MILLISECONDS.sleep(300); } logger.info(" 守护进程: " + Thread.currentThread().getName() + "关闭 "); } catch (InterruptedException e) { e.printStackTrace(); } } // 主线程主动调用结束 public void stop() { daemon = false; } }
private void deamonRedisLock() { //守护线程 DaemonThread daemonThread = null; //Spring data redis 支持的原子性操作,并设置5秒过期时间 String uuid = UUID.randomUUID().toString(); String value = Thread.currentThread().getId() + ":" + uuid; try { while (!redisTemplate.opsForValue().setIfAbsent(userId + ":syn", value, 5000, TimeUnit.MILLISECONDS)) { // 等待100毫秒重试获得锁 logger.info(Thread.currentThread().getName() + ":尝试循环获取锁"); TimeUnit.MILLISECONDS.sleep(1000); } logger.info(Thread.currentThread().getName() + ":获得锁----"); // 开启守护线程 daemonThread = new DaemonThread(userId + ":syn", redisTemplate); Thread thread = new Thread(daemonThread); thread.start(); // 业务逻辑执行10秒... TimeUnit.MILLISECONDS.sleep(10000); } catch (InterruptedException e) { e.printStackTrace(); } finally { //释放锁 这里也需要原子操作,今后通过 Redis + Lua 讲 String result = (String) redisTemplate.opsForValue().get(userId + ":syn"); if (value.equals(result)) { redisTemplate.delete(userId + ":syn"); logger.info(Thread.currentThread().getName() + ":释放锁-----"); } //关闭守护线程 if (daemonThread != null) { daemonThread.stop(); } } }
DefaultRedisScript
对象,在RedisConfig
配置类中添加如下实例化代码:@Configuration public class RedisConfig { //lock script private static final String LOCK_SCRIPT = " if redis.call('setnx',KEYS[1],ARGV[1]) == 1 " + " then redis.call('expire',KEYS[1],ARGV[2]) " + " return 1 " + " else return 0 end "; private static final String UNLOCK_SCRIPT = "if redis.call('get', KEYS[1]) == ARGV[1] then return redis.call" + "('del', KEYS[1]) else return 0 end"; // ... 省略部分代码 @Bean public DefaultRedisScript<Boolean> lockRedisScript() { DefaultRedisScript<Boolean> defaultRedisScript = new DefaultRedisScript<>(); defaultRedisScript.setResultType(Boolean.class); defaultRedisScript.setScriptText(LOCK_SCRIPT); return defaultRedisScript; } @Bean public DefaultRedisScript<Long> unlockRedisScript() { DefaultRedisScript<Long> defaultRedisScript = new DefaultRedisScript<>(); defaultRedisScript.setResultType(Long.class); defaultRedisScript.setScriptText(UNLOCK_SCRIPT); return defaultRedisScript; } }
AccountOperationThread
类中新建构造方法,将上述两个对象传入类中(省略此部分演示)。然后,就可以基于RedisTemplate
来调用了,改造之后的代码实现如下:private void deamonRedisLockWithLua() { //守护线程 DaemonThread daemonThread = null; //Spring data redis 支持的原子性操作,并设置5秒过期时间 String uuid = UUID.randomUUID().toString(); String value = Thread.currentThread().getId() + ":" + uuid; try { while (!redisTemplate.execute(lockRedisScript, Collections.singletonList(userId + ":syn"), value, 5)) { // 等待1000毫秒重试获得锁 logger.info(Thread.currentThread().getName() + ":尝试循环获取锁"); TimeUnit.MILLISECONDS.sleep(1000); } logger.info(Thread.currentThread().getName() + ":获得锁----"); // 开启守护线程 daemonThread = new DaemonThread(userId + ":syn", redisTemplate); Thread thread = new Thread(daemonThread); thread.start(); // 业务逻辑执行10秒... TimeUnit.MILLISECONDS.sleep(10000); } catch (InterruptedException e) { logger.error("异常", e); } finally { //使用Lua脚本:先判断是否是自己设置的锁,再执行删除 // key存在,当前值=期望值时,删除key;key存在,当前值!=期望值时,返回0; Long result = redisTemplate.execute(unlockRedisScript, Collections.singletonList(userId + ":syn"), value); logger.info("redis解锁:{}", RELEASE_SUCCESS.equals(result)); if (RELEASE_SUCCESS.equals(result)) { if (daemonThread != null) { //关闭守护线程 daemonThread.stop(); logger.info(Thread.currentThread().getName() + ":释放锁---"); } } } }
源码地址:https://github.com/secbr/springboot-all/tree/master/springboot-redis-lock
以上が分散システムにおける Redis ベースの分散ロックについて話しましょうの詳細内容です。詳細については、PHP 中国語 Web サイトの他の関連記事を参照してください。

ホットAIツール

Undresser.AI Undress
リアルなヌード写真を作成する AI 搭載アプリ

AI Clothes Remover
写真から衣服を削除するオンライン AI ツール。

Undress AI Tool
脱衣画像を無料で

Clothoff.io
AI衣類リムーバー

AI Hentai Generator
AIヘンタイを無料で生成します。

人気の記事

ホットツール

メモ帳++7.3.1
使いやすく無料のコードエディター

SublimeText3 中国語版
中国語版、とても使いやすい

ゼンドスタジオ 13.0.1
強力な PHP 統合開発環境

ドリームウィーバー CS6
ビジュアル Web 開発ツール

SublimeText3 Mac版
神レベルのコード編集ソフト(SublimeText3)

ホットトピック









Redisクラスターモードは、シャードを介してRedisインスタンスを複数のサーバーに展開し、スケーラビリティと可用性を向上させます。構造の手順は次のとおりです。異なるポートで奇妙なRedisインスタンスを作成します。 3つのセンチネルインスタンスを作成し、Redisインスタンスを監視し、フェールオーバーを監視します。 Sentinel構成ファイルを構成し、Redisインスタンス情報とフェールオーバー設定の監視を追加します。 Redisインスタンス構成ファイルを構成し、クラスターモードを有効にし、クラスター情報ファイルパスを指定します。各Redisインスタンスの情報を含むnodes.confファイルを作成します。クラスターを起動し、CREATEコマンドを実行してクラスターを作成し、レプリカの数を指定します。クラスターにログインしてクラスター情報コマンドを実行して、クラスターステータスを確認します。作る

Redis指令を使用するには、次の手順が必要です。Redisクライアントを開きます。コマンド(動詞キー値)を入力します。必要なパラメーターを提供します(指示ごとに異なります)。 Enterを押してコマンドを実行します。 Redisは、操作の結果を示す応答を返します(通常はOKまたは-ERR)。

Redisソースコードを理解する最良の方法は、段階的に進むことです。Redisの基本に精通してください。開始点として特定のモジュールまたは機能を選択します。モジュールまたは機能のエントリポイントから始めて、行ごとにコードを表示します。関数コールチェーンを介してコードを表示します。 Redisが使用する基礎となるデータ構造に精通してください。 Redisが使用するアルゴリズムを特定します。

Redisデータをクリアする方法:Flushallコマンドを使用して、すべての重要な値をクリアします。 FlushDBコマンドを使用して、現在選択されているデータベースのキー値をクリアします。 [選択]を使用してデータベースを切り替え、FlushDBを使用して複数のデータベースをクリアします。 DELコマンドを使用して、特定のキーを削除します。 Redis-CLIツールを使用してデータをクリアします。

Redisは、単一のスレッドアーキテクチャを使用して、高性能、シンプルさ、一貫性を提供します。 I/Oマルチプレックス、イベントループ、ノンブロッキングI/O、共有メモリを使用して同時性を向上させますが、並行性の制限、単一の障害、および書き込み集約型のワークロードには適していません。

Redisのすべてのキーを表示するには、3つの方法があります。キーコマンドを使用して、指定されたパターンに一致するすべてのキーを返します。スキャンコマンドを使用してキーを繰り返し、キーのセットを返します。情報コマンドを使用して、キーの総数を取得します。

Redisのキューを読むには、キュー名を取得し、LPOPコマンドを使用して要素を読み、空のキューを処理する必要があります。特定の手順は次のとおりです。キュー名を取得します:「キュー:キュー」などの「キュー:」のプレフィックスで名前を付けます。 LPOPコマンドを使用します。キューのヘッドから要素を排出し、LPOP Queue:My-Queueなどの値を返します。空のキューの処理:キューが空の場合、LPOPはnilを返し、要素を読む前にキューが存在するかどうかを確認できます。

Redisサーバーを起動する手順には、以下が含まれます。オペレーティングシステムに従ってRedisをインストールします。 Redis-Server(Linux/Macos)またはRedis-Server.exe(Windows)を介してRedisサービスを開始します。 Redis-Cli ping(Linux/macos)またはRedis-Cli.exePing(Windows)コマンドを使用して、サービスステータスを確認します。 Redis-Cli、Python、node.jsなどのRedisクライアントを使用して、サーバーにアクセスします。
