使用redis实现分布式锁

2020-10-273025

分布式锁简介

有这样一个情境,线程A和线程B都共享某个变量X,如何对其进行互斥访问。

如果是单机情况下(单JVM),线程之间共享内存,只要使用线程锁(synchronized和lock)就可以解决并发问题。

如果是分布式情况下(多JVM),线程A和线程B很可能不是在同一JVM中,这样线程锁就无法起到作用了,这时候就要用到分布式锁来解决。

REDIS分布式锁

分布式锁实现的关键是在分布式的应用服务器外,搭建一个存储服务器,存储锁信息,这时候我们很容易就想到了Redis。首先我们要搭建一个Redis服务器,用Redis服务器来存储锁信息。

在实现的时候要注意的几个关键点:

1、锁信息必须是会过期超时的,不能让一个线程长期占有一个锁而导致死锁;

2、同一时刻只能有一个线程获取到锁。

几个redis命令:

setnx(key, value):“set if not exits”,若该key-value不存在,则成功加入缓存并且返回1,否则返回0。

get(key):获得key对应的value值,若不存在则返回nil。

getset(key, value):先获取key对应的value值,若不存在则返回nil,然后将旧的value更新为新的value。原子操作。

expire(key, seconds):设置key-value的有效期为seconds秒。

可靠性

首先,为了确保分布式锁可用,我们至少要确保锁的实现同时满足以下四个条件:

  1. 互斥性。在任意时刻,只有一个客户端能持有锁。
  2. 不会发生死锁。即使有一个客户端在持有锁的期间崩溃而没有主动解锁,也能保证后续其他客户端能加锁。
  3. 具有容错性。只要大部分的Redis节点正常运行,客户端就可以加锁和解锁。
  4. 解铃还须系铃人。加锁和解锁必须是同一个客户端,客户端自己不能把别人加的锁给解了。

REDIS分布式锁简单实现(不可重入)

利用的是jedis提供的原子操作

首先我们要通过Maven引入Jedis开源组件,在pom.xml文件加入下面的代码:

<dependency>
    <groupId>redis.clients</groupId>
    <artifactId>jedis</artifactId>
    <version>2.9.0</version>
</dependency>

加锁代码

public class RedisTool {

    private static final String LOCK_SUCCESS = "OK";
    private static final String SET_IF_NOT_EXIST = "NX";
    private static final String SET_WITH_EXPIRE_TIME = "PX";

    /**
     * 尝试获取分布式锁
     * @param jedis Redis客户端
     * @param lockKey 锁
     * @param requestId 请求标识
     * @param expireTime 超期时间
     * @return 是否获取成功
     */
    public static boolean tryGetDistributedLock(Jedis jedis, String lockKey, String requestId, int expireTime) {

        String result = jedis.set(lockKey, requestId, SET_IF_NOT_EXIST, SET_WITH_EXPIRE_TIME, expireTime);

        if (LOCK_SUCCESS.equals(result)) {
            return true;
        }
        return false;

    }

}

可以看到,我们加锁就一行代码:jedis.set(String key, String value, String nxxx, String expx, int time),这个set()方法一共有五个形参:

  • 第一个为key,我们使用key来当锁,因为key是唯一的。
  • 第二个为value,我们传的是requestId,通过给value赋值为requestId,我们就知道这把锁是哪个请求加的了,在解锁的时候就可以有依据。requestId可以使用UUID.randomUUID().toString()方法生成。
  • 第三个为nxxx,这个参数我们填的是NX,意思是SET IF NOT EXIST,即当key不存在时,我们进行set操作;若key已经存在,则不做任何操作;
  • 第四个为expx,这个参数我们传的是PX,意思是我们要给这个key加一个过期的设置,具体时间由第五个参数决定。
  • 第五个为time,与第四个参数相呼应,代表key的过期时间。

总的来说,执行上面的set()方法就只会导致两种结果:1. 当前没有锁(key不存在),那么就进行加锁操作,并对锁设置个有效期,同时value表示加锁的客户端。2. 已有锁存在,不做任何操作。

set()加入了NX参数,可以保证如果已有key存在,则函数不会调用成功,也就是只有一个客户端能持有锁,满足互斥性。其次,由于我们对锁设置了过期时间,即使锁的持有者后续发生崩溃而没有解锁,锁也会因为到了过期时间而自动解锁(即key被删除),不会发生死锁。最后,因为我们将value赋值为requestId,代表加锁的客户端请求标识,那么在客户端解锁的时候就可以进行校验是否是同一个客户端。

解锁代码

public class RedisTool {

    private static final Long RELEASE_SUCCESS = 1L;

    /**
     * 释放分布式锁
     * @param jedis Redis客户端
     * @param lockKey 锁
     * @param requestId 请求标识
     * @return 是否释放成功
     */
    public static boolean releaseDistributedLock(Jedis jedis, String lockKey, String requestId) {

        String script = "if redis.call('get', KEYS[1]) == ARGV[1] then return redis.call('del', KEYS[1]) else return 0 end";
        Object result = jedis.eval(script, Collections.singletonList(lockKey), Collections.singletonList(requestId));

        if (RELEASE_SUCCESS.equals(result)) {
            return true;
        }
        return false;

    }

}

将Lua代码传到jedis.eval()方法里,并使参数KEYS[1]赋值为lockKey,ARGV[1]赋值为requestId。eval()方法是将Lua代码交给Redis服务端执行。

那么这段Lua代码的功能是什么呢?其实很简单,首先获取锁对应的value值,检查是否与requestId相等,如果相等则删除锁(解锁)。那么为什么要使用Lua语言来实现呢?因为要确保上述操作是原子性的。在eval命令执行Lua代码的时候,Lua代码将被当成一个命令去执行,并且直到eval命令执行完成,Redis才会执行其他命令。

上面一种实现利用的是jedis提供的原子操作,加锁思路是将设置键值和设置过期时间组装为一个原子操作执行,解锁思路是将判断锁的拥有者是否是当前线程和删除redis键的逻辑封装成lua代码,也相当于组装成一个原子操作。 缺点是不可重入,要用redis实现可重入锁,redis上要存更多的信息,比如重入次数,然后为了保证原子性,加解锁逻辑判断都要封装到lua脚本传给redis执行,而不能在自己的业务代码逻辑里面进行,比如像锁的重入次数加一等操作。

REDIS分布式锁利用REDISSON实现(可重入)

上面说了要用redis实现可重入锁,redis上要存更多的信息,下面的图可以看出要存哪些数据。

Screenshot_20.png

redis存储的数据类型用的是哈希类型,相当于java的 <key,<key1,value>> 类型。key是锁的名字,也就是哈希表名字。key1是哈希表“真正”的key,key1的组成是:guid + 当前线程的ID。后面的value是重入次数。

使用方法

首先引入相关maven依赖:

<dependency>
    <groupId>org.redisson</groupId>
    <artifactId>redisson</artifactId>
    <version>3.13.6</version>
</dependency>

封装redisson,配置好redisson相关信息,比如要连接哪个redis服务器

public class RedissonManager {
    private static Config config = new Config();
    //声明redisso对象
    private static Redisson redisson = null;
    //实例化redisson
    static{
        config.useSingleServer().setAddress("redis://127.0.0.1:6379");
        //得到redisson对象
        redisson = (Redisson) Redisson.create(config);
    }
    //获取redisson对象的方法
    public static Redisson getRedisson(){
        return redisson;
    }
}

封装Lock工具类,提供加锁和解锁两个函数

public class DistributedRedissonLock {
    private static Redisson redisson = RedissonManager.getRedisson();
    private static final String LOCK_TITLE = "redisLock_";
    //加锁
    public static boolean acquire(String lockName) throws InterruptedException {
        //声明key对象
        String key = LOCK_TITLE + lockName;
        //获取锁对象
        RLock mylock = redisson.getLock(key);
        //加锁,并且设置锁过期时间为200秒,防止死锁的产生,尝试2秒钟,加锁失败返回false
        boolean b = mylock.tryLock(2, 200, TimeUnit.SECONDS);
        System.err.println("======lock======"+Thread.currentThread().getName());
        //加锁成功
        return  b;
    }
    //锁的释放
    public static void release(String lockName){
        //必须是和加锁时的同一个key
        String key = LOCK_TITLE + lockName;
        //获取所对象
        RLock mylock = redisson.getLock(key);
        //释放锁(解锁)
        mylock.unlock();
        System.err.println("======unlock======"+Thread.currentThread().getName());
    }
}

接下来就可以在业务代码里面使用Lock了

@GetMapping("/redisson")
    public void redisson() throws IOException, InterruptedException {
        String key = "test123";
        System.out.println("firstLock");
        DistributedRedissonLock.acquire(key);//第一次获取锁
        Thread.sleep(5000);//模拟第一段业务耗时
        System.out.println("secondLock");
        DistributedRedissonLock.acquire(key);//重入
        Thread.sleep(5000);//模拟第二段业务耗时
        System.out.println("firstUnLock");
        DistributedRedissonLock.release(key);//第一次解锁
        System.out.println("secondUnLock");
        DistributedRedissonLock.release(key);//第二次解锁
    }

第一次加锁后redis里面的数据长这样

Screenshot_5.png

底层原理简析

前面也说过了,要保证加解锁的原子性,就要把加解锁逻辑封装成lua代码丢给redis执行。

上面获取到的RLock是一个接口,实际上得到的是一个RedissonLock,RedissonLock里面主要的加锁逻辑:

commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, command,
                  "if (redis.call('exists', KEYS[1]) == 0) then " +
                      "redis.call('hset', KEYS[1], ARGV[2], 1); " +
                      "redis.call('pexpire', KEYS[1], ARGV[1]); " +
                      "return nil; " +
                  "end; " +
                  "if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +
                      "redis.call('hincrby', KEYS[1], ARGV[2], 1); " +
                      "redis.call('pexpire', KEYS[1], ARGV[1]); " +
                      "return nil; " +
                  "end; " +
                  "return redis.call('pttl', KEYS[1]);",
                    Collections.<Object>singletonList(getName()), internalLockLeaseTime, getLockName(threadId));

结合上面的参数声明,我们可以知道,这里KEYS[1]就是getName(),ARGV[2]是getLockName(threadId)

假设前面获取锁时传的name是“abc”,假设调用的线程ID是Thread-1,假设成员变量UUID类型的id是6f0829ed-bfd3-4e6f-bba3-6f3d66cd176c

那么KEYS[1]=abc,ARGV[2]=6f0829ed-bfd3-4e6f-bba3-6f3d66cd176c:Thread-1,ARGV[1]是过期时间。

因此,这段脚本的意思是

1、判断有没有一个叫“abc”的key

2、如果没有,则在其下设置一个字段为“6f0829ed-bfd3-4e6f-bba3-6f3d66cd176c:Thread-1”,值为“1”的键值对 ,并设置它的过期时间

3、如果存在,则进一步判断“6f0829ed-bfd3-4e6f-bba3-6f3d66cd176c:Thread-1”是否存在,若存在,则其值加1,并重新设置过期时间

4、返回“abc”的生存时间(毫秒)

这里用的数据结构是hash,hash的结构是: key 字段1 值1 字段2 值2 。

用在锁这个场景下,key就表示锁的名称,也可以理解为临界资源,字段就表示当前获得锁的线程

在key存在的情况下,所有竞争这把锁的线程都要判断在这个key下有没有自己线程的字段,如果没有则不能获得锁,如果有,则相当于重入,字段值加1(次数)

RedissonLock里面主要的解锁逻辑:

protected RFuture<Boolean> unlockInnerAsync(long threadId) {
    return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
            "if (redis.call('exists', KEYS[1]) == 0) then " +
                "redis.call('publish', KEYS[2], ARGV[1]); " +
                "return 1; " +
            "end;" +
            "if (redis.call('hexists', KEYS[1], ARGV[3]) == 0) then " +
                "return nil;" +
            "end; " +
            "local counter = redis.call('hincrby', KEYS[1], ARGV[3], -1); " +
            "if (counter > 0) then " +
                "redis.call('pexpire', KEYS[1], ARGV[2]); " +
                "return 0; " +
            "else " +
                "redis.call('del', KEYS[1]); " +
                "redis.call('publish', KEYS[2], ARGV[1]); " +
                "return 1; "+
            "end; " +
            "return nil;",
            Arrays.<Object>asList(getName(), getChannelName()), LockPubSub.unlockMessage, internalLockLeaseTime, getLockName(threadId));

}

我们还是假设name=abc,假设线程ID是Thread-1

同理,我们可以知道

KEYS[1]是getName(),即KEYS[1]=abc

KEYS[2]是getChannelName(),即KEYS[2]=redisson_lock__channel:{abc}

ARGV[1]是LockPubSub.unlockMessage,即ARGV[1]=0

ARGV[2]是生存时间

ARGV[3]是getLockName(threadId),即ARGV[3]=6f0829ed-bfd3-4e6f-bba3-6f3d66cd176c:Thread-1

因此,上面脚本的意思是:

1、判断是否存在一个叫“abc”的key

2、如果不存在,向Channel中广播一条消息,广播的内容是0,并返回1

3、如果存在,进一步判断字段6f0829ed-bfd3-4e6f-bba3-6f3d66cd176c:Thread-1是否存在

4、若字段不存在,返回空,若字段存在,则字段值减1

5、若减完以后,字段值仍大于0,则返回0

6、减完后,若字段值小于或等于0,则广播一条消息,广播内容是0,并返回1;

可以猜测,广播0表示资源可用,即通知那些等待获取锁的线程现在可以获得锁了

RedissonLock的lockInterruptibly方法尝试获取锁时,不成功则订阅释放锁的消息,并阻塞。在得到释放通知后再被唤醒去循环获取锁。

参考文章:

https://blog.csdn.net/yb223731/article/details/90349502

https://www.cnblogs.com/webwangbao/p/9247318.html

https://www.cnblogs.com/cjsblog/p/9831423.html

分享
点赞1
打赏
上一篇:Docker常用命令笔记(一)
下一篇:RabbitMq学习笔记