A complete asynchronous task system based on RabbitMQ and Swoole

PHPz
Release: 2023-03-07 10:22:01

From the initial single-process asynchronous task system built on Redis, to the multi-process consumption model added with Swoole, our asynchronous task system can finally take another step forward.

Drawing on the experience of the previous two simple systems, this RabbitMQ-based asynchronous task system is more carefully designed. The improvements include multi-process consumption, exception retry, and more.

System introduction

Architecture diagram

As you can see from the diagram, our system is an event-based asynchronous task system. That is, when an event occurs, the producer throws the event to the scheduler. The scheduler looks up which tasks are registered under that event and throws those tasks into the corresponding queues. Finally, the consumers consume the tasks in the task queues. The whole system is divided into three main parts:

1. Event producer, the party that generates message events

2. Task scheduler (Scheduler), responsible for registering events and scheduling tasks

3. Consumer (Worker), responsible for consuming tasks in the task queue
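
The flow between these three parts can be illustrated with a minimal in-memory sketch. It is not the Asynclib implementation: `SketchScheduler`, its methods, and the array-backed queues are hypothetical names standing in for the real scheduler and the RabbitMQ queues, and the delay value is assumed to be in seconds.

```php
<?php
declare(strict_types=1);

// Hypothetical in-memory scheduler: maps each event to its tasks and
// pushes one queue entry per task when the event is dispatched.
final class SketchScheduler
{
    /** @var array<string, array<int, array<string, mixed>>> tasks keyed by event name */
    private array $registry = [];

    /** @var array<string, array<int, array<string, mixed>>> queue contents keyed by topic */
    private array $queues = [];

    // Register a task under an event; $delay is assumed to be in seconds (0 = no delay).
    public function register(string $event, string $task, string $topic, int $delay = 0): void
    {
        $this->registry[$event][] = ['task' => $task, 'topic' => $topic, 'delay' => $delay];
    }

    // Look up the tasks for an event and enqueue each on its topic's queue.
    // Returns the number of tasks enqueued.
    public function dispatch(string $event, array $params): int
    {
        $tasks = $this->registry[$event] ?? [];
        foreach ($tasks as $entry) {
            $this->queues[$entry['topic']][] = $entry + ['event' => $event, 'params' => $params];
        }
        return count($tasks);
    }

    // Consumer side: take the oldest task from a topic's queue, or null if it is empty.
    public function pop(string $topic): ?array
    {
        if (empty($this->queues[$topic])) {
            return null;
        }
        return array_shift($this->queues[$topic]);
    }
}

$scheduler = new SketchScheduler();
$scheduler->register('order_create', 'closeOrder', 'demo', 10);
$scheduler->register('order_paied', 'virtualShipping', 'demo');

// Producer side: an event occurs and is handed to the scheduler.
$count = $scheduler->dispatch('order_paied', ['order_id' => 'FB138020392193312']);
$job = $scheduler->pop('demo');
echo $count, ' task(s) queued; first task: ', $job['task'], PHP_EOL;
```

In the real system the queues live in RabbitMQ and the consumer runs in separate processes; the sketch only shows the lookup from event to tasks to queue.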

The event producer is very simple and can be called directly from the business system. The code is as follows:

<?php
require_once __DIR__.'/../autoload.php';
use Asynclib\Ebats\Event;
try {
    $event = new Event('order_paied');  // Define the event
    $event->setOptions(['order_id' => 'FB138020392193312']); // Parameters produced by the event
    $event->publish();
} catch (Exception $exc) {
    echo $exc->getMessage();
}

Task Scheduler

The scheduler mainly does two things: it registers events and it schedules tasks.

The event registration code is as follows:

// Register events
EventManager::register('order_create', 'closeOrder', 'demo', 10); // Close unpaid orders (delayed task)
EventManager::register('order_paied', 'virtualShipping', 'demo'); // Automatically ship virtual goods

This registers two events, each with one task.

The scheduling code itself is very simple, so I won't go into details. If you are interested, you can read the code.

Consumer

Now for the highlight. The most important part of an asynchronous task system is the consumer. Let's take a look at the flow chart of the Worker.

The complete consumption process

As you can see, we use two exchanges and two queues here. One pair handles normal tasks (ntask) and the other handles tasks that need to be delayed (dtask). Below is a brief description of a task's life cycle.

Normal task

1. The task is generated and enters the normal task exchange Exchange[ebats_core_ntask]
2. The exchange distributes the task to the corresponding queue according to the topic
3. The ntask child process blocks while waiting for a task and executes the task once it has obtained one
4. If execution fails, a RetryException is thrown when a retry is needed, and a TaskException is thrown when no retry is needed
5. The ntask child process catches the retry exception and throws the task to the delayed task exchange Exchange[ebats_core_dtask]
6. The task execution information is passed back to the upper-level developer through a callback for saving and viewing

Delayed task

1. The dtask child process blocks while waiting for a task and executes the task once it has obtained one
2. If execution fails, a RetryException is thrown when a retry is needed, and a TaskException is thrown when no retry is needed
3. The dtask child process catches the retry exception and throws the task to the delayed task exchange Exchange[ebats_core_dtask]
4. The task execution information is passed back to the upper-level developer through a callback for saving and viewing

The consumer code is as follows:

<?php
require_once __DIR__.'/../autoload.php';
require_once __DIR__.'/task/TaskDemoModel.php';
use Asynclib\Ebats\Worker;

// Callback that receives the execution result of each task
$callback = function ($topic, $taskid, $taskname, $params, $timeuse, $message) {
    // Save or log the task execution information here
};
$worker = new Worker($callback);  // Supports multi-process consumption; defaults to 1 process
$worker->setQueue('demo');  // Queue names map one-to-one to event topics
$worker->run();

Custom scheduler

Overall, this is an event-based task system, so can tasks also be generated directly? The answer is yes.

You only need to create a custom scheduler, implement the scheduling logic yourself, and finally generate a task.
In this way, an orderAsync task will be generated when a message is received. You only need to start a Worker to consume this Topic.
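
A custom scheduler along these lines might look like the following sketch. It does not use the Asynclib API: `SketchTask`, `OrderAsyncScheduler`, `onMessage`, and the array used as a queue are all hypothetical, and only the task name `orderAsync` and the topic `demo` come from the article.

```php
<?php
declare(strict_types=1);

// Hypothetical task description; the real library's task class may differ.
final class SketchTask
{
    public string $name;
    public string $topic;
    public array $params;

    public function __construct(string $name, string $topic, array $params)
    {
        $this->name = $name;
        $this->topic = $topic;
        $this->params = $params;
    }
}

// Hypothetical custom scheduler: turns an incoming message directly into a task.
final class OrderAsyncScheduler
{
    /** @var callable called with the SketchTask to publish */
    private $publish;

    public function __construct(callable $publish)
    {
        $this->publish = $publish;
    }

    // Only decide which task to create; the business logic stays in the Worker.
    public function onMessage(array $message): void
    {
        if (!isset($message['order_id'])) {
            return; // nothing to schedule for a message without an order id
        }
        $task = new SketchTask('orderAsync', 'demo', ['order_id' => $message['order_id']]);
        ($this->publish)($task);
    }
}

$queue = []; // stands in for the task queues, keyed by topic
$scheduler = new OrderAsyncScheduler(function (SketchTask $task) use (&$queue): void {
    $queue[$task->topic][] = $task;
});

$scheduler->onMessage(['order_id' => 'FB138020392193312']);
$scheduler->onMessage(['unexpected' => true]);

echo count($queue['demo']), ' ', $queue['demo'][0]->name, PHP_EOL;
```

Passing the publish step in as a callable keeps the scheduling decision separate from the transport, so the same scheduler could be pointed at a real queue without changing its logic.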

You might think it would be enough to write the business logic directly here, and that does work if you can tolerate a single process consuming slowly. In most cases, though, we want tasks to be consumed as quickly as possible, so it is better to only create tasks here and let the workers execute the business logic of each task.
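
Because the business logic runs in the workers, the way a worker reacts to failures matters. The retry rule from the task life cycle (a RetryException sends the task to the delayed task exchange, a TaskException does not) can be sketched as follows. The two exception classes here are local stand-ins for the ones named in the article, and `runTask`, `$requeue`, and `$report` are hypothetical helpers, not part of the library.

```php
<?php
declare(strict_types=1);

// Stand-ins for the two exception types named in the article.
class TaskException extends Exception {}
class RetryException extends Exception {}

/**
 * Run one task and decide what happens next (hypothetical helper).
 *
 * @param callable $task    the business logic to execute
 * @param callable $requeue hands the task to the delayed task exchange
 * @param callable $report  result callback, called with (status, message)
 */
function runTask(callable $task, callable $requeue, callable $report): string
{
    try {
        $task();
        $status = 'done';
        $message = '';
    } catch (RetryException $e) {
        // Retry requested: hand the task to the delayed task exchange.
        $requeue();
        $status = 'retry';
        $message = $e->getMessage();
    } catch (TaskException $e) {
        // Failure that should not be retried.
        $status = 'failed';
        $message = $e->getMessage();
    }
    // Report the outcome in every case so it can be saved and viewed.
    $report($status, $message);
    return $status;
}

$delayed = []; // stands in for Exchange[ebats_core_dtask]
$log = [];     // stands in for the developer's result storage

$requeue = function () use (&$delayed): void {
    $delayed[] = 'virtualShipping';
};
$report = function (string $status, string $message) use (&$log): void {
    $log[] = [$status, $message];
};

$first = runTask(function (): void {
    throw new RetryException('stock service timeout');
}, $requeue, $report);
$second = runTask(function (): void {
    throw new TaskException('order not found');
}, $requeue, $report);
$third = runTask(function (): void {
    // successful task: nothing to do in this sketch
}, $requeue, $report);

echo implode(', ', [$first, $second, $third]), PHP_EOL;
```

Only the retryable failure reaches the delayed exchange, while all three outcomes are reported through the callback.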

source:php.cn