この記事では、Go 言語の電流制限リーキー バケット ライブラリとトークン バケット ライブラリについて説明し、トークン バケットとリーキー バケットの実装原則と実際のプロジェクトでの簡単なアプリケーションを紹介します。
大量のデータが同時にアクセスされると、サービスまたはインターフェイス が大量のリクエストに直面し、データベースがクラッシュ したり、場合によってはエラーが発生したりすることがよくあります。システム全体を崩壊させる連鎖反応。あるいは、誰かが Web サイトを悪意を持って攻撃し、大量の無駄なリクエストがキャッシュの侵入につながる可能性があります。電流制限ミドルウェアを使用すると、短期間のリクエスト数を制限し、ダウングレードに役割を果たし、Web サイトのセキュリティを確保できます。
メッセージミドルウェアを使用して統合制限 (速度低下) を行う
電流制限スキームを使用して冗長なリクエストを返す (電流制限)
ライブラリ関数のソースcode
// New returns a Limiter that will limit to the given RPS. func New(rate int, opts ...Option) Limiter { return newAtomicBased(rate, opts...) } // newAtomicBased returns a new atomic based limiter. func newAtomicBased(rate int, opts ...Option) *atomicLimiter { // TODO consider moving config building to the implementation // independent code. config := buildConfig(opts) perRequest := config.per / time.Duration(rate) l := &atomicLimiter{ perRequest: perRequest, maxSlack: -1 * time.Duration(config.slack) * perRequest, clock: config.clock, } initialState := state{ last: time.Time{}, sleepFor: 0, } atomic.StorePointer(&l.state, unsafe.Pointer(&initialState)) return l }
複数の構造 オブジェクトを初期化します。バケット構造 rate は int
パラメータとして渡されます。初期化プロセスには、
水滴ごとに必要な時間
-1 * time.Duration(config.slack) * perRequest
Slack は待ち時間を標準化するために使用されます // Clock is the minimum necessary interface to instantiate a rate limiter with // a clock or mock clock, compatible with clocks created using // github.com/andres-erbsen/clock. type Clock interface { Now() time.Time Sleep(time.Duration) }
同時に、現在のリクエストの時間
nowと現時点でリクエストを待つのにかかる時間を記録するための構造体
Clock<div class="code" style="position:relative; padding:0px; margin:0px;"><pre class="brush:js;toolbar:false;"> type state struct {
last time.Time
sleepFor time.Duration
}</pre><div class="contentsignin">ログイン後にコピー</div></div>
state
主に、最後の実行時間と現在の実行リクエストの待ち時間を記録するために使用されます (中間状態レコードとして)
<div class="code" style="position:relative; padding:0px; margin:0px;"><pre class="brush:js;toolbar:false;"> func (t *atomicLimiter) Take() time.Time {
var (
newState state
taken bool
interval time.Duration
)
for !taken {
now := t.clock.Now()
previousStatePointer := atomic.LoadPointer(&t.state)
oldState := (*state)(previousStatePointer)
newState = state{
last: now,
sleepFor: oldState.sleepFor,
}
if oldState.last.IsZero() {
taken = atomic.CompareAndSwapPointer(&t.state, previousStatePointer, unsafe.Pointer(&newState))
continue
}
// 计算是否需要进行等待取水操作
newState.sleepFor += t.perRequest(每两滴水之间的间隔时间) - now.Sub(oldState.last)(当前时间与上次取水时间的间隔)
// 如果等待取水时间特别小,就需要松紧度进行维护
if newState.sleepFor < t.maxSlack {
newState.sleepFor = t.maxSlack
}
// 如果等待时间大于0,就进行更新
if newState.sleepFor > 0 {
newState.last = newState.last.Add(newState.sleepFor)
interval, newState.sleepFor = newState.sleepFor, 0
}
taken = atomic.CompareAndSwapPointer(&t.state, previousStatePointer, unsafe.Pointer(&newState))
}
t.clock.Sleep(interval)
// 最后返回需要等待的时间
return newState.last
}</pre><div class="contentsignin">ログイン後にコピー</div></div>
Take メソッドの実装
oldState.last .IsZero()
水汲みが初めてかどうかを判断し、初めての場合は、
newState.sleepFor
が非常に小さい場合、問題が発生するため、緩和度を使用する必要があります。最小値が緩和度より小さくなったら、調整は緩めに行ってください。 取水時間中にメンテナンスを行ってください。
If newState.sleepFor > 0
構造体の最終実行時刻を直接更新します
。 水の取得と待機操作が許可されている場合、同時実行の競合は発生せず、スリープ時間は
t.clock.Sleep(interval)
でシミュレートされることを意味します。次に、水の回収の目標時間が返され、サーバー コードは応答を送り返すか、この時間まで応答を続けるかを決定します。
t.clock.Sleep(interval)
func (c *clock) Sleep(d time.Duration) { time.Sleep(d) }
func ratelimit1() func(ctx *gin.Context) { r1 := rate1.New(100) return func(ctx *gin.Context) { now := time.Now() // Take 返回的是一个 time.Duration的时间 if r1.Take().Sub(now) > 0 { // 返回的时间比当前的时间还大,说明需要进行等待 // 如果需要等待, 就 time.Sleep(r1.Take().Sub(now())) 然后放行 // 如果不需要等待请求时间,就直接进行Abort 然后返回 response(ctx, http.StatusRequestTimeout, "rate1 limit...") fmt.Println("rate1 limit...") ctx.Abort() return } // 放行 ctx.Next() } }
ここで、戻るかどうかを選択できます。 Take は必ず sleep 関数を実行するため、take の実行が終了すると、現在のリクエストが受信されたことになります。現在のデモでは最初のケースを使用します。
の内容を直接無視することです。 (推奨)###
这里定义了一个响应函数和一个handler
函数方便测试
func response(c *gin.Context, code int, info any) { c.JSON(code, info) } func pingHandler(c *gin.Context) { response(c, 200, "ping ok~") }
执行go test -run=Run -v
先开启一个web服务
func TestRun(t *testing.T) { r := gin.Default() r.GET("/ping1", ratelimit1(), pingHandler) r.GET("/ping2", ratelimit2(), helloHandler) _ = r.Run(":4399") }
使用接口压力测试工具go-wrk
进行测试->tsliwowicz/go-wrk: go-wrk)
在golang引入install版本可以直接通过go install github.com/tsliwowicz/go-wrk@latest
下载
Usage: go-wrk <options> <url> Options: -H Header to add to each request (you can define multiple -H flags) (Default ) -M HTTP method (Default GET) -T Socket/request timeout in ms (Default 1000) -body request body string or @filename (Default ) -c Number of goroutines to use (concurrent connections) (Default 10) -ca CA file to verify peer against (SSL/TLS) (Default ) -cert CA certificate file to verify peer against (SSL/TLS) (Default ) -d Duration of test in seconds (Default 10) -f Playback file name (Default <empty>) -help Print help (Default false) -host Host Header (Default ) -http Use HTTP/2 (Default true) -key Private key file name (SSL/TLS (Default ) -no-c Disable Compression - Prevents sending the "Accept-Encoding: gzip" header (Default false) -no-ka Disable KeepAlive - prevents re-use of TCP connections between different HTTP requests (Default false) -no-vr Skip verifying SSL certificate of the server (Default false) -redir Allow Redirects (Default false) -v Print version details (Default false)
-t 8个线程 -c 400个连接 -n 模拟100次请求 -d 替换-n 表示连接时间
输入
go-wrk -t=8 -c=400 -n=100 http://127.0.0.1:4399/ping1
可以稍微等待一下水流积攒(压测速度过快)。
可以看出,89
个请求全部返回。也就是说在一段请求高峰期,不会有请求进行响应。因此我认为既然内部已经睡眠,那么就也就应该对请求放行处理。
引入ratelimit
库
go get -u github.com/juju/ratelimit
// NewBucket returns a new token bucket that fills at the // rate of one token every fillInterval, up to the given // maximum capacity. Both arguments must be // positive. The bucket is initially full. func NewBucket(fillInterval time.Duration, capacity int64) *Bucket { return NewBucketWithClock(fillInterval, capacity, nil) } // NewBucketWithClock is identical to NewBucket but injects a testable clock // interface. func NewBucketWithClock(fillInterval time.Duration, capacity int64, clock Clock) *Bucket { return NewBucketWithQuantumAndClock(fillInterval, capacity, 1, clock) }
进行Bucket
桶的初始化。
func NewBucketWithQuantumAndClock(fillInterval time.Duration, capacity, quantum int64, clock Clock) *Bucket { if clock == nil { clock = realClock{} } // 填充速率 if fillInterval <= 0 { panic("token bucket fill interval is not > 0") } // 最大令牌容量 if capacity <= 0 { panic("token bucket capacity is not > 0") } // 单次令牌生成量 if quantum <= 0 { panic("token bucket quantum is not > 0") } return &Bucket{ clock: clock, startTime: clock.Now(), latestTick: 0, fillInterval: fillInterval, capacity: capacity, quantum: quantum, availableTokens: capacity, } }
令牌桶初始化过程,初始化结构体 fillInterval
(填充速率) cap
(最大令牌量) quannum
(每次令牌生成量)。
如果三个变量有一个小于或者等于0的话直接进行报错返回。在最开始就将当前令牌数初始化为最大容量。
// TakeAvailable takes up to count immediately available tokens from the // bucket. It returns the number of tokens removed, or zero if there are // no available tokens. It does not block. func (tb *Bucket) TakeAvailable(count int64) int64 { tb.mu.Lock() defer tb.mu.Unlock() return tb.takeAvailable(tb.clock.Now(), count) }
调用TakeAvailable
函数,传入参数为需要取出的令牌数量,返回参数是实际能够取出的令牌数量。
func (tb *Bucket) takeAvailable(now time.Time, count int64) int64 { // 如果需要取出的令牌数小于等于零,那么就返回0个令牌 if count <= 0 { return 0 } // 根据时间对当前桶中令牌数进行计算 tb.adjustavailableTokens(tb.currentTick(now)) // 计算之后的令牌总数小于等于0,说明当前令牌不足取出,那么就直接返回0个令牌 if tb.availableTokens <= 0 { return 0 } // 如果当前存储的令牌数量多于请求数量,那么就返回取出令牌数 if count > tb.availableTokens { count = tb.availableTokens } // 调整令牌数 tb.availableTokens -= count return count }
如果需要取出的令牌数小于等于零,那么就返回0个令牌
根据时间对当前桶中令牌数进行计算
计算之后的令牌总数小于等于0,说明当前令牌不足取出,那么就直接返回0个令牌
如果当前存储的令牌数量多于请求数量,那么就返回取出令牌数
调整令牌数
func (tb *Bucket) adjustavailableTokens(tick int64) { lastTick := tb.latestTick tb.latestTick = tick // 如果当前令牌数大于最大等于容量,直接返回最大容量 if tb.availableTokens >= tb.capacity { return } // 当前令牌数 += (当前时间 - 上次取出令牌数的时间) * quannum(每次生成令牌量) tb.availableTokens += (tick - lastTick) * tb.quantum // 如果当前令牌数大于最大等于容量, 将当前令牌数 = 最大容量 然后返回 当前令牌数 if tb.availableTokens > tb.capacity { tb.availableTokens = tb.capacity } return }
如果当前令牌数大于最大等于容量,直接返回最大容量
当前令牌数 += (当前时间 - 上次取出令牌数的时间) * quannum(每次生成令牌量)
如果当前令牌数大于最大等于容量, 将当前令牌数 = 最大容量 然后返回 当前令牌数
加锁 defer
解锁
判断count(想要取出的令牌数) 是否小于等于 0,如果是直接返回 0
调用函数adjustTokens
获取可用的令牌数量
如果当前可以取出的令牌数小于等于0 直接返回 0
如果当前可以取出的令牌数小于当前想要取出的令牌数(count) count = 当前可以取出的令牌数
当前的令牌数 -= 取出的令牌数 (count)
返回 count(可以取出的令牌数)
take
函数,能够返回等待时间和布尔值,允许欠账,没有令牌也可以取出。
func (tb *Bucket) Take(count int64) time.Duration
takeMaxDuration
函数,可以根据最大等待时间来进行判断。
func (tb *Bucket) TakeMaxDuration(count int64, maxWait time.Duration) (time.Duration, bool)
因为他们内部的实现都基于令牌调整,我这里不做过多介绍,如果感兴趣可以自行研究一下。
func ratelimit2() func(ctx *gin.Context) { // 生成速率 最大容量 r2 := rate2.NewBucket(time.Second, 200) return func(ctx *gin.Context) { //r2.Take() // 允许欠账,令牌不够也可以接收请求 if r2.TakeAvailable(1) == 1 { // 如果想要取出1个令牌并且能够取出,就放行 ctx.Next() return } response(ctx, http.StatusRequestTimeout, "rate2 limit...") ctx.Abort() return } }
压测速度过于快速,在实际过程中可以根据调整令牌生成速率来进行具体限流!
令牌桶可以允许自己判断请求是否继续,内部不会进行睡眠操作。而漏桶需要进行睡眠,并没有提供方法让程序员进行判断是否放行。
【関連する推奨事項: Go ビデオ チュートリアル 、プログラミング教育 】
以上がGo 言語の電流制限リーキー バケット ライブラリとトークン バケット ライブラリについて説明する記事の詳細内容です。詳細については、PHP 中国語 Web サイトの他の関連記事を参照してください。