目錄
HelloWorld
簡介
code
工作佇列(任務佇列)
訊息ack
訊息持久化
channel.exchange_declare(exchange='logs', type='fanout')  # 该exchange会把消息发送给所有它知道的队列中
登入後複製
" >
channel.exchange_declare(exchange='logs', type='fanout')  # 该exchange会把消息发送给所有它知道的队列中
登入後複製
之前已經使用過bind,即建立exchange和queue的關係(此佇列對來自該exchange的訊息有興趣),bind時可另外指定routing_key選項。使用direct exchange
傳送函數,發布不同severity的訊息:
使用topic exchange
 RPC
首頁 後端開發 Python教學 RabbitMQ快速入門python教程

RabbitMQ快速入門python教程

Mar 09, 2017 am 09:28 AM
pika python rabbitmq

HelloWorld

簡介

RabbitMQ:接受訊息再傳遞訊息,可以視為一個「郵局」。發送者和接受者透過佇列來進行交互,佇列的大小可以視為無限的,多個發送者可以發生給一個佇列,多個接收者也可以從一個佇列中接受訊息。

code

rabbitmq使用的協定是amqp,用於python的推薦客戶端是pika

pip install pika -i https://pypi.douban.com/simple/
登入後複製

send.py

# coding: utf8
import pika

# 建立一个连接
connection = pika.BlockingConnection(pika.ConnectionParameters(
           'localhost'))  # 连接本地的RabbitMQ服务器
channel = connection.channel()  # 获得channel
登入後複製

這裡連結的是本機的,如果想要連接其他機器上的伺服器,只要填入位址或主機名稱即可。

接下來我們開始發送訊息了,注意要確保接受訊息的佇列是存在的,否則rabbitmq就丟棄掉該訊息

channel.queue_declare(queue='hello')  # 在RabbitMQ中创建hello这个队列
channel.basic_publish(exchange='',  # 使用默认的exchange来发送消息到队列
                  routing_key='hello',  # 发送到该队列 hello 中
                  body='Hello World!')  # 消息内容

connection.close()  # 关闭 同时flush
登入後複製

RabbitMQ預設需要1GB的空閒磁碟空間,否則發送會失敗。

這時已在本機佇列hello中存放了一個訊息,如果使用rabbitmqctl list_queues 可看到

hello 1
登入後複製

說明有一個hello佇列裡面存放了一個訊息

receive .py

# coding: utf8
import pika
connection = pika.BlockingConnection(pika.ConnectionParameters(
               'localhost'))
channel = connection.channel()
登入後複製

還是先連結到伺服器,和之前發送時相同

channel.queue_declare(queue='hello')  # 此处就是声明了 来确保该队列 hello 存在 可以多次声明 这里主要是为了防止接受程序先运行时出错

def callback(ch, method, properties, body):  # 用于接收到消息后的回调
    print(" [x] Received %r" % body)

channel.basic_consume(callback,
                      queue='hello',  # 收指定队列hello的消息
                      no_ack=True)  #在处理完消息后不发送ack给服务器
channel.start_consuming()  # 启动消息接受 这会进入一个死循环
登入後複製

工作佇列(任務佇列)

工作佇列是用來分發耗時任務給多個工作進程的。不立即做那些耗費資源的任務(需要等待這些任務完成),而是在安排這些任務之後執行。例如我們把task當作message送到佇列裡,啟動工作進程來接受並最終執行,且可啟動多個工作進程來運作。這適用於web應用,即不應在一個http請求的處理視窗內完成複雜任務。

channel.basic_publish(exchange='',
                  routing_key='task_queue',
                  body=message,
                  properties=pika.BasicProperties(
                     delivery_mode = 2, # 使得消息持久化
                  ))
登入後複製

分配訊息的方式為 輪詢 即每個工作進程獲得相同的訊息數。

訊息ack

如果訊息分配給某個工作進程,但是該工作進程未處理完成就崩潰了,可能該訊息就遺失了,因為rabbitmq一旦把一個訊息分發給工作進程,它就把該訊息刪掉了。

為了預防訊息遺失,rabbitmq提供了ack,即工作進程在收到訊息並處理後,發送ack給rabbitmq,告知rabbitmq這時候可以把該訊息從佇列中刪除了。如果工作進程掛掉 了,rabbitmq沒有收到ack,那麼會把該訊息 重新分發給其他工作進程。不需要設定timeout,即使該任務需要很長時間也可以處理。

ack預設是開啟的,之前我們的工作進程顯示指定了no_ack=True

channel.basic_consume(callback, queue='hello')  # 会启用ack
登入後複製

帶ack的callback:

def callback(ch, method, properties, body):
    print " [x] Received %r" % (body,)
    time.sleep( body.count('.') )
    print " [x] Done"
    ch.basic_ack(delivery_tag = method.delivery_tag)  # 发送ack
登入後複製

訊息持久化

#但是,有時RabbitMQ重啟了,訊息也會遺失。可在建立佇列時設定持久化:
(佇列的性質一旦確定無法改變)

channel.queue_declare(queue='task_queue', durable=True)
登入後複製

同時在傳送訊息時也得設定該訊息的持久化屬性:

channel .basic_publish(exchange='',

                  routing_key="task_queue",
                  body=message,
                  properties=pika.BasicProperties(
                     delivery_mode = 2, # make message persistent
                  ))
登入後複製

但是,如果在RabbitMQ剛接收到消息還沒來得及存儲,消息還是會丟失。同時,RabbitMQ也不是在接受到每個消息都進行存儲操作。如果還需要更完善的保證,需要使用publisher confirm。某些進程則會一直執行繁 重任務。 ##

channel.basic_qos(prefetch_count=1)
登入後複製
告知RabbitMQ,這樣在一個工作進程沒回發ack情況下是不會再分配訊息給它。給一個工作進程,然後完成,有時想把一條訊息同時發送給多個進程:

exchange

發送者是不是直接發送訊息到佇列中的,事實上發生者根本不知道訊息會傳送到那個佇列,發送者只能把訊息送到exchange裡。時它需要做什麼,是應該把它加到一個特殊的隊列中還是放到很多的隊列中,或者丟棄。訊息時,exchange的值為'' 即使用default exchange。

綁定exchange 和佇列

channel.exchange_declare(exchange='logs', type='fanout')  # 该exchange会把消息发送给所有它知道的队列中
登入後複製

logs在傳送訊息時也寄一份給hello。

路由

之前已經使用過bind,即建立exchange和queue的關係(此佇列對來自該exchange的訊息有興趣),bind時可另外指定routing_key選項。使用direct exchange

將對應routing key的訊息傳送到綁定相同routing key的佇列中

result = channel.queue_declare()  # 创建一个随机队列
result = channel.queue_declare(exclusive=True)  # 创建一个随机队列,同时在没有接收者连接该队列后则销毁它
queue_name = result.method.queue
登入後複製

傳送函數,發布不同severity的訊息:

channel.queue_bind(exchange='logs',
               queue='hello')
登入後複製

接受函數中綁定對應severity的:

   channel.basic_publish(exchange='logs',
                  routing_key='',
                  body=message)
登入後複製

使用topic exchange

之前使用的direct exchange 只能綁定一個routing key,可以使用這種可以拿.隔開routing key的topic exchange ,例如:

"stock.usd.nyse" "nyse.vmw"
登入後複製

和direct exchange一样,在接受者那边绑定的key与发送时指定的routing key相同即可,另外有些特殊的值:

* 代表1个单词
# 代表0个或多个单词
登入後複製

如果发送者发出的routing key都是3个部分的,如:celerity.colour.species。

Q1:
*.orange.*  对应的是中间的colour都为orange的

Q2:
*.*.rabbit  对应的是最后部分的species为rabbit的
lazy.#      对应的是第一部分是lazy的
登入後複製

qucik.orange.rabbit Q1 Q2都可接收到,quick.orange.fox 只有Q1能接受到,对于lazy.pink.rabbit虽然匹配到了Q2两次,但是只会发送一次。如果绑定时直接绑定#,则会收到所有的。

 RPC

在远程机器上运行一个函数然后获得结果。

1、客户端启动 同时设置一个临时队列用于接受回调,绑定该队列

    self.connection = pika.BlockingConnection(pika.ConnectionParameters(
            host='localhost'))
    self.channel = self.connection.channel()
    result = self.channel.queue_declare(exclusive=True)
    self.callback_queue = result.method.queue
    self.channel.basic_consume(self.on_response, no_ack=True,
                               queue=self.callback_queue)
登入後複製

2、客户端发送rpc请求,同时附带reply_to对应回调队列,correlation_id设置为每个请求的唯一id(虽然说可以为每一次RPC请求都创建一个回调队列,但是这样效率不高,如果一个客户端只使用一个队列,则需要使用correlation_id来匹配是哪个请求),之后阻塞在回调队列直到收到回复

注意:如果收到了非法的correlation_id直接丢弃即可,因为有这种情况--服务器已经发了响应但是还没发ack就挂了,等一会服务器重启了又会重新处理该任务,又发了一遍相应,但是这时那个请求已经被处理掉了

channel.basic_publish(exchange='',
                       routing_key='rpc_queue',
                       properties=pika.BasicProperties(
                             reply_to = self.callback_queue,
                             correlation_id = self.corr_id,
                             ),
                       body=str(n))  # 发出调用

while self.response is None:  # 这边就相当于阻塞了
    self.connection.process_data_events()  # 查看回调队列
return int(self.response)
登入後複製

3、请求会发送到rpc_queue队列
4、RPC服务器从rpc_queue中取出,执行,发送回复

channel.basic_consume(on_request, queue='rpc_queue')  # 绑定 等待请求

# 处理之后:
ch.basic_publish(exchange='',
                 routing_key=props.reply_to,
                 properties=pika.BasicProperties(correlation_id = \
                                                     props.correlation_id),
                 body=str(response))  # 发送回复到回调队列
ch.basic_ack(delivery_tag = method.delivery_tag)  # 发送ack
登入後複製

5、客户端从回调队列中取出数据,检查correlation_id,执行相应操作

if self.corr_id == props.correlation_id:
        self.response = body
登入後複製

                                               

以上是RabbitMQ快速入門python教程的詳細內容。更多資訊請關注PHP中文網其他相關文章!

本網站聲明
本文內容由網友自願投稿,版權歸原作者所有。本站不承擔相應的法律責任。如發現涉嫌抄襲或侵權的內容,請聯絡admin@php.cn

熱AI工具

Undresser.AI Undress

Undresser.AI Undress

人工智慧驅動的應用程序,用於創建逼真的裸體照片

AI Clothes Remover

AI Clothes Remover

用於從照片中去除衣服的線上人工智慧工具。

Undress AI Tool

Undress AI Tool

免費脫衣圖片

Clothoff.io

Clothoff.io

AI脫衣器

Video Face Swap

Video Face Swap

使用我們完全免費的人工智慧換臉工具,輕鬆在任何影片中換臉!

熱工具

記事本++7.3.1

記事本++7.3.1

好用且免費的程式碼編輯器

SublimeText3漢化版

SublimeText3漢化版

中文版,非常好用

禪工作室 13.0.1

禪工作室 13.0.1

強大的PHP整合開發環境

Dreamweaver CS6

Dreamweaver CS6

視覺化網頁開發工具

SublimeText3 Mac版

SublimeText3 Mac版

神級程式碼編輯軟體(SublimeText3)

PHP和Python:解釋了不同的範例 PHP和Python:解釋了不同的範例 Apr 18, 2025 am 12:26 AM

PHP主要是過程式編程,但也支持面向對象編程(OOP);Python支持多種範式,包括OOP、函數式和過程式編程。 PHP適合web開發,Python適用於多種應用,如數據分析和機器學習。

在PHP和Python之間進行選擇:指南 在PHP和Python之間進行選擇:指南 Apr 18, 2025 am 12:24 AM

PHP適合網頁開發和快速原型開發,Python適用於數據科學和機器學習。 1.PHP用於動態網頁開發,語法簡單,適合快速開發。 2.Python語法簡潔,適用於多領域,庫生態系統強大。

Python vs. JavaScript:學習曲線和易用性 Python vs. JavaScript:學習曲線和易用性 Apr 16, 2025 am 12:12 AM

Python更適合初學者,學習曲線平緩,語法簡潔;JavaScript適合前端開發,學習曲線較陡,語法靈活。 1.Python語法直觀,適用於數據科學和後端開發。 2.JavaScript靈活,廣泛用於前端和服務器端編程。

PHP和Python:深入了解他們的歷史 PHP和Python:深入了解他們的歷史 Apr 18, 2025 am 12:25 AM

PHP起源於1994年,由RasmusLerdorf開發,最初用於跟踪網站訪問者,逐漸演變為服務器端腳本語言,廣泛應用於網頁開發。 Python由GuidovanRossum於1980年代末開發,1991年首次發布,強調代碼可讀性和簡潔性,適用於科學計算、數據分析等領域。

vs code 可以在 Windows 8 中運行嗎 vs code 可以在 Windows 8 中運行嗎 Apr 15, 2025 pm 07:24 PM

VS Code可以在Windows 8上運行,但體驗可能不佳。首先確保系統已更新到最新補丁,然後下載與系統架構匹配的VS Code安裝包,按照提示安裝。安裝後,注意某些擴展程序可能與Windows 8不兼容,需要尋找替代擴展或在虛擬機中使用更新的Windows系統。安裝必要的擴展,檢查是否正常工作。儘管VS Code在Windows 8上可行,但建議升級到更新的Windows系統以獲得更好的開發體驗和安全保障。

visual studio code 可以用於 python 嗎 visual studio code 可以用於 python 嗎 Apr 15, 2025 pm 08:18 PM

VS Code 可用於編寫 Python,並提供許多功能,使其成為開發 Python 應用程序的理想工具。它允許用戶:安裝 Python 擴展,以獲得代碼補全、語法高亮和調試等功能。使用調試器逐步跟踪代碼,查找和修復錯誤。集成 Git,進行版本控制。使用代碼格式化工具,保持代碼一致性。使用 Linting 工具,提前發現潛在問題。

notepad 怎麼運行python notepad 怎麼運行python Apr 16, 2025 pm 07:33 PM

在 Notepad 中運行 Python 代碼需要安裝 Python 可執行文件和 NppExec 插件。安裝 Python 並為其添加 PATH 後,在 NppExec 插件中配置命令為“python”、參數為“{CURRENT_DIRECTORY}{FILE_NAME}”,即可在 Notepad 中通過快捷鍵“F6”運行 Python 代碼。

vscode 擴展是否是惡意的 vscode 擴展是否是惡意的 Apr 15, 2025 pm 07:57 PM

VS Code 擴展存在惡意風險,例如隱藏惡意代碼、利用漏洞、偽裝成合法擴展。識別惡意擴展的方法包括:檢查發布者、閱讀評論、檢查代碼、謹慎安裝。安全措施還包括:安全意識、良好習慣、定期更新和殺毒軟件。

See all articles