L'article original est publié sur le blog VictoriaMetrics : https://victoriametrics.com/blog/go-singleflight/
Cet article fait partie d'une série sur la gestion de la concurrence dans Go :
Ainsi, lorsque vous recevez plusieurs requêtes arrivant en même temps demandant les mêmes données, le comportement par défaut est que chacune de ces requêtes va à la base de données individuellement pour obtenir les mêmes informations. . Cela signifie que vous finirez par exécuter la même requête plusieurs fois, ce qui, soyons honnêtes, est tout simplement inefficace.
Cela finit par imposer une charge inutile sur votre base de données, ce qui pourrait tout ralentir, mais il existe un moyen de contourner ce problème.
L'idée est que seule la première requête est réellement envoyée à la base de données. Le reste des demandes attendent la fin de la première. Une fois que les données reviennent de la requête initiale, les autres obtiennent le même résultat : aucune requête supplémentaire n'est nécessaire.
Alors, maintenant vous avez une assez bonne idée de ce dont parle cet article, n'est-ce pas ?
Le package singleflight de Go est spécialement conçu pour gérer exactement ce dont nous venons de parler. Et juste pour info, elle ne fait pas partie de la bibliothèque standard mais elle est maintenue et développée par l'équipe Go.
Singleflight garantit qu'un seul de ces goroutines exécute réellement l'opération, comme extraire les données de la base de données. Il permet une seule opération « en vol » (en cours) pour la même donnée (appelée « clé ») à un moment donné.
Donc, si d'autres goroutines demandent les mêmes données (même clé) pendant que cette opération est toujours en cours, ils attendront simplement. Ensuite, lorsque le premier se termine, tous les autres obtiennent le même résultat sans avoir à relancer l'opération.
Très bien, assez parlé, passons à une démo rapide pour voir comment fonctionne Singleflight en action :
var callCount atomic.Int32 var wg sync.WaitGroup // Simulate a function that fetches data from a database func fetchData() (interface{}, error) { callCount.Add(1) time.Sleep(100 * time.Millisecond) return rand.Intn(100), nil } // Wrap the fetchData function with singleflight func fetchDataWrapper(g *singleflight.Group, id int) error { defer wg.Done() time.Sleep(time.Duration(id) * 40 * time.Millisecond) v, err, shared := g.Do("key-fetch-data", fetchData) if err != nil { return err } fmt.Printf("Goroutine %d: result: %v, shared: %v\n", id, v, shared) return nil } func main() { var g singleflight.Group // 5 goroutines to fetch the same data const numGoroutines = 5 wg.Add(numGoroutines) for i := 0; i < numGoroutines; i++ { go fetchDataWrapper(&g, i) } wg.Wait() fmt.Printf("Function was called %d times\n", callCount.Load()) } // Output: // Goroutine 0: result: 90, shared: true // Goroutine 2: result: 90, shared: true // Goroutine 1: result: 90, shared: true // Goroutine 3: result: 13, shared: true // Goroutine 4: result: 13, shared: true // Function was called 2 times
Que se passe-t-il ici :
Nous simulons une situation dans laquelle 5 goroutines tentent de récupérer les mêmes données presque en même temps, espacées de 60 ms. Pour faire simple, nous utilisons des nombres aléatoires pour imiter les données extraites d'une base de données.
Avec singleflight.Group, nous nous assurons que seule la première goroutine exécute réellement fetchData() et que les autres attendent le résultat.
La ligne v, err, shared := g.Do("key-fetch-data", fetchData) attribue une clé unique ("key-fetch-data") pour suivre ces requêtes. Ainsi, si une autre goroutine demande la même clé alors que la première est encore en train de récupérer les données, elle attend le résultat plutôt que de lancer un nouvel appel.
Une fois le premier appel terminé, toutes les goroutines en attente obtiennent le même résultat, comme nous pouvons le voir dans le résultat. Bien que nous ayons eu 5 goroutines demandant les données, fetchData n'a été exécuté que deux fois, ce qui représente un énorme coup de pouce.
L'indicateur partagé confirme que le résultat a été réutilisé sur plusieurs goroutines.
"Mais pourquoi le drapeau partagé est-il vrai pour la première goroutine ? Je pensais que seuls ceux en attente auraient partagé == vrai ?"
Oui, cela peut sembler un peu contre-intuitif si vous pensez que seules les goroutines en attente auraient dû partager == true.
Le fait est que la variable partagée dans g.Do vous indique si le résultat a été partagé entre plusieurs appelants. Il s'agit essentiellement de dire : "Hé, ce résultat a été utilisé par plusieurs appelants". Il ne s'agit pas de savoir qui a exécuté la fonction, c'est juste un signal que le résultat a été réutilisé sur plusieurs goroutines.
"J'ai une cache, pourquoi ai-je besoin d'un vol unique ?"
La réponse courte est : les caches et le vol unique résolvent différents problèmes, et ils fonctionnent vraiment très bien ensemble.
Dans une configuration avec un cache externe (comme Redis ou Memcached), singleflight ajoute une couche de protection supplémentaire, non seulement pour votre base de données mais aussi pour le cache lui-même.
De plus, singleflight aide à se protéger contre une tempête d'échecs de cache (parfois appelée « débandade de cache »).
Normalement, lorsqu'une requête demande des données, si les données sont dans le cache, parfait : c'est un accès au cache. Si les données ne sont pas dans le cache, il s'agit d'un échec de cache. Supposons que 10 000 requêtes parviennent au système en même temps avant que le cache ne soit reconstruit, la base de données pourrait soudainement être confrontée à 10 000 requêtes identiques en même temps.
Pendant ce pic, singleflight garantit qu'une seule de ces 10 000 requêtes arrive réellement dans la base de données.
Mais plus tard, dans la section d'implémentation interne, nous verrons que singleflight utilise un verrou global pour protéger la carte des appels en vol, ce qui peut devenir un point de discorde unique pour chaque goroutine. Cela peut ralentir les choses, surtout si vous avez affaire à une concurrence élevée.
Le modèle ci-dessous pourrait mieux fonctionner pour les machines équipées de plusieurs processeurs :
Dans cette configuration, nous n'utilisons que le vol unique lorsqu'un échec de cache se produit.
Pour utiliser singleflight, vous créez d'abord un objet Groupe, qui est la structure de base qui suit les appels de fonction en cours liés à des touches spécifiques.
Il dispose de deux méthodes clés qui aident à éviter les appels en double :
Nous avons déjà vu comment utiliser g.Do() dans la démo, voyons comment utiliser g.DoChan() avec une fonction wrapper modifiée :
var callCount atomic.Int32 var wg sync.WaitGroup // Simulate a function that fetches data from a database func fetchData() (interface{}, error) { callCount.Add(1) time.Sleep(100 * time.Millisecond) return rand.Intn(100), nil } // Wrap the fetchData function with singleflight func fetchDataWrapper(g *singleflight.Group, id int) error { defer wg.Done() time.Sleep(time.Duration(id) * 40 * time.Millisecond) v, err, shared := g.Do("key-fetch-data", fetchData) if err != nil { return err } fmt.Printf("Goroutine %d: result: %v, shared: %v\n", id, v, shared) return nil } func main() { var g singleflight.Group // 5 goroutines to fetch the same data const numGoroutines = 5 wg.Add(numGoroutines) for i := 0; i < numGoroutines; i++ { go fetchDataWrapper(&g, i) } wg.Wait() fmt.Printf("Function was called %d times\n", callCount.Load()) } // Output: // Goroutine 0: result: 90, shared: true // Goroutine 2: result: 90, shared: true // Goroutine 1: result: 90, shared: true // Goroutine 3: result: 13, shared: true // Goroutine 4: result: 13, shared: true // Function was called 2 times
// Wrap the fetchData function with singleflight using DoChan func fetchDataWrapper(g *singleflight.Group, id int) error { defer wg.Done() ch := g.DoChan("key-fetch-data", fetchData) res := <-ch if res.Err != nil { return res.Err } fmt.Printf("Goroutine %d: result: %v, shared: %v\n", id, res.Val, res.Shared) return nil }
Pour être honnête, utiliser DoChan() ici ne change pas grand-chose par rapport à Do(), puisque nous attendons toujours le résultat avec une opération de réception de canal (<-ch), qui bloque fondamentalement la même chose. façon.
Là où DoChan() brille, c'est lorsque vous souhaitez lancer une opération et faire d'autres choses sans bloquer la goroutine. Par exemple, vous pouvez gérer les délais d'attente ou les annulations plus proprement en utilisant les canaux :
package singleflight type Result struct { Val interface{} Err error Shared bool }
Cet exemple soulève également quelques problèmes que vous pourriez rencontrer dans des scénarios du monde réel :
Oui, singleflight fournit un moyen de gérer des situations comme celles-ci avec la méthode group.Forget(key), qui vous permet d'annuler une exécution en cours.
La méthode Forget() supprime une clé de la carte interne qui suit les appels de fonction en cours. C'est un peu comme "invalider" la clé, donc si vous appelez à nouveau g.Do() avec cette clé, il exécutera la fonction comme s'il s'agissait d'une nouvelle requête, au lieu d'attendre la fin de l'exécution précédente.
Mettons à jour notre exemple pour utiliser Forget() et voyons combien de fois la fonction est réellement appelée :
var callCount atomic.Int32 var wg sync.WaitGroup // Simulate a function that fetches data from a database func fetchData() (interface{}, error) { callCount.Add(1) time.Sleep(100 * time.Millisecond) return rand.Intn(100), nil } // Wrap the fetchData function with singleflight func fetchDataWrapper(g *singleflight.Group, id int) error { defer wg.Done() time.Sleep(time.Duration(id) * 40 * time.Millisecond) v, err, shared := g.Do("key-fetch-data", fetchData) if err != nil { return err } fmt.Printf("Goroutine %d: result: %v, shared: %v\n", id, v, shared) return nil } func main() { var g singleflight.Group // 5 goroutines to fetch the same data const numGoroutines = 5 wg.Add(numGoroutines) for i := 0; i < numGoroutines; i++ { go fetchDataWrapper(&g, i) } wg.Wait() fmt.Printf("Function was called %d times\n", callCount.Load()) } // Output: // Goroutine 0: result: 90, shared: true // Goroutine 2: result: 90, shared: true // Goroutine 1: result: 90, shared: true // Goroutine 3: result: 13, shared: true // Goroutine 4: result: 13, shared: true // Function was called 2 times
Goroutine 0 et Goroutine 1 appellent tous deux Do() avec la même clé ("key-fetch-data"), et leurs requêtes sont combinées en une seule exécution et le résultat est partagé entre les deux goroutines.
Goroutine 2, en revanche, appelle Forget() avant d'exécuter Do(). Cela efface tout résultat précédent lié à "key-fetch-data", ce qui déclenche une nouvelle exécution de la fonction.
Pour résumer, même si le vol unique est utile, il peut quand même avoir quelques cas extrêmes, par exemple :
Si vous avez remarqué tous les problèmes dont nous avons discuté, passons à la section suivante pour discuter du fonctionnement réel de Singleflight sous le capot.
En utilisant singleflight, vous avez peut-être déjà une idée de base de son fonctionnement en interne, l'implémentation complète de singleflight ne représente qu'environ 150 lignes de code.
Fondamentalement, chaque clé unique reçoit une structure qui gère son exécution. Si une goroutine appelle Do() et constate que la clé existe déjà, cet appel sera bloqué jusqu'à la fin de la première exécution, et voici la structure :
// Wrap the fetchData function with singleflight using DoChan func fetchDataWrapper(g *singleflight.Group, id int) error { defer wg.Done() ch := g.DoChan("key-fetch-data", fetchData) res := <-ch if res.Err != nil { return res.Err } fmt.Printf("Goroutine %d: result: %v, shared: %v\n", id, res.Val, res.Shared) return nil }
Deux primitives de synchronisation sont utilisées ici :
Nous allons nous concentrer ici sur la méthode group.Do() puisque l'autre méthode, group.DoChan(), fonctionne de la même manière. La méthode group.Forget() est également simple car elle supprime simplement la clé de la carte.
Lorsque vous appelez group.Do(), la première chose qu'il fait est de verrouiller toute la carte des appels (g.mu).
"N'est-ce pas mauvais pour la performance ?"
Oui, ce n'est peut-être pas idéal pour les performances dans tous les cas (il est toujours bon de comparer en premier) car un seul vol verrouille toutes les clés. Si vous recherchez de meilleures performances ou si vous travaillez à grande échelle, une bonne approche consiste à fragmenter ou à distribuer les clés. Au lieu d'utiliser un seul groupe de vols, vous pouvez répartir la charge sur plusieurs groupes, un peu comme si vous faisiez "multiflight" à la place
Pour référence, consultez ce dépôt : shardedsingleflight.
Maintenant, une fois qu'il a le verrou, le groupe regarde la carte interne (g.m), s'il y a déjà un appel en cours ou terminé pour la clé donnée. Cette carte garde une trace de tout travail en cours ou terminé, avec des clés mappées aux tâches correspondantes.
Si la clé est trouvée (un autre goroutine exécute déjà la tâche), au lieu de démarrer un nouvel appel, nous incrémentons simplement un compteur (c.dups) pour suivre les demandes en double. La goroutine libère ensuite le verrou et attend la fin de la tâche d'origine en appelant call.wg.Wait() sur le WaitGroup associé.
Lorsque la tâche d'origine est terminée, cette goroutine récupère le résultat et évite de réexécuter la tâche.
var callCount atomic.Int32 var wg sync.WaitGroup // Simulate a function that fetches data from a database func fetchData() (interface{}, error) { callCount.Add(1) time.Sleep(100 * time.Millisecond) return rand.Intn(100), nil } // Wrap the fetchData function with singleflight func fetchDataWrapper(g *singleflight.Group, id int) error { defer wg.Done() time.Sleep(time.Duration(id) * 40 * time.Millisecond) v, err, shared := g.Do("key-fetch-data", fetchData) if err != nil { return err } fmt.Printf("Goroutine %d: result: %v, shared: %v\n", id, v, shared) return nil } func main() { var g singleflight.Group // 5 goroutines to fetch the same data const numGoroutines = 5 wg.Add(numGoroutines) for i := 0; i < numGoroutines; i++ { go fetchDataWrapper(&g, i) } wg.Wait() fmt.Printf("Function was called %d times\n", callCount.Load()) } // Output: // Goroutine 0: result: 90, shared: true // Goroutine 2: result: 90, shared: true // Goroutine 1: result: 90, shared: true // Goroutine 3: result: 13, shared: true // Goroutine 4: result: 13, shared: true // Function was called 2 times
Si aucune autre goroutine ne travaille sur cette clé, la goroutine actuelle assume la responsabilité de l'exécution de la tâche.
À ce stade, nous créons un nouvel objet d'appel, l'ajoutons à la carte et initialisons son WaitGroup. Ensuite, nous déverrouillons le mutex et procédons à l'exécution de la tâche nous-mêmes via une méthode d'assistance g.doCall(c, key, fn). Une fois la tâche terminée, toutes les goroutines en attente sont débloquées par l'appel wg.Wait().
Rien de bien fou ici, à part la façon dont nous gérons les erreurs, il y a trois scénarios possibles :
C'est là que les choses commencent à devenir un peu plus intelligentes dans la méthode d'assistance g.doCall().
"Attendez, c'est quoi runtime.Goexit() ?"
Avant de plonger dans le code, permettez-moi de vous expliquer rapidement, runtime.Goexit() est utilisé pour arrêter l'exécution d'une goroutine.
Lorsqu'une goroutine appelle Goexit(), elle s'arrête et toutes les fonctions différées sont toujours exécutées dans l'ordre Last-In-First-Out (LIFO), comme d'habitude. C'est similaire à une panique, mais il y a quelques différences :
Maintenant, voici une bizarrerie intéressante (pas directement liée à notre sujet, mais qui mérite d'être mentionnée). Si vous appelez runtime.Goexit() dans la goroutine principale (comme dans main()), vérifiez ceci :
var callCount atomic.Int32 var wg sync.WaitGroup // Simulate a function that fetches data from a database func fetchData() (interface{}, error) { callCount.Add(1) time.Sleep(100 * time.Millisecond) return rand.Intn(100), nil } // Wrap the fetchData function with singleflight func fetchDataWrapper(g *singleflight.Group, id int) error { defer wg.Done() time.Sleep(time.Duration(id) * 40 * time.Millisecond) v, err, shared := g.Do("key-fetch-data", fetchData) if err != nil { return err } fmt.Printf("Goroutine %d: result: %v, shared: %v\n", id, v, shared) return nil } func main() { var g singleflight.Group // 5 goroutines to fetch the same data const numGoroutines = 5 wg.Add(numGoroutines) for i := 0; i < numGoroutines; i++ { go fetchDataWrapper(&g, i) } wg.Wait() fmt.Printf("Function was called %d times\n", callCount.Load()) } // Output: // Goroutine 0: result: 90, shared: true // Goroutine 2: result: 90, shared: true // Goroutine 1: result: 90, shared: true // Goroutine 3: result: 13, shared: true // Goroutine 4: result: 13, shared: true // Function was called 2 times
Ce qui se passe, c'est que Goexit() met fin à la goroutine principale, mais s'il y a d'autres goroutines encore en cours d'exécution, le programme continue car le runtime Go reste actif tant qu'au moins une goroutine est active. Cependant, une fois qu'il ne reste plus de goroutines, il plante avec une erreur "pas de goroutines", une sorte de petit cas amusant.
Maintenant, revenons à notre code, si runtime.Goexit() termine uniquement la goroutine actuelle et ne peut pas être interceptée par recovery(), comment pouvons-nous détecter si elle a été appelée ?
La clé réside dans le fait que lorsque runtime.Goexit() est invoqué, tout code ultérieur n'est pas exécuté.
// Wrap the fetchData function with singleflight using DoChan func fetchDataWrapper(g *singleflight.Group, id int) error { defer wg.Done() ch := g.DoChan("key-fetch-data", fetchData) res := <-ch if res.Err != nil { return res.Err } fmt.Printf("Goroutine %d: result: %v, shared: %v\n", id, res.Val, res.Shared) return nil }
Dans le cas ci-dessus, la ligne normalReturn = true n'est jamais exécutée après avoir appelé runtime.Goexit(). Ainsi, à l’intérieur du defer, nous pouvons vérifier si normalReturn est toujours faux pour détecter que cette méthode spéciale a été appelée.
La prochaine étape consiste à déterminer si la tâche panique ou non. Pour cela, nous utilisons recovery() comme retour normal, bien que le code réel dans singleflight soit un peu plus subtil :
package singleflight type Result struct { Val interface{} Err error Shared bool }
Au lieu de définir récupéré = true directement à l'intérieur du bloc de récupération, ce code devient un peu sophistiqué en définissant récupéré après le bloc recovery() comme dernière ligne.
Alors, pourquoi ça marche ?
Lorsque runtime.Goexit() est appelé, il met fin à l'intégralité de la goroutine, tout comme une panique(). Cependant, si une panic() est récupérée, seule la chaîne de fonctions entre panic() et recovery() est terminée, et non la goroutine entière.
C'est pourquoi récupéré = true est défini en dehors du defer contenant recovery(), il n'est exécuté que dans deux cas : lorsque la fonction se termine normalement ou lorsqu'une panique est récupérée, mais pas lorsque runtime.Goexit() est appelé.
À l'avenir, nous discuterons de la manière dont chaque cas est traité.
func fetchDataWrapperWithTimeout(g *singleflight.Group, id int) error { defer wg.Done() ch := g.DoChan("key-fetch-data", fetchData) select { case res := <-ch: if res.Err != nil { return res.Err } fmt.Printf("Goroutine %d: result: %v, shared: %v\n", id, res.Val, res.Shared) case <-time.After(50 * time.Millisecond): return fmt.Errorf("timeout waiting for result") } return nil }
Si la tâche panique pendant l'exécution, la panique est détectée et enregistrée dans c.err en tant que panicError, qui contient à la fois la valeur de panique et la trace de la pile. singleflight attrape la panique pour nettoyer gracieusement, mais il ne l'avale pas, il renvoie la panique après avoir géré son état.
Cela signifie que la panique se produira dans la goroutine qui exécute la tâche (la première à lancer l'opération), et toutes les autres goroutines en attente du résultat paniqueront également.
Puisque cette panique se produit dans le code du développeur, c'est à nous de la gérer correctement.
Maintenant, il reste encore un cas particulier que nous devons considérer : lorsque d'autres goroutines utilisent la méthode group.DoChan() et attendent un résultat via un canal. Dans ce cas, un vol unique ne peut pas paniquer dans ces goroutines. Au lieu de cela, il provoque ce qu'on appelle une panique irrécupérable (go panic(e)), qui fait planter notre application.
Enfin, si la tâche s'appelle runtime.Goexit(), il n'est pas nécessaire de prendre d'autres mesures car la goroutine est déjà en train de s'arrêter, et nous laissons cela se produire sans interférer.
Et c'est à peu près tout, rien de bien compliqué à part les cas particuliers dont nous avons évoqué.
Bonjour, je m'appelle Phuong Le, ingénieur logiciel chez VictoriaMetrics. Le style d'écriture ci-dessus se concentre sur la clarté et la simplicité, expliquant les concepts d'une manière facile à comprendre, même si elle n'est pas toujours parfaitement alignée avec la précision académique.
Si vous repérez quelque chose qui est obsolète ou si vous avez des questions, n'hésitez pas à nous contacter. Vous pouvez m'envoyer un DM sur X (@func25).
Quelques autres articles qui pourraient vous intéresser :
Si vous souhaitez surveiller vos services, suivre les métriques et voir comment tout fonctionne, vous voudrez peut-être consulter VictoriaMetrics. Il s'agit d'un moyen rapide, open source et économique de garder un œil sur votre infrastructure.
Et nous sommes des Gophers, des passionnés qui aiment rechercher, expérimenter et partager leurs connaissances sur Go et son écosystème.
Ce qui précède est le contenu détaillé de. pour plus d'informations, suivez d'autres articles connexes sur le site Web de PHP en chinois!