Java中提供了种类丰富的锁,每种锁因其特性的不同,在适当的场景下合理选择能展现非常高的效率

分布式锁一般有三种实现方式:数据库乐观锁Redis分布式锁Zookeeper分布式锁

本篇博客主要详细介绍redis分布式锁的进化,进化所解决的场景问题

可靠性

首先,为了确保分布式锁的可用性,我们需要确保锁能同时满足以下四个条件

  1. 互斥性:在任意时刻,只有一个客户端能持有锁。
  2. 不会发生死锁:即使有一个客户端在持有锁的期间崩溃而没有主动解锁,也能保证后续其他客户端能加锁。
  3. 具有容错性:只要大部分的Redis节点正常运行,客户端就可以加锁和解锁。
  4. 解铃还须系铃人:加锁和解锁必须是同一个客户端,客户端自己不能把别人加的锁给解了。

实现原理

在单机下可以用synchronized同步代码块加锁

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
@Autowired
private StringRedisTemplate redisTemplate;
@Test
public void deductStock(){
synchronized (this){
int stock = Integer.parseInt(redisTemplate.opsForValue().get("stock"));
if(stock > 0){
int realStock = stock - 1;
redisTemplate.opsForValue().set("stock",realStock + "");
System.out.println("扣减成功,剩余库存:" + realStock);
}else{
System.out.println("扣减失败,库存不足");
}
}
}

但是在真实的场景下,往往是多台服务器做负载,而synchronized是进程级别的,只在当前进程有效

如果是做了负载的服务器,往往控制不住请求,依然会有并发问题,在并发低的情况下可能不会出现问题

这种情况下就得用分布式锁

在redis中,利用setnx命令的特性可以轻松实现分布式锁

只在键 key 不存在的情况下,将键 key 的值设置为 value 。若键 key 已经存在, 则 SETNX 命令不做任何动作。

返回值:命令在设置成功时返回 1 ,设置失败时返回 0 。

在这里插入图片描述

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
@Autowired
private StringRedisTemplate redisTemplate;
@Test
public void deductStock(){
String locKey = "lockey";
// 如果存在则true,成功则新增返回false
Boolean result = redisTemplate.opsForValue().setIfAbsent(locKey, "lock");
// 如果存在,直接返回失败
if(!result){
System.out.println("error_code");
return
}
int stock = Integer.parseInt(redisTemplate.opsForValue().get("stock"));
if(stock > 0){
int realStock = stock - 1;
redisTemplate.opsForValue().set("stock",realStock + "");
System.out.println("扣减成功,剩余库存:" + realStock);
}else{
System.out.println("扣减失败,库存不足");
}
// 删除key,表示已经解锁
redisTemplate.delete(locKey);
}

以上代码是简单的redis锁,如果在加锁和解锁中间,出现了异常,则redis里面会一直加上这把锁

image-20220302215144457

所以升级一下,让我们的代码更健壮,使用try catch finally 保证一定会释放锁

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
@Autowired
private StringRedisTemplate redisTemplate;
@Test
public void deductStock(){
String locKey = "lockey";
// 如果存在则true,成功则新增返回false
Boolean result = redisTemplate.opsForValue().setIfAbsent(locKey, "lock");
// 如果存在,直接返回失败
if(!result){
System.out.println("error_code");
}
try {
int stock = Integer.parseInt(redisTemplate.opsForValue().get("stock"));
if(stock > 0){
int realStock = stock - 1;
redisTemplate.opsForValue().set("stock",realStock + "");
System.out.println("扣减成功,剩余库存:" + realStock);
}else{
System.out.println("扣减失败,库存不足");
}
}finally {
// 删除key,表示已经解锁
redisTemplate.delete(locKey);
}
}

如果在这个时候,redis宕机了,依然会出现一直锁死的情况

image-20220302215511149

这个时候我们可以给锁设置一个超时时间,这样可以保证锁自动释放,不会一直锁死

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
@Test
public void deductStock(){
String locKey = "lockey";
// 如果存在则true,成功则新增返回false
Boolean result = redisTemplate.opsForValue().setIfAbsent(locKey, "lock");
// 设置超时时间,防止锁一直不释放
redisTemplate.expire(locKey,10, TimeUnit.SECONDS);
// 如果存在,直接返回失败
if(!result){
System.out.println("error_code");
}
try {
int stock = Integer.parseInt(redisTemplate.opsForValue().get("stock"));
if(stock > 0){
int realStock = stock - 1;
redisTemplate.opsForValue().set("stock",realStock + "");
System.out.println("扣减成功,剩余库存:" + realStock);
}else{
System.out.println("扣减失败,库存不足");
}
}finally {
// 删除key,表示已经解锁
redisTemplate.delete(locKey);
}
}

上面代码看起来没有问题,但是实际会存在原子性问题

image-20220302220141345

这时候我们可以把

1
redisTemplate.opsForValue().setIfAbsent(locKey, "lock");

替换为

1
redisTemplate.opsForValue().setIfAbsent(locKey, "lock",10,TimeUnit.SECONDS);
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
public void deductStock(){
String locKey = "lockey";
// 如果存在则true,成功则新增返回false,设置超时时间,防止锁一直不释放
Boolean result = redisTemplate.opsForValue().setIfAbsent(locKey, "lock",10,TimeUnit.SECONDS);
// 如果存在,直接返回失败
if(!result){
System.out.println("error_code");
}
try {
int stock = Integer.parseInt(redisTemplate.opsForValue().get("stock"));
if(stock > 0){
int realStock = stock - 1;
redisTemplate.opsForValue().set("stock",realStock + "");
System.out.println("扣减成功,剩余库存:" + realStock);
}else{
System.out.println("扣减失败,库存不足");
}
}finally {
// 删除key,表示已经解锁
redisTemplate.delete(locKey);
}
}

上面的代码在高并发场景会存在问题

image-20220302221610591

如果存在上面这种情况,则永远加不上锁,前面的用户,会一直释放后面用户的锁,会存在超卖的问题

这时我们就需要用到uuid来进行加锁解铃还须系铃人

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
public void deductStock(){
String locKey = "lockey";
// 增加uuid防止被其他用户释放锁
String clientId = UUID.randomUUID().toString();
// 如果存在则true,成功则新增返回false,设置超时时间,防止锁一直不释放
Boolean result = redisTemplate.opsForValue().setIfAbsent(locKey, clientId,10,TimeUnit.SECONDS);
// 如果存在,直接返回失败
if(!result){
System.out.println("error_code");
}
try {
int stock = Integer.parseInt(redisTemplate.opsForValue().get("stock"));
if(stock > 0){
int realStock = stock - 1;
redisTemplate.opsForValue().set("stock",realStock + "");
System.out.println("扣减成功,剩余库存:" + realStock);
}else{
System.out.println("扣减失败,库存不足");
}
}finally {
// 如果uuid相同则释放锁
if(clientId.equals(redisTemplate.opsForValue().get(locKey))){
// 删除key,表示已经解锁
redisTemplate.delete(locKey);
}
}
}

以上代码解决了锁被其他人释放的问题,但是还是会存在一些问题

image-20220302224427117

很多人想的是对于超时时间,可以设置长一点,但是这种情况治标不治本,还是会有可能出现

以上这种情况需要用到锁续命,后台开一个定时任务,每过一段时间检查锁的业务是否还在执行,如果没执行完,就重置锁的超时时间

市面上有很多大牛对于这种情况提供了 解决方案

Redisson

我们可以使用redisson来使用分布式锁,简化了我们加锁的步骤

1
2
3
4
5
<dependency>
<groupId>org.redisson</groupId>
<artifactId>redisson</artifactId>
<version>3.16.8</version>
</dependency>
1
2
3
4
5
6
7
@Bean
public Redisson redisson(){
// 单机模式
Config config = new Config();
config.useSingleServer().setAddress("redis://localhost:6379").setDatabase(0);
return (Redisson)Redisson.create(config);
}

代码实现

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
@Autowired
private Redisson redisson;

@Autowired
private StringRedisTemplate redisTemplate;

@Test
public void deductStock(){
String locKey = "lockey";
RLock lock = redisson.getLock(locKey);
try {
// 加锁
lock.lock();
int stock = Integer.parseInt(redisTemplate.opsForValue().get("stock"));
if(stock > 0){
int realStock = stock - 1;
redisTemplate.opsForValue().set("stock",realStock + "");
System.out.println("扣减成功,剩余库存:" + realStock);
}else{
System.out.println("扣减失败,库存不足");
}
}finally {
// 解锁
lock.unlock();
}
}

主要是通过lua脚本实现代码的原子性

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
<T> RFuture<T> tryLockInnerAsync(long waitTime, long leaseTime, TimeUnit unit, long threadId, RedisStrictCommand<T> command) {
return evalWriteAsync(getRawName(), LongCodec.INSTANCE, command,
"if (redis.call('exists', KEYS[1]) == 0) then " +
"redis.call('hincrby', KEYS[1], ARGV[2], 1); " +
"redis.call('pexpire', KEYS[1], ARGV[1]); " +
"return nil; " +
"end; " +
"if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +
"redis.call('hincrby', KEYS[1], ARGV[2], 1); " +
"redis.call('pexpire', KEYS[1], ARGV[1]); " +
"return nil; " +
"end; " +
"return redis.call('pttl', KEYS[1]);",
Collections.singletonList(getRawName()), unit.toMillis(leaseTime), getLockName(threadId));
}

img

1.加锁机制:

线程去获取锁,获取成功:执行lua脚本,保存数据到redis数据库

线程去获取锁,获取失败: 一直通过while循环尝试获取锁,获取成功后,执行lua脚本,保存数据到redis数据库。

2.看门狗

在一个分布式环境下,假如一个线程获得锁后,突然服务器宕机了,那么这个时候在一定时间后这个锁会自动释放,你也可以设置锁的有效时间(不设置默认30秒),这样的目的主要是防止死锁的发生。

1
2
3
4
5
6
7
8
9
10
11
12
 //设置锁1秒过去
redissonLock.lock("redisson", 1);
/**
* 业务逻辑需要咨询2秒
*/
redissonLock.release("redisson");

/**
* 线程1 进来获得锁后,线程一切正常并没有宕机,但它的业务逻辑需要执行2秒,这就会有个问题,在 线程1 执行1秒后,这个锁就自动过期了
* 那么这个时候 线程2 进来了。那么就存在 线程1和线程2 同时在这段业务逻辑里执行代码,这当然是不合理的。
* 而且如果是这种情况,那么在解锁时系统会抛异常,因为解锁和加锁已经不是同一线程了,具体后面代码演示。
*/

所以这个时候看门狗就出现了,它的作用就是 线程1 业务还没有执行完,时间就过了,线程1 还想持有锁的话,就会启动一个watch dog后台线程,不断的延长锁key的生存时间。

注意 正常这个看门狗线程是不启动的,还有就是这个看门狗启动后对整体性能也会有一定影响,所以不建议开启看门狗。

3.为啥使用lua脚本

如果你的业务逻辑复杂的话,通过封装在lua脚本中发送给redis,而且redis是单线程的,这样就保证这段复杂业务逻辑执行的原子性

4.分布式锁的缺陷

Redis分布式锁会有个缺陷,就是在Redis哨兵模式下:

客户端1 对某个master节点写入了redisson锁,此时会异步复制给对应的 slave节点。但是这个过程中一旦发生 master节点宕机,主备切换,slave节点从变为了 master节点。

这时客户端2 来尝试加锁的时候,在新的master节点上也能加锁,此时就会导致多个客户端对同一个分布式锁完成了加锁。

这时系统在业务语义上一定会出现问题,导致各种脏数据的产生

缺陷在哨兵模式或者主从模式下,如果 master实例宕机的时候,可能导致多个客户端同时完成加锁。