ただし、
プロデューサ が 85 行目で context.done() をチェックしてからコンテキストをキャンセルし、すべての コンシューマ が閉じてから した場合には、依然としてデッドロックが発生する可能性があります。プロデューサーデータをキューに挿入しようとしていますか?
そうであれば、それを軽減する方法。コードを再投稿:
package main import ( "context" "fmt" "sync" ) func main() { a1 := []int{1, 2, 3, 4, 5} a2 := []int{5, 4, 3, 1, 1} a3 := []int{6, 7, 8, 9} a4 := []int{1, 2, 3, 4, 5} a5 := []int{5, 4, 3, 1, 1} a6 := []int{6, 7, 18, 9} arrayOfArray := [][]int{a1, a2, a3, a4, a5, a6} ctx, cancel := context.WithCancel(context.Background()) ch1 := read(ctx, arrayOfArray) messageCh := make(chan int) errCh := make(chan error) producerWg := &sync.WaitGroup{} for i := 0; i < 3; i++ { producerWg.Add(1) producer(ctx, producerWg, ch1, messageCh, errCh) } consumerWg := &sync.WaitGroup{} for i := 0; i < 3; i++ { consumerWg.Add(1) consumer(ctx, consumerWg, messageCh, errCh) } firstError := handleAllErrors(ctx, cancel, errCh) producerWg.Wait() close(messageCh) consumerWg.Wait() close(errCh) fmt.Println(<-firstError) } func read(ctx context.Context, arrayOfArray [][]int) <-chan []int { ch := make(chan []int) go func() { defer close(ch) for i := 0; i < len(arrayOfArray); i++ { select { case <-ctx.Done(): return case ch <- arrayOfArray[i]: } } }() return ch } func producer(ctx context.Context, wg *sync.WaitGroup, in <-chan []int, messageCh chan<- int, errCh chan<- error) { go func() { defer wg.Done() for { select { case <-ctx.Done(): return case arr, ok := <-in: if !ok { return } for i := 0; i < len(arr); i++ { // simulating an error. //if arr[i] == 10 { // errCh <- fmt.Errorf("producer interrupted") //} select { case <-ctx.Done(): return case messageCh <- 2 * arr[i]: } } } } }() } func consumer(ctx context.Context, wg *sync.WaitGroup, messageCh <-chan int, errCh chan<- error) { go func() { wg.Done() for { select { case <-ctx.Done(): return case n, ok := <-messageCh: if !ok { return } fmt.Println("consumed: ", n) // simulating erros //if n == 10 { // errCh <- fmt.Errorf("output error during write") //} } } }() } func handleAllErrors(ctx context.Context, cancel context.CancelFunc, errCh chan error) <-chan error { firstErrCh := make(chan error, 1) isFirstError := true go func() { defer close(firstErrCh) for err := range errCh { select { case <-ctx.Done(): default: cancel() } if isFirstError { firstErrCh <- err isFirstError = !isFirstError } } }() return firstErrCh }
select ステートメントでラップしているため、プロデューサーの書き込みでデッドロックすることはありません。コンシューマーが終了したためにチャネル書き込みが実行できない場合でも、コンテキストキャンセル句にヒットしてプロデューサーを終了します。
リーリー
これはその遊び場のリンクです 。
コードの簡略化について。これが現実のコードであれば、おそらくgolang.org/x/sync/errgroup の
group を使用するでしょう。または、それらからヒントを得て、
sync.once と
すべてのプロデューサーとコンシューマー をゴルーチンを生成し、エラー内でさらに使用せずにエラーを処理できる関数でラップします。 複雑なエラー チャネルの排気コード処理関数。
以上がGo プロデューサーとコンシューマーがデッドロックを回避するの詳細内容です。詳細については、PHP 中国語 Web サイトの他の関連記事を参照してください。