Artikel asal disiarkan di blog VictoriaMetrics: https://victoriametrics.com/blog/go-singleflight/
Siaran ini adalah sebahagian daripada siri tentang pengendalian concurrency dalam Go:
Jadi, apabila anda mendapat berbilang permintaan yang masuk pada masa yang sama meminta data yang sama, tingkah laku lalai ialah setiap permintaan tersebut akan pergi ke pangkalan data secara individu untuk mendapatkan maklumat yang sama . Maksudnya ialah anda akan melaksanakan pertanyaan yang sama beberapa kali, yang, sejujurnya, adalah tidak cekap.
Ia akhirnya meletakkan beban yang tidak perlu pada pangkalan data anda, yang boleh melambatkan segala-galanya, tetapi ada cara untuk mengatasinya.
Ideanya ialah hanya permintaan pertama yang benar-benar pergi ke pangkalan data. Permintaan selebihnya menunggu untuk yang pertama selesai. Setelah data kembali daripada permintaan awal, yang lain hanya mendapat hasil yang sama—tiada pertanyaan tambahan diperlukan.
Jadi, sekarang anda sudah mendapat idea yang bagus tentang siaran ini, bukan?
Pakej penerbangan tunggal dalam Go dibina khusus untuk mengendalikan perkara yang baru sahaja kita bincangkan. Dan sekadar makluman, ia bukan sebahagian daripada pustaka standard tetapi ia diselenggara dan dibangunkan oleh pasukan Go.
Apa yang dilakukan oleh singleflight ialah memastikan bahawa hanya satu daripada gorout tersebut yang benar-benar menjalankan operasi, seperti mendapatkan data daripada pangkalan data. Ia membenarkan hanya satu operasi "dalam penerbangan" (berterusan) untuk sekeping data yang sama (dikenali sebagai "kunci") pada bila-bila masa.
Jadi, jika gorout lain meminta data yang sama (kunci yang sama) semasa operasi itu masih berjalan, mereka hanya akan menunggu. Kemudian, apabila yang pertama selesai, semua yang lain mendapat hasil yang sama tanpa perlu menjalankan operasi sekali lagi.
Baiklah, cukup bercakap, mari kita selami demo pantas untuk melihat cara penerbangan tunggal berfungsi dalam tindakan:
var callCount atomic.Int32 var wg sync.WaitGroup // Simulate a function that fetches data from a database func fetchData() (interface{}, error) { callCount.Add(1) time.Sleep(100 * time.Millisecond) return rand.Intn(100), nil } // Wrap the fetchData function with singleflight func fetchDataWrapper(g *singleflight.Group, id int) error { defer wg.Done() time.Sleep(time.Duration(id) * 40 * time.Millisecond) v, err, shared := g.Do("key-fetch-data", fetchData) if err != nil { return err } fmt.Printf("Goroutine %d: result: %v, shared: %v\n", id, v, shared) return nil } func main() { var g singleflight.Group // 5 goroutines to fetch the same data const numGoroutines = 5 wg.Add(numGoroutines) for i := 0; i < numGoroutines; i++ { go fetchDataWrapper(&g, i) } wg.Wait() fmt.Printf("Function was called %d times\n", callCount.Load()) } // Output: // Goroutine 0: result: 90, shared: true // Goroutine 2: result: 90, shared: true // Goroutine 1: result: 90, shared: true // Goroutine 3: result: 13, shared: true // Goroutine 4: result: 13, shared: true // Function was called 2 times
Apa yang berlaku di sini:
Kami sedang mensimulasikan situasi di mana 5 gorout cuba mengambil data yang sama hampir pada masa yang sama, dengan jarak 60ms. Untuk memastikannya mudah, kami menggunakan nombor rawak untuk meniru data yang diambil daripada pangkalan data.
Dengan singleflight.Group, kami memastikan hanya goroutine pertama benar-benar menjalankan fetchData() dan yang lain menunggu hasilnya.
Barisan v, err, shared := g.Do("key-fetch-data", fetchData) memberikan kunci unik ("key-fetch-data") untuk menjejaki permintaan ini. Jadi, jika goroutine lain meminta kunci yang sama semasa yang pertama masih mengambil data, ia menunggu keputusan daripada memulakan panggilan baharu.
Setelah panggilan pertama selesai, mana-mana goroutine menunggu mendapat hasil yang sama, seperti yang dapat kita lihat dalam output. Walaupun kami mempunyai 5 goroutine yang meminta data, fetchData hanya dijalankan dua kali, yang merupakan rangsangan besar.
Bendera kongsi mengesahkan bahawa hasilnya telah digunakan semula merentasi berbilang goroutin.
"Tetapi mengapa bendera yang dikongsi benar untuk goroutine pertama? Saya fikir hanya yang menunggu sahaja yang akan berkongsi == benar?"
Ya, ini mungkin terasa agak berlawanan dengan intuisi jika anda berfikir hanya goroutine menunggu yang sepatutnya dikongsi == benar.
Perkaranya, pembolehubah yang dikongsi dalam g.Do memberitahu anda sama ada hasilnya dikongsi antara berbilang pemanggil. Ia pada asasnya mengatakan, "Hei, hasil carian ini digunakan oleh lebih daripada seorang pemanggil." Ini bukan tentang siapa yang menjalankan fungsi itu, ia hanya isyarat bahawa hasilnya digunakan semula merentasi berbilang goroutine.
"Saya mempunyai cache, mengapa saya memerlukan penerbangan tunggal?"
Jawapan ringkasnya ialah: cache dan penerbangan tunggal menyelesaikan masalah yang berbeza, dan ia sebenarnya berfungsi dengan sangat baik bersama-sama.
Dalam persediaan dengan cache luaran (seperti Redis atau Memcached), singleflight menambah lapisan perlindungan tambahan, bukan sahaja untuk pangkalan data anda tetapi juga untuk cache itu sendiri.
Selain itu, penerbangan tunggal membantu melindungi daripada cache miss storm (kadangkala dipanggil "cache rempuhan").
Biasanya, apabila permintaan meminta data, jika data berada dalam cache, bagus - ia adalah cache hit. Jika data tiada dalam cache, ia adalah satu kehilangan cache. Katakan 10,000 permintaan melanda sistem sekaligus sebelum cache dibina semula, pangkalan data tiba-tiba boleh diselar dengan 10,000 pertanyaan yang sama pada masa yang sama.
Semasa puncak ini, penerbangan tunggal memastikan bahawa hanya satu daripada 10,000 permintaan itu benar-benar mencapai pangkalan data.
Tetapi kemudian, dalam bahagian pelaksanaan dalaman, kita akan melihat bahawa penerbangan tunggal menggunakan kunci global untuk melindungi peta panggilan dalam penerbangan, yang boleh menjadi satu titik perbalahan bagi setiap goroutine. Ini boleh melambatkan keadaan, terutamanya jika anda berhadapan dengan kesesuaian yang tinggi.
Model di bawah mungkin berfungsi lebih baik untuk mesin dengan berbilang CPU:
Dalam persediaan ini, kami hanya menggunakan penerbangan tunggal apabila kehilangan cache berlaku.
Untuk menggunakan penerbangan tunggal, anda mula-mula membuat objek Kumpulan, yang merupakan struktur teras yang menjejaki panggilan fungsi yang sedang dijalankan yang dipautkan kepada kekunci tertentu.
Ia mempunyai dua kaedah utama yang membantu menghalang panggilan pendua:
Kami telah melihat cara menggunakan g.Do() dalam demo, mari lihat cara menggunakan g.DoChan() dengan fungsi pembalut yang diubah suai:
var callCount atomic.Int32 var wg sync.WaitGroup // Simulate a function that fetches data from a database func fetchData() (interface{}, error) { callCount.Add(1) time.Sleep(100 * time.Millisecond) return rand.Intn(100), nil } // Wrap the fetchData function with singleflight func fetchDataWrapper(g *singleflight.Group, id int) error { defer wg.Done() time.Sleep(time.Duration(id) * 40 * time.Millisecond) v, err, shared := g.Do("key-fetch-data", fetchData) if err != nil { return err } fmt.Printf("Goroutine %d: result: %v, shared: %v\n", id, v, shared) return nil } func main() { var g singleflight.Group // 5 goroutines to fetch the same data const numGoroutines = 5 wg.Add(numGoroutines) for i := 0; i < numGoroutines; i++ { go fetchDataWrapper(&g, i) } wg.Wait() fmt.Printf("Function was called %d times\n", callCount.Load()) } // Output: // Goroutine 0: result: 90, shared: true // Goroutine 2: result: 90, shared: true // Goroutine 1: result: 90, shared: true // Goroutine 3: result: 13, shared: true // Goroutine 4: result: 13, shared: true // Function was called 2 times
// Wrap the fetchData function with singleflight using DoChan func fetchDataWrapper(g *singleflight.Group, id int) error { defer wg.Done() ch := g.DoChan("key-fetch-data", fetchData) res := <-ch if res.Err != nil { return res.Err } fmt.Printf("Goroutine %d: result: %v, shared: %v\n", id, res.Val, res.Shared) return nil }
Sejujurnya, menggunakan DoChan() di sini tidak banyak berubah berbanding dengan Do(), kerana kami masih menunggu keputusan dengan operasi penerimaan saluran (<-ch), yang pada dasarnya menyekat perkara yang sama cara.
Di mana DoChan() bersinar ialah apabila anda ingin memulakan operasi dan melakukan perkara lain tanpa menyekat goroutine. Contohnya, anda boleh mengendalikan tamat masa atau pembatalan dengan lebih bersih menggunakan saluran:
package singleflight type Result struct { Val interface{} Err error Shared bool }
Contoh ini turut memaparkan beberapa isu yang mungkin anda hadapi dalam senario dunia sebenar:
Ya, penerbangan tunggal menyediakan cara untuk mengendalikan situasi seperti ini dengan kumpulan.Kaedah Lupakan(kunci), yang membolehkan anda membuang pelaksanaan yang sedang berjalan.
Kaedah Forget() mengalih keluar kunci daripada peta dalaman yang menjejaki panggilan fungsi yang sedang berjalan. Ia seperti "membatalkan" kunci, jadi jika anda memanggil g.Do() sekali lagi dengan kunci itu, ia akan melaksanakan fungsi itu seolah-olah ia adalah permintaan baharu, dan bukannya menunggu pelaksanaan sebelumnya untuk selesai.
Mari kemas kini contoh kami untuk menggunakan Forget() dan lihat berapa kali fungsi itu sebenarnya dipanggil:
var callCount atomic.Int32 var wg sync.WaitGroup // Simulate a function that fetches data from a database func fetchData() (interface{}, error) { callCount.Add(1) time.Sleep(100 * time.Millisecond) return rand.Intn(100), nil } // Wrap the fetchData function with singleflight func fetchDataWrapper(g *singleflight.Group, id int) error { defer wg.Done() time.Sleep(time.Duration(id) * 40 * time.Millisecond) v, err, shared := g.Do("key-fetch-data", fetchData) if err != nil { return err } fmt.Printf("Goroutine %d: result: %v, shared: %v\n", id, v, shared) return nil } func main() { var g singleflight.Group // 5 goroutines to fetch the same data const numGoroutines = 5 wg.Add(numGoroutines) for i := 0; i < numGoroutines; i++ { go fetchDataWrapper(&g, i) } wg.Wait() fmt.Printf("Function was called %d times\n", callCount.Load()) } // Output: // Goroutine 0: result: 90, shared: true // Goroutine 2: result: 90, shared: true // Goroutine 1: result: 90, shared: true // Goroutine 3: result: 13, shared: true // Goroutine 4: result: 13, shared: true // Function was called 2 times
Goroutine 0 dan Goroutine 1 kedua-duanya memanggil Do() dengan kunci yang sama ("key-fetch-data"), dan permintaan mereka digabungkan menjadi satu pelaksanaan dan hasilnya dikongsi antara dua goroutine.
Goroutine 2, sebaliknya, memanggil Forget() sebelum menjalankan Do(). Ini mengosongkan sebarang hasil sebelumnya yang terikat dengan "data pengambilan kunci", jadi ia mencetuskan pelaksanaan baharu fungsi tersebut.
Ringkasnya, walaupun penerbangan tunggal berguna, ia masih boleh mempunyai beberapa kes tepi, contohnya:
Jika anda perasan semua isu yang telah kami bincangkan, mari selami bahagian seterusnya untuk membincangkan cara penerbangan tunggal sebenarnya berfungsi di bawah hud.
Daripada menggunakan penerbangan tunggal, anda mungkin sudah mempunyai idea asas tentang cara ia berfungsi secara dalaman, keseluruhan pelaksanaan penerbangan tunggal hanyalah kira-kira 150 baris kod.
Pada asasnya, setiap kunci unik mendapat struct yang menguruskan pelaksanaannya. Jika goroutine memanggil Do() dan mendapati bahawa kunci sudah wujud, panggilan itu akan disekat sehingga pelaksanaan pertama selesai, dan berikut ialah strukturnya:
// Wrap the fetchData function with singleflight using DoChan func fetchDataWrapper(g *singleflight.Group, id int) error { defer wg.Done() ch := g.DoChan("key-fetch-data", fetchData) res := <-ch if res.Err != nil { return res.Err } fmt.Printf("Goroutine %d: result: %v, shared: %v\n", id, res.Val, res.Shared) return nil }
Dua primitif penyegerakan digunakan di sini:
Kami akan menumpukan pada kaedah kumpulan.Do() di sini kerana kaedah lain, kumpulan.DoChan(), berfungsi dengan cara yang sama. Kaedah group.Forget() juga mudah kerana ia hanya mengalih keluar kunci daripada peta.
Apabila anda memanggil kumpulan.Do(), perkara pertama yang dilakukan ialah mengunci seluruh peta panggilan (g.mu).
"Bukankah itu buruk untuk persembahan?"
Ya, ini mungkin tidak sesuai untuk prestasi dalam setiap kes (sentiasa baik untuk menanda aras dahulu) kerana penerbangan tunggal mengunci keseluruhan kekunci. Jika anda menyasarkan prestasi yang lebih baik atau bekerja pada skala yang tinggi, pendekatan yang baik adalah untuk memecahkan atau mengedarkan kunci. Daripada menggunakan hanya satu kumpulan penerbangan tunggal, anda boleh menyebarkan beban ke beberapa kumpulan, seperti melakukan "multiflight" sebaliknya
Untuk rujukan, lihat repo ini: shardedsingleflight.
Sekarang, setelah ia mempunyai kunci, kumpulan itu melihat peta dalaman (g.m), jika sudah ada panggilan yang sedang atau selesai untuk kunci yang diberikan. Peta ini menjejaki sebarang kerja yang sedang berjalan atau selesai, dengan kunci memetakan kepada tugasan yang sepadan.
Jika kunci ditemui (goroutine lain sudah menjalankan tugas), bukannya memulakan panggilan baharu, kami hanya menambah pembilang (c.dups) untuk menjejaki permintaan pendua. Goroutine kemudiannya melepaskan kunci dan menunggu tugas asal selesai dengan memanggil call.wg.Wait() pada WaitGroup yang berkaitan.
Apabila tugasan asal selesai, goroutine ini mengambil keputusan dan mengelak daripada menjalankan tugas itu semula.
var callCount atomic.Int32 var wg sync.WaitGroup // Simulate a function that fetches data from a database func fetchData() (interface{}, error) { callCount.Add(1) time.Sleep(100 * time.Millisecond) return rand.Intn(100), nil } // Wrap the fetchData function with singleflight func fetchDataWrapper(g *singleflight.Group, id int) error { defer wg.Done() time.Sleep(time.Duration(id) * 40 * time.Millisecond) v, err, shared := g.Do("key-fetch-data", fetchData) if err != nil { return err } fmt.Printf("Goroutine %d: result: %v, shared: %v\n", id, v, shared) return nil } func main() { var g singleflight.Group // 5 goroutines to fetch the same data const numGoroutines = 5 wg.Add(numGoroutines) for i := 0; i < numGoroutines; i++ { go fetchDataWrapper(&g, i) } wg.Wait() fmt.Printf("Function was called %d times\n", callCount.Load()) } // Output: // Goroutine 0: result: 90, shared: true // Goroutine 2: result: 90, shared: true // Goroutine 1: result: 90, shared: true // Goroutine 3: result: 13, shared: true // Goroutine 4: result: 13, shared: true // Function was called 2 times
Jika tiada goroutine lain yang berfungsi pada kunci itu, goroutine semasa bertanggungjawab untuk melaksanakan tugas itu.
Pada ketika ini, kami mencipta objek panggilan baharu, menambahkannya pada peta dan memulakan WaitGroupnya. Kemudian, kami membuka kunci mutex dan meneruskan untuk melaksanakan tugas itu sendiri melalui kaedah pembantu g.doCall(c, key, fn). Apabila tugas selesai, mana-mana gorouti menunggu dinyahsekat oleh panggilan wg.Wait().
Tiada yang terlalu liar di sini, kecuali cara kami menangani ralat, terdapat tiga senario yang mungkin:
Di sinilah perkara mula menjadi lebih bijak dalam kaedah penolong g.doCall().
"Tunggu, apakah masa jalanan.Goexit()?"
Sebelum kita menyelami kod, izinkan saya menerangkan dengan cepat, runtime.Goexit() digunakan untuk menghentikan pelaksanaan goroutine.
Apabila goroutine memanggil Goexit(), ia berhenti dan mana-mana fungsi tertunda masih dijalankan dalam tertib Masuk Pertama Keluar (LIFO), seperti biasa. Ia serupa dengan panik, tetapi terdapat beberapa perbezaan:
Sekarang, inilah ciri menarik (tidak berkaitan secara langsung dengan topik kami, tetapi patut disebut). Jika anda memanggil runtime.Goexit() dalam goroutine utama (seperti dalam main()), lihat ini:
var callCount atomic.Int32 var wg sync.WaitGroup // Simulate a function that fetches data from a database func fetchData() (interface{}, error) { callCount.Add(1) time.Sleep(100 * time.Millisecond) return rand.Intn(100), nil } // Wrap the fetchData function with singleflight func fetchDataWrapper(g *singleflight.Group, id int) error { defer wg.Done() time.Sleep(time.Duration(id) * 40 * time.Millisecond) v, err, shared := g.Do("key-fetch-data", fetchData) if err != nil { return err } fmt.Printf("Goroutine %d: result: %v, shared: %v\n", id, v, shared) return nil } func main() { var g singleflight.Group // 5 goroutines to fetch the same data const numGoroutines = 5 wg.Add(numGoroutines) for i := 0; i < numGoroutines; i++ { go fetchDataWrapper(&g, i) } wg.Wait() fmt.Printf("Function was called %d times\n", callCount.Load()) } // Output: // Goroutine 0: result: 90, shared: true // Goroutine 2: result: 90, shared: true // Goroutine 1: result: 90, shared: true // Goroutine 3: result: 13, shared: true // Goroutine 4: result: 13, shared: true // Function was called 2 times
Apa yang berlaku ialah Goexit() menamatkan goroutine utama, tetapi jika terdapat goroutine lain yang masih berjalan, program akan diteruskan kerana masa jalanan Go kekal hidup selagi sekurang-kurangnya satu goroutine aktif. Walau bagaimanapun, apabila tiada gorouti yang tinggal, ia ranap dengan ralat "tiada gorouti", sejenis sarung sudut kecil yang menyeronokkan.
Sekarang, kembali kepada kod kami, jika runtime.Goexit() hanya menamatkan goroutine semasa dan tidak boleh ditangkap oleh recover(), bagaimanakah kami dapat mengesan jika ia telah dipanggil?
Kuncinya terletak pada fakta bahawa apabila runtime.Goexit() digunakan, sebarang kod selepasnya tidak dapat dilaksanakan.
// Wrap the fetchData function with singleflight using DoChan func fetchDataWrapper(g *singleflight.Group, id int) error { defer wg.Done() ch := g.DoChan("key-fetch-data", fetchData) res := <-ch if res.Err != nil { return res.Err } fmt.Printf("Goroutine %d: result: %v, shared: %v\n", id, res.Val, res.Shared) return nil }
Dalam kes di atas, baris normalReturn = true tidak pernah dilaksanakan selepas memanggil runtime.Goexit(). Jadi, di dalam penangguhan, kita boleh menyemak sama ada normalReturn masih palsu untuk mengesan kaedah khas itu dipanggil.
Langkah seterusnya ialah memikirkan sama ada tugas itu panik atau tidak. Untuk itu, kami menggunakan recover() sebagai pulangan biasa, walaupun kod sebenar dalam penerbangan tunggal adalah lebih halus sedikit:
package singleflight type Result struct { Val interface{} Err error Shared bool }
Daripada tetapan dipulihkan = benar terus di dalam blok pulih, kod ini menjadi sedikit mewah dengan menetapkan dipulihkan selepas blok recover() sebagai baris terakhir.
Jadi, mengapa ini berkesan?
Apabila runtime.Goexit() dipanggil, ia menamatkan keseluruhan goroutine, sama seperti panik(). Walau bagaimanapun, jika panic() dipulihkan, hanya rantaian fungsi antara panic() dan recover() ditamatkan, bukan keseluruhan goroutine.
Itulah sebabnya recovered = true ditetapkan di luar penangguhan yang mengandungi recover(), ia hanya dilaksanakan dalam dua kes: apabila fungsi selesai seperti biasa atau apabila panik dipulihkan, tetapi bukan apabila runtime.Goexit() dipanggil.
Melangkah ke hadapan, kami akan membincangkan cara setiap kes dikendalikan.
func fetchDataWrapperWithTimeout(g *singleflight.Group, id int) error { defer wg.Done() ch := g.DoChan("key-fetch-data", fetchData) select { case res := <-ch: if res.Err != nil { return res.Err } fmt.Printf("Goroutine %d: result: %v, shared: %v\n", id, res.Val, res.Shared) case <-time.After(50 * time.Millisecond): return fmt.Errorf("timeout waiting for result") } return nil }
Jika tugasan panik semasa pelaksanaan, panik itu ditangkap dan disimpan dalam c.err sebagai panicError, yang memegang kedua-dua nilai panik dan surih tindanan. singleflight menangkap panik untuk membersihkan dengan anggun, tetapi ia tidak menelannya, ia menimbulkan semula panik selepas mengendalikan keadaannya.
Ini bermakna panik akan berlaku dalam goroutine yang melaksanakan tugas (yang pertama memulakan operasi), dan semua goroutine lain yang menunggu keputusan juga akan panik.
Memandangkan panik ini berlaku dalam kod pembangun, terpulang kepada kami untuk menanganinya dengan betul.
Kini, masih terdapat satu kes khas yang perlu kita pertimbangkan: apabila gorout lain menggunakan kaedah kumpulan.DoChan() dan menunggu keputusan melalui saluran. Dalam kes ini, penerbangan tunggal tidak boleh panik dalam gorout tersebut. Sebaliknya, ia melakukan apa yang dipanggil panik tidak boleh dipulihkan (go panic(e)), yang menyebabkan aplikasi kami ranap.
Akhir sekali, jika tugas itu dipanggil runtime.Goexit(), anda tidak perlu mengambil apa-apa tindakan selanjutnya kerana goroutine sudah dalam proses untuk dimatikan, dan kami hanya membiarkan perkara itu berlaku tanpa campur tangan.
Dan itu sahaja, tiada yang terlalu rumit kecuali untuk kes-kes khas yang telah kita bincangkan.
Hai, saya Phuong Le, seorang jurutera perisian di VictoriaMetrics. Gaya penulisan di atas memfokuskan pada kejelasan dan kesederhanaan, menerangkan konsep dengan cara yang mudah difahami, walaupun ia tidak sentiasa sejajar dengan ketepatan akademik.
Jika anda melihat apa-apa yang sudah lapuk atau jika anda mempunyai soalan, jangan teragak-agak untuk menghubungi. Anda boleh menghantar DM kepada saya di X(@func25).
Beberapa siaran lain yang mungkin anda minati:
Jika anda ingin memantau perkhidmatan anda, menjejaki metrik dan melihat prestasi semuanya, anda mungkin ingin menyemak VictoriaMetrics. Ia merupakan cara yang pantas, sumber terbuka dan penjimatan kos untuk mengawasi infrastruktur anda.
Dan kami adalah Gophers, peminat yang suka menyelidik, mencuba dan berkongsi pengetahuan tentang Go dan ekosistemnya.
Atas ialah kandungan terperinci Go Singleflight Meleleh dalam Kod Anda, Bukan dalam DB Anda. Untuk maklumat lanjut, sila ikut artikel berkaitan lain di laman web China PHP!