이 기사에서는 Go 언어의 현재 제한 누출 버킷 및 토큰 버킷 라이브러리에 대해 설명하고 토큰 버킷 및 누출 버킷의 구현 원리와 실제 프로젝트에서의 간단한 적용을 소개합니다.
많은 양의 데이터에 동시에 액세스하는 경우 서비스나 인터페이스가 많은 요청에 직면하여 데이터베이스가 충돌하거나 전체 시스템이 붕괴되는 연쇄 반응을 일으키는 경우가 종종 있습니다. 또는 누군가 악의적으로 웹사이트를 공격하여 쓸모없는 요청이 많아 캐시 침투가 발생할 수 있습니다. 전류 제한 미들웨어를 사용하면 단시간에 요청 횟수를 제한하고 다운그레이드하는 역할을 하여 웹사이트의 보안을 확보할 수 있습니다.
통합 제한을 위해 메시지 미들웨어 사용(속도 감소)
현재 제한 방식을 사용하여 중복 요청 반환(현재 제한)
서버 업그레이드
캐시(그러나 여전히 캐시 침투가 있음) 위험을 기다리는 중)
대기하는 중
코드를 개선할 수 없을 때 하드웨어 수준을 향상시키는 유일한 방법은 이를 개선하는 것이라고 볼 수 있습니다. 아니면 아키텍처를 변경하고 다른 레이어를 추가하세요! 통합 처리를 위해 메시지 미들웨어를 사용할 수도 있습니다. 결합된 관점에서 볼 때 전류 제한 솔루션은 큰 변경이나 높은 오버헤드가 필요하지 않은 전략입니다.
토큰 버킷 알고리즘
누수 버킷 알고리즘
슬라이딩 윈도우 알고리즘
etc.
go get -u go.uber.org/ratelimit
go get -u go.uber.org/ratelimit
// New returns a Limiter that will limit to the given RPS. func New(rate int, opts ...Option) Limiter { return newAtomicBased(rate, opts...) } // newAtomicBased returns a new atomic based limiter. func newAtomicBased(rate int, opts ...Option) *atomicLimiter { // TODO consider moving config building to the implementation // independent code. config := buildConfig(opts) perRequest := config.per / time.Duration(rate) l := &atomicLimiter{ perRequest: perRequest, maxSlack: -1 * time.Duration(config.slack) * perRequest, clock: config.clock, } initialState := state{ last: time.Time{}, sleepFor: 0, } atomic.StorePointer(&l.state, unsafe.Pointer(&initialState)) return l }
该函数使用了函数选项模式对多个结构体对象进行初始化
根据传入的值来初始化一个桶结构体 rate
为int
传参 。
初始化过程中包括了
perquest = config.per / time.Duration(rate)
maxSlack
宽松度(宽松度为负值)-1 * time.Duration(config.slack) * perRequest
松紧度是用来规范等待时间的// Clock is the minimum necessary interface to instantiate a rate limiter with // a clock or mock clock, compatible with clocks created using // github.com/andres-erbsen/clock. type Clock interface { Now() time.Time Sleep(time.Duration) }
同时还需要结构体Clock
来记录当前请求的时间now
和此刻的请求所需要花费等待的时间sleep
type state struct { last time.Time sleepFor time.Duration }
state
主要用来记录上次执行的时间以及当前执行请求需要花费等待的时间(作为中间状态记录)
func (t *atomicLimiter) Take() time.Time { var ( newState state taken bool interval time.Duration ) for !taken { now := t.clock.Now() previousStatePointer := atomic.LoadPointer(&t.state) oldState := (*state)(previousStatePointer) newState = state{ last: now, sleepFor: oldState.sleepFor, } if oldState.last.IsZero() { taken = atomic.CompareAndSwapPointer(&t.state, previousStatePointer, unsafe.Pointer(&newState)) continue } // 计算是否需要进行等待取水操作 newState.sleepFor += t.perRequest(每两滴水之间的间隔时间) - now.Sub(oldState.last)(当前时间与上次取水时间的间隔) // 如果等待取水时间特别小,就需要松紧度进行维护 if newState.sleepFor < t.maxSlack { newState.sleepFor = t.maxSlack } // 如果等待时间大于0,就进行更新 if newState.sleepFor > 0 { newState.last = newState.last.Add(newState.sleepFor) interval, newState.sleepFor = newState.sleepFor, 0 } taken = atomic.CompareAndSwapPointer(&t.state, previousStatePointer, unsafe.Pointer(&newState)) } t.clock.Sleep(interval) // 最后返回需要等待的时间 return newState.last }
实现一个Take方法
该Take方法会进行原子性操作(可以理解为加锁和解锁),在大量并发请求下仍可以保证正常使用。
记录下当前的时间 now := t.clock.Now()
oldState.last.IsZero()
判断是不是第一次取水,如果是就直接将state
结构体中的值进行返回。而这个结构体中初始化了上次执行时间,如果是第一次取水就作为当前时间直接传参。
如果 newState.sleepFor
非常小,就会出现问题,因此需要借助宽松度,一旦这个最小值比宽松度小,就用宽松度对取水时间进行维护。
如果newState.sleepFor > 0
就直接更新结构体中上次执行时间newState.last = newState.last.Add(newState.sleepFor)
并记录需要等待的时间interval, newState.sleepFor = newState.sleepFor, 0
。
如果允许取水和等待操作,那就说明没有发生并发竞争的情况,就模拟睡眠时间t.clock.Sleep(interval)
。然后将取水的目标时间进行返回,由服务端代码来判断是否打回响应或者等待该时间后继续响应。
func (c *clock) Sleep(d time.Duration) { time.Sleep(d) }
实际上在一个请求来的时候,限流器就会进行睡眠对应的时间,并在睡眠后将最新取水时间返回。
func ratelimit1() func(ctx *gin.Context) { r1 := rate1.New(100) return func(ctx *gin.Context) { now := time.Now() // Take 返回的是一个 time.Duration的时间 if r1.Take().Sub(now) > 0 { // 返回的时间比当前的时间还大,说明需要进行等待 // 如果需要等待, 就 time.Sleep(r1.Take().Sub(now())) 然后放行 // 如果不需要等待请求时间,就直接进行Abort 然后返回 response(ctx, http.StatusRequestTimeout, "rate1 limit...") fmt.Println("rate1 limit...") ctx.Abort() return } // 放行 ctx.Next() } }
这里你可以进行选择是否返回。因为Take一定会执行sleep函数,所以当执行take结束后表示当前请求已经接到了水。当前演示使用第一种情况。
如果你的业务要求响应不允许进行等待。那么可以在该请求接完水之后然后,如上例。
如果你的业务允许响应等待,那么该请求等待对应的接水时间后进行下一步。具体代码就是将if
func response(c *gin.Context, code int, info any) { c.JSON(code, info) } func pingHandler(c *gin.Context) { response(c, 200, "ping ok~") }
rate
는 int
매개변수입니다. 🎜🎜초기화 프로세스에는 🎜perquest = config.per / time.Duration(rate)
🎜🎜maxSlack
휴식( 느슨함은 음수 값) -1 * time.Duration(config.slack) * perRequest
느슨함은 대기 시간을 표준화하는 데 사용됩니다🎜🎜func TestRun(t *testing.T) { r := gin.Default() r.GET("/ping1", ratelimit1(), pingHandler) r.GET("/ping2", ratelimit2(), helloHandler) _ = r.Run(":4399") }
현재 요청의 시간 <code>지금
과 이 순간 sleep
🎜Usage: go-wrk <options> <url> Options: -H Header to add to each request (you can define multiple -H flags) (Default ) -M HTTP method (Default GET) -T Socket/request timeout in ms (Default 1000) -body request body string or @filename (Default ) -c Number of goroutines to use (concurrent connections) (Default 10) -ca CA file to verify peer against (SSL/TLS) (Default ) -cert CA certificate file to verify peer against (SSL/TLS) (Default ) -d Duration of test in seconds (Default 10) -f Playback file name (Default <empty>) -help Print help (Default false) -host Host Header (Default ) -http Use HTTP/2 (Default true) -key Private key file name (SSL/TLS (Default ) -no-c Disable Compression - Prevents sending the "Accept-Encoding: gzip" header (Default false) -no-ka Disable KeepAlive - prevents re-use of TCP connections between different HTTP requests (Default false) -no-vr Skip verifying SSL certificate of the server (Default false) -redir Allow Redirects (Default false) -v Print version details (Default false)
// NewBucket returns a new token bucket that fills at the // rate of one token every fillInterval, up to the given // maximum capacity. Both arguments must be // positive. The bucket is initially full. func NewBucket(fillInterval time.Duration, capacity int64) *Bucket { return NewBucketWithClock(fillInterval, capacity, nil) } // NewBucketWithClock is identical to NewBucket but injects a testable clock // interface. func NewBucketWithClock(fillInterval time.Duration, capacity int64, clock Clock) *Bucket { return NewBucketWithQuantumAndClock(fillInterval, capacity, 1, clock) }
now := t.clock.Now()
🎜🎜🎜🎜oldState.last.IsZero()
처음인지 확인 물을 얻으려면 state
구조에 값을 직접 반환하세요. 이 구조에서는 마지막 실행 시간이 초기화됩니다. 처음 물을 가져오는 경우에는 현재 시간이 그대로 전달됩니다. 🎜🎜🎜🎜newState.sleepFor
가 너무 작으면 문제가 발생하므로 laxity를 사용해야 합니다. 최소값이 laxity보다 작으면 laxity를 사용하여 물 가져오는 시간을 유지합니다. . 🎜🎜🎜🎜newState.sleepFor > 0
인 경우 newState.last = newState.last.Add(newState.sleepFor)
구조에서 마지막 실행 시간을 직접 업데이트하고 대기 시간 interval, newState.sleepFor = newState.sleepFor, 0
을 기록합니다. 🎜🎜🎜🎜물 가져오기 및 대기 작업이 허용되면 동시성 경쟁이 발생하지 않으며 수면 시간이 t.clock.Sleep(interval)
로 시뮬레이션된다는 의미입니다. 그런 다음 물 검색을 위한 목표 시간이 반환되고 서버 코드는 응답을 다시 보낼지 아니면 이 시간 동안 응답이 계속될 때까지 기다릴지 결정합니다. 🎜🎜🎜func NewBucketWithQuantumAndClock(fillInterval time.Duration, capacity, quantum int64, clock Clock) *Bucket { if clock == nil { clock = realClock{} } // 填充速率 if fillInterval <= 0 { panic("token bucket fill interval is not > 0") } // 最大令牌容量 if capacity <= 0 { panic("token bucket capacity is not > 0") } // 单次令牌生成量 if quantum <= 0 { panic("token bucket quantum is not > 0") } return &Bucket{ clock: clock, startTime: clock.Now(), latestTick: 0, fillInterval: fillInterval, capacity: capacity, quantum: quantum, availableTokens: capacity, } }
// TakeAvailable takes up to count immediately available tokens from the // bucket. It returns the number of tokens removed, or zero if there are // no available tokens. It does not block. func (tb *Bucket) TakeAvailable(count int64) int64 { tb.mu.Lock() defer tb.mu.Unlock() return tb.takeAvailable(tb.clock.Now(), count) }
if
의 내용을 직접 무시하는 것입니다. (추천)🎜这里定义了一个响应函数和一个handler
函数方便测试
func response(c *gin.Context, code int, info any) { c.JSON(code, info) } func pingHandler(c *gin.Context) { response(c, 200, "ping ok~") }
执行go test -run=Run -v
先开启一个web服务
func TestRun(t *testing.T) { r := gin.Default() r.GET("/ping1", ratelimit1(), pingHandler) r.GET("/ping2", ratelimit2(), helloHandler) _ = r.Run(":4399") }
使用接口压力测试工具go-wrk
进行测试->tsliwowicz/go-wrk: go-wrk)
在golang引入install版本可以直接通过go install github.com/tsliwowicz/go-wrk@latest
下载
Usage: go-wrk <options> <url> Options: -H Header to add to each request (you can define multiple -H flags) (Default ) -M HTTP method (Default GET) -T Socket/request timeout in ms (Default 1000) -body request body string or @filename (Default ) -c Number of goroutines to use (concurrent connections) (Default 10) -ca CA file to verify peer against (SSL/TLS) (Default ) -cert CA certificate file to verify peer against (SSL/TLS) (Default ) -d Duration of test in seconds (Default 10) -f Playback file name (Default <empty>) -help Print help (Default false) -host Host Header (Default ) -http Use HTTP/2 (Default true) -key Private key file name (SSL/TLS (Default ) -no-c Disable Compression - Prevents sending the "Accept-Encoding: gzip" header (Default false) -no-ka Disable KeepAlive - prevents re-use of TCP connections between different HTTP requests (Default false) -no-vr Skip verifying SSL certificate of the server (Default false) -redir Allow Redirects (Default false) -v Print version details (Default false)
-t 8个线程 -c 400个连接 -n 模拟100次请求 -d 替换-n 表示连接时间
输入
go-wrk -t=8 -c=400 -n=100 http://127.0.0.1:4399/ping1
可以稍微等待一下水流积攒(压测速度过快)。
可以看出,89
个请求全部返回。也就是说在一段请求高峰期,不会有请求进行响应。因此我认为既然内部已经睡眠,那么就也就应该对请求放行处理。
引入ratelimit
库
go get -u github.com/juju/ratelimit
// NewBucket returns a new token bucket that fills at the // rate of one token every fillInterval, up to the given // maximum capacity. Both arguments must be // positive. The bucket is initially full. func NewBucket(fillInterval time.Duration, capacity int64) *Bucket { return NewBucketWithClock(fillInterval, capacity, nil) } // NewBucketWithClock is identical to NewBucket but injects a testable clock // interface. func NewBucketWithClock(fillInterval time.Duration, capacity int64, clock Clock) *Bucket { return NewBucketWithQuantumAndClock(fillInterval, capacity, 1, clock) }
进行Bucket
桶的初始化。
func NewBucketWithQuantumAndClock(fillInterval time.Duration, capacity, quantum int64, clock Clock) *Bucket { if clock == nil { clock = realClock{} } // 填充速率 if fillInterval <= 0 { panic("token bucket fill interval is not > 0") } // 最大令牌容量 if capacity <= 0 { panic("token bucket capacity is not > 0") } // 单次令牌生成量 if quantum <= 0 { panic("token bucket quantum is not > 0") } return &Bucket{ clock: clock, startTime: clock.Now(), latestTick: 0, fillInterval: fillInterval, capacity: capacity, quantum: quantum, availableTokens: capacity, } }
令牌桶初始化过程,初始化结构体 fillInterval
(填充速率) cap
(最大令牌量) quannum
(每次令牌生成量)。
如果三个变量有一个小于或者等于0的话直接进行报错返回。在最开始就将当前令牌数初始化为最大容量。
// TakeAvailable takes up to count immediately available tokens from the // bucket. It returns the number of tokens removed, or zero if there are // no available tokens. It does not block. func (tb *Bucket) TakeAvailable(count int64) int64 { tb.mu.Lock() defer tb.mu.Unlock() return tb.takeAvailable(tb.clock.Now(), count) }
调用TakeAvailable
函数,传入参数为需要取出的令牌数量,返回参数是实际能够取出的令牌数量。
func (tb *Bucket) takeAvailable(now time.Time, count int64) int64 { // 如果需要取出的令牌数小于等于零,那么就返回0个令牌 if count <= 0 { return 0 } // 根据时间对当前桶中令牌数进行计算 tb.adjustavailableTokens(tb.currentTick(now)) // 计算之后的令牌总数小于等于0,说明当前令牌不足取出,那么就直接返回0个令牌 if tb.availableTokens <= 0 { return 0 } // 如果当前存储的令牌数量多于请求数量,那么就返回取出令牌数 if count > tb.availableTokens { count = tb.availableTokens } // 调整令牌数 tb.availableTokens -= count return count }
如果需要取出的令牌数小于等于零,那么就返回0个令牌
根据时间对当前桶中令牌数进行计算
计算之后的令牌总数小于等于0,说明当前令牌不足取出,那么就直接返回0个令牌
如果当前存储的令牌数量多于请求数量,那么就返回取出令牌数
调整令牌数
func (tb *Bucket) adjustavailableTokens(tick int64) { lastTick := tb.latestTick tb.latestTick = tick // 如果当前令牌数大于最大等于容量,直接返回最大容量 if tb.availableTokens >= tb.capacity { return } // 当前令牌数 += (当前时间 - 上次取出令牌数的时间) * quannum(每次生成令牌量) tb.availableTokens += (tick - lastTick) * tb.quantum // 如果当前令牌数大于最大等于容量, 将当前令牌数 = 最大容量 然后返回 当前令牌数 if tb.availableTokens > tb.capacity { tb.availableTokens = tb.capacity } return }
如果当前令牌数大于最大等于容量,直接返回最大容量
当前令牌数 += (当前时间 - 上次取出令牌数的时间) * quannum(每次生成令牌量)
如果当前令牌数大于最大等于容量, 将当前令牌数 = 最大容量 然后返回 当前令牌数
加锁 defer
解锁
判断count(想要取出的令牌数) 是否小于等于 0,如果是直接返回 0
调用函数adjustTokens
获取可用的令牌数量
如果当前可以取出的令牌数小于等于0 直接返回 0
如果当前可以取出的令牌数小于当前想要取出的令牌数(count) count = 当前可以取出的令牌数
当前的令牌数 -= 取出的令牌数 (count)
返回 count(可以取出的令牌数)
take
函数,能够返回等待时间和布尔值,允许欠账,没有令牌也可以取出。
func (tb *Bucket) Take(count int64) time.Duration
takeMaxDuration
函数,可以根据最大等待时间来进行判断。
func (tb *Bucket) TakeMaxDuration(count int64, maxWait time.Duration) (time.Duration, bool)
因为他们内部的实现都基于令牌调整,我这里不做过多介绍,如果感兴趣可以自行研究一下。
func ratelimit2() func(ctx *gin.Context) { // 生成速率 最大容量 r2 := rate2.NewBucket(time.Second, 200) return func(ctx *gin.Context) { //r2.Take() // 允许欠账,令牌不够也可以接收请求 if r2.TakeAvailable(1) == 1 { // 如果想要取出1个令牌并且能够取出,就放行 ctx.Next() return } response(ctx, http.StatusRequestTimeout, "rate2 limit...") ctx.Abort() return } }
压测速度过于快速,在实际过程中可以根据调整令牌生成速率来进行具体限流!
令牌桶可以允许自己判断请求是否继续,内部不会进行睡眠操作。而漏桶需要进行睡眠,并没有提供方法让程序员进行判断是否放行。
【관련 추천: Go 비디오 튜토리얼, 프로그래밍 교육】
위 내용은 Go 언어의 현재 제한적인 누수 버킷 및 토큰 버킷 라이브러리에 대해 설명하는 기사의 상세 내용입니다. 자세한 내용은 PHP 중국어 웹사이트의 기타 관련 기사를 참조하세요!