Stellen wir uns ein Szenario vor, in dem man eine verteilte Anwendung hat, die mit einer Drittanbieter-API interagiert. Normalerweise verfügen APIs von Drittanbietern über einen Ratenbegrenzungskontrollmechanismus, um zu verhindern, dass ihre Clients Anfragen überhäufen und Ausfallzeiten bei ihren Diensten verursachen. Wie kann der Aufrufer in einem solchen Szenario die Rate ausgehender Anforderungen an die Drittanbieter-API in einer verteilten Umgebung steuern? In diesem Beitrag wird eine mögliche Strategie für dieses Problem besprochen.
Es gibt mehrere Algorithmen zur Steuerung der Anforderungsrate, aber hier konzentrieren wir uns auf den Token-Bucket-Algorithmus, da er relativ einfach zu verstehen und zu implementieren ist. Dieser Algorithmus besagt: Ein Bucket kann maximal T Token enthalten, und wenn eine Anwendung eine Anfrage an die Drittanbieter-API stellen möchte, muss sie 1 annehmen Token aus dem Eimer. Wenn der Bucket leer ist, muss gewartet werden, bis sich mindestens 1 Token im Bucket befindet. Außerdem wird der Bucket mit 1 Token mit einer festen Rate von R Tokens/Millisekunden aufgefüllt.
Der Token-Bucket-Algorithmus ist sehr einfach zu verstehen, aber wie kann jemand ihn in einer verteilten Umgebung verwenden, um die ausgehende Anfrage an APIs von Drittanbietern zu steuern?
Wenn man das ausgehende Ratenlimit in einer verteilten Umgebung kontrollieren möchte, ist eine zentrale Quelle der Wahrheit für das aktuelle Ratenlimit erforderlich. Es gibt mehrere Möglichkeiten, die Quelle der Wahrheit zu implementieren, und ich habe das folgende Diagramm mit einer möglichen Implementierung idealisiert:
In der Abbildung oben haben wir eine verteilte Anwendung in mehreren Pods, und jeder Pod kann Anfragen an eine Drittanbieter-API stellen. In der Anwendungsinfrastruktur gibt es einen TCP-Server, der die Ratenbegrenzung mithilfe des Token-Bucket-Algorithmus steuert. Bevor der Pod eine Anfrage an die Drittanbieter-API stellt, fragt er den TCP-Server nach einem neuen Token und wartet auf eine Antwort vom TCP-Server, bis mindestens ein Token verfügbar ist. Sobald ein Token verfügbar ist, stellt der Pod die Anfrage an die Drittanbieter-API.
Die TCP-Server-Implementierung finden Sie in diesem Repository https://github.com/rafaquelhodev/rlimit/ und im nächsten Abschnitt werde ich kurz die Token-Bucket-Implementierung in Golang besprechen.
Im Folgenden zeige ich die Hauptideen hinter der Token-Bucket-Implementierung. Bitte werfen Sie einen Blick auf das Repository https://github.com/rafaquelhodev/rlimit/, um die detaillierte Implementierung zu verstehen.
Die Kontrolle der Ratenbegrenzung ist in der TokenBucket-Struktur zentralisiert:
type TokenBucket struct { id string mu sync.Mutex tokens int64 maxTokens int64 refillPeriod int64 cron chan bool subs []chan bool }
Sie können feststellen, dass die TokenBucket-Struktur eine subs-Eigenschaft enthält. Im Grunde handelt es sich hierbei um ein Array von Abonnenten für einen bestimmten Token-Bucket: Jedes Mal, wenn ein Token von einem Client angefordert wird, wird der Client zum Subs-Array hinzugefügt und der Client wird benachrichtigt, wenn ein neues Token zum Bucket hinzugefügt wird.
Beim Starten des Buckets müssen wir eine maximale Anzahl von Tokens angeben, die der Bucket unterstützen kann (maxTokens) und die Zeitspanne, für die ein Token dem Bucket hinzugefügt wird (refillPeriod):
func newTokenBucket(id string, maxTokens int64, refillPeriod int64) *TokenBucket { bucket := &TokenBucket{ id: id, tokens: 0, maxTokens: maxTokens, refillPeriod: refillPeriod, cron: make(chan bool), subs: make([]chan bool, 0), } fmt.Printf("refill period = %d\n", refillPeriod) bucket.startCron() return bucket }
Nun fragen Sie sich vielleicht: „Wie wird ein Token zum Bucket hinzugefügt?“. Zu diesem Zweck wird beim Erstellen eines Buckets ein Cron-Job gestartet und bei jeder fillPeriod-Millisekunden wird dem Bucket ein neues Token hinzugefügt:
func (tb *TokenBucket) startCron() { ticker := time.NewTicker(time.Duration(tb.refillPeriod) * time.Millisecond) go func() { for { select { case <-tb.cron: ticker.Stop() return case <-ticker.C: if tb.tokens < tb.maxTokens { tb.tokens += 1 fmt.Printf("[TOKEN REFIL] | currTokens = %d\n", tb.tokens) if len(tb.subs) > 0 { sub := tb.subs[0] tb.subs = tb.subs[1:] sub <- true } } } } }() }
Wenn ein Client schließlich ein Token aus dem Bucket möchte, muss die Funktion „waitAvailable“ aufgerufen werden:
func (tb *TokenBucket) waitAvailable() bool { tb.mu.Lock() if tb.tokens > 0 { fmt.Printf("[CONSUMING TOKEN] - id = %s\n", tb.id) tb.tokens -= 1 tb.mu.Unlock() return true } fmt.Printf("[WAITING TOKEN] - id %s\n", tb.id) ch := tb.tokenSubscribe() tb.mu.Unlock() <-ch fmt.Printf("[NEW TOKEN AVAILABLED] - id %s\n", tb.id) tb.tokens -= 1 return true }
Inspiriert von https://github.com/Mohamed-khattab/Token-bucket-rate-limiter
Das obige ist der detaillierte Inhalt vonKontrolle des ausgehenden Ratenlimits. Für weitere Informationen folgen Sie bitte anderen verwandten Artikeln auf der PHP chinesischen Website!