ホームページ バックエンド開発 Python チュートリアル Python+Pika+RabbitMQ環境の展開とワークキューの実装

Python+Pika+RabbitMQ環境の展開とワークキューの実装

Mar 01, 2017 pm 02:04 PM

Rabbitmq の中国語訳は、主にメッセージキューを意味する mq: Message Queue という文字を指します。ウサギを意味する「rabbit」という単語も付いています。これはニシキヘビという言語と同じです。外国人はとてもユーモラスです。 Rabbitmq サービスは mysql サービスや Apache サービスに似ていますが、提供される機能が異なります。 rabbimq は、異なるアプリケーション間の通信に使用できるメッセージ送信サービスを提供するために使用されます。

rabbitmq をインストールします
まず、ubuntu 12.04 では、apt-get を通じて直接インストールできます:

sudo apt-get install rabbitmq-server
ログイン後にコピー

インストール後、rabbitmq サービスが開始されています。次に、Python で Hello World! を記述する例を見てみましょう。サンプルの内容は、send.pyからrabbitmqに「Hello World!」を送信し、send.pyがrabbitmqから送信した情報をreceive.pyが受け取るというものです。

Python+Pika+RabbitMQ環境の展開とワークキューの実装

ここで、P は生産者を意味するプロデュースを表し、送信者とも呼ばれ、例では send.py として示されています。C は消費者を意味し、消費者とも呼ばれます。これは受信者であり、例ではreceive.pyとして示されています。中央の赤いものはキューを意味し、この例ではhello queueです。

Python は、rabbitmq サービスを使用します。ここでは、pika、txAMQP、または py-amqplib を使用します。

pikaをインストールする

pikaをインストールするには、pipを使用できます。pipがインストールされていない場合は、apt-get

sudo apt-get install python-pip
ログイン後にコピー

を介してインストールできます。 pip:

sudo pip install pika
ログイン後にコピー

send.py code

はローカルでテストされているため、localhost を使用します。

connection = pika.BlockingConnection(pika.ConnectionParameters(
        'localhost'))
channel = connection.channel()
ログイン後にコピー

メッセージが配信されるメッセージキューを宣言します。存在しないキューにメッセージが送信された場合、rabbitmq はこれらのメッセージを自動的にクリアします。

channel.queue_declare(queue='hello')
ログイン後にコピー

上で宣言された hello キューにメッセージを送信します。ここで、exchange は、メッセージがどのキューに送信されるかを正確に指定できるエクスチェンジャーを表します。routing_key はキューの名前に設定され、body はキューの名前に設定されます。具体的な送信内容は一時的には関係ありません。

channel.basic_publish(exchange='', routing_key='hello', body='Hello World!')
ログイン後にコピー

接続を閉じます

connection.close()
ログイン後にコピー

完全なコード

#!/usr/bin/env python
#coding=utf8
import pika
 
connection = pika.BlockingConnection(pika.ConnectionParameters(
        'localhost'))
channel = connection.channel()
 
channel.queue_declare(queue='hello')
 
channel.basic_publish(exchange='', routing_key='hello', body='Hello World!')
print " [x] Sent 'Hello World!'"
connection.close()
ログイン後にコピー

まずこのプログラムを実行します。実行が成功すると、rabbitmqctlはhelloキューを正常に追加し、helloキューが1つ追加されるはずです。キュー内の情報を表示するには、rabbitmqctl コマンドを使用します。

rabbitmqctl list_queues
ログイン後にコピー

次の情報が作成者のコンピュータに出力されます:

Python+Pika+RabbitMQ環境の展開とワークキューの実装


確かに hello キューがあり、メッセージがあります待ち行列。次に、receive.py を使用してキュー内の情報を取得します。

receive.py コード

は、最初にサーバーに接続し、次にメッセージキューを宣言する必要がある send.py の前の 2 つのステップと同じです。同じコードはここには掲載されません。

メッセージの受信は、それを処理するためのコールバック関数を定義する必要があります。ここでのコールバック関数は、情報を出力することです。

def callback(ch, method, properties, body):
  print "Received %r" % (body,)
ログイン後にコピー

は、rabbitmqにコールバックを使用して情報を受信するように指示します

channel.basic_consume(callback, queue='hello', no_ack=True)
ログイン後にコピー

は、キューに情報がある場合にのみ、コールバックが呼び出されて処理されます。 Ctrl+C を押して終了します。

channel.start_consuming()
ログイン後にコピー

完全なコード

#!/usr/bin/env python
#coding=utf8
import pika
 
connection = pika.BlockingConnection(pika.ConnectionParameters(
        'localhost'))
channel = connection.channel()
 
channel.queue_declare(queue='hello')
 
def callback(ch, method, properties, body):
  print " [x] Received %r" % (body,)
 
channel.basic_consume(callback, queue='hello', no_ack=True)
 
print ' [*] Waiting for messages. To exit press CTRL+C'
channel.start_consuming()
ログイン後にコピー

プログラムを実行すると、キュー hello に Hello World! というメッセージが表示され、画面に表示されます。端末を変更して再度send.pyを実行すると、receive.pyが再度情報を受信することがわかります。

ワークキューの例

1.準備

サンプルプログラムでは、new_task.pyを使用してタスクアロケーターをシミュレートし、worker.pyを使用してワーカーをシミュレートします。

コマンドラインパラメータから情報を受信して​​送信するようにsend.pyを変更します

import sys
 
message = ' '.join(sys.argv[1:]) or "Hello World!"
channel.basic_publish(exchange='',
           routing_key='hello',
           body=message)
print " [x] Sent %r" % (message,)
ログイン後にコピー

receive.pyのコールバック関数を変更します。

import time
 
def callback(ch, method, properties, body):
  print " [x] Received %r" % (body,)
  time.sleep( body.count('.') )
  print " [x] Done"
ログイン後にコピー

まずここで 2 つのターミナルを開き、どちらも worker.py を実行し、listen 状態にします。これは 2 つのワーカーに相当します。 3 番目のターミナルを開き、new_task.py

$ python new_task.py First message.
$ python new_task.py Second message..
$ python new_task.py Third message...
$ python new_task.py Fourth message....
$ python new_task.py Fifth message.....
ログイン後にコピー

worker.py が 3 つのタスクを受信することを確認します:

$ python worker.py
 [*] Waiting for messages. To exit press CTRL+C
 [x] Received 'First message.'
 [x] Received 'Third message...'
 [x] Received 'Fifth message.....'
ログイン後にコピー

もう 1 つのワーカーは 2 つのミッションを受信します:

$ python worker.py
 [*] Waiting for messages. To exit press CTRL+C
 [x] Received 'Second message..'
 [x] Received 'Fourth message....'
ログイン後にコピー

从上面来看,每个工作者,都会依次分配到任务。那么如果一个工作者,在处理任务的时候挂掉,这个任务就没有完成,应当交由其他工作者处理。所以应当有一种机制,当一个工作者完成任务时,会反馈消息。

2.消息确认(Message acknowledgment)

消息确认就是当工作者完成任务后,会反馈给rabbitmq。修改worker.py中的回调函数:

def callback(ch, method, properties, body):
  print " [x] Received %r" % (body,)
  time.sleep(5)
  print " [x] Done"
  ch.basic_ack(delivery_tag = method.delivery_tag)
ログイン後にコピー

这边停顿5秒,可以方便ctrl+c退出。

去除no_ack=True参数或者设置为False也可以。

channel.basic_consume(callback, queue='hello', no_ack=False)
ログイン後にコピー

用这个代码运行,即使其中一个工作者ctrl+c退出后,正在执行的任务也不会丢失,rabbitmq会将任务重新分配给其他工作者。

3.消息持久化存储(Message durability)

虽然有了消息反馈机制,但是如果rabbitmq自身挂掉的话,那么任务还是会丢失。所以需要将任务持久化存储起来。声明持久化存储:

channel.queue_declare(queue='hello', durable=True)
ログイン後にコピー

但是这个程序会执行错误,因为hello这个队列已经存在,并且是非持久化的,rabbitmq不允许使用不同的参数来重新定义存在的队列。重新定义一个队列:

channel.queue_declare(queue='task_queue', durable=True)
ログイン後にコピー

在发送任务的时候,用delivery_mode=2来标记任务为持久化存储:

channel.basic_publish(exchange='',
           routing_key="task_queue",
           body=message,
           properties=pika.BasicProperties(
             delivery_mode = 2, # make message persistent
           ))
ログイン後にコピー

4.公平调度(Fair dispatch)

上面实例中,虽然每个工作者是依次分配到任务,但是每个任务不一定一样。可能有的任务比较重,执行时间比较久;有的任务比较轻,执行时间比较短。如果能公平调度就最好了,使用basic_qos设置prefetch_count=1,使得rabbitmq不会在同一时间给工作者分配多个任务,即只有工作者完成任务之后,才会再次接收到任务。

channel.basic_qos(prefetch_count=1)
ログイン後にコピー

new_task.py完整代码

#!/usr/bin/env python
import pika
import sys
 
connection = pika.BlockingConnection(pika.ConnectionParameters(
    host='localhost'))
channel = connection.channel()
 
channel.queue_declare(queue='task_queue', durable=True)
 
message = ' '.join(sys.argv[1:]) or "Hello World!"
channel.basic_publish(exchange='',
           routing_key='task_queue',
           body=message,
           properties=pika.BasicProperties(
             delivery_mode = 2, # make message persistent
           ))
print " [x] Sent %r" % (message,)
connection.close()
worker.py完整代码

#!/usr/bin/env python
import pika
import time
 
connection = pika.BlockingConnection(pika.ConnectionParameters(
    host='localhost'))
channel = connection.channel()
 
channel.queue_declare(queue='task_queue', durable=True)
print ' [*] Waiting for messages. To exit press CTRL+C'
 
def callback(ch, method, properties, body):
  print " [x] Received %r" % (body,)
  time.sleep( body.count('.') )
  print " [x] Done"
  ch.basic_ack(delivery_tag = method.delivery_tag)
 
channel.basic_qos(prefetch_count=1)
channel.basic_consume(callback,
           queue='task_queue')
 
channel.start_consuming()
ログイン後にコピー


更多Python+Pika+RabbitMQ環境の展開とワークキューの実装相关文章请关注PHP中文网!

このウェブサイトの声明
この記事の内容はネチズンが自主的に寄稿したものであり、著作権は原著者に帰属します。このサイトは、それに相当する法的責任を負いません。盗作または侵害の疑いのあるコンテンツを見つけた場合は、admin@php.cn までご連絡ください。

ホットAIツール

Undresser.AI Undress

Undresser.AI Undress

リアルなヌード写真を作成する AI 搭載アプリ

AI Clothes Remover

AI Clothes Remover

写真から衣服を削除するオンライン AI ツール。

Undress AI Tool

Undress AI Tool

脱衣画像を無料で

Clothoff.io

Clothoff.io

AI衣類リムーバー

AI Hentai Generator

AI Hentai Generator

AIヘンタイを無料で生成します。

ホットツール

メモ帳++7.3.1

メモ帳++7.3.1

使いやすく無料のコードエディター

SublimeText3 中国語版

SublimeText3 中国語版

中国語版、とても使いやすい

ゼンドスタジオ 13.0.1

ゼンドスタジオ 13.0.1

強力な PHP 統合開発環境

ドリームウィーバー CS6

ドリームウィーバー CS6

ビジュアル Web 開発ツール

SublimeText3 Mac版

SublimeText3 Mac版

神レベルのコード編集ソフト(SublimeText3)

Pythonを使用してテキストファイルのZIPF配布を見つける方法 Pythonを使用してテキストファイルのZIPF配布を見つける方法 Mar 05, 2025 am 09:58 AM

このチュートリアルでは、Pythonを使用してZIPFの法則の統計的概念を処理する方法を示し、法律の処理時にPythonの読み取りおよび並べ替えの効率性を示します。 ZIPF分布という用語が何を意味するのか疑問に思うかもしれません。この用語を理解するには、まずZIPFの法律を定義する必要があります。心配しないでください、私は指示を簡素化しようとします。 ZIPFの法則 ZIPFの法則は単に意味します。大きな自然言語のコーパスでは、最も頻繁に発生する単語は、2番目の頻繁な単語のほぼ2倍の頻度で表示されます。 例を見てみましょう。アメリカ英語の茶色のコーパスを見ると、最も頻繁な言葉は「thであることに気付くでしょう。

HTMLを解析するために美しいスープを使用するにはどうすればよいですか? HTMLを解析するために美しいスープを使用するにはどうすればよいですか? Mar 10, 2025 pm 06:54 PM

この記事では、Pythonライブラリである美しいスープを使用してHTMLを解析する方法について説明します。 find()、find_all()、select()、およびget_text()などの一般的な方法は、データ抽出、多様なHTML構造とエラーの処理、および代替案(SEL

Pythonでの画像フィルタリング Pythonでの画像フィルタリング Mar 03, 2025 am 09:44 AM

ノイズの多い画像を扱うことは、特に携帯電話や低解像度のカメラの写真でよくある問題です。 このチュートリアルでは、OpenCVを使用してPythonの画像フィルタリング手法を調査して、この問題に取り組みます。 画像フィルタリング:強力なツール 画像フィルター

Pythonを使用してPDFドキュメントの操作方法 Pythonを使用してPDFドキュメントの操作方法 Mar 02, 2025 am 09:54 AM

PDFファイルは、クロスプラットフォームの互換性に人気があり、オペレーティングシステム、読み取りデバイス、ソフトウェア間でコンテンツとレイアウトが一貫しています。ただし、Python Plansing Plain Text Filesとは異なり、PDFファイルは、より複雑な構造を持つバイナリファイルであり、フォント、色、画像などの要素を含んでいます。 幸いなことに、Pythonの外部モジュールでPDFファイルを処理することは難しくありません。この記事では、PYPDF2モジュールを使用して、PDFファイルを開き、ページを印刷し、テキストを抽出する方法を示します。 PDFファイルの作成と編集については、私からの別のチュートリアルを参照してください。 準備 コアは、外部モジュールPYPDF2を使用することにあります。まず、PIPを使用してインストールします。 ピップはpです

DjangoアプリケーションでRedisを使用してキャッシュする方法 DjangoアプリケーションでRedisを使用してキャッシュする方法 Mar 02, 2025 am 10:10 AM

このチュートリアルでは、Redisキャッシングを活用して、特にDjangoフレームワーク内でPythonアプリケーションのパフォーマンスを向上させる方法を示しています。 Redisのインストール、Django構成、およびパフォーマンスの比較をカバーして、Beneを強調します

TensorflowまたはPytorchで深い学習を実行する方法は? TensorflowまたはPytorchで深い学習を実行する方法は? Mar 10, 2025 pm 06:52 PM

この記事では、深い学習のためにTensorflowとPytorchを比較しています。 関連する手順、データの準備、モデルの構築、トレーニング、評価、展開について詳しく説明しています。 特に計算グラップに関して、フレームワーク間の重要な違い

Pythonの並列および同時プログラミングの紹介 Pythonの並列および同時プログラミングの紹介 Mar 03, 2025 am 10:32 AM

データサイエンスと処理のお気に入りであるPythonは、高性能コンピューティングのための豊富なエコシステムを提供します。ただし、Pythonの並列プログラミングは、独自の課題を提示します。このチュートリアルでは、これらの課題を調査し、グローバルな承認に焦点を当てています

Pythonで独自のデータ構造を実装する方法 Pythonで独自のデータ構造を実装する方法 Mar 03, 2025 am 09:28 AM

このチュートリアルでは、Python 3にカスタムパイプラインデータ構造を作成し、機能を強化するためにクラスとオペレーターのオーバーロードを活用していることを示しています。 パイプラインの柔軟性は、一連の機能をデータセットに適用する能力にあります。

See all articles