In den beiden vorherigen Beiträgen haben wir Fanout und Fanin getrennt betrachtet. Es kommt häufig vor, dass wir sie zusammen verwenden, wenn wir über einen einzigen Datenstrom verfügen, in dem wir die Elemente einzeln bearbeiten möchten und dies sicher mithilfe der Parallelität tun können. Also fächern wir uns in mehrere Arbeitsthreads auf und fächern sie dann wieder in einen einzigen Stream auf.
Angenommen, Sie haben eine große Protokolldatei. Sie könnten die Datei in Teile aufteilen, sodass jeder Arbeiter gleichzeitig an einem anderen Teil der Datei arbeiten und dann die Ergebnisse kombinieren kann.
Wenn Sie den beiden vorherigen Beiträgen gefolgt sind, ist dieses Muster offensichtlich. Wenn Sie sich nicht sicher sind, sehen Sie sich die Links oben an.
// produce is simulating our single input as a channel func produce() chan int { ch := make(chan int) go func() { for i := 0; i < 10; i++ { ch <- rand.Intn(50) } fmt.Printf("producer done\n") close(ch) // this is important!!! }() return ch } func worker(id int, jobs chan int, out chan OddEven, wg *sync.WaitGroup) { for value := range jobs { odd := "even" if (value & 1) == 1 { odd = "odd" } out <- OddEven{ Number: value, OddEven: odd, } } close(out) // remember this wg.Done() } // OddEven struct will be the result of the work done by each fanout thread // and be the fanin data type OddEven struct { Number int OddEven string } func fanin(inputs []chan OddEven) chan OddEven { output := make(chan OddEven) var wg sync.WaitGroup for i, input := range inputs { wg.Add(1) // explicit params to capture loop vars go func(id int, input chan OddEven, output chan OddEven, wg *sync.WaitGroup) { for value := range input { output <- value } fmt.Printf("done merging source %d\n", id) wg.Done() }(i, input, output, &wg) } go func() { wg.Wait() close(output) // this is important!!! }() return output } func main() { // simulate the input data stream inputCh := produce() numWorkers := 3 // fan-out to send data items to workers as individual jobs var wg sync.WaitGroup workerResults := make([]chan OddEven, numWorkers) for i := 0; i < numWorkers; i++ { wg.Add(1) workerResults[i] = make(chan OddEven) go worker(i, inputCh, workerResults[i], &wg) } go func() { wg.Wait() }() // fan-in the results results := fanin(workerResults) done := make(chan bool) go func() { for value := range results { fmt.Printf("got %d is %s\n", value.Number, value.OddEven) } close(done) }() <-done fmt.Println("done") }
Es gibt eine Funktion Produce(), die einen simulierten Eingabestrom von Zahlen erstellt.
Es gibt eine Worker-Funktion, die auf einem Eingabekanal arbeitet, bis keine Daten mehr vorhanden sind. Bei jedem Wert „verarbeitet“ es die Eingabedaten (bestimmt, ob der Wert ungerade oder gerade ist) und sendet dann eine Ergebnisstruktur an einen Ausgabekanal.
Beachten Sie, dass jeder Worker seinen Ergebniskanal schließt, wenn er fertig ist. Dies ist notwendig, um einen Deadlock zu verhindern, da der Fanin-Vorgang andernfalls ruhen würde und auf weitere Daten auf dem Kanal warten würde.
Der Hauptthread erhält den Eingabestrom von „produzieren“ und startet dann eine Reihe von Workern, wobei jedem Worker ein eigener Kanal zugewiesen wird, an den er seine Ergebnisse sendet.
Diese Ergebniskanäle werden dann an den Fanin-Betrieb gesendet. Um Fanin zu erstellen, erstellen wir einen Kanal, um die Ausgabe zu empfangen, und starten dann eine Goroutine für jeden der Worker-Kanäle. Jede Goroutine durchläuft einfach den Kanal, bis keine Daten mehr vorhanden sind, und wird dann beendet. Denken Sie daran, dass wir den Ergebniskanal im Arbeitsthread geschlossen haben, wodurch die for-Schleife beendet werden kann
Beachten Sie, dass wir für den Fanin-Prozess eine WaitGroup verwenden. Dadurch erfahren wir, wann alle Ergebnisse aller Ergebniskanäle im Ausgabekanal zusammengefasst wurden. Wenn dies geschieht, schließen wir den Ausgabekanal, sodass jeder Downstream-Thread, der die Ausgabe verbraucht, beendet werden kann.
Mit allen Daten im Ausgabekanal kann der Hauptthread fortfahren und die Ergebnisse anzeigen. Beachten Sie, dass wir einen booleschen Kanal verwenden, um zu verhindern, dass der Hauptthread beendet wird, bis alles erledigt ist; Andernfalls wird der Vorgang abgebrochen.
Beachten Sie, dass es eine andere Möglichkeit gibt, Fan-In mithilfe einer Select-Anweisung durchzuführen. Die hier verwendete Technik ist etwas sauberer, da wir die Anzahl der Arbeiter erhöhen oder verringern können.
Beachten Sie auch, dass wir uns nicht mit der vorzeitigen Kündigung durch Dinge wie SIGTERM oder SIGINT befasst haben. Das erhöht die Komplexität etwas.
Wie würden Sie das umsetzen? Es gibt andere Implementierungen des Fanout/Fanin-Musters. Bitte hinterlassen Sie unten Ihre Kommentare und Gedanken?
Danke!
Den Code für diesen Beitrag und alle Beiträge dieser Reihe finden Sie hier
Das obige ist der detaillierte Inhalt vonFanout-Fanin-Muster in Go. Für weitere Informationen folgen Sie bitte anderen verwandten Artikeln auf der PHP chinesischen Website!