上图可以看出 client-go 用到了 workqueue 队列 来处理 从 DeltaFIFO pop 出来的内容,workqueue 队列用到了限流队列(微服务中常用的技术,防止性能过载,从而导致任务处理失败)。
在分析workqueue前,需要了解下实现限流队列的限流器。限流器有多种实现方式,client-go用了其中一种,client-go用的限流器是 Golang 标准库限流器(Golang 的 timer/rate)。
本篇是关于 Golang 标准库限流器。
Golang 标准库限流器通过令牌桶实现。令牌桶可以想象有一个固定大小的桶,通过有取有放,实现了限流目的。
放:系统会以恒定速率向桶中放 Token,桶满则暂时不放。
取:用户则从桶中取 Token,如果有剩余 Token 就可以一直取。如果没有剩余 Token,则需要等到系统中被放置了 Token 才行。
直接按上面的实现,效率太低了,不仅要维护一个定时放token的定时器,还要维护一个token队列,消耗不必要的内存和算力。在 Golang 的 timer/rate 中的实现 是通过 lazyload 的方式,每次消费之前才根据时间差更新 Token 数目(是计算得到的)。
下面进入代码。内容主要是两个结构体,Limiter struct 和 Reservation struct。两个方法,reserve 预留方法 和 Token 的归还方法。
// Limit defines the maximum frequency of some events.
// Limit is represented as number of events per second.
// A zero Limit allows no events.
type Limit float64
// Inf is the infinite rate limit; it allows all events (even if burst is zero).
const Inf = Limit(math.MaxFloat64)
// Every converts a minimum time interval between events to a Limit.
func Every(interval time.Duration) Limit {
if interval <= 0 {
return Inf
}
return 1 / Limit(interval.Seconds())
}
type Limiter struct {
mu sync.Mutex
// limit 最大事件率
limit Limit
// burst 桶大小
burst int
// tokens 桶的当前令牌数目
tokens float64
// last is the last time the limiter's tokens field was updated
// last是关键,只加了一个结构体成员,就可以避免上面提到的不必要的定时器和token队列,转而用lazyload方式使用计算时间差的方法更新token数目
last time.Time
// lastEvent is the latest time of a rate-limited event (past or future)
lastEvent time.Time
}
// A Reservation holds information about events that are permitted by a Limiter to happen after a delay.
// A Reservation may be canceled, which may enable the Limiter to permit additional events.
type Reservation struct {
// ok 是否 limiter 可以提供请求所需的tokens数目
ok bool
// lim 就是上面的结构体 Limiter struct
lim *Limiter
tokens int
// timeToAct reserved action 预定的动作发生的时间
timeToAct time.Time
// This is the Limit at reservation time, it can change later.
limit Limit
}
reserve 方法的大致流程:
- if lim.limit == Inf 如果最大时间率是无限大的,那么直接返回 Reservation struct, ok=true,预定执行时间是立刻。相当于没有限流器,限流器功能disable。
- lim.advance(now) 重新计算桶里的token数目,就是通过计算Limiter结构体的last属性减去现在的时间,算出这段时间流逝中应该往桶里加多少token,加上旧的token数目(Limiter结构体的tokens属性)就是now的token数目
- tokens -= float64(n) now的token数目减去reserve方法的入参n,就是经过reserve消费后的token数目
- 更新last时间为将now时间,返回结构体Reservation
// Usage example:
// r := lim.ReserveN(time.Now(), 1)
// if !r.OK() {
// // Not allowed to act! Did you remember to set lim.burst to be > 0 ?
// return
// }
// time.Sleep(r.Delay())
// Act()
func (lim *Limiter) Reserve() *Reservation {
return lim.ReserveN(time.Now(), 1)
}
func (lim *Limiter) ReserveN(now time.Time, n int) *Reservation {
r := lim.reserveN(now, n, InfDuration)
return &r
}
func (lim *Limiter) reserveN(now time.Time, n int, maxFutureReserve time.Duration) Reservation {
lim.mu.Lock()
defer lim.mu.Unlock()
if lim.limit == Inf {
return Reservation{
ok: true,
lim: lim,
tokens: n,
timeToAct: now,
}
} else if lim.limit == 0 {
var ok bool
if lim.burst >= n {
ok = true
lim.burst -= n
}
return Reservation{
ok: ok,
lim: lim,
tokens: lim.burst,
timeToAct: now,
}
}
now, last, tokens := lim.advance(now)
// Calculate the remaining number of tokens resulting from the request.
tokens -= float64(n)
// Calculate the wait duration
var waitDuration time.Duration
if tokens < 0 {
waitDuration = lim.limit.durationFromTokens(-tokens)
}
// Decide result
ok := n <= lim.burst && waitDuration <= maxFutureReserve
// Prepare reservation
r := Reservation{
ok: ok,
lim: lim,
limit: lim.limit,
}
if ok {
r.tokens = n
r.timeToAct = now.Add(waitDuration)
}
// Update state
if ok {
lim.last = now
lim.tokens = tokens
lim.lastEvent = r.timeToAct
} else {
lim.last = last
}
return r
}
// advance calculates and returns an updated state for lim resulting from the passage of time.
// lim is not changed.
// advance requires that lim.mu is held.
func (lim *Limiter) advance(now time.Time) (newNow time.Time, newLast time.Time, newTokens float64) {
last := lim.last
if now.Before(last) {
last = now
}
// Calculate the new number of tokens, due to time that passed.
elapsed := now.Sub(last)
delta := lim.limit.tokensFromDuration(elapsed)
tokens := lim.tokens + delta
if burst := float64(lim.burst); tokens > burst {
tokens = burst
}
return now, last, tokens
}
CancelAt 表示预订持有者不会执行预订的操作,并考虑到可能已经进行了其他预订,并尽可能地逆转此预订对速率限制的影响。
CancelAt 方法的大致流程:
- 四种直接返回的情况: !r.ok r.lim.limit == Inf r.tokens == 0 r.timeToAct.Before(now)
- restoreTokens := float64(r.tokens) - r.limit.tokensFromDuration(r.lim.lastEvent.Sub(r.timeToAct)) 算出要归还的token数目
- r.lim.advance(now) lazyload 方式算出当前时间的token数目
- tokens += restoreTokens 算出归还后的token数目
func (r *Reservation) CancelAt(now time.Time) {
if !r.ok {
return
}
r.lim.mu.Lock()
defer r.lim.mu.Unlock()
if r.lim.limit == Inf || r.tokens == 0 || r.timeToAct.Before(now) {
return
}
// calculate tokens to restore
// The duration between lim.lastEvent and r.timeToAct tells us how many tokens were reserved
// after r was obtained. These tokens should not be restored.
restoreTokens := float64(r.tokens) - r.limit.tokensFromDuration(r.lim.lastEvent.Sub(r.timeToAct))
if restoreTokens <= 0 {
return
}
// advance time to now
now, _, tokens := r.lim.advance(now)
// calculate new number of tokens
tokens += restoreTokens
if burst := float64(r.lim.burst); tokens > burst {
tokens = burst
}
// update state
r.lim.last = now
r.lim.tokens = tokens
if r.timeToAct == r.lim.lastEvent {
prevEvent := r.timeToAct.Add(r.limit.durationFromTokens(float64(-r.tokens)))
if !prevEvent.Before(now) {
r.lim.lastEvent = prevEvent
}
}
}
转载请注明来源,欢迎指出任何有错误或不够清晰的表达。可以邮件至 backendcloud@gmail.com