由於PHP不支援多線程,但是作為一個完善的系統,有很多操作都是需要非同步完成的。為了完成這些非同步操作,我們做了一個基於Redis隊列任務系統。
大家知道,一個訊息佇列處理系統主要分成兩大部分:消費者和製造者。
在我們的系統中,主系統作為製造者,任務系統作為消費者。
具體的工作流程如下: 1、主系統將需要需要處理的任務名稱 任務參數push到佇列中。 2.任務系統即時的對任務隊列進行pop,pop出來一個任務就fork一個子進程,由子進程完成具體的任務邏輯。
/** * 启动守护进程 */ public function runAction() { Tools::log_message('ERROR', 'daemon/run' . ' | action: restart', 'daemon-'); while (true) { $this->fork_process(); } exit; } /** * 创建子进程 */ private function fork_process() { $ppid = getmypid(); $pid = pcntl_fork(); if ($pid == 0) {//子进程 $pid = posix_getpid(); //echo "* Process {$pid} was created \n\n"; $this->mq_process(); exit; } else {//主进程 $pid = pcntl_wait($status, WUNTRACED); //取得子进程结束状态 if (pcntl_wifexited($status)) { //echo "\n\n* Sub process: {$pid} exited with {$status}"; //Tools::log_message('INFO', 'daemon/run succ' . '|status:' . $status . '|pid:' . $ppid . '|childpid:' . $pid ); } else { Tools::log_message('ERROR', 'daemon/run fail' . '|status:' . $status . '|pid:' . $ppid . '|childpid:' . $pid, 'daemon-'); } } } /** * 业务任务队列处理 */ private function mq_process() { $data_pop = $this->masterRedis->rPop($this->redis_list_key); $data = json_decode($data_pop, 1); if (!$data) { return FALSE; } $worker = '_task_' . $data['worker']; $class_name = isset($data['class']) ? $data['class'] : 'TaskproModel'; $params = $data['params']; $class = new $class_name(); $class->$worker($params); return TRUE; }
+VX:PHPopen888PHP中高级教程分享,专注laravel,swoole,tp,分布式高并发处理教程分享