Dalam 2 post sebelum ini, kami telah melihat Fanout dan Fanin secara berasingan. Selalunya kita menggunakannya bersama-sama di mana kita mempunyai satu aliran data di mana kita mahu mengendalikan item secara individu dan boleh melakukannya dengan selamat menggunakan concurrency. Jadi, kami memasukkan beberapa utas pekerja kemudian kembali ke satu aliran.
Sebagai contoh, katakan anda mempunyai fail log yang besar. Anda boleh memecahkan fail kepada beberapa bahagian, membenarkan setiap pekerja beroperasi pada bahagian fail yang berbeza secara serentak, kemudian menggabungkan hasilnya.
Jika anda mengikuti dua siaran sebelumnya, corak ini jelas. Lihat pautan di atas jika anda tidak pasti.
// produce is simulating our single input as a channel func produce() chan int { ch := make(chan int) go func() { for i := 0; i < 10; i++ { ch <- rand.Intn(50) } fmt.Printf("producer done\n") close(ch) // this is important!!! }() return ch } func worker(id int, jobs chan int, out chan OddEven, wg *sync.WaitGroup) { for value := range jobs { odd := "even" if (value & 1) == 1 { odd = "odd" } out <- OddEven{ Number: value, OddEven: odd, } } close(out) // remember this wg.Done() } // OddEven struct will be the result of the work done by each fanout thread // and be the fanin data type OddEven struct { Number int OddEven string } func fanin(inputs []chan OddEven) chan OddEven { output := make(chan OddEven) var wg sync.WaitGroup for i, input := range inputs { wg.Add(1) // explicit params to capture loop vars go func(id int, input chan OddEven, output chan OddEven, wg *sync.WaitGroup) { for value := range input { output <- value } fmt.Printf("done merging source %d\n", id) wg.Done() }(i, input, output, &wg) } go func() { wg.Wait() close(output) // this is important!!! }() return output } func main() { // simulate the input data stream inputCh := produce() numWorkers := 3 // fan-out to send data items to workers as individual jobs var wg sync.WaitGroup workerResults := make([]chan OddEven, numWorkers) for i := 0; i < numWorkers; i++ { wg.Add(1) workerResults[i] = make(chan OddEven) go worker(i, inputCh, workerResults[i], &wg) } go func() { wg.Wait() }() // fan-in the results results := fanin(workerResults) done := make(chan bool) go func() { for value := range results { fmt.Printf("got %d is %s\n", value.Number, value.OddEven) } close(done) }() <-done fmt.Println("done") }
Terdapat fungsi produce() yang mencipta aliran input simulasi nombor.
Terdapat fungsi pekerja yang beroperasi pada saluran input sehingga tiada lagi data. Pada setiap nilai ia 'memproses' data input (menentukan sama ada nilai itu ganjil atau genap), kemudian menghantar struct hasil ke saluran keluaran.
Perhatikan bahawa apabila setiap pekerja selesai, ia menutup saluran keputusannya. Ini adalah perlu untuk mengelakkan kebuntuan kerana operasi fanin akan tidur menunggu lebih banyak data pada chan.
Urut utama mendapat aliran input daripada hasil, kemudian melancarkan beberapa pekerja yang memberi setiap pekerja salurannya sendiri di mana ia akan menghantar hasilnya.
Saluran hasil ini kemudiannya dihantar ke operasi fanin. Untuk fanin, kami mencipta saluran untuk menerima output, kemudian melancarkan goroutine untuk setiap saluran pekerja. Setiap goroutine hanya berulang melalui saluran sehingga tiada lagi data kemudian ditamatkan. Ingat bahawa kami menutup saluran hasil dalam urutan pekerja, itulah yang membolehkan gelung for ditamatkan
Perhatikan bahawa kami menggunakan WaitGroup untuk proses fanin. Ini memberitahu kami apabila semua hasil daripada semua saluran hasil telah digabungkan ke dalam saluran keluaran. Apabila ini berlaku, kami menutup saluran keluaran supaya apa sahaja benang hiliran yang memakan output boleh ditamatkan.
Dengan semua data dalam saluran keluaran, utas utama boleh diteruskan dan memaparkan hasilnya. Ambil perhatian bahawa kami menggunakan saluran boolean untuk menghalang utas utama daripada ditamatkan sehingga semuanya selesai; jika tidak, ia akan menamatkan proses.
Perhatikan bahawa terdapat cara lain untuk melakukan kipas masuk menggunakan pernyataan pilih. Teknik yang digunakan di sini adalah lebih bersih sedikit kerana kita boleh menambah atau mengurangkan bilangan pekerja.
Perhatikan juga bahawa kami tidak menangani apa-apa berkaitan penamatan awal daripada perkara seperti SIGTERM atau SIGINT. Itu menambahkan sedikit lagi kerumitan.
Bagaimana anda akan melaksanakan ini? Terdapat pelaksanaan lain corak fanout/fanin. Sila tinggalkan komen dan pendapat anda di bawah?
Terima kasih!
Kod untuk siaran ini dan semua siaran dalam siri ini boleh didapati di sini
Atas ialah kandungan terperinci Corak Fanout-Fanin dalam Go. Untuk maklumat lanjut, sila ikut artikel berkaitan lain di laman web China PHP!