Je suis nouveau sur GO et j'ai une question sur l'arrêt de goroutine en utilisant le signal de canal.
J'ai une goroutine de longue date (plus de 1000) et un manager pour la gérer :
func myThreadFunc(stop chan bool) { for { select { case <- stop: log.Debug("Stopping thread") return default: callClientTask() } } } func callClientTask() { // This can take long time up to 30 seconds - this is external HTTP API call time.Sleep(5 * time.Second) } func manager() { var cancelChannelSlice []chan bool for i := 0; i < 1000; i++ { cancelChannel := make(chan bool) cancelChannelSlice = append(cancelChannelSlice, cancelChannel) go myThreadFunc(cancelChannel) } var stopTest = func() { for _, c := range cancelChannelSlice { c <- true } } timeout := time.After(time.Duration(300) * time.Second) for { select { case <-timeout: stopTest() default: time.Sleep(time.Second) } } }
Dans ce cas, à chaque fois que j'appelle c <- true
管理器都会等待 callClientTask()
完成,然后转到下一个 cancelChannel
Je veux que toutes les goroutines s'arrêtent en 1 itération de callClientTask()
(pas plus de 30 secondes)
La seule façon que j'ai essayée était de lancer la nouvelle goroutine comme ceci :
var stopTest = func() { for _, c := range cancelChannelSlice { go func(c chan bool) { c <- true close(c) }(c) } }
Est-ce que je fais les choses de la bonne manière ?
D'après ce que j'ai compris de votre question, "Vous voulez que toutes les goroutines s'arrêtent en 1 itération de callClientTask() (pas plus de 30 secondes)" et que les threads de travail s'exécutent simultanément sans question de synchronisation.
J'ai réorganisé le code pour qu'il s'exécute simultanément avec le groupe d'attente.
Exemple de code :
package main import ( "log" "sync" "time" ) func worker(stop <-chan struct{}, wg *sync.WaitGroup) { defer wg.Done() for { select { case <-stop: log.Println("Stopping thread") return default: callClientTask() } } } func callClientTask() { time.Sleep(2 * time.Second) // simulate a long-running task (for testing purposes) } func main() { var wg sync.WaitGroup stop := make(chan struct{}) for i := 0; i < 1000; i++ { wg.Add(1) go worker(stop, &wg) } time.Sleep(5 * time.Second) // allow workers to run for a while close(stop) // stop all workers, close channel wg.Wait() // wait for all workers }
Sortie :
2023/10/26 10:40:44 Stopping thread 2023/10/26 10:40:44 Stopping thread .... 2023/10/26 10:40:49 Stopping thread 2023/10/26 10:40:49 Stopping thread
Éditeur :
Si vous souhaitez arrêter certains travailleurs, vous devez mettre à jour les travailleurs. Le code suivant comprend un travailleur avec des canaux « stop » et « stop » et une fonction start/stop.
Exemple de code :
package main import ( "log" "sync" "time" ) type Worker struct { stop chan struct{} stopped chan struct{} } func NewWorker() *Worker { return &Worker{ stop: make(chan struct{}), stopped: make(chan struct{}), } } func (w *Worker) Start(wg *sync.WaitGroup) { wg.Add(1) go func() { defer wg.Done() for { select { case <-w.stop: log.Println("Stopping thread") close(w.stopped) return default: callClientTask() } } }() } func (w *Worker) Stop() { close(w.stop) <-w.stopped } func callClientTask() { time.Sleep(2 * time.Second) // simulate a long-running task (for testing purposes) } func main() { var wg sync.WaitGroup workers := make([]*Worker, 1000) for i := 0; i < 1000; i++ { workers[i] = NewWorker() workers[i].Start(&wg) } time.Sleep(5 * time.Second) // allow workers to run for a while for i := 0; i < 100; i++ { // stop first 100 workers workers[i].Stop() } for i := 100; i < 1000; i++ { // wait other workers to finish workers[i].Stop() } wg.Wait() }
Sortie :
2023/10/26 12:51:26 Stopping thread 2023/10/26 12:51:28 Stopping thread 2023/10/26 12:51:30 Stopping thread ....
Ce qui précède est le contenu détaillé de. pour plus d'informations, suivez d'autres articles connexes sur le site Web de PHP en chinois!