Redis分布式锁实现原理
阅读原文时间:2023年07月13日阅读:1

关于Redis分布式锁网上有很多优秀的博文,这篇文章仅作为我这段时间遇到的新问题的记录。

1.什么是分布式锁:

  在单机部署的情况下,为了保证数据的一致性,不出现脏数据等,就需要使用synchronized关键字、semaphore、ReentrantLock或者我们可以基于AQS定制锁。锁是在多线程间共享;在分布式部署情况下,锁是在多进程间共享的;所以为了保证锁在多进程之间的唯一性,就需要实现锁在多进程之间的共享。

2.分布式锁的特性:

2.1要保证某个时刻中只有一个服务的一个方法获取到这个锁

2.2要保证是可重入锁(避免死锁)

2.3要保证锁的获取和释放的高可用。

3.分布式锁考虑的要点:

3.1需要在何时释放锁(finally)

3.2锁超时设置

3.3锁刷新设置(timeOut)

3.4如果锁超时了,为了避免误删了其他其他线程的锁,可以将当前线程的id存入redis中,当前线程释放锁的时候,需要判断存入redis的值是否为当前线程的id

3.5可重入

4.Redis分布式锁:

RedisLockRegistry是spring-integration-redis中提供Redis的实现类;主要通过redis锁+本地锁两个锁方式实现。

4.1在pomx.xml文件中导入spring-integration-redis的依赖:


org.springframework.boot spring-boot-starter-integration
org.springframework.integration spring-integration-redis
org.springframework.boot spring-boot-starter-data-redis

    

4.2 RedisLockRegistry类主要内部结构如图:

RedisLockRegistry类的静态String的常量OBTAIN_LOCK_SCRIPT是RedisLockRegistry类的一个上锁的lua脚本。KEYS[1]代表当前锁的key值,ARGV[1]代表当前的客户端标识,ARGV[2]代表过期时间。

private static final String OBTAIN_LOCK_SCRIPT = "local lockClientId = redis.call('GET', KEYS[1])\n"+
"if lockClientId == ARGV[1] then\n"+
"redis.call('PEXPIRE', KEYS[1], ARGV[2])\n"+
"return true\n"+
"elseif not lockClientId then\n"+
"redis.call('SET', KEYS[1], ARGV[1], 'PX', ARGV[2])\n"+
"return true\n"+
"end\n"+
"return false";

基本逻辑就是:拿着KEYS[1]去redis中获取值,如果值等于ARGV[1]就表示这条数据已经被上锁了,并且延长锁的过期时间,如果想要获取锁锁就要等待拿到锁的进程释放锁;如果这个键KEYS[1]不存在,那么设置KEYS[1]的值为ARGV[1],并且设置过期时间为ARGV[2],即当前进程就获取到这个数据的锁,并设置过期时间。(对lua脚本和redis命令不熟悉的可以上redis中文网)

4.3RedisLockRegistry类的内部类RedisLock的结构如下:

RedisLockRegistry类中获取锁的方法:

……
private final Map locks;
……
public Lock obtain(Object lockKey) {
Assert.isInstanceOf(String.class, lockKey);
String path = (String)lockKey;
return (Lock)this.locks.computeIfAbsent(path, (x$0) -> {
return new RedisLockRegistry.RedisLock(x$0);
});
}

如上面代码显示,locks是RedisLockRegistry类的Map类型的常量,以String类型作为key,以RedisLockRegistry的内部类RedisLock作为value;

拿着lockKey作为key去这个map中查找是否已经存在(即这条数据是否已经上锁),如果存在就返回这个lockKey对应的RedisLock,如果不存在就创建一个RedisLock并将其以此lockKey为key放入map中。

每个分布式部署的应用都会自己创建一个RedisLockRegistry实例,到这里,同一个应用的多个线程都可以获取到这条共享数据的RedisLock对象,本地锁+Redis锁真正开始于调用通过RedisLockRegistry实例.obtain(lockKey)方法获取到的RedisLock实例对象.trylock()方法,参见下文。

4.4RedisLockRegistry类的内部类的属性和部分构造方法:

private final class RedisLock implements Lock {
private final String lockKey;
private final ReentrantLock localLock;
//用于记录上锁的时间
private volatile long lockedAt;

    private RedisLock(String path) {  
        this.localLock = new ReentrantLock();  
        this.lockKey = this.constructLockKey(path);  
    }  
    ......  

}

public boolean tryLock(long time, TimeUnit unit) throws InterruptedException {
long now = System.currentTimeMillis();
if (!this.localLock.tryLock(time, unit)) {
return false;
} else {
try {
long expire = now + TimeUnit.MILLISECONDS.convert(time, unit);

                boolean acquired;  
                while(!(acquired = this.obtainLock()) && System.currentTimeMillis() < expire) {  
                    Thread.sleep(100L);  
                }

                if (!acquired) {  
                    this.localLock.unlock();  
                }

                return acquired;  
            } catch (Exception var9) {  
                this.localLock.unlock();  
                this.rethrowAsLockException(var9);  
                return false;  
            }  
        }  
    }

    private boolean obtainLock() {  
        Boolean success = (Boolean)RedisLockRegistry.this.redisTemplate.execute(RedisLockRegistry.this.obtainLockScript, Collections.singletonList(this.lockKey), new Object\[\]{RedisLockRegistry.this.clientId, String.valueOf(RedisLockRegistry.this.expireAfter)});  
        boolean result = Boolean.TRUE.equals(success);  
        if (result) {  
            this.lockedAt = System.currentTimeMillis();  
        }

        return result;  
    }

redisTemplate的execute方法参数:

第一个参数就是要执行的lua脚本;

第二个参数就是表示在脚本中所用到的那些 Redis 键(key),这些键名参数可以在 Lua 中通过全局变量 KEYS 数组,用 1 为基址的形式访问( KEYS[1] , KEYS[2] ,以此类推);

第三个参数那些不是键名参数的附加参数 arg [arg …] ,可以在 Lua 中通过全局变量 ARGV 数组访问,访问的形式和 KEYS 变量类似( ARGV[1] 、 ARGV[2] ,诸如此类)

分析tryLock源码可以看出,首先获取本地锁,如果获取失败,即表示某个请求线程已经获取到了锁,直接返回false;如果获取成功,就调用obtainLock方法执行OBTAIN_LOCK_SCRIPT这段lua脚本来获取redis锁,判断其他进程的某个请求线程获取到了这个redis锁,如果获取redis失败,则acquired变量为false,同时释放本地锁,tryLock方法直接返回false,获取锁失败。

为什么要用本地锁?一个是为了可重入,另一个是为了减轻redis服务器的压力。

4.5 释放锁:

public void unlock() {
if (!this.localLock.isHeldByCurrentThread()) {
throw new IllegalStateException("You do not own lock at " + this.lockKey);
} else if (this.localLock.getHoldCount() > 1) {
this.localLock.unlock();
} else {
try {
if (!this.isAcquiredInThisProcess()) {
throw new IllegalStateException("Lock was released in the store due to expiration. The integrity of data protected by this lock may have been compromised.");
}

                if (Thread.currentThread().isInterrupted()) {  
                    RedisLockRegistry.this.executor.execute(this::removeLockKey);  
                } else {  
                    this.removeLockKey();  
                }

                if (RedisLockRegistry.logger.isDebugEnabled()) {  
                    RedisLockRegistry.logger.debug("Released lock; " + this);  
                }  
            } catch (Exception var5) {  
                ReflectionUtils.rethrowRuntimeException(var5);  
            } finally {  
                this.localLock.unlock();  
            }

        }  
    }

释放锁的过程也比较简单,第一步通过本地锁判断当前线程是否持有锁,第二步通过本地锁判断当前线程持有锁的计数。

如果当前线程持有锁的计数 > 1,说明本地锁被当前线程多次获取,这时只释放本地锁(释放之后当前线程持有锁的计数-1)。

如果当前线程持有锁的计数 = 1,释放本地锁和redis锁。

redis分布式锁的使用参见另一篇博文:springboot实现分布式锁

手机扫一扫

移动阅读更方便

阿里云服务器
腾讯云服务器
七牛云服务器

你可能感兴趣的文章