首頁 > 後端開發 > Python教學 > RabbitMQ快速入門python教程

RabbitMQ快速入門python教程

高洛峰
發布: 2017-03-09 09:28:19
原創
1893 人瀏覽過

HelloWorld

簡介

RabbitMQ:接受訊息再傳遞訊息,可以視為一個「郵局」。發送者和接受者透過佇列來進行交互,佇列的大小可以視為無限的,多個發送者可以發生給一個佇列,多個接收者也可以從一個佇列中接受訊息。

code

rabbitmq使用的協定是amqp,用於python的推薦客戶端是pika

pip install pika -i https://pypi.douban.com/simple/
登入後複製

send.py

# coding: utf8
import pika

# 建立一个连接
connection = pika.BlockingConnection(pika.ConnectionParameters(
           'localhost'))  # 连接本地的RabbitMQ服务器
channel = connection.channel()  # 获得channel
登入後複製

這裡連結的是本機的,如果想要連接其他機器上的伺服器,只要填入位址或主機名稱即可。

接下來我們開始發送訊息了,注意要確保接受訊息的佇列是存在的,否則rabbitmq就丟棄掉該訊息

channel.queue_declare(queue='hello')  # 在RabbitMQ中创建hello这个队列
channel.basic_publish(exchange='',  # 使用默认的exchange来发送消息到队列
                  routing_key='hello',  # 发送到该队列 hello 中
                  body='Hello World!')  # 消息内容

connection.close()  # 关闭 同时flush
登入後複製

RabbitMQ預設需要1GB的空閒磁碟空間,否則發送會失敗。

這時已在本機佇列hello中存放了一個訊息,如果使用rabbitmqctl list_queues 可看到

hello 1
登入後複製

說明有一個hello佇列裡面存放了一個訊息

receive .py

# coding: utf8
import pika
connection = pika.BlockingConnection(pika.ConnectionParameters(
               'localhost'))
channel = connection.channel()
登入後複製

還是先連結到伺服器,和之前發送時相同

channel.queue_declare(queue='hello')  # 此处就是声明了 来确保该队列 hello 存在 可以多次声明 这里主要是为了防止接受程序先运行时出错

def callback(ch, method, properties, body):  # 用于接收到消息后的回调
    print(" [x] Received %r" % body)

channel.basic_consume(callback,
                      queue='hello',  # 收指定队列hello的消息
                      no_ack=True)  #在处理完消息后不发送ack给服务器
channel.start_consuming()  # 启动消息接受 这会进入一个死循环
登入後複製

工作佇列(任務佇列)

工作佇列是用來分發耗時任務給多個工作進程的。不立即做那些耗費資源的任務(需要等待這些任務完成),而是在安排這些任務之後執行。例如我們把task當作message送到佇列裡,啟動工作進程來接受並最終執行,且可啟動多個工作進程來運作。這適用於web應用,即不應在一個http請求的處理視窗內完成複雜任務。

channel.basic_publish(exchange='',
                  routing_key='task_queue',
                  body=message,
                  properties=pika.BasicProperties(
                     delivery_mode = 2, # 使得消息持久化
                  ))
登入後複製

分配訊息的方式為 輪詢 即每個工作進程獲得相同的訊息數。

訊息ack

如果訊息分配給某個工作進程,但是該工作進程未處理完成就崩潰了,可能該訊息就遺失了,因為rabbitmq一旦把一個訊息分發給工作進程,它就把該訊息刪掉了。

為了預防訊息遺失,rabbitmq提供了ack,即工作進程在收到訊息並處理後,發送ack給rabbitmq,告知rabbitmq這時候可以把該訊息從佇列中刪除了。如果工作進程掛掉 了,rabbitmq沒有收到ack,那麼會把該訊息 重新分發給其他工作進程。不需要設定timeout,即使該任務需要很長時間也可以處理。

ack預設是開啟的,之前我們的工作進程顯示指定了no_ack=True

channel.basic_consume(callback, queue='hello')  # 会启用ack
登入後複製

帶ack的callback:

def callback(ch, method, properties, body):
    print " [x] Received %r" % (body,)
    time.sleep( body.count('.') )
    print " [x] Done"
    ch.basic_ack(delivery_tag = method.delivery_tag)  # 发送ack
登入後複製

訊息持久化

#但是,有時RabbitMQ重啟了,訊息也會遺失。可在建立佇列時設定持久化:
(佇列的性質一旦確定無法改變)

channel.queue_declare(queue='task_queue', durable=True)
登入後複製

同時在傳送訊息時也得設定該訊息的持久化屬性:

channel .basic_publish(exchange='',

                  routing_key="task_queue",
                  body=message,
                  properties=pika.BasicProperties(
                     delivery_mode = 2, # make message persistent
                  ))
登入後複製

但是,如果在RabbitMQ剛接收到消息還沒來得及存儲,消息還是會丟失。同時,RabbitMQ也不是在接受到每個消息都進行存儲操作。如果還需要更完善的保證,需要使用publisher confirm。某些進程則會一直執行繁 重任務。 ##

channel.basic_qos(prefetch_count=1)
登入後複製
告知RabbitMQ,這樣在一個工作進程沒回發ack情況下是不會再分配訊息給它。給一個工作進程,然後完成,有時想把一條訊息同時發送給多個進程:

exchange

發送者是不是直接發送訊息到佇列中的,事實上發生者根本不知道訊息會傳送到那個佇列,發送者只能把訊息送到exchange裡。時它需要做什麼,是應該把它加到一個特殊的隊列中還是放到很多的隊列中,或者丟棄。訊息時,exchange的值為'' 即使用default exchange。

綁定exchange 和佇列

channel.exchange_declare(exchange='logs', type='fanout')  # 该exchange会把消息发送给所有它知道的队列中
登入後複製

logs在傳送訊息時也寄一份給hello。

路由

之前已經使用過bind,即建立exchange和queue的關係(此佇列對來自該exchange的訊息有興趣),bind時可另外指定routing_key選項。使用direct exchange

將對應routing key的訊息傳送到綁定相同routing key的佇列中

result = channel.queue_declare()  # 创建一个随机队列
result = channel.queue_declare(exclusive=True)  # 创建一个随机队列,同时在没有接收者连接该队列后则销毁它
queue_name = result.method.queue
登入後複製

傳送函數,發布不同severity的訊息:

channel.queue_bind(exchange='logs',
               queue='hello')
登入後複製

接受函數中綁定對應severity的:

   channel.basic_publish(exchange='logs',
                  routing_key='',
                  body=message)
登入後複製

使用topic exchange

之前使用的direct exchange 只能綁定一個routing key,可以使用這種可以拿.隔開routing key的topic exchange ,例如:

"stock.usd.nyse" "nyse.vmw"
登入後複製

和direct exchange一样,在接受者那边绑定的key与发送时指定的routing key相同即可,另外有些特殊的值:

* 代表1个单词
# 代表0个或多个单词
登入後複製

如果发送者发出的routing key都是3个部分的,如:celerity.colour.species。

Q1:
*.orange.*  对应的是中间的colour都为orange的

Q2:
*.*.rabbit  对应的是最后部分的species为rabbit的
lazy.#      对应的是第一部分是lazy的
登入後複製

qucik.orange.rabbit Q1 Q2都可接收到,quick.orange.fox 只有Q1能接受到,对于lazy.pink.rabbit虽然匹配到了Q2两次,但是只会发送一次。如果绑定时直接绑定#,则会收到所有的。

 RPC

在远程机器上运行一个函数然后获得结果。

1、客户端启动 同时设置一个临时队列用于接受回调,绑定该队列

    self.connection = pika.BlockingConnection(pika.ConnectionParameters(
            host='localhost'))
    self.channel = self.connection.channel()
    result = self.channel.queue_declare(exclusive=True)
    self.callback_queue = result.method.queue
    self.channel.basic_consume(self.on_response, no_ack=True,
                               queue=self.callback_queue)
登入後複製

2、客户端发送rpc请求,同时附带reply_to对应回调队列,correlation_id设置为每个请求的唯一id(虽然说可以为每一次RPC请求都创建一个回调队列,但是这样效率不高,如果一个客户端只使用一个队列,则需要使用correlation_id来匹配是哪个请求),之后阻塞在回调队列直到收到回复

注意:如果收到了非法的correlation_id直接丢弃即可,因为有这种情况--服务器已经发了响应但是还没发ack就挂了,等一会服务器重启了又会重新处理该任务,又发了一遍相应,但是这时那个请求已经被处理掉了

channel.basic_publish(exchange='',
                       routing_key='rpc_queue',
                       properties=pika.BasicProperties(
                             reply_to = self.callback_queue,
                             correlation_id = self.corr_id,
                             ),
                       body=str(n))  # 发出调用

while self.response is None:  # 这边就相当于阻塞了
    self.connection.process_data_events()  # 查看回调队列
return int(self.response)
登入後複製

3、请求会发送到rpc_queue队列
4、RPC服务器从rpc_queue中取出,执行,发送回复

channel.basic_consume(on_request, queue='rpc_queue')  # 绑定 等待请求

# 处理之后:
ch.basic_publish(exchange='',
                 routing_key=props.reply_to,
                 properties=pika.BasicProperties(correlation_id = \
                                                     props.correlation_id),
                 body=str(response))  # 发送回复到回调队列
ch.basic_ack(delivery_tag = method.delivery_tag)  # 发送ack
登入後複製

5、客户端从回调队列中取出数据,检查correlation_id,执行相应操作

if self.corr_id == props.correlation_id:
        self.response = body
登入後複製

                                               

以上是RabbitMQ快速入門python教程的詳細內容。更多資訊請關注PHP中文網其他相關文章!

相關標籤:
來源:php.cn
本網站聲明
本文內容由網友自願投稿,版權歸原作者所有。本站不承擔相應的法律責任。如發現涉嫌抄襲或侵權的內容,請聯絡admin@php.cn
熱門教學
更多>
最新下載
更多>
網站特效
網站源碼
網站素材
前端模板