RabbitMQ ist ein vollständiges und wiederverwendbares Unternehmens-Messaging-System, das auf AMQP basiert. Es folgt der Open-Source-Vereinbarung Mozilla Public License. Der folgende Artikel stellt Ihnen hauptsächlich das Tutorial zur Verwendung von Python zum Betrieb der Nachrichtenwarteschlange RabbitMQ vor. Freunde, die es benötigen, können darauf zurückgreifen.
Vorwort
RabbitMQ ist ein vollständiges, wiederverwendbares Unternehmens-Messaging-System, das auf AMQP basiert. Es folgt der Open-Source-Vereinbarung Mozilla Public License.
MQ steht für Message Queue. Message Queuing (MQ) ist eine Kommunikationsmethode von Anwendung zu Anwendung. Anwendungen kommunizieren durch Lesen und Schreiben von Nachrichten (anwendungsspezifische Daten) in und aus Warteschlangen, ohne dass eine dedizierte Verbindung zu deren Verknüpfung erforderlich ist. Messaging bezieht sich auf Programme, die miteinander kommunizieren, indem sie Daten in Nachrichten senden, anstatt sich gegenseitig direkt aufzurufen, was normalerweise für Techniken wie Remote-Prozeduraufrufe verwendet wird. Unter Warteschlangen versteht man die Kommunikation von Anwendungen über Warteschlangen. Durch die Verwendung von Warteschlangen entfällt die Anforderung, dass empfangende und sendende Anwendungen gleichzeitig ausgeführt werden müssen.
Anwendungsszenarien:
RabbitMQ ist derzeit zweifellos eine der beliebtesten Nachrichtenwarteschlangen und bietet umfangreiche Unterstützung für verschiedene Sprachumgebungen. Ich muss dieses Tool lernen und verstehen. Es gibt ungefähr drei Verwendungsszenarien für Nachrichtenwarteschlangen:
1. Systemintegration und verteiltes Systemdesign. Verschiedene Subsysteme sind durch Nachrichten verbunden, und diese Lösung hat sich nach und nach zu einem Architekturstil entwickelt, nämlich „Architektur, die Nachrichten durchläuft“.
2. Wenn die Synchronisationsverarbeitungsmethode im System den Durchsatz ernsthaft beeinträchtigt, z. B. die Protokollierung. Wenn wir alle Benutzerverhaltensprotokolle im System aufzeichnen müssen, wirkt sich die synchrone Aufzeichnung der Protokolle zwangsläufig auf die Reaktionsgeschwindigkeit des Systems aus. Wenn wir Protokollnachrichten an die Nachrichtenwarteschlange senden, verbraucht das Protokollierungssubsystem die Protokollinformationen.
3. Hohe Verfügbarkeit des Systems, z. B. E-Commerce-Flash-Sale-Szenarien. Wenn der Anwendungsserver oder Datenbankserver zu einem bestimmten Zeitpunkt eine große Anzahl von Anforderungen erhält, kommt es zu Systemausfällen. Wenn die Anforderung an die Nachrichtenwarteschlange weitergeleitet werden kann und der Server diese Nachrichten dann verarbeitet, wird die Anforderung reibungsloser und die Verfügbarkeit des Systems verbessert.
1. Installationsumgebung
Installieren Sie zunächst Rabbitmq unter Linux
# 环境为CentOS 7 yum install rabbitmq-server # 安装RabbitMQ systemctl start rabbitmq-server # 启动 systemctl enable rabbitmq-server # 开机自启 systemctl stop firewall-cmd # 临时关闭防火墙
Dann verwenden Sie pip, um das Python3-Entwicklungspaket zu installieren
pip3 install pika
Nach der Installation der Software können Sie http:// besuchen 115. xx.xx.xx:15672/, um auf die integrierte Webseite zum Anzeigen und Verwalten von RabbitMQ zuzugreifen. Das Standardpasswort des Administratorbenutzers lautet „guest“
2. Fügen Sie einfach eine Nachricht zur Warteschlange hinzu
#!/usr/bin/env python3 # coding=utf-8 # @Time : 2017/6/13 19:25 # @Author : Shawn # @Blog : https://blog.just666.cn # @Email : shawnbluce@gmail.com # @purpose : RabbitMQ_Producer import pika # 创建连接对象 connection = pika.BlockingConnection(pika.ConnectionParameters(host='115.xx.xx.xx')) # 创建频道对象 channel = connection.channel() # 指定一个队列,如果该队列不存在则创建 channel.queue_declare(queue='test_queue') # 提交消息 for i in range(10): channel.basic_publish(exchange='', routing_key='test_queue', body='hello,world' + str(i)) print("sent...") # 关闭连接 connection.close()
3. Einfach Nachrichten aus der Warteschlange abrufen
#!/usr/bin/env python3 # coding=utf-8 # @Time : 2017/6/13 19:40 # @Author : Shawn # @Blog : https://blog.just666.cn # @Email : shawnbluce@gmail.com # @purpose : RabbitMQ_Consumer import pika credentials = pika.PlainCredentials('guest', 'guest') # 连接到RabbitMQ服务器 connection = pika.BlockingConnection(pika.ConnectionParameters('115.xx.xx.xx', 5672, '/', credentials)) channel = connection.channel() # 指定一个队列,如果该队列不存在则创建 channel.queue_declare(queue='test_queue') # 定义一个回调函数 def callback(ch, method, properties, body): print(body.decode('utf-8')) # 告诉RabbitMQ使用callback来接收信息 channel.basic_consume(callback, queue='test_queue', no_ack=False) print('waiting...') # 开始接收信息,并进入阻塞状态,队列里有信息才会调用callback进行处理。按ctrl+c退出。 channel.start_consuming()
4 . Falls der Verbraucher offline geht
Stellen Sie sich eine Situation wie diese vor:
Der Verbraucher zieht sich aus der Nachrichtenwarteschlange zurück, die ich erhalten habe n Daten, und als ich sie verarbeiten wollte, stürzte die Maschine ab. In RabbieMQ gibt es ein ACK, mit dem das Ende der Verbraucherverarbeitung bestätigt werden kann. Dies ähnelt in gewisser Weise der ACK im Netzwerk. Jedes Mal, wenn der Verbraucher Daten aus der Warteschlange erhält, entfernt die Warteschlange die Daten nicht sofort, sondern wartet auf die entsprechende ACK. Nachdem der Verbraucher die Daten erhalten und die Verarbeitung abgeschlossen hat, sendet er ein ACK-Paket an die Warteschlange, um RabbitMQ darüber zu informieren, dass die Nachricht verarbeitet wurde und gelöscht werden kann. Zu diesem Zeitpunkt entfernt RabbitMQ die Daten aus der Warteschlange. In diesem Fall besteht also kein Problem, selbst wenn der Verbraucher offline geht. Die Daten bleiben weiterhin in der Warteschlange und können von anderen Verbrauchern verarbeitet werden.
wird in Python folgendermaßen implementiert:
Der Verbraucher hat eine solche Codezeile channel.basic_consume(callback, queue='test_queue', no_ack=False)
, wobei no_ack=False
bedeutet, dies nicht zu tun ein Bestätigungspaket senden. Durch Ändern in no_ack=True wird nach jeder Verarbeitung ein Bestätigungspaket an RabbitMQ gesendet, um zu bestätigen, dass die Nachricht verarbeitet wurde.
5. Was passiert, wenn RabbitMQ ausfällt?
Auch wenn RabbitMQ die Daten aufhängt? Es wird weiterhin Verluste geben. So können wir einen Datenpersistenzspeicher für RabbitMQ einrichten. RabbitMQ speichert die Daten auf der Festplatte, um sicherzustellen, dass die Warteschlange beim nächsten Start noch vorhanden ist.
wird in Python folgendermaßen implementiert:
我们声明一个队列是这样的channel.queue_declare(queue='test_queue')
,如果需要持久化一个队列可以这样声明channel.queue_declare(queue='test_queue', durable=True)
。不过这行直接放在代码中是不能执行的,因为以前已经有了一个名为test_queue的队列,RabbitMQ 不允许用不同的方式声明同一个队列,所以可以换一个队列名新建来指定数据持久化存储。不过如果只是这样声明的话,在 RabbitMQ 宕机重启后确实队列还在,不过队列里的数据就没有了。除非我们这样来声明队列channel.basic_publish(exchange='', routing_key="test_queue", body=message, properties=pika.BasicProperties(delivery_mode = 2,))
。
六、最简单的发布订阅
最简单的发布订阅在 RabbitMQ 中称之为Fanout模式。也就是说订阅者订阅某个频道,然后发布者向这个频道中发布消息,所有订阅者就都能接收到这条消息。不过因为发布者需要使用订阅者创建的随机队列所以需要先启动订阅者才能启动发布者。
发布者代码:
#!/usr/bin/env python3 # coding=utf-8 # @Time : 2017/6/13 20:21 # @Author : Shawn # @Blog : https://blog.just666.cn # @Email : shawnbluce@gmail.com # @purpose : RabbitMQ_Publisher import pika # 创建连接对象 connection = pika.BlockingConnection(pika.ConnectionParameters(host='115.xx.xx.xx')) # 创建频道对象 channel = connection.channel() # 定义交换机,exchange表示交换机名称,type表示类型 channel.exchange_declare(exchange='my_fanout', type='fanout') message = 'Hello Python' # 将消息发送到交换机 channel.basic_publish(exchange='my_fanout', # 指定exchange routing_key='', # fanout下不需要配置,配置了也不会生效 body=message) connection.close()
订阅者代码:
#!/usr/bin/env python3 # coding=utf-8 # @Time : 2017/6/13 20:20 # @Author : Shawn # @Blog : https://blog.just666.cn # @Email : shawnbluce@gmail.com # @purpose : RabbitMQ_Subscriber import pika credentials = pika.PlainCredentials('guest', 'guest') # 连接到RabbitMQ connection = pika.BlockingConnection(pika.ConnectionParameters('115.xx.xx.xx', 5672, '/', credentials)) channel = connection.channel() # 定义交换机,进行exchange声明,exchange表示交换机名称,type表示类型 channel.exchange_declare(exchange='my_fanout', type='fanout') # 随机创建队列 result = channel.queue_declare(exclusive=True) # exclusive=True表示建立临时队列,当consumer关闭后,该队列就会被删除 queue_name = result.method.queue # 将队列与exchange进行绑定 channel.queue_bind(exchange='my_fanout', queue=queue_name) # 定义回调方法 def callback(ch, method, properties, body): print(body.decode('utf-8')) # 从队列获取信息 channel.basic_consume(callback, queue=queue_name, no_ack=True) channel.start_consuming()
总结
Das obige ist der detaillierte Inhalt vonTutorial zur Bedienung der Nachrichtenwarteschlange (RabbitMQ) in Python. Für weitere Informationen folgen Sie bitte anderen verwandten Artikeln auf der PHP chinesischen Website!