2 番目のチュートリアルでは、時間のかかるタスクを複数のワーカーに分散するためにワーク キューを使用する方法を学びました。このチュートリアルでは、ワーク キューを使用して時間のかかるタスクを複数のワーカーに分散する方法を学習しました。
しかし、リモート コンピューター上で関数を実行して結果を待つ必要がある場合はどうなるでしょうか? それは別の話です。このパターンは一般にリモート プロシージャ コールまたは RPC として知られています。リモートコンピュータ上で関数を実行し、結果を待ちますか? 関数を実行し、その結果が返されるのを待ちますか?まあ、それは2つの異なることです。このパターンは、リモート プロシージャ コールまたは RPC と呼ばれることがよくあります。
このチュートリアルでは、RabbitMQ を使用して RPC システム (クライアントとスケーラブルな RPC サーバー) を構築します。分散する価値のある時間のかかるタスクがないため、ダミーを作成します。フィボナッチ数を返す RPC サービス
今回は、RabbitMQ を使用して RPC システム (クライアントと拡張可能な RPC サーバー) を構築します。分散するほど時間のかかるタスクはないので、フィボナッチ数列を返す疑似 RPC サービスを作成します。
クライアント インターフェイス (クライアント インターフェイス)
RPC サービスの使用方法を説明するために、RPC リクエストを送信し、応答を受信するまでブロックする call という名前のメソッドを公開します。 :
$fibonacci_rpc = new FibonacciRpcClient();$response = $fibonacci_rpc->call(30);echo " [.] Got ", $response, "\n";
RPC に関するメモ
RPC はコンピューティングでは非常に一般的なパターンですが、プログラマーが関数呼び出しがローカルであるか、それとも遅い RPC であるかを認識していない場合に問題が発生します。 RPC を誤って使用すると、システムが予測不能になり、デバッグが不必要に複雑になります。RPC を誤って使用すると、保守不能なスパゲッティ コードが生成される可能性があります。
それを念頭に置いて、次のアドバイスを検討してください:
どの関数呼び出しがローカルでどれがリモートであるかが明らかであることを確認してください。
どの関数呼び出しがローカルでどれがリモートであるかが明らかであることを確認してください。 1 つはリモートです。
システムを文書化します。コンポーネント間の依存関係を明確にします。
一般的に言えば、RabbitMQ で RPC を実行するのは簡単です。クライアントはリクエスト メッセージを送信し、サーバーはレスポンス メッセージで応答します。応答 (メッセージ) を受信するには、コールバック キュー アドレスを指定して要求を送信する必要があります。デフォルトのキューを試すことができます:
list($queue_name, ,) = $channel->queue_declare("", false, false, true, false);$msg = new AMQPMessage( $payload, array('reply_to' => $queue_name));$channel->basic_publish($msg, '', 'rpc_queue');# ... then code to read a response message from the callback_queue ...
AMQP プロトコルは、メッセージに付随する 14 のプロパティのセットを事前に定義します。次の例外を除き、ほとんどのプロパティはめったに使用されません。プロトコルは 14 のメッセージ属性を事前定義します。以下を除いて、ほとんどのプロパティはめったに使用されません:
上記の方法では、RPC リクエストごとにコールバック キューを作成することをお勧めします。これは非常に非効率ですが、幸いなことに、より良い方法があります。上記のクライアントごとに 1 つのコールバック キューを作成しましょう。ここで説明する方法では、RPC リクエストごとにコールバック キューを作成します。これは非常に非効率的ですが、幸いなことにもっと良い方法があります。クライアントごとにコールバック キューを 1 つだけ作成できます。
これにより、そのキューで応答を受信したときに、その応答がどのリクエストに属しているかが不明になるという新たな問題が発生します。後で、このプロパティをすべてのリクエストに対して一意の値に設定します。コールバック キューでメッセージを受信すると、このプロパティを確認し、それに基づいて応答とリクエストを照合することができます。不明な correlation_id 値が見つかった場合は、メッセージを安全に破棄できます。は私たちのリクエストに属しません
しかし、これにより、キューが応答を受信したときに、それがどのリクエストに属しているかがわかりません。ここで、correlation_id が役に立ちます。リクエストごとに一意の correlation_id を設定します。後で、(応答) メッセージがコールバック キューで受信されると、このプロパティが確認され、それに基づいて要求と応答を照合できます。不明な correlation_id が検出された場合、メッセージはリクエストに属していないため、メッセージを安全に破棄できます。
なぜエラーで失敗するのではなく、コールバック キュー内の不明なメッセージを無視する必要があるのかと疑問に思われるかもしれませんが、これはサーバー側で競合状態が発生する可能性があるためです。ただし、RPC サーバーが停止する可能性があります。応答を送信した直後であり、要求に対する確認メッセージを送信する前に、再起動された RPC サーバーが要求を再度処理することになるため、クライアントでは重複した応答を適切に処理する必要があり、RPC は理想的には次のように処理する必要があります。 idempotent.
なぜ処理が失敗してエラーを返すように設定せずに、コールバック キュー内の不明なメッセージを無視する必要があるのかと疑問に思われるかもしれません。サーバー側の障害の可能性があるためです。可能性は非常に低いですが、応答を送信した後、メッセージが確認される前に RPC サービスがクラッシュする可能性があります。その場合、再起動された RPC サービスがリクエストを再度処理します。 このため、クライアントは重複した応答を適切に処理する必要があり、RPC サービスは冪等性が最も優れています。
概要
私たちの RPC は次のように動作します:
RPC 実行プロセス:
The Fibonacci task:
斐波那契任务:
function fib($n) {
if ($n == 0) return 0; if ($n == 1) return 1; return fib($n-1) + fib($n-2);}
We declare our fibonacci function. It assumes only valid positive integer input. (Don't expect this one to work for big numbers, and it's probably the slowest recursive implementation possible).
声明斐波那契函数。假定它只接收正整数。(别指望它能处理很大的数字,它可能是最慢的递归实现)
The code for our RPC server rpc_server.php looks like this:
rpc_server.php代码:
<?phprequire_once __DIR__ . '/vendor/autoload.php';use PhpAmqpLib\Connection\AMQPConnection;use PhpAmqpLib\Message\AMQPMessage;$connection = new AMQPConnection('localhost', 5672, 'guest', 'guest');$channel = $connection->channel();$channel->queue_declare('rpc_queue', false, false, false, false);function fib($n) { if ($n == 0) return 0; if ($n == 1) return 1; return fib($n-1) + fib($n-2);}echo " [x] Awaiting RPC requests\n";$callback = function($req) { $n = intval($req->body); echo " [.] fib(", $n, ")\n"; $msg = new AMQPMessage( (string) fib($n), array('correlation_id' => $req->get('correlation_id')) ); $req->delivery_info['channel']->basic_publish( $msg, '', $req->get('reply_to')); $req->delivery_info['channel']->basic_ack( $req->delivery_info['delivery_tag']);};$channel->basic_qos(null, 1, null);$channel->basic_consume('rpc_queue', '', false, false, false, false, $callback);while(count($channel->callbacks)) { $channel->wait();}$channel->close();$connection->close();?>
The server code is rather straightforward:
服务端代码相当直白:
The code for our RPC client rpc_client.php:
rpc_client.php代码:
connection = new AMQPConnection( 'localhost', 5672, 'guest', 'guest'); $this->channel = $this->connection->channel(); list($this->callback_queue, ,) = $this->channel->queue_declare( "", false, false, true, false); $this->channel->basic_consume( $this->callback_queue, '', false, false, false, false, array($this, 'on_response')); } public function on_response($rep) { if($rep->get('correlation_id') == $this->corr_id) { $this->response = $rep->body; } } public function call($n) { $this->response = null; $this->corr_id = uniqid(); $msg = new AMQPMessage( (string) $n, array('correlation_id' => $this->corr_id, 'reply_to' => $this->callback_queue) ); $this->channel->basic_publish($msg, '', 'rpc_queue'); while(!$this->response) { $this->channel->wait(); } return intval($this->response); }};$fibonacci_rpc = new FibonacciRpcClient();$response = $fibonacci_rpc->call(30);echo " [.] Got ", $response, "\n";?>
Now is a good time to take a look at our full example source code for rpc_client.php and rpc_server.php.
是时候看看整个例子的源码了rpc_client.php and rpc_server.php.
Our RPC service is now ready. We can start the server:
RPC服务准备就绪,启动!
$ php rpc_server.php [x] Awaiting RPC requests
To request a fibonacci number run the client:
运行客户端请求一个斐波那契数字:
$ php rpc_client.php [x] Requesting fib(30)
The design presented here is not the only possible implementation of a RPC service, but it has some important advantages:
这里展现的设计不是RPC服务的唯一可能实现,但它却有一些重要的优势:
Our code is still pretty simplistic and doesn't try to solve more complex (but important) problems, like:
我们的代码还是忒简化,并没有想解决更为复杂(但重要)的问题,像: