
Using errgroup to implement a Go worker pool: goroutines get stuck

王林
Release: 2024-02-08 21:09:18


This article looks at a Go worker pool built on errgroup in which the goroutines get stuck. Goroutines make concurrent processing efficient, but when one goroutine in a pool fails or blocks, it can stall the whole program. The errgroup package manages a group of goroutines and reports the first error, but the code around it must still be written so that nothing blocks forever. The question and solution below show where that goes wrong and how to fix it.

Question content


I have implemented the worker pool pattern using errgroup so that an error in any goroutine can be caught. Here is my code:

jobs := make(chan UsersInfo, totalUsers)
results := make(chan UsersInfo, totalUsers)

g, gCtx := errgroup.WithContext(ctx)

for i := 1; i <= 4; i++ {
    g.Go(func() error {
        err := workerUser(gCtx, jobs, results)
        if err != nil {
            return err
        }
        return nil
    })
}

for _, user := range usersResp {
    jobs <- user
}
close(jobs)

var usersArray []UsersInfo
for i := 1; i <= totalUsers; i++ {
    r := <-results
    usersArray = append(usersArray, r)
}

if err := g.Wait(); err != nil {
    return nil, err
}

Then the implementation of the worker function is as follows:

func workerUser(ctx context.Context, jobs <-chan UsersInfo, results chan<- UsersInfo) error {
    for {
        select {
        case <-ctx.Done():
            return ctx.Err()
        case user, _ := <-jobs:
            userInfo, err := CallUserAPI(ctx, user)
            if err != nil {
                return err
            }
            results <- userInfo
        }
    }
}

CallUserAPI returns a 403 Forbidden error. I expected g.Wait() to return that error and every goroutine to stop as soon as one of them returned a non-nil error. That is not what happens: g.Wait() is never reached.


Solution


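There are two problems:

  • The loop

    for i := 1; i <= totalUsers; i++ {
        r := <-results
        usersArray = append(usersArray, r)
    }

    waits for the workers to send a result for every user. That never happens when CallUserAPI returns an error, so the main goroutine blocks on <-results and never gets to g.Wait().

  • The worker does not handle the case where jobs is closed. A receive from a closed, drained channel returns the zero value immediately, so the worker keeps calling CallUserAPI with an empty UsersInfo instead of exiting.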

The following code fixes both problems.

Declare a type that specifies which user to process and where to store the result:

type job struct {
    user   UsersInfo
    result *UsersInfo
}

Modify the worker to use this new type. Also make the worker exit when jobs is closed.

func workerUser(ctx context.Context, jobs <-chan job) error {
    for {
        select {
        case <-ctx.Done():
            return ctx.Err()
        case job, ok := <-jobs:
            if !ok {
                // No more jobs, exit.
                return nil
            }
            var err error
            // Store the result directly in the slot this job points to.
            *job.result, err = CallUserAPI(ctx, job.user)
            if err != nil {
                return err
            }
        }
    }
}

Glue them together in the main goroutine:

// The channel carries job values, not UsersInfo.
// This assumes len(usersResp) == totalUsers.
jobs := make(chan job, totalUsers)
usersArray := make([]UsersInfo, totalUsers)
g, gCtx := errgroup.WithContext(ctx)

// Start the workers.
for i := 1; i <= 4; i++ {
    g.Go(func() error {
        return workerUser(gCtx, jobs)
    })
}

// Feed the workers. The buffer holds every job, so these sends never block.
for i, user := range usersResp {
    jobs <- job{user: user, result: &usersArray[i]}
}

// Close the channel to indicate that no more jobs will be sent.
// The workers exit via the `if !ok { return nil }` statement.
close(jobs)

// Wait for the workers to complete.
if err := g.Wait(); err != nil {
    return nil, err
}

// It is now safe to access the results in usersArray.


source:stackoverflow.com