この記事の内容は、Swoole と Swoft のソースコード解析 (タスク配信/スケジュールされたタスク) について、Swoole と Swoft の概要を紹介するものであり、一定の参考価値があり、困っている友人が参照することができます。
Swoft
のタスク関数は、Swoole
の タスク メカニズム
、または Swoft に基づいています。
Task
メカニズムの本質は、Swoole
の Task メカニズム
のカプセル化と拡張です。
//Swoft\Task\Task.php class Task { /** * Deliver coroutine or async task * * @param string $taskName * @param string $methodName * @param array $params * @param string $type * @param int $timeout * * @return bool|array * @throws TaskException */ public static function deliver(string $taskName, string $methodName, array $params = [], string $type = self::TYPE_CO, $timeout = 3) { $data = TaskHelper::pack($taskName, $methodName, $params, $type); if(!App::isWorkerStatus() && !App::isCoContext()){ return self::deliverByQueue($data);//见下文Command章节 } if(!App::isWorkerStatus() && App::isCoContext()){ throw new TaskException('Please deliver task by http!'); } $server = App::$server->getServer(); // Delier coroutine task if ($type == self::TYPE_CO) { $tasks[0] = $data; $prifleKey = 'task' . '.' . $taskName . '.' . $methodName; App::profileStart($prifleKey); $result = $server->taskCo($tasks, $timeout); App::profileEnd($prifleKey); return $result; } // Deliver async task return $server->task($data); } }
タスク配信Task::deliver()
呼び出しパラメータをパッケージ化し、$type
パラメータに従って渡しますSwoole
#$server->taskCo() または
$server->task() インターフェイスは
Task プロセス に配信されます。
Task 自体は常に同期的に実行されます。
$type は配信操作の動作にのみ影響します。
Task::TYPE_ASYNC は
$ サーバーに対応します->task() は非同期配信です。
Task::deliver() は呼び出された直後に戻ります。
Task::TYPE_CO は
$server-> に対応します。 ;taskCo() はコルーチンの配信です。配信後、コルーチンのコントロールは放棄されます。
Task::deliver() は、タスクが完了するか実行がタイムアウトするまでコルーチンから戻りません。 。
//Swoft\Task\Bootstrap\Listeners\TaskEventListener /** * The listener of swoole task * @SwooleListener({ * SwooleEvent::ON_TASK, * SwooleEvent::ON_FINISH, * }) */ class TaskEventListener implements TaskInterface, FinishInterface { /** * @param \Swoole\Server $server * @param int $taskId * @param int $workerId * @param mixed $data * @return mixed * @throws \InvalidArgumentException */ public function onTask(Server $server, int $taskId, int $workerId, $data) { try { /* @var TaskExecutor $taskExecutor*/ $taskExecutor = App::getBean(TaskExecutor::class); $result = $taskExecutor->run($data); } catch (\Throwable $throwable) { App::error(sprintf('TaskExecutor->run %s file=%s line=%d ', $throwable->getMessage(), $throwable->getFile(), $throwable->getLine())); $result = false; // Release system resources App::trigger(AppEvent::RESOURCE_RELEASE); App::trigger(TaskEvent::AFTER_TASK); } return $result; } }
swoole.onTask のイベント コールバックです。その役割は、配信された
Worker プロセスをパッケージ化することだけです。データは次のとおりです。
TaskExecutorに転送されます。
Swoole の
Task メカニズムの本質は、
Worker プロセス が時間のかかるタスクを同期された
Task プロセスに配信することです。 (
TaskWorker とも呼ばれます) 処理のため、
swoole.onTask のイベント コールバックが
Task プロセス で実行されます。前述したように、
ワーカー プロセスは、
HTTP サービス コードのほとんどが実行される環境ですが、
TaskEventListener.onTask() メソッドから始まり、コードの実行環境 これらはすべて
Task プロセス です。つまり、
TaskExecutor と特定の
TaskBean は
Task プロセス ## で実行されます。 #。 <div class="code" style="position:relative; padding:0px; margin:0px;"><pre class="brush:php;toolbar:false">//Swoft\Task\TaskExecutor
/**
* The task executor
*
* @Bean()
*/
class TaskExecutor
{
/**
* @param string $data
* @return mixed
*/
public function run(string $data)
{
$data = TaskHelper::unpack($data);
$name = $data['name'];
$type = $data['type'];
$method = $data['method'];
$params = $data['params'];
$logid = $data['logid'] ?? uniqid('', true);
$spanid = $data['spanid'] ?? 0;
$collector = TaskCollector::getCollector();
if (!isset($collector['task'][$name])) {
return false;
}
list(, $coroutine) = $collector['task'][$name];
$task = App::getBean($name);
if ($coroutine) {
$result = $this->runCoTask($task, $method, $params, $logid, $spanid, $name, $type);
} else {
$result = $this->runSyncTask($task, $method, $params, $logid, $spanid, $name, $type);
}
return $result;
}
}</pre><div class="contentsignin">ログイン後にコピー</div></div>
タスク実行のアイデアは非常にシンプルです。
によって送信されたデータを解凍し、元の呼び出しパラメーターに復元し、 に基づいて対応する ## を見つけます。 $name
パラメータ.#TaskBean を指定し、対応する
task() メソッドを呼び出します。このうち、
TaskBean は、クラスレベルのアノテーション
@Task(name="TaskName") または
@Task("TaskName") を使用して宣言されます。
@Task
name 属性を削除し、
coroutine 属性も存在することに注意してください。上記のコードは
Task を実行するには、コルーチンの
runCoTask() または同期の
runSyncTask() を使用することを選択します。ただし、
Swoole の
タスク プロセス の実行は完全に同期であり、コルーチンをサポートしていないため、現在のバージョンではこのパラメーターを
true に設定しないでください。 。同様に、
TaskBean で記述されたタスク コードは、同期ブロッキングであるか、環境に応じて非同期ノンブロッキングとコルーチンを同期ブロッキングに自動的にダウングレードできる必要があります。
プロセスからタスクを配信します。
Swoole
のTaskのメカニズムの本質は、
Worker プロセスが時間のかかるタスクを実行することです。タスクを同期
Task Process(別名
TaskWorker) 処理に変換します。
つまり、
Swoole
$server->taskCo() または
$server->task() は、
ワーカープロセスで使用されます。
この制限により、使用シナリオが大幅に制限されます。 プロセス
でタスクを配信するにはどうすればよいですか?
Swoftこの制限を回避するために、
Task::deliverByProcess() メソッドが提供されています。実装原理も非常にシンプルで、呼び出し情報は
Swoole の $server->sendMessage()
メソッドを通じて Process
から Worker プロセスに配信されます。
、その後、ワーカー プロセスがそれを
タスク プロセス に配信します。関連するコードは次のとおりです:
//Swoft\Task\Task.php /** * Deliver task by process * * @param string $taskName * @param string $methodName * @param array $params * @param string $type * @param int $timeout * @param int $workId * * @return bool */ public static function deliverByProcess(string $taskName, string $methodName, array $params = [], int $timeout = 3, int $workId = 0, string $type = self::TYPE_ASYNC): bool { /* @var PipeMessageInterface $pipeMessage */ $server = App::$server->getServer(); $pipeMessage = App::getBean(PipeMessage::class); $data = [ 'name' => $taskName, 'method' => $methodName, 'params' => $params, 'timeout' => $timeout, 'type' => $type, ]; $message = $pipeMessage->pack(PipeMessage::MESSAGE_TYPE_TASK, $data); return $server->sendMessage($message, $workId); }
データがパッケージ化された後、
$ を使用しますserver->sendMessage()Worker:
//Swoft\Bootstrap\Server\ServerTrait.php /** * onPipeMessage event callback * * @param \Swoole\Server $server * @param int $srcWorkerId * @param string $message * @return void * @throws \InvalidArgumentException */ public function onPipeMessage(Server $server, int $srcWorkerId, string $message) { /* @var PipeMessageInterface $pipeMessage */ $pipeMessage = App::getBean(PipeMessage::class); list($type, $data) = $pipeMessage->unpack($message); App::trigger(AppEvent::PIPE_MESSAGE, null, $type, $data, $srcWorkerId); }
$server->sendMessage に配信した後、Worker プロセス データを受信すると ## がトリガーされます。#swoole.pipeMessage
イベント コールバック、Swoft
はそれを独自の swoft.pipeMessage
イベントとトリガーに変換します。 <div class="code" style="position:relative; padding:0px; margin:0px;"><pre class="brush:php;toolbar:false">//Swoft\Task\Event\Listeners\PipeMessageListener.php
/**
* The pipe message listener
*
* @Listener(event=AppEvent::PIPE_MESSAGE)
*/
class PipeMessageListener implements EventHandlerInterface
{
/**
* @param \Swoft\Event\EventInterface $event
*/
public function handle(EventInterface $event)
{
$params = $event->getParams();
if (count($params) </pre><div class="contentsignin">ログイン後にコピー</div></div>
swoft.pipeMessage
イベントは最終的に
によって処理されます。関連する監視において、swoft.pipeMessage
イベントが Task::deliverByProcess()
によって生成されたことが判明した場合、Worker プロセス
はそれを 1 回実行します。 Task::deliver()
は、最終的にタスク データを TaskWorker プロセス
に配信します。 簡単な復習演習:
Task::deliverByProcess()
から特定の
の最終実行まで、どのようなプロセスが実行され、どの部分が実行されたのかを確認します。呼び出しチェーンはどのプロセスで実行されますか? <h3>从Command进程或其子进程中投递任务</h3>
<div class="code" style="position:relative; padding:0px; margin:0px;"><pre class="brush:php;toolbar:false">//Swoft\Task\QueueTask.php
/**
* @param string $data
* @param int $taskWorkerId
* @param int $srcWorkerId
*
* @return bool
*/
public function deliver(string $data, int $taskWorkerId = null, $srcWorkerId = null)
{
if ($taskWorkerId === null) {
$taskWorkerId = mt_rand($this->workerNum + 1, $this->workerNum + $this->taskNum);
}
if ($srcWorkerId === null) {
$srcWorkerId = mt_rand(0, $this->workerNum - 1);
}
$this->check();
$data = $this->pack($data, $srcWorkerId);
$result = \msg_send($this->queueId, $taskWorkerId, $data, false);
if (!$result) {
return false;
}
return true;
}</pre><div class="contentsignin">ログイン後にコピー</div></div>
<p>对于<code>Command
进程的任务投递,情况会更复杂一点。
上文提到的Process
,其往往衍生于Http/Rpc
服务,作为同一个Manager
的子孙进程,他们能够拿到Swoole\Server
的句柄变量,从而通过$server->sendMessage()
,$server->task()
等方法进行任务投递。
但在Swoft
的体系中,还有一个十分路人的角色: Command
。Command
的进程从shell
或cronb
独立启动,和Http/Rpc
服务相关的进程没有亲缘关系。因此Command
进程以及从Command
中启动的Process
进程是没有办法拿到Swoole\Server
的调用句柄直接通过UnixSocket
进行任务投递的。
为了为这种进程提供任务投递支持,Swoft
利用了Swoole
的Task进程
的一个特殊功能----消息队列。
同一个项目中Command
和Http\RpcServer
通过约定一个message_queue_key
获取到系统内核中的同一条消息队列,然后Comand
进程就可以通过该消息队列向Task进程
投递任务了。
该机制没有提供对外的公开方法,仅仅被包含在Task::deliver()
方法中,Swoft
会根据当前环境隐式切换投递方式。但该消息队列的实现依赖Semaphore
拓展,如果你想使用,需要在编译PHP
时加上--enable-sysvmsg
参数。
除了手动执行的普通任务,Swoft
还提供了精度为秒的定时任务功能用来在项目中替代Linux的Crontab
功能.
Swoft
用两个前置Process
---任务计划进程:CronTimerProcess
和任务执行进程CronExecProcess
,和两张内存数据表-----RunTimeTable
(任务(配置)表)OriginTable
((任务)执行表)用于定时任务的管理调度。
两张表的每行记录的结构如下:
\\Swoft\Task\Crontab\TableCrontab.php /** * 任务表,记录用户配置的任务信息 * 表每行记录包含的字段如下,其中`rule`,`taskClass`,`taskMethod`生成key唯一确定一条记录 * @var array $originStruct */ private $originStruct = [ 'rule' => [\Swoole\Table::TYPE_STRING, 100],//定时任务执行规则,对应@Scheduled注解的cron属性 'taskClass' => [\Swoole\Table::TYPE_STRING, 255],//任务名 对应@Task的name属性(默认为类名) 'taskMethod' => [\Swoole\Table::TYPE_STRING, 255],//Task方法,对应@Scheduled注解所在方法 'add_time' => [\Swoole\Table::TYPE_STRING, 11],//初始化该表内容时的10位时间戳 ]; /** * 执行表,记录短时间内要执行的任务列表及其执行状态 * 表每行记录包含的字段如下,其中`taskClass`,`taskMethod`,`minute`,`sec`生成key唯一确定一条记录 * @var array $runTimeStruct */ private $runTimeStruct = [ 'taskClass' => [\Swoole\Table::TYPE_STRING, 255],//同上 'taskMethod' => [\Swoole\Table::TYPE_STRING, 255],//同上 'minute' => [\Swoole\Table::TYPE_STRING, 20],//需要执行任务的时间,精确到分钟 格式date('YmdHi') 'sec' => [\Swoole\Table::TYPE_STRING, 20],//需要执行任务的时间,精确到分钟 10位时间戳 'runStatus' => [\Swoole\TABLE::TYPE_INT, 4],//任务状态,有 0(未执行) 1(已执行) 2(执行中) 三种。 //注意:这里的执行是一个容易误解的地方,此处的执行并不是指任务本身的执行,而是值`任务投递`这一操作的执行,从宏观上看换成 _未投递_,_已投递_,_投递中_描述会更准确。 ];
Swoft
的的定时任务管理是分别由 任务计划进程 和 任务执行进程 进程负责的。两个进程的运行共同管理定时任务,如果使用进程间独立的array()
等结构,两个进程必然需要频繁的进程间通信。而使用跨进程的Table
(本文的Table
,除非特别说明,都指Swoole
的Swoole\Table
结构)直接进行进程间数据共享,不仅性能高,操作简单 还解耦了两个进程。
为了Table
能够在两个进程间共同使用,Table
必须在Swoole Server
启动前创建并分配内存。具体代码在Swoft\Task\Bootstrap\Listeners->onBeforeStart()
中,比较简单,有兴趣的可以自行阅读。
背景介绍完了,我们来看看这两个定时任务进程的行为
//Swoft\Task\Bootstrap\Process\CronTimerProcess.php /** * Crontab timer process * * @Process(name="cronTimer", boot=true) */ class CronTimerProcess implements ProcessInterface { /** * @param \Swoft\Process\Process $process */ public function run(SwoftProcess $process) { //code.... /* @var \Swoft\Task\Crontab\Crontab $cron*/ $cron = App::getBean('crontab'); // Swoole/HttpServer $server = App::$server->getServer(); $time = (60 - date('s')) * 1000; $server->after($time, function () use ($server, $cron) { // Every minute check all tasks, and prepare the tasks that next execution point needs $cron->checkTask(); $server->tick(60 * 1000, function () use ($cron) { $cron->checkTask(); }); }); } }
//Swoft\Task\Crontab\Crontab.php /** * 初始化runTimeTable数据 * * @param array $task 任务 * @param array $parseResult 解析crontab命令规则结果,即Task需要在当前分钟内的哪些秒执行 * @return bool */ private function initRunTimeTableData(array $task, array $parseResult): bool { $runTimeTableTasks = $this->getRunTimeTable()->table; $min = date('YmdHi'); $sec = strtotime(date('Y-m-d H:i')); foreach ($parseResult as $time) { $this->checkTaskQueue(false); $key = $this->getKey($task['rule'], $task['taskClass'], $task['taskMethod'], $min, $time + $sec); $runTimeTableTasks->set($key, [ 'taskClass' => $task['taskClass'], 'taskMethod' => $task['taskMethod'], 'minute' => $min, 'sec' => $time + $sec, 'runStatus' => self::NORMAL ]); } return true; }
CronTimerProcess
是Swoft
的定时任务调度进程,其核心方法是Crontab->initRunTimeTableData()
。
该进程使用了Swoole
的定时器功能,通过Swoole\Timer
在每分钟首秒时执行的回调,CronTimerProcess
每次被唤醒后都会遍历任务表计算出当前这一分钟内的60秒分别需要执行的任务清单,写入执行表并标记为 未执行。
//Swoft\Task\Bootstrap\Process /** * Crontab process * * @Process(name="cronExec", boot=true) */ class CronExecProcess implements ProcessInterface { /** * @param \Swoft\Process\Process $process */ public function run(SwoftProcess $process) { $pname = App::$server->getPname(); $process->name(sprintf('%s cronexec process', $pname)); /** @var \Swoft\Task\Crontab\Crontab $cron */ $cron = App::getBean('crontab'); // Swoole/HttpServer $server = App::$server->getServer(); $server->tick(0.5 * 1000, function () use ($cron) { $tasks = $cron->getExecTasks(); if (!empty($tasks)) { foreach ($tasks as $task) { // Diliver task Task::deliverByProcess($task['taskClass'], $task['taskMethod']); $cron->finishTask($task['key']); } } }); } }
CronExecProcess
作为定时任务的执行者,通过Swoole\Timer
每0.5s
唤醒自身一次,然后把 执行表
遍历一次,挑选当下需要执行的任务,通过sendMessage()
投递出去并更新该 任务执行表中的状态。
该执行进程只负责任务的投递,任务的实际实际执行仍然在Task进程
中由TaskExecutor
处理。
相关推荐:
以上がSwooleとSwoftのSwoftソースコードの解析の詳細内容です。詳細については、PHP 中国語 Web サイトの他の関連記事を参照してください。