Wie kann man Produzenten und Konsumenten daran hindern, Nachrichten zu lesen?

WBOY
Freigeben: 2024-02-11 18:00:11
nach vorne
905 Leute haben es durchsucht

Wie kann man Produzenten und Konsumenten daran hindern, Nachrichten zu lesen?

PHP-Editor Zimo Im Softwareentwicklungsprozess ist die Nachrichtenwarteschlange ein gängiger Kommunikationsmechanismus, der verwendet wird, um eine asynchrone Kommunikation zwischen Produzenten und Verbrauchern zu erreichen. Manchmal möchten wir jedoch das Lesen von Nachrichten durch Produzenten und Verbraucher kontrollieren, um die Systemressourcen besser zu verwalten und Anfragen während der Spitzenzeiten zu bearbeiten. In diesem Artikel werden einige Methoden vorgestellt, mit denen Produzenten und Verbraucher daran gehindert werden, Nachrichten zu lesen, um Entwicklern dabei zu helfen, die Systemleistung zu optimieren und die Anwendungsstabilität zu verbessern.

Frageninhalt

Ich möchte mithilfe von go eine Producer-Consumer-Anwendung (über ein Signal geschlossen) erhalten.

Der Produzent generiert kontinuierlich Nachrichten in der Warteschlange, mit einem Limit von 10. Einige Verbraucher lesen und verarbeiten den Kanal. Wenn die Anzahl der Nachrichten in der Warteschlange 0 beträgt, generiert der Produzent erneut 10 Nachrichten. Wenn ein Stoppsignal empfangen wird, stoppt der Produzent die Generierung neuer Nachrichten und der Verbraucher verarbeitet alles im Kanal.

Ich habe einen Code gefunden, kann aber nicht verstehen, ob er richtig funktioniert, weil ich etwas Seltsames gefunden habe:

  1. Warum nach dem Stoppen des Programms nicht alle Nachrichten in der Warteschlange verarbeitet werden und einige Daten verloren zu gehen scheinen. (Im Screenshot wurden 15 Nachrichten gesendet, aber 5 wurden verarbeitet)
  2. Wie kann man die Warteschlange richtig auf 10 Nachrichten begrenzen, d. h. 10 Nachrichten schreiben, auf die Verarbeitung warten, wenn der Warteschlangenzähler 0 erreicht, und dann 10 weitere schreiben?
  3. Ist es möglich, den Produzenten nach einem Stoppsignal zu benachrichtigen, damit er keine neuen Nachrichten mehr an den Kanal generiert? (Im Screenshot schreibt der Produzent erfolgreich in die Warteschlange – 12,13,14,15)

Ergebnis:

Codebeispiel:

package main

import (
    "context"
    "fmt"
    "math/rand"
    "os"
    "os/signal"
    "sync"
    "syscall"
    "time"
)

func main() {
    const nConsumers = 2

    in := make(chan int, 10)
    p := Producer{&in}
    c := Consumer{&in, make(chan int, nConsumers)}
    go p.Produce()
    ctx, cancelFunc := context.WithCancel(context.Background())
    go c.Consume(ctx)
    wg := &sync.WaitGroup{}
    wg.Add(nConsumers)
    for i := 1; i <= nConsumers; i++ {
        go c.Work(wg, i)
    }
    termChan := make(chan os.Signal, 1)
    signal.Notify(termChan, syscall.SIGINT, syscall.SIGTERM)

    <-termChan

    cancelFunc()
    wg.Wait()
}

type Consumer struct {
    in   *chan int
    jobs chan int
}

func (c Consumer) Work(wg *sync.WaitGroup, i int) {
    defer wg.Done()
    for job := range c.jobs {
        fmt.Printf("Worker #%d start job %d\n", i, job)
        time.Sleep(time.Millisecond * time.Duration(3000+rand.Intn(3000)))
        fmt.Printf("Worker #%d finish job %d\n", i, job)
    }
    fmt.Printf("Worker #%d interrupted\n", i)
}

func (c Consumer) Consume(ctx context.Context) {
    for {
        select {
        case job := <-*c.in:
            c.jobs <- job
        case <-ctx.Done():
            close(c.jobs)
            fmt.Println("Consumer close channel")
            return
        }
    }
}

type Producer struct {
    in *chan int
}

func (p Producer) Produce() {
    task := 1
    for {
        *p.in <- task
        fmt.Printf("Send value %d\n", task)
        task++
        time.Sleep(time.Millisecond * 500)
    }
}
Nach dem Login kopieren

Lösung

Warum nach dem Stoppen des Programms nicht alle Nachrichten in der Warteschlange verarbeitet werden und einige Daten verloren zu gehen scheinen.

Das liegt daran, dass wenn ctx 完成后,(consumer).consume 停止从 in 通道读取,但 go p.produce() 创建的 goroutine 仍然写入 inch.

Die folgende Demo löst dieses Problem und vereinfacht den Quellcode.

Notizen:

  1. producectx 完成后停止。并且它关闭了 in Kanal.

  2. Feld jobs 已从 consumer 中删除,工作人员直接从 in Kanal gelesen.

  3. Die folgende Anfrage wird ignoriert, weil sie seltsam ist. Ein übliches Verhalten besteht darin, dass bei der Generierung eines Jobs der in 通道未满,则作业会立即发送到 in 通道;当它已满时,发送操作将阻塞,直到从 in-Kanal den Job bis dahin liest.

    Wenn die Anzahl der Nachrichten in der Warteschlange 0 beträgt, generiert der Produzent erneut 10 Nachrichten

package main

import (
    "context"
    "fmt"
    "math/rand"
    "os/signal"
    "sync"
    "syscall"
    "time"
)

func main() {
    const nConsumers = 2

    ctx, stop := signal.NotifyContext(context.Background(), syscall.SIGINT, syscall.SIGTERM)
    defer stop()

    in := make(chan int, 10)
    p := Producer{in}
    c := Consumer{in}
    go p.Produce(ctx)

    var wg sync.WaitGroup
    wg.Add(nConsumers)
    for i := 1; i <= nConsumers; i++ {
        go c.Work(&wg, i)
    }

    <-ctx.Done()
    fmt.Printf("\nGot end signal, waiting for %d jobs to finish\n", len(in))
    wg.Wait()
}

type Consumer struct {
    in chan int
}

func (c *Consumer) Work(wg *sync.WaitGroup, i int) {
    defer wg.Done()
    for job := range c.in {
        fmt.Printf("Worker #%d start job %d\n", i, job)
        time.Sleep(time.Millisecond * time.Duration(3000+rand.Intn(3000)))
        fmt.Printf("Worker #%d finish job %d\n", i, job)
    }
    fmt.Printf("Worker #%d interrupted\n", i)
}

type Producer struct {
    in chan int
}

func (p *Producer) Produce(ctx context.Context) {
    task := 1
    for {
        select {
        case p.in <- task:
            fmt.Printf("Send value %d\n", task)
            task++
            time.Sleep(time.Millisecond * 500)
        case <-ctx.Done():
            close(p.in)
            return
        }
    }
}
Nach dem Login kopieren

Das obige ist der detaillierte Inhalt vonWie kann man Produzenten und Konsumenten daran hindern, Nachrichten zu lesen?. Für weitere Informationen folgen Sie bitte anderen verwandten Artikeln auf der PHP chinesischen Website!

Quelle:stackoverflow.com
Erklärung dieser Website
Der Inhalt dieses Artikels wird freiwillig von Internetnutzern beigesteuert und das Urheberrecht liegt beim ursprünglichen Autor. Diese Website übernimmt keine entsprechende rechtliche Verantwortung. Wenn Sie Inhalte finden, bei denen der Verdacht eines Plagiats oder einer Rechtsverletzung besteht, wenden Sie sich bitte an admin@php.cn
Beliebte Tutorials
Mehr>
Neueste Downloads
Mehr>
Web-Effekte
Quellcode der Website
Website-Materialien
Frontend-Vorlage