목차
Remote procedure call (RPC)
(using php-amqplib)
Client interface(客户端接口)
A note on RPC(关于RPC需要注意一点)
Callback queue(回调队列)
Message properties(消息属性)
Correlation Id(关联ID)
Summary
Putting it all together(合体!!!终极~\(???)/~啦啦啦!)
백엔드 개발 PHP 튜토리얼 译-PHP rabbitMQ Tutorial-6

译-PHP rabbitMQ Tutorial-6

Jun 23, 2016 pm 01:59 PM

Remote procedure call (RPC)

(using php-amqplib)


In the second tutorial we learned how to use Work Queues to distribute time-consuming tasks among multiple workers.

在第二个指导中,我们学习了如何运用工作队列在多个worker之间分发耗时的任务。

But what if we need to run a function on a remote computer and wait for the result? Well, that's a different story. This pattern is commonly known as Remote Procedure Call or RPC.

但是假使我们需要运行一个远程的函数并等待其返回结果时怎么办?好吧,那是两码事。这种模式通常被称为远程过程调用或者RPC。

In this tutorial we're going to use RabbitMQ to build an RPC system: a client and a scalable RPC server. As we don't have any time-consuming tasks that are worth distributing, we're going to create a dummy RPC service that returns Fibonacci numbers.

这回我们要用RabbitMQ打造一个RPC系统:一个客户端和一个可扩展的RPC服务端。由于我们没有什么值得分发的耗时任务,所以我们将创建一个伪RPC服务,用来返回斐波那契数列。

Client interface(客户端接口)

To illustrate how an RPC service could be used we're going to create a simple client class. It's going to expose a method named call which sends an RPC request and blocks until the answer is received:

我们将创建一个简单的客户端类来阐述RPC服务是如何使用的。这个类将揭示方法调用??发送一个RPC请求并阻塞直至收到回复。

$fibonacci_rpc = new FibonacciRpcClient();$response = $fibonacci_rpc->call(30);echo " [.] Got ", $response, "\n";
로그인 후 복사

A note on RPC(关于RPC需要注意一点)

Although RPC is a pretty common pattern in computing, it's often criticised. The problems arise when a programmer is not aware whether a function call is local or if it's a slow RPC. Confusions like that result in an unpredictable system and adds unnecessary complexity to debugging. Instead of simplifying software, misused RPC can result in unmaintainable spaghetti code.

尽管RPC在电脑运算中很常见,但它十分挑剔。这个问题出现的原因是程序员不知道是否调用一个本地的方法或是否是一个很慢的RPC。这样的困惑便导致不可预测的系统并增加不必要的调试复杂性。比起简化的软件,误用RPC会导致不可维护的无头绪代码。

Bearing that in mind, consider the following advice:

铭记刚才问题,考虑下面的建议:

  • Make sure it's obvious which function call is local and which is remote.
  • 确保可以明显的看出哪个方法调用的是本地的哪个是远程的。
  • Document your system. Make the dependencies between components clear.
  • 系统文档化。让组件之间的依赖变得清晰可见。
  • Handle error cases. How should the client react when the RPC server is down for a long time?
  • 错误处理。当RPC服务长时间关闭客户端该作何反应?
  • When in doubt avoid RPC. If you can, you should use an asynchronous pipeline - instead of RPC-like blocking, results are asynchronously pushed to a next computation stage.

    如果有疑问,则尽量避免使用RPC。如果可以话,你应该使用异步管道??而不是RPC??像阻塞,结果被异步推送到下个计算阶段。

    Callback queue(回调队列)

    In general doing RPC over RabbitMQ is easy. A client sends a request message and a server replies with a response message. In order to receive a response we need to send a 'callback' queue address with the request. We can use the default queue. Let's try it:

    一般来讲在RabbitMQ上搞RPC很容易??客户端发送请求消息,服务端回复响应消息。为了收到响应(消息)我们得在发送请求时附带一个回调队列地址。可以试试默认队列:

    list($queue_name, ,) = $channel->queue_declare("", false, false, true, false);$msg = new AMQPMessage(    $payload,    array('reply_to' => $queue_name));$channel->basic_publish($msg, '', 'rpc_queue');# ... then code to read a response message from the callback_queue ...
    로그인 후 복사

    Message properties(消息属性)

    The AMQP protocol predefines a set of 14 properties that go with a message. Most of the properties are rarely used, with the exception of the following:

    AMQP协议预定义了14个消息属性。大部分属性都很少用到,下面的除外:

  • delivery_mode: Marks a message as persistent (with a value of 2) or transient (1). You may remember this property from the second tutorial.
  • delivery_mode:设置为2表示持久化,1为临时的。你可能还记得我们在第二节指导的时候使用过。
  • content_type: Used to describe the mime-type of the encoding. For example for the often used JSON encoding it is a good practice to set this property to: application/json.
  • content_type:用来表述编码mime-type,例如常用的JSON编码,良好的做法是设置这个属性为:application/json
  • reply_to: Commonly used to name a callback queue.
  • reply_to:常用作回调队列名。
  • correlation_id: Useful to correlate RPC responses with requests.
  • correlation_id: 用来关联RPC的请求与响应。
  • Correlation Id(关联ID)

    In the method presented above we suggest creating a callback queue for every RPC request. That's pretty inefficient, but fortunately there is a better way - let's create a single callback queue per client.

    上面的方法我们暗示为每个RPC请求创建一个回调队列。这非常低效,但幸运的是,有一种更好的方式??我们可以为每个客户端只创建一个回调队列。

    That raises a new issue, having received a response in that queue it's not clear to which request the response belongs. That's when the correlation_id property is used. We're going to set it to a unique value for every request. Later, when we receive a message in the callback queue we'll look at this property, and based on that we'll be able to match a response with a request. If we see an unknown correlation_id value, we may safely discard the message - it doesn't belong to our requests.

    可这将带来新的问题,队列收到一个响应时,我们不清楚它属于哪个请求。这正是correlation_id发挥作用的地方。我们将为每一个请求设置一个唯一correlation_id。之后,当在回调队列中收到(响应)消息的时候我们来查看这个属性,基于它我们就可以把请求和响应进行匹配。要是我们检测到未知correlation_id,可能会安全丢弃这条消息??因为它不属于我们的请求嘛。

    You may ask, why should we ignore unknown messages in the callback queue, rather than failing with an error? It's due to a possibility of a race condition on the server side. Although unlikely, it is possible that the RPC server will die just after sending us the answer, but before sending an acknowledgment message for the request. If that happens, the restarted RPC server will process the request again. That's why on the client we must handle the duplicate responses gracefully, and the RPC should ideally be idempotent.

    你可能会问,为喵我们该忽略回调队列中的未知消息呢,而不是置为处理失败并返回一个错误?是因为存在服务端紊乱的可能性。尽管几率很小,可还是有可能??RPC服务在给我们发送完响应后宕掉,但还没来得进行消息确认。那样的话,重启的RPC服务会再次处理这个请求。 这也就是为喵客户端必须优雅地处理重复响应,而RPC服务最好的幂等的。

    Summary

    Our RPC will work like this:

    RPC运行流程:

  • When the Client starts up, it creates an anonymous exclusive callback queue.
  • 当客户端启动,便将创建一个匿名的专用回调队列。
  • For an RPC request, the Client sends a message with two properties: reply_to, which is set to the callback queue and correlation_id, which is set to a unique value for every request.
  • RPC请求中,客户端消息发送带有两个属性:reply_to用来设置回调队列,和correlation_id用来唯一标示每一个请求。
  • The request is sent to an rpc_queue queue.
  • 请求被发送到一个叫做rpc_queue的队列。
  • The RPC worker (aka: server) is waiting for requests on that queue. When a request appears, it does the job and sends a message with the result back to the Client, using the queue from the reply_to field.
  • RPC worker(也叫:服务)在rpc_queue上守护,等待请求。每当来一个请求,它会进行处理并将结果以消息形式发送到客户端,经由reply_to指定的队列。
  • The client waits for data on the callback queue. When a message appears, it checks the correlation_id property. If it matches the value from the request it returns the response to the application.
  • 客户端在回调队列中等待数据。当出现一条消息,就会检查correlation_id属性。如果和请求时的correlation_id匹配,便会将响应返回给应用程序。
  • Putting it all together(合体!!!终极~\(???)/~啦啦啦!)

    The Fibonacci task:

    斐波那契任务:

    function fib($n) {
    로그인 후 복사

        if ($n == 0)        return 0;    if ($n == 1)        return 1;    return fib($n-1) + fib($n-2);}
    로그인 후 복사

    We declare our fibonacci function. It assumes only valid positive integer input. (Don't expect this one to work for big numbers, and it's probably the slowest recursive implementation possible).

    声明斐波那契函数。假定它只接收正整数。(别指望它能处理很大的数字,它可能是最慢的递归实现)

    The code for our RPC server rpc_server.php looks like this:

    rpc_server.php代码:

    <?phprequire_once __DIR__ . '/vendor/autoload.php';use PhpAmqpLib\Connection\AMQPConnection;use PhpAmqpLib\Message\AMQPMessage;$connection = new AMQPConnection('localhost', 5672, 'guest', 'guest');$channel = $connection->channel();$channel->queue_declare('rpc_queue', false, false, false, false);function fib($n) {    if ($n == 0)        return 0;    if ($n == 1)        return 1;    return fib($n-1) + fib($n-2);}echo " [x] Awaiting RPC requests\n";$callback = function($req) {    $n = intval($req->body);    echo " [.] fib(", $n, ")\n";    $msg = new AMQPMessage(        (string) fib($n),        array('correlation_id' => $req->get('correlation_id'))        );    $req->delivery_info['channel']->basic_publish(        $msg, '', $req->get('reply_to'));    $req->delivery_info['channel']->basic_ack(        $req->delivery_info['delivery_tag']);};$channel->basic_qos(null, 1, null);$channel->basic_consume('rpc_queue', '', false, false, false, false, $callback);while(count($channel->callbacks)) {    $channel->wait();}$channel->close();$connection->close();?>
    로그인 후 복사

    The server code is rather straightforward:

    服务端代码相当直白:

  • As usual we start by establishing the connection, channel and declaring the queue.
  • 一如既往先建立连接,频道,然后声明队列。
  • We might want to run more than one server process. In order to spread the load equally over multiple servers we need to set the prefetch_count setting in $channel.basic_qos.
  • 没准我们想运行多个服务端进程。为了能在多个服务之间均等负载,我们得在$channel.basic_qos中设置prefech_count。
  • We use basic_consume to access the queue. Then we enter the while loop in which we wait for request messages, do the work and send the response back.
  • 使用basic_consume访问队列。然后进入while循环等待请求消息,处理消息,返回响应信息。
  • The code for our RPC client rpc_client.php:

    rpc_client.php代码:

    connection = new AMQPConnection(            'localhost', 5672, 'guest', 'guest');        $this->channel = $this->connection->channel();        list($this->callback_queue, ,) = $this->channel->queue_declare(            "", false, false, true, false);        $this->channel->basic_consume(            $this->callback_queue, '', false, false, false, false,            array($this, 'on_response'));    }    public function on_response($rep) {        if($rep->get('correlation_id') == $this->corr_id) {            $this->response = $rep->body;        }    }    public function call($n) {        $this->response = null;        $this->corr_id = uniqid();        $msg = new AMQPMessage(            (string) $n,            array('correlation_id' => $this->corr_id,                  'reply_to' => $this->callback_queue)            );        $this->channel->basic_publish($msg, '', 'rpc_queue');        while(!$this->response) {            $this->channel->wait();        }        return intval($this->response);    }};$fibonacci_rpc = new FibonacciRpcClient();$response = $fibonacci_rpc->call(30);echo " [.] Got ", $response, "\n";?>
    로그인 후 복사

    Now is a good time to take a look at our full example source code for rpc_client.php and rpc_server.php.

    是时候看看整个例子的源码了rpc_client.php and rpc_server.php.

    Our RPC service is now ready. We can start the server:

    RPC服务准备就绪,启动!

    $ php rpc_server.php [x] Awaiting RPC requests
    로그인 후 복사

    To request a fibonacci number run the client:

    运行客户端请求一个斐波那契数字:

    $ php rpc_client.php [x] Requesting fib(30)
    로그인 후 복사

    The design presented here is not the only possible implementation of a RPC service, but it has some important advantages:

    这里展现的设计不是RPC服务的唯一可能实现,但它却有一些重要的优势:

  • If the RPC server is too slow, you can scale up by just running another one. Try running a second rpc_server.php in a new console.
  • 如果RPC服务太慢,你可以按比例增加运行其数量。试试在新控制台运行第二个rpc_server.php服务
  • On the client side, the RPC requires sending and receiving only one message. No synchronous calls like queue_declare are required. As a result the RPC client needs only one network round trip for a single RPC request.
  • 在客户端,RPC要求只发送和接收一条消息。必须没有像队列声明一样的同步调用。结果呢,对于单一的RPC请求,客户端仅需要一个网络往返。
  • Our code is still pretty simplistic and doesn't try to solve more complex (but important) problems, like:

    我们的代码还是忒简化,并没有想解决更为复杂(但重要)的问题,像:

  • How should the client react if there are no servers running?
  • 要是没有服务端在守护,客户端作何反应?
  • Should a client have some kind of timeout for the RPC?
  • RPC客户端是不是该有一些超时设置?
  • If the server malfunctions and raises an exception, should it be forwarded to the client?
  • 如果服务端障碍引发异常,是否该把它发送至客户端呢?
  • Protecting against invalid incoming messages (eg checking bounds, type) before processing.
  • 处理前防止无效消息的进入(例如:检查界限,类型)
  • If you want to experiment, you may find the rabbitmq-management plugin useful for viewing the queues.

    想尝试吗? rabbitmq-management plugin 这里你可能会发现一些有用的插件来查看队列。

    본 웹사이트의 성명
    본 글의 내용은 네티즌들의 자발적인 기여로 작성되었으며, 저작권은 원저작자에게 있습니다. 본 사이트는 이에 상응하는 법적 책임을 지지 않습니다. 표절이나 침해가 의심되는 콘텐츠를 발견한 경우 admin@php.cn으로 문의하세요.

    핫 AI 도구

    Undresser.AI Undress

    Undresser.AI Undress

    사실적인 누드 사진을 만들기 위한 AI 기반 앱

    AI Clothes Remover

    AI Clothes Remover

    사진에서 옷을 제거하는 온라인 AI 도구입니다.

    Undress AI Tool

    Undress AI Tool

    무료로 이미지를 벗다

    Clothoff.io

    Clothoff.io

    AI 옷 제거제

    Video Face Swap

    Video Face Swap

    완전히 무료인 AI 얼굴 교환 도구를 사용하여 모든 비디오의 얼굴을 쉽게 바꾸세요!

    뜨거운 도구

    메모장++7.3.1

    메모장++7.3.1

    사용하기 쉬운 무료 코드 편집기

    SublimeText3 중국어 버전

    SublimeText3 중국어 버전

    중국어 버전, 사용하기 매우 쉽습니다.

    스튜디오 13.0.1 보내기

    스튜디오 13.0.1 보내기

    강력한 PHP 통합 개발 환경

    드림위버 CS6

    드림위버 CS6

    시각적 웹 개발 도구

    SublimeText3 Mac 버전

    SublimeText3 Mac 버전

    신 수준의 코드 편집 소프트웨어(SublimeText3)

    세션 납치는 어떻게 작동하며 PHP에서 어떻게 완화 할 수 있습니까? 세션 납치는 어떻게 작동하며 PHP에서 어떻게 완화 할 수 있습니까? Apr 06, 2025 am 12:02 AM

    세션 납치는 다음 단계를 통해 달성 할 수 있습니다. 1. 세션 ID를 얻으십시오. 2. 세션 ID 사용, 3. 세션을 활성 상태로 유지하십시오. PHP에서 세션 납치를 방지하는 방법에는 다음이 포함됩니다. 1. 세션 _regenerate_id () 함수를 사용하여 세션 ID를 재생산합니다. 2. 데이터베이스를 통해 세션 데이터를 저장하십시오.

    JWT (JSON Web Tokens) 및 PHP API의 사용 사례를 설명하십시오. JWT (JSON Web Tokens) 및 PHP API의 사용 사례를 설명하십시오. Apr 05, 2025 am 12:04 AM

    JWT는 주로 신분증 인증 및 정보 교환을 위해 당사자간에 정보를 안전하게 전송하는 데 사용되는 JSON을 기반으로 한 개방형 표준입니다. 1. JWT는 헤더, 페이로드 및 서명의 세 부분으로 구성됩니다. 2. JWT의 작업 원칙에는 세 가지 단계가 포함됩니다. JWT 생성, JWT 확인 및 Parsing Payload. 3. PHP에서 인증에 JWT를 사용하면 JWT를 생성하고 확인할 수 있으며 사용자 역할 및 권한 정보가 고급 사용에 포함될 수 있습니다. 4. 일반적인 오류에는 서명 검증 실패, 토큰 만료 및 대형 페이로드가 포함됩니다. 디버깅 기술에는 디버깅 도구 및 로깅 사용이 포함됩니다. 5. 성능 최적화 및 모범 사례에는 적절한 시그니처 알고리즘 사용, 타당성 기간 설정 합리적,

    확실한 원칙과 PHP 개발에 적용되는 방법을 설명하십시오. 확실한 원칙과 PHP 개발에 적용되는 방법을 설명하십시오. Apr 03, 2025 am 12:04 AM

    PHP 개발에서 견고한 원칙의 적용에는 다음이 포함됩니다. 1. 단일 책임 원칙 (SRP) : 각 클래스는 하나의 기능 만 담당합니다. 2. Open and Close Principle (OCP) : 변경은 수정보다는 확장을 통해 달성됩니다. 3. Lisch의 대체 원칙 (LSP) : 서브 클래스는 프로그램 정확도에 영향을 미치지 않고 기본 클래스를 대체 할 수 있습니다. 4. 인터페이스 격리 원리 (ISP) : 의존성 및 사용되지 않은 방법을 피하기 위해 세밀한 인터페이스를 사용하십시오. 5. 의존성 반전 원리 (DIP) : 높고 낮은 수준의 모듈은 추상화에 의존하며 종속성 주입을 통해 구현됩니다.

    phpstorm에서 CLI 모드를 디버그하는 방법은 무엇입니까? phpstorm에서 CLI 모드를 디버그하는 방법은 무엇입니까? Apr 01, 2025 pm 02:57 PM

    phpstorm에서 CLI 모드를 디버그하는 방법은 무엇입니까? PHPStorm으로 개발할 때 때때로 CLI (Command Line Interface) 모드에서 PHP를 디버그해야합니다 ...

    시스템 재시작 후 UnixSocket의 권한을 자동으로 설정하는 방법은 무엇입니까? 시스템 재시작 후 UnixSocket의 권한을 자동으로 설정하는 방법은 무엇입니까? Mar 31, 2025 pm 11:54 PM

    시스템이 다시 시작된 후 UnixSocket의 권한을 자동으로 설정하는 방법. 시스템이 다시 시작될 때마다 UnixSocket의 권한을 수정하려면 다음 명령을 실행해야합니다.

    PHP에서 늦은 정적 결합을 설명하십시오 (정적 : :). PHP에서 늦은 정적 결합을 설명하십시오 (정적 : :). Apr 03, 2025 am 12:04 AM

    정적 바인딩 (정적 : :)는 PHP에서 늦은 정적 바인딩 (LSB)을 구현하여 클래스를 정의하는 대신 정적 컨텍스트에서 호출 클래스를 참조 할 수 있습니다. 1) 구문 분석 프로세스는 런타임에 수행됩니다. 2) 상속 관계에서 통화 클래스를 찾아보십시오. 3) 성능 오버 헤드를 가져올 수 있습니다.

    PHP의 CURL 라이브러리를 사용하여 JSON 데이터가 포함 된 게시물 요청을 보내는 방법은 무엇입니까? PHP의 CURL 라이브러리를 사용하여 JSON 데이터가 포함 된 게시물 요청을 보내는 방법은 무엇입니까? Apr 01, 2025 pm 03:12 PM

    PHP 개발에서 PHP의 CURL 라이브러리를 사용하여 JSON 데이터를 보내면 종종 외부 API와 상호 작용해야합니다. 일반적인 방법 중 하나는 컬 라이브러리를 사용하여 게시물을 보내는 것입니다 ...

    See all articles