PHP-Editor Zimo Im Softwareentwicklungsprozess ist die Nachrichtenwarteschlange ein gängiger Kommunikationsmechanismus, der verwendet wird, um eine asynchrone Kommunikation zwischen Produzenten und Verbrauchern zu erreichen. Manchmal möchten wir jedoch das Lesen von Nachrichten durch Produzenten und Verbraucher kontrollieren, um die Systemressourcen besser zu verwalten und Anfragen während der Spitzenzeiten zu bearbeiten. In diesem Artikel werden einige Methoden vorgestellt, mit denen Produzenten und Verbraucher daran gehindert werden, Nachrichten zu lesen, um Entwicklern dabei zu helfen, die Systemleistung zu optimieren und die Anwendungsstabilität zu verbessern.
Ich möchte mithilfe von go eine Producer-Consumer-Anwendung (über ein Signal geschlossen) erhalten.
Der Produzent generiert kontinuierlich Nachrichten in der Warteschlange, mit einem Limit von 10. Einige Verbraucher lesen und verarbeiten den Kanal. Wenn die Anzahl der Nachrichten in der Warteschlange 0 beträgt, generiert der Produzent erneut 10 Nachrichten. Wenn ein Stoppsignal empfangen wird, stoppt der Produzent die Generierung neuer Nachrichten und der Verbraucher verarbeitet alles im Kanal.
Ich habe einen Code gefunden, kann aber nicht verstehen, ob er richtig funktioniert, weil ich etwas Seltsames gefunden habe:
Ergebnis:
Codebeispiel:
package main import ( "context" "fmt" "math/rand" "os" "os/signal" "sync" "syscall" "time" ) func main() { const nConsumers = 2 in := make(chan int, 10) p := Producer{&in} c := Consumer{&in, make(chan int, nConsumers)} go p.Produce() ctx, cancelFunc := context.WithCancel(context.Background()) go c.Consume(ctx) wg := &sync.WaitGroup{} wg.Add(nConsumers) for i := 1; i <= nConsumers; i++ { go c.Work(wg, i) } termChan := make(chan os.Signal, 1) signal.Notify(termChan, syscall.SIGINT, syscall.SIGTERM) <-termChan cancelFunc() wg.Wait() } type Consumer struct { in *chan int jobs chan int } func (c Consumer) Work(wg *sync.WaitGroup, i int) { defer wg.Done() for job := range c.jobs { fmt.Printf("Worker #%d start job %d\n", i, job) time.Sleep(time.Millisecond * time.Duration(3000+rand.Intn(3000))) fmt.Printf("Worker #%d finish job %d\n", i, job) } fmt.Printf("Worker #%d interrupted\n", i) } func (c Consumer) Consume(ctx context.Context) { for { select { case job := <-*c.in: c.jobs <- job case <-ctx.Done(): close(c.jobs) fmt.Println("Consumer close channel") return } } } type Producer struct { in *chan int } func (p Producer) Produce() { task := 1 for { *p.in <- task fmt.Printf("Send value %d\n", task) task++ time.Sleep(time.Millisecond * 500) } }
Warum nach dem Stoppen des Programms nicht alle Nachrichten in der Warteschlange verarbeitet werden und einige Daten verloren zu gehen scheinen.
Das liegt daran, dass wenn ctx
完成后,(consumer).consume
停止从 in
通道读取,但 go p.produce()
创建的 goroutine 仍然写入 in
ch.
Die folgende Demo löst dieses Problem und vereinfacht den Quellcode.
Notizen:
produce
在 ctx
完成后停止。并且它关闭了 in
Kanal.
Feld jobs
已从 consumer
中删除,工作人员直接从 in
Kanal gelesen.
Die folgende Anfrage wird ignoriert, weil sie seltsam ist. Ein übliches Verhalten besteht darin, dass bei der Generierung eines Jobs der in
通道未满,则作业会立即发送到 in
通道;当它已满时,发送操作将阻塞,直到从 in
-Kanal den Job bis dahin liest.
Wenn die Anzahl der Nachrichten in der Warteschlange 0 beträgt, generiert der Produzent erneut 10 Nachrichten
package main import ( "context" "fmt" "math/rand" "os/signal" "sync" "syscall" "time" ) func main() { const nConsumers = 2 ctx, stop := signal.NotifyContext(context.Background(), syscall.SIGINT, syscall.SIGTERM) defer stop() in := make(chan int, 10) p := Producer{in} c := Consumer{in} go p.Produce(ctx) var wg sync.WaitGroup wg.Add(nConsumers) for i := 1; i <= nConsumers; i++ { go c.Work(&wg, i) } <-ctx.Done() fmt.Printf("\nGot end signal, waiting for %d jobs to finish\n", len(in)) wg.Wait() } type Consumer struct { in chan int } func (c *Consumer) Work(wg *sync.WaitGroup, i int) { defer wg.Done() for job := range c.in { fmt.Printf("Worker #%d start job %d\n", i, job) time.Sleep(time.Millisecond * time.Duration(3000+rand.Intn(3000))) fmt.Printf("Worker #%d finish job %d\n", i, job) } fmt.Printf("Worker #%d interrupted\n", i) } type Producer struct { in chan int } func (p *Producer) Produce(ctx context.Context) { task := 1 for { select { case p.in <- task: fmt.Printf("Send value %d\n", task) task++ time.Sleep(time.Millisecond * 500) case <-ctx.Done(): close(p.in) return } } }
Das obige ist der detaillierte Inhalt vonWie kann man Produzenten und Konsumenten daran hindern, Nachrichten zu lesen?. Für weitere Informationen folgen Sie bitte anderen verwandten Artikeln auf der PHP chinesischen Website!