Der Herausgeber von PHP Apple stellt Ihnen heute eine Methode zur Verwendung von errgroup zur Implementierung des Go-Arbeitspools vor, die das Problem löst, dass Goroutinen stecken bleiben. Bei der gleichzeitigen Programmierung kann eine effiziente gleichzeitige Verarbeitung durch die Verwendung von Goroutinen erreicht werden. Wenn jedoch in einer bestimmten Goroutine ein Fehler auftritt oder hängen bleibt, wirkt sich dies auf die Ausführung des gesamten Programms aus. Durch die Verwendung des errgroup-Pakets können wir die Ausführung von Goroutinen elegant verwalten und Fehler behandeln, wenn Fehler auftreten, wodurch die Stabilität und Zuverlässigkeit des Programms gewährleistet wird. Werfen wir einen Blick darauf, wie dies umgesetzt wird.
Ich habe das Worker-Pool-Muster mithilfe von errgroup implementiert, damit Fehler in jeder Goroutine abgefangen werden können. Hier sind meine Daten:
jobs := make(chan usersinfo, totalusers) results := make(chan usersinfo, totalusers) g, gctx := errgroup.withcontext(ctx) for i := 1; i <= 4; i++ { g.go(func() error { err := workeruser(gctx, jobs, results) if err != nil { return err } return nil }) } for _, user := range usersresp { jobs <- user } close(jobs) var usersarray []usersinfo for i := 1; i <= totalusers; i++ { r := <-results usersarray = append(usersarray, r) } if err := g.wait(); err != nil { return nil, err }
Dann ist die Implementierung der Worker-Funktion wie folgt:
func workerUser(ctx context.Context, jobs <-chan UsersInfo, results chan<- UsersInfo) error { for { select { case <-ctx.Done(): return ctx.Err() case user, _ := <-jobs: userInfo, err := CallUserAPI(ctx, user) if err != nil { return err } results <- userInfo } } }
calluserapi gibt einen 403 verbotenen Fehler zurück, der g.wait() aufrufen und bei einem Fehler ungleich Null alle Goroutinen sofort stoppen sollte. Aber das ist hier nicht der Fall, g.wait() wird nie aufgerufen.
Es gibt mehrere Probleme:
Schleife
for i := 1; i <= totalusers; i++ { r := <-results usersarray = append(usersarray, r) }
Warten Sie, bis die Mitarbeiter Ergebnisse für jeden Benutzer gesendet haben. Dies passiert nicht, wenn calluserapi
einen Fehler zurückgibt.
Arbeiter bewältigt keine jobs
geschlossenen Situationen.
Der folgende Code kann diese beiden Probleme lösen:
Deklarieren Sie einen Typ, geben Sie an, welche Benutzer verarbeitet werden sollen und wo die Ergebnisse abgelegt werden sollen:
type job struct { user usersinfo result *usersinfo }
Ändern Sie Arbeitsthreads, um diesen neuen Typ zu verwenden. Ändern Sie außerdem den Worker so, dass er beendet wird, wenn jobs
geschlossen wird.
func workeruser(ctx context.context, jobs <-chan job) error { for { select { case <-ctx.done(): return ctx.err() case job, ok := <-jobs: if !ok { // no more jobs, exit. return nil } var err error *job.result, err = calluserapi(ctx, job.user) if err != nil { return err } } } }
Kleben Sie sie in der Haupt-Goroutine zusammen:
jobs := make(chan UsersInfo, totalUsers) usersArray := make([]UsersInfo, totalUsers) g, gCtx := errgroup.WithContext(ctx) // Start the workers. for i := 1; i <= 4; i++ { g.Go(func() error { return workerUser(gCtx, jobs) }) } // Feed the workers. for i, user := range usersResp { jobs <- job{user: user, result: &usersArray[i]} } // Close the channel to indicate that no more jobs will be sent. // The workers exit via the `if !ok { return nil }` statement. close(jobs) // Wait for the workers to complete. if err := g.Wait(); err != nil { return nil, err } // It is now safe to access the results in usersArray.
Das obige ist der detaillierte Inhalt vonDurch die Verwendung von errgroup zur Implementierung des Go-Arbeitspools bleiben Goroutinen hängen. Für weitere Informationen folgen Sie bitte anderen verwandten Artikeln auf der PHP chinesischen Website!