在之前的兩篇文章中,我們分別介紹了 Fanout 和 Fanin。通常情況下,我們將它們一起使用,因為我們有一個資料流,我們希望單獨對專案進行操作,並且可以使用並發安全地執行此操作。因此,我們扇出到多個工作線程,然後扇回到單一流。
例如,假設您有一個很大的日誌檔案。您可以將文件分成多個區塊,允許每個工作人員同時操作文件的不同部分,然後合併結果。
如果您遵循前兩篇文章,這種模式是顯而易見的。如果您不確定,請參閱上面的連結。
// produce is simulating our single input as a channel func produce() chan int { ch := make(chan int) go func() { for i := 0; i < 10; i++ { ch <- rand.Intn(50) } fmt.Printf("producer done\n") close(ch) // this is important!!! }() return ch } func worker(id int, jobs chan int, out chan OddEven, wg *sync.WaitGroup) { for value := range jobs { odd := "even" if (value & 1) == 1 { odd = "odd" } out <- OddEven{ Number: value, OddEven: odd, } } close(out) // remember this wg.Done() } // OddEven struct will be the result of the work done by each fanout thread // and be the fanin data type OddEven struct { Number int OddEven string } func fanin(inputs []chan OddEven) chan OddEven { output := make(chan OddEven) var wg sync.WaitGroup for i, input := range inputs { wg.Add(1) // explicit params to capture loop vars go func(id int, input chan OddEven, output chan OddEven, wg *sync.WaitGroup) { for value := range input { output <- value } fmt.Printf("done merging source %d\n", id) wg.Done() }(i, input, output, &wg) } go func() { wg.Wait() close(output) // this is important!!! }() return output } func main() { // simulate the input data stream inputCh := produce() numWorkers := 3 // fan-out to send data items to workers as individual jobs var wg sync.WaitGroup workerResults := make([]chan OddEven, numWorkers) for i := 0; i < numWorkers; i++ { wg.Add(1) workerResults[i] = make(chan OddEven) go worker(i, inputCh, workerResults[i], &wg) } go func() { wg.Wait() }() // fan-in the results results := fanin(workerResults) done := make(chan bool) go func() { for value := range results { fmt.Printf("got %d is %s\n", value.Number, value.OddEven) } close(done) }() <-done fmt.Println("done") }
有一個 Produce() 函數可以建立模擬的數位輸入流。
有一個工作函數在輸入通道上運行,直到沒有更多資料為止。它對每個值「處理」輸入資料(確定該值是奇數還是偶數),然後將結果結構傳送到輸出通道。
請注意,當每個工作人員完成後,它會關閉其結果通道。這對於防止死鎖是必要的,因為否則 fanin 操作將休眠等待 chan 上的更多資料。
主線程從 Produce 取得輸入流,然後啟動許多工作線程,為每個工作線程提供自己的通道,以便發送結果。
這些結果通道隨後被傳送到 fanin 操作。為了扇入,我們建立一個通道來接收輸出,然後為每個工作通道啟動一個 goroutine。每個 goroutine 只是在通道上進行迭代,直到沒有更多資料為止,然後終止。 請記住,我們關閉了工作執行緒中的結果通道,這就是允許 for 迴圈終止的原因
請注意,我們使用 WaitGroup 進行 fanin 過程。這讓我們知道所有結果通道的所有結果何時已組合到輸出通道中。當發生這種情況時,我們關閉輸出通道,以便任何消耗輸出的下游執行緒都可以終止。
有了輸出通道中的所有數據,主執行緒就可以繼續顯示結果了。請注意,我們使用布林通道來防止主執行緒在一切完成之前終止;否則,它將終止進程。
請注意,還有另一種方法可以使用 select 語句進行扇入。這裡使用的技術更乾淨一些,因為我們可以增加或減少工人的數量。
另請注意,我們還沒有解決有關 SIGTERM 或 SIGINT 等提前終止的問題。這增加了一點複雜性。
你會如何實現這個? 扇出/扇入模式還有其他實作。請在下面留下您的評論和想法?
謝謝!
這篇文章以及本系列所有文章的程式碼可以在這裡找到
以上是Go 中的扇出-扇南模式的詳細內容。更多資訊請關注PHP中文網其他相關文章!