limiter限流

单机限流

思路介绍

采用 Google Guava 中的 RateLimiter.create 来创建令牌桶并发放令牌;但是我们需要动态控制限流的速率,所以在此基础上利用单例模式的“双重检查锁”和“懒汉式”来动态创建限流器。

1
2
3
4
5
<!-- Guava: supplies com.google.common.util.concurrent.RateLimiter used by the single-node limiter -->
<dependency>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
<version>20.0</version>
</dependency>
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
package com.tthk.inland.ticket.core.utils.limiters;

import com.google.common.util.concurrent.RateLimiter;
import lombok.extern.slf4j.Slf4j;

import java.util.Optional;

/**
 * Single-node rate limiting backed by Guava's token-bucket {@link RateLimiter}.
 *
 * <p>One shared limiter is created lazily on first use. When a caller later passes a
 * different rate, the existing limiter is re-tuned in place instead of being replaced,
 * so the token bucket keeps throttling across calls.
 *
 * @author Foam
 * @since 2023/5/6
 */
@Slf4j
public class GuavaRateLimiterUtils {

    /** Default rate (tokens produced per second) used when the caller passes {@code null}. */
    private final static Integer DEFAULT_RATE = 80;

    /** Shared limiter; volatile so the lazily created instance is safely published. */
    private static volatile RateLimiter instance = null;

    /** Rate the shared limiter is currently configured with; written only under the class lock. */
    private static volatile int configuredRate = 0;

    private GuavaRateLimiterUtils(){}

    /**
     * Lazily creates (or re-tunes) the shared token bucket and tries to take one token
     * without blocking.
     *
     * <p>Any exception (for example an invalid rate rejected by Guava) is logged and the
     * request is reported as not limited.
     *
     * @param rate tokens produced per second; {@code null} selects {@code DEFAULT_RATE}
     * @return {@code true} if the request is being rate limited (no token available),
     *         {@code false} if it may pass
     */
    public static boolean initRateLimiter(Integer rate){
        try{
            int permitsPerSecond = Optional.ofNullable(rate).orElse(DEFAULT_RATE);
            // Fast path: skip the lock when the limiter exists and already runs at this rate.
            if(instance == null || configuredRate != permitsPerSecond){
                synchronized (GuavaRateLimiterUtils.class){
                    if(instance == null){
                        // Create the token bucket exactly once.
                        instance = RateLimiter.create(permitsPerSecond);
                        configuredRate = permitsPerSecond;
                    }else if(configuredRate != permitsPerSecond){
                        // Adjust the rate in place; recreating the limiter on every call
                        // would hand out a fresh bucket each time and never throttle.
                        instance.setRate(permitsPerSecond);
                        configuredRate = permitsPerSecond;
                    }
                }
            }
            if(!instance.tryAcquire(1)){
                log.info("限流中");
                return true;
            }
        }catch (Exception e){
            // Fail open, but keep the stack trace for diagnosis.
            log.warn(e.getMessage(), e);
        }
        return false;
    }

}

分布式限流

思路介绍

采用 Redis 队列实现滑动窗口来进行限流,并利用 Redis 分布式锁(类似 SETNX 的思路,代码中使用 Redisson 的 RLock)来保证执行顺序。

1
2
3
4
5
6
<!-- Redisson: Redis client library supplying the org.redisson classes (RedissonClient, RLock, Config) imported by the distributed limiter -->
<dependency>
<groupId>org.redisson</groupId>
<artifactId>redisson</artifactId>
<version>2.9.3</version>
</dependency>
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
package com.tthk.inland.ticket.core.utils.limiters;

import lombok.extern.slf4j.Slf4j;
import org.redisson.Redisson;
import org.redisson.api.RLock;
import org.redisson.api.RedissonClient;
import org.redisson.config.Config;

import java.util.concurrent.TimeUnit;

/**
 * Redis connection helper built on Redisson: owns the shared {@link RedissonClient}
 * and exposes the distributed lock used by the sliding-window limiter.
 *
 * @author Foam
 * @since 2023/5/6
 */
@Slf4j
public class RedisLinkUtils {
    // Shared Redisson client, created lazily by getRedissonClient()
    private static volatile RedissonClient redissonClient;
    // Redis host
    private static final String URL = "";
    // Redis port
    private static final String PORT = "";
    // Redis password
    private static final String PASSWORD = "";
    // Redis database index
    private static final Integer DB = 17;
    // Name of the deque holding request timestamps for the limiter
    public final static String TASK_NAME = "REDIS_LIMIT";
    // Name of the distributed lock
    public final static String TASK_LOCK = "REDIS_LOCK";

    /**
     * Returns the shared Redisson client, creating it on first use.
     *
     * <p>On creation the limiter deque ({@code TASK_NAME}) is cleared. The client is only
     * published after that clear has finished, so no other thread can use a half-initialised
     * client; if the clear fails the new client is shut down and the error is rethrown.
     *
     * @return the shared {@link RedissonClient}
     */
    public static RedissonClient getRedissonClient(){
        if(redissonClient == null){
            synchronized (RedisLinkUtils.class){
                if(redissonClient == null){
                    Config config = new Config();
                    config.useSingleServer().setAddress("redis://"+URL+":"+PORT)
                            .setPassword(PASSWORD)
                            .setDatabase(DB);
                    RedissonClient created = Redisson.create(config);
                    try {
                        // Start from an empty queue
                        created.getBlockingDeque(TASK_NAME).clear();
                    } catch (RuntimeException e) {
                        // Do not leak the connection if initialisation fails
                        created.shutdown();
                        throw e;
                    }
                    redissonClient = created;
                }
            }
        }
        return redissonClient;
    }

    /**
     * Tries to acquire the distributed lock {@code TASK_LOCK}.
     *
     * @param waitTime  maximum time to wait for the lock, in seconds
     * @param leaseTime time after which the lock is released automatically, in seconds
     * @return {@code true} if the lock was acquired, {@code false} if it was not
     *         (including when the waiting thread was interrupted)
     */
    public static boolean acquireSecond(Integer waitTime, Integer leaseTime){
        try {
            // Go through the accessor so the client is initialised even if this is the first call.
            RLock myLock = getRedissonClient().getLock(TASK_LOCK);
            return myLock.tryLock(waitTime, leaseTime, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            // Restore the interrupt flag so callers can still observe the interruption.
            Thread.currentThread().interrupt();
            log.info(e.getMessage());
        }
        return false;
    }

    /**
     * Releases the distributed lock {@code TASK_LOCK} if the current thread holds it.
     *
     * <p>Does nothing when the client was never created or when the lock is no longer
     * held by this thread (for example because its lease already expired).
     */
    public static void release(){
        RedissonClient client = redissonClient;
        if(client != null) {
            RLock myLock = client.getLock(TASK_LOCK);
            // unlock() throws IllegalMonitorStateException when the caller is not the owner.
            if(myLock != null && myLock.isHeldByCurrentThread()){
                myLock.unlock();
            }
        }
    }
}

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
package com.tthk.inland.ticket.core.utils.limiters;

import lombok.extern.slf4j.Slf4j;

import java.util.*;

/**
 * Distributed sliding-window rate limiter backed by a Redis deque of request timestamps.
 *
 * <p>Each accepted request appends its timestamp to the deque. Once the deque holds
 * {@code rate} entries, a new request is only accepted if the oldest entry is older than
 * the window; that check runs under the distributed lock from {@code RedisLinkUtils}.
 *
 * @author Foam
 * @since 2023/5/6
 */
@Slf4j
public class RedisRateLimiterUtils {

    /** Default rate (requests per window) used when the caller passes {@code null}. */
    private final static Integer DEFAULT_RATE = 80;

    /** Sliding window length in seconds. */
    private final static Long timeout = 1L;

    /**
     * Checks the sliding window and records the current request if it is allowed.
     *
     * <p>Exceptions raised while checking the window are logged and the request is then
     * recorded and allowed (fail open).
     *
     * @param rate maximum number of requests per window; {@code null} selects
     *             {@code DEFAULT_RATE}; a non-positive value rejects every request
     * @return {@code true} if the request is rate limited, {@code false} if it may pass
     */
    public static Boolean initRateLimiter(Integer rate){
        Long currentTimeMillis = System.currentTimeMillis();
        Integer limiter = Optional.ofNullable(rate).orElse(DEFAULT_RATE);
        if(limiter <= 0){
            // A non-positive rate can never be satisfied; without this guard the
            // eviction loop below would spin forever on an empty deque.
            log.warn("Non-positive rate {}, request rejected", limiter);
            return true;
        }
        Deque<Long> stack = RedisLinkUtils.getRedissonClient().getBlockingDeque(RedisLinkUtils.TASK_NAME);
        try{
            if(stack.size() >= limiter){
                // Take the distributed lock before inspecting/evicting entries
                if(!RedisLinkUtils.acquireSecond(1,2)){
                    // Lock not obtained: treat as limited
                    log.info("未获取到分布式锁,限流!");
                    return true;
                }
                try{
                    // Oldest recorded timestamp (or "now" if the deque emptied meanwhile)
                    Long firstTime = Optional.ofNullable(stack.peekFirst()).orElse(currentTimeMillis);
                    if((currentTimeMillis - firstTime) < (timeout * 1000)){
                        // Window is still full
                        log.info("限流中");
                        return true;
                    }
                    // Evict the oldest entries until there is room for this request
                    while (stack.size() >= limiter){
                        stack.pollFirst();
                    }
                }finally{
                    // Always release, including on the early "limited" return and on errors
                    releaseQuietly();
                }
            }
        }catch (Exception e){
            log.info(e.getMessage());
        }
        // Record the current request at the tail of the deque
        stack.offerLast(currentTimeMillis);
        return false;
    }

    /** Releases the distributed lock, logging instead of propagating any failure to unlock. */
    private static void releaseQuietly(){
        try{
            RedisLinkUtils.release();
        }catch (Exception e){
            // e.g. the lease already expired, so this thread no longer owns the lock
            log.info(e.getMessage());
        }
    }

    /** Manual demo: pushes 100 requests through a 1-request-per-window limiter. */
    public static void main(String[] args) throws InterruptedException {
        for (int i = 0; i < 100; i++) {
            while (initRateLimiter(1)){
                Thread.sleep(300);
            }
            System.out.println(i+"次");
        }
    }
}