I implemented the worker pool pattern using errgroup so that an error in any goroutine can be caught. Here is the code:
    jobs := make(chan UsersInfo, totalUsers)
    results := make(chan UsersInfo, totalUsers)

    g, gCtx := errgroup.WithContext(ctx)

    for i := 1; i <= 4; i++ {
        g.Go(func() error {
            err := workerUser(gCtx, jobs, results)
            if err != nil {
                return err
            }
            return nil
        })
    }

    for _, user := range usersResp {
        jobs <- user
    }
    close(jobs)

    var usersArray []UsersInfo
    for i := 1; i <= totalUsers; i++ {
        r := <-results
        usersArray = append(usersArray, r)
    }

    if err := g.Wait(); err != nil {
        return nil, err
    }
Then the implementation of the worker function is as follows:
    func workerUser(ctx context.Context, jobs <-chan UsersInfo, results chan<- UsersInfo) error {
        for {
            select {
            case <-ctx.Done():
                return ctx.Err()
            case user, _ := <-jobs:
                userInfo, err := CallUserAPI(ctx, user)
                if err != nil {
                    return err
                }
                results <- userInfo
            }
        }
    }
CallUserAPI returns a 403 Forbidden error. I expected g.Wait() to return that error and all goroutines to stop immediately on a non-nil error. That is not what happens: g.Wait() is never reached.
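There are two problems:

First, the loop

    for i := 1; i <= totalUsers; i++ {
        r := <-results
        usersArray = append(usersArray, r)
    }

waits for the workers to send one result per user. That does not happen when CallUserAPI returns an error: the failing worker returns without sending a result, so fewer than totalUsers results may arrive and the main goroutine blocks on <-results without ever reaching g.Wait().

Second, the worker does not handle the case where jobs is closed. Because it discards the second value of the receive (user, _ := <-jobs), a receive on the closed channel keeps yielding zero-value users instead of ending the loop.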
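The closed-channel behavior behind the second problem can be checked in isolation. The following is a minimal sketch; the helper name receiveFromClosed is hypothetical and not part of the original code. It shows that a receive on a closed, empty channel does not block: it returns the zero value at once, and only the second return value reveals that the channel is closed.

```go
package main

import "fmt"

// receiveFromClosed is a hypothetical helper used only for illustration.
// It closes an empty channel and then receives from it once.
func receiveFromClosed() (int, bool) {
	ch := make(chan int, 1)
	close(ch)
	// A receive on a closed, drained channel returns immediately
	// with the zero value of the element type and ok == false.
	v, ok := <-ch
	return v, ok
}

func main() {
	v, ok := receiveFromClosed()
	fmt.Println(v, ok) // prints "0 false"
}
```

A worker that ignores ok therefore cannot tell a real job from the end of the queue.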
The following code can solve these two problems:
Declare a type that specifies which user to process and where to store the result:
    type job struct {
        user   UsersInfo
        result *UsersInfo
    }
Modify the worker to use this new type, and have it exit when jobs is closed.
    func workerUser(ctx context.Context, jobs <-chan job) error {
        for {
            select {
            case <-ctx.Done():
                return ctx.Err()
            case job, ok := <-jobs:
                if !ok {
                    // No more jobs, exit.
                    return nil
                }
                var err error
                *job.result, err = CallUserAPI(ctx, job.user)
                if err != nil {
                    return err
                }
            }
        }
    }
Glue them together in the main goroutine:
    // The channel carries job values, not UsersInfo values.
    jobs := make(chan job, totalUsers)
    usersArray := make([]UsersInfo, totalUsers)
    g, gCtx := errgroup.WithContext(ctx)

    // Start the workers.
    for i := 1; i <= 4; i++ {
        g.Go(func() error {
            return workerUser(gCtx, jobs)
        })
    }

    // Feed the workers. Each job points at its own slot in usersArray.
    for i, user := range usersResp {
        jobs <- job{user: user, result: &usersArray[i]}
    }

    // Close the channel to indicate that no more jobs will be sent.
    // The workers exit via the `if !ok { return nil }` statement.
    close(jobs)

    // Wait for the workers to complete.
    if err := g.Wait(); err != nil {
        return nil, err
    }

    // It is now safe to access the results in usersArray.