À partir du système de tâches asynchrones initial de consommation à processus unique implémenté à l'aide de redis Avec l'ajout Grâce au modèle de consommation multi-processus de swoole, notre système de tâches asynchrones peut enfin faire un pas en avant
Grâce à l'expérience des deux systèmes simples précédents, cette fois le système de tâches asynchrones basé sur RabbitMQ est mieux conçu. , y compris la consommation multi-processus, les nouvelles tentatives d'exception, etc.
Comme vous pouvez le voir sur l'image, notre système est un système de tâches asynchrones basé sur des événements C'est-à-dire que lorsqu'un événement se produit, le producteur le renvoie au planificateur, et au planificateur. le planificateur est responsable de<.>Requérez quelles tâches se trouvent sous l'événement , puis jetez ces tâches dans la file d'attente correspondante, et enfin le consommateur consomme les tâches dans la file d'attente des tâches
dans tout le système. Principalement divisé en trois parties 1. Producteur d'événements, qui est la partie qui génère les événements de message2. .
3. Le consommateur est responsable de la consommation des tâches dans la file d'attente des tâches
Planificateur de tâches
<?php require_once DIR.'/../autoload.php'; use Asynclib\Ebats\Event; try{ $event = new Event('order_paied'); //定义事件 $event->setOptions(['order_id' => 'FB138020392193312']); //事件产生的参数 $event->publish(); }catch (Exception $exc){ echo $exc->getMessage(); }
De cette façon, deux événements sont enregistrés, chacun avec une tâche
//注册事件 EventManager::register('order_create', 'closeOrder', 'demo', 10);//关闭未付款订单(延迟任务) EventManager::register('order_paied', 'virtualShipping', 'demo'); //虚拟商品自动发货
Un processus de consommation complet
Comme vous pouvez le constater, nous utilisons ici deux commutateurs et deux files d'attente, l'une est chargée du traitement des tâches normales, à savoir ntask, et l'autre est responsable du traitement des tâches qui doivent être retardées. C'est-à-dire dtâche Décrivez brièvement le
cycle de vie
1. . La tâche est générée et entre dans l'échangeur Exchange[ebats_core_ntask] de la tâche normale. 2. L'échangeur distribue les tâches dans les files d'attente correspondantes selon le sujet
3. Le sous-processus ntask se bloque et attend le. la tâche doit être obtenue avec succès et exécute la tâche 6. Rappelle les informations d'exécution de la tâche au développeur de niveau supérieur pour les enregistrer et les visualiser
Tâche retardée
1. la tâche du sous-processus se bloque et attend que la tâche soit obtenue avec succès et exécute la tâche
2. L'exécution échoue et une RetryException est levée lorsqu'une nouvelle tentative est requise. Aucune nouvelle tentative n'est requise. la tâche de sous-processus capture l'exception de nouvelle tentative et renvoie la tâche à l'échangeur de tâches retardé Exchange[ebats_core_dtask]
Planificateur personnalisé
require_once DIR.'/../autoload.php'; require_once DIR.'/task/TaskDemoModel.php'; use Asynclib\Ebats\Worker; //执行结果回调函数 $callback = function ($topic, $taskid, $taskname, $params, $timeuse, $message){ }; $worker = new Worker($callback); //支持多进程消费默认为1 $worker->setQueue('demo'); //队列名和事件的topic一一对应 $worker->run();
Ce qui précède est le contenu détaillé de. pour plus d'informations, suivez d'autres articles connexes sur le site Web de PHP en chinois!