Ich bin neu bei GO und habe eine Frage zum Stoppen von Goroutine mithilfe des Kanalsignals.
Ich habe eine langjährige Goroutine (über 1000 davon) und einen Manager, der sie verwaltet:
func myThreadFunc(stop chan bool) { for { select { case <- stop: log.Debug("Stopping thread") return default: callClientTask() } } } func callClientTask() { // This can take long time up to 30 seconds - this is external HTTP API call time.Sleep(5 * time.Second) } func manager() { var cancelChannelSlice []chan bool for i := 0; i < 1000; i++ { cancelChannel := make(chan bool) cancelChannelSlice = append(cancelChannelSlice, cancelChannel) go myThreadFunc(cancelChannel) } var stopTest = func() { for _, c := range cancelChannelSlice { c <- true } } timeout := time.After(time.Duration(300) * time.Second) for { select { case <-timeout: stopTest() default: time.Sleep(time.Second) } } }
In diesem Fall jedes Mal, wenn ich anrufe c <- true
管理器都会等待 callClientTask()
完成,然后转到下一个 cancelChannel
Ich möchte, dass alle Goroutinen in einer Iteration von callClientTask()
(nicht länger als 30 Sekunden)
Die einzige Möglichkeit, die ich versucht habe, bestand darin, die neue Goroutine so zu besetzen:
var stopTest = func() { for _, c := range cancelChannelSlice { go func(c chan bool) { c <- true close(c) }(c) } }
Mache ich das richtig?
Soweit ich Ihre Frage verstehe: „Sie möchten, dass alle Goroutinen innerhalb einer Iteration von callClientTask() (nicht länger als 30 Sekunden) anhalten“ und die Arbeitsthreads gleichzeitig ohne Synchronisierungsfrage ausgeführt werden.
Ich habe den Code so umorganisiert, dass er gleichzeitig mit der Wartegruppe ausgeführt wird.
Beispielcode:
package main import ( "log" "sync" "time" ) func worker(stop <-chan struct{}, wg *sync.WaitGroup) { defer wg.Done() for { select { case <-stop: log.Println("Stopping thread") return default: callClientTask() } } } func callClientTask() { time.Sleep(2 * time.Second) // simulate a long-running task (for testing purposes) } func main() { var wg sync.WaitGroup stop := make(chan struct{}) for i := 0; i < 1000; i++ { wg.Add(1) go worker(stop, &wg) } time.Sleep(5 * time.Second) // allow workers to run for a while close(stop) // stop all workers, close channel wg.Wait() // wait for all workers }
Ausgabe:
2023/10/26 10:40:44 Stopping thread 2023/10/26 10:40:44 Stopping thread .... 2023/10/26 10:40:49 Stopping thread 2023/10/26 10:40:49 Stopping thread
Herausgeber:
Wenn Sie einige Arbeiter stoppen möchten, müssen Sie die Arbeiter aktualisieren. Der folgende Code enthält einen Worker mit „Stopp“- und „Stopp“-Kanälen und einer Start/Stopp-Funktion.
Beispielcode:
package main import ( "log" "sync" "time" ) type Worker struct { stop chan struct{} stopped chan struct{} } func NewWorker() *Worker { return &Worker{ stop: make(chan struct{}), stopped: make(chan struct{}), } } func (w *Worker) Start(wg *sync.WaitGroup) { wg.Add(1) go func() { defer wg.Done() for { select { case <-w.stop: log.Println("Stopping thread") close(w.stopped) return default: callClientTask() } } }() } func (w *Worker) Stop() { close(w.stop) <-w.stopped } func callClientTask() { time.Sleep(2 * time.Second) // simulate a long-running task (for testing purposes) } func main() { var wg sync.WaitGroup workers := make([]*Worker, 1000) for i := 0; i < 1000; i++ { workers[i] = NewWorker() workers[i].Start(&wg) } time.Sleep(5 * time.Second) // allow workers to run for a while for i := 0; i < 100; i++ { // stop first 100 workers workers[i].Stop() } for i := 100; i < 1000; i++ { // wait other workers to finish workers[i].Stop() } wg.Wait() }
Ausgabe:
2023/10/26 12:51:26 Stopping thread 2023/10/26 12:51:28 Stopping thread 2023/10/26 12:51:30 Stopping thread ....
Das obige ist der detaillierte Inhalt vonSchließen Sie Goroutinen mithilfe von Kanälen schneller. Für weitere Informationen folgen Sie bitte anderen verwandten Artikeln auf der PHP chinesischen Website!