Fanout-Fanin Pattern in Go
In the previous two posts, we looked at Fanout and Fanin separately. We often use them together: we have a single data stream whose items can be processed independently, so it is safe to work on them concurrently. We fan out to multiple worker goroutines, then fan in back to a single stream.
For example, suppose you have a large log file. You could break the file into chunks, allowing each worker to operate on a different part of the file concurrently, then combine the results.
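The log file idea can be sketched roughly as follows. This is only an illustration under some assumptions: the "file" is a slice of lines already in memory, the work is counting lines that contain the marker "ERROR", and the name countErrors is made up for this sketch. Unlike the code later in this post, the workers here share one result channel, and chunkSize and numWorkers are assumed to be positive.

```go
package main

import (
	"fmt"
	"strings"
	"sync"
)

// countErrors fans chunks of log lines out to numWorkers goroutines and
// fans the per-chunk counts back in. The "ERROR" marker is an assumption
// made for this illustration; chunkSize and numWorkers must be positive.
func countErrors(lines []string, chunkSize, numWorkers int) int {
	chunks := make(chan []string)
	counts := make(chan int)

	// Fan-out: every worker pulls chunks from the same channel.
	var wg sync.WaitGroup
	for w := 0; w < numWorkers; w++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for chunk := range chunks {
				n := 0
				for _, line := range chunk {
					if strings.Contains(line, "ERROR") {
						n++
					}
				}
				counts <- n
			}
		}()
	}

	// Producer: slice the input into chunks, then close to signal the end.
	go func() {
		for start := 0; start < len(lines); start += chunkSize {
			end := start + chunkSize
			if end > len(lines) {
				end = len(lines)
			}
			chunks <- lines[start:end]
		}
		close(chunks)
	}()

	// Close counts once every worker has finished.
	go func() {
		wg.Wait()
		close(counts)
	}()

	// Fan-in: combine the partial results.
	total := 0
	for n := range counts {
		total += n
	}
	return total
}

func main() {
	lines := []string{
		"INFO start", "ERROR disk full", "INFO retry",
		"ERROR timeout", "INFO done",
	}
	fmt.Println(countErrors(lines, 2, 3)) // prints 2
}
```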
If you followed the previous two posts, this pattern should look familiar. If not, see the links above.
    package main

    import (
        "fmt"
        "math/rand"
        "sync"
    )

    // produce simulates our single input stream as a channel
    func produce() chan int {
        ch := make(chan int)
        go func() {
            for i := 0; i < 10; i++ {
                ch <- rand.Intn(50)
            }
            fmt.Printf("producer done\n")
            close(ch) // this is important!!!
        }()
        return ch
    }

    // worker reads jobs until the channel is closed and sends one result per job
    func worker(id int, jobs chan int, out chan OddEven, wg *sync.WaitGroup) {
        for value := range jobs {
            odd := "even"
            if (value & 1) == 1 {
                odd = "odd"
            }
            out <- OddEven{
                Number:  value,
                OddEven: odd,
            }
        }
        close(out) // remember this
        wg.Done()
    }

    // OddEven struct will be the result of the work done by each fanout
    // goroutine and be the fanin data type
    type OddEven struct {
        Number  int
        OddEven string
    }

    // fanin merges all input channels into a single output channel
    func fanin(inputs []chan OddEven) chan OddEven {
        output := make(chan OddEven)
        var wg sync.WaitGroup
        for i, input := range inputs {
            wg.Add(1)
            // explicit params to capture loop vars
            go func(id int, input chan OddEven, output chan OddEven, wg *sync.WaitGroup) {
                for value := range input {
                    output <- value
                }
                fmt.Printf("done merging source %d\n", id)
                wg.Done()
            }(i, input, output, &wg)
        }
        go func() {
            wg.Wait()
            close(output) // this is important!!!
        }()
        return output
    }

    func main() {
        // simulate the input data stream
        inputCh := produce()
        numWorkers := 3

        // fan-out to send data items to workers as individual jobs
        var wg sync.WaitGroup
        workerResults := make([]chan OddEven, numWorkers)
        for i := 0; i < numWorkers; i++ {
            wg.Add(1)
            workerResults[i] = make(chan OddEven)
            go worker(i, inputCh, workerResults[i], &wg)
        }
        // nothing blocks on this wait; fanin relies on each worker
        // closing its own result channel
        go func() {
            wg.Wait()
        }()

        // fan-in the results
        results := fanin(workerResults)

        done := make(chan bool)
        go func() {
            for value := range results {
                fmt.Printf("got %d is %s\n", value.Number, value.OddEven)
            }
            close(done)
        }()
        <-done
        fmt.Println("done")
    }
There is a produce() function that creates a simulated input stream of numbers.
There is a worker function that operates on an input channel until there is no more data. On each value it 'processes' the input data (determines if the value is odd or even), then sends a result struct to an output channel.
Note that when each worker is done, it closes its result channel. This is necessary to prevent deadlock, since the fanin goroutine reading that channel would otherwise block forever waiting for more data.
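A tiny sketch may make the role of close clearer. The helper name drain is hypothetical and not part of the code above; it only shows that a range loop over a channel ends when the channel is closed and empty.

```go
package main

import "fmt"

// drain reads from ch until it is closed and returns how many values arrived.
func drain(ch <-chan int) int {
	n := 0
	for range ch { // exits only once ch is closed and empty
		n++
	}
	return n
}

func main() {
	ch := make(chan int)
	go func() {
		for i := 0; i < 3; i++ {
			ch <- i
		}
		close(ch) // without this, drain would block forever
	}()
	fmt.Println(drain(ch)) // prints 3
}
```

If the close call is removed in this small program, every goroutine ends up blocked and the Go runtime aborts with "fatal error: all goroutines are asleep - deadlock!".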
The main goroutine gets the input stream from produce, then launches a number of workers, giving each worker its own channel where it will send its results.
These result channels are then sent to the fanin operation. To fanin, we create a channel to receive the output, then launch a goroutine for each of the worker channels. Each goroutine simply iterates over its channel until there is no more data, then terminates. Remember that we closed the result channel in the worker goroutine; that is what allows the for loop to terminate.
Note that we use a WaitGroup for the fanin process. This lets us know when all the results from all the result channels have been combined into the output channel. When this happens, we close the output channel so that whatever downstream goroutine is consuming the output can terminate.
As results arrive on the output channel, a goroutine started by main displays them. Note that we use a boolean channel to keep the main goroutine from returning until everything is done; otherwise main would return and the process would exit before all the results were printed.
Note that there is another way to do fan-in, using a select statement. The technique used here is a little cleaner, since a select statement has a fixed set of cases, whereas here we can increase or decrease the number of workers just by changing numWorkers.
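For comparison, a select-based fan-in for exactly two inputs might look like the sketch below. The names mergeTwo and gen are hypothetical helpers written for this illustration. It relies on the fact that receiving from a nil channel blocks forever, so setting a drained input to nil effectively disables its case.

```go
package main

import "fmt"

// mergeTwo fans in exactly two channels with select. A drained input is
// set to nil so that its case can never be chosen again.
func mergeTwo(a, b <-chan int) <-chan int {
	out := make(chan int)
	go func() {
		defer close(out)
		for a != nil || b != nil {
			select {
			case v, ok := <-a:
				if !ok {
					a = nil
					continue
				}
				out <- v
			case v, ok := <-b:
				if !ok {
					b = nil
					continue
				}
				out <- v
			}
		}
	}()
	return out
}

// gen emits the given values on a new channel and then closes it.
func gen(vals ...int) <-chan int {
	ch := make(chan int)
	go func() {
		defer close(ch)
		for _, v := range vals {
			ch <- v
		}
	}()
	return ch
}

func main() {
	sum := 0
	for v := range mergeTwo(gen(1, 2, 3), gen(10, 20)) {
		sum += v
	}
	fmt.Println(sum) // arrival order varies, but the sum is 36
}
```

Adding a third input to this version means adding a third case by hand, which is the trade-off mentioned above.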
Note also that we have not addressed anything with regard to early termination from things like SIGTERM or SIGINT. That adds a little more complexity.
How would you implement this? There are other implementations of the fanout/fanin pattern. Please leave your comments and thoughts below.
Thanks!
The code for this post and all posts in this series can be found here