Dans les 2 articles précédents, nous avons examiné Fanout et Fanin séparément. Il arrive souvent que nous les utilisions ensemble lorsque nous disposons d'un flux de données unique sur lequel nous souhaitons opérer sur les éléments individuellement et pouvons le faire en toute sécurité grâce à la concurrence. Ainsi, nous nous répartissons dans plusieurs threads de travail, puis nous revenons en un seul flux.
Par exemple, supposons que vous ayez un fichier journal volumineux. Vous pouvez diviser le fichier en morceaux, permettant à chaque travailleur d'opérer simultanément sur une partie différente du fichier, puis combiner les résultats.
Si vous avez suivi les deux posts précédents, ce schéma est évident. Consultez les liens ci-dessus si vous n'êtes pas sûr.
// produce is simulating our single input as a channel func produce() chan int { ch := make(chan int) go func() { for i := 0; i < 10; i++ { ch <- rand.Intn(50) } fmt.Printf("producer done\n") close(ch) // this is important!!! }() return ch } func worker(id int, jobs chan int, out chan OddEven, wg *sync.WaitGroup) { for value := range jobs { odd := "even" if (value & 1) == 1 { odd = "odd" } out <- OddEven{ Number: value, OddEven: odd, } } close(out) // remember this wg.Done() } // OddEven struct will be the result of the work done by each fanout thread // and be the fanin data type OddEven struct { Number int OddEven string } func fanin(inputs []chan OddEven) chan OddEven { output := make(chan OddEven) var wg sync.WaitGroup for i, input := range inputs { wg.Add(1) // explicit params to capture loop vars go func(id int, input chan OddEven, output chan OddEven, wg *sync.WaitGroup) { for value := range input { output <- value } fmt.Printf("done merging source %d\n", id) wg.Done() }(i, input, output, &wg) } go func() { wg.Wait() close(output) // this is important!!! }() return output } func main() { // simulate the input data stream inputCh := produce() numWorkers := 3 // fan-out to send data items to workers as individual jobs var wg sync.WaitGroup workerResults := make([]chan OddEven, numWorkers) for i := 0; i < numWorkers; i++ { wg.Add(1) workerResults[i] = make(chan OddEven) go worker(i, inputCh, workerResults[i], &wg) } go func() { wg.Wait() }() // fan-in the results results := fanin(workerResults) done := make(chan bool) go func() { for value := range results { fmt.Printf("got %d is %s\n", value.Number, value.OddEven) } close(done) }() <-done fmt.Println("done") }
Il existe une fonction Produce() qui crée un flux d'entrée simulé de nombres.
Il existe une fonction de travail qui opère sur un canal d'entrée jusqu'à ce qu'il n'y ait plus de données. Sur chaque valeur, il « traite » les données d'entrée (détermine si la valeur est paire ou impaire), puis envoie une structure de résultat à un canal de sortie.
Notez que lorsque chaque travailleur a terminé, il ferme son canal de résultats. Ceci est nécessaire pour éviter un blocage, car sinon l'opération fanin resterait en veille en attendant plus de données sur le canal.
Le thread principal récupère le flux d'entrée de Produce, puis lance un certain nombre de travailleurs donnant à chaque travailleur son propre canal où il enverra ses résultats.
Ces chaînes de résultats sont ensuite envoyées à l'opération fanin. Pour fanin, nous créons un canal pour recevoir la sortie, puis lançons une goroutine pour chacun des canaux de travail. Chaque goroutine parcourt simplement le canal jusqu'à ce qu'il n'y ait plus de données, puis se termine. Rappelez-vous que nous avons fermé le canal de résultat dans le thread de travail, c'est ce qui permet à la boucle for de se terminer
Notez que nous utilisons un WaitGroup pour le processus fanin. Cela nous permet de savoir quand tous les résultats de tous les canaux de résultats ont été combinés dans le canal de sortie. Lorsque cela se produit, nous fermons le canal de sortie afin que tout thread en aval consommant la sortie puisse se terminer.
Avec toutes les données dans le canal de sortie, le thread principal peut continuer et afficher les résultats. Notez que nous utilisons un canal booléen pour empêcher le thread principal de se terminer jusqu'à ce que tout soit terminé ; sinon, cela mettra fin au processus.
Notez qu'il existe une autre façon de procéder à un fan-in en utilisant une instruction select. La technique utilisée ici est un peu plus propre puisqu'on peut augmenter ou diminuer le nombre d'ouvriers.
Notez également que nous n'avons rien abordé concernant la résiliation anticipée de choses comme SIGTERM ou SIGINT. Cela ajoute un peu plus de complexité.
Comment mettriez-vous cela en œuvre ? Il existe d'autres implémentations du modèle fanout/fanin. Veuillez laisser vos commentaires et réflexions ci-dessous ?
Merci !
Le code de cet article et de tous les articles de cette série peut être trouvé ici
Ce qui précède est le contenu détaillé de. pour plus d'informations, suivez d'autres articles connexes sur le site Web de PHP en chinois!