首頁 後端開發 Python教學 Python+Pika+RabbitMQ環境部署及實作工作佇列

Python+Pika+RabbitMQ環境部署及實作工作佇列

Mar 01, 2017 pm 02:04 PM

rabbitmq中文翻譯的話,主要還是mq字母上:Message Queue,也就是訊息佇列的意思。前面還有rabbit單詞,就是兔子的意思,跟python語言叫python一樣,老外還蠻幽默的。 rabbitmq服務類似mysql、apache服務,只是提供的功能不一樣。 rabbimq是用來提供發送訊息的服務,可以用在不同的應用程式之間進行通訊。

安裝rabbitmq
先來安裝下rabbitmq,在ubuntu 12.04下可以直接透過apt-get安裝:

#
sudo apt-get install rabbitmq-server
登入後複製

安裝好後,rabbitmq服務就已經啟動好了。接下來看下python編寫Hello World!的實例。實例的內容就是從send.py發送「Hello World!」到rabbitmq,receive.py從rabbitmq接收send.py發送的訊息。

Python+Pika+RabbitMQ環境部署及實作工作佇列

其中P表示produce,生產者的意思,也可以稱為發送者,實例中表現為send.py;C表示consumer,消費者的意思,也可以稱為接收者,實例中表現為receive.py;中間紅色的表示佇列的意思,實例中表現為hello佇列。

python使用rabbitmq服務,可以使用現成的類別庫pika、txAMQP或py-amqplib,這裡選擇了pika。

安裝pika

安裝pika可以使用pip來進行安裝,pip是python的軟體管理包,如果沒有安裝,可以透過apt-get安裝

sudo apt-get install python-pip
登入後複製

透過pip安裝pika:

#
sudo pip install pika
登入後複製

##send.py代碼

連線到rabbitmq伺服器,因為是在本地測試,所以就用localhost就可以了。

connection = pika.BlockingConnection(pika.ConnectionParameters(
        'localhost'))
channel = connection.channel()
登入後複製

聲明訊息佇列,訊息將在這個佇列中傳遞。如果將訊息傳送到不存在的佇列,rabbitmq將會自動清除這些訊息。

channel.queue_declare(queue='hello')
登入後複製

發送訊息到上面宣告的hello佇列,其中exchange表示交換器,能精確指定訊息應該傳送到哪個佇列,routing_key設定為佇列的名稱,body就是發送的內容,具體發送細節暫時先不關注。

channel.basic_publish(exchange='', routing_key='hello', body='Hello World!')
登入後複製

關閉連線

#

connection.close()
登入後複製

完整程式碼


Python+Pika+RabbitMQ環境部署及實作工作佇列


#!/usr/bin/env python
#coding=utf8
import pika
 
connection = pika.BlockingConnection(pika.ConnectionParameters(
        'localhost'))
channel = connection.channel()
 
channel.queue_declare(queue='hello')
 
channel.basic_publish(exchange='', routing_key='hello', body='Hello World!')
print " [x] Sent 'Hello World!'"
connection.close()
登入後複製

先來執行下這個程序,執行成功的話,rabbitmqctl應該成功增加了hello隊列,並且隊列裡應該有一條信息,用rabbitmqctl命令來查看下

rabbitmqctl list_queues
登入後複製

在筆者的電腦上輸出以下訊息:

##確實有一個hello佇列,並且佇列裡有一個訊息。接下來用receive.py來取得佇列裡的資訊。

receive.py程式碼

和send.py的前面兩個步驟一樣,都是要先連接伺服器,然後宣告訊息的佇列,這裡就不再貼同樣代碼了。

接收訊息更為複雜一些,需要定義一個回呼函數來處理,這邊的回呼函數就是將訊息列印出來。

def callback(ch, method, properties, body):
  print "Received %r" % (body,)
登入後複製

告訴rabbitmq使用callback來接收訊息

channel.basic_consume(callback, queue='hello', no_ack=True)
登入後複製

開始接收訊息,並進入阻塞狀態,佇列裡有資訊才會呼叫callback進行處理。按ctrl+c退出。

channel.start_consuming()
登入後複製

完整程式碼

#

#!/usr/bin/env python
#coding=utf8
import pika
 
connection = pika.BlockingConnection(pika.ConnectionParameters(
        'localhost'))
channel = connection.channel()
 
channel.queue_declare(queue='hello')
 
def callback(ch, method, properties, body):
  print " [x] Received %r" % (body,)
 
channel.basic_consume(callback, queue='hello', no_ack=True)
 
print ' [*] Waiting for messages. To exit press CTRL+C'
channel.start_consuming()
登入後複製

執行程序,就能夠接收到佇列hello裡的訊息Hello World!,然後印在螢幕上。換一個終端,再次執行send.py,可以看到receive.py這邊會再次接收到訊息。

工作佇列範例

1.準備工作(Preparation)

在實例程式中,用new_task .py來模擬任務分配者, worker.py來模擬工作者。

修改send.py,從命令列參數接收訊息,並發送

#
import sys
 
message = ' '.join(sys.argv[1:]) or "Hello World!"
channel.basic_publish(exchange='',
           routing_key='hello',
           body=message)
print " [x] Sent %r" % (message,)
登入後複製

修改receive.py的回呼函數。

import time
 
def callback(ch, method, properties, body):
  print " [x] Received %r" % (body,)
  time.sleep( body.count('.') )
  print " [x] Done"
登入後複製

###這邊先打開兩個終端,都執行worker.py,處於監聽狀態,這邊就相當於兩個工作者。開啟第三個終端,執行new_task.py#########
$ python new_task.py First message.
$ python new_task.py Second message..
$ python new_task.py Third message...
$ python new_task.py Fourth message....
$ python new_task.py Fifth message.....
登入後複製
##########觀察worker.py接收到任務,其中一個工作者接收到3個任務:### ######
$ python worker.py
 [*] Waiting for messages. To exit press CTRL+C
 [x] Received 'First message.'
 [x] Received 'Third message...'
 [x] Received 'Fifth message.....'
登入後複製
#########另外一個工作者接收到2個任務:#########
$ python worker.py
 [*] Waiting for messages. To exit press CTRL+C
 [x] Received 'Second message..'
 [x] Received 'Fourth message....'
登入後複製
######

从上面来看,每个工作者,都会依次分配到任务。那么如果一个工作者,在处理任务的时候挂掉,这个任务就没有完成,应当交由其他工作者处理。所以应当有一种机制,当一个工作者完成任务时,会反馈消息。

2.消息确认(Message acknowledgment)

消息确认就是当工作者完成任务后,会反馈给rabbitmq。修改worker.py中的回调函数:

def callback(ch, method, properties, body):
  print " [x] Received %r" % (body,)
  time.sleep(5)
  print " [x] Done"
  ch.basic_ack(delivery_tag = method.delivery_tag)
登入後複製

这边停顿5秒,可以方便ctrl+c退出。

去除no_ack=True参数或者设置为False也可以。

channel.basic_consume(callback, queue='hello', no_ack=False)
登入後複製

用这个代码运行,即使其中一个工作者ctrl+c退出后,正在执行的任务也不会丢失,rabbitmq会将任务重新分配给其他工作者。

3.消息持久化存储(Message durability)

虽然有了消息反馈机制,但是如果rabbitmq自身挂掉的话,那么任务还是会丢失。所以需要将任务持久化存储起来。声明持久化存储:

channel.queue_declare(queue='hello', durable=True)
登入後複製

但是这个程序会执行错误,因为hello这个队列已经存在,并且是非持久化的,rabbitmq不允许使用不同的参数来重新定义存在的队列。重新定义一个队列:

channel.queue_declare(queue='task_queue', durable=True)
登入後複製

在发送任务的时候,用delivery_mode=2来标记任务为持久化存储:

channel.basic_publish(exchange='',
           routing_key="task_queue",
           body=message,
           properties=pika.BasicProperties(
             delivery_mode = 2, # make message persistent
           ))
登入後複製

4.公平调度(Fair dispatch)

上面实例中,虽然每个工作者是依次分配到任务,但是每个任务不一定一样。可能有的任务比较重,执行时间比较久;有的任务比较轻,执行时间比较短。如果能公平调度就最好了,使用basic_qos设置prefetch_count=1,使得rabbitmq不会在同一时间给工作者分配多个任务,即只有工作者完成任务之后,才会再次接收到任务。

channel.basic_qos(prefetch_count=1)
登入後複製

new_task.py完整代码

#!/usr/bin/env python
import pika
import sys
 
connection = pika.BlockingConnection(pika.ConnectionParameters(
    host='localhost'))
channel = connection.channel()
 
channel.queue_declare(queue='task_queue', durable=True)
 
message = ' '.join(sys.argv[1:]) or "Hello World!"
channel.basic_publish(exchange='',
           routing_key='task_queue',
           body=message,
           properties=pika.BasicProperties(
             delivery_mode = 2, # make message persistent
           ))
print " [x] Sent %r" % (message,)
connection.close()
worker.py完整代码

#!/usr/bin/env python
import pika
import time
 
connection = pika.BlockingConnection(pika.ConnectionParameters(
    host='localhost'))
channel = connection.channel()
 
channel.queue_declare(queue='task_queue', durable=True)
print ' [*] Waiting for messages. To exit press CTRL+C'
 
def callback(ch, method, properties, body):
  print " [x] Received %r" % (body,)
  time.sleep( body.count('.') )
  print " [x] Done"
  ch.basic_ack(delivery_tag = method.delivery_tag)
 
channel.basic_qos(prefetch_count=1)
channel.basic_consume(callback,
           queue='task_queue')
 
channel.start_consuming()
登入後複製


更多Python+Pika+RabbitMQ環境部署及實作工作佇列相关文章请关注PHP中文网!

本網站聲明
本文內容由網友自願投稿,版權歸原作者所有。本站不承擔相應的法律責任。如發現涉嫌抄襲或侵權的內容,請聯絡admin@php.cn

熱AI工具

Undresser.AI Undress

Undresser.AI Undress

人工智慧驅動的應用程序,用於創建逼真的裸體照片

AI Clothes Remover

AI Clothes Remover

用於從照片中去除衣服的線上人工智慧工具。

Undress AI Tool

Undress AI Tool

免費脫衣圖片

Clothoff.io

Clothoff.io

AI脫衣器

Video Face Swap

Video Face Swap

使用我們完全免費的人工智慧換臉工具,輕鬆在任何影片中換臉!

熱工具

記事本++7.3.1

記事本++7.3.1

好用且免費的程式碼編輯器

SublimeText3漢化版

SublimeText3漢化版

中文版,非常好用

禪工作室 13.0.1

禪工作室 13.0.1

強大的PHP整合開發環境

Dreamweaver CS6

Dreamweaver CS6

視覺化網頁開發工具

SublimeText3 Mac版

SublimeText3 Mac版

神級程式碼編輯軟體(SublimeText3)

如何解決Linux終端中查看Python版本時遇到的權限問題? 如何解決Linux終端中查看Python版本時遇到的權限問題? Apr 01, 2025 pm 05:09 PM

Linux終端中查看Python版本時遇到權限問題的解決方法當你在Linux終端中嘗試查看Python的版本時,輸入python...

如何在10小時內通過項目和問題驅動的方式教計算機小白編程基礎? 如何在10小時內通過項目和問題驅動的方式教計算機小白編程基礎? Apr 02, 2025 am 07:18 AM

如何在10小時內教計算機小白編程基礎?如果你只有10個小時來教計算機小白一些編程知識,你會選擇教些什麼�...

如何在使用 Fiddler Everywhere 進行中間人讀取時避免被瀏覽器檢測到? 如何在使用 Fiddler Everywhere 進行中間人讀取時避免被瀏覽器檢測到? Apr 02, 2025 am 07:15 AM

使用FiddlerEverywhere進行中間人讀取時如何避免被檢測到當你使用FiddlerEverywhere...

在Python中如何高效地將一個DataFrame的整列複製到另一個結構不同的DataFrame中? 在Python中如何高效地將一個DataFrame的整列複製到另一個結構不同的DataFrame中? Apr 01, 2025 pm 11:15 PM

在使用Python的pandas庫時,如何在兩個結構不同的DataFrame之間進行整列複製是一個常見的問題。假設我們有兩個Dat...

Uvicorn是如何在沒有serve_forever()的情況下持續監聽HTTP請求的? Uvicorn是如何在沒有serve_forever()的情況下持續監聽HTTP請求的? Apr 01, 2025 pm 10:51 PM

Uvicorn是如何持續監聽HTTP請求的? Uvicorn是一個基於ASGI的輕量級Web服務器,其核心功能之一便是監聽HTTP請求並進�...

如何繞過Investing.com的反爬蟲機制獲取新聞數據? 如何繞過Investing.com的反爬蟲機制獲取新聞數據? Apr 02, 2025 am 07:03 AM

攻克Investing.com的反爬蟲策略許多人嘗試爬取Investing.com(https://cn.investing.com/news/latest-news)的新聞數據時,常常�...

在Linux終端中使用python --version命令時如何解決權限問題? 在Linux終端中使用python --version命令時如何解決權限問題? Apr 02, 2025 am 06:36 AM

Linux終端中使用python...

See all articles