阿里妹导读:不同的场景下所需的流控算法不尽相同,那应该如何选择适用的流控方案呢?本文分享单机及分布式流控场景下,简单窗口、滑动窗口、漏桶、令牌桶、滑动日志等几种流控算法的思路和代码实现,并总结了各自的复杂度和适用场景。较长,同学们可收藏后再看。
public interface Throttler {/*** 尝试申请一个配额** @param key 申请配额的key* @return 申请成功则返回true,否则返回false*/boolean tryAcquire(String key);}
简单窗口是我自己的命名,有些地方也叫做固定窗口,主要是为了跟后面的滑动窗口区分。
如果访问次数小于阈值,则代表允许访问,访问次数 +1。
如果访问次数超出阈值,则限制访问,访问次数不增。
如果超过了时间窗口,计数器清零,并重置清零后的首次成功访问时间为当前时间。这样就确保计数器统计的是最近一个窗口的访问量。
/*** 毫秒为单位的时间窗口*/private final long windowInMs;/*** 时间窗口内最大允许的阈值*/private final int threshold;/*** 最后一次成功请求时间*/private long lastReqTime = System.currentTimeMillis();/*** 计数器*/private long counter;public boolean tryAcquire(String key) {long now = System.currentTimeMillis();// 如果当前时间已经超过了上一次访问时间开始的时间窗口,重置计数器,以当前时间作为新窗口的起始值if (now - lastReqTime > windowInMs) {counter = 0;lastReqTime = now;}if (counter < threshold) {counter++;return true;} else {return false;}}
private volatile long lastReqTime = System.currentTimeMillis();private LongAdder counter = new LongAdder();
a)总窗口大小 intervalInMs,滑动子窗口大小 windowLengthInMs,采样数量sampleCount:
sampleCount = intervalInMs / windowLengthInMs
windowStart:滑动窗口的开始时间。
windowLength:滑动窗口的长度。
value:滑动窗口记录的内容,泛型表示,关键的一类就是 MetricBucket,里面包含了一组 LongAdder 用于记录不同类型的数据,例如请求通过数、请求阻塞数、请求异常数等等。
最大允许请求数 N:桶的大小
时间窗口大小 T:一整桶水漏完的时间
最大访问速率 V:一整桶水漏完的速度,即 N/T
请求被限流:桶注水的速度比漏水的速度快,最终导致桶内水溢出
/*** 当前桶内剩余的水*/private long left;/*** 上次成功注水的时间戳*/private long lastInjectTime = System.currentTimeMillis();/*** 桶的容量*/private long capacity;/*** 一桶水漏完的时间*/private long duration;/*** 桶漏水的速度,即 capacity / duration*/private double velocity;public boolean tryAcquire(String key) {long now = System.currentTimeMillis();// 当前剩余的水 = 之前的剩余水量 - 过去这段时间内漏掉的水量// 过去这段时间内漏掉的水量 = (当前时间-上次注水时间) * 漏水速度// 如果当前时间相比上次注水时间相隔太久(一直没有注水),桶内的剩余水量就是0(漏完了)left = Math.max(0, left - (long)((now - lastInjectTime) * velocity));// 往当前水量基础上注一单位水,只要没有溢出就代表可以访问if (left + 1 <= capacity) {lastInjectTime = now;left++;return true;} else {return false;}}
令牌桶算法的原理是系统以恒定的速率产生令牌,然后把令牌放到令牌桶中,令牌桶有一个容量,当令牌桶满了的时候,再向其中放令牌,那么多余的令牌会被丢弃;当想要处理一个请求的时候,需要从令牌桶中取出一个令牌,如果此时令牌桶中没有令牌,那么则拒绝该请求。
long now = System.currentTimeMillis();left = Math.min(capacity, left + (long)((now - lastInjectTime) * velocity));if (left - 1 > 0) {lastInjectTime = now;left--;return true;} else {return false;}
滑动日志与滑动窗口非常像,区别在于滑动日志的滑动是根据日志记录的时间做动态滑动,而滑动窗口是根据子窗口的大小,以子窗口维度滑动。
# 初始化counter = 0q = []# 请求处理流程# 1.找到队列中第一个时间戳>=t-T的请求,即以当前时间t截止的时间窗口T内的最早请求t = nowstart = findWindowStart(q, t)# 2.截断队列,只保留最近T时间窗口内的记录和计数值q = q[start, q.length - 1]counter -= start# 3.判断是否放行,如果允许放行则将这次请求加到队列 q 的末尾if counter < thresholdpush(q, t)counter++# 放行else# 限流
状态的一致性在中心系统维护,实现简单。
中心系统节点的不可用会导致流控出错,需要有额外的保护。例如,中心化流控在中心存储不可用时,往往会退化为单机流控。
相比中心化方案,去中心化方案能够降低中心化单点可靠性带来的影响,但实现上比较复杂,状态的一致性难以保证。
在 CAP 中去中心化更加倾向于 A 而中心化更倾向于 C。
去中心化方案在生产环境中没有见过,因此下文只讨论中心化流控的思路。
limit_req_zone $binary_remote_addr zone=mylimit:10m rate=10r/s;server {location /login/ {limit_req zone=mylimit;proxy_pass http://my_upstream;}}
这里借用了 Sentinel 中的 TokenServer 叫法,Sentinel 集群流控的介绍可以参考官方文档:Sentinel集群流控 (https://github.com/alibaba/Sentinel/wiki/集群流控)。
Token Server 自动管理、调度(分配/选举 Token Server)
Token Server 高可用,在某个 Server 不可用时自动 failover 到其它机器
public boolean tryAcquire(String key) {// 以秒为单位构建tair的keyString wrappedKey = wrapKey(key);// 每次请求+1,初始值为0,key的有效期设置5sResult<Integer> result = tairManager.incr(NAMESPACE, wrappedKey, 1, 0, 5);return result.isSuccess() && result.getValue() <= threshold;}private String wrapKey(String key) {long sec = System.currentTimeMillis() / 1000L;return key + ":" + sec;}
incr
Result incr(int namespace, Serializable key, int value, int defaultValue, int expireTime)
描述
增加计数。注意:incr 前不要 put!!
参数
namespace - 申请时分配的 namespace key - key 列表,不超过 1k value - 增加量 defaultValue - 第一次调用 incr 时的 key 的 count 初始值,第一次返回的值为 defaultValue + value。 expireTime - 数据过期时间,单位为秒,可设相对时间或绝对时间(Unix 时间戳)。expireTime = 0,表示数据永不过期。expireTime > 0,表示设置过期时间。若 expireTime > 当前时间的时间戳,则表示使用绝对时间,否则使用相对时间。expireTime < 0,表示不关注过期时间,若之前设过过期时间,则已之前的过期时间为准,若没有,则作为永不过期处理,但当前 mdb 统一当做永不过期来处理。
返回值
Result 对象,返回值可为负值。当 key 不存在时,第一次返回 defaultValue+ value。后续的 incr 基于该值增加 value。
简单窗口的临界突变问题。
Tair 的可靠性问题,需要有降级方案。上面其实也说了,中心化的流控一般都需要搭配降级的单机流控。
集群机器的时间同步问题。由于生成 key 会用到集群机器的本地时间,因此要求机器时间必须是一致的。
Redis INCR key(https://redis.io/commands/incr)
FUNCTION LIMIT_API_CALL(ip)ts = CURRENT_UNIX_TIME()keyname = ip+":"+tscurrent = GET(keyname)IF current != NULL AND current > 10 THENERROR "too many requests per second"ELSEMULTIINCR(keyname,1)EXPIRE(keyname,10)EXECPERFORM_API_CALL()END
FUNCTION LIMIT_API_CALL(ip):current = GET(ip)IF current != NULL AND current > 10 THENERROR "too many requests per second"ELSEvalue = INCR(ip)IF value == 1 THENEXPIRE(ip,1)ENDPERFORM_API_CALL()END
local currentcurrent = redis.call("incr",KEYS[1])if tonumber(current) == 1 thenredis.call("expire",KEYS[1],1)end
FUNCTION LIMIT_API_CALL(ip)current = LLEN(ip)IF current > 10 THENERROR "too many requests per second"ELSEIF EXISTS(ip) == FALSE #1MULTIRPUSH(ip,ip)EXPIRE(ip,1)EXECELSERPUSHX(ip,ip)ENDPERFORM_API_CALL()END
local tokens_key = KEYS[1]local timestamp_key = KEYS[2]local rate = tonumber(ARGV[1])local capacity = tonumber(ARGV[2])local now = tonumber(ARGV[3])local requested = tonumber(ARGV[4])local fill_time = capacity/ratelocal ttl = math.floor(fill_time*2)local last_tokens = tonumber(redis.call("get", tokens_key))if last_tokens == nil thenlast_tokens = capacityendlocal last_refreshed = tonumber(redis.call("get", timestamp_key))if last_refreshed == nil thenlast_refreshed = 0endlocal delta = math.max(0, now-last_refreshed)local filled_tokens = math.min(capacity, last_tokens+(delta*rate))local allowed = filled_tokens >= requestedlocal new_tokens = filled_tokensif allowed thennew_tokens = filled_tokens - requestedendredis.call("setex", tokens_key, ttl, new_tokens)redis.call("setex", timestamp_key, ttl, now)return { allowed, new_tokens }
其中每个元素的 key 和 value 可以是相同的,即请求的时间戳。
Sorted Set 可以根据时间窗口大小设置有效期,比如时间窗口为 1s 时设置过期时间 5s,在请求量不大时可以节省 Redis 服务器内存。
请求时间戳 t < 当前时间戳 now - 时间窗口大小 interval
long now = System.currentTimeMillis();long maxScoreMs = now - windowInSecond * 1000;Transaction redis = jedisPool.getResource().multi();redis.zremrangeByScore(key, 0, maxScoreMs);redis.zadd(key, now, now + "-" + Math.random()); // 加入一个随机值使得member不重复redis.expire(key, windowInSecond);redis.exec();
使用 Redis 事务 MULTI/EXEC。
使用 RedLock(https://redis.io/topics/distlock) 等分布式锁,要求每个客户端操作前先获取对应 key 的分布式锁。
Lua 脚本。
将可用配额的一部分,按一定比例(例如 50%),先预分配给集群内的机器。一般是平均分配,如果预先就已经知道每台机器的流量权重,可以加权分配。每台机器消耗配额的速率不同,中间也可能有机器宕机,可能有扩缩容,因此预分配的比例不宜太大,当然也不宜太小。
每台机器在配额耗尽时,向中心系统请求配额,这里的一个优化点是每台机器会记录自身配额消耗的速率(等同于承受的流量速率),按照速率大小申请不同大小的配额,消耗速率大则一次性申请更多。
在整体可用配额不足一定比例时(例如 10%),限制每台机器一次可申请的配额数,按剩余窗口大小计算发放配额的大小,并且每次发放量不超过剩余配额的一定比例(例如 50%),使得剩余的流量能够平滑地过渡。
Sentinel 是阿里巴巴开源的、面向分布式服务架构的流量控制组件。在发布的 Sentinel Go 0.3.0 版本中,支持熔断降级,可以针对 Go 服务中的不稳定调用进行自动熔断,避免出现级联错误/雪崩,是保障服务高可用重要的一环。
点击“阅读原文”了解流控降级最佳实践~