Artikel ini akan membincangkan tentang pustaka baldi bocor dan baldi token yang mengehadkan semasa dalam bahasa Go, memperkenalkan prinsip pelaksanaan baldi token dan baldi bocor serta aplikasi mudahnya dalam projek sebenar.
Apabila sejumlah besar data diakses dengan akses serentak yang tinggi, selalunya perkhidmatan atau antara muka menghadapi sejumlah besar permintaan, menyebabkan pangkalan data ranap , atau malah mencetuskan tindak balas berantai yang menyebabkan keseluruhan sistem runtuh . Atau seseorang secara berniat jahat menyerang tapak web, dan sejumlah besar permintaan yang tidak berguna boleh membawa kepada penembusan cache. Menggunakan perisian tengah yang mengehadkan semasa boleh mengehadkan bilangan permintaan dalam tempoh yang singkat dan memainkan peranan dalam menurunkan taraf, sekali gus memastikan keselamatan tapak web.
Gunakan perisian tengah mesej untuk sekatan bersatu (pengurangan kelajuan)
Gunakan skim pengehadan semasa untuk mengembalikan permintaan berlebihan (had semasa)
Naik taraf pelayan
cache (tetapi masih terdapat bahaya seperti penembusan cache)
dsb. Menunggu
dapat dilihat apabila kod tidak boleh diperbaiki, satu-satunya cara untuk meningkatkan tahap perkakasan adalah dengan memperbaikinya. Atau tukar seni bina dan tambah lapisan lain! Anda juga boleh menggunakan perisian tengah mesej untuk pemprosesan bersatu. Dari perspektif gabungan, penyelesaian mengehadkan semasa ialah strategi yang tidak memerlukan perubahan besar mahupun overhed yang tinggi.
Algoritma Baldi Token
Algoritma Baldi Bocor
Algoritma tetingkap gelongsor
dll.
go get -u go.uber.org/ratelimit
// New returns a Limiter that will limit to the given RPS. func New(rate int, opts ...Option) Limiter { return newAtomicBased(rate, opts...) } // newAtomicBased returns a new atomic based limiter. func newAtomicBased(rate int, opts ...Option) *atomicLimiter { // TODO consider moving config building to the implementation // independent code. config := buildConfig(opts) perRequest := config.per / time.Duration(rate) l := &atomicLimiter{ perRequest: perRequest, maxSlack: -1 * time.Duration(config.slack) * perRequest, clock: config.clock, } initialState := state{ last: time.Time{}, sleepFor: 0, } atomic.StorePointer(&l.state, unsafe.Pointer(&initialState)) return l }
Fungsi ini menggunakan mod pilihan fungsi untuk memulakan berbilang struktur objek
untuk memulakan struktur baldi mengikut nilai yang diluluskan rate
sebagai int
Lulus parameter.
Proses permulaan termasuk
perquest = config.per / time.Duration(rate)
maxSlack
Relaksasi (relaksasi ialah nilai negatif)-1 * time.Duration(config.slack) * perRequest
Keketatan digunakan untuk menyeragamkan masa menunggu // Clock is the minimum necessary interface to instantiate a rate limiter with // a clock or mock clock, compatible with clocks created using // github.com/andres-erbsen/clock. type Clock interface { Now() time.Time Sleep(time.Duration) }
Pada masa yang sama, struktur Clock
juga diperlukan untuk merekodkan masa permintaan semasa now
dan masa menunggu. untuk permintaan pada masa ini sleep
type state struct { last time.Time sleepFor time.Duration }
state
Terutamanya digunakan untuk merekodkan masa pelaksanaan terakhir dan masa menunggu untuk permintaan pelaksanaan semasa (sebagai rekod status perantaraan)
func (t *atomicLimiter) Take() time.Time { var ( newState state taken bool interval time.Duration ) for !taken { now := t.clock.Now() previousStatePointer := atomic.LoadPointer(&t.state) oldState := (*state)(previousStatePointer) newState = state{ last: now, sleepFor: oldState.sleepFor, } if oldState.last.IsZero() { taken = atomic.CompareAndSwapPointer(&t.state, previousStatePointer, unsafe.Pointer(&newState)) continue } // 计算是否需要进行等待取水操作 newState.sleepFor += t.perRequest(每两滴水之间的间隔时间) - now.Sub(oldState.last)(当前时间与上次取水时间的间隔) // 如果等待取水时间特别小,就需要松紧度进行维护 if newState.sleepFor < t.maxSlack { newState.sleepFor = t.maxSlack } // 如果等待时间大于0,就进行更新 if newState.sleepFor > 0 { newState.last = newState.last.Add(newState.sleepFor) interval, newState.sleepFor = newState.sleepFor, 0 } taken = atomic.CompareAndSwapPointer(&t.state, previousStatePointer, unsafe.Pointer(&newState)) } t.clock.Sleep(interval) // 最后返回需要等待的时间 return newState.last }
Laksanakan kaedah Ambil
Kaedah Ambil akan melakukan operasi atom (boleh difahami sebagai mengunci dan membuka kunci), dalam sebilangan besar bersamaan Penggunaan biasa masih dijamin atas permintaan.
Rekod masa semasanow := t.clock.Now()
oldState.last.IsZero()
Nilai sama ada kali pertama mendapat air, cuma state
Nilai dalam struktur dikembalikan. Masa pelaksanaan terakhir dimulakan dalam struktur ini Jika ia adalah kali pertama untuk mengambil air, ia akan dihantar terus sebagai masa semasa.
Jika newState.sleepFor
sangat kecil, masalah akan berlaku, jadi anda perlu menggunakan kelonggaran Apabila nilai minimum lebih kecil daripada kelonggaran, gunakan kelonggaran untuk mengekalkan masa pengambilan air .
Jika newState.sleepFor > 0
, kemas kini terus masa pelaksanaan terakhir newState.last = newState.last.Add(newState.sleepFor)
dalam struktur dan rekod masa menunggu interval, newState.sleepFor = newState.sleepFor, 0
.
Jika pengambilan air dan operasi menunggu dibenarkan, ini bermakna tiada persaingan serentak berlaku dan masa tidur disimulasikan t.clock.Sleep(interval)
. Kemudian masa sasaran untuk mendapatkan semula air dikembalikan, dan kod pelayan menentukan sama ada untuk menghantar semula respons atau menunggu masa ini untuk meneruskan respons.
func (c *clock) Sleep(d time.Duration) { time.Sleep(d) }
Malah, apabila permintaan datang, pengehad semasa akan Masa yang sepadan untuk tidur, dan masa pengambilan air terkini akan dikembalikan selepas tidur.
func ratelimit1() func(ctx *gin.Context) { r1 := rate1.New(100) return func(ctx *gin.Context) { now := time.Now() // Take 返回的是一个 time.Duration的时间 if r1.Take().Sub(now) > 0 { // 返回的时间比当前的时间还大,说明需要进行等待 // 如果需要等待, 就 time.Sleep(r1.Take().Sub(now())) 然后放行 // 如果不需要等待请求时间,就直接进行Abort 然后返回 response(ctx, http.StatusRequestTimeout, "rate1 limit...") fmt.Println("rate1 limit...") ctx.Abort() return } // 放行 ctx.Next() } }
Di sini anda boleh memilih sama ada untuk kembali. Kerana Take pasti akan melaksanakan fungsi tidur, apabila pelaksanaan take tamat, ini bermakna permintaan semasa telah diterima. Demonstrasi semasa menggunakan kes pertama.
Jika jawapan permintaan perniagaan anda tidak membenarkan menunggu. Kemudian anda boleh melengkapkan permintaan selepas menerima air, seperti dalam contoh di atas.
Jika perniagaan anda membenarkan respons menunggu, maka permintaan akan menunggu masa penerimaan air yang sepadan sebelum meneruskan ke langkah seterusnya. Kod khusus adalah untuk mengabaikan terus kandungan dalam if
. (disyorkan)
这里定义了一个响应函数和一个handler
函数方便测试
func response(c *gin.Context, code int, info any) { c.JSON(code, info) } func pingHandler(c *gin.Context) { response(c, 200, "ping ok~") }
执行go test -run=Run -v
先开启一个web服务
func TestRun(t *testing.T) { r := gin.Default() r.GET("/ping1", ratelimit1(), pingHandler) r.GET("/ping2", ratelimit2(), helloHandler) _ = r.Run(":4399") }
使用接口压力测试工具go-wrk
进行测试->tsliwowicz/go-wrk: go-wrk)
在golang引入install版本可以直接通过go install github.com/tsliwowicz/go-wrk@latest
下载
Usage: go-wrk <options> <url> Options: -H Header to add to each request (you can define multiple -H flags) (Default ) -M HTTP method (Default GET) -T Socket/request timeout in ms (Default 1000) -body request body string or @filename (Default ) -c Number of goroutines to use (concurrent connections) (Default 10) -ca CA file to verify peer against (SSL/TLS) (Default ) -cert CA certificate file to verify peer against (SSL/TLS) (Default ) -d Duration of test in seconds (Default 10) -f Playback file name (Default <empty>) -help Print help (Default false) -host Host Header (Default ) -http Use HTTP/2 (Default true) -key Private key file name (SSL/TLS (Default ) -no-c Disable Compression - Prevents sending the "Accept-Encoding: gzip" header (Default false) -no-ka Disable KeepAlive - prevents re-use of TCP connections between different HTTP requests (Default false) -no-vr Skip verifying SSL certificate of the server (Default false) -redir Allow Redirects (Default false) -v Print version details (Default false)
-t 8个线程 -c 400个连接 -n 模拟100次请求 -d 替换-n 表示连接时间
输入
go-wrk -t=8 -c=400 -n=100 http://127.0.0.1:4399/ping1
可以稍微等待一下水流积攒(压测速度过快)。
可以看出,89
个请求全部返回。也就是说在一段请求高峰期,不会有请求进行响应。因此我认为既然内部已经睡眠,那么就也就应该对请求放行处理。
引入ratelimit
库
go get -u github.com/juju/ratelimit
// NewBucket returns a new token bucket that fills at the // rate of one token every fillInterval, up to the given // maximum capacity. Both arguments must be // positive. The bucket is initially full. func NewBucket(fillInterval time.Duration, capacity int64) *Bucket { return NewBucketWithClock(fillInterval, capacity, nil) } // NewBucketWithClock is identical to NewBucket but injects a testable clock // interface. func NewBucketWithClock(fillInterval time.Duration, capacity int64, clock Clock) *Bucket { return NewBucketWithQuantumAndClock(fillInterval, capacity, 1, clock) }
进行Bucket
桶的初始化。
func NewBucketWithQuantumAndClock(fillInterval time.Duration, capacity, quantum int64, clock Clock) *Bucket { if clock == nil { clock = realClock{} } // 填充速率 if fillInterval <= 0 { panic("token bucket fill interval is not > 0") } // 最大令牌容量 if capacity <= 0 { panic("token bucket capacity is not > 0") } // 单次令牌生成量 if quantum <= 0 { panic("token bucket quantum is not > 0") } return &Bucket{ clock: clock, startTime: clock.Now(), latestTick: 0, fillInterval: fillInterval, capacity: capacity, quantum: quantum, availableTokens: capacity, } }
令牌桶初始化过程,初始化结构体 fillInterval
(填充速率) cap
(最大令牌量) quannum
(每次令牌生成量)。
如果三个变量有一个小于或者等于0的话直接进行报错返回。在最开始就将当前令牌数初始化为最大容量。
// TakeAvailable takes up to count immediately available tokens from the // bucket. It returns the number of tokens removed, or zero if there are // no available tokens. It does not block. func (tb *Bucket) TakeAvailable(count int64) int64 { tb.mu.Lock() defer tb.mu.Unlock() return tb.takeAvailable(tb.clock.Now(), count) }
调用TakeAvailable
函数,传入参数为需要取出的令牌数量,返回参数是实际能够取出的令牌数量。
func (tb *Bucket) takeAvailable(now time.Time, count int64) int64 { // 如果需要取出的令牌数小于等于零,那么就返回0个令牌 if count <= 0 { return 0 } // 根据时间对当前桶中令牌数进行计算 tb.adjustavailableTokens(tb.currentTick(now)) // 计算之后的令牌总数小于等于0,说明当前令牌不足取出,那么就直接返回0个令牌 if tb.availableTokens <= 0 { return 0 } // 如果当前存储的令牌数量多于请求数量,那么就返回取出令牌数 if count > tb.availableTokens { count = tb.availableTokens } // 调整令牌数 tb.availableTokens -= count return count }
如果需要取出的令牌数小于等于零,那么就返回0个令牌
根据时间对当前桶中令牌数进行计算
计算之后的令牌总数小于等于0,说明当前令牌不足取出,那么就直接返回0个令牌
如果当前存储的令牌数量多于请求数量,那么就返回取出令牌数
调整令牌数
func (tb *Bucket) adjustavailableTokens(tick int64) { lastTick := tb.latestTick tb.latestTick = tick // 如果当前令牌数大于最大等于容量,直接返回最大容量 if tb.availableTokens >= tb.capacity { return } // 当前令牌数 += (当前时间 - 上次取出令牌数的时间) * quannum(每次生成令牌量) tb.availableTokens += (tick - lastTick) * tb.quantum // 如果当前令牌数大于最大等于容量, 将当前令牌数 = 最大容量 然后返回 当前令牌数 if tb.availableTokens > tb.capacity { tb.availableTokens = tb.capacity } return }
如果当前令牌数大于最大等于容量,直接返回最大容量
当前令牌数 += (当前时间 - 上次取出令牌数的时间) * quannum(每次生成令牌量)
如果当前令牌数大于最大等于容量, 将当前令牌数 = 最大容量 然后返回 当前令牌数
加锁 defer
解锁
判断count(想要取出的令牌数) 是否小于等于 0,如果是直接返回 0
调用函数adjustTokens
获取可用的令牌数量
如果当前可以取出的令牌数小于等于0 直接返回 0
如果当前可以取出的令牌数小于当前想要取出的令牌数(count) count = 当前可以取出的令牌数
当前的令牌数 -= 取出的令牌数 (count)
返回 count(可以取出的令牌数)
take
函数,能够返回等待时间和布尔值,允许欠账,没有令牌也可以取出。
func (tb *Bucket) Take(count int64) time.Duration
takeMaxDuration
函数,可以根据最大等待时间来进行判断。
func (tb *Bucket) TakeMaxDuration(count int64, maxWait time.Duration) (time.Duration, bool)
因为他们内部的实现都基于令牌调整,我这里不做过多介绍,如果感兴趣可以自行研究一下。
func ratelimit2() func(ctx *gin.Context) { // 生成速率 最大容量 r2 := rate2.NewBucket(time.Second, 200) return func(ctx *gin.Context) { //r2.Take() // 允许欠账,令牌不够也可以接收请求 if r2.TakeAvailable(1) == 1 { // 如果想要取出1个令牌并且能够取出,就放行 ctx.Next() return } response(ctx, http.StatusRequestTimeout, "rate2 limit...") ctx.Abort() return } }
压测速度过于快速,在实际过程中可以根据调整令牌生成速率来进行具体限流!
令牌桶可以允许自己判断请求是否继续,内部不会进行睡眠操作。而漏桶需要进行睡眠,并没有提供方法让程序员进行判断是否放行。
[Cadangan berkaitan: Pergi tutorial video, Pengajaran pengaturcaraan]
Atas ialah kandungan terperinci Artikel untuk membincangkan tentang pustaka baldi bocor dan baldi token yang mengehadkan semasa dalam bahasa Go. Untuk maklumat lanjut, sila ikut artikel berkaitan lain di laman web China PHP!