ホームページ > バックエンド開発 > Python チュートリアル > Python+Pika+RabbitMQ環境の展開とワークキューの実装

Python+Pika+RabbitMQ環境の展開とワークキューの実装

高洛峰
リリース: 2017-03-01 14:04:50
オリジナル
1620 人が閲覧しました

Rabbitmq の中国語訳は、主にメッセージキューを意味する mq: Message Queue という文字を指します。ウサギを意味する「rabbit」という単語も付いています。これはニシキヘビという言語と同じです。外国人はとてもユーモラスです。 Rabbitmq サービスは mysql サービスや Apache サービスに似ていますが、提供される機能が異なります。 rabbimq は、異なるアプリケーション間の通信に使用できるメッセージ送信サービスを提供するために使用されます。

rabbitmq をインストールします
まず、ubuntu 12.04 では、apt-get を通じて直接インストールできます:

sudo apt-get install rabbitmq-server
ログイン後にコピー

インストール後、rabbitmq サービスが開始されています。次に、Python で Hello World! を記述する例を見てみましょう。サンプルの内容は、send.pyからrabbitmqに「Hello World!」を送信し、send.pyがrabbitmqから送信した情報をreceive.pyが受け取るというものです。

Python+Pika+RabbitMQ環境の展開とワークキューの実装

ここで、P は生産者を意味するプロデュースを表し、送信者とも呼ばれ、例では send.py として示されています。C は消費者を意味し、消費者とも呼ばれます。これは受信者であり、例ではreceive.pyとして示されています。中央の赤いものはキューを意味し、この例ではhello queueです。

Python は、rabbitmq サービスを使用します。ここでは、pika、txAMQP、または py-amqplib を使用します。

pikaをインストールする

pikaをインストールするには、pipを使用できます。pipがインストールされていない場合は、apt-get

sudo apt-get install python-pip
ログイン後にコピー

を介してインストールできます。 pip:

sudo pip install pika
ログイン後にコピー

send.py code

はローカルでテストされているため、localhost を使用します。

connection = pika.BlockingConnection(pika.ConnectionParameters(
        'localhost'))
channel = connection.channel()
ログイン後にコピー

メッセージが配信されるメッセージキューを宣言します。存在しないキューにメッセージが送信された場合、rabbitmq はこれらのメッセージを自動的にクリアします。

channel.queue_declare(queue='hello')
ログイン後にコピー

上で宣言された hello キューにメッセージを送信します。ここで、exchange は、メッセージがどのキューに送信されるかを正確に指定できるエクスチェンジャーを表します。routing_key はキューの名前に設定され、body はキューの名前に設定されます。具体的な送信内容は一時的には関係ありません。

channel.basic_publish(exchange='', routing_key='hello', body='Hello World!')
ログイン後にコピー

接続を閉じます

connection.close()
ログイン後にコピー

完全なコード

#!/usr/bin/env python
#coding=utf8
import pika
 
connection = pika.BlockingConnection(pika.ConnectionParameters(
        'localhost'))
channel = connection.channel()
 
channel.queue_declare(queue='hello')
 
channel.basic_publish(exchange='', routing_key='hello', body='Hello World!')
print " [x] Sent 'Hello World!'"
connection.close()
ログイン後にコピー

まずこのプログラムを実行します。実行が成功すると、rabbitmqctlはhelloキューを正常に追加し、helloキューが1つ追加されるはずです。キュー内の情報を表示するには、rabbitmqctl コマンドを使用します。

rabbitmqctl list_queues
ログイン後にコピー

次の情報が作成者のコンピュータに出力されます:

Python+Pika+RabbitMQ環境の展開とワークキューの実装


確かに hello キューがあり、メッセージがあります待ち行列。次に、receive.py を使用してキュー内の情報を取得します。

receive.py コード

は、最初にサーバーに接続し、次にメッセージキューを宣言する必要がある send.py の前の 2 つのステップと同じです。同じコードはここには掲載されません。

メッセージの受信は、それを処理するためのコールバック関数を定義する必要があります。ここでのコールバック関数は、情報を出力することです。

def callback(ch, method, properties, body):
  print "Received %r" % (body,)
ログイン後にコピー

は、rabbitmqにコールバックを使用して情報を受信するように指示します

channel.basic_consume(callback, queue='hello', no_ack=True)
ログイン後にコピー

は、キューに情報がある場合にのみ、コールバックが呼び出されて処理されます。 Ctrl+C を押して終了します。

channel.start_consuming()
ログイン後にコピー

完全なコード

#!/usr/bin/env python
#coding=utf8
import pika
 
connection = pika.BlockingConnection(pika.ConnectionParameters(
        'localhost'))
channel = connection.channel()
 
channel.queue_declare(queue='hello')
 
def callback(ch, method, properties, body):
  print " [x] Received %r" % (body,)
 
channel.basic_consume(callback, queue='hello', no_ack=True)
 
print ' [*] Waiting for messages. To exit press CTRL+C'
channel.start_consuming()
ログイン後にコピー

プログラムを実行すると、キュー hello に Hello World! というメッセージが表示され、画面に表示されます。端末を変更して再度send.pyを実行すると、receive.pyが再度情報を受信することがわかります。

ワークキューの例

1.準備

サンプルプログラムでは、new_task.pyを使用してタスクアロケーターをシミュレートし、worker.pyを使用してワーカーをシミュレートします。

コマンドラインパラメータから情報を受信して​​送信するようにsend.pyを変更します

import sys
 
message = ' '.join(sys.argv[1:]) or "Hello World!"
channel.basic_publish(exchange='',
           routing_key='hello',
           body=message)
print " [x] Sent %r" % (message,)
ログイン後にコピー

receive.pyのコールバック関数を変更します。

import time
 
def callback(ch, method, properties, body):
  print " [x] Received %r" % (body,)
  time.sleep( body.count('.') )
  print " [x] Done"
ログイン後にコピー

まずここで 2 つのターミナルを開き、どちらも worker.py を実行し、listen 状態にします。これは 2 つのワーカーに相当します。 3 番目のターミナルを開き、new_task.py

$ python new_task.py First message.
$ python new_task.py Second message..
$ python new_task.py Third message...
$ python new_task.py Fourth message....
$ python new_task.py Fifth message.....
ログイン後にコピー

worker.py が 3 つのタスクを受信することを確認します:

$ python worker.py
 [*] Waiting for messages. To exit press CTRL+C
 [x] Received 'First message.'
 [x] Received 'Third message...'
 [x] Received 'Fifth message.....'
ログイン後にコピー

もう 1 つのワーカーは 2 つのミッションを受信します:

$ python worker.py
 [*] Waiting for messages. To exit press CTRL+C
 [x] Received 'Second message..'
 [x] Received 'Fourth message....'
ログイン後にコピー

从上面来看,每个工作者,都会依次分配到任务。那么如果一个工作者,在处理任务的时候挂掉,这个任务就没有完成,应当交由其他工作者处理。所以应当有一种机制,当一个工作者完成任务时,会反馈消息。

2.消息确认(Message acknowledgment)

消息确认就是当工作者完成任务后,会反馈给rabbitmq。修改worker.py中的回调函数:

def callback(ch, method, properties, body):
  print " [x] Received %r" % (body,)
  time.sleep(5)
  print " [x] Done"
  ch.basic_ack(delivery_tag = method.delivery_tag)
ログイン後にコピー

这边停顿5秒,可以方便ctrl+c退出。

去除no_ack=True参数或者设置为False也可以。

channel.basic_consume(callback, queue='hello', no_ack=False)
ログイン後にコピー

用这个代码运行,即使其中一个工作者ctrl+c退出后,正在执行的任务也不会丢失,rabbitmq会将任务重新分配给其他工作者。

3.消息持久化存储(Message durability)

虽然有了消息反馈机制,但是如果rabbitmq自身挂掉的话,那么任务还是会丢失。所以需要将任务持久化存储起来。声明持久化存储:

channel.queue_declare(queue='hello', durable=True)
ログイン後にコピー

但是这个程序会执行错误,因为hello这个队列已经存在,并且是非持久化的,rabbitmq不允许使用不同的参数来重新定义存在的队列。重新定义一个队列:

channel.queue_declare(queue='task_queue', durable=True)
ログイン後にコピー

在发送任务的时候,用delivery_mode=2来标记任务为持久化存储:

channel.basic_publish(exchange='',
           routing_key="task_queue",
           body=message,
           properties=pika.BasicProperties(
             delivery_mode = 2, # make message persistent
           ))
ログイン後にコピー

4.公平调度(Fair dispatch)

上面实例中,虽然每个工作者是依次分配到任务,但是每个任务不一定一样。可能有的任务比较重,执行时间比较久;有的任务比较轻,执行时间比较短。如果能公平调度就最好了,使用basic_qos设置prefetch_count=1,使得rabbitmq不会在同一时间给工作者分配多个任务,即只有工作者完成任务之后,才会再次接收到任务。

channel.basic_qos(prefetch_count=1)
ログイン後にコピー

new_task.py完整代码

#!/usr/bin/env python
import pika
import sys
 
connection = pika.BlockingConnection(pika.ConnectionParameters(
    host='localhost'))
channel = connection.channel()
 
channel.queue_declare(queue='task_queue', durable=True)
 
message = ' '.join(sys.argv[1:]) or "Hello World!"
channel.basic_publish(exchange='',
           routing_key='task_queue',
           body=message,
           properties=pika.BasicProperties(
             delivery_mode = 2, # make message persistent
           ))
print " [x] Sent %r" % (message,)
connection.close()
worker.py完整代码

#!/usr/bin/env python
import pika
import time
 
connection = pika.BlockingConnection(pika.ConnectionParameters(
    host='localhost'))
channel = connection.channel()
 
channel.queue_declare(queue='task_queue', durable=True)
print ' [*] Waiting for messages. To exit press CTRL+C'
 
def callback(ch, method, properties, body):
  print " [x] Received %r" % (body,)
  time.sleep( body.count('.') )
  print " [x] Done"
  ch.basic_ack(delivery_tag = method.delivery_tag)
 
channel.basic_qos(prefetch_count=1)
channel.basic_consume(callback,
           queue='task_queue')
 
channel.start_consuming()
ログイン後にコピー


更多Python+Pika+RabbitMQ環境の展開とワークキューの実装相关文章请关注PHP中文网!

関連ラベル:
ソース:php.cn
このウェブサイトの声明
この記事の内容はネチズンが自主的に寄稿したものであり、著作権は原著者に帰属します。このサイトは、それに相当する法的責任を負いません。盗作または侵害の疑いのあるコンテンツを見つけた場合は、admin@php.cn までご連絡ください。
人気のチュートリアル
詳細>
最新のダウンロード
詳細>
ウェブエフェクト
公式サイト
サイト素材
フロントエンドテンプレート