让我们想象这样一个场景:有一个与第三方 API 交互的分布式应用程序。通常,第三方 API 具有速率限制控制机制,以避免其客户端突发请求并导致服务停机。在这样的场景下,调用者如何在分布式环境中控制向第三方API发出请求的速率?这篇文章讨论了解决这个问题的可能策略。
有多种算法可以控制请求速率,但这里我们将重点关注令牌桶算法,因为它相对容易理解和实现。该算法规定:一个桶最多可以容纳T个令牌,当应用程序想要向第三方API发出请求时,它必须采取1桶中的令牌。如果桶是空的,则必须等到桶中至少有 1 个令牌。此外,存储桶会以 R 令牌/毫秒的固定速率重新填充 1 令牌。
令牌桶算法非常容易理解,但是如何在分布式环境中使用它来控制对第三方 API 的传出请求?
如果想要在分布式环境中控制传出速率限制,则需要当前速率限制的集中事实来源。有多种方法可以实现事实来源,我用可能的实现理想化了下图:
在上图中,我们有一个分布在多个pod中的应用程序,每个pod都可以向第三方API发出请求。在应用基础设施中,有一个TCP服务器,它通过令牌桶算法来控制速率限制。在向第三方 API 发出请求之前,pod 会向 TCP 服务器请求新的令牌,并且 pod 会等待 TCP 服务器的响应,直到至少有一个可用令牌。令牌可用后,Pod 向第三方 API 发出请求。
TCP 服务器实现可以在这个存储库 https://github.com/rafaquelhodev/rlimit/ 中找到,在下一节中我将简要讨论 golang 中的令牌桶实现。
下面,我将展示令牌桶实现背后的主要思想。请查看 https://github.com/rafaquelhodev/rlimit/ 存储库以了解详细的实现。
速率限制控制集中在TokenBucket结构体中:
type TokenBucket struct { id string mu sync.Mutex tokens int64 maxTokens int64 refillPeriod int64 cron chan bool subs []chan bool }
您可以注意到 TokenBucket 结构中有一个 subs 属性。基本上,这是特定令牌桶的订阅者数组:每次客户端请求令牌时,客户端都会添加到 subs 数组中,并且当新令牌添加到存储桶时客户端会收到通知。
启动桶时,我们需要提供桶可以支持的最大令牌数(maxTokens)以及令牌添加到桶中的时间量(refillPeriod):
func newTokenBucket(id string, maxTokens int64, refillPeriod int64) *TokenBucket { bucket := &TokenBucket{ id: id, tokens: 0, maxTokens: maxTokens, refillPeriod: refillPeriod, cron: make(chan bool), subs: make([]chan bool, 0), } fmt.Printf("refill period = %d\n", refillPeriod) bucket.startCron() return bucket }
现在,您可能想知道“如何将令牌添加到存储桶中?”。为此,当创建存储桶时,会启动一个 cron 作业,并在每个 refillPeriod 毫秒时,将一个新令牌添加到存储桶中:
func (tb *TokenBucket) startCron() { ticker := time.NewTicker(time.Duration(tb.refillPeriod) * time.Millisecond) go func() { for { select { case <-tb.cron: ticker.Stop() return case <-ticker.C: if tb.tokens < tb.maxTokens { tb.tokens += 1 fmt.Printf("[TOKEN REFIL] | currTokens = %d\n", tb.tokens) if len(tb.subs) > 0 { sub := tb.subs[0] tb.subs = tb.subs[1:] sub <- true } } } } }() }
最后,当客户端想要从存储桶中获取令牌时,必须调用 waitAvailable 函数:
func (tb *TokenBucket) waitAvailable() bool { tb.mu.Lock() if tb.tokens > 0 { fmt.Printf("[CONSUMING TOKEN] - id = %s\n", tb.id) tb.tokens -= 1 tb.mu.Unlock() return true } fmt.Printf("[WAITING TOKEN] - id %s\n", tb.id) ch := tb.tokenSubscribe() tb.mu.Unlock() <-ch fmt.Printf("[NEW TOKEN AVAILABLED] - id %s\n", tb.id) tb.tokens -= 1 return true }
灵感来自 https://github.com/Mohamed-khattab/Token-bucket-rate-limiter
以上是控制传出速率限制的详细内容。更多信息请关注PHP中文网其他相关文章!