client-go 源码分析(9) - workerqueue之限速队列RateLimitingQueue
workerqueue的限速队列RateLimitingQueue搞明白三件事就可以了。
- 代码结构
- 5种RateLimitingQueue(限速队列)
- Kubernetes主要用了上述5种限速队列的哪几种
代码结构
因为普通队列Queue,延时队列DelayingQueue,限速队列RateLimitingQueue,后一个队列以前一个队列的实现为基础,层层添加新功能,所以rateLimitingType结构体包装了延迟队列的接口和RateLimiter接口。而5种限速限速队列都实现了RateLimiter接口。
type rateLimitingType struct {
DelayingInterface
rateLimiter RateLimiter
}
rateLimitingType结构体实现了RateLimitingInterface接口的所有方法:
// RateLimitingInterface is an interface that rate limits items being added to the queue.
type RateLimitingInterface interface {
DelayingInterface
// AddRateLimited adds an item to the workqueue after the rate limiter says it's ok
AddRateLimited(item interface{})
// Forget indicates that an item is finished being retried. Doesn't matter whether it's for perm failing
// or for success, we'll stop the rate limiter from tracking it. This only clears the `rateLimiter`, you
// still have to call `Done` on the queue.
Forget(item interface{})
// NumRequeues returns back how many times the item was requeued
NumRequeues(item interface{}) int
}
// AddRateLimited AddAfter's the item based on the time when the rate limiter says it's ok
func (q *rateLimitingType) AddRateLimited(item interface{}) {
q.DelayingInterface.AddAfter(item, q.rateLimiter.When(item))
}
func (q *rateLimitingType) NumRequeues(item interface{}) int {
return q.rateLimiter.NumRequeues(item)
}
func (q *rateLimitingType) Forget(item interface{}) {
q.rateLimiter.Forget(item)
}
其中 AddRateLimited方法 将限速队列的item通过When方法获取到期时间,然后通过延迟队列的AddAfter方法将该item加入队列。
5种限速限速队列
5种限速限速队列都实现了下面的RateLimiter接口。
type RateLimiter interface {
// When gets an item and gets to decide how long that item should wait
When(item interface{}) time.Duration
// Forget indicates that an item is finished being retried. Doesn't matter whether it's for failing
// or for success, we'll stop tracking it
Forget(item interface{})
// NumRequeues returns back how many failures the item has had
NumRequeues(item interface{}) int
}
BucketRateLimiter
令牌桶限速器BucketRateLimiter 用了 golang 标准库的令牌桶限流器 golang.org/x/time/rate.Limiter 实现。所有元素都是一样的,来几次都是一样,所以NumRequeues,Forget都没有意义。
type BucketRateLimiter struct {
*rate.Limiter
}
func (r *BucketRateLimiter) When(item interface{}) time.Duration {
return r.Limiter.Reserve().Delay()
}
func (r *BucketRateLimiter) NumRequeues(item interface{}) int {
return 0
}
func (r *BucketRateLimiter) Forget(item interface{}) {
}
ItemExponentialFailureRateLimiter
指数失败限速器ItemExponentialFailureRateLimiter,失败次数越多,限速越长而且是指数级增长的一种限速器。当然也不是无限增长下去,有最大限速时间配置。
type ItemExponentialFailureRateLimiter struct {
failuresLock sync.Mutex
failures map[interface{}]int
baseDelay time.Duration
maxDelay time.Duration
}
func (r *ItemExponentialFailureRateLimiter) When(item interface{}) time.Duration {
r.failuresLock.Lock()
defer r.failuresLock.Unlock()
exp := r.failures[item]
r.failures[item] = r.failures[item] + 1
// The backoff is capped such that 'calculated' value never overflows.
backoff := float64(r.baseDelay.Nanoseconds()) * math.Pow(2, float64(exp))
if backoff > math.MaxInt64 {
return r.maxDelay
}
calculated := time.Duration(backoff)
if calculated > r.maxDelay {
return r.maxDelay
}
return calculated
}
func (r *ItemExponentialFailureRateLimiter) NumRequeues(item interface{}) int {
r.failuresLock.Lock()
defer r.failuresLock.Unlock()
return r.failures[item]
}
func (r *ItemExponentialFailureRateLimiter) Forget(item interface{}) {
r.failuresLock.Lock()
defer r.failuresLock.Unlock()
delete(r.failures, item)
}
ItemFastSlowRateLimiter
ItemFastSlowRateLimiter 对一定次数的尝试进行快速重试,然后进行慢速重试。
type ItemFastSlowRateLimiter struct {
failuresLock sync.Mutex
failures map[interface{}]int
maxFastAttempts int
fastDelay time.Duration
slowDelay time.Duration
}
func (r *ItemFastSlowRateLimiter) When(item interface{}) time.Duration {
r.failuresLock.Lock()
defer r.failuresLock.Unlock()
r.failures[item] = r.failures[item] + 1
if r.failures[item] <= r.maxFastAttempts {
return r.fastDelay
}
return r.slowDelay
}
func (r *ItemFastSlowRateLimiter) NumRequeues(item interface{}) int {
r.failuresLock.Lock()
defer r.failuresLock.Unlock()
return r.failures[item]
}
func (r *ItemFastSlowRateLimiter) Forget(item interface{}) {
r.failuresLock.Lock()
defer r.failuresLock.Unlock()
delete(r.failures, item)
}
MaxOfRateLimiter
MaxOfRateLimiter 调用每个 RateLimiter 并返回最坏情况响应。
type MaxOfRateLimiter struct {
limiters []RateLimiter
}
func (r *MaxOfRateLimiter) When(item interface{}) time.Duration {
ret := time.Duration(0)
for _, limiter := range r.limiters {
curr := limiter.When(item)
if curr > ret {
ret = curr
}
}
return ret
}
func (r *MaxOfRateLimiter) NumRequeues(item interface{}) int {
ret := 0
for _, limiter := range r.limiters {
curr := limiter.NumRequeues(item)
if curr > ret {
ret = curr
}
}
return ret
}
func (r *MaxOfRateLimiter) Forget(item interface{}) {
for _, limiter := range r.limiters {
limiter.Forget(item)
}
}
WithMaxWaitRateLimiter
使用MaxWaitRateLimiter具有maxDelay,可避免等待太长时间。
type WithMaxWaitRateLimiter struct {
limiter RateLimiter
maxDelay time.Duration
}
func (w WithMaxWaitRateLimiter) When(item interface{}) time.Duration {
delay := w.limiter.When(item)
if delay > w.maxDelay {
return w.maxDelay
}
return delay
}
func (w WithMaxWaitRateLimiter) Forget(item interface{}) {
w.limiter.Forget(item)
}
func (w WithMaxWaitRateLimiter) NumRequeues(item interface{}) int {
return w.limiter.NumRequeues(item)
}
Kubernetes主要用了上述5种限速队列的哪几种
可以看出基本就用了两种限速队列BucketRateLimiter和ItemExponentialFailureRateLimiter,且以BucketRateLimiter为主。
// DefaultControllerRateLimiter is a no-arg constructor for a default rate limiter for a workqueue. It has
// both overall and per-item rate limiting. The overall is a token bucket and the per-item is exponential
func DefaultControllerRateLimiter() RateLimiter {
return NewMaxOfRateLimiter(
NewItemExponentialFailureRateLimiter(5*time.Millisecond, 1000*time.Second),
// 10 qps, 100 bucket size. This is only for retry speed and its only the overall factor (not per item)
&BucketRateLimiter{Limiter: rate.NewLimiter(rate.Limit(10), 100)},
)
}