Durch die Verwendung von errgroup zur Implementierung des Go-Arbeitspools bleiben Goroutinen hängen

王林
Freigeben: 2024-02-08 21:09:18
nach vorne
1016 Leute haben es durchsucht

使用 errgroup 实现 Go 工作池,goroutines 卡住

Der Herausgeber von PHP Apple stellt Ihnen heute eine Methode zur Verwendung von errgroup zur Implementierung des Go-Arbeitspools vor, die das Problem löst, dass Goroutinen stecken bleiben. Bei der gleichzeitigen Programmierung kann eine effiziente gleichzeitige Verarbeitung durch die Verwendung von Goroutinen erreicht werden. Wenn jedoch in einer bestimmten Goroutine ein Fehler auftritt oder hängen bleibt, wirkt sich dies auf die Ausführung des gesamten Programms aus. Durch die Verwendung des errgroup-Pakets können wir die Ausführung von Goroutinen elegant verwalten und Fehler behandeln, wenn Fehler auftreten, wodurch die Stabilität und Zuverlässigkeit des Programms gewährleistet wird. Werfen wir einen Blick darauf, wie dies umgesetzt wird.

Frageninhalt


Ich habe das Worker-Pool-Muster mithilfe von errgroup implementiert, damit Fehler in jeder Goroutine abgefangen werden können. Hier sind meine Daten:

jobs := make(chan usersinfo, totalusers)
    results := make(chan usersinfo, totalusers)

    g, gctx := errgroup.withcontext(ctx)

    for i := 1; i <= 4; i++ {
        g.go(func() error {
            err := workeruser(gctx, jobs, results)
            if err != nil {
                return err
            }
            return nil
        })
    }

    for _, user := range usersresp {
        jobs <- user
    }
    close(jobs)

    var usersarray []usersinfo
    for  i := 1; i <= totalusers; i++ {
        r := <-results
        usersarray = append(usersarray, r)
    }

    if err := g.wait(); err != nil {
        return nil, err
    }
Nach dem Login kopieren

Dann ist die Implementierung der Worker-Funktion wie folgt:

func workerUser(ctx context.Context, jobs <-chan UsersInfo, results chan<- UsersInfo) error {
  for {
    select {
    case <-ctx.Done():
        return ctx.Err()
    case user, _ := <-jobs:
        userInfo, err := CallUserAPI(ctx, user)
        if err != nil {
            return err
        }
        results <- userInfo
    }
 }
}
Nach dem Login kopieren

calluserapi gibt einen 403 verbotenen Fehler zurück, der g.wait() aufrufen und bei einem Fehler ungleich Null alle Goroutinen sofort stoppen sollte. Aber das ist hier nicht der Fall, g.wait() wird nie aufgerufen.


Lösung


Es gibt mehrere Probleme:

  • Schleife

    for  i := 1; i <= totalusers; i++ {
          r := <-results
          usersarray = append(usersarray, r)
      }
    Nach dem Login kopieren

    Warten Sie, bis die Mitarbeiter Ergebnisse für jeden Benutzer gesendet haben. Dies passiert nicht, wenn calluserapi einen Fehler zurückgibt.

  • Arbeiter bewältigt keine jobsgeschlossenen Situationen.

Der folgende Code kann diese beiden Probleme lösen:

Deklarieren Sie einen Typ, geben Sie an, welche Benutzer verarbeitet werden sollen und wo die Ergebnisse abgelegt werden sollen:

type job struct {
    user usersinfo
    result *usersinfo
}
Nach dem Login kopieren

Ändern Sie Arbeitsthreads, um diesen neuen Typ zu verwenden. Ändern Sie außerdem den Worker so, dass er beendet wird, wenn jobs geschlossen wird.

func workeruser(ctx context.context, jobs <-chan job) error {
    for {
        select {
        case <-ctx.done():
            return ctx.err()
        case job, ok := <-jobs:
            if !ok {
                // no more jobs, exit.
                return nil
            }
            var err error
            *job.result, err = calluserapi(ctx, job.user)
            if err != nil {
                return err
            }
        }
    }
}
Nach dem Login kopieren

Kleben Sie sie in der Haupt-Goroutine zusammen:

jobs := make(chan UsersInfo, totalUsers)
usersArray := make([]UsersInfo, totalUsers)
g, gCtx := errgroup.WithContext(ctx)

// Start the workers.
for i := 1; i <= 4; i++ {
    g.Go(func() error {
        return workerUser(gCtx, jobs)
    })
}

// Feed the workers.  
for i, user := range usersResp {
    jobs <- job{user: user, result: &usersArray[i]}
}

// Close the channel to indicate that no more jobs will be sent.
// The workers exit via the `if !ok { return nil }` statement.
close(jobs)

// Wait for the workers to complete.
if err := g.Wait(); err != nil {
    return nil, err
}

// It is now safe to access the results in usersArray.
Nach dem Login kopieren

Das obige ist der detaillierte Inhalt vonDurch die Verwendung von errgroup zur Implementierung des Go-Arbeitspools bleiben Goroutinen hängen. Für weitere Informationen folgen Sie bitte anderen verwandten Artikeln auf der PHP chinesischen Website!

Quelle:stackoverflow.com
Erklärung dieser Website
Der Inhalt dieses Artikels wird freiwillig von Internetnutzern beigesteuert und das Urheberrecht liegt beim ursprünglichen Autor. Diese Website übernimmt keine entsprechende rechtliche Verantwortung. Wenn Sie Inhalte finden, bei denen der Verdacht eines Plagiats oder einer Rechtsverletzung besteht, wenden Sie sich bitte an admin@php.cn
Beliebte Tutorials
Mehr>
Neueste Downloads
Mehr>
Web-Effekte
Quellcode der Website
Website-Materialien
Frontend-Vorlage