Parallelität ist zu einem wesentlichen Merkmal moderner Programmiersprachen geworden. Die meisten Programmiersprachen verfügen mittlerweile über eine Methode, um Parallelität zu erreichen.
Einige dieser Implementierungen sind sehr leistungsfähig und können die Last auf verschiedene Systemthreads wie Java usw. verlagern; andere simulieren dieses Verhalten im selben Thread wie Ruby usw.
Golangs Parallelitätsmodell namens CSP (Communicating Sequential Process) ist sehr leistungsfähig und unterteilt ein Problem in kleinere sequentielle Prozesse und plant dann Instanzen dieser Prozesse (sogenannte Goroutinen). Diese Prozesse kommunizieren, indem sie Informationen über Kanäle weiterleiten.
In diesem Artikel werden wir untersuchen, wie wir die Parallelität von Golang nutzen und in workerPool verwenden können. Im zweiten Artikel der Reihe werden wir untersuchen, wie man eine leistungsstarke Parallelitätslösung aufbaut.
Angenommen, wir müssen eine externe API-Schnittstelle aufrufen und der gesamte Vorgang dauert 100 ms. Wenn wir diese Schnittstelle 1000 Mal synchron aufrufen müssen, dauert es 100 Sekunden.
//// model/data.go package model type SimpleData struct { ID int } //// basic/basic.go package basic import ( "fmt" "github.com/Joker666/goworkerpool/model" "time" ) func Work(allData []model.SimpleData) { start := time.Now() for i, _ := range allData { Process(allData[i]) } elapsed := time.Since(start) fmt.Printf("Took ===============> %s\n", elapsed) } func Process(data model.SimpleData) { fmt.Printf("Start processing %d\n", data.ID) time.Sleep(100 * time.Millisecond) fmt.Printf("Finish processing %d\n", data.ID) } //// main.go package main import ( "fmt" "github.com/Joker666/goworkerpool/basic" "github.com/Joker666/goworkerpool/model" "github.com/Joker666/goworkerpool/worker" ) func main() { // Prepare the data var allData []model.SimpleData for i := 0; i < 1000; i++ { data := model.SimpleData{ ID: i } allData = append(allData, data) } fmt.Printf("Start processing all work \n") // Process basic.Work(allData) }
Start processing all work Took ===============> 1m40.226679665s
Der obige Code erstellt das Modellpaket, das eine Struktur enthält, die nur ein Mitglied vom Typ int hat. Wir verarbeiten die Daten synchron, was natürlich nicht optimal ist, da diese Aufgaben gleichzeitig bearbeitet werden können. Lassen Sie uns die Lösung ändern und Goroutine und Channel verwenden, um damit umzugehen.
//// worker/notPooled.go func NotPooledWork(allData []model.SimpleData) { start := time.Now() var wg sync.WaitGroup dataCh := make(chan model.SimpleData, 100) wg.Add(1) go func() { defer wg.Done() for data := range dataCh { wg.Add(1) go func(data model.SimpleData) { defer wg.Done() basic.Process(data) }(data) } }() for i, _ := range allData { dataCh <- allData[i] } close(dataCh) wg.Wait() elapsed := time.Since(start) fmt.Printf("Took ===============> %s\n", elapsed) } //// main.go // Process worker.NotPooledWork(allData)
Start processing all work Took ===============> 101.191534ms
Im obigen Code erstellen wir einen Cache-Kanal mit einer Kapazität von 100 und schieben Daten über NoPooledWork() in den Kanal. Nachdem die Kanallänge 100 erreicht hat, können wir keine Elemente hinzufügen, bis ein Element gelesen wurde. Verwenden Sie for range, um den Kanal zu lesen und eine Goroutine-Verarbeitung zu generieren. Hier haben wir keine Begrenzung der Anzahl der generierten Goroutinen, die möglichst viele Aufgaben bewältigen können. Theoretisch können mit den erforderlichen Ressourcen so viele Daten wie möglich verarbeitet werden. Beim Ausführen des Codes dauerte es nur 100 ms, um 1000 Aufgaben abzuschließen. Es ist verrückt! Nicht ganz, lesen Sie weiter.
Wenn wir nicht über alle Ressourcen der Erde verfügen, können zu einem bestimmten Zeitpunkt nur eine begrenzte Menge Ressourcen zugewiesen werden. Der von einer Goroutine belegte Mindestspeicher beträgt 2 KB, kann aber auch 1 GB erreichen. Die obige Lösung, bei der alle Aufgaben gleichzeitig ausgeführt werden, geht von einer Million Aufgaben aus und erschöpft schnell den Speicher und die CPU der Maschine. Wir müssen entweder die Konfiguration der Maschine aktualisieren oder andere bessere Lösungen finden.
计算机科学家很久之前就考虑过这个问题,并提出了出色的解决方案 - 使用 Thread Pool 或者 Worker Pool。这个方案是使用 worker 数量受限的工作池来处理任务,workers 会按顺序一个接一个处理任务,这样就避免了 CPU 和内存使用急速增长。
我们通过实现 worker pool 来修复之前遇到的问题。
//// worker/pooled.go func PooledWork(allData []model.SimpleData) { start := time.Now() var wg sync.WaitGroup workerPoolSize := 100 dataCh := make(chan model.SimpleData, workerPoolSize) for i := 0; i < workerPoolSize; i++ { wg.Add(1) go func() { defer wg.Done() for data := range dataCh { basic.Process(data) } }() } for i, _ := range allData { dataCh <- allData[i] } close(dataCh) wg.Wait() elapsed := time.Since(start) fmt.Printf("Took ===============> %s\n", elapsed) } //// main.go // Process worker.PooledWork(allData)
Start processing all work Took ===============> 1.002972449s
上面的代码,worker 数量限制在 100,我们创建了相应数量的 goroutine 来处理任务。我们可以把 channel 看作是队列,worker goroutine 看作是消费者。多个 goroutine 可以监听同一个 channel,但是 channel 里的每一个元素只会被处理一次。
Go 语言的 channel 可以当作队列使用。
这是一个比较好的解决方案,执行代码,我们看到完成所有任务花费 1s。虽然没有 100ms 这么快,但已经能满足业务需要,而且我们得到了一个更好的解决方案,能将负载均摊在不同的时间片上。
我们能做的还没完。上面看起来是一个完整的解决方案,但却不是的,我们没有处理错误情况。所以需要模拟出错的情形,并且看下我们需要怎么处理。
//// worker/pooledError.go func PooledWorkError(allData []model.SimpleData) { start := time.Now() var wg sync.WaitGroup workerPoolSize := 100 dataCh := make(chan model.SimpleData, workerPoolSize) errors := make(chan error, 1000) for i := 0; i < workerPoolSize; i++ { wg.Add(1) go func() { defer wg.Done() for data := range dataCh { process(data, errors) } }() } for i, _ := range allData { dataCh <- allData[i] } close(dataCh) wg.Add(1) go func() { defer wg.Done() for { select { case err := <-errors: fmt.Println("finished with error:", err.Error()) case <-time.After(time.Second * 1): fmt.Println("Timeout: errors finished") return } } }() defer close(errors) wg.Wait() elapsed := time.Since(start) fmt.Printf("Took ===============> %s\n", elapsed) } func process(data model.SimpleData, errors chan<- error) { fmt.Printf("Start processing %d\n", data.ID) time.Sleep(100 * time.Millisecond) if data.ID % 29 == 0 { errors <- fmt.Errorf("error on job %v", data.ID) } else { fmt.Printf("Finish processing %d\n", data.ID) } } //// main.go // Process worker.PooledWorkError(allData)
我们修改了 process() 函数,处理一些随机的错误并将错误 push 到 errors chnanel 里。所以,为了处理并发出现的错误,我们可以使用 errors channel 保存错误数据。在所有任务处理完成之后,可以检查错误 channel 是否有数据。错误 channel 里的元素保存了任务 ID,方便需要的时候再处理这些任务。
比之前没处理错误,很明显这是一个更好的解决方案。但我们还可以做得更好,
我们将在下篇文章讨论如何编写一个强大的 worker pool 包,并且在 worker 数量受限的情况下处理并发任务。
Go 语言的并发模型足够强大给力,只需要构建一个 worker pool 就能很好地解决问题而无需做太多工作,这就是它没有包含在标准库中的原因。但是,我们自己可以构建一个满足自身需求的方案。很快,我会在下一篇文章中讲到,敬请期待!
Das obige ist der detaillierte Inhalt vonParallelität und WorkerPool in der Go-Sprache – Teil 1. Für weitere Informationen folgen Sie bitte anderen verwandten Artikeln auf der PHP chinesischen Website!