Maison > développement back-end > tutoriel php > PHP et RabbitMQ: exemples avancés

PHP et RabbitMQ: exemples avancés

Jennifer Aniston
Libérer: 2025-02-19 09:44:12
original
545 Les gens l'ont consulté

PHP and RabbitMQ: Advanced Examples

PHP et RabbitMQ: exemples avancés

Dans la partie 1, nous avons couvert la théorie et un cas d'utilisation simple du protocole AMQP en PHP avec RabbitMQ en tant que courtier. Maintenant, plongeons dans des exemples plus avancés.

Les plats clés

  • Utilisez PHP et RabbitMQ pour traiter les données de manière asynchrone parmi plusieurs travailleurs, améliorant l'efficacité dans des environnements de transaction élevée.
  • Implémentez les messages persistants dans RabbitMQ pour éviter la perte de données pendant les plantages du serveur en définissant le message «Delivery_Mode» à 2 et en déclarant que les files d'attente sont durables.
  • Utilisez les paramètres de qualité de service (QoS) dans RabbitMQ pour contrôler la distribution des messages entre les travailleurs, garantissant qu'aucun travailleur n'est dépassé.
  • Utilisez RabbitMQ pour RPC (appel de procédure à distance) en envoyant des messages qui nécessitent une réponse, utile pour des tâches comme l'authentification des utilisateurs.
  • Configurer des files d'attente exclusives et temporaires pour les réponses RPC pour garantir que les messages sont dirigés correctement et en toute sécurité entre le client et le serveur.
  • Gérer efficacement les réponses RPC en faisant correspondre «Correlation_ID» dans la réponse avec la demande, en assurant la gestion et le traitement corrects des réponses.

Exemple 1: Envoyer une demande pour traiter les données de manière asynchrone parmi plusieurs travailleurs

Dans l'exemple de la partie précédente, nous avions un producteur, un consommateur. Si le consommateur mourait, les messages continueraient de s'accumuler dans la file d'attente jusqu'à ce que le consommateur recommence. Il traiterait alors tous les messages, un par un.

Cela peut être loin d'être idéal dans un environnement utilisateur simultané avec une bonne quantité de demandes par minute. Heureusement, la mise à l'échelle des consommateurs est super facile, mais mettons en œuvre un autre exemple.

Supposons que nous ayons un service de génération de factures, où les utilisateurs ont juste besoin de fournir le numéro de facture, et le système générera automatiquement un fichier PDF et l'envoyer par e-mail à l'utilisateur. La génération et l'envoi de l'e-mail peuvent prendre plusieurs secondes si le serveur sur lequel le processus de génération s'exécute est limité Resource. Supposons maintenant que nous sommes tenus de prendre en charge plusieurs transactions par seconde, comment accomplir cela sans submerger le serveur?

Nous devons implémenter le modèle suivant:

Regardons notre classe de producteurs:

<span><span><?php
</span></span><span><span>namespace Acme<span>\AmqpWrapper</span>;
</span></span><span>
</span><span><span>use PhpAmqpLib<span>\Connection\AMQPConnection</span>;
</span></span><span><span>use PhpAmqpLib<span>\Message\AMQPMessage</span>;
</span></span><span>
</span><span><span>class WorkerSender
</span></span><span><span>{
</span></span><span>    <span>/* ... SOME OTHER CODE HERE ... */
</span></span><span>    
</span><span>    <span>/**
</span></span><span><span>     * Sends an invoice generation task to the workers
</span></span><span><span>     * 
</span></span><span><span>     * <span>@param <span>int</span> $invoiceNum
</span></span></span><span><span>     */ 
</span></span><span>    <span>public function execute($invoiceNum)
</span></span><span>    <span>{
</span></span><span>        <span>$connection = new AMQPConnection('localhost', 5672, 'guest', 'guest');
</span></span><span>        <span>$channel = $connection->channel();
</span></span><span>        
</span><span>        <span>$channel->queue_declare(
</span></span><span>            <span>'invoice_queue',    #queue - Queue names may be up to 255 bytes of UTF-8 characters
</span></span><span>            <span>false,              #passive - can use this to check whether an exchange exists without modifying the server state
</span></span><span>            <span>true,               #durable, make sure that RabbitMQ will never lose our queue if a crash occurs - the queue will survive a broker restart
</span></span><span>            <span>false,              #exclusive - used by only one connection and the queue will be deleted when that connection closes
</span></span><span>            <span>false               #auto delete - queue is deleted when last consumer unsubscribes
</span></span><span>            <span>);
</span></span><span>            
</span><span>        <span>$msg = new AMQPMessage(
</span></span><span>            <span>$invoiceNum,
</span></span><span>            <span>array('delivery_mode' => 2) # make message persistent, so it is not lost if server crashes or quits
</span></span><span>            <span>);
</span></span><span>            
</span><span>        <span>$channel->basic_publish(
</span></span><span>            <span>$msg,               #message 
</span></span><span>            <span>'',                 #exchange
</span></span><span>            <span>'invoice_queue'     #routing key (queue)
</span></span><span>            <span>);
</span></span><span>            
</span><span>        <span>$channel->close();
</span></span><span>        <span>$connection->close();
</span></span><span>    <span>}
</span></span><span><span>}</span></span>
Copier après la connexion
Copier après la connexion
Copier après la connexion
Copier après la connexion

La méthode WorkerSender :: EXECUTE () recevra un numéro de facture. Ensuite, nous créons une connexion, un canal et une file d'attente comme d'habitude.

<span><span><?php
</span></span><span><span>/* ... SOME CODE HERE ... */
</span></span><span>
</span><span>        <span>$msg = new AMQPMessage(
</span></span><span>            <span>$invoiceNum,
</span></span><span>            <span>array('delivery_mode' => 2) # make message persistent, so it is not lost if server crashes or quits
</span></span><span>            <span>);
</span></span><span>
</span><span><span>/* ... SOME CODE HERE ... */</span></span>
Copier après la connexion
Copier après la connexion
Copier après la connexion
Copier après la connexion

Veuillez noter que cette fois, lors de la création de l'objet de message, le constructeur reçoit un deuxième paramètre: Array ('Delivery_Mode' => 2). Dans ce cas, nous voulons indiquer que le message ne doit pas être perdu si le serveur Rabbitmq se bloque. Veuillez noter que pour que cela fonctionne, la file d'attente doit également être déclarée durable.

Le code suivant peut être utilisé pour recevoir les données du formulaire et exécuter le producteur:

<span><span><?php
</span></span><span><span>namespace Acme<span>\AmqpWrapper</span>;
</span></span><span>
</span><span><span>use PhpAmqpLib<span>\Connection\AMQPConnection</span>;
</span></span><span><span>use PhpAmqpLib<span>\Message\AMQPMessage</span>;
</span></span><span>
</span><span><span>class WorkerSender
</span></span><span><span>{
</span></span><span>    <span>/* ... SOME OTHER CODE HERE ... */
</span></span><span>    
</span><span>    <span>/**
</span></span><span><span>     * Sends an invoice generation task to the workers
</span></span><span><span>     * 
</span></span><span><span>     * <span>@param <span>int</span> $invoiceNum
</span></span></span><span><span>     */ 
</span></span><span>    <span>public function execute($invoiceNum)
</span></span><span>    <span>{
</span></span><span>        <span>$connection = new AMQPConnection('localhost', 5672, 'guest', 'guest');
</span></span><span>        <span>$channel = $connection->channel();
</span></span><span>        
</span><span>        <span>$channel->queue_declare(
</span></span><span>            <span>'invoice_queue',    #queue - Queue names may be up to 255 bytes of UTF-8 characters
</span></span><span>            <span>false,              #passive - can use this to check whether an exchange exists without modifying the server state
</span></span><span>            <span>true,               #durable, make sure that RabbitMQ will never lose our queue if a crash occurs - the queue will survive a broker restart
</span></span><span>            <span>false,              #exclusive - used by only one connection and the queue will be deleted when that connection closes
</span></span><span>            <span>false               #auto delete - queue is deleted when last consumer unsubscribes
</span></span><span>            <span>);
</span></span><span>            
</span><span>        <span>$msg = new AMQPMessage(
</span></span><span>            <span>$invoiceNum,
</span></span><span>            <span>array('delivery_mode' => 2) # make message persistent, so it is not lost if server crashes or quits
</span></span><span>            <span>);
</span></span><span>            
</span><span>        <span>$channel->basic_publish(
</span></span><span>            <span>$msg,               #message 
</span></span><span>            <span>'',                 #exchange
</span></span><span>            <span>'invoice_queue'     #routing key (queue)
</span></span><span>            <span>);
</span></span><span>            
</span><span>        <span>$channel->close();
</span></span><span>        <span>$connection->close();
</span></span><span>    <span>}
</span></span><span><span>}</span></span>
Copier après la connexion
Copier après la connexion
Copier après la connexion
Copier après la connexion

Veuillez utiliser la désinfection / validation des entrées avec lesquelles vous vous sentez à l'aise.

Les choses deviennent un peu intéressantes du côté du consommateur:

<span><span><?php
</span></span><span><span>/* ... SOME CODE HERE ... */
</span></span><span>
</span><span>        <span>$msg = new AMQPMessage(
</span></span><span>            <span>$invoiceNum,
</span></span><span>            <span>array('delivery_mode' => 2) # make message persistent, so it is not lost if server crashes or quits
</span></span><span>            <span>);
</span></span><span>
</span><span><span>/* ... SOME CODE HERE ... */</span></span>
Copier après la connexion
Copier après la connexion
Copier après la connexion
Copier après la connexion

Comme d'habitude, nous devons créer une connexion, dériver un canal et déclarer une file d'attente (les paramètres de la file d'attente doivent être de même que le producteur).

<span><span><?php
</span></span><span><span>chdir(dirname(__DIR__));
</span></span><span><span>require_once('vendor/autoload.php');
</span></span><span>
</span><span><span>use Acme<span>\AmqpWrapper\WorkerSender</span>;
</span></span><span>
</span><span><span>$inputFilters = array(
</span></span><span>    <span>'invoiceNo' => FILTER_SANITIZE_NUMBER_INT,
</span></span><span><span>);
</span></span><span><span>$input = filter_input_array(INPUT_POST, $inputFilters);
</span></span><span><span>$sender = new WorkerSender();
</span></span><span><span>$sender->execute($input['invoiceNo']);</span></span>
Copier après la connexion
Copier après la connexion
Copier après la connexion

Pour avoir un comportement des travailleurs (répartir les messages entre plusieurs procédures), nous devons déclarer les paramètres de qualité de service (QoS) avec $ canal-> basic_qos ():

  • Taille de la pré-feste : Aucune limite spécifique, nous pourrions avoir autant de travailleurs que nous avons besoin
  • PREFETCH COUNT : Combien de messages à récupérer par travailleur avant de renvoyer un accusé de réception. Cela fera que le travailleur traite 1 message à la fois.
  • global : Null signifie que les paramètres ci-dessus s'appliqueront à ce consommateur uniquement

Ensuite, nous commencerons à consommer, avec une différence clé dans les paramètres. Nous désactiverons ACK automatique, car nous le dirons au serveur RabbitMQ lorsque nous aurons terminé le traitement du message et être prêt à en recevoir un nouveau.

Maintenant, comment envoyons-nous cet ACK? Veuillez jeter un œil à la méthode Workerreceiver :: process () (qui est déclarée comme une méthode de rappel lorsqu'un message est reçu). Les appels vers les méthodes générésPdf () et SendEmail () ne sont que des méthodes factices qui simulent le temps passé pour accomplir les deux tâches. Le paramètre $ MSG contient non seulement la charge utile envoyée par le producteur, mais contient également des informations sur les objets utilisés par le producteur. Nous pouvons extraire des informations sur le canal utilisé par le producteur avec $ msg-> livraison_info ['canal'] (qui est le même type d'objet au canal que nous avons ouvert pour le consommateur avec $ connection-> canal ();). Puisque nous devons envoyer le canal du producteur un accusé de réception que nous avons terminé le processus, nous utiliserons sa méthode basic_ack (), en envoyant en tant que paramètre la balise de livraison ($ msg-> livraison_info ['livraison_tag']) Rabbitmq généré automatiquement dans l'ordre dans l'ordre dans l'ordre dans l'ordre dans l'ordre dans l'ordre dans l'ordre dans l'ordre dans l'ordre dans Pour s'associer correctement au message auquel l'appartenance ACK appartient.

Comment pouvons-nous tirer les travailleurs? Créez simplement un fichier comme celui qui suit, invoquant la méthode workerreceiver :: écouter ():

<span><span><?php
</span></span><span><span>namespace Acme<span>\AmqpWrapper</span>;
</span></span><span>
</span><span><span>use PhpAmqpLib<span>\Connection\AMQPConnection</span>;
</span></span><span><span>use PhpAmqpLib<span>\Message\AMQPMessage</span>;
</span></span><span>
</span><span><span>class WorkerReceiver
</span></span><span><span>{
</span></span><span>    <span>/* ... SOME OTHER CODE HERE ... */
</span></span><span>    
</span><span>    <span>/**
</span></span><span><span>     * Process incoming request to generate pdf invoices and send them through 
</span></span><span><span>     * email.
</span></span><span><span>     */ 
</span></span><span>    <span>public function listen()
</span></span><span>    <span>{
</span></span><span>        <span>$connection = new AMQPConnection('localhost', 5672, 'guest', 'guest');
</span></span><span>        <span>$channel = $connection->channel();
</span></span><span>        
</span><span>        <span>$channel->queue_declare(
</span></span><span>            <span>'invoice_queue',    #queue
</span></span><span>            <span>false,              #passive
</span></span><span>            <span>true,               #durable, make sure that RabbitMQ will never lose our queue if a crash occurs
</span></span><span>            <span>false,              #exclusive - queues may only be accessed by the current connection
</span></span><span>            <span>false               #auto delete - the queue is deleted when all consumers have finished using it
</span></span><span>            <span>);
</span></span><span>            
</span><span>        <span>/**
</span></span><span><span>         * don't dispatch a new message to a worker until it has processed and 
</span></span><span><span>         * acknowledged the previous one. Instead, it will dispatch it to the 
</span></span><span><span>         * next worker that is not still busy.
</span></span><span><span>         */
</span></span><span>        <span>$channel->basic_qos(
</span></span><span>            <span>null,   #prefetch size - prefetch window size in octets, null meaning "no specific limit"
</span></span><span>            <span>1,      #prefetch count - prefetch window in terms of whole messages
</span></span><span>            <span>null    #global - global=null to mean that the QoS settings should apply per-consumer, global=true to mean that the QoS settings should apply per-channel
</span></span><span>            <span>);
</span></span><span>        
</span><span>        <span>/**
</span></span><span><span>         * indicate interest in consuming messages from a particular queue. When they do 
</span></span><span><span>         * so, we say that they register a consumer or, simply put, subscribe to a queue.
</span></span><span><span>         * Each consumer (subscription) has an identifier called a consumer tag
</span></span><span><span>         */ 
</span></span><span>        <span>$channel->basic_consume(
</span></span><span>            <span>'invoice_queue',        #queue
</span></span><span>            <span>'',                     #consumer tag - Identifier for the consumer, valid within the current channel. just string
</span></span><span>            <span>false,                  #no local - TRUE: the server will not send messages to the connection that published them
</span></span><span>            <span>false,                  #no ack, false - acks turned on, true - off.  send a proper acknowledgment from the worker, once we're done with a task
</span></span><span>            <span>false,                  #exclusive - queues may only be accessed by the current connection
</span></span><span>            <span>false,                  #no wait - TRUE: the server will not respond to the method. The client should not wait for a reply method
</span></span><span>            <span>array($this, 'process') #callback
</span></span><span>            <span>);
</span></span><span>            
</span><span>        <span>while(count($channel->callbacks)) {
</span></span><span>            <span>$this->log->addInfo('Waiting for incoming messages');
</span></span><span>            <span>$channel->wait();
</span></span><span>        <span>}
</span></span><span>        
</span><span>        <span>$channel->close();
</span></span><span>        <span>$connection->close();
</span></span><span>    <span>}
</span></span><span>    
</span><span>    <span>/**
</span></span><span><span>     * process received request
</span></span><span><span>     * 
</span></span><span><span>     * <span>@param AMQPMessage $msg
</span></span></span><span><span>     */ 
</span></span><span>    <span>public function process(AMQPMessage $msg)
</span></span><span>    <span>{
</span></span><span>        <span>$this->generatePdf()->sendEmail();
</span></span><span>        
</span><span>        <span>/**
</span></span><span><span>         * If a consumer dies without sending an acknowledgement the AMQP broker 
</span></span><span><span>         * will redeliver it to another consumer or, if none are available at the 
</span></span><span><span>         * time, the broker will wait until at least one consumer is registered 
</span></span><span><span>         * for the same queue before attempting redelivery
</span></span><span><span>         */ 
</span></span><span>        <span>$msg->delivery_info['channel']->basic_ack($msg->delivery_info['delivery_tag']);
</span></span><span>    <span>}
</span></span><span>    
</span><span>    <span>/**
</span></span><span><span>     * Generates invoice's pdf
</span></span><span><span>     * 
</span></span><span><span>     * <span>@return WorkerReceiver
</span></span></span><span><span>     */ 
</span></span><span>    <span>private function generatePdf()
</span></span><span>    <span>{
</span></span><span>        <span>/**
</span></span><span><span>         * Mocking a pdf generation processing time.  This will take between
</span></span><span><span>         * 2 and 5 seconds
</span></span><span><span>         */ 
</span></span><span>        <span>sleep(mt_rand(2, 5));
</span></span><span>        <span>return $this;
</span></span><span>    <span>}
</span></span><span>    
</span><span>    <span>/**
</span></span><span><span>     * Sends email
</span></span><span><span>     * 
</span></span><span><span>     * <span>@return WorkerReceiver
</span></span></span><span><span>     */ 
</span></span><span>    <span>private function sendEmail()
</span></span><span>    <span>{
</span></span><span>        <span>/**
</span></span><span><span>         * Mocking email sending time.  This will take between 1 and 3 seconds
</span></span><span><span>         */ 
</span></span><span>        <span>sleep(mt_rand(1,3));
</span></span><span>        <span>return $this;
</span></span><span>    <span>}
</span></span><span><span>}</span></span>
Copier après la connexion
Copier après la connexion

Utilisez maintenant la commande PHP (par exemple PHP Worker.php ou le nom que vous avez donné au fichier ci-dessus) pour tirer le travailleur. Mais attendez, le but était d'avoir deux ou plusieurs travailleurs, n'est-ce pas? Pas de problème, lancez plus de travailleurs de la même manière afin d'avoir plusieurs processus du même fichier, et RabbitMQ enregistrera les consommateurs et distribuera le travail entre eux en fonction des paramètres QoS.

Exemple 2: Envoyez des demandes RPC et attendez-vous à une réponse

Jusqu'à présent, nous avons envoyé des messages au serveur RabbitMQ sans que l'utilisateur n'ait à attendre une réponse. C'est OK pour les processus asynchrones qui pourraient prendre plus de temps que l'utilisateur est prêt à dépenser juste pour voir un message «OK». Mais que se passe-t-il si nous avons réellement besoin d'une réponse? Disons que certains résultats d'un calcul complexe, afin que nous puissions le montrer à l'utilisateur?

Supposons que nous ayons un serveur de connexion centralisé (signe unique) qui fonctionnera comme un mécanisme d'authentification isolé du reste de nos applications. La seule façon d'atteindre ce serveur est via RabbitMQ. Nous devons implémenter un moyen d'envoyer les informations d'identification de connexion à ce serveur et d'attendre une réponse d'accès de subvention / refuser.

Nous devons implémenter le modèle suivant:

Comme d'habitude, regardons d'abord le producteur:

<span><span><?php
</span></span><span><span>namespace Acme<span>\AmqpWrapper</span>;
</span></span><span>
</span><span><span>use PhpAmqpLib<span>\Connection\AMQPConnection</span>;
</span></span><span><span>use PhpAmqpLib<span>\Message\AMQPMessage</span>;
</span></span><span>
</span><span><span>class WorkerSender
</span></span><span><span>{
</span></span><span>    <span>/* ... SOME OTHER CODE HERE ... */
</span></span><span>    
</span><span>    <span>/**
</span></span><span><span>     * Sends an invoice generation task to the workers
</span></span><span><span>     * 
</span></span><span><span>     * <span>@param <span>int</span> $invoiceNum
</span></span></span><span><span>     */ 
</span></span><span>    <span>public function execute($invoiceNum)
</span></span><span>    <span>{
</span></span><span>        <span>$connection = new AMQPConnection('localhost', 5672, 'guest', 'guest');
</span></span><span>        <span>$channel = $connection->channel();
</span></span><span>        
</span><span>        <span>$channel->queue_declare(
</span></span><span>            <span>'invoice_queue',    #queue - Queue names may be up to 255 bytes of UTF-8 characters
</span></span><span>            <span>false,              #passive - can use this to check whether an exchange exists without modifying the server state
</span></span><span>            <span>true,               #durable, make sure that RabbitMQ will never lose our queue if a crash occurs - the queue will survive a broker restart
</span></span><span>            <span>false,              #exclusive - used by only one connection and the queue will be deleted when that connection closes
</span></span><span>            <span>false               #auto delete - queue is deleted when last consumer unsubscribes
</span></span><span>            <span>);
</span></span><span>            
</span><span>        <span>$msg = new AMQPMessage(
</span></span><span>            <span>$invoiceNum,
</span></span><span>            <span>array('delivery_mode' => 2) # make message persistent, so it is not lost if server crashes or quits
</span></span><span>            <span>);
</span></span><span>            
</span><span>        <span>$channel->basic_publish(
</span></span><span>            <span>$msg,               #message 
</span></span><span>            <span>'',                 #exchange
</span></span><span>            <span>'invoice_queue'     #routing key (queue)
</span></span><span>            <span>);
</span></span><span>            
</span><span>        <span>$channel->close();
</span></span><span>        <span>$connection->close();
</span></span><span>    <span>}
</span></span><span><span>}</span></span>
Copier après la connexion
Copier après la connexion
Copier après la connexion
Copier après la connexion

En regardant RPCSender :: Exécuter la méthode, veuillez noter que le paramètre $ Identials est un tableau sous la forme de ['username' => 'x', 'mot de passe' => 'y']. Encore une fois, nous ouvrons une nouvelle connexion et créons un canal comme d'habitude.

<span><span><?php
</span></span><span><span>/* ... SOME CODE HERE ... */
</span></span><span>
</span><span>        <span>$msg = new AMQPMessage(
</span></span><span>            <span>$invoiceNum,
</span></span><span>            <span>array('delivery_mode' => 2) # make message persistent, so it is not lost if server crashes or quits
</span></span><span>            <span>);
</span></span><span>
</span><span><span>/* ... SOME CODE HERE ... */</span></span>
Copier après la connexion
Copier après la connexion
Copier après la connexion
Copier après la connexion

La première différence vient de la déclaration de file d'attente. Remarquez d'abord que nous utilisons la construction list () pour prendre le résultat de $ canal-> queue_declare (). En effet, nous n'envoyons pas explicitement un nom de file d'attente tout en le déclarant, nous devons donc savoir comment cette file d'attente est identifiée. Nous ne sommes intéressés que par le premier élément du tableau de résultat, qui sera un identifiant unique de la file d'attente (quelque chose comme AMQ.GEN-_U0KJVM8HELFZQK9P0Z9GG). Le deuxième changement est que nous devons déclarer cette file d'attente comme exclusive, il n'y a donc pas de mélange dans les résultats d'autres processus simultanés.

Un autre grand changement est que le producteur sera également un consommateur d'une file d'attente, lors de l'exécution de $ Channel-> Basic_Consume (), veuillez remarquer que nous fournissons la valeur $ callback_queue que nous avons obtenue lors de la déclaration de la file d'attente. Et comme chaque consommateur, nous déclarerons un rappel pour exécuter lorsque le processus recevra une réponse.

<span><span><?php
</span></span><span><span>chdir(dirname(__DIR__));
</span></span><span><span>require_once('vendor/autoload.php');
</span></span><span>
</span><span><span>use Acme<span>\AmqpWrapper\WorkerSender</span>;
</span></span><span>
</span><span><span>$inputFilters = array(
</span></span><span>    <span>'invoiceNo' => FILTER_SANITIZE_NUMBER_INT,
</span></span><span><span>);
</span></span><span><span>$input = filter_input_array(INPUT_POST, $inputFilters);
</span></span><span><span>$sender = new WorkerSender();
</span></span><span><span>$sender->execute($input['invoiceNo']);</span></span>
Copier après la connexion
Copier après la connexion
Copier après la connexion

Ensuite, nous devons créer un ID de corrélation pour le message, ce n'est rien de plus qu'un identifiant unique pour chaque message. Dans l'exemple, nous utilisons la sortie d'Uniqid (), mais vous pouvez utiliser le mécanisme que vous préférez (tant qu'il ne crée pas de condition de course, n'a pas besoin d'être un RNG solide et à sécurité crypto).

<span><span><?php
</span></span><span><span>namespace Acme<span>\AmqpWrapper</span>;
</span></span><span>
</span><span><span>use PhpAmqpLib<span>\Connection\AMQPConnection</span>;
</span></span><span><span>use PhpAmqpLib<span>\Message\AMQPMessage</span>;
</span></span><span>
</span><span><span>class WorkerReceiver
</span></span><span><span>{
</span></span><span>    <span>/* ... SOME OTHER CODE HERE ... */
</span></span><span>    
</span><span>    <span>/**
</span></span><span><span>     * Process incoming request to generate pdf invoices and send them through 
</span></span><span><span>     * email.
</span></span><span><span>     */ 
</span></span><span>    <span>public function listen()
</span></span><span>    <span>{
</span></span><span>        <span>$connection = new AMQPConnection('localhost', 5672, 'guest', 'guest');
</span></span><span>        <span>$channel = $connection->channel();
</span></span><span>        
</span><span>        <span>$channel->queue_declare(
</span></span><span>            <span>'invoice_queue',    #queue
</span></span><span>            <span>false,              #passive
</span></span><span>            <span>true,               #durable, make sure that RabbitMQ will never lose our queue if a crash occurs
</span></span><span>            <span>false,              #exclusive - queues may only be accessed by the current connection
</span></span><span>            <span>false               #auto delete - the queue is deleted when all consumers have finished using it
</span></span><span>            <span>);
</span></span><span>            
</span><span>        <span>/**
</span></span><span><span>         * don't dispatch a new message to a worker until it has processed and 
</span></span><span><span>         * acknowledged the previous one. Instead, it will dispatch it to the 
</span></span><span><span>         * next worker that is not still busy.
</span></span><span><span>         */
</span></span><span>        <span>$channel->basic_qos(
</span></span><span>            <span>null,   #prefetch size - prefetch window size in octets, null meaning "no specific limit"
</span></span><span>            <span>1,      #prefetch count - prefetch window in terms of whole messages
</span></span><span>            <span>null    #global - global=null to mean that the QoS settings should apply per-consumer, global=true to mean that the QoS settings should apply per-channel
</span></span><span>            <span>);
</span></span><span>        
</span><span>        <span>/**
</span></span><span><span>         * indicate interest in consuming messages from a particular queue. When they do 
</span></span><span><span>         * so, we say that they register a consumer or, simply put, subscribe to a queue.
</span></span><span><span>         * Each consumer (subscription) has an identifier called a consumer tag
</span></span><span><span>         */ 
</span></span><span>        <span>$channel->basic_consume(
</span></span><span>            <span>'invoice_queue',        #queue
</span></span><span>            <span>'',                     #consumer tag - Identifier for the consumer, valid within the current channel. just string
</span></span><span>            <span>false,                  #no local - TRUE: the server will not send messages to the connection that published them
</span></span><span>            <span>false,                  #no ack, false - acks turned on, true - off.  send a proper acknowledgment from the worker, once we're done with a task
</span></span><span>            <span>false,                  #exclusive - queues may only be accessed by the current connection
</span></span><span>            <span>false,                  #no wait - TRUE: the server will not respond to the method. The client should not wait for a reply method
</span></span><span>            <span>array($this, 'process') #callback
</span></span><span>            <span>);
</span></span><span>            
</span><span>        <span>while(count($channel->callbacks)) {
</span></span><span>            <span>$this->log->addInfo('Waiting for incoming messages');
</span></span><span>            <span>$channel->wait();
</span></span><span>        <span>}
</span></span><span>        
</span><span>        <span>$channel->close();
</span></span><span>        <span>$connection->close();
</span></span><span>    <span>}
</span></span><span>    
</span><span>    <span>/**
</span></span><span><span>     * process received request
</span></span><span><span>     * 
</span></span><span><span>     * <span>@param AMQPMessage $msg
</span></span></span><span><span>     */ 
</span></span><span>    <span>public function process(AMQPMessage $msg)
</span></span><span>    <span>{
</span></span><span>        <span>$this->generatePdf()->sendEmail();
</span></span><span>        
</span><span>        <span>/**
</span></span><span><span>         * If a consumer dies without sending an acknowledgement the AMQP broker 
</span></span><span><span>         * will redeliver it to another consumer or, if none are available at the 
</span></span><span><span>         * time, the broker will wait until at least one consumer is registered 
</span></span><span><span>         * for the same queue before attempting redelivery
</span></span><span><span>         */ 
</span></span><span>        <span>$msg->delivery_info['channel']->basic_ack($msg->delivery_info['delivery_tag']);
</span></span><span>    <span>}
</span></span><span>    
</span><span>    <span>/**
</span></span><span><span>     * Generates invoice's pdf
</span></span><span><span>     * 
</span></span><span><span>     * <span>@return WorkerReceiver
</span></span></span><span><span>     */ 
</span></span><span>    <span>private function generatePdf()
</span></span><span>    <span>{
</span></span><span>        <span>/**
</span></span><span><span>         * Mocking a pdf generation processing time.  This will take between
</span></span><span><span>         * 2 and 5 seconds
</span></span><span><span>         */ 
</span></span><span>        <span>sleep(mt_rand(2, 5));
</span></span><span>        <span>return $this;
</span></span><span>    <span>}
</span></span><span>    
</span><span>    <span>/**
</span></span><span><span>     * Sends email
</span></span><span><span>     * 
</span></span><span><span>     * <span>@return WorkerReceiver
</span></span></span><span><span>     */ 
</span></span><span>    <span>private function sendEmail()
</span></span><span>    <span>{
</span></span><span>        <span>/**
</span></span><span><span>         * Mocking email sending time.  This will take between 1 and 3 seconds
</span></span><span><span>         */ 
</span></span><span>        <span>sleep(mt_rand(1,3));
</span></span><span>        <span>return $this;
</span></span><span>    <span>}
</span></span><span><span>}</span></span>
Copier après la connexion
Copier après la connexion

Créons maintenant un message, qui a des changements importants par rapport à ce que nous étions habitués dans les 2 premiers exemples. En plus d'attribuer une chaîne codée JSON contenant les informations d'identification que nous voulons authentifier, nous devons fournir au constructeur AMQPMessage un tableau avec deux propriétés définies:

  • corrélation_id : une balise pour le message
  • Répondre_to : l'identifiant de file d'attente généré tout en le déclarant

Après avoir publié le message, nous évaluerons la réponse, qui sera vide au début. Bien que la valeur de réponse reste vide, nous attendrons une réponse du canal avec $ canal-> wait ();.

Une fois que nous aurons reçu une réponse du canal, la méthode de rappel sera invoquée (RPCSender :: OnResponse ()). Cette méthode correspondra à l'ID de corrélation reçue contre celui généré, et s'ils sont les mêmes, définira le corps de réponse, brisant ainsi la boucle while.

qu'en est-il du consommateur RPC? Ici, c'est:

<span><span><?php
</span></span><span><span>namespace Acme<span>\AmqpWrapper</span>;
</span></span><span>
</span><span><span>use PhpAmqpLib<span>\Connection\AMQPConnection</span>;
</span></span><span><span>use PhpAmqpLib<span>\Message\AMQPMessage</span>;
</span></span><span>
</span><span><span>class WorkerSender
</span></span><span><span>{
</span></span><span>    <span>/* ... SOME OTHER CODE HERE ... */
</span></span><span>    
</span><span>    <span>/**
</span></span><span><span>     * Sends an invoice generation task to the workers
</span></span><span><span>     * 
</span></span><span><span>     * <span>@param <span>int</span> $invoiceNum
</span></span></span><span><span>     */ 
</span></span><span>    <span>public function execute($invoiceNum)
</span></span><span>    <span>{
</span></span><span>        <span>$connection = new AMQPConnection('localhost', 5672, 'guest', 'guest');
</span></span><span>        <span>$channel = $connection->channel();
</span></span><span>        
</span><span>        <span>$channel->queue_declare(
</span></span><span>            <span>'invoice_queue',    #queue - Queue names may be up to 255 bytes of UTF-8 characters
</span></span><span>            <span>false,              #passive - can use this to check whether an exchange exists without modifying the server state
</span></span><span>            <span>true,               #durable, make sure that RabbitMQ will never lose our queue if a crash occurs - the queue will survive a broker restart
</span></span><span>            <span>false,              #exclusive - used by only one connection and the queue will be deleted when that connection closes
</span></span><span>            <span>false               #auto delete - queue is deleted when last consumer unsubscribes
</span></span><span>            <span>);
</span></span><span>            
</span><span>        <span>$msg = new AMQPMessage(
</span></span><span>            <span>$invoiceNum,
</span></span><span>            <span>array('delivery_mode' => 2) # make message persistent, so it is not lost if server crashes or quits
</span></span><span>            <span>);
</span></span><span>            
</span><span>        <span>$channel->basic_publish(
</span></span><span>            <span>$msg,               #message 
</span></span><span>            <span>'',                 #exchange
</span></span><span>            <span>'invoice_queue'     #routing key (queue)
</span></span><span>            <span>);
</span></span><span>            
</span><span>        <span>$channel->close();
</span></span><span>        <span>$connection->close();
</span></span><span>    <span>}
</span></span><span><span>}</span></span>
Copier après la connexion
Copier après la connexion
Copier après la connexion
Copier après la connexion

Même connexion ancienne et création de canaux :)

Identique à déclarer la file d'attente, mais cette file d'attente aura un nom prédéfini (' rpc_queue ‘). Nous définirons les paramètres de QoS car nous désactiverons les ACK automatique, nous pouvons donc informer quand nous aurons terminé la vérification des informations d'identification et avons un résultat.

<span><span><?php
</span></span><span><span>/* ... SOME CODE HERE ... */
</span></span><span>
</span><span>        <span>$msg = new AMQPMessage(
</span></span><span>            <span>$invoiceNum,
</span></span><span>            <span>array('delivery_mode' => 2) # make message persistent, so it is not lost if server crashes or quits
</span></span><span>            <span>);
</span></span><span>
</span><span><span>/* ... SOME CODE HERE ... */</span></span>
Copier après la connexion
Copier après la connexion
Copier après la connexion
Copier après la connexion

La magie vient de l'intérieur du rappel déclaré. Une fois que nous avons fini d'authentifier les informations d'identification (oui, je sais que le processus se fait par rapport aux valeurs statiques de nom d'utilisateur / mot de passe, ce didacticiel ne concerne pas comment authentifier les informations d'identification;)), nous devons créer le message de retour avec le même ID de corrélation au producteur créé. Nous pouvons extraire cela du message de demande avec $ req-> get ('corrélation_id'), passant cette valeur de la même manière que nous l'avons fait chez le producteur.

<span><span><?php
</span></span><span><span>chdir(dirname(__DIR__));
</span></span><span><span>require_once('vendor/autoload.php');
</span></span><span>
</span><span><span>use Acme<span>\AmqpWrapper\WorkerSender</span>;
</span></span><span>
</span><span><span>$inputFilters = array(
</span></span><span>    <span>'invoiceNo' => FILTER_SANITIZE_NUMBER_INT,
</span></span><span><span>);
</span></span><span><span>$input = filter_input_array(INPUT_POST, $inputFilters);
</span></span><span><span>$sender = new WorkerSender();
</span></span><span><span>$sender->execute($input['invoiceNo']);</span></span>
Copier après la connexion
Copier après la connexion
Copier après la connexion

Maintenant, nous devons publier ce message dans la même file d'attente qui a été créé dans le producteur (celui du nom «aléatoire»). Nous extraissons le nom de la file d'attente avec $ req-> get ('Reply_To') et l'utilisons comme clé de routage dans Basic_Publish ().

Une fois que nous avons publié le message, nous devons envoyer le préavis ACK à la chaîne avec $ req-> Delivery_info ['Channel'] -> Basic_ack (), en utilisant la balise de livraison dans $ req-> Delivery_info ['Delivery_Tag' ] Ainsi, le producteur peut arrêter d'attendre.

Encore une fois, lancez un processus d'écoute et vous êtes prêt à partir. Vous pouvez même combiner des exemples 2 et 3 pour avoir un processus RPC multi-travailleur pour effectuer les demandes d'authentification que ce qui peut être mis à l'échelle simplement en tirant plusieurs travailleurs.

Il y a beaucoup plus à dire sur RabbitMQ et AMQP, comme les hôtes virtuels, les types d'échange, l'administration du serveur, etc… vous pouvez trouver plus de modèles d'application (comme le routage, des sujets) ici et sur la page de documentation. Il existe également un outil de ligne de commande pour gérer RabbitMQ, ainsi qu'une interface Web.

Si vous aimiez cette série de tutoriels et que vous souhaitez en voir plus sur MQ et plus de cas d'utilisation du monde réel, veuillez nous en informer dans les commentaires ci-dessous!

Questions fréquemment posées (FAQ) sur les exemples avancés PHP Rabbitmq

Quel est le rôle de RabbitMQ dans PHP?

RabbitMQ est un courtier de messages qui permet aux applications de communiquer entre eux de manière asynchrone. Il joue un rôle crucial dans les applications PHP en leur permettant de gérer des charges élevées et des tâches complexes plus efficacement. RabbitMQ utilise le protocole de file d'attente de messages avancé (AMQP) pour faciliter l'échange de messages entre différentes parties d'une application. Cela permet le découplage des processus, ce qui rend l'application plus évolutive et résiliente.

Comment installer RabbitMQ pour php?

votre machine. Cela peut être fait via le site officiel de RabbitMQ. Une fois le serveur installé, vous pouvez installer l'extension PHP AMQP, qui fournit les fonctions nécessaires pour interagir avec RabbitMQ. Cela peut être fait en utilisant l'installateur PECL avec la commande PECL Installer AMQP.

Comment puis-je créer un échange RabbitMQ dans PHP?

En php, vous pouvez créer un échange Rabbitmq en utilisant la méthode Exchange_declare de la classe AMQPChannel. Cette méthode prend plusieurs paramètres, y compris le nom de l'échange, le type de l'échange (direct, le sujet, le fanout ou les en-têtes) et les paramètres facultatifs tels que passif, durable, auto_delete et arguments.

Comment Dois-je envoyer un message à une file d'attente Rabbitmq dans PHP?

Classe AMQPMessage avec le contenu du message. Ensuite, vous pouvez utiliser la méthode Basic_Publish de la classe AMQPChannel pour envoyer le message à la file d'attente. La méthode de base_publish prend le message, l'échange et la clé de routage comme paramètres.

Comment puis-je consommer des messages à partir d'une file d'attente de lapin dans PHP?

En php, vous pouvez consommer des messages à partir d'un Fitre Rabbitmq à l'aide de la méthode Basic_Consume de la classe AMQPChannel. Cette méthode prend plusieurs paramètres, y compris le nom de la file d'attente, la balise de consommation, NO_Local, NO_ACK, exclusive et une fonction de rappel qui sera exécutée lorsqu'un message est reçu.

Comment puis-je gérer les erreurs dans Rabbitmq avec PHP ?

La gestion des erreurs dans RabbitMQ avec PHP peut être effectuée à l'aide de blocs de capture d'essai. L'extension PHP AMQP lance des exceptions de la classe AMQPException lorsqu'une erreur se produit. Vous pouvez attraper ces exceptions et les gérer en fonction des besoins de votre application.

Comment puis-je assurer la durabilité des messages dans RabbitMQ avec PHP?

Pour assurer la durabilité des messages dans RabbitMQ avec PHP, vous pouvez définir le Delivery_Mode Propriété de la classe AMQPMessage à 2. Cela fera que RabbitMQ stockera le message sur le disque, en s'assurant qu'il ne sera pas perdu même si le serveur Rabbitmq se bloque ou redémarre.

Comment puis-je implémenter les files d'attente prioritaires dans RabbitMQ avec PHP?

Les files d'attente de priorité dans RabbitMQ peuvent être implémentées en PHP en définissant l'argument X-MAX-priority lors de la déclaration de la file d'attente. Ensuite, lors de l'envoi d'un message, vous pouvez définir la propriété prioritaire de la classe AMQPMessage sur une valeur entre 0 et la priorité maximale que vous avez spécifiée.

Comment puis-je utiliser RabbitMQ pour RPC dans PHP?

RabbitMQ peut être utilisé pour l'appel de procédure distante (RPC) dans PHP en envoyant un message avec une propriété Répondre-to définie dans une file d'attente de rappel. Le serveur peut ensuite envoyer la réponse à la file d'attente de rappel, et le client peut consommer la réponse à partir de là.

Comment puis-je surveiller RabbitMQ dans PHP?

La surveillance du lapin de lapin dans PHP peut être effectuée en utilisant Le plugin de gestion RabbitMQ, qui fournit une interface Web pour surveiller et gérer votre serveur RabbitMQ. Vous pouvez également utiliser les méthodes de la classe AMQPChannel pour obtenir des informations sur l'état du canal, comme le nombre de messages non reconnus.

Ce qui précède est le contenu détaillé de. pour plus d'informations, suivez d'autres articles connexes sur le site Web de PHP en chinois!

Déclaration de ce site Web
Le contenu de cet article est volontairement contribué par les internautes et les droits d'auteur appartiennent à l'auteur original. Ce site n'assume aucune responsabilité légale correspondante. Si vous trouvez un contenu suspecté de plagiat ou de contrefaçon, veuillez contacter admin@php.cn
Derniers articles par auteur
Tutoriels populaires
Plus>
Derniers téléchargements
Plus>
effets Web
Code source du site Web
Matériel du site Web
Modèle frontal