Bereitstellung der Python+Pika+RabbitMQ-Umgebung und Implementierung der Arbeitswarteschlange

高洛峰
Freigeben: 2017-03-01 14:04:50
Original
1501 Leute haben es durchsucht

Die chinesische Übersetzung von Rabbitmq bezieht sich hauptsächlich auf die Buchstaben mq: Message Queue, was Nachrichtenwarteschlange bedeutet. Davor steht auch das Wort „Kaninchen“, was bedeutet, dass die Python-Sprache ziemlich humorvoll ist. Der Rabbitmq-Dienst ähnelt den MySQL- und Apache-Diensten, die bereitgestellten Funktionen sind jedoch unterschiedlich. Mit rabbimq wird ein Dienst zum Versenden von Nachrichten bereitgestellt, der zur Kommunikation zwischen verschiedenen Anwendungen genutzt werden kann.

Rabbitmq installieren
Unter Ubuntu 12.04 können Sie es direkt über apt-get installieren:

sudo apt-get install rabbitmq-server
Nach dem Login kopieren

Nach der Installation wurde der Rabbitmq-Dienst gestartet. Schauen wir uns als Nächstes ein Beispiel für das Schreiben von „Hello World!“ in Python an. Der Inhalt des Beispiels besteht darin, „Hallo Welt!“ von send.py an Rabbitmq zu senden, und Empfang.py empfängt die von send.py gesendeten Informationen von Rabbitmq.

Bereitstellung der Python+Pika+RabbitMQ-Umgebung und Implementierung der Arbeitswarteschlange

P steht für Produce, was Produzent bedeutet, und kann auch als Sender bezeichnet werden. Im Beispiel wird es als send.py dargestellt. C steht für Consumer Das heißt, es kann auch als Empfänger bezeichnet werden, was im Beispiel durch „receive.py“ dargestellt wird. Das rote Symbol in der Mitte stellt die Bedeutung der Warteschlange dar, die im Beispiel durch „Hallo Warteschlange“ dargestellt wird.

Python nutzt den Rabbitmq-Dienst. Sie können die vorgefertigten Klassenbibliotheken pika, txAMQP oder py-amqplib verwenden.

Pika installieren

Um Pika zu installieren, können Sie pip verwenden, ein Python-Softwareverwaltungspaket. Wenn es nicht installiert ist, können Sie es über apt- installieren. get

sudo apt-get install python-pip
Nach dem Login kopieren

Pika über Pip installieren:

sudo pip install pika
Nach dem Login kopieren

send.py Der Code

stellt eine Verbindung zum Rabbitmq-Server her. Da er lokal getestet wird, verwenden Sie einfach localhost.

connection = pika.BlockingConnection(pika.ConnectionParameters(
        'localhost'))
channel = connection.channel()
Nach dem Login kopieren

Deklariert eine Nachrichtenwarteschlange, in der Nachrichten zugestellt werden. Wenn Nachrichten an eine Warteschlange gesendet werden, die nicht existiert, löscht Rabbitmq diese Nachrichten automatisch.

channel.queue_declare(queue='hello')
Nach dem Login kopieren

Senden Sie die Nachricht an die oben deklarierte Hallo-Warteschlange, wobei Exchange den Austauscher darstellt, der genau angeben kann, an welche Warteschlange die Nachricht gesendet werden soll. und Routing_key ist auf die Warteschlange eingestellt. Der Name und der Text sind der zu sendende Inhalt. Auf die spezifischen Sendedetails werden wir vorerst nicht achten.

channel.basic_publish(exchange='', routing_key='hello', body='Hello World!')
Nach dem Login kopieren

Verbindung schließen

connection.close()
Nach dem Login kopieren

Vollständiger Code

#!/usr/bin/env python
#coding=utf8
import pika
 
connection = pika.BlockingConnection(pika.ConnectionParameters(
        'localhost'))
channel = connection.channel()
 
channel.queue_declare(queue='hello')
 
channel.basic_publish(exchange='', routing_key='hello', body='Hello World!')
print " [x] Sent 'Hello World!'"
connection.close()
Nach dem Login kopieren

Führen Sie zuerst dieses Programm aus. Wenn die Ausführung erfolgreich ist, sollte Rabbitmqctl die Hallo-Warteschlange erfolgreich hinzufügen und es sollte eine Nachricht in der Warteschlange vorhanden sein um es zu überprüfen.

rabbitmqctl list_queues
Nach dem Login kopieren

Geben Sie die folgenden Informationen auf dem Computer des Autors aus:

Bereitstellung der Python+Pika+RabbitMQ-Umgebung und Implementierung der Arbeitswarteschlange


Es gibt tatsächlich eine Hallo-Warteschlange und eine Nachricht in der Warteschlange. Verwenden Sie als Nächstes „receive.py“, um die Informationen in der Warteschlange abzurufen.

receive.py-Code

ist derselbe wie die beiden vorherigen Schritte von send.py, die beide zuerst eine Verbindung zum Server und dann die Deklaration der Nachrichtenwarteschlange erfordern , was hier nicht besprochen wird. Derselbe Code wurde gepostet.

Der Empfang von Nachrichten ist komplizierter und erfordert die Definition einer Rückruffunktion, um sie zu verarbeiten. Die Rückruffunktion dient hier zum Ausdrucken der Informationen.

def callback(ch, method, properties, body):
  print "Received %r" % (body,)
Nach dem Login kopieren

Sagen Sie Rabbitmq, dass es Callback zum Empfangen von Nachrichten verwenden soll

channel.basic_consume(callback, queue='hello', no_ack=True)
Nach dem Login kopieren

Starten Sie den Empfang von Informationen und geben Sie den Blockierungsstatus ein. Erst wenn sich Informationen in der Warteschlange befinden, wird der Rückruf zur Verarbeitung aufgerufen. Drücken Sie Strg+C, um den Vorgang zu beenden.

channel.start_consuming()
Nach dem Login kopieren

Komplettcode

#!/usr/bin/env python
#coding=utf8
import pika
 
connection = pika.BlockingConnection(pika.ConnectionParameters(
        'localhost'))
channel = connection.channel()
 
channel.queue_declare(queue='hello')
 
def callback(ch, method, properties, body):
  print " [x] Received %r" % (body,)
 
channel.basic_consume(callback, queue='hello', no_ack=True)
 
print ' [*] Waiting for messages. To exit press CTRL+C'
channel.start_consuming()
Nach dem Login kopieren

Führen Sie das Programm aus und Sie werden es tun in der Lage sein, die Nachricht „Hallo Welt!“ an die „Hallo“-Warteschlange zu senden und dann auf dem Bildschirm auszugeben. Wechseln Sie ein Terminal und führen Sie send.py erneut aus. Sie können sehen, dass „receiver.py“ erneut Informationen empfängt.

Beispiel für eine Arbeitswarteschlange

1. Vorbereitung

Verwenden Sie im Beispielprogramm new_task .py zur Simulation den Task-Allokator und worker.py zur Simulation des Workers.

Ändern Sie send.py, um Informationen von Befehlszeilenparametern zu empfangen und zu senden

import sys
 
message = ' '.join(sys.argv[1:]) or "Hello World!"
channel.basic_publish(exchange='',
           routing_key='hello',
           body=message)
print " [x] Sent %r" % (message,)
Nach dem Login kopieren

Ändern Sie die Rückruffunktion von require.py.

import time
 
def callback(ch, method, properties, body):
  print " [x] Received %r" % (body,)
  time.sleep( body.count('.') )
  print " [x] Done"
Nach dem Login kopieren

Öffnen Sie hier zunächst zwei Terminals, beide führen worker.py aus und befinden sich im Überwachungsstatus. Dies entspricht zwei Workern. Öffnen Sie das dritte Terminal, führen Sie new_task.py

$ python new_task.py First message.
$ python new_task.py Second message..
$ python new_task.py Third message...
$ python new_task.py Fourth message....
$ python new_task.py Fifth message.....
Nach dem Login kopieren

aus und beobachten Sie, dass worker.py Aufgaben erhält und ein Worker 3 Aufgaben erhält:

$ python worker.py
 [*] Waiting for messages. To exit press CTRL+C
 [x] Received 'First message.'
 [x] Received 'Third message...'
 [x] Received 'Fifth message.....'
Nach dem Login kopieren

Ein anderer Arbeiter hat 2 Aufgaben erhalten:

$ python worker.py
 [*] Waiting for messages. To exit press CTRL+C
 [x] Received 'Second message..'
 [x] Received 'Fourth message....'
Nach dem Login kopieren

从上面来看,每个工作者,都会依次分配到任务。那么如果一个工作者,在处理任务的时候挂掉,这个任务就没有完成,应当交由其他工作者处理。所以应当有一种机制,当一个工作者完成任务时,会反馈消息。

2.消息确认(Message acknowledgment)

消息确认就是当工作者完成任务后,会反馈给rabbitmq。修改worker.py中的回调函数:

def callback(ch, method, properties, body):
  print " [x] Received %r" % (body,)
  time.sleep(5)
  print " [x] Done"
  ch.basic_ack(delivery_tag = method.delivery_tag)
Nach dem Login kopieren

这边停顿5秒,可以方便ctrl+c退出。

去除no_ack=True参数或者设置为False也可以。

channel.basic_consume(callback, queue='hello', no_ack=False)
Nach dem Login kopieren

用这个代码运行,即使其中一个工作者ctrl+c退出后,正在执行的任务也不会丢失,rabbitmq会将任务重新分配给其他工作者。

3.消息持久化存储(Message durability)

虽然有了消息反馈机制,但是如果rabbitmq自身挂掉的话,那么任务还是会丢失。所以需要将任务持久化存储起来。声明持久化存储:

channel.queue_declare(queue='hello', durable=True)
Nach dem Login kopieren

但是这个程序会执行错误,因为hello这个队列已经存在,并且是非持久化的,rabbitmq不允许使用不同的参数来重新定义存在的队列。重新定义一个队列:

channel.queue_declare(queue='task_queue', durable=True)
Nach dem Login kopieren

在发送任务的时候,用delivery_mode=2来标记任务为持久化存储:

channel.basic_publish(exchange='',
           routing_key="task_queue",
           body=message,
           properties=pika.BasicProperties(
             delivery_mode = 2, # make message persistent
           ))
Nach dem Login kopieren

4.公平调度(Fair dispatch)

上面实例中,虽然每个工作者是依次分配到任务,但是每个任务不一定一样。可能有的任务比较重,执行时间比较久;有的任务比较轻,执行时间比较短。如果能公平调度就最好了,使用basic_qos设置prefetch_count=1,使得rabbitmq不会在同一时间给工作者分配多个任务,即只有工作者完成任务之后,才会再次接收到任务。

channel.basic_qos(prefetch_count=1)
Nach dem Login kopieren

new_task.py完整代码

#!/usr/bin/env python
import pika
import sys
 
connection = pika.BlockingConnection(pika.ConnectionParameters(
    host='localhost'))
channel = connection.channel()
 
channel.queue_declare(queue='task_queue', durable=True)
 
message = ' '.join(sys.argv[1:]) or "Hello World!"
channel.basic_publish(exchange='',
           routing_key='task_queue',
           body=message,
           properties=pika.BasicProperties(
             delivery_mode = 2, # make message persistent
           ))
print " [x] Sent %r" % (message,)
connection.close()
worker.py完整代码

#!/usr/bin/env python
import pika
import time
 
connection = pika.BlockingConnection(pika.ConnectionParameters(
    host='localhost'))
channel = connection.channel()
 
channel.queue_declare(queue='task_queue', durable=True)
print ' [*] Waiting for messages. To exit press CTRL+C'
 
def callback(ch, method, properties, body):
  print " [x] Received %r" % (body,)
  time.sleep( body.count('.') )
  print " [x] Done"
  ch.basic_ack(delivery_tag = method.delivery_tag)
 
channel.basic_qos(prefetch_count=1)
channel.basic_consume(callback,
           queue='task_queue')
 
channel.start_consuming()
Nach dem Login kopieren


更多Bereitstellung der Python+Pika+RabbitMQ-Umgebung und Implementierung der Arbeitswarteschlange相关文章请关注PHP中文网!

Verwandte Etiketten:
Quelle:php.cn
Erklärung dieser Website
Der Inhalt dieses Artikels wird freiwillig von Internetnutzern beigesteuert und das Urheberrecht liegt beim ursprünglichen Autor. Diese Website übernimmt keine entsprechende rechtliche Verantwortung. Wenn Sie Inhalte finden, bei denen der Verdacht eines Plagiats oder einer Rechtsverletzung besteht, wenden Sie sich bitte an admin@php.cn
Beliebte Tutorials
Mehr>
Neueste Downloads
Mehr>
Web-Effekte
Quellcode der Website
Website-Materialien
Frontend-Vorlage
Über uns Haftungsausschluss Sitemap
Chinesische PHP-Website:Online-PHP-Schulung für das Gemeinwohl,Helfen Sie PHP-Lernenden, sich schnell weiterzuentwickeln!