Java中提供了种类丰富的锁,每种锁因其特性的不同,在适当的场景下合理选择能展现非常高的效率
分布式锁一般有三种实现方式:数据库乐观锁
,Redis分布式锁
,Zookeeper分布式锁
本篇博客主要详细介绍redis分布式锁的进化,进化所解决的场景问题
可靠性
首先,为了确保分布式锁的可用性,我们需要确保锁能同时满足以下四个条件
互斥性
:在任意时刻,只有一个客户端能持有锁。
不会发生死锁
:即使有一个客户端在持有锁的期间崩溃而没有主动解锁,也能保证后续其他客户端能加锁。
具有容错性
:只要大部分的Redis节点正常运行,客户端就可以加锁和解锁。
解铃还须系铃人
:加锁和解锁必须是同一个客户端,客户端自己不能把别人加的锁给解了。
实现原理
在单机下可以用synchronized同步代码块加锁
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
| @Autowired private StringRedisTemplate redisTemplate; @Test public void deductStock(){ synchronized (this){ int stock = Integer.parseInt(redisTemplate.opsForValue().get("stock")); if(stock > 0){ int realStock = stock - 1; redisTemplate.opsForValue().set("stock",realStock + ""); System.out.println("扣减成功,剩余库存:" + realStock); }else{ System.out.println("扣减失败,库存不足"); } } }
|
但是在真实的场景下,往往是多台服务器做负载,而synchronized是进程级别的,只在当前进程有效
如果是做了负载的服务器,往往控制不住请求,依然会有并发问题,在并发低的情况下可能不会出现问题
这种情况下就得用分布式锁
在redis中,利用setnx
命令的特性可以轻松实现分布式锁
只在键 key 不存在的情况下,将键 key 的值设置为 value 。若键 key 已经存在, 则 SETNX 命令不做任何动作。
返回值:命令在设置成功时返回 1 ,设置失败时返回 0 。

1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23
| @Autowired private StringRedisTemplate redisTemplate; @Test public void deductStock(){ String locKey = "lockey"; Boolean result = redisTemplate.opsForValue().setIfAbsent(locKey, "lock"); if(!result){ System.out.println("error_code"); return } int stock = Integer.parseInt(redisTemplate.opsForValue().get("stock")); if(stock > 0){ int realStock = stock - 1; redisTemplate.opsForValue().set("stock",realStock + ""); System.out.println("扣减成功,剩余库存:" + realStock); }else{ System.out.println("扣减失败,库存不足"); } redisTemplate.delete(locKey); }
|
以上代码是简单的redis锁,如果在加锁和解锁中间,出现了异常,则redis里面会一直加上这把锁

所以升级一下,让我们的代码更健壮,使用try catch finally 保证一定会释放锁
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25
| @Autowired private StringRedisTemplate redisTemplate; @Test public void deductStock(){ String locKey = "lockey"; Boolean result = redisTemplate.opsForValue().setIfAbsent(locKey, "lock"); if(!result){ System.out.println("error_code"); } try { int stock = Integer.parseInt(redisTemplate.opsForValue().get("stock")); if(stock > 0){ int realStock = stock - 1; redisTemplate.opsForValue().set("stock",realStock + ""); System.out.println("扣减成功,剩余库存:" + realStock); }else{ System.out.println("扣减失败,库存不足"); } }finally { redisTemplate.delete(locKey); } }
|
如果在这个时候,redis宕机了,依然会出现一直锁死的情况

这个时候我们可以给锁设置一个超时时间,这样可以保证锁自动释放,不会一直锁死
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25
| @Test public void deductStock(){ String locKey = "lockey"; Boolean result = redisTemplate.opsForValue().setIfAbsent(locKey, "lock"); redisTemplate.expire(locKey,10, TimeUnit.SECONDS); if(!result){ System.out.println("error_code"); } try { int stock = Integer.parseInt(redisTemplate.opsForValue().get("stock")); if(stock > 0){ int realStock = stock - 1; redisTemplate.opsForValue().set("stock",realStock + ""); System.out.println("扣减成功,剩余库存:" + realStock); }else{ System.out.println("扣减失败,库存不足"); } }finally { redisTemplate.delete(locKey); } }
|
上面代码看起来没有问题,但是实际会存在原子性问题

这时候我们可以把
1
| redisTemplate.opsForValue().setIfAbsent(locKey, "lock");
|
替换为
1
| redisTemplate.opsForValue().setIfAbsent(locKey, "lock",10,TimeUnit.SECONDS);
|
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22
| public void deductStock(){ String locKey = "lockey"; Boolean result = redisTemplate.opsForValue().setIfAbsent(locKey, "lock",10,TimeUnit.SECONDS); if(!result){ System.out.println("error_code"); } try { int stock = Integer.parseInt(redisTemplate.opsForValue().get("stock")); if(stock > 0){ int realStock = stock - 1; redisTemplate.opsForValue().set("stock",realStock + ""); System.out.println("扣减成功,剩余库存:" + realStock); }else{ System.out.println("扣减失败,库存不足"); } }finally { redisTemplate.delete(locKey); } }
|
上面的代码在高并发场景会存在问题

如果存在上面这种情况,则永远加不上锁,前面的用户,会一直释放后面用户的锁,会存在超卖的问题
这时我们就需要用到uuid来进行加锁解铃还须系铃人
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27
| public void deductStock(){ String locKey = "lockey"; String clientId = UUID.randomUUID().toString(); Boolean result = redisTemplate.opsForValue().setIfAbsent(locKey, clientId,10,TimeUnit.SECONDS); if(!result){ System.out.println("error_code"); } try { int stock = Integer.parseInt(redisTemplate.opsForValue().get("stock")); if(stock > 0){ int realStock = stock - 1; redisTemplate.opsForValue().set("stock",realStock + ""); System.out.println("扣减成功,剩余库存:" + realStock); }else{ System.out.println("扣减失败,库存不足"); } }finally { if(clientId.equals(redisTemplate.opsForValue().get(locKey))){ redisTemplate.delete(locKey); } } }
|
以上代码解决了锁被其他人释放的问题,但是还是会存在一些问题

很多人想的是对于超时时间,可以设置长一点,但是这种情况治标不治本,还是会有可能出现
以上这种情况需要用到锁续命
,后台开一个定时任务,每过一段时间检查锁的业务是否还在执行,如果没执行完,就重置锁的超时时间
市面上有很多大牛对于这种情况提供了 解决方案
Redisson
我们可以使用redisson来使用分布式锁,简化了我们加锁的步骤
1 2 3 4 5
| <dependency> <groupId>org.redisson</groupId> <artifactId>redisson</artifactId> <version>3.16.8</version> </dependency>
|
1 2 3 4 5 6 7
| @Bean public Redisson redisson(){ Config config = new Config(); config.useSingleServer().setAddress("redis://localhost:6379").setDatabase(0); return (Redisson)Redisson.create(config); }
|
代码实现
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26
| @Autowired private Redisson redisson;
@Autowired private StringRedisTemplate redisTemplate;
@Test public void deductStock(){ String locKey = "lockey"; RLock lock = redisson.getLock(locKey); try { lock.lock(); int stock = Integer.parseInt(redisTemplate.opsForValue().get("stock")); if(stock > 0){ int realStock = stock - 1; redisTemplate.opsForValue().set("stock",realStock + ""); System.out.println("扣减成功,剩余库存:" + realStock); }else{ System.out.println("扣减失败,库存不足"); } }finally { lock.unlock(); } }
|
主要是通过lua脚本实现代码的原子性
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
| <T> RFuture<T> tryLockInnerAsync(long waitTime, long leaseTime, TimeUnit unit, long threadId, RedisStrictCommand<T> command) { return evalWriteAsync(getRawName(), LongCodec.INSTANCE, command, "if (redis.call('exists', KEYS[1]) == 0) then " + "redis.call('hincrby', KEYS[1], ARGV[2], 1); " + "redis.call('pexpire', KEYS[1], ARGV[1]); " + "return nil; " + "end; " + "if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " + "redis.call('hincrby', KEYS[1], ARGV[2], 1); " + "redis.call('pexpire', KEYS[1], ARGV[1]); " + "return nil; " + "end; " + "return redis.call('pttl', KEYS[1]);", Collections.singletonList(getRawName()), unit.toMillis(leaseTime), getLockName(threadId)); }
|

1.加锁机制:
线程去获取锁,获取成功:执行lua脚本,保存数据到redis数据库
线程去获取锁,获取失败: 一直通过while循环尝试获取锁,获取成功后,执行lua脚本,保存数据到redis数据库。
2.看门狗
在一个分布式环境下,假如一个线程获得锁后,突然服务器宕机了,那么这个时候在一定时间后这个锁会自动释放,你也可以设置锁的有效时间(不设置默认30秒),这样的目的主要是防止死锁的发生。
1 2 3 4 5 6 7 8 9 10 11 12
| redissonLock.lock("redisson", 1);
redissonLock.release("redisson");
|
所以这个时候看门狗
就出现了,它的作用就是 线程1 业务还没有执行完,时间就过了,线程1 还想持有锁的话,就会启动一个watch dog后台线程,不断的延长锁key的生存时间。
注意
正常这个看门狗线程是不启动的,还有就是这个看门狗启动后对整体性能也会有一定影响,所以不建议开启看门狗。
3.为啥使用lua脚本
如果你的业务逻辑复杂的话,通过封装在lua脚本中发送给redis,而且redis是单线程的,这样就保证这段复杂业务逻辑执行的原子性。
4.分布式锁的缺陷
Redis分布式锁会有个缺陷,就是在Redis哨兵模式下:
客户端1
对某个master节点
写入了redisson锁,此时会异步复制给对应的 slave节点。但是这个过程中一旦发生 master节点宕机,主备切换,slave节点从变为了 master节点。
这时客户端2
来尝试加锁的时候,在新的master节点上也能加锁,此时就会导致多个客户端对同一个分布式锁完成了加锁。
这时系统在业务语义上一定会出现问题,导致各种脏数据的产生。
缺陷
在哨兵模式或者主从模式下,如果 master实例宕机的时候,可能导致多个客户端同时完成加锁。