前の 2 つの投稿では、Fanout と Fanin を別々に見てきました。アイテムを個別に操作したい単一のデータ ストリームがあり、並行性を使用して安全に操作できる場合、それらを一緒に使用することがよくあります。したがって、複数のワーカー スレッドにファンアウトしてから、単一のストリームにファンインして戻します。
たとえば、大きなログ ファイルがあるとします。ファイルをいくつかのチャンクに分割し、各ワーカーがファイルの異なる部分を同時に操作して、その結果を結合できるようにすることができます。
前の 2 つの投稿を参照した場合、このパターンは明らかです。よくわからない場合は、上のリンクを参照してください。
// produce is simulating our single input as a channel func produce() chan int { ch := make(chan int) go func() { for i := 0; i < 10; i++ { ch <- rand.Intn(50) } fmt.Printf("producer done\n") close(ch) // this is important!!! }() return ch } func worker(id int, jobs chan int, out chan OddEven, wg *sync.WaitGroup) { for value := range jobs { odd := "even" if (value & 1) == 1 { odd = "odd" } out <- OddEven{ Number: value, OddEven: odd, } } close(out) // remember this wg.Done() } // OddEven struct will be the result of the work done by each fanout thread // and be the fanin data type OddEven struct { Number int OddEven string } func fanin(inputs []chan OddEven) chan OddEven { output := make(chan OddEven) var wg sync.WaitGroup for i, input := range inputs { wg.Add(1) // explicit params to capture loop vars go func(id int, input chan OddEven, output chan OddEven, wg *sync.WaitGroup) { for value := range input { output <- value } fmt.Printf("done merging source %d\n", id) wg.Done() }(i, input, output, &wg) } go func() { wg.Wait() close(output) // this is important!!! }() return output } func main() { // simulate the input data stream inputCh := produce() numWorkers := 3 // fan-out to send data items to workers as individual jobs var wg sync.WaitGroup workerResults := make([]chan OddEven, numWorkers) for i := 0; i < numWorkers; i++ { wg.Add(1) workerResults[i] = make(chan OddEven) go worker(i, inputCh, workerResults[i], &wg) } go func() { wg.Wait() }() // fan-in the results results := fanin(workerResults) done := make(chan bool) go func() { for value := range results { fmt.Printf("got %d is %s\n", value.Number, value.OddEven) } close(done) }() <-done fmt.Println("done") }
シミュレートされた数値の入力ストリームを作成するProduce()関数があります。
データがなくなるまで入力チャネル上で動作するワーカー関数があります。各値に対して入力データを「処理」し (値が奇数か偶数かを判断し)、結果の構造体を出力チャネルに送信します。
各ワーカーが完了すると、結果チャネルが閉じられることに注意してください。これはデッドロックを防ぐために必要です。そうしないと、fanin 操作が chan 上のさらなるデータを待ってスリープ状態になってしまうからです。
メインスレッドは、product から入力ストリームを取得し、多数のワーカーを起動し、各ワーカーに結果を送信する独自のチャネルを与えます。
これらの結果チャネルは、fanin オペレーションに送信されます。 fanin するには、出力を受信するチャネルを作成し、各ワーカー チャネルに対して goroutine を起動します。各ゴルーチンは、データがなくなるまでチャネルを反復処理し、終了します。 ワーカー スレッドで結果チャネルを閉じたことを思い出してください。これにより、for ループが終了できるようになります
fanin プロセスには WaitGroup を使用することに注意してください。これにより、すべての結果チャネルからのすべての結果がいつ出力チャネルに結合されたかを知ることができます。これが発生した場合、出力を消費するダウンストリーム スレッドが終了できるように、出力チャネルを閉じます。
出力チャネルにすべてのデータがあれば、メインスレッドは続行して結果を表示できます。すべてが完了するまでメインスレッドが終了しないように、ブール値チャネルを使用していることに注意してください。それ以外の場合は、プロセスが終了します。
select ステートメントを使用してファンインを行う別の方法があることに注意してください。ここで使用される手法は、ワーカーの数を増減できるため、少しすっきりしています。
SIGTERM や SIGINT などによる早期終了に関しては何も対処していないことにも注意してください。これにより、もう少し複雑になります。
これをどのように実装しますか? ファンアウト/ファンイン パターンの実装は他にもあります。以下にコメントやご意見を残してください?
ありがとうございます!
この投稿とこのシリーズのすべての投稿のコードはここにあります
以上がGo のファンアウト - ファニン パターンの詳細内容です。詳細については、PHP 中国語 Web サイトの他の関連記事を参照してください。