PHPおよびRabbitMQ:高度な例
キーテイクアウト
- PHPとRabbitMQを採用して、複数の労働者の間で非同期にデータを処理し、高遷移環境での効率を向上させます。
- rabbitmqに永続的なメッセージを実装して、メッセージ「配信_mode」を2に設定し、キューを耐久性があると宣言することにより、サーバーのクラッシュ中のデータ損失を防ぐ。 rabbitmqのサービス品質(QOS)設定を使用して、労働者間のメッセージ配信を制御し、単一の労働者が圧倒されないようにします。
- RPC(リモートプロシージャコール)のRabbitMQを利用して、ユーザー認証などのタスクに役立つ応答を必要とするメッセージを送信して使用します。 RPC応答の排他的な一時的なキューを設定して、クライアントとサーバーの間でメッセージが正しくかつ安全に指示されるようにします。
- 応答の「Correlation_id」をリクエストと一致させ、返信の正しい処理と処理を確保することにより、RPC応答を効果的に処理します。
- 例1:複数の労働者の間で非同期データを処理するためのリクエストを送信
- 前の部分の例では、1人の生産者、1人の消費者がいました。 消費者が死亡した場合、消費者が再び始まるまでメッセージはキューに積み上げられ続けます。その後、すべてのメッセージを1つずつ処理します。
- これは、1分あたりかなりの量のリクエストがある同時ユーザー環境では理想的ではありません。 幸いなことに、消費者のスケーリングは非常に簡単ですが、別の例を実装しましょう。 請求書世代サービスがあるとしましょう。ユーザーは請求書番号を提供するだけで、システムはPDFファイルを自動的に生成してユーザーにメールで送信します。 生成プロセスが実行されるサーバーがリソース制限である場合、電子メールの生成と送信には数秒かかる場合があります。 ここで、1秒あたりのいくつかのトランザクションをサポートする必要があると仮定します。サーバーを圧倒することなくこれをどのように達成できますか?
プロデューサーのクラスを見てみましょう:
Workersender :: execute()メソッドは請求書番号を受け取ります。 次に、通常どおり、接続、チャネル、キューを作成します。
今回は、メッセージオブジェクトを作成しているときに、コンストラクターが2番目のパラメーターを受信していることに注意してください。 この場合、RabbitMQサーバーがクラッシュした場合、メッセージは失われるべきではないと述べたいと思います。 これが機能するためには、キューも耐久性があると宣言する必要があることに注意してください。
次のコードを使用してフォームデータを受信し、プロデューサーを実行できます。<span><span><?php </span></span><span><span>namespace Acme<span>\AmqpWrapper</span>; </span></span><span> </span><span><span>use PhpAmqpLib<span>\Connection\AMQPConnection</span>; </span></span><span><span>use PhpAmqpLib<span>\Message\AMQPMessage</span>; </span></span><span> </span><span><span>class WorkerSender </span></span><span><span>{ </span></span><span> <span>/* ... SOME OTHER CODE HERE ... */ </span></span><span> </span><span> <span>/** </span></span><span><span> * Sends an invoice generation task to the workers </span></span><span><span> * </span></span><span><span> * <span>@param <span>int</span> $invoiceNum </span></span></span><span><span> */ </span></span><span> <span>public function execute($invoiceNum) </span></span><span> <span>{ </span></span><span> <span>$connection = new AMQPConnection('localhost', 5672, 'guest', 'guest'); </span></span><span> <span>$channel = $connection->channel(); </span></span><span> </span><span> <span>$channel->queue_declare( </span></span><span> <span>'invoice_queue', #queue - Queue names may be up to 255 bytes of UTF-8 characters </span></span><span> <span>false, #passive - can use this to check whether an exchange exists without modifying the server state </span></span><span> <span>true, #durable, make sure that RabbitMQ will never lose our queue if a crash occurs - the queue will survive a broker restart </span></span><span> <span>false, #exclusive - used by only one connection and the queue will be deleted when that connection closes </span></span><span> <span>false #auto delete - queue is deleted when last consumer unsubscribes </span></span><span> <span>); </span></span><span> </span><span> <span>$msg = new AMQPMessage( </span></span><span> <span>$invoiceNum, </span></span><span> <span>array('delivery_mode' => 2) # make message persistent, so it is not lost if server crashes or quits </span></span><span> <span>); </span></span><span> </span><span> <span>$channel->basic_publish( </span></span><span> <span>$msg, #message </span></span><span> <span>'', #exchange </span></span><span> <span>'invoice_queue' #routing key (queue) </span></span><span> <span>); </span></span><span> </span><span> <span>$channel->close(); </span></span><span> <span>$connection->close(); </span></span><span> <span>} </span></span><span><span>}</span></span>
快適な入力の消毒/検証のいずれかを使用してください
消費者の面で物事が少し面白くなります:
<span><span><?php </span></span><span><span>/* ... SOME CODE HERE ... */ </span></span><span> </span><span> <span>$msg = new AMQPMessage( </span></span><span> <span>$invoiceNum, </span></span><span> <span>array('delivery_mode' => 2) # make message persistent, so it is not lost if server crashes or quits </span></span><span> <span>); </span></span><span> </span><span><span>/* ... SOME CODE HERE ... */</span></span>
<span><span><?php </span></span><span><span>chdir(dirname(__DIR__)); </span></span><span><span>require_once('vendor/autoload.php'); </span></span><span> </span><span><span>use Acme<span>\AmqpWrapper\WorkerSender</span>; </span></span><span> </span><span><span>$inputFilters = array( </span></span><span> <span>'invoiceNo' => FILTER_SANITIZE_NUMBER_INT, </span></span><span><span>); </span></span><span><span>$input = filter_input_array(INPUT_POST, $inputFilters); </span></span><span><span>$sender = new WorkerSender(); </span></span><span><span>$sender->execute($input['invoiceNo']);</span></span>
- :具体的な制限はありません。
- プリフェッチcount:承認を送信する前に、労働者ごとに取得するメッセージの数。これにより、労働者は一度に1つのメッセージを処理できます。
- グローバル:nullとは、上記の設定がこの消費者のみに適用されることを意味します
- 次に、パラメーターに重要な違いがあるため、消費を開始します。 RabbitMQサーバーにメッセージの処理が完了し、新しいものを受信する準備ができている場合に、自動ACKをオフにします。 さて、どうやってそのackを送信するのですか? workerreceiver :: process()メソッド(メッセージが受信されたときにコールバックメソッドとして宣言されます)をご覧ください。 generatedpdf()およびsendemail()メソッドへの呼び出しは、両方のタスクを達成するために費やされた時間をシミュレートするダミーメソッドにすぎません。 $ MSGパラメーターには、プロデューサーから送信されたペイロードが含まれているだけでなく、プロデューサーが使用するオブジェクトに関する情報も含まれています。 プロデューサーが$ MSG-> DERVILY_INFO ['Channel']で使用するチャネルに関する情報を抽出できます(これは、$ connection-> channel();)で消費者向けに開いたチャネルと同じオブジェクトタイプです。 プロデューサーのチャンネルにプロセスが完了したことを確認する必要があるため、basic_ack()メソッドを使用して、パラメーターとして配信タグ($ msg-> deleviry_info ['delivery_tag'])を送信します。 ACKがどのメッセージに属するかに正しく関連付けるには。 どのようにして労働者を解雇しますか?次のようなファイルを作成するだけで、workerreceiver :: listen()方法を呼び出します。
PHPコマンド(例:PHP Worker.phpまたは上記のファイルに与えた名前)を使用して、ワーカーを起動します。 しかし、待って、目的は2人以上の労働者がいることでしたよね?問題ありませんが、同じファイルの複数のプロセスを作成するために、同じ方法でより多くの労働者を起動します。RabbitMQは、QoSパラメーターに従って消費者を登録し、それらの間に作業を分配します。
例2:RPCリクエストを送信し、返信を期待してください
これまでのところ、ユーザーが返信を待つ必要なく、rabbitmqサーバーにメッセージを送信しています。 これは、ユーザーが「OK」メッセージを表示するためだけに費やすことをいとわないよりも時間がかかる非同期プロセスではOKです。 しかし、実際に返信が必要な場合はどうなりますか?複雑な計算の結果を使用して、ユーザーに表示できるとしましょう。
アプリケーションの残りの部分から分離された認証メカニズムとして機能する集中ログインサーバー(シングルサインオン)があるとしましょう。 このサーバーに到達する唯一の方法は、rabbitmqを使用することです。 このサーバーにログイン資格情報を送信し、助成金/拒否アクセス応答を待つ方法を実装する必要があります。次のパターンを実装する必要があります
いつものように、最初にプロデューサーを見てみましょう:
rpcsender :: executeメソッドを見ると、$ credentialsパラメーターは['username' => 'x' '、' password '=>' y ']の形式の配列であることに注意してください。 繰り返しますが、新しい接続を開き、いつものようにチャネルを作成します。
<span><span><?php </span></span><span><span>namespace Acme<span>\AmqpWrapper</span>; </span></span><span> </span><span><span>use PhpAmqpLib<span>\Connection\AMQPConnection</span>; </span></span><span><span>use PhpAmqpLib<span>\Message\AMQPMessage</span>; </span></span><span> </span><span><span>class WorkerSender </span></span><span><span>{ </span></span><span> <span>/* ... SOME OTHER CODE HERE ... */ </span></span><span> </span><span> <span>/** </span></span><span><span> * Sends an invoice generation task to the workers </span></span><span><span> * </span></span><span><span> * <span>@param <span>int</span> $invoiceNum </span></span></span><span><span> */ </span></span><span> <span>public function execute($invoiceNum) </span></span><span> <span>{ </span></span><span> <span>$connection = new AMQPConnection('localhost', 5672, 'guest', 'guest'); </span></span><span> <span>$channel = $connection->channel(); </span></span><span> </span><span> <span>$channel->queue_declare( </span></span><span> <span>'invoice_queue', #queue - Queue names may be up to 255 bytes of UTF-8 characters </span></span><span> <span>false, #passive - can use this to check whether an exchange exists without modifying the server state </span></span><span> <span>true, #durable, make sure that RabbitMQ will never lose our queue if a crash occurs - the queue will survive a broker restart </span></span><span> <span>false, #exclusive - used by only one connection and the queue will be deleted when that connection closes </span></span><span> <span>false #auto delete - queue is deleted when last consumer unsubscribes </span></span><span> <span>); </span></span><span> </span><span> <span>$msg = new AMQPMessage( </span></span><span> <span>$invoiceNum, </span></span><span> <span>array('delivery_mode' => 2) # make message persistent, so it is not lost if server crashes or quits </span></span><span> <span>); </span></span><span> </span><span> <span>$channel->basic_publish( </span></span><span> <span>$msg, #message </span></span><span> <span>'', #exchange </span></span><span> <span>'invoice_queue' #routing key (queue) </span></span><span> <span>); </span></span><span> </span><span> <span>$channel->close(); </span></span><span> <span>$connection->close(); </span></span><span> <span>} </span></span><span><span>}</span></span>
最初の違いは、キューを宣言することからです。 最初に、$ channel-> queue_declare()の結果をキャッチするためにlist()constructを使用していることに注意してください。これは、宣言しながらキュー名を明示的に送信しないため、このキューがどのように識別されるかを調べる必要があるためです。 結果アレイの最初の要素にのみ興味があります。これは、キューの一意の識別子(amq.gen-_u0kjvm8helfzqk9pz9ggのようなもの)になります。 2番目の変更は、このキューを排他的であると宣言する必要があることです。そのため、他の同時プロセスの結果に混同はありません。
<span><span><?php </span></span><span><span>/* ... SOME CODE HERE ... */ </span></span><span> </span><span> <span>$msg = new AMQPMessage( </span></span><span> <span>$invoiceNum, </span></span><span> <span>array('delivery_mode' => 2) # make message persistent, so it is not lost if server crashes or quits </span></span><span> <span>); </span></span><span> </span><span><span>/* ... SOME CODE HERE ... */</span></span>
次に、メッセージの相関IDを作成する必要があります。これは、各メッセージの一意の識別子にすぎません。 この例では、uniqid()の出力を使用していますが、好みのメカニズムを使用できます(人種条件を作成しない限り、強力な暗号セーフRNGである必要はありません)。
ここで、メッセージを作成しましょう。これには、最初の2つの例で慣れていたものと比較して重要な変更があります。認証する資格情報を含むJSONエンコード文字列を割り当てることは別として、AMQPMessageコンストラクターに2つのプロパティを定義した配列を提供する必要があります。
- correlation_id:メッセージのタグ
- REPLY_TO:キュー識別子が生成されたときに生成されます
メッセージを公開した後、最初に空になる応答を評価します。 応答値が空のままですが、$ channel-> wait();
チャネルから応答を受信すると、コールバックメソッドが呼び出されます(rpcsender :: onresponse())。 この方法は、受信した相関IDと生成された相関IDと一致し、それらが同じ場合、応答本体を設定し、したがってwhileループを破壊します。RPC消費者はどうですか? ここにあります:
同じ古い接続とチャネル作成:)
<span><span><?php </span></span><span><span>namespace Acme<span>\AmqpWrapper</span>; </span></span><span> </span><span><span>use PhpAmqpLib<span>\Connection\AMQPConnection</span>; </span></span><span><span>use PhpAmqpLib<span>\Message\AMQPMessage</span>; </span></span><span> </span><span><span>class WorkerSender </span></span><span><span>{ </span></span><span> <span>/* ... SOME OTHER CODE HERE ... */ </span></span><span> </span><span> <span>/** </span></span><span><span> * Sends an invoice generation task to the workers </span></span><span><span> * </span></span><span><span> * <span>@param <span>int</span> $invoiceNum </span></span></span><span><span> */ </span></span><span> <span>public function execute($invoiceNum) </span></span><span> <span>{ </span></span><span> <span>$connection = new AMQPConnection('localhost', 5672, 'guest', 'guest'); </span></span><span> <span>$channel = $connection->channel(); </span></span><span> </span><span> <span>$channel->queue_declare( </span></span><span> <span>'invoice_queue', #queue - Queue names may be up to 255 bytes of UTF-8 characters </span></span><span> <span>false, #passive - can use this to check whether an exchange exists without modifying the server state </span></span><span> <span>true, #durable, make sure that RabbitMQ will never lose our queue if a crash occurs - the queue will survive a broker restart </span></span><span> <span>false, #exclusive - used by only one connection and the queue will be deleted when that connection closes </span></span><span> <span>false #auto delete - queue is deleted when last consumer unsubscribes </span></span><span> <span>); </span></span><span> </span><span> <span>$msg = new AMQPMessage( </span></span><span> <span>$invoiceNum, </span></span><span> <span>array('delivery_mode' => 2) # make message persistent, so it is not lost if server crashes or quits </span></span><span> <span>); </span></span><span> </span><span> <span>$channel->basic_publish( </span></span><span> <span>$msg, #message </span></span><span> <span>'', #exchange </span></span><span> <span>'invoice_queue' #routing key (queue) </span></span><span> <span>); </span></span><span> </span><span> <span>$channel->close(); </span></span><span> <span>$connection->close(); </span></span><span> <span>} </span></span><span><span>}</span></span>
>このキューには事前定義された名前( '
rpc_queue’)があります。 自動アクセスを無効にするため、QoSパラメーターを定義します。そのため、資格情報の検証が完了して結果が得られることを通知できます。 魔法は宣言されたコールバック内から来ます。資格情報の認証が完了したら(はい、静的ユーザー名/パスワード値に対してプロセスが実行されることがわかります。このチュートリアルは、クレデンシャルを認証する方法に関するものではありません;))作成されました。 $ req-> get( 'correlation_id')でリクエストメッセージからこれを抽出し、プロデューサーで行ったのと同じようにこの値を渡すことができます。
<span><span><?php </span></span><span><span>/* ... SOME CODE HERE ... */ </span></span><span> </span><span> <span>$msg = new AMQPMessage( </span></span><span> <span>$invoiceNum, </span></span><span> <span>array('delivery_mode' => 2) # make message persistent, so it is not lost if server crashes or quits </span></span><span> <span>); </span></span><span> </span><span><span>/* ... SOME CODE HERE ... */</span></span>
メッセージを公開したら、$ req-> delivice_info ['channel'] - > basic_ack()でack通知をチャネルに送信する必要があります。 ]したがって、プロデューサーは待つのをやめることができます
繰り返しますが、リスニングプロセスを起動すると、準備ができています。 例2と3を組み合わせて、複数の労働者を起動するだけでスケーリングできるマルチワーカーRPCプロセスを実行するためのマルチワーカーRPCプロセスを実行することもできます。<span><span><?php </span></span><span><span>chdir(dirname(__DIR__)); </span></span><span><span>require_once('vendor/autoload.php'); </span></span><span> </span><span><span>use Acme<span>\AmqpWrapper\WorkerSender</span>; </span></span><span> </span><span><span>$inputFilters = array( </span></span><span> <span>'invoiceNo' => FILTER_SANITIZE_NUMBER_INT, </span></span><span><span>); </span></span><span><span>$input = filter_input_array(INPUT_POST, $inputFilters); </span></span><span><span>$sender = new WorkerSender(); </span></span><span><span>$sender->execute($input['invoiceNo']);</span></span>
このチュートリアルシリーズが気に入っていて、MQやより現実的なユースケースについて詳しく見たい場合は、以下のコメントでお知らせください!
PHP Rabbitmqの高度な例に関するよくある質問(FAQ)phpにおけるrabbitmqの役割は何ですか?rabbitmqは、アプリケーションが互いに非同期に通信できるメッセージブローカーです。 PHPアプリケーションでは、高負荷と複雑なタスクをより効率的に処理できるようにすることにより、PHPアプリケーションで重要な役割を果たします。 RabbitMQは、高度なメッセージキューイングプロトコル(AMQP)を使用して、アプリケーションのさまざまな部分間のメッセージの交換を容易にします。これにより、プロセスのデカップリングが可能になり、アプリケーションがよりスケーラブルで回復力が高くなります。あなたのマシン。これは、公式のRabbitMQ Webサイトで実行できます。サーバーがインストールされた後、PHP AMQP拡張機能をインストールできます。これにより、RabbitMQと対話するために必要な機能が提供されます。これは、Command PECLインストールAMQPを使用してPECLインストーラーを使用して実行できます。 amqpchannelクラスの。このメソッドは、交換の名前、交換のタイプ(直接、トピック、ファンアウト、またはヘッダー)、パッシブ、耐久性、auto_delete、引数などのオプションのパラメーターを含むいくつかのパラメーターを取ります。 phpのrabbitmq queueにメッセージを送信しますか?phpのrabbitmqキューにメッセージを送信するには、最初に作成する必要がありますメッセージコンテンツを含むAMQPMessageクラスのインスタンス。次に、amqpchannelクラスのBasic_publishメソッドを使用して、キューにメッセージを送信できます。 BASIC_PUBLISHメソッドは、メッセージ、交換、およびルーティングキーをパラメーターとして使用します。 rabbitmq queue amqpchannelクラスのbasic_consumeメソッドを使用します。この方法では、キュー名、コンシューマータグ、NO_LOCAL、NO_ACK、排他的、およびメッセージが受信されたときに実行されるコールバック関数など、いくつかのパラメーターが必要です。 ?PHPを使用したRabbitMQでのエラー処理は、トライキャッチブロックを使用して実行できます。 PHP AMQP拡張は、エラーが発生したときにAMQPExceptionクラスの例外をスローします。これらの例外をキャッチし、アプリケーションのニーズに応じて処理できます。 AMQPMESSAGEクラスの配信のプロパティ2から2。サーバーがクラッシュまたは再起動します。
rabbitmqを使用してrabbitmqで優先キューを実装するには、rabbitmqの優先キューをPHPに実装できます。次に、メッセージを送信するときに、AMQPMessageクラスの優先プロパティを指定した最大優先度の値に設定できます。 > RabbitMQは、Reply-toプロパティをコールバックキューに設定したメッセージを送信することにより、PHPのリモートプロシージャコール(RPC)に使用できます。サーバーはコールバックキューに応答を送信でき、クライアントはそこからの応答を消費できます。 RabbitMQマネジメントプラグインは、RabbitMQサーバーの監視と管理のためのWebベースのインターフェイスを提供します。また、AMQPCHANNELクラスのメソッドを使用して、承認されていないメッセージの数など、チャネルの状態に関する情報を取得することもできます。
以上がPHPおよびRabbitMQ:高度な例の詳細内容です。詳細については、PHP 中国語 Web サイトの他の関連記事を参照してください。

ホットAIツール

Undresser.AI Undress
リアルなヌード写真を作成する AI 搭載アプリ

AI Clothes Remover
写真から衣服を削除するオンライン AI ツール。

Undress AI Tool
脱衣画像を無料で

Clothoff.io
AI衣類リムーバー

AI Hentai Generator
AIヘンタイを無料で生成します。

人気の記事

ホットツール

メモ帳++7.3.1
使いやすく無料のコードエディター

SublimeText3 中国語版
中国語版、とても使いやすい

ゼンドスタジオ 13.0.1
強力な PHP 統合開発環境

ドリームウィーバー CS6
ビジュアル Web 開発ツール

SublimeText3 Mac版
神レベルのコード編集ソフト(SublimeText3)

ホットトピック









JWTは、JSONに基づくオープン標準であり、主にアイデンティティ認証と情報交換のために、当事者間で情報を安全に送信するために使用されます。 1。JWTは、ヘッダー、ペイロード、署名の3つの部分で構成されています。 2。JWTの実用的な原則には、JWTの生成、JWTの検証、ペイロードの解析という3つのステップが含まれます。 3. PHPでの認証にJWTを使用する場合、JWTを生成および検証でき、ユーザーの役割と許可情報を高度な使用に含めることができます。 4.一般的なエラーには、署名検証障害、トークンの有効期限、およびペイロードが大きくなります。デバッグスキルには、デバッグツールの使用とロギングが含まれます。 5.パフォーマンスの最適化とベストプラクティスには、適切な署名アルゴリズムの使用、有効期間を合理的に設定することが含まれます。

記事では、PHP 5.3で導入されたPHPの後期静的結合(LSB)について説明し、より柔軟な継承を求める静的メソッドコールのランタイム解像度を可能にします。 LSBの実用的なアプリケーションと潜在的なパフォーマ

PHP開発における固体原理の適用には、次のものが含まれます。1。単一責任原則(SRP):各クラスは1つの機能のみを担当します。 2。オープンおよびクローズ原理(OCP):変更は、変更ではなく拡張によって達成されます。 3。Lischの代替原則(LSP):サブクラスは、プログラムの精度に影響を与えることなく、基本クラスを置き換えることができます。 4。インターフェイス分離原理(ISP):依存関係や未使用の方法を避けるために、細粒インターフェイスを使用します。 5。依存関係の反転原理(DIP):高レベルのモジュールと低レベルのモジュールは抽象化に依存し、依存関係噴射を通じて実装されます。

記事では、入力検証、認証、定期的な更新など、脆弱性から保護するためのフレームワークの重要なセキュリティ機能について説明します。

この記事では、フレームワークにカスタム機能を追加し、アーキテクチャの理解、拡張ポイントの識別、統合とデバッグのベストプラクティスに焦点を当てています。

PHP開発でPHPのCurlライブラリを使用してJSONデータを送信すると、外部APIと対話する必要があることがよくあります。一般的な方法の1つは、Curlライブラリを使用して投稿を送信することです。

システムが再起動した後、UnixSocketの権限を自動的に設定する方法。システムが再起動するたびに、UnixSocketの許可を変更するために次のコマンドを実行する必要があります:sudo ...
