Maison > développement back-end > Golang > En utilisant errgroup pour implémenter le pool de travail Go, les goroutines restent bloquées

En utilisant errgroup pour implémenter le pool de travail Go, les goroutines restent bloquées

王林
Libérer: 2024-02-08 21:09:18
avant
1047 Les gens l'ont consulté

使用 errgroup 实现 Go 工作池,goroutines 卡住

L'éditeur de PHP Apple vous présente aujourd'hui une méthode d'utilisation d'errgroup pour implémenter le pool de travail Go, qui résout le problème du blocage des goroutines. En programmation simultanée, un traitement simultané efficace peut être obtenu en utilisant des goroutines, mais lorsqu'une erreur ou un blocage se produit dans une certaine goroutine, cela affectera l'exécution de l'ensemble du programme. En utilisant le package errgroup, nous pouvons gérer avec élégance l'exécution des goroutines et gérer les erreurs lorsque des erreurs se produisent, garantissant ainsi la stabilité et la fiabilité du programme. Voyons comment cela est mis en œuvre.

Contenu de la question


J'ai implémenté le modèle de pool de travailleurs en utilisant errgroup afin que les erreurs dans n'importe quelle goroutine puissent être détectées. Voici mes coordonnées :

jobs := make(chan usersinfo, totalusers)
    results := make(chan usersinfo, totalusers)

    g, gctx := errgroup.withcontext(ctx)

    for i := 1; i <= 4; i++ {
        g.go(func() error {
            err := workeruser(gctx, jobs, results)
            if err != nil {
                return err
            }
            return nil
        })
    }

    for _, user := range usersresp {
        jobs <- user
    }
    close(jobs)

    var usersarray []usersinfo
    for  i := 1; i <= totalusers; i++ {
        r := <-results
        usersarray = append(usersarray, r)
    }

    if err := g.wait(); err != nil {
        return nil, err
    }
Copier après la connexion

Ensuite, la mise en œuvre de la fonction travailleur est la suivante :

func workerUser(ctx context.Context, jobs <-chan UsersInfo, results chan<- UsersInfo) error {
  for {
    select {
    case <-ctx.Done():
        return ctx.Err()
    case user, _ := <-jobs:
        userInfo, err := CallUserAPI(ctx, user)
        if err != nil {
            return err
        }
        results <- userInfo
    }
 }
}
Copier après la connexion

calluserapi renvoie une erreur interdite 403, qui devrait appeler g.wait() et devrait arrêter toutes les goroutines immédiatement en cas d'erreur non nulle. Mais ce n'est pas le cas ici, g.wait() n'est jamais appelé.


Solution


Il y a plusieurs problèmes :

  • Boucle

    for  i := 1; i <= totalusers; i++ {
          r := <-results
          usersarray = append(usersarray, r)
      }
    Copier après la connexion

    En attente que le personnel envoie les résultats pour chaque utilisateur. Cela ne se produit pas lorsque calluserapi renvoie une erreur.

  • le travailleur ne gère pas les jobssituations fermées.

Le code suivant peut résoudre ces deux problèmes :

Déclarez un type, précisez quels utilisateurs traiter et où mettre les résultats :

type job struct {
    user usersinfo
    result *usersinfo
}
Copier après la connexion

Modifiez les threads de travail pour utiliser ce nouveau type. De plus, modifiez le travailleur pour qu'il se ferme lorsque jobs est fermé.

func workeruser(ctx context.context, jobs <-chan job) error {
    for {
        select {
        case <-ctx.done():
            return ctx.err()
        case job, ok := <-jobs:
            if !ok {
                // no more jobs, exit.
                return nil
            }
            var err error
            *job.result, err = calluserapi(ctx, job.user)
            if err != nil {
                return err
            }
        }
    }
}
Copier après la connexion

Collez-les ensemble dans la goroutine principale :

jobs := make(chan UsersInfo, totalUsers)
usersArray := make([]UsersInfo, totalUsers)
g, gCtx := errgroup.WithContext(ctx)

// Start the workers.
for i := 1; i <= 4; i++ {
    g.Go(func() error {
        return workerUser(gCtx, jobs)
    })
}

// Feed the workers.  
for i, user := range usersResp {
    jobs <- job{user: user, result: &usersArray[i]}
}

// Close the channel to indicate that no more jobs will be sent.
// The workers exit via the `if !ok { return nil }` statement.
close(jobs)

// Wait for the workers to complete.
if err := g.Wait(); err != nil {
    return nil, err
}

// It is now safe to access the results in usersArray.
Copier après la connexion

Ce qui précède est le contenu détaillé de. pour plus d'informations, suivez d'autres articles connexes sur le site Web de PHP en chinois!

source:stackoverflow.com
Déclaration de ce site Web
Le contenu de cet article est volontairement contribué par les internautes et les droits d'auteur appartiennent à l'auteur original. Ce site n'assume aucune responsabilité légale correspondante. Si vous trouvez un contenu suspecté de plagiat ou de contrefaçon, veuillez contacter admin@php.cn
Tutoriels populaires
Plus>
Derniers téléchargements
Plus>
effets Web
Code source du site Web
Matériel du site Web
Modèle frontal