Home > Backend Development > Golang > An article to talk about the current-limiting leaky bucket and token bucket libraries in the Go language

An article to talk about the current-limiting leaky bucket and token bucket libraries in the Go language

青灯夜游
Release: 2023-02-03 19:00:22
forward
3378 people have browsed it

This article will talk about the current-limiting leaky bucket and token bucket libraries in the Go language, introduce the implementation principles of token buckets and leaky buckets, and their simple application in actual projects.

An article to talk about the current-limiting leaky bucket and token bucket libraries in the Go language

#Why do we need current limiting middleware?

When a large amount of data is accessed concurrently, it often happens that a service or interface faces a large number of requests, causing the database to crash , or even triggering a chain reaction that causes the entire system to collapse. . Or someone maliciously attacks the website, and a large number of useless requests may lead to cache penetration. Using current-limiting middleware can limit the number of requests in a short period of time and play a role in downgrading, thereby ensuring the security of the website.

Strategy for dealing with large number of concurrent requests?

  • Use message middleware for unified restriction (speed reduction)

  • Use the current limiting scheme to return redundant requests (current limiting)

  • Upgrade the server

  • Cache (but there are still dangers such as cache penetration)

  • etc. Wait

It can be seen that when the code can no longer be improved, the only way to improve the hardware level is to improve it. Or change the architecture and add another layer! You can also use message middleware for unified processing. From a combined perspective, the current limiting solution is a strategy that requires neither major changes nor high overhead.

Common current limiting schemes

  • Token Bucket Algorithm

  • Leaky Bucket Algorithm

  • Sliding window algorithm

  • Wait

Leaky bucket

Introducing the ratelimit library

##go get -u go.uber.org/ratelimit

Library function source code
 // New returns a Limiter that will limit to the given RPS.
 func New(rate int, opts ...Option) Limiter {
     return newAtomicBased(rate, opts...)
 }
 
 // newAtomicBased returns a new atomic based limiter.
 func newAtomicBased(rate int, opts ...Option) *atomicLimiter {
     // TODO consider moving config building to the implementation
     // independent code.
     config := buildConfig(opts)
     perRequest := config.per / time.Duration(rate)
     l := &atomicLimiter{
         perRequest: perRequest,
         maxSlack:   -1 * time.Duration(config.slack) * perRequest,
         clock:      config.clock,
     }
 
     initialState := state{
         last:     time.Time{},
         sleepFor: 0,
     }
     atomic.StorePointer(&l.state, unsafe.Pointer(&initialState))
     return l
 }
Copy after login

This function uses the

function option mode to initialize multiple structures objects

according to the incoming The value to initialize a bucket structure

rate is passed as int parameter.

The initialization process includes

    The time required for each drop of water
  • perquest = config.per / time.Duration(rate)
  • maxSlack Slack (laxity is a negative value) -1 * time.Duration(config.slack) * perRequest The slack is used to standardize the waiting time
  •  // Clock is the minimum necessary interface to instantiate a rate limiter with
     // a clock or mock clock, compatible with clocks created using
     // github.com/andres-erbsen/clock.
     type Clock interface {
        Now() time.Time
        Sleep(time.Duration)
     }
    Copy after login
At the same time, a structure

Clock is also needed to record the time of the current requestnowand the time it takes to wait for the request at this momentsleep

 type state struct {
    last     time.Time
    sleepFor time.Duration
 }
Copy after login

state Mainly used to record the last execution time and the waiting time for the current execution request (as an intermediate state record)

The most important Take logic
 func (t *atomicLimiter) Take() time.Time {
    var (
       newState state
       taken    bool
       interval time.Duration
    )
    for !taken {
       now := t.clock.Now()
 
       previousStatePointer := atomic.LoadPointer(&t.state)
       oldState := (*state)(previousStatePointer)
 
       newState = state{
          last:     now,
          sleepFor: oldState.sleepFor,
       }

       if oldState.last.IsZero() {
          taken = atomic.CompareAndSwapPointer(&t.state, previousStatePointer, unsafe.Pointer(&newState))
          continue
       }
       // 计算是否需要进行等待取水操作
       newState.sleepFor += t.perRequest(每两滴水之间的间隔时间) - now.Sub(oldState.last)(当前时间与上次取水时间的间隔)
        
        // 如果等待取水时间特别小,就需要松紧度进行维护
       if newState.sleepFor < t.maxSlack {
          newState.sleepFor = t.maxSlack
       }
        // 如果等待时间大于0,就进行更新
       if newState.sleepFor > 0 {
          newState.last = newState.last.Add(newState.sleepFor)
          interval, newState.sleepFor = newState.sleepFor, 0
       }
       taken = atomic.CompareAndSwapPointer(&t.state, previousStatePointer, unsafe.Pointer(&newState))
    }
    t.clock.Sleep(interval)
    // 最后返回需要等待的时间
     return newState.last
 }
Copy after login

Implementing a Take method

  • The Take method will perform atomic operations (can be understood as locking and unlocking), which can still be guaranteed under a large number of concurrent requests. Normal use.

  • Record the current time

    now := t.clock.Now()

  • oldState. last.IsZero() Determine whether it is the first time to fetch water. If so, directly return the value in the state structure. The last execution time is initialized in this structure. If it is the first time to fetch water, it will be passed directly as the current time.

  • If

    newState.sleepFor is very small, problems will occur, so you need to use the lenient degree. Once the minimum value is smaller than the lenient degree, use the lenient degree to adjust Carry out maintenance during water intake time.

  • If

    newState.sleepFor > 0 directly update the last execution time in the structure newState.last = newState.last.Add(newState. sleepFor)and record the time to waitinterval, newState.sleepFor = newState.sleepFor, 0.

  • If water fetching and waiting operations are allowed, it means that no concurrency competition occurs, and the sleep time is simulated

    t.clock.Sleep(interval). Then the target time for water retrieval is returned, and the server code determines whether to send back a response or wait for this time to continue responding.

t.clock.Sleep(interval)
 func (c *clock) Sleep(d time.Duration) { time.Sleep(d) }
Copy after login

In fact, when a request comes, the current limiter will sleep. time, and return the latest water retrieval time after sleeping.

Practical application (using Gin framework)
 func ratelimit1() func(ctx *gin.Context) {
     r1 := rate1.New(100)
     return func(ctx *gin.Context) {
         now := time.Now()
         //  Take 返回的是一个 time.Duration的时间
         if r1.Take().Sub(now) > 0 {
             // 返回的时间比当前的时间还大,说明需要进行等待
             // 如果需要等待, 就 time.Sleep(r1.Take().Sub(now())) 然后放行
             // 如果不需要等待请求时间,就直接进行Abort 然后返回
             response(ctx, http.StatusRequestTimeout, "rate1 limit...")
             fmt.Println("rate1 limit...")
             ctx.Abort()
             return
         }
         // 放行
         ctx.Next()
     }
 }
Copy after login

Here you can choose whether to return. Because Take will definitely execute the sleep function, when the execution of take ends, it means that the current request has been received. The current demonstration uses the first case.

  • If your business requires a response, waiting is not allowed. Then you can complete the request after receiving the water, as in the above example.

  • If your business allows response waiting, then the request will wait for the corresponding water receiving time before proceeding to the next step. The specific code is to directly ignore the content in

    if. (recommended)

测试代码

这里定义了一个响应函数和一个handler函数方便测试

 func response(c *gin.Context, code int, info any) {
    c.JSON(code, info)
 }
 
 func pingHandler(c *gin.Context) {
    response(c, 200, "ping ok~")
 }
Copy after login

执行go test -run=Run -v先开启一个web服务

 func TestRun(t *testing.T) {
    r := gin.Default()
 
    r.GET("/ping1", ratelimit1(), pingHandler)
    r.GET("/ping2", ratelimit2(), helloHandler)
 
    _ = r.Run(":4399")
 }
Copy after login

使用接口压力测试工具go-wrk进行测试->tsliwowicz/go-wrk: go-wrk)

在golang引入install版本可以直接通过go install github.com/tsliwowicz/go-wrk@latest下载

使用帮助

    Usage: go-wrk <options> <url>
    Options:
     -H       Header to add to each request (you can define multiple -H flags) (Default )
     -M       HTTP method (Default GET)
     -T       Socket/request timeout in ms (Default 1000)
     -body    request body string or @filename (Default )
     -c       Number of goroutines to use (concurrent connections) (Default 10)
     -ca      CA file to verify peer against (SSL/TLS) (Default )
     -cert    CA certificate file to verify peer against (SSL/TLS) (Default )
     -d       Duration of test in seconds (Default 10)
     -f       Playback file name (Default <empty>)
     -help    Print help (Default false)
     -host    Host Header (Default )
     -http    Use HTTP/2 (Default true)
     -key     Private key file name (SSL/TLS (Default )
     -no-c    Disable Compression - Prevents sending the "Accept-Encoding: gzip" header (Default false)
     -no-ka   Disable KeepAlive - prevents re-use of TCP connections between different HTTP requests (Default false)
     -no-vr   Skip verifying SSL certificate of the server (Default false)
     -redir   Allow Redirects (Default false)
     -v       Print version details (Default false)
Copy after login

-t 8个线程 -c 400个连接 -n 模拟100次请求 -d 替换-n 表示连接时间

输入go-wrk -t=8 -c=400 -n=100 http://127.0.0.1:4399/ping1

可以稍微等待一下水流积攒(压测速度过快)。

An article to talk about the current-limiting leaky bucket and token bucket libraries in the Go language可以看出,89个请求全部返回。也就是说在一段请求高峰期,不会有请求进行响应。因此我认为既然内部已经睡眠,那么就也就应该对请求放行处理。

令牌桶

引入ratelimit

go get -u github.com/juju/ratelimit

初始化

 // NewBucket returns a new token bucket that fills at the
 // rate of one token every fillInterval, up to the given
 // maximum capacity. Both arguments must be
 // positive. The bucket is initially full.
 func NewBucket(fillInterval time.Duration, capacity int64) *Bucket {
    return NewBucketWithClock(fillInterval, capacity, nil)
 }
 
 // NewBucketWithClock is identical to NewBucket but injects a testable clock
 // interface.
 func NewBucketWithClock(fillInterval time.Duration, capacity int64, clock Clock) *Bucket {
    return NewBucketWithQuantumAndClock(fillInterval, capacity, 1, clock)
 }
Copy after login

进行Bucket桶的初始化。

 func NewBucketWithQuantumAndClock(fillInterval time.Duration, capacity, quantum int64, clock Clock) *Bucket {
    if clock == nil {
       clock = realClock{}
    }
     // 填充速率
    if fillInterval <= 0 {
       panic("token bucket fill interval is not > 0")
    }
     // 最大令牌容量
    if capacity <= 0 {
       panic("token bucket capacity is not > 0")
    }
     // 单次令牌生成量
    if quantum <= 0 {
       panic("token bucket quantum is not > 0")
    }
    return &Bucket{
       clock:           clock,
       startTime:       clock.Now(),
       latestTick:      0,
       fillInterval:    fillInterval,
       capacity:        capacity,
       quantum:         quantum,
       availableTokens: capacity,
    }
 }
Copy after login

令牌桶初始化过程,初始化结构体 fillInterval(填充速率) cap(最大令牌量) quannum(每次令牌生成量)。

如果三个变量有一个小于或者等于0的话直接进行报错返回。在最开始就将当前令牌数初始化为最大容量

调用

 // TakeAvailable takes up to count immediately available tokens from the
 // bucket. It returns the number of tokens removed, or zero if there are
 // no available tokens. It does not block.
 func (tb *Bucket) TakeAvailable(count int64) int64 {
    tb.mu.Lock()
    defer tb.mu.Unlock()
    return tb.takeAvailable(tb.clock.Now(), count)
 }
Copy after login

调用TakeAvailable函数,传入参数为需要取出的令牌数量,返回参数是实际能够取出的令牌数量。

内部实现

 func (tb *Bucket) takeAvailable(now time.Time, count int64) int64 {
    // 如果需要取出的令牌数小于等于零,那么就返回0个令牌
     if count <= 0 {
       return 0
    }
     // 根据时间对当前桶中令牌数进行计算
    tb.adjustavailableTokens(tb.currentTick(now))
     // 计算之后的令牌总数小于等于0,说明当前令牌不足取出,那么就直接返回0个令牌
    if tb.availableTokens <= 0 {
       return 0
    }
     // 如果当前存储的令牌数量多于请求数量,那么就返回取出令牌数
    if count > tb.availableTokens {
       count = tb.availableTokens
    }
     // 调整令牌数
    tb.availableTokens -= count
    return count
 }
Copy after login
  • 如果需要取出的令牌数小于等于零,那么就返回0个令牌

  • 根据时间对当前桶中令牌数进行计算

  • 计算之后的令牌总数小于等于0,说明当前令牌不足取出,那么就直接返回0个令牌

  • 如果当前存储的令牌数量多于请求数量,那么就返回取出令牌数

  • 调整令牌数

调整令牌

 func (tb *Bucket) adjustavailableTokens(tick int64) {
    lastTick := tb.latestTick
    tb.latestTick = tick
     // 如果当前令牌数大于最大等于容量,直接返回最大容量
    if tb.availableTokens >= tb.capacity {
       return
    }
     // 当前令牌数 += (当前时间 - 上次取出令牌数的时间) * quannum(每次生成令牌量)
    tb.availableTokens += (tick - lastTick) * tb.quantum
     // 如果当前令牌数大于最大等于容量, 将当前令牌数 = 最大容量 然后返回 当前令牌数
    if tb.availableTokens > tb.capacity {
       tb.availableTokens = tb.capacity
    }
    return
 }
Copy after login
  • 如果当前令牌数大于最大等于容量,直接返回最大容量

  • 当前令牌数 += (当前时间 - 上次取出令牌数的时间) * quannum(每次生成令牌量)

  • 如果当前令牌数大于最大等于容量, 将当前令牌数 = 最大容量 然后返回 当前令牌数

实现原理

  • 加锁 defer 解锁

  • 判断count(想要取出的令牌数) 是否小于等于 0,如果是直接返回 0

  • 调用函数adjustTokens 获取可用的令牌数量

  • 如果当前可以取出的令牌数小于等于0 直接返回 0

  • 如果当前可以取出的令牌数小于当前想要取出的令牌数(count) count = 当前可以取出的令牌数

  • 当前的令牌数 -= 取出的令牌数 (count)

  • 返回 count(可以取出的令牌数)

额外介绍

take函数,能够返回等待时间和布尔值,允许欠账,没有令牌也可以取出。

func (tb *Bucket) Take(count int64) time.Duration

takeMaxDuration函数,可以根据最大等待时间来进行判断。

func (tb *Bucket) TakeMaxDuration(count int64, maxWait time.Duration) (time.Duration, bool)

因为他们内部的实现都基于令牌调整,我这里不做过多介绍,如果感兴趣可以自行研究一下。

测试

 func ratelimit2() func(ctx *gin.Context) {
     // 生成速率 最大容量
     r2 := rate2.NewBucket(time.Second, 200)
     return func(ctx *gin.Context) {
         //r2.Take() // 允许欠账,令牌不够也可以接收请求
         if r2.TakeAvailable(1) == 1 {
             // 如果想要取出1个令牌并且能够取出,就放行
             ctx.Next()
             return
         }
         response(ctx, http.StatusRequestTimeout, "rate2 limit...")
         ctx.Abort()
         return
     }
 }
Copy after login

An article to talk about the current-limiting leaky bucket and token bucket libraries in the Go language压测速度过于快速,在实际过程中可以根据调整令牌生成速率来进行具体限流!

小结

令牌桶可以允许自己判断请求是否继续,内部不会进行睡眠操作。而漏桶需要进行睡眠,并没有提供方法让程序员进行判断是否放行。

【Related recommendations: Go video tutorial, Programming teaching

The above is the detailed content of An article to talk about the current-limiting leaky bucket and token bucket libraries in the Go language. For more information, please follow other related articles on the PHP Chinese website!

Related labels:
source:juejin.cn
Statement of this Website
The content of this article is voluntarily contributed by netizens, and the copyright belongs to the original author. This site does not assume corresponding legal responsibility. If you find any content suspected of plagiarism or infringement, please contact admin@php.cn
Popular Tutorials
More>
Latest Downloads
More>
Web Effects
Website Source Code
Website Materials
Front End Template