Python+Pika+RabbitMQ環境部署及實作工作佇列

高洛峰
發布: 2017-03-01 14:04:50
原創
1500 人瀏覽過

rabbitmq中文翻譯的話,主要還是mq字母上:Message Queue,也就是訊息佇列的意思。前面還有rabbit單詞,就是兔子的意思,跟python語言叫python一樣,老外還蠻幽默的。 rabbitmq服務類似mysql、apache服務,只是提供的功能不一樣。 rabbimq是用來提供發送訊息的服務,可以用在不同的應用程式之間進行通訊。

安裝rabbitmq
先來安裝下rabbitmq,在ubuntu 12.04下可以直接透過apt-get安裝:

#
sudo apt-get install rabbitmq-server
登入後複製

安裝好後,rabbitmq服務就已經啟動好了。接下來看下python編寫Hello World!的實例。實例的內容就是從send.py發送「Hello World!」到rabbitmq,receive.py從rabbitmq接收send.py發送的訊息。

Python+Pika+RabbitMQ環境部署及實作工作佇列

其中P表示produce,生產者的意思,也可以稱為發送者,實例中表現為send.py;C表示consumer,消費者的意思,也可以稱為接收者,實例中表現為receive.py;中間紅色的表示佇列的意思,實例中表現為hello佇列。

python使用rabbitmq服務,可以使用現成的類別庫pika、txAMQP或py-amqplib,這裡選擇了pika。

安裝pika

安裝pika可以使用pip來進行安裝,pip是python的軟體管理包,如果沒有安裝,可以透過apt-get安裝

sudo apt-get install python-pip
登入後複製

透過pip安裝pika:

#
sudo pip install pika
登入後複製

##send.py代碼

連線到rabbitmq伺服器,因為是在本地測試,所以就用localhost就可以了。

connection = pika.BlockingConnection(pika.ConnectionParameters(
        'localhost'))
channel = connection.channel()
登入後複製

聲明訊息佇列,訊息將在這個佇列中傳遞。如果將訊息傳送到不存在的佇列,rabbitmq將會自動清除這些訊息。

channel.queue_declare(queue='hello')
登入後複製

發送訊息到上面宣告的hello佇列,其中exchange表示交換器,能精確指定訊息應該傳送到哪個佇列,routing_key設定為佇列的名稱,body就是發送的內容,具體發送細節暫時先不關注。

channel.basic_publish(exchange='', routing_key='hello', body='Hello World!')
登入後複製

關閉連線

#

connection.close()
登入後複製

完整程式碼


Python+Pika+RabbitMQ環境部署及實作工作佇列


#!/usr/bin/env python
#coding=utf8
import pika
 
connection = pika.BlockingConnection(pika.ConnectionParameters(
        'localhost'))
channel = connection.channel()
 
channel.queue_declare(queue='hello')
 
channel.basic_publish(exchange='', routing_key='hello', body='Hello World!')
print " [x] Sent 'Hello World!'"
connection.close()
登入後複製

先來執行下這個程序,執行成功的話,rabbitmqctl應該成功增加了hello隊列,並且隊列裡應該有一條信息,用rabbitmqctl命令來查看下

rabbitmqctl list_queues
登入後複製

在筆者的電腦上輸出以下訊息:

##確實有一個hello佇列,並且佇列裡有一個訊息。接下來用receive.py來取得佇列裡的資訊。

receive.py程式碼

和send.py的前面兩個步驟一樣,都是要先連接伺服器,然後宣告訊息的佇列,這裡就不再貼同樣代碼了。

接收訊息更為複雜一些,需要定義一個回呼函數來處理,這邊的回呼函數就是將訊息列印出來。

def callback(ch, method, properties, body):
  print "Received %r" % (body,)
登入後複製

告訴rabbitmq使用callback來接收訊息

channel.basic_consume(callback, queue='hello', no_ack=True)
登入後複製

開始接收訊息,並進入阻塞狀態,佇列裡有資訊才會呼叫callback進行處理。按ctrl+c退出。

channel.start_consuming()
登入後複製

完整程式碼

#

#!/usr/bin/env python
#coding=utf8
import pika
 
connection = pika.BlockingConnection(pika.ConnectionParameters(
        'localhost'))
channel = connection.channel()
 
channel.queue_declare(queue='hello')
 
def callback(ch, method, properties, body):
  print " [x] Received %r" % (body,)
 
channel.basic_consume(callback, queue='hello', no_ack=True)
 
print ' [*] Waiting for messages. To exit press CTRL+C'
channel.start_consuming()
登入後複製

執行程序,就能夠接收到佇列hello裡的訊息Hello World!,然後印在螢幕上。換一個終端,再次執行send.py,可以看到receive.py這邊會再次接收到訊息。

工作佇列範例

1.準備工作(Preparation)

在實例程式中,用new_task .py來模擬任務分配者, worker.py來模擬工作者。

修改send.py,從命令列參數接收訊息,並發送

#
import sys
 
message = ' '.join(sys.argv[1:]) or "Hello World!"
channel.basic_publish(exchange='',
           routing_key='hello',
           body=message)
print " [x] Sent %r" % (message,)
登入後複製

修改receive.py的回呼函數。

import time
 
def callback(ch, method, properties, body):
  print " [x] Received %r" % (body,)
  time.sleep( body.count('.') )
  print " [x] Done"
登入後複製

###這邊先打開兩個終端,都執行worker.py,處於監聽狀態,這邊就相當於兩個工作者。開啟第三個終端,執行new_task.py#########
$ python new_task.py First message.
$ python new_task.py Second message..
$ python new_task.py Third message...
$ python new_task.py Fourth message....
$ python new_task.py Fifth message.....
登入後複製
##########觀察worker.py接收到任務,其中一個工作者接收到3個任務:### ######
$ python worker.py
 [*] Waiting for messages. To exit press CTRL+C
 [x] Received 'First message.'
 [x] Received 'Third message...'
 [x] Received 'Fifth message.....'
登入後複製
#########另外一個工作者接收到2個任務:#########
$ python worker.py
 [*] Waiting for messages. To exit press CTRL+C
 [x] Received 'Second message..'
 [x] Received 'Fourth message....'
登入後複製
######

从上面来看,每个工作者,都会依次分配到任务。那么如果一个工作者,在处理任务的时候挂掉,这个任务就没有完成,应当交由其他工作者处理。所以应当有一种机制,当一个工作者完成任务时,会反馈消息。

2.消息确认(Message acknowledgment)

消息确认就是当工作者完成任务后,会反馈给rabbitmq。修改worker.py中的回调函数:

def callback(ch, method, properties, body):
  print " [x] Received %r" % (body,)
  time.sleep(5)
  print " [x] Done"
  ch.basic_ack(delivery_tag = method.delivery_tag)
登入後複製

这边停顿5秒,可以方便ctrl+c退出。

去除no_ack=True参数或者设置为False也可以。

channel.basic_consume(callback, queue='hello', no_ack=False)
登入後複製

用这个代码运行,即使其中一个工作者ctrl+c退出后,正在执行的任务也不会丢失,rabbitmq会将任务重新分配给其他工作者。

3.消息持久化存储(Message durability)

虽然有了消息反馈机制,但是如果rabbitmq自身挂掉的话,那么任务还是会丢失。所以需要将任务持久化存储起来。声明持久化存储:

channel.queue_declare(queue='hello', durable=True)
登入後複製

但是这个程序会执行错误,因为hello这个队列已经存在,并且是非持久化的,rabbitmq不允许使用不同的参数来重新定义存在的队列。重新定义一个队列:

channel.queue_declare(queue='task_queue', durable=True)
登入後複製

在发送任务的时候,用delivery_mode=2来标记任务为持久化存储:

channel.basic_publish(exchange='',
           routing_key="task_queue",
           body=message,
           properties=pika.BasicProperties(
             delivery_mode = 2, # make message persistent
           ))
登入後複製

4.公平调度(Fair dispatch)

上面实例中,虽然每个工作者是依次分配到任务,但是每个任务不一定一样。可能有的任务比较重,执行时间比较久;有的任务比较轻,执行时间比较短。如果能公平调度就最好了,使用basic_qos设置prefetch_count=1,使得rabbitmq不会在同一时间给工作者分配多个任务,即只有工作者完成任务之后,才会再次接收到任务。

channel.basic_qos(prefetch_count=1)
登入後複製

new_task.py完整代码

#!/usr/bin/env python
import pika
import sys
 
connection = pika.BlockingConnection(pika.ConnectionParameters(
    host='localhost'))
channel = connection.channel()
 
channel.queue_declare(queue='task_queue', durable=True)
 
message = ' '.join(sys.argv[1:]) or "Hello World!"
channel.basic_publish(exchange='',
           routing_key='task_queue',
           body=message,
           properties=pika.BasicProperties(
             delivery_mode = 2, # make message persistent
           ))
print " [x] Sent %r" % (message,)
connection.close()
worker.py完整代码

#!/usr/bin/env python
import pika
import time
 
connection = pika.BlockingConnection(pika.ConnectionParameters(
    host='localhost'))
channel = connection.channel()
 
channel.queue_declare(queue='task_queue', durable=True)
print ' [*] Waiting for messages. To exit press CTRL+C'
 
def callback(ch, method, properties, body):
  print " [x] Received %r" % (body,)
  time.sleep( body.count('.') )
  print " [x] Done"
  ch.basic_ack(delivery_tag = method.delivery_tag)
 
channel.basic_qos(prefetch_count=1)
channel.basic_consume(callback,
           queue='task_queue')
 
channel.start_consuming()
登入後複製


更多Python+Pika+RabbitMQ環境部署及實作工作佇列相关文章请关注PHP中文网!

來源:php.cn
本網站聲明
本文內容由網友自願投稿,版權歸原作者所有。本站不承擔相應的法律責任。如發現涉嫌抄襲或侵權的內容,請聯絡admin@php.cn
最新問題
熱門教學
更多>
最新下載
更多>
網站特效
網站源碼
網站素材
前端模板
關於我們 免責聲明 Sitemap
PHP中文網:公益線上PHP培訓,幫助PHP學習者快速成長!