Parallelität und WorkerPool in der Go-Sprache – Teil 1

Freigeben: 2023-07-25 14:35:02
nach vorne
545 Leute haben es durchsucht

Parallelität ist zu einem wesentlichen Merkmal moderner Programmiersprachen geworden. Die meisten Programmiersprachen verfügen mittlerweile über eine Methode, um Parallelität zu erreichen.

Einige dieser Implementierungen sind sehr leistungsfähig und können die Last auf verschiedene Systemthreads wie Java usw. verlagern; andere simulieren dieses Verhalten im selben Thread wie Ruby usw.

Golangs Parallelitätsmodell namens CSP (Communicating Sequential Process) ist sehr leistungsfähig und unterteilt ein Problem in kleinere sequentielle Prozesse und plant dann Instanzen dieser Prozesse (sogenannte Goroutinen). Diese Prozesse kommunizieren, indem sie Informationen über Kanäle weiterleiten.

In diesem Artikel werden wir untersuchen, wie wir die Parallelität von Golang nutzen und in workerPool verwenden können. Im zweiten Artikel der Reihe werden wir untersuchen, wie man eine leistungsstarke Parallelitätslösung aufbaut.

Ein einfaches Beispiel

Angenommen, wir müssen eine externe API-Schnittstelle aufrufen und der gesamte Vorgang dauert 100 ms. Wenn wir diese Schnittstelle 1000 Mal synchron aufrufen müssen, dauert es 100 Sekunden.

//// model/data.go

package model

type SimpleData struct {
 ID int
}

//// basic/basic.go

package basic

import (
 "fmt"
 "github.com/Joker666/goworkerpool/model"
 "time"
)

func Work(allData []model.SimpleData) {
 start := time.Now()
 for i, _ := range allData {
  Process(allData[i])
 }
 elapsed := time.Since(start)
 fmt.Printf("Took ===============> %s\n", elapsed)
}

func Process(data model.SimpleData) {
 fmt.Printf("Start processing %d\n", data.ID)
 time.Sleep(100 * time.Millisecond)
 fmt.Printf("Finish processing %d\n", data.ID)
}

//// main.go

package main

import (
 "fmt"
 "github.com/Joker666/goworkerpool/basic"
 "github.com/Joker666/goworkerpool/model"
 "github.com/Joker666/goworkerpool/worker"
)

func main() {
 // Prepare the data
 var allData []model.SimpleData
 for i := 0; i < 1000; i++ {
  data := model.SimpleData{ ID: i }
  allData = append(allData, data)
 }
 fmt.Printf("Start processing all work \n")

 // Process
 basic.Work(allData)
}
Nach dem Login kopieren
Start processing all work
Took ===============> 1m40.226679665s
Nach dem Login kopieren

Der obige Code erstellt das Modellpaket, das eine Struktur enthält, die nur ein Mitglied vom Typ int hat. Wir verarbeiten die Daten synchron, was natürlich nicht optimal ist, da diese Aufgaben gleichzeitig bearbeitet werden können. Lassen Sie uns die Lösung ändern und Goroutine und Channel verwenden, um damit umzugehen.

Asynchron

//// worker/notPooled.go

func NotPooledWork(allData []model.SimpleData) {
 start := time.Now()
 var wg sync.WaitGroup

 dataCh := make(chan model.SimpleData, 100)

 wg.Add(1)
 go func() {
  defer wg.Done()
  for data := range dataCh {
   wg.Add(1)
   go func(data model.SimpleData) {
    defer wg.Done()
    basic.Process(data)
   }(data)
  }
 }()

 for i, _ := range allData {
  dataCh <- allData[i]
 }

 close(dataCh)
 wg.Wait()
 elapsed := time.Since(start)
 fmt.Printf("Took ===============> %s\n", elapsed)
}

//// main.go

// Process
worker.NotPooledWork(allData)
Nach dem Login kopieren
Start processing all work
Took ===============> 101.191534ms
Nach dem Login kopieren

Im obigen Code erstellen wir einen Cache-Kanal mit einer Kapazität von 100 und schieben Daten über NoPooledWork() in den Kanal. Nachdem die Kanallänge 100 erreicht hat, können wir keine Elemente hinzufügen, bis ein Element gelesen wurde. Verwenden Sie for range, um den Kanal zu lesen und eine Goroutine-Verarbeitung zu generieren. Hier haben wir keine Begrenzung der Anzahl der generierten Goroutinen, die möglichst viele Aufgaben bewältigen können. Theoretisch können mit den erforderlichen Ressourcen so viele Daten wie möglich verarbeitet werden. Beim Ausführen des Codes dauerte es nur 100 ms, um 1000 Aufgaben abzuschließen. Es ist verrückt! Nicht ganz, lesen Sie weiter.

Problem

Wenn wir nicht über alle Ressourcen der Erde verfügen, können zu einem bestimmten Zeitpunkt nur eine begrenzte Menge Ressourcen zugewiesen werden. Der von einer Goroutine belegte Mindestspeicher beträgt 2 KB, kann aber auch 1 GB erreichen. Die obige Lösung, bei der alle Aufgaben gleichzeitig ausgeführt werden, geht von einer Million Aufgaben aus und erschöpft schnell den Speicher und die CPU der Maschine. Wir müssen entweder die Konfiguration der Maschine aktualisieren oder andere bessere Lösungen finden.

计算机科学家很久之前就考虑过这个问题,并提出了出色的解决方案 - 使用 Thread Pool 或者 Worker Pool。这个方案是使用 worker 数量受限的工作池来处理任务,workers 会按顺序一个接一个处理任务,这样就避免了 CPU 和内存使用急速增长。

解决方案:Worker Pool

我们通过实现 worker pool 来修复之前遇到的问题。

//// worker/pooled.go

func PooledWork(allData []model.SimpleData) {
 start := time.Now()
 var wg sync.WaitGroup
 workerPoolSize := 100

 dataCh := make(chan model.SimpleData, workerPoolSize)

 for i := 0; i < workerPoolSize; i++ {
  wg.Add(1)
  go func() {
   defer wg.Done()

   for data := range dataCh {
    basic.Process(data)
   }
  }()
 }

 for i, _ := range allData {
  dataCh <- allData[i]
 }

 close(dataCh)
 wg.Wait()
 elapsed := time.Since(start)
 fmt.Printf("Took ===============> %s\n", elapsed)
}

//// main.go

// Process
worker.PooledWork(allData)
Nach dem Login kopieren
Start processing all work
Took ===============> 1.002972449s
Nach dem Login kopieren

上面的代码,worker 数量限制在 100,我们创建了相应数量的 goroutine 来处理任务。我们可以把 channel 看作是队列,worker goroutine 看作是消费者。多个 goroutine 可以监听同一个 channel,但是 channel 里的每一个元素只会被处理一次。

Go 语言的 channel 可以当作队列使用。

这是一个比较好的解决方案,执行代码,我们看到完成所有任务花费 1s。虽然没有 100ms 这么快,但已经能满足业务需要,而且我们得到了一个更好的解决方案,能将负载均摊在不同的时间片上。

处理错误

我们能做的还没完。上面看起来是一个完整的解决方案,但却不是的,我们没有处理错误情况。所以需要模拟出错的情形,并且看下我们需要怎么处理。

//// worker/pooledError.go

func PooledWorkError(allData []model.SimpleData) {
 start := time.Now()
 var wg sync.WaitGroup
 workerPoolSize := 100

 dataCh := make(chan model.SimpleData, workerPoolSize)
 errors := make(chan error, 1000)

 for i := 0; i < workerPoolSize; i++ {
  wg.Add(1)
  go func() {
   defer wg.Done()

   for data := range dataCh {
    process(data, errors)
   }
  }()
 }

 for i, _ := range allData {
  dataCh <- allData[i]
 }

 close(dataCh)

 wg.Add(1)
 go func() {
  defer wg.Done()
  for {
   select {
   case err := <-errors:
    fmt.Println("finished with error:", err.Error())
   case <-time.After(time.Second * 1):
    fmt.Println("Timeout: errors finished")
    return
   }
  }
 }()

 defer close(errors)
 wg.Wait()
 elapsed := time.Since(start)
 fmt.Printf("Took ===============> %s\n", elapsed)
}

func process(data model.SimpleData, errors chan<- error) {
 fmt.Printf("Start processing %d\n", data.ID)
 time.Sleep(100 * time.Millisecond)
 if data.ID % 29 == 0 {
  errors <- fmt.Errorf("error on job %v", data.ID)
 } else {
  fmt.Printf("Finish processing %d\n", data.ID)
 }
}

//// main.go

// Process
worker.PooledWorkError(allData)
Nach dem Login kopieren

我们修改了 process() 函数,处理一些随机的错误并将错误 push 到 errors chnanel 里。所以,为了处理并发出现的错误,我们可以使用 errors channel 保存错误数据。在所有任务处理完成之后,可以检查错误 channel 是否有数据。错误 channel 里的元素保存了任务 ID,方便需要的时候再处理这些任务。

比之前没处理错误,很明显这是一个更好的解决方案。但我们还可以做得更好,

我们将在下篇文章讨论如何编写一个强大的 worker pool 包,并且在 worker 数量受限的情况下处理并发任务。

总结

Go 语言的并发模型足够强大给力,只需要构建一个 worker pool 就能很好地解决问题而无需做太多工作,这就是它没有包含在标准库中的原因。但是,我们自己可以构建一个满足自身需求的方案。很快,我会在下一篇文章中讲到,敬请期待!

Das obige ist der detaillierte Inhalt vonParallelität und WorkerPool in der Go-Sprache – Teil 1. Für weitere Informationen folgen Sie bitte anderen verwandten Artikeln auf der PHP chinesischen Website!

Verwandte Etiketten:
Quelle:Go语言进阶学习
Erklärung dieser Website
Der Inhalt dieses Artikels wird freiwillig von Internetnutzern beigesteuert und das Urheberrecht liegt beim ursprünglichen Autor. Diese Website übernimmt keine entsprechende rechtliche Verantwortung. Wenn Sie Inhalte finden, bei denen der Verdacht eines Plagiats oder einer Rechtsverletzung besteht, wenden Sie sich bitte an admin@php.cn
Beliebte Tutorials
Mehr>
Neueste Downloads
Mehr>
Web-Effekte
Quellcode der Website
Website-Materialien
Frontend-Vorlage
Über uns Haftungsausschluss Sitemap
Chinesische PHP-Website:Online-PHP-Schulung für das Gemeinwohl,Helfen Sie PHP-Lernenden, sich schnell weiterzuentwickeln!