I have some producer-consumer code. I asked about it here for code review, and a large part of the idea was derived from this thread; the code is in the playground.
I'm worried about a deadlock scenario where all consumers are down but producers are still adding data to the shared channel. To "mitigate" this problem, I added a context check before adding the data to the data queue (line 85 in the Go playground, the select in producer that wraps the send on messageCh).
However, could a deadlock still occur if a producer checks ctx.Done() on line 85, the context is then canceled so that all consumers shut down, and the producer then tries to insert data into the queue?
If so, how can I mitigate it?
Here is the code again:
```go
package main

import (
	"context"
	"fmt"
	"sync"
)

func main() {
	a1 := []int{1, 2, 3, 4, 5}
	a2 := []int{5, 4, 3, 1, 1}
	a3 := []int{6, 7, 8, 9}
	a4 := []int{1, 2, 3, 4, 5}
	a5 := []int{5, 4, 3, 1, 1}
	a6 := []int{6, 7, 18, 9}
	arrayOfArray := [][]int{a1, a2, a3, a4, a5, a6}

	ctx, cancel := context.WithCancel(context.Background())
	ch1 := read(ctx, arrayOfArray)

	messageCh := make(chan int)
	errCh := make(chan error)

	producerWg := &sync.WaitGroup{}
	for i := 0; i < 3; i++ {
		producerWg.Add(1)
		producer(ctx, producerWg, ch1, messageCh, errCh)
	}

	consumerWg := &sync.WaitGroup{}
	for i := 0; i < 3; i++ {
		consumerWg.Add(1)
		consumer(ctx, consumerWg, messageCh, errCh)
	}

	firstError := handleAllErrors(ctx, cancel, errCh)

	producerWg.Wait()
	close(messageCh)

	consumerWg.Wait()
	close(errCh)

	fmt.Println(<-firstError)
}

// read feeds the input slices into a channel until done or canceled.
func read(ctx context.Context, arrayOfArray [][]int) <-chan []int {
	ch := make(chan []int)
	go func() {
		defer close(ch)
		for i := 0; i < len(arrayOfArray); i++ {
			select {
			case <-ctx.Done():
				return
			case ch <- arrayOfArray[i]:
			}
		}
	}()
	return ch
}

// producer doubles every value it reads and sends it on messageCh.
func producer(ctx context.Context, wg *sync.WaitGroup, in <-chan []int, messageCh chan<- int, errCh chan<- error) {
	go func() {
		defer wg.Done()
		for {
			select {
			case <-ctx.Done():
				return
			case arr, ok := <-in:
				if !ok {
					return
				}
				for i := 0; i < len(arr); i++ {
					// simulating an error.
					//if arr[i] == 10 {
					//	errCh <- fmt.Errorf("producer interrupted")
					//}
					select {
					case <-ctx.Done():
						return
					case messageCh <- 2 * arr[i]:
					}
				}
			}
		}
	}()
}

// consumer prints every value it receives until messageCh is closed or the context is canceled.
func consumer(ctx context.Context, wg *sync.WaitGroup, messageCh <-chan int, errCh chan<- error) {
	go func() {
		defer wg.Done()
		for {
			select {
			case <-ctx.Done():
				return
			case n, ok := <-messageCh:
				if !ok {
					return
				}
				fmt.Println("consumed: ", n)
				// simulating errors
				//if n == 10 {
				//	errCh <- fmt.Errorf("output error during write")
				//}
			}
		}
	}()
}

// handleAllErrors drains errCh, cancels the context on the first error, and reports that error.
func handleAllErrors(ctx context.Context, cancel context.CancelFunc, errCh chan error) <-chan error {
	firstErrCh := make(chan error, 1)
	isFirstError := true
	go func() {
		defer close(firstErrCh)
		for err := range errCh {
			select {
			case <-ctx.Done():
			default:
				cancel()
			}
			if isFirstError {
				firstErrCh <- err
				isFirstError = !isFirstError
			}
		}
	}()
	return firstErrCh
}
```
No, you are fine. This will not deadlock on producer writes, because you wrap the channel writes in a select statement. A select blocks on all of its cases at once, so even if the channel write can never happen because the consumers have terminated, you'll still hit the context cancellation case and terminate your producer.
To demonstrate the concept, you can run the following and see that it does not deadlock, even though it tries to write to a channel that has no reader.
```go
package main

import (
	"context"
	"fmt"
	"time"
)

func main() {
	ctx, cancel := context.WithCancel(context.Background())
	ch := make(chan struct{})

	// Cancel the context after one second.
	go func() {
		time.Sleep(1 * time.Second)
		cancel()
	}()

	// Nobody reads from ch, so only the cancellation case can fire.
	select {
	case ch <- struct{}{}:
	case <-ctx.Done():
		fmt.Println("context canceled")
	}
	fmt.Println("bye!")
}
```
This is its playground link.
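The same reasoning carries over to the scenario in the question, where several producers are stuck on a send after every consumer has gone away. Here is a minimal sketch of that case, assuming a hypothetical helper named `stuckProducersExit` (it is not part of the original code): it starts n producers on an unbuffered channel that nobody reads, cancels the context, and counts how many producers return.

```go
package main

import (
	"context"
	"fmt"
	"sync"
	"sync/atomic"
)

// stuckProducersExit is a hypothetical helper for illustration only. It starts
// n producers that send on an unbuffered channel nobody reads from, cancels
// the context, and reports how many producers returned via ctx.Done().
func stuckProducersExit(n int) int {
	ctx, cancel := context.WithCancel(context.Background())
	messageCh := make(chan int) // unbuffered and never read: every send blocks

	var exited int64
	var wg sync.WaitGroup
	for i := 0; i < n; i++ {
		wg.Add(1)
		go func(v int) {
			defer wg.Done()
			// The select waits on both cases at once, so it does not matter
			// whether cancel() runs before or after the producer gets here.
			select {
			case messageCh <- v:
			case <-ctx.Done():
				atomic.AddInt64(&exited, 1)
			}
		}(i)
	}

	cancel()  // stands in for "an error occurred and all consumers are gone"
	wg.Wait() // returns only because every producer observed ctx.Done()
	return int(atomic.LoadInt64(&exited))
}

func main() {
	fmt.Println("producers that exited:", stuckProducersExit(3))
}
```

If the producers used a bare `messageCh <- v` instead of the select, `wg.Wait()` would never return and the runtime would report a deadlock.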
As for simplifying the code: if this were any kind of real-life code, I'd probably just use Group from golang.org/x/sync/errgroup. Either that, or take a hint from it and use sync.Once: wrap all your producers and consumers in a function that spawns a goroutine and handles errors itself, without the more complicated error-channel draining code in your error handling function.
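To make the sync.Once idea concrete, here is a rough, stdlib-only sketch. The names `group`, `newGroup`, and `runPipeline` are hypothetical and exist only for this illustration; this is not the errgroup API, just one way the same idea could look. The first goroutine that returns an error records it and cancels the shared context, so no error channel has to be drained.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"sync"
)

// group is a hypothetical helper: it keeps the first error returned by any
// goroutine it started and cancels a shared context when that happens.
type group struct {
	wg      sync.WaitGroup
	errOnce sync.Once
	err     error
	cancel  context.CancelFunc
}

func newGroup(parent context.Context) (*group, context.Context) {
	ctx, cancel := context.WithCancel(parent)
	return &group{cancel: cancel}, ctx
}

// Go runs f in its own goroutine. Only the first non-nil error is kept.
func (g *group) Go(f func() error) {
	g.wg.Add(1)
	go func() {
		defer g.wg.Done()
		if err := f(); err != nil {
			g.errOnce.Do(func() {
				g.err = err
				g.cancel()
			})
		}
	}()
}

// Wait blocks until every goroutine has returned, then reports the first error.
func (g *group) Wait() error {
	g.wg.Wait()
	g.cancel() // release the context's resources on the success path too
	return g.err
}

// runPipeline doubles each input in a producer and sums the doubled values in
// a consumer. The consumer fails when it receives the value failOn.
func runPipeline(inputs []int, failOn int) (int, error) {
	g, ctx := newGroup(context.Background())
	messageCh := make(chan int)
	sum := 0

	g.Go(func() error { // producer
		defer close(messageCh)
		for _, v := range inputs {
			select {
			case messageCh <- 2 * v:
			case <-ctx.Done():
				return ctx.Err()
			}
		}
		return nil
	})

	g.Go(func() error { // consumer
		for n := range messageCh {
			if n == failOn {
				return errors.New("output error during write")
			}
			sum += n
		}
		return nil
	})

	err := g.Wait() // after Wait, reading sum is safe
	return sum, err
}

func main() {
	fmt.Println(runPipeline([]int{1, 2, 3}, -1))   // 12 <nil>
	fmt.Println(runPipeline([]int{1, 2, 3, 4}, 6)) // 6 output error during write
}
```

In the failing run the consumer stops reading, yet the producer still returns because its send sits in a select next to `ctx.Done()`, which is the same guarantee discussed above.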