PHP はマルチスレッドをサポートしていませんが、完全なシステムとしては、非同期で完了する必要がある操作が多数あります。これらの非同期操作を完了するために、Redis キュー タスク システムを構築しました。
ご存知のとおり、メッセージ キュー処理システムは主にコンシューマーとプロデューサーの 2 つの部分に分かれています。
私たちのシステムでは、メインシステムがプロデューサーとして機能し、タスクシステムがコンシューマーとして機能します。
具体的なワークフローは次のとおりです: 1. メイン システムは、処理する必要があるタスク名とタスク パラメーターをキューにプッシュします。 2. タスク システムは、タスク キューをリアルタイムでポップアウトし、サブプロセスをフォークし、サブプロセスが特定のタスク ロジックを完了します。
具体的なコードは次のとおりです:
/** * 启动守护进程 */public function runAction() { Tools::log_message('ERROR', 'daemon/run' . ' | action: restart', 'daemon-'); while (true) { $this->fork_process(); } exit;}/** * 创建子进程 */private function fork_process() { $ppid = getmypid(); $pid = pcntl_fork(); if ($pid == 0) {//子进程 $pid = posix_getpid(); //echo "* Process {$pid} was created \n\n"; $this->mq_process(); exit; } else {//主进程 $pid = pcntl_wait($status, WUNTRACED); //取得子进程结束状态 if (pcntl_wifexited($status)) { //echo "\n\n* Sub process: {$pid} exited with {$status}"; //Tools::log_message('INFO', 'daemon/run succ' . '|status:' . $status . '|pid:' . $ppid . '|childpid:' . $pid ); } else { Tools::log_message('ERROR', 'daemon/run fail' . '|status:' . $status . '|pid:' . $ppid . '|childpid:' . $pid, 'daemon-'); } }}/** * 业务任务队列处理 */private function mq_process() { $data_pop = $this->masterRedis->rPop($this->redis_list_key); $data = json_decode($data_pop, 1); if (!$data) { return FALSE; } $worker = '_task_' . $data['worker']; $class_name = isset($data['class']) ? $data['class'] : 'TaskproModel'; $params = $data['params']; $class = new $class_name(); $class->$worker($params); return TRUE;}
これは単純なタスク処理システムです。
このタスク システムは非同期実装の実現に役立ち、これまでのところ 1 年近く安定して実行されています。
しかし、残念ながら、これは単一プロセス システムです。常にフォークしており、タスクがあれば処理され、タスクがなければスキップされます。
これは非常に安定しています。
しかし、問題が 2 つあります。1 つは、常にフォークとポップを行うとサーバー リソースが無駄になるということです。もう 1 つは、同時実行がサポートされていないということです。
最初の問題は大丈夫ですが、2 番目の問題は非常に深刻です。
メインシステム が同時に多数のタスクをスローすると、タスクの処理時間は無限に延長されます。
新しいデザインPHP7 より前はマルチスレッドがサポートされていなかったため、マルチプロセスを使用します。
インターネット上で多くの情報を見つけました。いわゆるマルチプロセスのほとんどは、バックグラウンドで同時に実行されている N 個のプロセスです。
明らかにこれは不適切です。
私の期待は次のとおりです:
タスクが飛び出すたびにタスクをフォークし、タスクの実行が完了した後に子プロセスが終了します。
発生した問題自動インクリメントには問題はありません。メインプロセスで操作するだけです。では、自分で減らすにはどうすればよいでしょうか?
おそらく、それは子プロセス内にあると言うかもしれません。ただし、ここで注意が必要です。フォークすると、リソースがメインプロセスから子プロセスにコピーされるため、メインプロセスのカウンタを子プロセスで操作することはできません。
つまり、ここで理解する必要がある知識点があります。それはシグナルです。
詳細については自分で Google で調べたり、ここでコードを直接確認したりできます。
// install signal handler for dead kidspcntl_signal(SIGCHLD, array($this, "sig_handler"));
declare(ticks = 1);
このコードの意味は、低レベルのステートメントが実行されるたびにシグナルプロセッサを呼び出すことです。
このようにして、子プロセスが終了するたびにシグナルプロセッサが呼び出され、シグナルプロセッサ内でそれをデクリメントすることができます。
2. プロセス残留問題の解決方法
プロセス残留物の問題を解決するには、子プロセスをリサイクルする必要があります。
次に、子プロセスをどのようにリサイクルするかが技術的なポイントになります。
多くのブログ投稿を含む pcntl のデモでは、子プロセスがメインプロセスで再利用されていると言われています。
しかし、私たちは Redis の brpop に基づいており、brpop はブロックしています。
これにより問題が発生します。N 個のタスクを実行した後、タスク システムがアイドル状態のときにメイン プロセスがブロックされ、ブロックが発生したときに子プロセスがまだ実行中であるため、最後のいくつかの子プロセスのプロセス リサイクルを完了できません。 。 。
これはいつも複雑な場所でしたが、信号プロセッサを手に入れてからは非常に簡単になりました。
プロセスリサイクルも信号プロセッサに配置されます。
新システムの評価
ここでは Swoole 拡張機能のプロセスが使用されています。
具体的なコードは次のとおりです:
declare(ticks = 1);class JobDaemonController extends Yaf_Controller_Abstract{ use Trait_Redis; private $maxProcesses = 800; private $child; private $masterRedis; private $redis_task_wing = 'task:wing'; //待处理队列 public function init(){ // install signal handler for dead kids pcntl_signal(SIGCHLD, array($this, "sig_handler")); set_time_limit(0); ini_set('default_socket_timeout', -1); //队列处理不超时,解决redis报错:read error on connection } private function redis_client(){ $rds = new Redis(); $rds->connect('redis.master.host',6379); return $rds; } public function process(swoole_process $worker){// 第一个处理 $GLOBALS['worker'] = $worker; swoole_event_add($worker->pipe, function($pipe) { $worker = $GLOBALS['worker']; $recv = $worker->read(); //send data to master sleep(rand(1, 3)); echo "From Master: $recv\n"; $worker->exit(0); }); exit; } public function testAction(){ for ($i = 0; $i < 10000; $i++){ $data = [ 'abc' => $i, 'timestamp' => time().rand(100,999) ]; $this->masterRedis->lpush($this->redis_task_wing, json_encode($data)); } exit; } public function runAction(){ while (1){// echo "\t now we de have $this->child child processes\n"; if ($this->child < $this->maxProcesses){ $rds = $this->redis_client(); $data_pop = $rds->brpop($this->redis_task_wing, 3);//无任务时,阻塞等待 if (!$data_pop){ continue; } echo "\t Starting new child | now we de have $this->child child processes\n"; $this->child++; $process = new swoole_process([$this, 'process']); $process->write(json_encode($data_pop)); $pid = $process->start(); } } } private function sig_handler($signo) {// echo "Recive: $signo \r\n"; switch ($signo) { case SIGCHLD: while($ret = swoole_process::wait(false)) {// echo "PID={$ret['pid']}\n"; $this->child--; } } }}
私はヤンおじさん、2 週間一生懸命働いてきた野生のプログラマーです