Berbilang goroutine membaca dari saluran yang sama

王林
Lepaskan: 2024-02-09 16:30:10
ke hadapan
455 orang telah melayarinya

多个 goroutine 从同一通道读取

editor php Strawberry akan memperkenalkan kepada anda kandungan yang berkaitan dengan pelbagai bacaan goroutine dari saluran yang sama dalam artikel ini. Dalam pengaturcaraan serentak, goroutine ialah utas ringan dalam bahasa Go yang boleh melaksanakan berbilang tugas pada masa yang sama. Saluran ialah cara penting untuk berkomunikasi antara goroutine. Apabila berbilang goroutine perlu membaca data dari saluran yang sama, kita perlu memberi perhatian kepada beberapa isu dan mengambil langkah yang sepadan untuk memastikan ketepatan dan kecekapan program. Dalam perkara berikut, kami akan menerangkan proses secara terperinci dan memberikan beberapa petua dan nasihat praktikal.

Kandungan soalan

Pertimbangkan untuk melahirkan berbilang goroutin untuk membaca nilai dari saluran yang sama. Kedua-dua pekerja dijana seperti yang diharapkan, tetapi hanya membaca satu item daripada saluran dan berhenti membaca. Saya menjangkakan goroutine akan terus membaca data dari saluran sehingga goroutine yang menghantar nilai ke saluran ditutup. Walaupun ada sesuatu yang menghalang pengirim daripada menghantar, goroutine yang melahirkan projek itu tidak ditutup. Mengapa setiap pekerja hanya membaca satu nilai dan berhenti?

Output menunjukkan dua nilai yang dihantar, satu dibaca oleh setiap goroutine pekerja. Nilai ketiga dihantar tetapi tidak dibaca dari mana-mana benang pekerja.

new worker
new worker
waiting
sending 0
sending 1
sending 2
running func 1
sending value out 1
running func 0
sending value out 0
Salin selepas log masuk

Pergi ke taman permainan

package main

import (
    "fmt"
    "sync"
)

func workerPool(done <-chan bool, in <-chan int, numberOfWorkers int, fn func(int) int) chan int {
    out := make(chan int)
    var wg sync.WaitGroup

    for i := 0; i < numberOfWorkers; i++ {
        fmt.Println("new worker")
        wg.Add(1)
        // fan out worker goroutines reading from in channel and
        // send output into out channel
        go func() {
            defer wg.Done()
            for {
                select {
                case <-done:
                    fmt.Println("recieved done signal")
                    return
                case data, ok := <-in:
                    if !ok {
                        fmt.Println("no more items")
                        return
                    }
                    // fan-in job execution multiplexing results into the results channel
                    fmt.Println("running func", data)
                    value := fn(data)
                    fmt.Println("sending value out", value)
                    out <- value
                }
            }
        }()
    }

    fmt.Println("waiting")
    wg.Wait()
    fmt.Println("done waiting")
    close(out)
    return out
}

func main() {
    done := make(chan bool)
    defer close(done)

    in := make(chan int)

    go func() {
        for i := 0; i < 10; i++ {
            fmt.Println("sending", i)
            in <- i
        }
        close(in)
    }()

    out := workerPool(done, in, 2, func(i int) int {
        return i
    })

    for {
        select {
        case o, ok := <-out:
            if !ok {
                continue
            }

            fmt.Println("output", o)
        case <-done:
            return
        default:
        }
    }

}
Salin selepas log masuk

Penyelesaian

Ulasan sebelumnya tentang saluran tidak ditimbal adalah betul, tetapi terdapat isu penyegerakan lain.

Saluran tidak buffer pada dasarnya bermakna apabila nilai ditulis, nilai itu mesti diterima sebelum sebarang penulisan lain boleh berlaku.

  1. workerpool 创建一个无缓冲通道 out 来存储结果,但只有在所有结果写入 out 后才返回。但由于从 out 通道的读取发生在 out 返回之后,并且 out 没有缓冲,因此 workerpool 在尝试写入时被阻塞,从而导致死锁。这就是为什么看起来每个工作人员只发送一个值;实际上,在发送第一个之后,所有工作人员都被阻止,因为没有任何东西可以接收该值(您可以通过在写入 outAlihkan kenyataan cetakan kemudian untuk melihat ini)

Betulkan pilihan termasuk membuat out 有一个大小为 n = 结果数 的缓冲区(即 out := make(chan int, n))或使 out 不缓冲并在写入时从 out melakukan bacaan.

  • done 频道也没有被正确使用。 mainworkerpool kedua-duanya bergantung padanya untuk menghentikan pelaksanaan, tetapi tiada apa yang ditulis kepadanya! Ia juga tidak dibuffer dan oleh itu mengalami masalah kebuntuan yang disebutkan di atas.
  • Untuk menyelesaikan isu ini, anda boleh terlebih dahulu menyelesaikan kebuntuan daripada workerpool 中删除 case <-done: 并简单地通过 in 进行范围,因为它在 main 中关闭。然后可以将donemenetapkan kepada saluran penimbal.

    Gabungkan pembaikan ini untuk mendapatkan:

    package main
    
    import (
        "fmt"
        "sync"
    )
    
    func workerPool(done chan bool, in <-chan int, numberOfWorkers int, fn func(int) int) chan int {
        out := make(chan int, 100)
        var wg sync.WaitGroup
    
        for i := 0; i < numberOfWorkers; i++ {
            fmt.Println("new worker")
            wg.Add(1)
            // fan out worker goroutines reading from in channel and
            // send output into out channel
            go func() {
                defer wg.Done()
                for data := range in {
                    // fan-in job execution multiplexing results into the results channel
                    fmt.Println("running func", data)
                    value := fn(data)
                    fmt.Println("sending value out", value)
                    out <- value
    
                }
                fmt.Println("no more items")
                return
            }()
        }
    
        fmt.Println("waiting")
        wg.Wait()
        fmt.Println("done waiting")
        close(out)
        done <- true
        close(done)
        return out
    }
    
    func main() {
        done := make(chan bool, 1)
    
        in := make(chan int)
    
        go func() {
            for i := 0; i < 10; i++ {
                fmt.Println("sending", i)
                in <- i
            }
            close(in)
        }()
    
        out := workerPool(done, in, 2, func(i int) int {
            return i
        })
    
        for {
            select {
            case o, ok := <-out:
                if !ok {
                    continue
                }
    
                fmt.Println("output", o)
            case <-done:
                return
            }
        }
    
    }
    Salin selepas log masuk

    Ini mungkin menyelesaikan masalah anda, tetapi ini bukan cara terbaik untuk menggunakan saluran! Struktur itu sendiri boleh diubah dengan lebih mudah tanpa perlu bergantung pada saluran penimbal.

    Atas ialah kandungan terperinci Berbilang goroutine membaca dari saluran yang sama. Untuk maklumat lanjut, sila ikut artikel berkaitan lain di laman web China PHP!

    Label berkaitan:
    sumber:stackoverflow.com
    Kenyataan Laman Web ini
    Kandungan artikel ini disumbangkan secara sukarela oleh netizen, dan hak cipta adalah milik pengarang asal. Laman web ini tidak memikul tanggungjawab undang-undang yang sepadan. Jika anda menemui sebarang kandungan yang disyaki plagiarisme atau pelanggaran, sila hubungi admin@php.cn
    Tutorial Popular
    Lagi>
    Muat turun terkini
    Lagi>
    kesan web
    Kod sumber laman web
    Bahan laman web
    Templat hujung hadapan
    Tentang kita Penafian Sitemap
    Laman web PHP Cina:Latihan PHP dalam talian kebajikan awam,Bantu pelajar PHP berkembang dengan cepat!