스크린샷:
Copy
하위 프로세스 기능을 구현했습니다. AMQP에서 Consumer를 추가하거나 제거할 때 사용하면 매우 유용할 것 같습니다.
Copy
子进程的功能。如果用在AMQP增减消费者时,我觉得应该会很有用。
1、在主进程循环内启动子进程执行命令
2、在web输入 127.0.0.1:7865 获取子进程状态
3、socket接收请求消息,并且执行相应操作,返回web页面
4、回收子进程,防止称为僵尸进程
不足:无法持续监听错误页面。由于socket得到的响应是通过include
函数加载的,所以在加载的页面内不能出现tail -f
命令,否则stream就会掉入了死循环了~。我想应该有方案解决(写了socket+多进程模式,模仿fpm在接收到请求之后就启动一个子进程去处理的模式,但是执行有问题。因此将代码贴出来希望得到大家的指点)。
延伸:由于对进程可以很好的管理(期望如此),那么就可以定制化自己的一些需求,比如:(1)定制AMQP的消费者进程管理服务。(2)模拟crontab定时服务。
代码实现的过程中,有很多的细节是值得学习的。
1、在while()循环中,启用了stream的非阻塞模式。所以不能在循环中使用sleep(1)
,而是用stream_select($read, $write, $except, 1)
让stream内部阻塞。
关于阻塞非阻塞模式,可以参阅这里
2、能够执行外部程序的函数很多,但是都稍有不同。这里采用的是proc_open
,是一个很强大的函数。在这之前我曾用pcntl_exec
执行过外部程序,但是需要先pcntl_fork
。而用其他的如exec
,shell_exec
无法对子进程进行管理。
3、重启或停止等操作子进程时,只是先更改主进程中该子进程在内存中的的状态,并不是真正的对子进程操作。在统一处init()
3. 메시지를 요청하고 해당 작업을 수행한 후 웹 페이지로 돌아갑니다.부적절: 오류 페이지를 지속적으로 모니터링할 수 없습니다. 소켓의 응답은4. 좀비 프로세스라고 불리는 것을 방지하기 위해 하위 프로세스를 재활용합니다.
include
함수를 통해 로드되므로 tail -f
명령은 로드된 페이지에 나타날 수 없습니다. 그렇지 않으면 스트림이 무한 루프에 빠지게 됩니다~ . 해결책이 있어야 할 것 같습니다.(fpm이 요청을 받은 후 처리하기 위해 하위 프로세스를 시작하는 모드를 모방하기 위해 소켓+멀티 프로세스 모드를 작성했지만 실행에 문제가 있습니다. 그래서 코드를 게시했습니다. 모든 사람의 조언을 얻기를 바라며). 확장: 프로세스를 잘 관리할 수 있으므로(예상) 다음과 같은 일부 요구 사항을 사용자 정의할 수 있습니다. (1) AMQP 소비자 프로세스 관리 서비스를 사용자 정의합니다. (2) crontab 타이밍 서비스를 시뮬레이션합니다.
지식 포인트
코드 구현 과정에서 배울 만한 세부 사항이 많이 있습니다.
1. while() 루프에서는 스트림의 비차단 모드가 활성화됩니다. 따라서 루프에서sleep(1)
을 사용할 수 없지만 내부적으로 스트림을 차단하려면 stream_select($read, $write, $read, 1)
를 사용하세요. 차단 모드와 비차단 모드는 여기를 참고하세요
2. 외부 프로그램을 실행할 수 있는 함수는 많지만 조금씩 다릅니다. 여기서 사용되는 것은 매우 강력한 기능인proc_open
입니다. 이전에 pcntl_exec
를 사용하여 외부 프로그램을 실행한 적이 있지만 먼저 pcntl_fork
를 사용해야 합니다. 그러나 exec
및 shell_exec
와 같은 다른 메서드는 하위 프로세스를 관리할 수 없습니다. 3. 자식 프로세스를 다시 시작하거나 중지하면 먼저 기본 프로세스의 메모리에 있는 자식 프로세스의 상태만 변경되며 실제로 자식 프로세스에서는 작동하지 않습니다. init()
는 동일한 위치에서 하위 프로세스를 처리합니다. 이렇게 하면 하위 프로세스가 시작될 때 컨텍스트로 인해 발생하는 이상한 현상을 방지할 수 있습니다.
Code
<?php require_once __DIR__ . '/Consumer.php';require_once __DIR__ . '/StreamConnection.php';require_once __DIR__ . '/Http.php';class Process{ /** * 待启动的消费者数组 */ protected $consumers = array(); protected $childPids = array(); const PPID_FILE = __DIR__ . '/process'; protected $serializerConsumer; public function __construct() { $this->consumers = $this->getConsumers(); } // 这里是个DEMO,实际可以用读取配置文件的方式。 public function getConsumers() { $consumer = new Consumer([ 'program' => 'test', 'command' => '/usr/bin/php test.php', 'directory' => __DIR__, 'logfile' => __DIR__ . '/test.log', 'uniqid' => uniqid(), 'auto_restart' => false, ]); return [ $consumer->uniqid => $consumer, ]; } public function run() { if (empty($this->consumers)) { // consumer empty return; } if ($this->_notifyMaster()) { // master alive return; } $pid = pcntl_fork(); if ($pid 0) { exit; } if (!posix_setsid()) { exit; } $stream = new StreamConnection('tcp://0.0.0.0:7865'); @cli_set_process_title('AMQP Master Process'); // 将主进程ID写入文件 file_put_contents(self::PPID_FILE, getmypid()); // master进程继续 while (true) { $this->init(); pcntl_signal_dispatch(); $this->waitpid(); // 如果子进程被全部回收,则主进程退出 // if (empty($this->childPids)) { // $stream->close($stream->getSocket()); // break; // } $stream->accept(function ($uniqid, $action) { $this->handle($uniqid, $action); return $this->display(); }); } } protected function init() { foreach ($this->consumers as &$c) { switch ($c->state) { case Consumer::RUNNING: case Consumer::STOP: break; case Consumer::NOMINAL: case Consumer::STARTING: $this->fork($c); break; case Consumer::STOPING: if ($c->pid && posix_kill($c->pid, SIGTERM)) { $this->reset($c, Consumer::STOP); } break; case Consumer::RESTART: if (empty($c->pid)) { $this->fork($c); break; } if (posix_kill($c->pid, SIGTERM)) { $this->reset($c, Consumer::STOP); $this->fork($c); } break; default: break; } } } protected function reset(Consumer $c, $state) { $c->pid = ''; $c->uptime = ''; $c->state = $state; $c->process = null; } protected function waitpid() { foreach ($this->childPids as $uniqid => $pid) { $result = pcntl_waitpid($pid, $status, WNOHANG); if ($result == $pid || $result == -1) { unset($this->childPids[$uniqid]); $c = &$this->consumers[$uniqid]; $state = pcntl_wifexited($status) ? Consumer::EXITED : Consumer::STOP; $this->reset($c, $state); } } } /** * 父进程存活情况下,只会通知父进程信息,否则可能产生多个守护进程 */ private function _notifyMaster() { $ppid = file_get_contents(self::PPID_FILE ); $isAlive = $this->checkProcessAlive($ppid); if (!$isAlive) return false; return true; } public function checkProcessAlive($pid) { if (empty($pid)) return false; $pidinfo = `ps co pid {$pid} | xargs`; $pidinfo = trim($pidinfo); $pattern = "/.*?PID.*?(\d+).*?/"; preg_match($pattern, $pidinfo, $matches); return empty($matches) ? false : ($matches[1] == $pid ? true : false); } /** * fork一个新的子进程 */ protected function fork(Consumer $c) { $descriptorspec = [2 => ['file', $c->logfile, 'a'],]; $process = proc_open('exec ' . $c->command, $descriptorspec, $pipes, $c->directory); if ($process) { $ret = proc_get_status($process); if ($ret['running']) { $c->state = Consumer::RUNNING; $c->pid = $ret['pid']; $c->process = $process; $c->uptime = date('m-d H:i'); $this->childPids[$c->uniqid] = $ret['pid']; } else { $c->state = Consumer::EXITED; proc_close($process); } } else { $c->state = Consumer::ERROR; } return $c; } public function display() { $location = 'http://127.0.0.1:7865'; $basePath = Http::$basePath; $scriptName = isset($_SERVER['SCRIPT_NAME']) && !empty($_SERVER['SCRIPT_NAME']) && $_SERVER['SCRIPT_NAME'] != '/' ? $_SERVER['SCRIPT_NAME'] : '/index.php'; if ($scriptName == '/index.html') { return Http::status_301($location); } $sourcePath = $basePath . $scriptName; if (!is_file($sourcePath)) { return Http::status_404(); } ob_start(); include $sourcePath; $response = ob_get_contents(); ob_clean(); return Http::status_200($response); } public function handle($uniqid, $action) { if (!empty($uniqid) && !isset($this->consumers[$uniqid])) { return; } switch ($action) { case 'refresh': break; case 'restartall': $this->killall(true); break; case 'stopall': $this->killall(); break; case 'stop': $c = &$this->consumers[$uniqid]; if ($c->state != Consumer::RUNNING) break; $c->state = Consumer::STOPING; break; case 'start': $c = &$this->consumers[$uniqid]; if ($c->state == Consumer::RUNNING) break; $c->state = Consumer::STARTING; break; case 'restart': $c = &$this->consumers[$uniqid]; $c->state = Consumer::RESTART; break; case 'copy': $c = $this->consumers[$uniqid]; $newC = clone $c; $newC->uniqid = uniqid('C'); $newC->state = Consumer::NOMINAL; $newC->pid = ''; $this->consumers[$newC->uniqid] = $newC; break; default: break; } } protected function killall($restart = false) { foreach ($this->consumers as &$c) { $c->state = $restart ? Consumer::RESTART : Consumer::STOPING; } }}$cli = new Process();$cli->run();
<?php require_once __DIR__ . '/BaseObject.php';class Consumer extends BaseObject{ /** 开启多少个消费者 */ public $numprocs = 1; /** 当前配置的唯一标志 */ public $program; /** 执行的命令 */ public $command; /** 当前工作的目录 */ public $directory; /** 通过 $qos $queueName $duplicate 生成的 $queue */ public $queue; /** 程序执行日志记录 */ public $logfile = ''; /** 消费进程的唯一ID */ public $uniqid; /** 进程IDpid */ public $pid; /** 进程状态 */ public $state = self::NOMINAL; /** 自启动 */ public $auto_restart = false; public $process; /** 启动时间 */ public $uptime; const RUNNING = 'running'; const STOP = 'stoped'; const NOMINAL = 'nominal'; const RESTART = 'restart'; const STOPING = 'stoping'; const STARTING = 'stating'; const ERROR = 'error'; const BLOCKED = 'blocked'; const EXITED = 'exited'; const FATEL = 'fatel';}
<?php class StreamConnection{ protected $socket; protected $timeout = 2; //s protected $client; public function __construct($host) { $this->socket = $this->connect($host); } public function connect($host) { $socket = stream_socket_server($host, $errno, $errstr); if (!$socket) { exit('stream error'); } stream_set_timeout($socket, $this->timeout); stream_set_chunk_size($socket, 1024); stream_set_blocking($socket, false); $this->client = [$socket]; return $socket; } public function accept(Closure $callback) { $read = $this->client; if (stream_select($read, $write, $except, 1) socket, $read)) { $cs = stream_socket_accept($this->socket); $this->client[] = $cs; } foreach ($read as $s) { if ($s == $this->socket) continue; $header = fread($s, 1024); if (empty($header)) { $index = array_search($s, $this->client); if ($index) unset($this->client[$index]); $this->close($s); continue; } Http::parse_http($header); $uniqid = isset($_GET['uniqid']) ? $_GET['uniqid'] : ''; $action = isset($_GET['action']) ? $_GET['action'] : ''; $response = $callback($uniqid, $action); $this->write($s, $response); $index = array_search($s, $this->client); if ($index) unset($this->client[$index]); $this->close($s); } } public function write($socket, $response) { $ret = fwrite($socket, $response, strlen($response)); } public function close($socket) { $flag = fclose($socket); } public function getSocket() { return $this->socket; }}
<?php class Http{ public static $basePath = __DIR__ . '/views'; public static $max_age = 120; //秒 /* * 函数: parse_http * 描述: 解析http协议 */ public static function parse_http($http) { // 初始化 $_POST = $_GET = $_COOKIE = $_REQUEST = $_SESSION = $_FILES = array(); $GLOBALS['HTTP_RAW_POST_DATA'] = ''; // 需要设置的变量名 $_SERVER = array( 'QUERY_STRING' => '', 'REQUEST_METHOD' => '', 'REQUEST_URI' => '', 'SERVER_PROTOCOL' => '', 'SERVER_SOFTWARE' => '', 'SERVER_NAME' => '', 'HTTP_HOST' => '', 'HTTP_USER_AGENT' => '', 'HTTP_ACCEPT' => '', 'HTTP_ACCEPT_LANGUAGE' => '', 'HTTP_ACCEPT_ENCODING' => '', 'HTTP_COOKIE' => '', 'HTTP_CONNECTION' => '', 'REMOTE_ADDR' => '', 'REMOTE_PORT' => '0', 'SCRIPT_NAME' => '', 'HTTP_REFERER' => '', 'CONTENT_TYPE' => '', 'HTTP_IF_NONE_MATCH' => '', ); // 将header分割成数组 list($http_header, $http_body) = explode("\r\n\r\n", $http, 2); $header_data = explode("\r\n", $http_header); list($_SERVER['REQUEST_METHOD'], $_SERVER['REQUEST_URI'], $_SERVER['SERVER_PROTOCOL']) = explode(' ', $header_data[0]); unset($header_data[0]); foreach ($header_data as $content) { // \r\n\r\n if (empty($content)) { continue; } list($key, $value) = explode(':', $content, 2); $key = strtolower($key); $value = trim($value); switch ($key) { case 'host': $_SERVER['HTTP_HOST'] = $value; $tmp = explode(':', $value); $_SERVER['SERVER_NAME'] = $tmp[0]; if (isset($tmp[1])) { $_SERVER['SERVER_PORT'] = $tmp[1]; } break; case 'cookie': $_SERVER['HTTP_COOKIE'] = $value; parse_str(str_replace('; ', '&', $_SERVER['HTTP_COOKIE']), $_COOKIE); break; case 'user-agent': $_SERVER['HTTP_USER_AGENT'] = $value; break; case 'accept': $_SERVER['HTTP_ACCEPT'] = $value; break; case 'accept-language': $_SERVER['HTTP_ACCEPT_LANGUAGE'] = $value; break; case 'accept-encoding': $_SERVER['HTTP_ACCEPT_ENCODING'] = $value; break; case 'connection': $_SERVER['HTTP_CONNECTION'] = $value; break; case 'referer': $_SERVER['HTTP_REFERER'] = $value; break; case 'if-modified-since': $_SERVER['HTTP_IF_MODIFIED_SINCE'] = $value; break; case 'if-none-match': $_SERVER['HTTP_IF_NONE_MATCH'] = $value; break; case 'content-type': if (!preg_match('/boundary="?(\S+)"?/', $value, $match)) { $_SERVER['CONTENT_TYPE'] = $value; } else { $_SERVER['CONTENT_TYPE'] = 'multipart/form-data'; $http_post_boundary = '--' . $match[1]; } break; } } // script_name $_SERVER['SCRIPT_NAME'] = parse_url($_SERVER['REQUEST_URI'], PHP_URL_PATH); // QUERY_STRING $_SERVER['QUERY_STRING'] = parse_url($_SERVER['REQUEST_URI'], PHP_URL_QUERY); if ($_SERVER['QUERY_STRING']) { // $GET parse_str($_SERVER['QUERY_STRING'], $_GET); } else { $_SERVER['QUERY_STRING'] = ''; } // REQUEST $_REQUEST = array_merge($_GET, $_POST); return array('get' => $_GET, 'post' => $_POST, 'cookie' => $_COOKIE, 'server' => $_SERVER, 'files' => $_FILES); } public static function status_404() { return 🎜실행할 스크립트: test.php🎜<pre class="brush:php;toolbar:false"><?php while(true) { file_put_contents(__DIR__ . '/test.log', date('Y-m-d H:i:s')); sleep(1);}
위 내용은 PHP는 감독자 프로세스 관리를 시뮬레이션합니다.의 상세 내용입니다. 자세한 내용은 PHP 중국어 웹사이트의 기타 관련 기사를 참조하세요!