In the previous two posts, we looked at Fanout and Fanin separately. They are often used together: we have a single data stream whose items can be processed independently, so it is safe to work on them concurrently. So, we fanout to multiple worker goroutines, then fanin back into a single stream.
For example, suppose you have a large log file. You could break the file into chunks, allowing each worker to operate on a different part of the file concurrently, then combine the results.
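The log file idea can be sketched in a few lines. This is a minimal illustration, not the code from this series: it assumes the file has already been read into a slice of lines, and `countErrors` and the "ERROR" marker are hypothetical names chosen for the example.

```go
package main

import (
	"fmt"
	"strings"
	"sync"
)

// countErrors (hypothetical helper) splits lines into at most `workers`
// chunks, counts the lines containing "ERROR" in each chunk concurrently,
// then combines the per-chunk counts into one total.
func countErrors(lines []string, workers int) int {
	if workers < 1 {
		workers = 1
	}
	// Round up so every line lands in some chunk.
	chunkSize := (len(lines) + workers - 1) / workers

	counts := make(chan int)
	var wg sync.WaitGroup

	// Fan out: one goroutine per chunk.
	for start := 0; start < len(lines); start += chunkSize {
		end := start + chunkSize
		if end > len(lines) {
			end = len(lines)
		}
		wg.Add(1)
		go func(chunk []string) {
			defer wg.Done()
			n := 0
			for _, line := range chunk {
				if strings.Contains(line, "ERROR") {
					n++
				}
			}
			counts <- n
		}(lines[start:end])
	}

	// Close counts once every chunk has reported, so the loop below ends.
	go func() {
		wg.Wait()
		close(counts)
	}()

	// Fan in: combine the partial results.
	total := 0
	for n := range counts {
		total += n
	}
	return total
}

func main() {
	lines := []string{
		"INFO start",
		"ERROR disk full",
		"WARN slow request",
		"ERROR timeout",
		"INFO done",
	}
	fmt.Println("errors:", countErrors(lines, 3)) // prints "errors: 2"
}
```

Here each worker sends a single partial count rather than a stream of results, which is enough when only the combined total matters.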
If you followed the previous two posts, this pattern will look familiar. See the links above if you're not sure.
package main

import (
	"fmt"
	"math/rand"
	"sync"
)

// produce is simulating our single input as a channel
func produce() chan int {
	ch := make(chan int)
	go func() {
		for i := 0; i < 10; i++ {
			ch <- rand.Intn(50)
		}
		fmt.Printf("producer done\n")
		close(ch) // this is important!!!
	}()
	return ch
}

// worker reads jobs until the jobs channel is closed and sends one
// result per job to its own output channel
func worker(id int, jobs chan int, out chan OddEven, wg *sync.WaitGroup) {
	for value := range jobs {
		odd := "even"
		if (value & 1) == 1 {
			odd = "odd"
		}
		out <- OddEven{
			Number:  value,
			OddEven: odd,
		}
	}
	close(out) // remember this
	wg.Done()
}

// OddEven struct will be the result of the work done by each fanout thread
// and be the fanin data
type OddEven struct {
	Number  int
	OddEven string
}

func fanin(inputs []chan OddEven) chan OddEven {
	output := make(chan OddEven)
	var wg sync.WaitGroup
	for i, input := range inputs {
		wg.Add(1)
		// explicit params to capture loop vars
		go func(id int, input chan OddEven, output chan OddEven, wg *sync.WaitGroup) {
			for value := range input {
				output <- value
			}
			fmt.Printf("done merging source %d\n", id)
			wg.Done()
		}(i, input, output, &wg)
	}
	go func() {
		wg.Wait()
		close(output) // this is important!!!
	}()
	return output
}

func main() {
	// simulate the input data stream
	inputCh := produce()
	numWorkers := 3

	// fan-out to send data items to workers as individual jobs
	var wg sync.WaitGroup
	workerResults := make([]chan OddEven, numWorkers)
	for i := 0; i < numWorkers; i++ {
		wg.Add(1)
		workerResults[i] = make(chan OddEven)
		go worker(i, inputCh, workerResults[i], &wg)
	}
	// nothing depends on this wait: fanin already finishes only after
	// every worker has closed its result channel
	go func() {
		wg.Wait()
	}()

	// fan-in the results
	results := fanin(workerResults)
	done := make(chan bool)
	go func() {
		for value := range results {
			fmt.Printf("got %d is %s\n", value.Number, value.OddEven)
		}
		close(done)
	}()
	<-done
	fmt.Println("done")
}
There is a produce() function that creates a simulated input stream of numbers.
There is a worker function that operates on an input channel until there is no more data. On each value it 'processes' the input data (determines if the value is odd or even), then sends a result struct to an output channel.
Note that when each worker is done, it closes its result channel. This is necessary to prevent deadlock since the fanin operation would otherwise sleep waiting for more data on the chan.
The main thread gets the input stream from produce, then launches a number of workers giving each worker its own channel where it will send its results.
These result channels are then sent to the fanin operation. To fanin, we create a channel to receive the output, then launch a goroutine for each of the worker channels. Each goroutine simply iterates over its channel until there is no more data, then terminates. Remember that we closed the result channel in the worker; that is what allows the for loop to terminate.
Note that we use a WaitGroup for the fanin process. This lets us know when all the results from all the result channels have been combined into the output channel. When this happens, we close the output channel so that whatever downstream goroutine is consuming the output can terminate.
The output channel is unbuffered, so the main thread displays each result as it arrives. Note that we use a boolean channel to keep main from returning until every result has been printed; otherwise, main would return and the process would exit while goroutines were still running.
Note that there is another way to do fan-in using a select statement. The technique used here is a little cleaner, since we can increase or decrease the number of workers without touching the fanin code, whereas a select statement needs one case per channel.
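For comparison, here is a minimal sketch of a select-based fan-in. It is not code from this series: `mergeTwo` and `emit` are hypothetical names, and the function is deliberately limited to two inputs to show why the number of channels is fixed in the code.

```go
package main

import "fmt"

// mergeTwo (hypothetical) fans in exactly two channels with a select.
// When an input is closed, its variable is set to nil: receiving from a
// nil channel blocks forever, so that case is never chosen again.
func mergeTwo(a, b <-chan int) <-chan int {
	out := make(chan int)
	go func() {
		defer close(out)
		for a != nil || b != nil {
			select {
			case v, ok := <-a:
				if !ok {
					a = nil
					continue
				}
				out <- v
			case v, ok := <-b:
				if !ok {
					b = nil
					continue
				}
				out <- v
			}
		}
	}()
	return out
}

// emit (hypothetical) sends the given values on a new channel, then closes it.
func emit(values ...int) <-chan int {
	ch := make(chan int)
	go func() {
		defer close(ch)
		for _, v := range values {
			ch <- v
		}
	}()
	return ch
}

func main() {
	sum := 0
	for v := range mergeTwo(emit(1, 2, 3), emit(10, 20)) {
		sum += v
	}
	// The arrival order varies from run to run, but the sum does not.
	fmt.Println("sum:", sum) // prints "sum: 36"
}
```

Adding a third input here means adding a third case and a third nil check, which is the trade-off against the WaitGroup version.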
Note also that we have not addressed anything with regard to early termination from things like SIGTERM or SIGINT. That adds a little more complexity.
How would you implement this? There are other implementations of the fanout/fanin pattern. Please leave your comments and thoughts below.
Thanks!
The code for this post and all posts in this series can be found here