Éditeur PHP Zimo Dans le processus de développement logiciel, la file d'attente de messages est un mécanisme de communication courant utilisé pour réaliser une communication asynchrone entre les producteurs et les consommateurs. Cependant, nous souhaitons parfois contrôler la lecture des messages par les producteurs et les consommateurs afin de mieux gérer les ressources du système et traiter les demandes pendant les heures de pointe. Cet article présentera certaines méthodes pour empêcher les producteurs et les consommateurs de lire les messages afin d'aider les développeurs à optimiser les performances du système et à améliorer la stabilité des applications.
Je souhaite obtenir une application producteur-consommateur (fermée via un signal) en utilisant go.
Le producteur génère en continu des messages dans la file d'attente, avec une limite de 10. Certains consommateurs lisent et traitent la chaîne. Si le nombre de messages dans la file d'attente est 0, le producteur génère à nouveau 10 messages. Lorsqu'un signal d'arrêt est reçu, le producteur arrête de générer de nouveaux messages et le consommateur traite tout ce qui se passe dans le canal.
J'ai trouvé un morceau de code mais je n'arrive pas à comprendre s'il fonctionne correctement car j'ai trouvé quelque chose de bizarre :
Résultat :
Exemple de code :
package main import ( "context" "fmt" "math/rand" "os" "os/signal" "sync" "syscall" "time" ) func main() { const nConsumers = 2 in := make(chan int, 10) p := Producer{&in} c := Consumer{&in, make(chan int, nConsumers)} go p.Produce() ctx, cancelFunc := context.WithCancel(context.Background()) go c.Consume(ctx) wg := &sync.WaitGroup{} wg.Add(nConsumers) for i := 1; i <= nConsumers; i++ { go c.Work(wg, i) } termChan := make(chan os.Signal, 1) signal.Notify(termChan, syscall.SIGINT, syscall.SIGTERM) <-termChan cancelFunc() wg.Wait() } type Consumer struct { in *chan int jobs chan int } func (c Consumer) Work(wg *sync.WaitGroup, i int) { defer wg.Done() for job := range c.jobs { fmt.Printf("Worker #%d start job %d\n", i, job) time.Sleep(time.Millisecond * time.Duration(3000+rand.Intn(3000))) fmt.Printf("Worker #%d finish job %d\n", i, job) } fmt.Printf("Worker #%d interrupted\n", i) } func (c Consumer) Consume(ctx context.Context) { for { select { case job := <-*c.in: c.jobs <- job case <-ctx.Done(): close(c.jobs) fmt.Println("Consumer close channel") return } } } type Producer struct { in *chan int } func (p Producer) Produce() { task := 1 for { *p.in <- task fmt.Printf("Send value %d\n", task) task++ time.Sleep(time.Millisecond * 500) } }
Pourquoi après l'arrêt du programme, tous les messages de la file d'attente ne sont pas traités et certaines données semblent être perdues.
C'est parce que quand ctx
完成后,(consumer).consume
停止从 in
通道读取,但 go p.produce()
创建的 goroutine 仍然写入 in
ch.
La démo ci-dessous résout ce problème et simplifie le code source.
Remarques :
produce
在 ctx
完成后停止。并且它关闭了 in
Chaîne.
Champ jobs
已从 consumer
中删除,工作人员直接从 in
Lecture de la chaîne.
La demande suivante est ignorée car elle est bizarre. Le comportement courant est celui lorsqu'une tâche est générée, si le canal in
通道未满,则作业会立即发送到 in
通道;当它已满时,发送操作将阻塞,直到从 in
lit la tâche d'ici là.
Si le nombre de messages dans la file d'attente est 0, le producteur génère à nouveau 10 messages
package main import ( "context" "fmt" "math/rand" "os/signal" "sync" "syscall" "time" ) func main() { const nConsumers = 2 ctx, stop := signal.NotifyContext(context.Background(), syscall.SIGINT, syscall.SIGTERM) defer stop() in := make(chan int, 10) p := Producer{in} c := Consumer{in} go p.Produce(ctx) var wg sync.WaitGroup wg.Add(nConsumers) for i := 1; i <= nConsumers; i++ { go c.Work(&wg, i) } <-ctx.Done() fmt.Printf("\nGot end signal, waiting for %d jobs to finish\n", len(in)) wg.Wait() } type Consumer struct { in chan int } func (c *Consumer) Work(wg *sync.WaitGroup, i int) { defer wg.Done() for job := range c.in { fmt.Printf("Worker #%d start job %d\n", i, job) time.Sleep(time.Millisecond * time.Duration(3000+rand.Intn(3000))) fmt.Printf("Worker #%d finish job %d\n", i, job) } fmt.Printf("Worker #%d interrupted\n", i) } type Producer struct { in chan int } func (p *Producer) Produce(ctx context.Context) { task := 1 for { select { case p.in <- task: fmt.Printf("Send value %d\n", task) task++ time.Sleep(time.Millisecond * 500) case <-ctx.Done(): close(p.in) return } } }
Ce qui précède est le contenu détaillé de. pour plus d'informations, suivez d'autres articles connexes sur le site Web de PHP en chinois!