目录
分布式锁概览
分布式锁的特点
分布式锁的实现方式
Redis普通分布式锁存在的问题
Redis高级分布式锁:Redlock
Redlock源码
1. Redlock依赖
2. Redlock用法
3. Redlock唯一ID
4. Redlock获取锁
5. Redlock释放锁
Redis实现的分布式锁轮子
1. 自定义注解
2. AOP拦截器实现
3. Redis实现分布式锁核心类
4. Controller层控制
首页 数据库 Redis Redis分布式锁实例分析

Redis分布式锁实例分析

May 31, 2023 pm 07:32 PM
redis

分布式锁概览

在多线程的环境下,为了保证一个代码块在同一时间只能由一个线程访问,Java中我们一般可以使用synchronized语法和ReetrantLock去保证,这实际上是本地锁的方式。但是现在公司都是流行分布式架构,在分布式环境下,如何保证不同节点的线程同步执行呢?因此就引出了分布式锁,它是控制分布式系统之间互斥访问共享资源的一种方式。

在一个分布式系统中,多台机器上部署了多个服务,当客户端一个用户发起一个数据插入请求时,如果没有分布式锁机制保证,那么那多台机器上的多个服务可能进行并发插入操作,导致数据重复插入,对于某些不允许有多余数据的业务来说,这就会造成问题。而分布式锁机制就是为了解决类似这类问题,保证多个服务之间互斥的访问共享资源,如果一个服务抢占了分布式锁,其他服务没获取到锁,就不进行后续操作。大致意思如下图所示:

Redis分布式锁实例分析

分布式锁的特点

分布式锁一般有如下的特点:

  • 互斥性: 同一时刻只能有一个线程持有锁

  • 可重入性: 同一节点上的同一个线程如果获取了锁之后能够再次获取锁

  • 锁超时:和J.U.C中的锁一样支持锁超时,防止死锁

  • 高性能和高可用: 加锁和解锁需要高效,同时也需要保证高可用,防止分布式锁失效

  • 具备阻塞和非阻塞性:能够及时从阻塞状态中被唤醒

分布式锁的实现方式

我们一般实现分布式锁有以下几种方式:

  • 基于数据库

  • 基于Redis

  • 基于zookeeper

Redis普通分布式锁存在的问题

说到Redis分布式锁,大部分人都会想到:setnx lua(redis保证执行lua脚本时不执行其他操作,保证操作的原子性),或者知道set key value px milliseconds nx。后一种方式的核心实现命令如下:

- 获取锁(unique_value可以是UUID等)
SET resource_name unique_value NX PX 30000

- 释放锁(lua脚本中,一定要比较value,防止误解锁)
if redis.call("get",KEYS[1]) == ARGV[1] then 
    return redis.call("del",KEYS[1])
else   
    return 0
end
登录后复制

这种实现方式有3大要点(也是面试概率非常高的地方):

  1. set命令要用set key value px milliseconds nx

  2. value要具有唯一性;

  3. 释放锁时要验证value值,不能误解锁;

事实上这类锁最大的缺点就是它加锁时只作用在一个Redis节点上,即使Redis通过sentinel保证高可用,如果这个master节点由于某些原因发生了主从切换,那么就会出现锁丢失的情况:

  1. 在Redis的master节点上拿到了锁;

  2. 但是这个加锁的key还没有同步到slave节点;

  3. master故障,发生故障转移,slave节点升级为master节点;

  4. 导致锁丢失。

为了避免单点故障问题,Redis作者antirez基于分布式环境下提出了一种更高级的分布式锁的实现方式:Redlock。Redlock也是Redis所有分布式锁实现方式中唯一能让面试官高潮的方式。

Redis高级分布式锁:Redlock

antirez提出的redlock算法大概是这样的:

在Redis的分布式环境中,我们假设有N个Redis master。这些节点完全互相独立,不存在主从复制或者其他集群协调机制。我们确保将在N个实例上使用与在Redis单实例下相同方法获取和释放锁。现在我们假设有5个Redis master节点,同时我们需要在5台服务器上面运行这些Redis实例,这样保证他们不会同时都宕掉。

为了取到锁,客户端应该执行以下操作:

  • 获取当前Unix时间,以毫秒为单位。

  • 依次尝试从5个实例,使用相同的key和具有唯一性的value(例如UUID)获取锁。当向Redis请求获取锁时,客户端应该设置一个网络连接和响应超时时间,这个超时时间应该小于锁的失效时间。例如你的锁自动失效时间TTL为10秒,则超时时间应该在5-50毫秒之间。这样可以避免服务器端Redis已经挂掉的情况下,客户端还在死死地等待响应结果。如果服务器端没有在规定时间内响应,客户端应该尽快尝试去另外一个Redis实例请求获取锁。

  • 客户端使用当前时间减去开始获取锁时间(步骤1记录的时间)就得到获取锁使用的时间。当且仅当从大多数(N/2+1,这里是3个节点)的Redis节点都取到锁,并且使用的时间小于锁失效时间时,锁才算获取成功

  • 如果取到了锁,key的真正有效时间等于有效时间减去获取锁所使用的时间(步骤3计算的结果)。

  • 如果因为某些原因,获取锁失败(没有在至少N/2+1个Redis实例取到锁或者取锁时间已经超过了有效时间),客户端应该在所有的Redis实例上进行解锁(即便某些Redis实例根本就没有加锁成功,防止某些节点获取到锁但是客户端没有得到响应而导致接下来的一段时间不能被重新获取锁)。

  • 此处不讨论时钟漂移

Redis分布式锁实例分析

Redlock源码

redisson已经有对redlock算法封装,接下来对其用法进行简单介绍,并对核心源码进行分析(假设5个redis实例)。

1. Redlock依赖

<!-- https://mvnrepository.com/artifact/org.redisson/redisson -->
<dependency>
    <groupId>org.redisson</groupId>
    <artifactId>redisson</artifactId>
    <version>3.3.2</version>
</dependency>
登录后复制

2. Redlock用法

首先,我们来看一下redission封装的redlock算法实现的分布式锁用法,非常简单,跟重入锁(ReentrantLock)有点类似:

Config config = new Config();
config.useSentinelServers().addSentinelAddress("127.0.0.1:6369","127.0.0.1:6379", "127.0.0.1:6389")
        .setMasterName("masterName")
        .setPassword("password").setDatabase(0);
RedissonClient redissonClient = Redisson.create(config);
// 还可以getFairLock(), getReadWriteLock()
RLock redLock = redissonClient.getLock("REDLOCK_KEY");
boolean isLock;
try {
    isLock = redLock.tryLock();
    // 500ms拿不到锁, 就认为获取锁失败。10000ms即10s是锁失效时间。
    isLock = redLock.tryLock(500, 10000, TimeUnit.MILLISECONDS);
    if (isLock) {
        //TODO if get lock success, do something;
    }
} catch (Exception e) {
} finally {
    // 无论如何, 最后都要解锁
    redLock.unlock();
}
登录后复制

3. Redlock唯一ID

实现分布式锁的一个非常重要的点就是set的value要具有唯一性,redisson的value是怎样保证value的唯一性呢?答案是UUID+threadId。入口在redissonClient.getLock(“REDLOCK_KEY”),源码在Redisson.java和RedissonLock.java中:

protected final UUID id = UUID.randomUUID();
String getLockName(long threadId) {
    return id + ":" + threadId;
}
登录后复制

4. Redlock获取锁

获取锁的代码为redLock.tryLock()或者redLock.tryLock(500, 10000, TimeUnit.MILLISECONDS),两者的最终核心源码都是下面这段代码,只不过前者获取锁的默认租约时间(leaseTime)是LOCK_EXPIRATION_INTERVAL_SECONDS,即30s:

<T> RFuture<T> tryLockInnerAsync(long leaseTime, TimeUnit unit, long threadId, RedisStrictCommand<T> command) {
    internalLockLeaseTime = unit.toMillis(leaseTime);
    // 获取锁时向5个redis实例发送的命令
    return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, command,
              // 首先分布式锁的KEY不能存在,如果确实不存在,那么执行hset命令(hset REDLOCK_KEY uuid+threadId 1),并通过pexpire设置失效时间(也是锁的租约时间)
              "if (redis.call(&#39;exists&#39;, KEYS[1]) == 0) then " +
                  "redis.call(&#39;hset&#39;, KEYS[1], ARGV[2], 1); " +
                  "redis.call(&#39;pexpire&#39;, KEYS[1], ARGV[1]); " +
                  "return nil; " +
              "end; " +
              // 如果分布式锁的KEY已经存在,并且value也匹配,表示是当前线程持有的锁,那么重入次数加1,并且设置失效时间
              "if (redis.call(&#39;hexists&#39;, KEYS[1], ARGV[2]) == 1) then " +
                  "redis.call(&#39;hincrby&#39;, KEYS[1], ARGV[2], 1); " +
                  "redis.call(&#39;pexpire&#39;, KEYS[1], ARGV[1]); " +
                  "return nil; " +
              "end; " +
              // 获取分布式锁的KEY的失效时间毫秒数
              "return redis.call(&#39;pttl&#39;, KEYS[1]);",
              // 这三个参数分别对应KEYS[1],ARGV[1]和ARGV[2]
                Collections.<Object>singletonList(getName()), internalLockLeaseTime, getLockName(threadId));
}
登录后复制

获取锁的命令中,

  • KEYS[1]就是Collections.singletonList(getName()),表示分布式锁的key,即REDLOCK_KEY;

  • ARGV[1]就是internalLockLeaseTime,即锁的租约时间,默认30s;

  • ARGV[2]就是getLockName(threadId),是获取锁时set的唯一值,即UUID+threadId:

5. Redlock释放锁

释放锁的代码为redLock.unlock(),核心源码如下:

protected RFuture<Boolean> unlockInnerAsync(long threadId) {
    // 向5个redis实例都执行如下命令
    return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
            // 如果分布式锁KEY不存在,那么向channel发布一条消息
            "if (redis.call(&#39;exists&#39;, KEYS[1]) == 0) then " +
                "redis.call(&#39;publish&#39;, KEYS[2], ARGV[1]); " +
                "return 1; " +
            "end;" +
            // 如果分布式锁存在,但是value不匹配,表示锁已经被占用,那么直接返回
            "if (redis.call(&#39;hexists&#39;, KEYS[1], ARGV[3]) == 0) then " +
                "return nil;" +
            "end; " +
            // 如果就是当前线程占有分布式锁,那么将重入次数减1
            "local counter = redis.call(&#39;hincrby&#39;, KEYS[1], ARGV[3], -1); " +
            // 重入次数减1后的值如果大于0,表示分布式锁有重入过,那么只设置失效时间,还不能删除
            "if (counter > 0) then " +
                "redis.call(&#39;pexpire&#39;, KEYS[1], ARGV[2]); " +
                "return 0; " +
            "else " +
                // 重入次数减1后的值如果为0,表示分布式锁只获取过1次,那么删除这个KEY,并发布解锁消息
                "redis.call(&#39;del&#39;, KEYS[1]); " +
                "redis.call(&#39;publish&#39;, KEYS[2], ARGV[1]); " +
                "return 1; "+
            "end; " +
            "return nil;",
            // 这5个参数分别对应KEYS[1],KEYS[2],ARGV[1],ARGV[2]和ARGV[3]
            Arrays.<Object>asList(getName(), getChannelName()), LockPubSub.unlockMessage, internalLockLeaseTime, getLockName(threadId));

}
登录后复制

Redis实现的分布式锁轮子

下面利用SpringBoot + Jedis + AOP的组合来实现一个简易的分布式锁。

1. 自定义注解

自定义一个注解,被注解的方法会执行获取分布式锁的逻辑

@Target(ElementType.METHOD)
@Retention(RetentionPolicy.RUNTIME)
@Documented
@Inherited
public @interface RedisLock {
    /**
     * 业务键
     *
     * @return
     */
    String key();
    /**
     * 锁的过期秒数,默认是5秒
     *
     * @return
     */
    int expire() default 5;

    /**
     * 尝试加锁,最多等待时间
     *
     * @return
     */
    long waitTime() default Long.MIN_VALUE;
    /**
     * 锁的超时时间单位
     *
     * @return
     */
    TimeUnit timeUnit() default TimeUnit.SECONDS;
}
登录后复制

2. AOP拦截器实现

在AOP中我们去执行获取分布式锁和释放分布式锁的逻辑,代码如下:

@Aspect
@Component
public class LockMethodAspect {
    @Autowired
    private RedisLockHelper redisLockHelper;
    @Autowired
    private JedisUtil jedisUtil;
    private Logger logger = LoggerFactory.getLogger(LockMethodAspect.class);

    @Around("@annotation(com.redis.lock.annotation.RedisLock)")
    public Object around(ProceedingJoinPoint joinPoint) {
        Jedis jedis = jedisUtil.getJedis();
        MethodSignature signature = (MethodSignature) joinPoint.getSignature();
        Method method = signature.getMethod();

        RedisLock redisLock = method.getAnnotation(RedisLock.class);
        String value = UUID.randomUUID().toString();
        String key = redisLock.key();
        try {
            final boolean islock = redisLockHelper.lock(jedis,key, value, redisLock.expire(), redisLock.timeUnit());
            logger.info("isLock : {}",islock);
            if (!islock) {
                logger.error("获取锁失败");
                throw new RuntimeException("获取锁失败");
            }
            try {
                return joinPoint.proceed();
            } catch (Throwable throwable) {
                throw new RuntimeException("系统异常");
            }
        }  finally {
            logger.info("释放锁");
            redisLockHelper.unlock(jedis,key, value);
            jedis.close();
        }
    }
}
登录后复制

3. Redis实现分布式锁核心类

@Component
public class RedisLockHelper {
    private long sleepTime = 100;
    /**
     * 直接使用setnx + expire方式获取分布式锁
     * 非原子性
     *
     * @param key
     * @param value
     * @param timeout
     * @return
     */
    public boolean lock_setnx(Jedis jedis,String key, String value, int timeout) {
        Long result = jedis.setnx(key, value);
        // result = 1时,设置成功,否则设置失败
        if (result == 1L) {
            return jedis.expire(key, timeout) == 1L;
        } else {
            return false;
        }
    }

    /**
     * 使用Lua脚本,脚本中使用setnex+expire命令进行加锁操作
     *
     * @param jedis
     * @param key
     * @param UniqueId
     * @param seconds
     * @return
     */
    public boolean Lock_with_lua(Jedis jedis,String key, String UniqueId, int seconds) {
        String lua_scripts = "if redis.call(&#39;setnx&#39;,KEYS[1],ARGV[1]) == 1 then" +
                "redis.call(&#39;expire&#39;,KEYS[1],ARGV[2]) return 1 else return 0 end";
        List<String> keys = new ArrayList<>();
        List<String> values = new ArrayList<>();
        keys.add(key);
        values.add(UniqueId);
        values.add(String.valueOf(seconds));
        Object result = jedis.eval(lua_scripts, keys, values);
        //判断是否成功
        return result.equals(1L);
    }

    /**
     * 在Redis的2.6.12及以后中,使用 set key value [NX] [EX] 命令
     *
     * @param key
     * @param value
     * @param timeout
     * @return
     */
    public boolean lock(Jedis jedis,String key, String value, int timeout, TimeUnit timeUnit) {
        long seconds = timeUnit.toSeconds(timeout);
        return "OK".equals(jedis.set(key, value, "NX", "EX", seconds));
    }

    /**
     * 自定义获取锁的超时时间
     *
     * @param jedis
     * @param key
     * @param value
     * @param timeout
     * @param waitTime
     * @param timeUnit
     * @return
     * @throws InterruptedException
     */
    public boolean lock_with_waitTime(Jedis jedis,String key, String value, int timeout, long waitTime,TimeUnit timeUnit) throws InterruptedException {
        long seconds = timeUnit.toSeconds(timeout);
        while (waitTime >= 0) {
            String result = jedis.set(key, value, "nx", "ex", seconds);
            if ("OK".equals(result)) {
                return true;
            }
            waitTime -= sleepTime;
            Thread.sleep(sleepTime);
        }
        return false;
    }
    /**
     * 错误的解锁方法—直接删除key
     *
     * @param key
     */
    public void unlock_with_del(Jedis jedis,String key) {
        jedis.del(key);
    }

    /**
     * 使用Lua脚本进行解锁操纵,解锁的时候验证value值
     *
     * @param jedis
     * @param key
     * @param value
     * @return
     */
    public boolean unlock(Jedis jedis,String key,String value) {
        String luaScript = "if redis.call(&#39;get&#39;,KEYS[1]) == ARGV[1] then " +
                "return redis.call(&#39;del&#39;,KEYS[1]) else return 0 end";
        return jedis.eval(luaScript, Collections.singletonList(key), Collections.singletonList(value)).equals(1L);
    }
}
登录后复制

4. Controller层控制

定义一个TestController来测试我们实现的分布式锁

@RestController
public class TestController {
    @RedisLock(key = "redis_lock")
    @GetMapping("/index")
    public String index() {
        return "index";
    }
}
登录后复制

以上是Redis分布式锁实例分析的详细内容。更多信息请关注PHP中文网其他相关文章!

本站声明
本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系admin@php.cn

热AI工具

Undresser.AI Undress

Undresser.AI Undress

人工智能驱动的应用程序,用于创建逼真的裸体照片

AI Clothes Remover

AI Clothes Remover

用于从照片中去除衣服的在线人工智能工具。

Undress AI Tool

Undress AI Tool

免费脱衣服图片

Clothoff.io

Clothoff.io

AI脱衣机

Video Face Swap

Video Face Swap

使用我们完全免费的人工智能换脸工具轻松在任何视频中换脸!

热工具

记事本++7.3.1

记事本++7.3.1

好用且免费的代码编辑器

SublimeText3汉化版

SublimeText3汉化版

中文版,非常好用

禅工作室 13.0.1

禅工作室 13.0.1

功能强大的PHP集成开发环境

Dreamweaver CS6

Dreamweaver CS6

视觉化网页开发工具

SublimeText3 Mac版

SublimeText3 Mac版

神级代码编辑软件(SublimeText3)

redis集群模式怎么搭建 redis集群模式怎么搭建 Apr 10, 2025 pm 10:15 PM

Redis集群模式通过分片将Redis实例部署到多个服务器,提高可扩展性和可用性。搭建步骤如下:创建奇数个Redis实例,端口不同;创建3个sentinel实例,监控Redis实例并进行故障转移;配置sentinel配置文件,添加监控Redis实例信息和故障转移设置;配置Redis实例配置文件,启用集群模式并指定集群信息文件路径;创建nodes.conf文件,包含各Redis实例的信息;启动集群,执行create命令创建集群并指定副本数量;登录集群执行CLUSTER INFO命令验证集群状态;使

redis数据怎么清空 redis数据怎么清空 Apr 10, 2025 pm 10:06 PM

如何清空 Redis 数据:使用 FLUSHALL 命令清除所有键值。使用 FLUSHDB 命令清除当前选定数据库的键值。使用 SELECT 切换数据库,再使用 FLUSHDB 清除多个数据库。使用 DEL 命令删除特定键。使用 redis-cli 工具清空数据。

redis怎么读取队列 redis怎么读取队列 Apr 10, 2025 pm 10:12 PM

要从 Redis 读取队列,需要获取队列名称、使用 LPOP 命令读取元素,并处理空队列。具体步骤如下:获取队列名称:以 "queue:" 前缀命名,如 "queue:my-queue"。使用 LPOP 命令:从队列头部弹出元素并返回其值,如 LPOP queue:my-queue。处理空队列:如果队列为空,LPOP 返回 nil,可先检查队列是否存在再读取元素。

redis指令怎么用 redis指令怎么用 Apr 10, 2025 pm 08:45 PM

使用 Redis 指令需要以下步骤:打开 Redis 客户端。输入指令(动词 键 值)。提供所需参数(因指令而异)。按 Enter 执行指令。Redis 返回响应,指示操作结果(通常为 OK 或 -ERR)。

redis怎么使用锁 redis怎么使用锁 Apr 10, 2025 pm 08:39 PM

使用Redis进行锁操作需要通过SETNX命令获取锁,然后使用EXPIRE命令设置过期时间。具体步骤为:(1) 使用SETNX命令尝试设置一个键值对;(2) 使用EXPIRE命令为锁设置过期时间;(3) 当不再需要锁时,使用DEL命令删除该锁。

redis怎么读源码 redis怎么读源码 Apr 10, 2025 pm 08:27 PM

理解 Redis 源码的最佳方法是逐步进行:熟悉 Redis 基础知识。选择一个特定的模块或功能作为起点。从模块或功能的入口点开始,逐行查看代码。通过函数调用链查看代码。熟悉 Redis 使用的底层数据结构。识别 Redis 使用的算法。

centos redis如何配置Lua脚本执行时间 centos redis如何配置Lua脚本执行时间 Apr 14, 2025 pm 02:12 PM

在CentOS系统上,您可以通过修改Redis配置文件或使用Redis命令来限制Lua脚本的执行时间,从而防止恶意脚本占用过多资源。方法一:修改Redis配置文件定位Redis配置文件:Redis配置文件通常位于/etc/redis/redis.conf。编辑配置文件:使用文本编辑器(例如vi或nano)打开配置文件:sudovi/etc/redis/redis.conf设置Lua脚本执行时间限制:在配置文件中添加或修改以下行,设置Lua脚本的最大执行时间(单位:毫秒)

redis命令行怎么用 redis命令行怎么用 Apr 10, 2025 pm 10:18 PM

使用 Redis 命令行工具 (redis-cli) 可通过以下步骤管理和操作 Redis:连接到服务器,指定地址和端口。使用命令名称和参数向服务器发送命令。使用 HELP 命令查看特定命令的帮助信息。使用 QUIT 命令退出命令行工具。

See all articles