L'article précédent présentait l'installation de RabbitMQ et le classique Hello World ! Exemple. Ici, nous aurons une compréhension des files d’attente de travail. Parce qu'il s'agit d'une suite de l'article précédent, si vous n'avez pas lu l'article précédent, cet article peut être difficile à comprendre. L'adresse de l'article précédent est : Comment installer RabbitMQ et Python sur Ubuntu
Les messages peuvent également être compris comme des tâches, et l'expéditeur du message peut être compris comme l'allocateur de tâches et le destinataire du message.Il peut être compris comme un travailleur.Lorsque le travailleur reçoit une tâche et ne l'a pas terminée, l'allocateur de tâches envoie une autre tâche, et elle est trop occupée, donc plusieurs travailleurs sont nécessaires pour gérer ces tâches ensemble. sont appelées files d’attente de travail. Le diagramme de structure est le suivant :
file d'attente de travail de l'instance Python de Rabbitmq
Préparation
Dans l'exemple de programme, utilisez new_task.py pour simuler l'allocateur de tâches et worker.py pour simuler le travailleur.
Modifiez send.py pour recevoir les informations des paramètres de ligne de commande et envoyez
import sys message= ' '.join(sys.argv[1:])or "Hello World!" channel.basic_publish(exchange='', routing_key='hello', body=message) print " [x] Sent %r" % (message,)
Modifiez la fonction de rappel de recevoir.py.
import time def callback(ch, method, properties, body): print " [x] Received %r" % (body,) time.sleep( body.count('.') ) print " [x] Done"
Ouvrez deux terminaux ici, tous deux exécutent Worker.py et sont en état d'écoute. Cela équivaut à deux Workers. Ouvrez le troisième terminal, exécutez new_task.py
$ python new_task.py First message. $ python new_task.py Second message.. $ python new_task.py Third message... $ python new_task.py Fourth message.... $ python new_task.py Fifth message.....
et observez que worker.py reçoit des tâches :
$ python worker.py [*] Waiting for messages. To exit press CTRL+C [x] Received 'First message.' [x] Received 'Third message...' [x] Received 'Fifth message.....'
Un autre travailleur a reçu 2 tâches : <🎜. >
$ python worker.py [*] Waiting for messages. To exit press CTRL+C [x] Received 'Second message..' [x] Received 'Fourth message....'
def callback(ch, method, properties, body): print " [x] Received %r" % (body,) time.sleep(5) print " [x] Done" ch.basic_ack(delivery_tag= method.delivery_tag)
Faites une pause ici pendant 5 secondes pour faciliter la sortie de ctrl c.
Vous pouvez également supprimer le paramètre no_ack=True ou le définir sur False.
channel.basic_consume(callback, queue='hello', no_ack=False)
channel.queue_declare(queue='hello', durable=True)
channel.queue_declare(queue='task_queue', durable=True)
channel.basic_publish(exchange='', routing_key="task_queue", body=message, properties=pika.BasicProperties( delivery_mode= 2,# make message persistent ))
channel.basic_qos(prefetch_count=1)
#!/usr/bin/env python import pika import sys connection= pika.BlockingConnection(pika.ConnectionParameters( host='localhost')) channel= connection.channel() channel.queue_declare(queue='task_queue', durable=True) message= ' '.join(sys.argv[1:])or "Hello World!" channel.basic_publish(exchange='', routing_key='task_queue', body=message, properties=pika.BasicProperties( delivery_mode= 2,# make message persistent )) print " [x] Sent %r" % (message,) connection.close()
#!/usr/bin/env python import pika import time connection= pika.BlockingConnection(pika.ConnectionParameters( host='localhost')) channel= connection.channel() channel.queue_declare(queue='task_queue', durable=True) print ' [*] Waiting for messages. To exit press CTRL+C' def callback(ch, method, properties, body): print " [x] Received %r" % (body,) time.sleep( body.count('.') ) print " [x] Done" ch.basic_ack(delivery_tag= method.delivery_tag) channel.basic_qos(prefetch_count=1) channel.basic_consume(callback, queue='task_queue') channel.start_consuming()