原文發佈在VictoriaMetrics部落格:https://victoriametrics.com/blog/go-singleflight/
這篇文章是關於 Go 中處理並發的系列文章的一部分:
因此,當您同時收到多個請求相同的資料時,預設行為是每個請求都會單獨存取資料庫以獲取相同的資訊。這意味著您最終會執行多次相同的查詢,老實說,這效率很低。
它最終會為資料庫帶來不必要的負載,這可能會減慢一切,但有一種方法可以解決這個問題。
這個想法是只有第一個請求實際上才會傳送到資料庫。其餘請求等待第一個請求完成。一旦資料從初始請求返回,其他請求就會得到相同的結果 - 不需要額外的查詢。
那麼,現在您已經很清楚這篇文章的內容了,對吧?
Go 中的 singleflight 套件是專門為處理我們剛才討論的問題而建造的。請注意,它不是標準庫的一部分,但由 Go 團隊維護和開發。
singleflight 的作用是確保只有一個 goroutine 實際執行該操作,例如從資料庫取得資料。它只允許在任何給定時刻對同一條資料(稱為“密鑰”)執行一次“進行中”(正在進行的)操作。
因此,如果其他 goroutine 在該操作仍在進行時請求相同的資料(相同的鍵),它們只會等待。然後,當第一個操作完成時,所有其他操作都會得到相同的結果,而無需再次執行該操作。
好了,說得夠多了,讓我們深入了解 singleflight 的實際運作原理:
var callCount atomic.Int32 var wg sync.WaitGroup // Simulate a function that fetches data from a database func fetchData() (interface{}, error) { callCount.Add(1) time.Sleep(100 * time.Millisecond) return rand.Intn(100), nil } // Wrap the fetchData function with singleflight func fetchDataWrapper(g *singleflight.Group, id int) error { defer wg.Done() time.Sleep(time.Duration(id) * 40 * time.Millisecond) v, err, shared := g.Do("key-fetch-data", fetchData) if err != nil { return err } fmt.Printf("Goroutine %d: result: %v, shared: %v\n", id, v, shared) return nil } func main() { var g singleflight.Group // 5 goroutines to fetch the same data const numGoroutines = 5 wg.Add(numGoroutines) for i := 0; i < numGoroutines; i++ { go fetchDataWrapper(&g, i) } wg.Wait() fmt.Printf("Function was called %d times\n", callCount.Load()) } // Output: // Goroutine 0: result: 90, shared: true // Goroutine 2: result: 90, shared: true // Goroutine 1: result: 90, shared: true // Goroutine 3: result: 13, shared: true // Goroutine 4: result: 13, shared: true // Function was called 2 times
這裡發生了什麼事:
我們正在模擬這樣的情況:5 個 goroutine 幾乎同時嘗試獲取相同的數據,間隔 60 毫秒。為了簡單起見,我們使用隨機數來模擬從資料庫中獲得的資料。
使用 singleflight.Group,我們確保只有第一個 goroutine 實際運行 fetchData(),其餘的 goroutine 等待結果。
行 v, err, shared := g.Do("key-fetch-data", fetchData) 分配一個唯一的鍵 ("key-fetch-data") 來追蹤這些請求。因此,如果另一個 goroutine 請求相同的鍵,而第一個 goroutine 仍在獲取數據,它會等待結果而不是開始新的呼叫。
一旦第一個呼叫完成,任何等待的 goroutine 都會得到相同的結果,正如我們在輸出中看到的那樣。雖然我們有 5 個 goroutine 請求數據,但 fetchData 只運行了兩次,這是一個巨大的提升。
共享標誌確認結果已在多個 goroutine 之間重複使用。
「但是為什麼第一個 goroutine 的共享標誌為 true?我以為只有等待的 goroutine 才會共用 == true?」
是的,如果您認為只有等待的 goroutine 應該共享 == true,這可能會感覺有點違反直覺。
問題是,g.Do 中的共享變數告訴您結果是否在多個呼叫者之間共用。它基本上是在說:「嘿,這個結果被多個呼叫者使用了。」這與誰運行該函數無關,它只是一個信號,表明結果在多個 goroutine 之間重複使用。
「我有緩存,為什麼我需要單次飛行?」
簡短的答案是:快取和 singleflight 解決不同的問題,而且它們實際上可以很好地協同工作。
在使用外部快取(如 Redis 或 Memcached)的設定中,singleflight 增加了額外的保護層,不僅為您的資料庫,也為快取本身。
此外,singleflight 有助於防止快取未命中風暴(有時稱為「快取踩踏」)。
通常,當請求請求資料時,如果資料在快取中,那就太好了 - 這是快取命中。如果資料不在快取中,則為快取未命中。假設在重建快取之前有 10,000 個請求同時到達系統,資料庫可能會突然同時受到 10,000 個相同查詢的衝擊。
在此高峰期間,singleflight 確保這 10,000 個請求中只有一個真正到達資料庫。
但是稍後,在內部實作部分,我們將看到 singleflight 使用全域鎖來保護正在進行的呼叫的映射,這可能會成為每個 goroutine 的單點爭用。這可能會減慢速度,尤其是在處理高並發時。
下面的模型可能更適合具有多個 CPU 的機器:
在此設定中,我們只在發生快取未命中時使用 singleflight。
要使用 singleflight,您首先建立一個 Group 對象,它是追蹤連結到特定鍵的正在進行的函數呼叫的核心結構。
它有兩個有助於防止重複呼叫的關鍵方法:
我們已經在示範中了解如何使用 g.Do(),讓我們看看如何使用經過修改的包裝函數的 g.DoChan() :
var callCount atomic.Int32 var wg sync.WaitGroup // Simulate a function that fetches data from a database func fetchData() (interface{}, error) { callCount.Add(1) time.Sleep(100 * time.Millisecond) return rand.Intn(100), nil } // Wrap the fetchData function with singleflight func fetchDataWrapper(g *singleflight.Group, id int) error { defer wg.Done() time.Sleep(time.Duration(id) * 40 * time.Millisecond) v, err, shared := g.Do("key-fetch-data", fetchData) if err != nil { return err } fmt.Printf("Goroutine %d: result: %v, shared: %v\n", id, v, shared) return nil } func main() { var g singleflight.Group // 5 goroutines to fetch the same data const numGoroutines = 5 wg.Add(numGoroutines) for i := 0; i < numGoroutines; i++ { go fetchDataWrapper(&g, i) } wg.Wait() fmt.Printf("Function was called %d times\n", callCount.Load()) } // Output: // Goroutine 0: result: 90, shared: true // Goroutine 2: result: 90, shared: true // Goroutine 1: result: 90, shared: true // Goroutine 3: result: 13, shared: true // Goroutine 4: result: 13, shared: true // Function was called 2 times
// Wrap the fetchData function with singleflight using DoChan func fetchDataWrapper(g *singleflight.Group, id int) error { defer wg.Done() ch := g.DoChan("key-fetch-data", fetchData) res := <-ch if res.Err != nil { return res.Err } fmt.Printf("Goroutine %d: result: %v, shared: %v\n", id, res.Val, res.Shared) return nil }
說實話,這裡使用 DoChan() 與 Do() 相比並沒有太大變化,因為我們仍在等待通道接收操作 (
DoChan() 的閃光點是當你想要啟動一個操作並在不阻塞 goroutine 的情況下執行其他操作。例如,您可以使用通道更乾淨地處理逾時或取消:
package singleflight type Result struct { Val interface{} Err error Shared bool }
此範例也提出了您在現實場景中可能遇到的一些問題:
是的,singleflight 提供了一種使用 group.Forget(key) 方法來處理此類情況的方法,它可以讓您放棄正在進行的執行。
Forget() 方法從追蹤正在進行的函數呼叫的內部映射中刪除一個鍵。這有點像“使鍵無效”,因此如果您使用該鍵再次呼叫 g.Do(),它將像新請求一樣執行該函數,而不是等待上一次執行完成。
讓我們更新範例以使用 Forget() 並查看該函數實際被呼叫了多少次:
var callCount atomic.Int32 var wg sync.WaitGroup // Simulate a function that fetches data from a database func fetchData() (interface{}, error) { callCount.Add(1) time.Sleep(100 * time.Millisecond) return rand.Intn(100), nil } // Wrap the fetchData function with singleflight func fetchDataWrapper(g *singleflight.Group, id int) error { defer wg.Done() time.Sleep(time.Duration(id) * 40 * time.Millisecond) v, err, shared := g.Do("key-fetch-data", fetchData) if err != nil { return err } fmt.Printf("Goroutine %d: result: %v, shared: %v\n", id, v, shared) return nil } func main() { var g singleflight.Group // 5 goroutines to fetch the same data const numGoroutines = 5 wg.Add(numGoroutines) for i := 0; i < numGoroutines; i++ { go fetchDataWrapper(&g, i) } wg.Wait() fmt.Printf("Function was called %d times\n", callCount.Load()) } // Output: // Goroutine 0: result: 90, shared: true // Goroutine 2: result: 90, shared: true // Goroutine 1: result: 90, shared: true // Goroutine 3: result: 13, shared: true // Goroutine 4: result: 13, shared: true // Function was called 2 times
Goroutine 0 和 Goroutine 1 都使用相同的鍵(“key-fetch-data”)呼叫 Do(),它們的請求合併為一次執行,結果在兩個 Goroutine 之間共用。
Goroutine 2,另一方面,在執行 Do() 之前呼叫 Forget()。這會清除與「key-fetch-data」相關的任何先前結果,因此它會觸發該函數的新執行。
總而言之,雖然 singleflight 很有用,但它仍然可能存在一些邊緣情況,例如:
如果您已經注意到我們討論過的所有問題,讓我們深入到下一部分來討論 singleflight 的實際工作原理。
透過使用singleflight,你可能已經對它的內部運作有了基本的了解,singleflight的整個實作只有大約150行程式碼。
基本上,每個唯一的鍵都有一個管理其執行的結構。如果 goroutine 呼叫 Do() 並發現 key 已經存在,則該呼叫將被阻塞,直到第一次執行完成,結構如下:
// Wrap the fetchData function with singleflight using DoChan func fetchDataWrapper(g *singleflight.Group, id int) error { defer wg.Done() ch := g.DoChan("key-fetch-data", fetchData) res := <-ch if res.Err != nil { return res.Err } fmt.Printf("Goroutine %d: result: %v, shared: %v\n", id, res.Val, res.Shared) return nil }
這裡使用了兩個同步原語:
這裡我們將重點放在 group.Do() 方法,因為另一個方法 group.DoChan() 的工作方式類似。 group.Forget() 方法也很簡單,因為它只是從地圖中刪除鍵。
當你呼叫 group.Do() 時,它所做的第一件事就是鎖定整個呼叫映射 (g.mu)。
「這對效能不是很不利嗎?」
是的,它可能不適合每種情況下的效能(總是先進行基準測試),因為 singleflight 鎖定了整個金鑰。如果您的目標是獲得更好的效能或大規模工作,一個好的方法是分片或分發金鑰。您可以將負載分散到多個組,而不是僅使用單一飛行組,有點像「多重飛行」
作為參考,請查看此儲存庫:shardedsingleflight。
現在,一旦獲得鎖,該群組就會查看內部映射 (g.m),如果已經有對給定密鑰的正在進行或已完成的呼叫。該地圖追蹤任何正在進行或已完成的工作,並將鍵映射到相應的任務。
如果找到該鍵(另一個 goroutine 已經在運行該任務),我們只需增加一個計數器(c.dups)來追蹤重複請求,而不是開始新的呼叫。然後,goroutine 釋放鎖定並透過在關聯的 WaitGroup 上呼叫 call.wg.Wait() 來等待原始任務完成。
當原始任務完成時,這個 goroutine 會取得結果並避免再次執行該任務。
var callCount atomic.Int32 var wg sync.WaitGroup // Simulate a function that fetches data from a database func fetchData() (interface{}, error) { callCount.Add(1) time.Sleep(100 * time.Millisecond) return rand.Intn(100), nil } // Wrap the fetchData function with singleflight func fetchDataWrapper(g *singleflight.Group, id int) error { defer wg.Done() time.Sleep(time.Duration(id) * 40 * time.Millisecond) v, err, shared := g.Do("key-fetch-data", fetchData) if err != nil { return err } fmt.Printf("Goroutine %d: result: %v, shared: %v\n", id, v, shared) return nil } func main() { var g singleflight.Group // 5 goroutines to fetch the same data const numGoroutines = 5 wg.Add(numGoroutines) for i := 0; i < numGoroutines; i++ { go fetchDataWrapper(&g, i) } wg.Wait() fmt.Printf("Function was called %d times\n", callCount.Load()) } // Output: // Goroutine 0: result: 90, shared: true // Goroutine 2: result: 90, shared: true // Goroutine 1: result: 90, shared: true // Goroutine 3: result: 13, shared: true // Goroutine 4: result: 13, shared: true // Function was called 2 times
如果沒有其他 Goroutine 正在處理該鍵,則當前 Goroutine 負責執行該任務。
此時,我們建立一個新的呼叫對象,將其新增至映射中,並初始化其 WaitGroup。然後,我們解鎖互斥體並繼續透過輔助方法 g.doCall(c, key, fn) 自行執行任務。當任務完成時,任何等待的 goroutine 都會被 wg.Wait() 呼叫解除阻塞。
這裡沒什麼太瘋狂的,除了我們如何處理錯誤之外,還有三種可能的情況:
這是輔助方法 g.doCall() 中事情開始變得更加聰明的地方。
「等等,什麼是runtime.Goexit()?」
在深入程式碼之前,讓我快速解釋一下,runtime.Goexit() 用來停止 goroutine 的執行。
當 goroutine 呼叫 Goexit() 時,它會停止,並且任何延遲函數仍然按照後進先出 (LIFO) 順序運行,就像正常情況一樣。它與恐慌類似,但有一些區別:
現在,這是一個有趣的怪癖(與我們的主題沒有直接關係,但值得一提)。如果你在主協程中呼叫runtime.Goexit()(例如在main()內部),請檢查一下:
var callCount atomic.Int32 var wg sync.WaitGroup // Simulate a function that fetches data from a database func fetchData() (interface{}, error) { callCount.Add(1) time.Sleep(100 * time.Millisecond) return rand.Intn(100), nil } // Wrap the fetchData function with singleflight func fetchDataWrapper(g *singleflight.Group, id int) error { defer wg.Done() time.Sleep(time.Duration(id) * 40 * time.Millisecond) v, err, shared := g.Do("key-fetch-data", fetchData) if err != nil { return err } fmt.Printf("Goroutine %d: result: %v, shared: %v\n", id, v, shared) return nil } func main() { var g singleflight.Group // 5 goroutines to fetch the same data const numGoroutines = 5 wg.Add(numGoroutines) for i := 0; i < numGoroutines; i++ { go fetchDataWrapper(&g, i) } wg.Wait() fmt.Printf("Function was called %d times\n", callCount.Load()) } // Output: // Goroutine 0: result: 90, shared: true // Goroutine 2: result: 90, shared: true // Goroutine 1: result: 90, shared: true // Goroutine 3: result: 13, shared: true // Goroutine 4: result: 13, shared: true // Function was called 2 times
發生的情況是 Goexit() 終止了主 goroutine,但如果還有其他 goroutine 仍在運行,程式會繼續運行,因為只要至少有一個 goroutine 處於活動狀態,Go 運行時就會保持活動狀態。然而,一旦沒有剩下 goroutines,它就會因“no goroutine”錯誤而崩潰,這是一個有趣的小角落案例。
現在,回到我們的程式碼,如果runtime.Goexit()僅終止目前的goroutine並且無法被recover()捕獲,我們如何偵測它是否被呼叫?
關鍵在於,當呼叫runtime.Goexit()時,其後面的任何程式碼都不會被執行。
// Wrap the fetchData function with singleflight using DoChan func fetchDataWrapper(g *singleflight.Group, id int) error { defer wg.Done() ch := g.DoChan("key-fetch-data", fetchData) res := <-ch if res.Err != nil { return res.Err } fmt.Printf("Goroutine %d: result: %v, shared: %v\n", id, res.Val, res.Shared) return nil }
在上面的情況下,呼叫runtime.Goexit()之後,normalReturn = true這一行永遠不會被執行。因此,在 defer 內部,我們可以檢查 normalReturn 是否仍然為 false,以偵測是否呼叫了特殊方法。
下一步是確定任務是否出現恐慌。為此,我們使用recover()作為正常返回,儘管singleflight中的實際程式碼有點微妙:
package singleflight type Result struct { Val interface{} Err error Shared bool }
這段程式碼不是直接在recover區塊內設定recovered = true,而是透過在recover()區塊之後將recovery設定為最後一行來獲得一點奇特的效果。
那麼,為什麼這會起作用?
當調用runtime.Goexit()時,它會終止整個goroutine,就像panic()一樣。然而,如果panic()被恢復,只有panic()和recover()之間的函數鏈被終止,而不是整個goroutine。
這就是為什麼在包含recover()的defer之外設定recovered = true,它只在兩種情況下執行:當函數正常完成時或當恐慌恢復時,但在呼叫runtime.Goexit()時不會執行。
接下來,我們將討論如何處理每個案例。
func fetchDataWrapperWithTimeout(g *singleflight.Group, id int) error { defer wg.Done() ch := g.DoChan("key-fetch-data", fetchData) select { case res := <-ch: if res.Err != nil { return res.Err } fmt.Printf("Goroutine %d: result: %v, shared: %v\n", id, res.Val, res.Shared) case <-time.After(50 * time.Millisecond): return fmt.Errorf("timeout waiting for result") } return nil }
如果任務在執行過程中發生緊急情況,則會捕獲緊急情況並將其保存在 c.err 中作為緊急錯誤,其中包含緊急值和堆疊追蹤。 singleflight 捕捉到恐慌並優雅地清理,但它不會吞掉它,它會在處理其狀態後重新拋出恐慌。
這意味著執行任務的 Goroutine(第一個開始執行操作的 Goroutine)會發生恐慌,並且所有其他等待結果的 Goroutine 也會發生恐慌。
由於這種恐慌發生在開發人員的程式碼中,因此我們有責任妥善處理它。
現在,我們仍然需要考慮一種特殊情況:當其他 goroutine 使用 group.DoChan() 方法並透過通道等待結果時。在這種情況下,singleflight 不能在這些 goroutine 中發生恐慌。相反,它會執行所謂的不可恢復的恐慌(gopanic(e)),這會使我們的應用程式崩潰。
最後,如果任務呼叫了runtime.Goexit(),則不需要採取任何進一步的操作,因為goroutine已經處於關閉過程中,我們只是讓它發生而不干擾。
差不多就是這樣,除了我們討論過的特殊情況之外,沒有什麼太複雜的。
大家好,我是 Phuong Le,VictoriaMetrics 的軟體工程師。上述寫作風格著重於清晰和簡單,以易於理解的方式解釋概念,即使它並不總是與學術精度完全一致。
如果您發現任何過時的內容或有疑問,請隨時與我們聯繫。您可以在 X(@func25) 上留言給我。
您可能感興趣的其他一些帖子:
如果您想監控您的服務、追蹤指標並了解一切的執行情況,您可能需要查看 VictoriaMetrics。這是一種快速、開源且節省成本的方式來監控您的基礎設施。
我們是 Gophers,熱愛研究、實驗和分享 Go 及其生態系統知識的愛好者。
以上是Go Singleflight 融入您的程式碼中,而不是您的資料庫中的詳細內容。更多資訊請關注PHP中文網其他相關文章!