ホームページ > データベース > Redis > Redis分散ロックの実装方法の紹介

Redis分散ロックの実装方法の紹介

リリース: 2019-12-05 17:09:38
転載
3597 人が閲覧しました

Redis分散ロックの実装方法の紹介

1. 分散ロックを使用するには、いくつかの条件を満たす必要があります:

1. システムは分散システムです (キーは分散されており、 ReentrantLock または同期されたコード ブロックを使用して単一のマシンを実装できます)

2. 共有リソース (各システムは同じリソースにアクセスし、リソースのキャリアは従来のリレーショナル データベースまたは NoSQL である可能性があります)

3 、同期アクセス (つまり、多くのプロセスが同じ共有リソースに同時にアクセスします。同期アクセスがなければ、リソースを奪い合うかどうかは誰にも気にされません)

2. アプリケーションシナリオの例

管理バックグラウンドのデプロイメント アーキテクチャ (複数の Tomcat サーバー redis [複数の Tomcat サーバーが 1 つの redis にアクセス] mysql [複数の Tomcat サーバーが 1 つのサーバー上の mysql にアクセス]) は、分散ロックを使用するための条件を満たしています。 。複数のサーバーが Redis グローバル キャッシュのリソースにアクセスする必要があるため、分散ロックが使用されていない場合は問題が発生します。次の疑似コードを見てください:

long N=0L;
//N从redis获取值
if(N<5){
N++;
//N写回redis
}
ログイン後にコピー

上記のコードの主な機能は次のとおりです:

redis から値 N を取得し、値 N の境界チェックを実行し、値を 1 ずつ増分します。そして N を redis に書き戻します。このアプリケーション シナリオは、フラッシュ セール、グローバル増分 ID、IP アクセス制限など、非常に一般的です。

IP アクセス制限の観点からは、悪意のある攻撃者が無制限のアクセスを開始する可能性があり、同時実行の量が比較的多いため、分散環境では N の境界チェックは信頼できません。redis から読み取られた N がすでにダーティである可能性があるためです。 。 データ。

従来のロック方法 (Java の synchronized や Lock など) は役に立ちません。これは分散環境であり、この同期問題と闘っている消防士は無力であるためです。この生死を分ける重要な時期に、分散ロックがついに登場します。

分散ロックは、Zookeeper、Redis など、さまざまな方法で実装できます。いずれの場合でも、基本原則は同じです。つまり、ロックを表すために状態値が使用され、ロックの占有と解放は状態値によって識別されます。

ここでは主に、redis を使用して分散ロックを実装する方法について説明します。

3. Redis の setNX コマンドを使用して分散ロックを実装します

1. 実装原理

Redis はシングル プロセス シングル スレッド モードですこのモードは同時アクセスをシリアル アクセスに変換し、複数のクライアントの Redis への接続間に競合はありません。 Redis の SETNX コマンドを使用すると、簡単に分散ロックを実装できます。

2. 基本的なコマンド分析

1) setNX (存在しない場合は SET)

構文:

SETNX key value
ログイン後にコピー

キーの値を値に設定します。キーが存在しない場合のみ。

指定されたキーがすでに存在する場合、SETNX は何も実行しません。

SETNX は「SET if Not eXists」の略です (存在しない場合は SET)

戻り値:

設定に成功した場合は 1 を返します。

設定できませんでした。0 を返します。

例:

redis> EXISTS job                # job 不存在
(integer) 0

redis> SETNX job "programmer"    # job 设置成功
(integer) 1

redis> SETNX job "code-farmer"   # 尝试覆盖 job ,失败
(integer) 0

redis> GET job                   # 没有被覆盖
"programmer"
ログイン後にコピー

そこで、次のコマンドを実行します。

SETNX lock.foo <current Unix time + lock timeout + 1>
ログイン後にコピー

1 が返された場合、クライアントはロックを取得し、lock.foo のキー値を時刻に設定します。この値は、キーがロックされており、クライアントが最終的に DEL lock.foo を介してロックを解放できることを示します。

0 が返された場合は、他のクライアントによってロックが取得されたことを意味します。このとき、先に戻るか、再試行して相手の完了を待つか、ロックがタイムアウトするまで待つことができます。

2) getSET

構文:

GETSET key value
ログイン後にコピー

指定されたキーの値を value に設定し、キーの古い値を返します。

キーは存在するが文字列型ではない場合、エラーが返されます。

戻り値:

指定されたキーの古い値を返します。

key に古い値がない場合、つまり key が存在しない場合、nil が返されます。

3) get

構文:

GET key
ログイン後にコピー

戻り値:

キーが存在しない場合は nil を返し、存在しない場合はキーの値を返します。

キーが文字列型ではない場合、エラーが返されます

4. デッドロックを解決する

上記のロック ロジックには問題があります。ロックを保持しているクライアントが失敗またはクラッシュし、ロックを解放できない場合、問題を解決するにはどうすればよいですか?

これが発生したかどうかは、ロック キーに対応するタイムスタンプによって判断できます。現在時刻が lock.foo の値より大きい場合は、ロックの有効期限が切れており、再利用できることを意味します。

これが発生した場合、単純に DEL を使用してロックを削除し、次に SETNX を再度実行することはできません (合理的に考えると、ロックの削除操作はロックの所有者によって実行される必要があります。必要なのは、次のことだけです。ここで待ちます。タイムアウト)、複数のクライアントがロックのタイムアウトを検出すると、ロックを解放しようとします。ここで競合状態が発生する可能性があります。このシナリオをシミュレートしてみましょう:

C0 操作has timed out. ですが、まだロックが保持されているため、C1 と C2 が lock.foo を読み込んでタイムスタンプを確認したところ、次々とタイムアウトしていることがわかりました。
C1 は DEL lock.foo を送信します。
C1 は SETNX lock.foo を送信し、成功します。
C2 は DEL lock.foo を送信します。
C2 は SETNX lock.foo を送信し、成功します。
このようにして、C1 と C2 の両方がロックを取得します。大問題!

幸いなことに、この問題は回避できます。C3 クライアントがどのように行うかを見てみましょう:

C0 以降、C3 は SETNX lock.foo を送信してロックを取得します。ロックがまだ保持されている場合、Redis は 0 から C3 を返します

C3发送GET lock.foo 以检查锁是否超时了,如果没超时,则等待或重试。

反之,如果已超时,C3通过下面的操作来尝试获得锁:

GETSET lock.foo

通过GETSET,C3拿到的时间戳如果仍然是超时的,那就说明,C3如愿以偿拿到锁了。

如果在C3之前,有个叫C4的客户端比C3快一步执行了上面的操作,那么C3拿到的时间戳是个未超时的值,这时,C3没有如期获得锁,需要再次等待或重试。留意一下,尽管C3没拿到锁,但它改写了C4设置的锁的超时值,不过这一点非常微小的误差带来的影响可以忽略不计。

注意:为了让分布式锁的算法更稳键些,持有锁的客户端在解锁之前应该再检查一次自己的锁是否已经超时,再去做DEL操作,因为可能客户端因为某个耗时的操作而挂起,操作完的时候锁因为超时已经被别人获得,这时就不必解锁了。

五、代码实现

expireMsecs 锁持有超时,防止线程在入锁以后,无限的执行下去,让锁无法释放
timeoutMsecs 锁等待超时,防止线程饥饿,永远没有入锁执行代码的机会

注意:项目里面需要先搭建好redis的相关配置

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.dao.DataAccessException;
import org.springframework.data.redis.connection.RedisConnection;
import org.springframework.data.redis.core.RedisCallback;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.serializer.StringRedisSerializer;

/**
 * Redis distributed lock implementation.
 *
 * @author zhengcanrui
 */
public class RedisLock {

    private static Logger logger = LoggerFactory.getLogger(RedisLock.class);

    private RedisTemplate redisTemplate;

    private static final int DEFAULT_ACQUIRY_RESOLUTION_MILLIS = 100;

    /**
     * Lock key path.
     */
    private String lockKey;

    /**
     * 锁超时时间,防止线程在入锁以后,无限的执行等待
     */
    private int expireMsecs = 60 * 1000;

    /**
     * 锁等待时间,防止线程饥饿
     */
    private int timeoutMsecs = 10 * 1000;

    private volatile boolean locked = false;

    /**
     * Detailed constructor with default acquire timeout 10000 msecs and lock expiration of 60000 msecs.
     *
     * @param lockKey lock key (ex. account:1, ...)
     */
    public RedisLock(RedisTemplate redisTemplate, String lockKey) {
        this.redisTemplate = redisTemplate;
        this.lockKey = lockKey + "_lock";
    }

    /**
     * Detailed constructor with default lock expiration of 60000 msecs.
     *
     */
    public RedisLock(RedisTemplate redisTemplate, String lockKey, int timeoutMsecs) {
        this(redisTemplate, lockKey);
        this.timeoutMsecs = timeoutMsecs;
    }

    /**
     * Detailed constructor.
     *
     */
    public RedisLock(RedisTemplate redisTemplate, String lockKey, int timeoutMsecs, int expireMsecs) {
        this(redisTemplate, lockKey, timeoutMsecs);
        this.expireMsecs = expireMsecs;
    }

    /**
     * @return lock key
     */
    public String getLockKey() {
        return lockKey;
    }

    private String get(final String key) {
        Object obj = null;
        try {
            obj = redisTemplate.execute(new RedisCallback<Object>() {
                @Override
                public Object doInRedis(RedisConnection connection) throws DataAccessException {
                    StringRedisSerializer serializer = new StringRedisSerializer();
                    byte[] data = connection.get(serializer.serialize(key));
                    connection.close();
                    if (data == null) {
                        return null;
                    }
                    return serializer.deserialize(data);
                }
            });
        } catch (Exception e) {
            logger.error("get redis error, key : {}", key);
        }
        return obj != null ? obj.toString() : null;
    }

    private boolean setNX(final String key, final String value) {
        Object obj = null;
        try {
            obj = redisTemplate.execute(new RedisCallback<Object>() {
                @Override
                public Object doInRedis(RedisConnection connection) throws DataAccessException {
                    StringRedisSerializer serializer = new StringRedisSerializer();
                    Boolean success = connection.setNX(serializer.serialize(key), serializer.serialize(value));
                    connection.close();
                    return success;
                }
            });
        } catch (Exception e) {
            logger.error("setNX redis error, key : {}", key);
        }
        return obj != null ? (Boolean) obj : false;
    }

    private String getSet(final String key, final String value) {
        Object obj = null;
        try {
            obj = redisTemplate.execute(new RedisCallback<Object>() {
                @Override
                public Object doInRedis(RedisConnection connection) throws DataAccessException {
                    StringRedisSerializer serializer = new StringRedisSerializer();
                    byte[] ret = connection.getSet(serializer.serialize(key), serializer.serialize(value));
                    connection.close();
                    return serializer.deserialize(ret);
                }
            });
        } catch (Exception e) {
            logger.error("setNX redis error, key : {}", key);
        }
        return obj != null ? (String) obj : null;
    }

    /**
     * 获得 lock.
     * 实现思路: 主要是使用了redis 的setnx命令,缓存了锁.
     * reids缓存的key是锁的key,所有的共享, value是锁的到期时间(注意:这里把过期时间放在value了,没有时间上设置其超时时间)
     * 执行过程:
     * 1.通过setnx尝试设置某个key的值,成功(当前没有这个锁)则返回,成功获得锁
     * 2.锁已经存在则获取锁的到期时间,和当前时间比较,超时的话,则设置新的值
     *
     * @return true if lock is acquired, false acquire timeouted
     * @throws InterruptedException in case of thread interruption
     */
    public synchronized boolean lock() throws InterruptedException {
        int timeout = timeoutMsecs;
        while (timeout >= 0) {
            long expires = System.currentTimeMillis() + expireMsecs + 1;
            String expiresStr = String.valueOf(expires); //锁到期时间
            if (this.setNX(lockKey, expiresStr)) {
                // lock acquired
                locked = true;
                return true;
            }

            String currentValueStr = this.get(lockKey); //redis里的时间
            if (currentValueStr != null && Long.parseLong(currentValueStr) < System.currentTimeMillis()) {
                //判断是否为空,不为空的情况下,如果被其他线程设置了值,则第二个条件判断是过不去的
                // lock is expired

                String oldValueStr = this.getSet(lockKey, expiresStr);
                //获取上一个锁到期时间,并设置现在的锁到期时间,
                //只有一个线程才能获取上一个线上的设置时间,因为jedis.getSet是同步的
                if (oldValueStr != null && oldValueStr.equals(currentValueStr)) {
                    //防止误删(覆盖,因为key是相同的)了他人的锁——这里达不到效果,这里值会被覆盖,但是因为什么相差了很少的时间,所以可以接受

                    //[分布式的情况下]:如过这个时候,多个线程恰好都到了这里,但是只有一个线程的设置值和当前值相同,他才有权利获取锁
                    // lock acquired
                    locked = true;
                    return true;
                }
            }
            timeout -= DEFAULT_ACQUIRY_RESOLUTION_MILLIS;

            /*
                延迟100 毫秒,  这里使用随机时间可能会好一点,可以防止饥饿进程的出现,即,当同时到达多个进程,
                只会有一个进程获得锁,其他的都用同样的频率进行尝试,后面有来了一些进行,也以同样的频率申请锁,这将可能导致前面来的锁得不到满足.
                使用随机的等待时间可以一定程度上保证公平性
             */
            Thread.sleep(DEFAULT_ACQUIRY_RESOLUTION_MILLIS);

        }
        return false;
    }


    /**
     * Acqurired lock release.
     */
    public synchronized void unlock() {
        if (locked) {
            redisTemplate.delete(lockKey);
            locked = false;
        }
    }

}
ログイン後にコピー

调用:

RedisLock lock = new RedisLock(redisTemplate, key, 10000, 20000);
 try {
            if(lock.lock()) {
                   //需要加锁的代码
                }
            }
        } catch (InterruptedException e) {
            e.printStackTrace();
        }finally {
            //为了让分布式锁的算法更稳键些,持有锁的客户端在解锁之前应该再检查一次自己的锁是否已经超时,再去做DEL操作,因为可能客户端因为某个耗时的操作而挂起,
            //操作完的时候锁因为超时已经被别人获得,这时就不必解锁了。 ————这里没有做
            lock.unlock();
        }
ログイン後にコピー

六、一些问题

1、为什么不直接使用expire设置超时时间,而将时间的毫秒数其作为value放在redis中?

如下面的方式,把超时的交给redis处理:

lock(key, expireSec){
isSuccess = setnx key
if (isSuccess)
expire key expireSec
}
ログイン後にコピー

这种方式貌似没什么问题,但是假如在setnx后,redis崩溃了,expire就没有执行,结果就是死锁了。锁永远不会超时。

 2、为什么前面的锁已经超时了,还要用getSet去设置新的时间戳的时间获取旧的值,然后和外面的判断超时时间的时间戳比较呢?

Redis分散ロックの実装方法の紹介

因为是分布式的环境下,可以在前一个锁失效的时候,有两个进程进入到锁超时的判断。如:

C0超时了,还持有锁,C1/C2同时请求进入了方法里面

C1/C2获取到了C0的超时时间

C1使用getSet方法

C2也执行了getSet方法

假如我们不加 oldValueStr.equals(currentValueStr) 的判断,将会C1/C2都将获得锁,加了之后,能保证C1和C2只能一个能获得锁,一个只能继续等待。

注意:这里可能导致超时时间不是其原本的超时时间,C1的超时时间可能被C2覆盖了,但是他们相差的毫秒及其小,这里忽略了。

更多redis知识请关注redis入门教程栏目。

以上がRedis分散ロックの実装方法の紹介の詳細内容です。詳細については、PHP 中国語 Web サイトの他の関連記事を参照してください。

関連ラベル:
ソース:cnblogs.com
このウェブサイトの声明
この記事の内容はネチズンが自主的に寄稿したものであり、著作権は原著者に帰属します。このサイトは、それに相当する法的責任を負いません。盗作または侵害の疑いのあるコンテンツを見つけた場合は、admin@php.cn までご連絡ください。
人気のチュートリアル
詳細>
最新のダウンロード
詳細>
ウェブエフェクト
公式サイト
サイト素材
フロントエンドテンプレート