Rumah > pembangunan bahagian belakang > Golang > Corak Fanout-Fanin dalam Go

Corak Fanout-Fanin dalam Go

PHPz
Lepaskan: 2024-07-29 06:32:53
asal
749 orang telah melayarinya

Fanout-Fanin Pattern in Go

Dalam 2 post sebelum ini, kami telah melihat Fanout dan Fanin secara berasingan. Selalunya kita menggunakannya bersama-sama di mana kita mempunyai satu aliran data di mana kita mahu mengendalikan item secara individu dan boleh melakukannya dengan selamat menggunakan concurrency. Jadi, kami memasukkan beberapa utas pekerja kemudian kembali ke satu aliran.

Sebagai contoh, katakan anda mempunyai fail log yang besar. Anda boleh memecahkan fail kepada beberapa bahagian, membenarkan setiap pekerja beroperasi pada bahagian fail yang berbeza secara serentak, kemudian menggabungkan hasilnya.

Jika anda mengikuti dua siaran sebelumnya, corak ini jelas. Lihat pautan di atas jika anda tidak pasti.

// produce is simulating our single input as a channel
func produce() chan int {
    ch := make(chan int)
    go func() {
        for i := 0; i < 10; i++ {
            ch <- rand.Intn(50)
        }
        fmt.Printf("producer done\n")
        close(ch) // this is important!!!
    }()
    return ch
}

func worker(id int, jobs chan int, out chan OddEven, wg *sync.WaitGroup) {
    for value := range jobs {
        odd := "even"
        if (value & 1) == 1 {
            odd = "odd"
        }
        out <- OddEven{
            Number:  value,
            OddEven: odd,
        }
    }
    close(out) // remember this
    wg.Done()
}

// OddEven struct will be the result of the work done by each fanout thread
// and be the fanin data
type OddEven struct {
    Number  int
    OddEven string
}

func fanin(inputs []chan OddEven) chan OddEven {
    output := make(chan OddEven)

    var wg sync.WaitGroup
    for i, input := range inputs {
        wg.Add(1)
                // explicit params to capture loop vars
        go func(id int, input chan OddEven, output chan OddEven, wg *sync.WaitGroup) {
            for value := range input {
                output <- value
            }
            fmt.Printf("done merging source %d\n", id)
            wg.Done()
        }(i, input, output, &wg)
    }
    go func() {
        wg.Wait()
        close(output) // this is important!!!
    }()

    return output
}

func main() {
    // simulate the input data stream
    inputCh := produce()

    numWorkers := 3
    // fan-out to send data items to workers as individual jobs
    var wg sync.WaitGroup
    workerResults := make([]chan OddEven, numWorkers)
    for i := 0; i < numWorkers; i++ {
        wg.Add(1)
        workerResults[i] = make(chan OddEven)
        go worker(i, inputCh, workerResults[i], &wg)
    }
    go func() {
        wg.Wait()
    }()

    // fan-in the results
    results := fanin(workerResults)

    done := make(chan bool)
    go func() {
        for value := range results {
            fmt.Printf("got %d is %s\n", value.Number, value.OddEven)
        }
        close(done)
    }()
    <-done
    fmt.Println("done")
}
Salin selepas log masuk

Terdapat fungsi produce() yang mencipta aliran input simulasi nombor.

Terdapat fungsi pekerja yang beroperasi pada saluran input sehingga tiada lagi data. Pada setiap nilai ia 'memproses' data input (menentukan sama ada nilai itu ganjil atau genap), kemudian menghantar struct hasil ke saluran keluaran.

Perhatikan bahawa apabila setiap pekerja selesai, ia menutup saluran keputusannya. Ini adalah perlu untuk mengelakkan kebuntuan kerana operasi fanin akan tidur menunggu lebih banyak data pada chan.

Urut utama mendapat aliran input daripada hasil, kemudian melancarkan beberapa pekerja yang memberi setiap pekerja salurannya sendiri di mana ia akan menghantar hasilnya.

Saluran hasil ini kemudiannya dihantar ke operasi fanin. Untuk fanin, kami mencipta saluran untuk menerima output, kemudian melancarkan goroutine untuk setiap saluran pekerja. Setiap goroutine hanya berulang melalui saluran sehingga tiada lagi data kemudian ditamatkan. Ingat bahawa kami menutup saluran hasil dalam urutan pekerja, itulah yang membolehkan gelung for ditamatkan

Perhatikan bahawa kami menggunakan WaitGroup untuk proses fanin. Ini memberitahu kami apabila semua hasil daripada semua saluran hasil telah digabungkan ke dalam saluran keluaran. Apabila ini berlaku, kami menutup saluran keluaran supaya apa sahaja benang hiliran yang memakan output boleh ditamatkan.

Dengan semua data dalam saluran keluaran, utas utama boleh diteruskan dan memaparkan hasilnya. Ambil perhatian bahawa kami menggunakan saluran boolean untuk menghalang utas utama daripada ditamatkan sehingga semuanya selesai; jika tidak, ia akan menamatkan proses.

Perhatikan bahawa terdapat cara lain untuk melakukan kipas masuk menggunakan pernyataan pilih. Teknik yang digunakan di sini adalah lebih bersih sedikit kerana kita boleh menambah atau mengurangkan bilangan pekerja.

Perhatikan juga bahawa kami tidak menangani apa-apa berkaitan penamatan awal daripada perkara seperti SIGTERM atau SIGINT. Itu menambahkan sedikit lagi kerumitan.

Bagaimana anda akan melaksanakan ini? Terdapat pelaksanaan lain corak fanout/fanin. Sila tinggalkan komen dan pendapat anda di bawah?

Terima kasih!

Kod untuk siaran ini dan semua siaran dalam siri ini boleh didapati di sini

Atas ialah kandungan terperinci Corak Fanout-Fanin dalam Go. Untuk maklumat lanjut, sila ikut artikel berkaitan lain di laman web China PHP!

sumber:dev.to
Kenyataan Laman Web ini
Kandungan artikel ini disumbangkan secara sukarela oleh netizen, dan hak cipta adalah milik pengarang asal. Laman web ini tidak memikul tanggungjawab undang-undang yang sepadan. Jika anda menemui sebarang kandungan yang disyaki plagiarisme atau pelanggaran, sila hubungi admin@php.cn
Tutorial Popular
Lagi>
Muat turun terkini
Lagi>
kesan web
Kod sumber laman web
Bahan laman web
Templat hujung hadapan