令牌桶和漏桶限流算法的 Java 实现

漏桶限流算法

漏桶限流算法的介绍

  • 漏桶限流算法
    • 原理:漏桶算法维护了一个固定容量的漏桶,请求以固定的速率流入漏桶。当请求到达时,如果漏桶未满,则允许请求通过,如果漏桶已满,则拒绝请求。漏桶以恒定的速率漏水,即使系统没有请求,漏桶也会持续漏水。
    • 使用场景:适用于需要固定的请求达到速率的场景,比如对网络流量进行限制,确保不会出现突发流量导致系统瘫痪。
    • 实现方式:在代码中维护一个固定容量的漏桶,定期漏水,并在每次请求到达时检查漏桶的剩余容量是否足够。

漏桶限流算法的实现

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;

public class LeakyBucketDemo {

private final long capacity; // 漏桶的容量
private final long rate; // 漏水的速率,单位:请求数/秒
private final AtomicLong waterLevel; // 漏桶的当前水位
private final ScheduledExecutorService scheduler; // 任务调度器

public LeakyBucketDemo(long capacity, long rate) {
this.capacity = capacity;
this.rate = rate;
this.waterLevel = new AtomicLong(0);
this.scheduler = Executors.newScheduledThreadPool(1);
initScheduler();
}

/**
* 初始化任务调度器
*/
private void initScheduler() {
scheduler.scheduleAtFixedRate(() -> {
long currWaterLevel = waterLevel.get();
// 控制漏桶以恒定的速率漏水,漏水的量被限制在当前水位和漏水速率之间的较小值
long leakAmount = Math.min(currWaterLevel, rate);
waterLevel.set(currWaterLevel - leakAmount);
}, 0, 1, TimeUnit.SECONDS); // 每秒执行一次
}

/**
* 消费请求
*/
public boolean tryConsume() {
long currWaterLevel = waterLevel.get();
// 判断漏桶是否已经满了
if (currWaterLevel < capacity) {
// 将请求流入漏桶
waterLevel.incrementAndGet();
return true;
} else {
return false;
}
}

public static void main(String[] args) throws InterruptedException {
// 初始化漏桶,容量为10,漏水速率为每秒处理5个请求
LeakyBucketDemo leakyBucket = new LeakyBucketDemo(10, 5);

// 模拟多个并发请求
for (int i = 0; i < 20; i++) {
new Thread(() -> {
// 消费请求
if (leakyBucket.tryConsume()) {
System.out.println(Thread.currentThread().getName() + " 请求被接受处理");
} else {
System.out.println(Thread.currentThread().getName() + " 请求被拒绝处理");
}
}, "T-" + i).start();
}
}

}

程序执行的输出结果:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
T-15 请求被拒绝处理
T-7 请求被接受处理
T-2 请求被接受处理
T-9 请求被接受处理
T-1 请求被接受处理
T-8 请求被接受处理
T-11 请求被拒绝处理
T-12 请求被拒绝处理
T-0 请求被接受处理
T-6 请求被接受处理
T-13 请求被拒绝处理
T-17 请求被拒绝处理
T-5 请求被接受处理
T-3 请求被接受处理
T-18 请求被拒绝处理
T-14 请求被拒绝处理
T-10 请求被拒绝处理
T-4 请求被接受处理
T-19 请求被拒绝处理
T-16 请求被拒绝处理

令牌桶限流算法

令牌桶限流算法的介绍

  • 令牌桶限流算法
    • 原理:令牌桶算法维护一个固定容量的桶,其中以固定的速率产生令牌,每个令牌代表一个允许通过的请求。当请求到来时,需要从桶中获取一个令牌,如果桶中没有足够的令牌,则拒绝请求。
    • 使用场景:适用于需要平滑限流的场景,比如对服务的请求进行限制,但允许短时间内突发请求。
    • 实现方式:在代码中维护一个令牌桶,定期向其中添加令牌,并在每次请求到达时检查是否有足够的令牌可用。

令牌桶限流算法的实现

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;

public class TokenBucketDemo {

private final long capacity; // 令牌桶的容量
private final long rate; // 令牌的生成速率,单位:令牌数/秒
private final AtomicLong tokens; // 当前的令牌数量
private final ScheduledExecutorService scheduler; // 任务调度器

public TokenBucketDemo(long capacity, long rate) {
this.capacity = capacity;
this.rate = rate;
this.tokens = new AtomicLong(0);
this.scheduler = Executors.newScheduledThreadPool(1);
initScheduler();
}

/**
* 初始化任务调度器
*/
private void initScheduler() {
scheduler.scheduleAtFixedRate(() -> {
long currTokens = tokens.get();
// 维护一个固定容量的桶,并以固定的速率产生令牌
long newTokens = Math.min(currTokens + rate, capacity);
tokens.set(newTokens);
}, 0, 1, TimeUnit.SECONDS); // 每秒执行一次
}

/**
* 消费令牌
*/
public boolean tryConsume() {
long currTokens = tokens.get();
if (currTokens > 0) {
// CAS 操作
return tokens.compareAndSet(currTokens, currTokens - 1);
} else {
return false;
}
}

public static void main(String[] args) throws InterruptedException {
// 初始化令牌桶,固定容量为10,生成速率为每秒生成5个令牌
TokenBucketDemo tokenBucket = new TokenBucketDemo(10, 5);

// 模拟多个并发请求
for (int i = 0; i < 10; i++) {
new Thread(() -> {
// 获取令牌
if (tokenBucket.tryConsume()) {
System.out.println(Thread.currentThread().getName() + " 请求被接受处理");
} else {
System.out.println(Thread.currentThread().getName() + " 请求被拒绝处理");
}
}, "T-" + i).start();
}
}

}

程序执行的输出结果:

1
2
3
4
5
6
7
8
9
10
T-7 请求被拒绝处理
T-1 请求被接受处理
T-4 请求被接受处理
T-9 请求被拒绝处理
T-8 请求被拒绝处理
T-2 请求被接受处理
T-5 请求被拒绝处理
T-6 请求被拒绝处理
T-3 请求被接受处理
T-0 请求被接受处理

第三方 Java 限流库

在企业的项目开发中,考虑到代码的稳定性、健壮性、可用性,一般推荐直接使用第三方限流库来实现限流策略。

Bucket4J

Bucket4J 是一款基于令牌桶算法的 Java 限流库。