Inhaltsverzeichnis
HelloWorld
Einführung
Code
Arbeitswarteschlange (Aufgabenwarteschlange)
Nachrichtenbestätigung
Nachrichtenpersistenz
Austausch
Bind-Austausch und Warteschlange
, um die Nachricht, die dem Routing-Schlüssel entspricht, an die Warteschlange zu senden, die an denselben Routing-Schlüssel gebunden ist
Binden Sie den entsprechenden Schweregrad in der Akzeptanzfunktion:
Themenaustausch verwenden
 RPC
Heim Backend-Entwicklung Python-Tutorial RabbitMQ-Schnellstart-Python-Tutorial

RabbitMQ-Schnellstart-Python-Tutorial

Mar 09, 2017 am 09:28 AM
pika python rabbitmq

HelloWorld

Einführung

RabbitMQ: Wenn es Nachrichten annimmt und dann zustellt, kann es als „Postamt“ betrachtet werden. Sender und Empfänger interagieren über Warteschlangen. Die Größe der Warteschlange kann als unbegrenzt angesehen werden. Mehrere Absender können Nachrichten an eine Warteschlange senden, und mehrere Empfänger können auch Nachrichten von einer Warteschlange empfangen.

Code

Das von Rabbitmq verwendete Protokoll ist amqp und der empfohlene Client für Python ist pika

pip install pika -i https://pypi.douban.com/simple/
Nach dem Login kopieren

send.py

# coding: utf8
import pika

# 建立一个连接
connection = pika.BlockingConnection(pika.ConnectionParameters(
           'localhost'))  # 连接本地的RabbitMQ服务器
channel = connection.channel()  # 获得channel
Nach dem Login kopieren

Der Link hier gilt für diesen Computer. Wenn Sie eine Verbindung zu einem Server auf einem anderen Computer herstellen möchten, geben Sie einfach die Adresse oder den Hostnamen ein.

Als nächstes stellen wir sicher, dass die Warteschlange, die die Nachricht akzeptiert, vorhanden ist, andernfalls verwirft RabbitMQ die Nachricht.

channel.queue_declare(queue='hello')  # 在RabbitMQ中创建hello这个队列
channel.basic_publish(exchange='',  # 使用默认的exchange来发送消息到队列
                  routing_key='hello',  # 发送到该队列 hello 中
                  body='Hello World!')  # 消息内容

connection.close()  # 关闭 同时flush
Nach dem Login kopieren

RabbitMQ benötigt standardmäßig 1 GB freien Speicherplatz Das Senden schlägt fehl.

Zu diesem Zeitpunkt wurde eine Nachricht in der lokalen Warteschlange „Hallo“ gespeichert. Wenn Sie Rabbitmqctl list_queues verwenden, können Sie

hello 1
Nach dem Login kopieren

sehen, dass eine Nachricht in der „Hallo“-Warteschlange gespeichert ist

receive.py

# coding: utf8
import pika
connection = pika.BlockingConnection(pika.ConnectionParameters(
               'localhost'))
channel = connection.channel()
Nach dem Login kopieren

Stellen Sie immer noch zuerst eine Verbindung zum Server her, genau wie beim vorherigen Senden

channel.queue_declare(queue='hello')  # 此处就是声明了 来确保该队列 hello 存在 可以多次声明 这里主要是为了防止接受程序先运行时出错

def callback(ch, method, properties, body):  # 用于接收到消息后的回调
    print(" [x] Received %r" % body)

channel.basic_consume(callback,
                      queue='hello',  # 收指定队列hello的消息
                      no_ack=True)  #在处理完消息后不发送ack给服务器
channel.start_consuming()  # 启动消息接受 这会进入一个死循环
Nach dem Login kopieren

Arbeitswarteschlange (Aufgabenwarteschlange)

Arbeitswarteschlange wird verwendet, um zeitaufwändige Aufgaben auf mehrere Arbeitsprozesse zu verteilen. Anstatt ressourcenintensive Aufgaben sofort auszuführen (Sie müssen warten, bis diese Aufgaben abgeschlossen sind), planen Sie diese Aufgaben für die spätere Ausführung. Beispielsweise senden wir die Aufgabe als Nachricht an die Warteschlange, starten einen Worker-Prozess, um sie anzunehmen und schließlich auszuführen, und können mehrere Worker-Prozesse starten, um zu arbeiten. Dies gilt für Webanwendungen, bei denen komplexe Aufgaben nicht innerhalb des Verarbeitungsfensters einer http-Anfrage erledigt werden sollen.

channel.basic_publish(exchange='',
                  routing_key='task_queue',
                  body=message,
                  properties=pika.BasicProperties(
                     delivery_mode = 2, # 使得消息持久化
                  ))
Nach dem Login kopieren

Die Art und Weise, Nachrichten zu verteilen, ist Polling, das heißt, jeder Arbeitsprozess erhält die gleiche Anzahl von Nachrichten.

Nachrichtenbestätigung

Wenn eine Nachricht einem Arbeitsprozess zugewiesen wird, der Arbeitsprozess jedoch abstürzt, bevor die Verarbeitung abgeschlossen ist, kann die Nachricht verloren gehen, da Rabbitmq einmal eine Nachricht an den Arbeitsprozess verteilt , löscht es die Nachricht.

Um den Verlust von Nachrichten zu verhindern, stellt Rabbitmq eine Bestätigung bereit. Das heißt, nachdem der Arbeitsprozess die Nachricht empfangen und verarbeitet hat, sendet er eine Bestätigung an Rabbitmq, um Rabbitmq darüber zu informieren, dass die Nachricht zu diesem Zeitpunkt aus der Warteschlange gelöscht werden kann Zeit. Wenn der Arbeitsprozess abstürzt und Rabbitmq die Bestätigung nicht erhält, wird die Nachricht an andere Arbeitprozesse weitergegeben. Es ist nicht erforderlich, ein Timeout festzulegen, auch wenn die Aufgabe lange dauert, kann sie bearbeitet werden.

ack ist standardmäßig aktiviert. Zuvor gab unser Arbeitsprozess no_ack=True

channel.basic_consume(callback, queue='hello')  # 会启用ack
Nach dem Login kopieren

Rückruf mit ack:

def callback(ch, method, properties, body):
    print " [x] Received %r" % (body,)
    time.sleep( body.count('.') )
    print " [x] Done"
    ch.basic_ack(delivery_tag = method.delivery_tag)  # 发送ack
Nach dem Login kopieren

Nachrichtenpersistenz

Aber manchmal wird RabbitMQ neu gestartet und Nachrichten gehen verloren. Die Persistenz kann beim Erstellen der Warteschlange festgelegt werden:

(die Art der Warteschlange kann nach ihrer Festlegung nicht mehr geändert werden)

channel.queue_declare(queue='task_queue', durable=True)
Nach dem Login kopieren
Gleichzeitig muss beim Senden auch das Persistenzattribut der Nachricht festgelegt werden die Nachricht:

channel.basic_publish(exchange='',

                  routing_key="task_queue",
                  body=message,
                  properties=pika.BasicProperties(
                     delivery_mode = 2, # make message persistent
                  ))
Nach dem Login kopieren
Wenn RabbitMQ jedoch gerade eine Nachricht empfangen hat und keine Zeit hatte, sie zu speichern, wird die Nachricht trotzdem angezeigt Gleichzeitig empfängt RabbitMQ nicht jede Nachricht. Wenn Sie eine vollständigere Garantie benötigen, müssen Sie die

Faire Nachrichtenverteilung verwenden Die Nachrichtenverteilung im Modus ist möglicherweise nicht fair, z. B. sind alle ungeraden Nachrichten bei schweren Aufgaben ausgeführt, selbst wenn bei einem bestimmten Arbeitsprozess ein Rückstand an Nachrichten vorhanden ist, die beispielsweise nicht verarbeitet wurden. Viele Bestätigungen wurden nicht gesendet, RabbitMQ sendet weiterhin nacheinander Nachrichten an ihn. Fügen Sie Einstellungen hinzu:

Informieren Sie RabbitMQ, damit, wenn ein Arbeitsprozess keine Bestätigung zurücksendet, keine Nachrichten zugewiesen werden

channel.basic_qos(prefetch_count=1)
Nach dem Login kopieren
Gruppenversand

Im Allgemeinen wird eine Nachricht an einen Arbeitsprozess gesendet und dann abgeschlossen. Manchmal möchte ich eine Nachricht an mehrere Prozesse gleichzeitig senden

Austausch

Sendet der Absender die Nachricht direkt an die Warteschlange? Der Absender weiß tatsächlich nicht, an welche Warteschlange die Nachricht gesendet wird Einerseits empfängt die Vermittlungsstelle die Nachrichten des Produzenten und schiebt sie andererseits in die Warteschlange. Als Vermittlungsstelle müssen Sie also wissen, was zu tun ist, wenn eine Nachricht empfangen wird, und ob sie einer Sondernachricht hinzugefügt werden soll Es gibt Direkt-, Themen-, Header- und Fanout-Typen sowie Massenversand, bei denen der Wert „Fanout“ lautet. Der Standardaustausch wurde verwendet. , kann beim Senden oder Empfangen verwendet werden.

Bind-Austausch und Warteschlange

Protokolle werden beim Senden auch an Hallo gesendet Beim Senden wird die Nachricht über den neu erstellten Protokollaustausch
channel.exchange_declare(exchange='logs', type='fanout')  # 该exchange会把消息发送给所有它知道的队列中
Nach dem Login kopieren

result = channel.queue_declare()  # 创建一个随机队列
result = channel.queue_declare(exclusive=True)  # 创建一个随机队列,同时在没有接收者连接该队列后则销毁它
queue_name = result.method.queue
Nach dem Login kopieren

weitergeleitet. Bind wurde zuvor verwendet, das heißt, die Beziehung zwischen dem Austausch und der Warteschlange wird hergestellt ist an Nachrichten vom Austausch interessiert. Sie können beim Binden auch die Option „routing_key“ angeben. Verwenden Sie den direkten Austausch

, um die Nachricht, die dem Routing-Schlüssel entspricht, an die Warteschlange zu senden, die an denselben Routing-Schlüssel gebunden ist

channel.queue_bind(exchange='logs',
               queue='hello')
Nach dem Login kopieren
Sendefunktion, Veröffentlichen von Nachrichten mit unterschiedlichem Schweregrad:

Binden Sie den entsprechenden Schweregrad in der Akzeptanzfunktion:

   channel.basic_publish(exchange='logs',
                  routing_key='',
                  body=message)
Nach dem Login kopieren

Themenaustausch verwenden

Der zuvor verwendete direkte Austausch kann nur zum Binden eines Routing-Schlüssels verwendet werden. Sie können diesen Themenaustausch verwenden, der die Routing-Schlüssel trennt, zum Beispiel:

"stock.usd.nyse" "nyse.vmw"
Nach dem Login kopieren

和direct exchange一样,在接受者那边绑定的key与发送时指定的routing key相同即可,另外有些特殊的值:

* 代表1个单词
# 代表0个或多个单词
Nach dem Login kopieren

如果发送者发出的routing key都是3个部分的,如:celerity.colour.species。

Q1:
*.orange.*  对应的是中间的colour都为orange的

Q2:
*.*.rabbit  对应的是最后部分的species为rabbit的
lazy.#      对应的是第一部分是lazy的
Nach dem Login kopieren

qucik.orange.rabbit Q1 Q2都可接收到,quick.orange.fox 只有Q1能接受到,对于lazy.pink.rabbit虽然匹配到了Q2两次,但是只会发送一次。如果绑定时直接绑定#,则会收到所有的。

 RPC

在远程机器上运行一个函数然后获得结果。

1、客户端启动 同时设置一个临时队列用于接受回调,绑定该队列

    self.connection = pika.BlockingConnection(pika.ConnectionParameters(
            host='localhost'))
    self.channel = self.connection.channel()
    result = self.channel.queue_declare(exclusive=True)
    self.callback_queue = result.method.queue
    self.channel.basic_consume(self.on_response, no_ack=True,
                               queue=self.callback_queue)
Nach dem Login kopieren

2、客户端发送rpc请求,同时附带reply_to对应回调队列,correlation_id设置为每个请求的唯一id(虽然说可以为每一次RPC请求都创建一个回调队列,但是这样效率不高,如果一个客户端只使用一个队列,则需要使用correlation_id来匹配是哪个请求),之后阻塞在回调队列直到收到回复

注意:如果收到了非法的correlation_id直接丢弃即可,因为有这种情况--服务器已经发了响应但是还没发ack就挂了,等一会服务器重启了又会重新处理该任务,又发了一遍相应,但是这时那个请求已经被处理掉了

channel.basic_publish(exchange='',
                       routing_key='rpc_queue',
                       properties=pika.BasicProperties(
                             reply_to = self.callback_queue,
                             correlation_id = self.corr_id,
                             ),
                       body=str(n))  # 发出调用

while self.response is None:  # 这边就相当于阻塞了
    self.connection.process_data_events()  # 查看回调队列
return int(self.response)
Nach dem Login kopieren

3、请求会发送到rpc_queue队列
4、RPC服务器从rpc_queue中取出,执行,发送回复

channel.basic_consume(on_request, queue='rpc_queue')  # 绑定 等待请求

# 处理之后:
ch.basic_publish(exchange='',
                 routing_key=props.reply_to,
                 properties=pika.BasicProperties(correlation_id = \
                                                     props.correlation_id),
                 body=str(response))  # 发送回复到回调队列
ch.basic_ack(delivery_tag = method.delivery_tag)  # 发送ack
Nach dem Login kopieren

5、客户端从回调队列中取出数据,检查correlation_id,执行相应操作

if self.corr_id == props.correlation_id:
        self.response = body
Nach dem Login kopieren

                                               

Das obige ist der detaillierte Inhalt vonRabbitMQ-Schnellstart-Python-Tutorial. Für weitere Informationen folgen Sie bitte anderen verwandten Artikeln auf der PHP chinesischen Website!

Erklärung dieser Website
Der Inhalt dieses Artikels wird freiwillig von Internetnutzern beigesteuert und das Urheberrecht liegt beim ursprünglichen Autor. Diese Website übernimmt keine entsprechende rechtliche Verantwortung. Wenn Sie Inhalte finden, bei denen der Verdacht eines Plagiats oder einer Rechtsverletzung besteht, wenden Sie sich bitte an admin@php.cn

Heiße KI -Werkzeuge

Undresser.AI Undress

Undresser.AI Undress

KI-gestützte App zum Erstellen realistischer Aktfotos

AI Clothes Remover

AI Clothes Remover

Online-KI-Tool zum Entfernen von Kleidung aus Fotos.

Undress AI Tool

Undress AI Tool

Ausziehbilder kostenlos

Clothoff.io

Clothoff.io

KI-Kleiderentferner

AI Hentai Generator

AI Hentai Generator

Erstellen Sie kostenlos Ai Hentai.

Heißer Artikel

R.E.P.O. Energiekristalle erklärten und was sie tun (gelber Kristall)
4 Wochen vor By 尊渡假赌尊渡假赌尊渡假赌
R.E.P.O. Beste grafische Einstellungen
4 Wochen vor By 尊渡假赌尊渡假赌尊渡假赌
R.E.P.O. So reparieren Sie Audio, wenn Sie niemanden hören können
4 Wochen vor By 尊渡假赌尊渡假赌尊渡假赌
R.E.P.O. Chat -Befehle und wie man sie benutzt
4 Wochen vor By 尊渡假赌尊渡假赌尊渡假赌

Heiße Werkzeuge

Notepad++7.3.1

Notepad++7.3.1

Einfach zu bedienender und kostenloser Code-Editor

SublimeText3 chinesische Version

SublimeText3 chinesische Version

Chinesische Version, sehr einfach zu bedienen

Senden Sie Studio 13.0.1

Senden Sie Studio 13.0.1

Leistungsstarke integrierte PHP-Entwicklungsumgebung

Dreamweaver CS6

Dreamweaver CS6

Visuelle Webentwicklungstools

SublimeText3 Mac-Version

SublimeText3 Mac-Version

Codebearbeitungssoftware auf Gottesniveau (SublimeText3)

Python: Spiele, GUIs und mehr Python: Spiele, GUIs und mehr Apr 13, 2025 am 12:14 AM

Python zeichnet sich in Gaming und GUI -Entwicklung aus. 1) Spielentwicklung verwendet Pygame, die Zeichnungen, Audio- und andere Funktionen bereitstellt, die für die Erstellung von 2D -Spielen geeignet sind. 2) Die GUI -Entwicklung kann Tkinter oder Pyqt auswählen. Tkinter ist einfach und einfach zu bedienen. PYQT hat reichhaltige Funktionen und ist für die berufliche Entwicklung geeignet.

PHP und Python: Vergleich von zwei beliebten Programmiersprachen PHP und Python: Vergleich von zwei beliebten Programmiersprachen Apr 14, 2025 am 12:13 AM

PHP und Python haben jeweils ihre eigenen Vorteile und wählen nach den Projektanforderungen. 1.PHP ist für die Webentwicklung geeignet, insbesondere für die schnelle Entwicklung und Wartung von Websites. 2. Python eignet sich für Datenwissenschaft, maschinelles Lernen und künstliche Intelligenz mit prägnanter Syntax und für Anfänger.

Wie Debian Readdir sich in andere Tools integriert Wie Debian Readdir sich in andere Tools integriert Apr 13, 2025 am 09:42 AM

Die Readdir -Funktion im Debian -System ist ein Systemaufruf, der zum Lesen des Verzeichnisgehalts verwendet wird und häufig in der C -Programmierung verwendet wird. In diesem Artikel wird erläutert, wie Readdir in andere Tools integriert wird, um seine Funktionalität zu verbessern. Methode 1: Kombinieren Sie C -Sprachprogramm und Pipeline zuerst ein C -Programm, um die Funktion der Readdir aufzurufen und das Ergebnis auszugeben:#include#include#includeIntmain (intargc, char*argv []) {Dir*Dir; structDirent*Eintrag; if (argc! = 2) {{

Python und Zeit: Machen Sie das Beste aus Ihrer Studienzeit Python und Zeit: Machen Sie das Beste aus Ihrer Studienzeit Apr 14, 2025 am 12:02 AM

Um die Effizienz des Lernens von Python in einer begrenzten Zeit zu maximieren, können Sie Pythons DateTime-, Zeit- und Zeitplanmodule verwenden. 1. Das DateTime -Modul wird verwendet, um die Lernzeit aufzuzeichnen und zu planen. 2. Das Zeitmodul hilft, die Studie zu setzen und Zeit zu ruhen. 3. Das Zeitplanmodul arrangiert automatisch wöchentliche Lernaufgaben.

Nginx SSL -Zertifikat -Aktualisierung Debian Tutorial Nginx SSL -Zertifikat -Aktualisierung Debian Tutorial Apr 13, 2025 am 07:21 AM

In diesem Artikel werden Sie begleitet, wie Sie Ihr NginXSSL -Zertifikat auf Ihrem Debian -System aktualisieren. Schritt 1: Installieren Sie zuerst CertBot und stellen Sie sicher, dass Ihr System Certbot- und Python3-CertBot-Nginx-Pakete installiert hat. If not installed, please execute the following command: sudoapt-getupdatesudoapt-getinstallcertbotpython3-certbot-nginx Step 2: Obtain and configure the certificate Use the certbot command to obtain the Let'sEncrypt certificate and configure Nginx: sudocertbot--nginx Follow the prompts to select

Gitlabs Plug-in-Entwicklungshandbuch zu Debian Gitlabs Plug-in-Entwicklungshandbuch zu Debian Apr 13, 2025 am 08:24 AM

Die Entwicklung eines Gitlab -Plugins für Debian erfordert einige spezifische Schritte und Kenntnisse. Hier ist ein grundlegender Leitfaden, mit dem Sie mit diesem Prozess beginnen können. Wenn Sie zuerst GitLab installieren, müssen Sie GitLab in Ihrem Debian -System installieren. Sie können sich auf das offizielle Installationshandbuch von GitLab beziehen. Holen Sie sich API Access Token, bevor Sie die API -Integration durchführen. Öffnen Sie das GitLab -Dashboard, finden Sie die Option "AccessTokens" in den Benutzereinstellungen und generieren Sie ein neues Zugriffs -Token. Wird generiert

So konfigurieren Sie den HTTPS -Server in Debian OpenSSL So konfigurieren Sie den HTTPS -Server in Debian OpenSSL Apr 13, 2025 am 11:03 AM

Das Konfigurieren eines HTTPS -Servers auf einem Debian -System umfasst mehrere Schritte, einschließlich der Installation der erforderlichen Software, der Generierung eines SSL -Zertifikats und der Konfiguration eines Webservers (z. B. Apache oder NGINX) für die Verwendung eines SSL -Zertifikats. Hier ist eine grundlegende Anleitung unter der Annahme, dass Sie einen Apacheweb -Server verwenden. 1. Installieren Sie zuerst die erforderliche Software, stellen Sie sicher, dass Ihr System auf dem neuesten Stand ist, und installieren Sie Apache und OpenSSL: sudoaptupdatesudoaptupgradesudoaptinsta

Welcher Dienst ist Apache Welcher Dienst ist Apache Apr 13, 2025 pm 12:06 PM

Apache ist der Held hinter dem Internet. Es ist nicht nur ein Webserver, sondern auch eine leistungsstarke Plattform, die enormen Datenverkehr unterstützt und dynamische Inhalte bietet. Es bietet eine extrem hohe Flexibilität durch ein modulares Design und ermöglicht die Ausdehnung verschiedener Funktionen nach Bedarf. Modularität stellt jedoch auch Konfigurations- und Leistungsherausforderungen vor, die ein sorgfältiges Management erfordern. Apache eignet sich für Serverszenarien, die hoch anpassbare und entsprechende komplexe Anforderungen erfordern.

See all articles