2 年前に「Python で RabbitMQ を試してみる」という記事を投稿しましたが、2 年経った今でも、元の拡張機能も次々と廃止されているとは予想していませんでした。 amqp だけが残っており、PECL に含まれています。付属していますが、公式マニュアルの情報はまだ少し薄いです。
ここでは、amqp 拡張機能のインストールについては詳しく説明しません。数日前に投稿した「PHP 用 amqp 拡張機能のインストール」の記事を参照してください。
amqp をインストールしたら、コードの記述を開始できます:
ロジック:
接続の作成 --> チャネルの作成 --> スイッチの作成 --> キューの作成 --> スイッチ/キュー/ルーティング キーのバインド --> メッセージの受信
[php]
/*************************************
* PHP amqp(RabbitMQ) デモ - コンシューマ
* 著者: リンボ
* 日付: 2012/7/30
************************************/
//設定情報
$conn_args = array(
'ホスト' => '192.168.1.93',
'ポート' => '5672'、
'ログイン' => 'ゲスト'、
「パスワード」 => 「ゲスト」、
'vhost'=>'/'
);
$e_name = 'e_linvo' //スイッチ名
;
$q_name = 'q_linvo' //キュー名
;
$k_route = 'key_1' //ルーティングキー
;
//接続とチャネルを作成します
$conn = 新しい AMQPConnection($conn_args);
if (!$conn->connect()) {
Die("ブローカーに接続できません!n");
}
$channel = 新しい AMQPChannel($conn);
//スイッチを作成する
$ex = 新しい AMQPExchange($channel);
$ex->setName($e_name);
$ex->setType(AMQP_EX_TYPE_DIRECT) //ダイレクトタイプ
;
$ex->setFlags(AMQP_DURABLE) //永続化
;
echo "Exchange ステータス:".$ex->declare()."n";
//キューを作成する
$q = 新しい AMQPQueue($channel);
$q->setName($q_name);
$q->setFlags(AMQP_DURABLE) //永続化
echo "メッセージ合計:".$q->declare()."n";
//スイッチとキューをバインドし、ルーティングキーを指定します
echo 'キューバインド: '.$q->bind($e_name, $k_route)."n";
// ブロックモードでメッセージを受信します
「メッセージ:n」をエコーします。
while(True){
$q->consume('プロセスメッセージ');
//$q->consume('processMessage', AMQP_AUTOACK) //自動 ACK 応答
;
}
$conn->切断();
/**
*消費コールバック関数
* メッセージを処理する
*/
関数 processMessage($envelope, $queue) {
$msg = $envelope->getBody();
echo $msg."n" //メッセージを処理しています
;
$queue->ack($envelope->getDeliverTag()) //手動でACK応答を送信する
}
プロデューサー: メッセージを送信
ロジック:
接続の作成 --> チャネルの作成 --> スイッチ オブジェクトの作成 --> メッセージの送信
[php]
/*************************************
* PHP amqp(RabbitMQ) デモ - 発行者
* 著者: リンボ
* 日付: 2012/7/30
************************************/
//設定情報
$conn_args = array(
'ホスト' => '192.168.1.93',
'ポート' => '5672'、
'ログイン' => 'ゲスト'、
「パスワード」 => 「ゲスト」、
'vhost'=>'/'
);
$e_name = 'e_linvo' //スイッチ名
;
//$q_name = 'q_linvo' //キュー名は必要ありません
;
$k_route = 'key_1' //ルーティングキー
;
//接続とチャネルを作成します
$conn = 新しい AMQPConnection($conn_args);
if (!$conn->connect()) {
Die("ブローカーに接続できません!n");
}
$channel = 新しい AMQPChannel($conn);
//メッセージ内容
$message = "テストメッセージ! テストメッセージ!";
//スイッチオブジェクトを作成する
$ex = 新しい AMQPExchange($channel);
$ex->setName($e_name);
//メッセージを送信
//$channel->startTransaction() //トランザクションを開始します
for($i=0; $i
echo "メッセージの送信:".$ex->publish($message, $k_route)."n";
}
//$channel->commitTransaction() //トランザクションをコミットする
$conn->切断();
注意すべき点は次のとおりです:
キュー オブジェクトには、メッセージを取得するための 2 つのメソッド (consume と get) があります。
前者はブロックしており、メッセージがない場合は一時停止されるため、ループでの使用に適しています。
後者はノンブロッキングで、取得するメッセージがある場合は取得され、メッセージがない場合は false を返します。
スクリーンショットをテストする
コンシューマーを実行します:
プロデューサーを実行してメッセージを送信します:
作者: リンボ
http://www.bkjia.com/PHPjc/478053.html
www.bkjia.com
true
技術記事