RabbitMQ ist ein auf MQ basierender Server, der über die Pika-Bibliothek zur Programmsteuerung verwendet werden kann. Hier erklären wir ausführlich die Remote-Ergebnisrückgabe der RabbitMQ-Server-Nachrichtenwarteschlange:
Lassen Sie uns Sprechen Sie zuerst über den Test des Autors. Umgebung: Ubuntu14.04 + Python 2.7.4
RabbitMQ-Server
sudo apt-get install rabbitmq-server
Python benötigt die Pika-Bibliothek, um RabbitMQ zu verwenden
sudo pip install pika
Remote-Ergebnisrückgabe
Es wird kein Ergebnis zurückgegeben, nachdem das Ende der Nachricht die Nachricht gesendet hat. Wenn Sie nur eine Nachricht senden, stellt dies natürlich kein Problem dar, aber in der Praxis muss der Empfänger häufig die empfangene Nachricht verarbeiten und an den Sender zurücksenden.
Beschreibung der Verarbeitungsmethode: Vor dem Senden von Informationen generiert der Sender eine temporäre Warteschlange zum Empfangen von Nachrichten. Diese Warteschlange wird zum Empfang der zurückgegebenen Ergebnisse verwendet. Tatsächlich sind die Konzepte des empfangenden Endes und des sendenden Endes hier relativ verschwommen, da das sendende Ende auch Nachrichten empfangen muss und das empfangende Ende auch Nachrichten senden muss. Daher verwende ich hier ein weiteres Beispiel, um diesen Prozess zu demonstrieren.
Beispielinhalt: Angenommen, es gibt ein Kontrollzentrum und einen Rechenknoten. Das Kontrollzentrum sendet eine natürliche Zahl N an den Rechenknoten. Der Rechenknoten addiert 1 zum N-Wert und gibt ihn an das Kontrollzentrum zurück . Hier wird „center.py“ zur Simulation des Kontrollzentrums und „compute.py“ zur Simulation der Rechenknoten verwendet.
compute.py-Codeanalyse
#!/usr/bin/env python #coding=utf8 import pika #连接rabbitmq服务器 connection = pika.BlockingConnection(pika.ConnectionParameters( host='localhost')) channel = connection.channel() #定义队列 channel.queue_declare(queue='compute_queue') print ' [*] Waiting for n' #将n值加1 def increase(n): return n + 1 #定义接收到消息的处理方法 def request(ch, method, properties, body): print " [.] increase(%s)" % (body,) response = increase(int(body)) #将计算结果发送回控制中心 ch.basic_publish(exchange='', routing_key=properties.reply_to, body=str(response)) ch.basic_ack(delivery_tag = method.delivery_tag) channel.basic_qos(prefetch_count=1) channel.basic_consume(request, queue='compute_queue') channel.start_consuming()
Der Code des Berechnungsknotens ist relativ einfach Methode ist Die Nachricht wird direkt gedruckt, hier wird die Berechnung von plus eins durchgeführt und das Ergebnis an das Kontrollzentrum zurückgesendet.
center.py-Codeanalyse
#!/usr/bin/env python #coding=utf8 import pika class Center(object): def __init__(self): self.connection = pika.BlockingConnection(pika.ConnectionParameters( host='localhost')) self.channel = self.connection.channel() #定义接收返回消息的队列 result = self.channel.queue_declare(exclusive=True) self.callback_queue = result.method.queue self.channel.basic_consume(self.on_response, no_ack=True, queue=self.callback_queue) #定义接收到返回消息的处理方法 def on_response(self, ch, method, props, body): self.response = body def request(self, n): self.response = None #发送计算请求,并声明返回队列 self.channel.basic_publish(exchange='', routing_key='compute_queue', properties=pika.BasicProperties( reply_to = self.callback_queue, ), body=str(n)) #接收返回的数据 while self.response is None: self.connection.process_data_events() return int(self.response) center = Center() print " [x] Requesting increase(30)" response = center.request(30) print " [.] Got %r" % (response,)
Der obige Beispielcode definiert die Warteschlange und die Verarbeitungsmethode zum Empfangen und Senden der zurückgegebenen Daten Bei der Zuweisung der Warteschlange zu „reply_to“ wird dieser Parameter verwendet, um die Rückgabewarteschlange im Berechnungsknotencode abzurufen.
Öffnen Sie zwei Terminals, eines führt den Code python compute.py aus und das andere Terminal führt center.py aus. Wenn die Ausführung erfolgreich ist, sollten Sie den Effekt sehen können.
Während des Tests sind beim Autor einige kleinere Probleme aufgetreten, d Beim Berechnen der Ergebnisse wird angezeigt, dass „routing_key“ nicht gefunden wurde. Bei der erneuten Ausführung wird ein Fehler gemeldet. Verwenden Sie Rabbitmqctl list_queues, um die Warteschlange zu überprüfen und festzustellen, dass sich 1 Datenelement in der Warteschlange „compute_queue“ befindet. Diese Daten werden bei jeder erneuten Ausführung von „compute.py“ erneut verarbeitet. Später habe ich /etc/init.d/rabbitmq-server restart verwendet, um Rabbitmq neu zu starten, und alles war in Ordnung.
Korrelations-ID
Das vorherige Beispiel zeigte ein Beispiel für die Rückgabe eines Remote-Ergebnisses, aber eines wurde nicht erwähnt, nämlich die Korrelations-ID. Was ist das?
Angenommen, es gibt mehrere Rechenknoten und das Kontrollzentrum startet mehrere Threads, sendet Zahlen an diese Rechenknoten, fordert Berechnungsergebnisse an und gibt sie zurück, aber das Kontrollzentrum öffnet nur eine Warteschlange und alle Threads beginnen von dieser Wie stellt jeder Thread fest, dass die empfangene Nachricht diesem Thread entspricht, um eine Nachricht zu erhalten? Dies ist die Verwendung der Korrelations-ID. Korrelation wird im Chinesischen als gegenseitige Korrelation übersetzt, was auch diese Bedeutung zum Ausdruck bringt.
Funktionsprinzip der Korrelations-ID: Das Kontrollzentrum legt die Korrelations-ID fest, wenn es eine Berechnungsanforderung sendet, und dann gibt der Berechnungsknoten das Berechnungsergebnis zusammen mit der empfangenen Korrelations-ID zurück, sodass das Kontrollzentrum die Anforderung identifizieren kann die Korrelations-ID. Tatsächlich kann die Korrelations-ID auch als eindeutiger Identifikationscode der Anfrage verstanden werden.
Beispielinhalt: Das Kontrollzentrum startet mehrere Threads und jeder Thread initiiert eine Berechnungsanforderung. Über die Korrelations-ID kann jeder Thread die entsprechenden Berechnungsergebnisse genau empfangen.
compute.py-Codeanalyse
Im Vergleich zum vorherigen Artikel muss nur eine Stelle geändert werden: Wenn Sie die Berechnungsergebnisse an das Kontrollzentrum zurücksenden, fügen Sie die Parameterkorrelation_id-Einstellung und den Wert hinzu Dieser Parameter wurde tatsächlich vom Kontrollzentrum gesendet und wird einfach wieder zurückgesendet. Der Code lautet wie folgt:
#!/usr/bin/env python #coding=utf8 import pika #连接rabbitmq服务器 connection = pika.BlockingConnection(pika.ConnectionParameters( host='localhost')) channel = connection.channel() #定义队列 channel.queue_declare(queue='compute_queue') print ' [*] Waiting for n' #将n值加1 def increase(n): return n + 1 #定义接收到消息的处理方法 def request(ch, method, props, body): print " [.] increase(%s)" % (body,) response = increase(int(body)) #将计算结果发送回控制中心,增加correlation_id的设定 ch.basic_publish(exchange='', routing_key=props.reply_to, properties=pika.BasicProperties(correlation_id = \ props.correlation_id), body=str(response)) ch.basic_ack(delivery_tag = method.delivery_tag) channel.basic_qos(prefetch_count=1) channel.basic_consume(request, queue='compute_queue') channel.start_consuming()
center.py-Codeanalyse
Der Kontrollzentrumscode ist etwas komplizierter, und es gibt sie Drei wichtige Punkte:
Verwenden Sie die UUID von Python, um eine eindeutige Korrelations-ID zu generieren.
Legen Sie beim Senden einer Berechnungsanforderung den Parameter „korrelation_id“ fest.
Definieren Sie ein Wörterbuch zum Speichern der zurückgegebenen Daten. Der Schlüsselwert ist die vom entsprechenden Thread generierte Korrelations-ID.
Der Code lautet wie folgt:
#!/usr/bin/env python #coding=utf8 import pika, threading, uuid #自定义线程类,继承threading.Thread class MyThread(threading.Thread): def __init__(self, func, num): super(MyThread, self).__init__() self.func = func self.num = num def run(self): print " [x] Requesting increase(%d)" % self.num response = self.func(self.num) print " [.] increase(%d)=%d" % (self.num, response) #控制中心类 class Center(object): def __init__(self): self.connection = pika.BlockingConnection(pika.ConnectionParameters( host='localhost')) self.channel = self.connection.channel() #定义接收返回消息的队列 result = self.channel.queue_declare(exclusive=True) self.callback_queue = result.method.queue self.channel.basic_consume(self.on_response, no_ack=True, queue=self.callback_queue) #返回的结果都会存储在该字典里 self.response = {} #定义接收到返回消息的处理方法 def on_response(self, ch, method, props, body): self.response[props.correlation_id] = body def request(self, n): corr_id = str(uuid.uuid4()) self.response[corr_id] = None #发送计算请求,并设定返回队列和correlation_id self.channel.basic_publish(exchange='', routing_key='compute_queue', properties=pika.BasicProperties( reply_to = self.callback_queue, correlation_id = corr_id, ), body=str(n)) #接收返回的数据 while self.response[corr_id] is None: self.connection.process_data_events() return int(self.response[corr_id]) center = Center() #发起5次计算请求 nums= [10, 20, 30, 40 ,50] threads = [] for num in nums: threads.append(MyThread(center.request, num)) for thread in threads: thread.start() for thread in threads: thread.join()
Der Autor öffnete zwei Terminals, um Compute.py auszuführen, öffnete ein Terminal, um Center.py auszuführen, und schließlich Der Screenshot der Ergebnisausgabe lautet wie folgt:
Sie können sehen, dass die erhaltenen Ergebnisse zwar nicht nacheinander ausgegeben werden, die Ergebnisse jedoch den Quelldaten entsprechen.
Das Beispiel hier besteht darin, eine Warteschlange zu erstellen und die Korrelations-ID zu verwenden, um jede Anfrage zu identifizieren. Es gibt auch eine Möglichkeit, die Korrelations-ID nicht zu verwenden, indem bei jeder Anforderung eine temporäre Warteschlange erstellt wird. Dies verbraucht jedoch zu viel Leistung und wird offiziell nicht empfohlen.
Weitere Artikel zum Betrieb der RabbitMQ-Server-Nachrichtenwarteschlange durch Python und zur Rückgabe von Remote-Ergebnissen finden Sie auf der chinesischen PHP-Website!