概述
什么是限流
限流可以认为服务降级的一种,限流就是限制系统的输入和输出流量已达到保护系统 的目的。一般来说系统的吞吐量是可以被测算的,为了保证系统的稳定运行,一旦达到的需要限制的阈值,就需要限制流量并采取一些措施以完成限制流量的目的。比如:延迟处理,拒绝处理,或者部分拒绝处理等等。
比如场景:
某天小明突然发现自己的接口请求突然之间涨到了原来的10倍,接口几乎不能使用,产生了一系列连锁反应,导致了整个系统崩溃。这就好比,老电闸中都安装了保险丝,一旦使用大功率设备,保险丝就会熔断,保证各个电器不被强电流烧坏,系统也同样安装保险丝,防止非预期请求过大,引起系统瘫痪。
限流方法
常用的限流算法有:计数法,滑动窗口计数法,漏桶算法和令牌桶算法。
漏桶算法思路
水(请求)进入到漏桶里,漏桶以一定的速度流出,当水流的速度过大会直接溢出, 漏桶是强行限制了数据的传输速率。
令牌桶算法
除了要能够限制数据的平均传输速率外,还需要允许某种程度的突发请求,令牌桶更为合适。
令牌桶的思路是以一个恒定的速率往桶里放令牌,如果请求需要被处理,则需要从桶里取出一个令牌,如果没有令牌可取,那么就拒绝服务。
Google开源工具包Guava提供了限流工具类RateLimiter是基于令牌桶算法来实现的。
public double acquire() {
return acquire(1);
}
public double acquire(int permits) {
checkPermits(permits); //检查参数是否合法(是否大于0)
long microsToWait;
synchronized (mutex) { //应对并发情况需要同步
microsToWait = reserveNextTicket(permits, readSafeMicros()); //获得需要等待的时间
}
ticker.sleepMicrosUninterruptibly(microsToWait); //等待,当未达到限制时,microsToWait为0
return 1.0 * microsToWait / TimeUnit.SECONDS.toMicros(1L);
}
private long reserveNextTicket(double requiredPermits, long nowMicros) {
resync(nowMicros); //补充令牌
long microsToNextFreeTicket = nextFreeTicketMicros - nowMicros;
double storedPermitsToSpend = Math.min(requiredPermits, this.storedPermits); //获取这次请求消耗的令牌数目
double freshPermits = requiredPermits - storedPermitsToSpend;
long waitMicros = storedPermitsToWaitTime(this.storedPermits, storedPermitsToSpend)
+ (long) (freshPermits * stableIntervalMicros);
this.nextFreeTicketMicros = nextFreeTicketMicros + waitMicros;
this.storedPermits -= storedPermitsToSpend; // 减去消耗的令牌
return microsToNextFreeTicket;
}
private void resync(long nowMicros) {
// if nextFreeTicket is in the past, resync to now
if (nowMicros > nextFreeTicketMicros) {
storedPermits = Math.min(maxPermits,
storedPermits + (nowMicros - nextFreeTicketMicros) / stableIntervalMicros);
nextFreeTicketMicros = nowMicros;
}
}
计数器
控制单位时间内的请求数量
import java.util.concurrent.atomic.AtomicInteger;
public class Counter {
/**
* 最大访问数量
*/
private final int limit = 10;
/**
* 访问时间差
*/
private final long timeout = 1000;
/**
* 请求时间
*/
private long time;
/**
* 当前计数器
*/
private AtomicInteger reqCount = new AtomicInteger(0);
public boolean limit() {
long now = System.currentTimeMillis();
if (now < time + timeout) {
// 单位时间内
reqCount.getAndAdd(1);
return reqCount.get() <= limit;
} else {
// 超出单位时间
time = now;
reqCount = new AtomicInteger(0);
return true;
}
}
public static void main(String[] args) {
}
}
计数方式有没有问题?
假设每分钟请求数量为 60 个,每秒钟系统可以处理1个请求,用户在00:59 发送了60 个请求,然后在 1:00 发生了60个请求,此时 2 秒内有120个请求(每秒60)个请求,这样的方式并没有实现限制流量,因为每分钟可以处理60个,但是实际上这60个是一秒钟发过来的。
滑动窗口计数
滑动窗口是对计数方式对改进,增加一个时间粒度的度量单位。
把一分钟分成了若干等份,比如分成6份, 每份10s, 在一份独立计数器上,在 00:00-00:09 之间计数器累加1, 当等份数量越大,限流统计越详细。
package ratelimit;
import java.util.Iterator;
import java.util.Random;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.stream.IntStream;
public class TimeWindow {
private ConcurrentLinkedQueue<Long> queue = new ConcurrentLinkedQueue<Long>();
/**a
* 间隔秒数
*/
private int seconds;
/**
* 最大限流
*/
private int max;
public TimeWindow(int max, int seconds) {
this.seconds = seconds;
this.max = max;
new Thread(() -> {
while (true) {
try {
Thread.sleep((seconds - 1) * 1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
clean();
}
}).start();
}
public static void main(String[] args) {
final TimeWindow timeWindow = new TimeWindow(10, 1);
IntStream.range(0, 3).forEach((i) -> {
new Thread(() -> {
try {
while (true) {
Thread.sleep(new Random().nextInt(20) * 100);
}
} catch (InterruptedException e) {
e.printStackTrace();
}
timeWindow.take();
}).start();
});
}
public void take() {
long start = System.currentTimeMillis();
int size = sizeOfValid();
if (size > max) {
System.out.println("超限");
}
synchronized (queue) {
if (sizeOfValid() > max) {
System.err.println("超限");
System.err.println("queue 中有:" + queue.size() + "最大数量:" + max);
}
this.queue.offer(System.currentTimeMillis());
}
System.err.println("queue 中有:" + queue.size() + "最大数量:" + max);
}
private int sizeOfValid() {
Iterator<Long> it = queue.iterator();
Long ms = System.currentTimeMillis() - seconds * 1000;
int count = 0;
while (it.hasNext()) {
long t = it.next();
if (t > ms) {
//在时间窗口范围内
count++;
}
}
return count;
}
public void clean() {
Long c = System.currentTimeMillis() - seconds * 1000;
Long t1 = null;
while ((t1 = queue.peek()) != null && t1 < c) {
System.out.println("数据清理");
queue.poll();
}
}
}
令牌桶问题
令牌桶规定固定容量的桶,令牌 token 以固定速度往桶内填充,当桶填满时 token 不会继续放入,每过来一个请求把 token 从桶中移除,当没有 token 可以获取时,拒绝请求。
令牌桶算法
当网络设备衡量流量是否超过额定带宽时,需要查看令牌桶,而令牌桶中会放置一定数量的令牌,一个令牌允许接口发送或接收1bit数据(有时是1 Byte数据),当接口通过1bit数据后,同时也要从桶中移除一个令牌。当桶里没有令牌的时候,任何流量都被视为超过额定带宽,只有当桶中有令牌时,数据才可以通过接口。令牌桶中的令牌不仅仅可以被移除,同样也可以往里添加,所以为了保证接口随时有数据通过,就必须不停地往桶里加令牌,由此可见,往桶里加令牌的速度,就决定了数据通过接口的速度。 因此,我们通过控制往令牌桶里加令牌的速度从而控制用户流量的带宽。而设置的这个用户传输数据的速率被称为承诺信息速率(CIR),通常以秒为单位。比如我们设置用户的带宽为1000 bit每秒,只要保证每秒钟往桶里添加1000个令牌即可。
令牌桶可以用来保护自己,主要用来对调用者频率进行限流,为的是不让自己的系统垮掉。
令牌桶算法代码
package com.netease.datastream.util.flowcontrol;
import java.io.BufferedWriter;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.OutputStreamWriter;
import java.util.Random;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReentrantLock;
/**
* <pre>
* Created by inter12 on 15-3-18.
* </pre>
*/
public class TokenBucket {
// 默认桶大小个数 即最大瞬间流量是64M
private static final int DEFAULT_BUCKET_SIZE = 1024 * 1024 * 64;
// 一个桶的单位是1字节
private int everyTokenSize = 1;
// 瞬间最大流量
private int maxFlowRate;
// 平均流量
private int avgFlowRate;
// 队列来缓存桶数量:最大的流量峰值就是 = everyTokenSize*DEFAULT_BUCKET_SIZE 64M = 1 * 1024 *
// 1024 * 64
private ArrayBlockingQueue<Byte> tokenQueue = new ArrayBlockingQueue<Byte>(
DEFAULT_BUCKET_SIZE);
private ScheduledExecutorService scheduledExecutorService = Executors
.newSingleThreadScheduledExecutor();
private volatile boolean isStart = false;
private ReentrantLock lock = new ReentrantLock(true);
private static final byte A_CHAR = 'a';
public TokenBucket() {
}
public TokenBucket(int maxFlowRate, int avgFlowRate) {
this.maxFlowRate = maxFlowRate;
this.avgFlowRate = avgFlowRate;
}
public TokenBucket(int everyTokenSize, int maxFlowRate, int avgFlowRate) {
this.everyTokenSize = everyTokenSize;
this.maxFlowRate = maxFlowRate;
this.avgFlowRate = avgFlowRate;
}
public void addTokens(Integer tokenNum) {
// 若是桶已经满了,就不再家如新的令牌
for (int i = 0; i < tokenNum; i++) {
tokenQueue.offer(Byte.valueOf(A_CHAR));
}
}
public TokenBucket build() {
start();
return this;
}
/**
* 获取足够的令牌个数
*
* @return
*/
public boolean getTokens(byte[] dataSize) {
// Preconditions.checkNotNull(dataSize);
// Preconditions.checkArgument(isStart,
// "please invoke start method first !");
int needTokenNum = dataSize.length / everyTokenSize + 1;// 传输内容大小对应的桶个数
final ReentrantLock lock = this.lock;
lock.lock();
try {
boolean result = needTokenNum <= tokenQueue.size(); // 是否存在足够的桶数量
if (!result) {
return false;
}
int tokenCount = 0;
for (int i = 0; i < needTokenNum; i++) {
Byte poll = tokenQueue.poll();
if (poll != null) {
tokenCount++;
}
}
return tokenCount == needTokenNum;
} finally {
lock.unlock();
}
}
public void start() {
// 初始化桶队列大小
if (maxFlowRate != 0) {
tokenQueue = new ArrayBlockingQueue<Byte>(maxFlowRate);
}
// 初始化令牌生产者
TokenProducer tokenProducer = new TokenProducer(avgFlowRate, this);
scheduledExecutorService.scheduleAtFixedRate(tokenProducer, 0, 1,
TimeUnit.SECONDS);
isStart = true;
}
public void stop() {
isStart = false;
scheduledExecutorService.shutdown();
}
public boolean isStarted() {
return isStart;
}
class TokenProducer implements Runnable {
private int avgFlowRate;
private TokenBucket tokenBucket;
public TokenProducer(int avgFlowRate, TokenBucket tokenBucket) {
this.avgFlowRate = avgFlowRate;
this.tokenBucket = tokenBucket;
}
@Override
public void run() {
tokenBucket.addTokens(avgFlowRate);
}
}
public static TokenBucket newBuilder() {
return new TokenBucket();
}
public TokenBucket everyTokenSize(int everyTokenSize) {
this.everyTokenSize = everyTokenSize;
return this;
}
public TokenBucket maxFlowRate(int maxFlowRate) {
this.maxFlowRate = maxFlowRate;
return this;
}
public TokenBucket avgFlowRate(int avgFlowRate) {
this.avgFlowRate = avgFlowRate;
return this;
}
private String stringCopy(String data, int copyNum) {
StringBuilder sbuilder = new StringBuilder(data.length() * copyNum);
for (int i = 0; i < copyNum; i++) {
sbuilder.append(data);
}
return sbuilder.toString();
}
public static void main(String[] args) throws IOException,
InterruptedException {
tokenTest();
}
private static void arrayTest() {
ArrayBlockingQueue<Integer> tokenQueue = new ArrayBlockingQueue<Integer>(
10);
tokenQueue.offer(1);
tokenQueue.offer(1);
tokenQueue.offer(1);
System.out.println(tokenQueue.size());
System.out.println(tokenQueue.remainingCapacity());
}
private static void tokenTest() throws InterruptedException, IOException {
TokenBucket tokenBucket = TokenBucket.newBuilder().avgFlowRate(512)
.maxFlowRate(1024).build();
BufferedWriter bufferedWriter = new BufferedWriter(
new OutputStreamWriter(new FileOutputStream("D:/ds_test")));
String data = "xxxx";// 四个字节
for (int i = 1; i <= 1000; i++) {
Random random = new Random();
int i1 = random.nextInt(100);
boolean tokens = tokenBucket.getTokens(tokenBucket.stringCopy(data,
i1).getBytes());
TimeUnit.MILLISECONDS.sleep(100);
if (tokens) {
bufferedWriter.write("token pass --- index:" + i1);
System.out.println("token pass --- index:" + i1);
} else {
bufferedWriter.write("token rejuect --- index" + i1);
System.out.println("token rejuect --- index" + i1);
}
bufferedWriter.newLine();
bufferedWriter.flush();
}
bufferedWriter.close();
}
}
令牌桶和漏桶的选择问题
如果要让自己的系统不被打垮,用令牌桶,如果保证别人的系统不被打垮,用漏桶算法。
内推链接:https://job.toutiao.com/referral/mobile/spring-referral?token=MzsxNjIwMzgzNzA2MzYyOzY5Mzk2OTkyMjAwODk2NjkxNTE7MA
欢迎关注公众号:程序员开发者社区
参考资料
- https://www.cnblogs.com/xuwc/p/9123078.html
最后
以上就是冷傲帽子为你收集整理的如何限流,具体实现有哪些?的全部内容,希望文章能够帮你解决如何限流,具体实现有哪些?所遇到的程序开发问题。
如果觉得靠谱客网站的内容还不错,欢迎将靠谱客网站推荐给程序员好友。
发表评论 取消回复