本篇文章帶大家聊聊go語言中的限流漏桶和令牌桶庫,介紹令牌桶和漏桶的實作原理以及在實際專案中簡單應用。
在大數據量高並發存取時,經常會出現服務或介面面對大量的請求而導致資料庫崩潰的情況,甚至引發連鎖反映導致整個系統崩潰。或者有人惡意攻擊網站,大量的無用請求出現會導致快取穿透的情況出現。使用限流中間件可以在短時間內對請求進行限制數量,起到降級的作用,從而保障了網站的安全性。
使用訊息中間件進行統一限制(降速)
使用限流方案將多餘請求傳回(限流)
升級伺服器
快取(但仍有快取穿透等危險)
等等等
可以看出在程式碼已經無法提升的情況下,只能去提升硬體等級。或是改動架構再加一層!也可以使用訊息中間件統一處理。而結合看來,限流方案是一種既不需要大幅改變也不需要高額開銷的策略。
令牌桶演算法
漏桶演算法
滑動視窗演算法
等等
go get -u go.uber.org/ratelimit
// New returns a Limiter that will limit to the given RPS.
func New(rate int, opts ...Option) Limiter {
return newAtomicBased(rate, opts...)
}
// newAtomicBased returns a new atomic based limiter.
func newAtomicBased(rate int, opts ...Option) *atomicLimiter {
// TODO consider moving config building to the implementation
// independent code.
config := buildConfig(opts)
perRequest := config.per / time.Duration(rate)
l := &atomicLimiter{
perRequest: perRequest,
maxSlack: -1 * time.Duration(config.slack) * perRequest,
clock: config.clock,
}
initialState := state{
last: time.Time{},
sleepFor: 0,
}
atomic.StorePointer(&l.state, unsafe.Pointer(&initialState))
return l
}
物件進行初始化#根據傳入的值來初始化一個桶結構體
rate
為
傳參。
每一滴水所需的時間
maxSlack
寬鬆度(寬鬆度為負值) <div class="code" style="position:relative; padding:0px; margin:0px;"><pre class="brush:js;toolbar:false;"> // Clock is the minimum necessary interface to instantiate a rate limiter with
// a clock or mock clock, compatible with clocks created using
// github.com/andres-erbsen/clock.
type Clock interface {
Now() time.Time
Sleep(time.Duration)
}</pre><div class="contentsignin">登入後複製</div></div>
同時也需要結構體Clock
來記錄目前請求的時間now
和此刻的請求所需要花費等待的時間
<div class="code" style="position:relative; padding:0px; margin:0px;"><pre class="brush:js;toolbar:false;"> type state struct {
last time.Time
sleepFor time.Duration
}</pre><div class="contentsignin">登入後複製</div></div>
func (t *atomicLimiter) Take() time.Time { var ( newState state taken bool interval time.Duration ) for !taken { now := t.clock.Now() previousStatePointer := atomic.LoadPointer(&t.state) oldState := (*state)(previousStatePointer) newState = state{ last: now, sleepFor: oldState.sleepFor, } if oldState.last.IsZero() { taken = atomic.CompareAndSwapPointer(&t.state, previousStatePointer, unsafe.Pointer(&newState)) continue } // 计算是否需要进行等待取水操作 newState.sleepFor += t.perRequest(每两滴水之间的间隔时间) - now.Sub(oldState.last)(当前时间与上次取水时间的间隔) // 如果等待取水时间特别小,就需要松紧度进行维护 if newState.sleepFor < t.maxSlack { newState.sleepFor = t.maxSlack } // 如果等待时间大于0,就进行更新 if newState.sleepFor > 0 { newState.last = newState.last.Add(newState.sleepFor) interval, newState.sleepFor = newState.sleepFor, 0 } taken = atomic.CompareAndSwapPointer(&t.state, previousStatePointer, unsafe.Pointer(&newState)) } t.clock.Sleep(interval) // 最后返回需要等待的时间 return newState.last }
記錄目前的時間
oldState. last.IsZero()
判斷是不是第一次取水,如果是就直接將
如果
如果
newState.sleepFor > 0
就直接更新結構體中上次執行時間newState.last = newState.last.Add(newState. sleepFor)
並記錄需要等待的時間
如果允許取水和等待操作,那就說明沒有發生並發競爭的情況,就模擬睡眠時間
func (c *clock) Sleep(d time.Duration) { time.Sleep(d) }
func ratelimit1() func(ctx *gin.Context) { r1 := rate1.New(100) return func(ctx *gin.Context) { now := time.Now() // Take 返回的是一个 time.Duration的时间 if r1.Take().Sub(now) > 0 { // 返回的时间比当前的时间还大,说明需要进行等待 // 如果需要等待, 就 time.Sleep(r1.Take().Sub(now())) 然后放行 // 如果不需要等待请求时间,就直接进行Abort 然后返回 response(ctx, http.StatusRequestTimeout, "rate1 limit...") fmt.Println("rate1 limit...") ctx.Abort() return } // 放行 ctx.Next() } }
如果你的業務允許回應等待,那麼該請求等待對應的接水時間後進行下一步。具體程式碼就是將
这里定义了一个响应函数和一个handler
函数方便测试
func response(c *gin.Context, code int, info any) { c.JSON(code, info) } func pingHandler(c *gin.Context) { response(c, 200, "ping ok~") }
执行go test -run=Run -v
先开启一个web服务
func TestRun(t *testing.T) { r := gin.Default() r.GET("/ping1", ratelimit1(), pingHandler) r.GET("/ping2", ratelimit2(), helloHandler) _ = r.Run(":4399") }
使用接口压力测试工具go-wrk
进行测试->tsliwowicz/go-wrk: go-wrk)
在golang引入install版本可以直接通过go install github.com/tsliwowicz/go-wrk@latest
下载
Usage: go-wrk <options> <url> Options: -H Header to add to each request (you can define multiple -H flags) (Default ) -M HTTP method (Default GET) -T Socket/request timeout in ms (Default 1000) -body request body string or @filename (Default ) -c Number of goroutines to use (concurrent connections) (Default 10) -ca CA file to verify peer against (SSL/TLS) (Default ) -cert CA certificate file to verify peer against (SSL/TLS) (Default ) -d Duration of test in seconds (Default 10) -f Playback file name (Default <empty>) -help Print help (Default false) -host Host Header (Default ) -http Use HTTP/2 (Default true) -key Private key file name (SSL/TLS (Default ) -no-c Disable Compression - Prevents sending the "Accept-Encoding: gzip" header (Default false) -no-ka Disable KeepAlive - prevents re-use of TCP connections between different HTTP requests (Default false) -no-vr Skip verifying SSL certificate of the server (Default false) -redir Allow Redirects (Default false) -v Print version details (Default false)
-t 8个线程 -c 400个连接 -n 模拟100次请求 -d 替换-n 表示连接时间
输入
go-wrk -t=8 -c=400 -n=100 http://127.0.0.1:4399/ping1
可以稍微等待一下水流积攒(压测速度过快)。
可以看出,89
个请求全部返回。也就是说在一段请求高峰期,不会有请求进行响应。因此我认为既然内部已经睡眠,那么就也就应该对请求放行处理。
引入ratelimit
库
go get -u github.com/juju/ratelimit
// NewBucket returns a new token bucket that fills at the // rate of one token every fillInterval, up to the given // maximum capacity. Both arguments must be // positive. The bucket is initially full. func NewBucket(fillInterval time.Duration, capacity int64) *Bucket { return NewBucketWithClock(fillInterval, capacity, nil) } // NewBucketWithClock is identical to NewBucket but injects a testable clock // interface. func NewBucketWithClock(fillInterval time.Duration, capacity int64, clock Clock) *Bucket { return NewBucketWithQuantumAndClock(fillInterval, capacity, 1, clock) }
进行Bucket
桶的初始化。
func NewBucketWithQuantumAndClock(fillInterval time.Duration, capacity, quantum int64, clock Clock) *Bucket { if clock == nil { clock = realClock{} } // 填充速率 if fillInterval <= 0 { panic("token bucket fill interval is not > 0") } // 最大令牌容量 if capacity <= 0 { panic("token bucket capacity is not > 0") } // 单次令牌生成量 if quantum <= 0 { panic("token bucket quantum is not > 0") } return &Bucket{ clock: clock, startTime: clock.Now(), latestTick: 0, fillInterval: fillInterval, capacity: capacity, quantum: quantum, availableTokens: capacity, } }
令牌桶初始化过程,初始化结构体 fillInterval
(填充速率) cap
(最大令牌量) quannum
(每次令牌生成量)。
如果三个变量有一个小于或者等于0的话直接进行报错返回。在最开始就将当前令牌数初始化为最大容量。
// TakeAvailable takes up to count immediately available tokens from the // bucket. It returns the number of tokens removed, or zero if there are // no available tokens. It does not block. func (tb *Bucket) TakeAvailable(count int64) int64 { tb.mu.Lock() defer tb.mu.Unlock() return tb.takeAvailable(tb.clock.Now(), count) }
调用TakeAvailable
函数,传入参数为需要取出的令牌数量,返回参数是实际能够取出的令牌数量。
func (tb *Bucket) takeAvailable(now time.Time, count int64) int64 { // 如果需要取出的令牌数小于等于零,那么就返回0个令牌 if count <= 0 { return 0 } // 根据时间对当前桶中令牌数进行计算 tb.adjustavailableTokens(tb.currentTick(now)) // 计算之后的令牌总数小于等于0,说明当前令牌不足取出,那么就直接返回0个令牌 if tb.availableTokens <= 0 { return 0 } // 如果当前存储的令牌数量多于请求数量,那么就返回取出令牌数 if count > tb.availableTokens { count = tb.availableTokens } // 调整令牌数 tb.availableTokens -= count return count }
如果需要取出的令牌数小于等于零,那么就返回0个令牌
根据时间对当前桶中令牌数进行计算
计算之后的令牌总数小于等于0,说明当前令牌不足取出,那么就直接返回0个令牌
如果当前存储的令牌数量多于请求数量,那么就返回取出令牌数
调整令牌数
func (tb *Bucket) adjustavailableTokens(tick int64) { lastTick := tb.latestTick tb.latestTick = tick // 如果当前令牌数大于最大等于容量,直接返回最大容量 if tb.availableTokens >= tb.capacity { return } // 当前令牌数 += (当前时间 - 上次取出令牌数的时间) * quannum(每次生成令牌量) tb.availableTokens += (tick - lastTick) * tb.quantum // 如果当前令牌数大于最大等于容量, 将当前令牌数 = 最大容量 然后返回 当前令牌数 if tb.availableTokens > tb.capacity { tb.availableTokens = tb.capacity } return }
如果当前令牌数大于最大等于容量,直接返回最大容量
当前令牌数 += (当前时间 - 上次取出令牌数的时间) * quannum(每次生成令牌量)
如果当前令牌数大于最大等于容量, 将当前令牌数 = 最大容量 然后返回 当前令牌数
加锁 defer
解锁
判断count(想要取出的令牌数) 是否小于等于 0,如果是直接返回 0
调用函数adjustTokens
获取可用的令牌数量
如果当前可以取出的令牌数小于等于0 直接返回 0
如果当前可以取出的令牌数小于当前想要取出的令牌数(count) count = 当前可以取出的令牌数
当前的令牌数 -= 取出的令牌数 (count)
返回 count(可以取出的令牌数)
take
函数,能够返回等待时间和布尔值,允许欠账,没有令牌也可以取出。
func (tb *Bucket) Take(count int64) time.Duration
takeMaxDuration
函数,可以根据最大等待时间来进行判断。
func (tb *Bucket) TakeMaxDuration(count int64, maxWait time.Duration) (time.Duration, bool)
因为他们内部的实现都基于令牌调整,我这里不做过多介绍,如果感兴趣可以自行研究一下。
func ratelimit2() func(ctx *gin.Context) { // 生成速率 最大容量 r2 := rate2.NewBucket(time.Second, 200) return func(ctx *gin.Context) { //r2.Take() // 允许欠账,令牌不够也可以接收请求 if r2.TakeAvailable(1) == 1 { // 如果想要取出1个令牌并且能够取出,就放行 ctx.Next() return } response(ctx, http.StatusRequestTimeout, "rate2 limit...") ctx.Abort() return } }
压测速度过于快速,在实际过程中可以根据调整令牌生成速率来进行具体限流!
令牌桶可以允许自己判断请求是否继续,内部不会进行睡眠操作。而漏桶需要进行睡眠,并没有提供方法让程序员进行判断是否放行。
以上是一文聊聊go語言中的限流漏桶和令牌桶庫的詳細內容。更多資訊請關注PHP中文網其他相關文章!