PHP는 감독자 프로세스 관리를 시뮬레이션합니다.
머리말감독자 프로세스 관리 시뮬레이션 DEMO(쉬운 구현)
예, 바퀴를 재발명하고 있습니다! 배우는 것이 목적입니다!스크린샷:
Copy
하위 프로세스 기능을 구현했습니다. AMQP에서 Consumer를 추가하거나 제거할 때 사용하면 매우 유용할 것 같습니다.
Copy
子进程的功能。如果用在AMQP增减消费者时,我觉得应该会很有用。
实现
1、在主进程循环内启动子进程执行命令
2、在web输入 127.0.0.1:7865 获取子进程状态
3、socket接收请求消息,并且执行相应操作,返回web页面
4、回收子进程,防止称为僵尸进程
不足:无法持续监听错误页面。由于socket得到的响应是通过include
函数加载的,所以在加载的页面内不能出现tail -f
命令,否则stream就会掉入了死循环了~。我想应该有方案解决(写了socket+多进程模式,模仿fpm在接收到请求之后就启动一个子进程去处理的模式,但是执行有问题。因此将代码贴出来希望得到大家的指点)。
延伸:由于对进程可以很好的管理(期望如此),那么就可以定制化自己的一些需求,比如:(1)定制AMQP的消费者进程管理服务。(2)模拟crontab定时服务。
知识点
代码实现的过程中,有很多的细节是值得学习的。
1、在while()循环中,启用了stream的非阻塞模式。所以不能在循环中使用sleep(1)
,而是用stream_select($read, $write, $except, 1)
让stream内部阻塞。
关于阻塞非阻塞模式,可以参阅这里
2、能够执行外部程序的函数很多,但是都稍有不同。这里采用的是proc_open
,是一个很强大的函数。在这之前我曾用pcntl_exec
执行过外部程序,但是需要先pcntl_fork
。而用其他的如exec
,shell_exec
无法对子进程进行管理。
3、重启或停止等操作子进程时,只是先更改主进程中该子进程在内存中的的状态,并不是真正的对子进程操作。在统一处init()
Implementation1. 메인 프로세스 루프에서 명령을 실행하기 위해 서브 프로세스를 시작합니다
2. 웹에서 127.0.0.1:7865를 입력하여 서브 프로세스 상태를 가져옵니다3. 메시지를 요청하고 해당 작업을 수행한 후 웹 페이지로 돌아갑니다.부적절: 오류 페이지를 지속적으로 모니터링할 수 없습니다. 소켓의 응답은4. 좀비 프로세스라고 불리는 것을 방지하기 위해 하위 프로세스를 재활용합니다.
include
함수를 통해 로드되므로 tail -f
명령은 로드된 페이지에 나타날 수 없습니다. 그렇지 않으면 스트림이 무한 루프에 빠지게 됩니다~ . 해결책이 있어야 할 것 같습니다.(fpm이 요청을 받은 후 처리하기 위해 하위 프로세스를 시작하는 모드를 모방하기 위해 소켓+멀티 프로세스 모드를 작성했지만 실행에 문제가 있습니다. 그래서 코드를 게시했습니다. 모든 사람의 조언을 얻기를 바라며). 확장: 프로세스를 잘 관리할 수 있으므로(예상) 다음과 같은 일부 요구 사항을 사용자 정의할 수 있습니다. (1) AMQP 소비자 프로세스 관리 서비스를 사용자 정의합니다. (2) crontab 타이밍 서비스를 시뮬레이션합니다.
지식 포인트
코드 구현 과정에서 배울 만한 세부 사항이 많이 있습니다.
1. while() 루프에서는 스트림의 비차단 모드가 활성화됩니다. 따라서 루프에서sleep(1)
을 사용할 수 없지만 내부적으로 스트림을 차단하려면 stream_select($read, $write, $read, 1)
를 사용하세요. 차단 모드와 비차단 모드는 여기를 참고하세요
2. 외부 프로그램을 실행할 수 있는 함수는 많지만 조금씩 다릅니다. 여기서 사용되는 것은 매우 강력한 기능인proc_open
입니다. 이전에 pcntl_exec
를 사용하여 외부 프로그램을 실행한 적이 있지만 먼저 pcntl_fork
를 사용해야 합니다. 그러나 exec
및 shell_exec
와 같은 다른 메서드는 하위 프로세스를 관리할 수 없습니다. 3. 자식 프로세스를 다시 시작하거나 중지하면 먼저 기본 프로세스의 메모리에 있는 자식 프로세스의 상태만 변경되며 실제로 자식 프로세스에서는 작동하지 않습니다. init()
는 동일한 위치에서 하위 프로세스를 처리합니다. 이렇게 하면 하위 프로세스가 시작될 때 컨텍스트로 인해 발생하는 이상한 현상을 방지할 수 있습니다.
Code
<?php require_once __DIR__ . '/Consumer.php';require_once __DIR__ . '/StreamConnection.php';require_once __DIR__ . '/Http.php';class Process{ /** * 待启动的消费者数组 */ protected $consumers = array(); protected $childPids = array(); const PPID_FILE = __DIR__ . '/process'; protected $serializerConsumer; public function __construct() { $this->consumers = $this->getConsumers(); } // 这里是个DEMO,实际可以用读取配置文件的方式。 public function getConsumers() { $consumer = new Consumer([ 'program' => 'test', 'command' => '/usr/bin/php test.php', 'directory' => __DIR__, 'logfile' => __DIR__ . '/test.log', 'uniqid' => uniqid(), 'auto_restart' => false, ]); return [ $consumer->uniqid => $consumer, ]; } public function run() { if (empty($this->consumers)) { // consumer empty return; } if ($this->_notifyMaster()) { // master alive return; } $pid = pcntl_fork(); if ($pid 0) { exit; } if (!posix_setsid()) { exit; } $stream = new StreamConnection('tcp://0.0.0.0:7865'); @cli_set_process_title('AMQP Master Process'); // 将主进程ID写入文件 file_put_contents(self::PPID_FILE, getmypid()); // master进程继续 while (true) { $this->init(); pcntl_signal_dispatch(); $this->waitpid(); // 如果子进程被全部回收,则主进程退出 // if (empty($this->childPids)) { // $stream->close($stream->getSocket()); // break; // } $stream->accept(function ($uniqid, $action) { $this->handle($uniqid, $action); return $this->display(); }); } } protected function init() { foreach ($this->consumers as &$c) { switch ($c->state) { case Consumer::RUNNING: case Consumer::STOP: break; case Consumer::NOMINAL: case Consumer::STARTING: $this->fork($c); break; case Consumer::STOPING: if ($c->pid && posix_kill($c->pid, SIGTERM)) { $this->reset($c, Consumer::STOP); } break; case Consumer::RESTART: if (empty($c->pid)) { $this->fork($c); break; } if (posix_kill($c->pid, SIGTERM)) { $this->reset($c, Consumer::STOP); $this->fork($c); } break; default: break; } } } protected function reset(Consumer $c, $state) { $c->pid = ''; $c->uptime = ''; $c->state = $state; $c->process = null; } protected function waitpid() { foreach ($this->childPids as $uniqid => $pid) { $result = pcntl_waitpid($pid, $status, WNOHANG); if ($result == $pid || $result == -1) { unset($this->childPids[$uniqid]); $c = &$this->consumers[$uniqid]; $state = pcntl_wifexited($status) ? Consumer::EXITED : Consumer::STOP; $this->reset($c, $state); } } } /** * 父进程存活情况下,只会通知父进程信息,否则可能产生多个守护进程 */ private function _notifyMaster() { $ppid = file_get_contents(self::PPID_FILE ); $isAlive = $this->checkProcessAlive($ppid); if (!$isAlive) return false; return true; } public function checkProcessAlive($pid) { if (empty($pid)) return false; $pidinfo = `ps co pid {$pid} | xargs`; $pidinfo = trim($pidinfo); $pattern = "/.*?PID.*?(\d+).*?/"; preg_match($pattern, $pidinfo, $matches); return empty($matches) ? false : ($matches[1] == $pid ? true : false); } /** * fork一个新的子进程 */ protected function fork(Consumer $c) { $descriptorspec = [2 => ['file', $c->logfile, 'a'],]; $process = proc_open('exec ' . $c->command, $descriptorspec, $pipes, $c->directory); if ($process) { $ret = proc_get_status($process); if ($ret['running']) { $c->state = Consumer::RUNNING; $c->pid = $ret['pid']; $c->process = $process; $c->uptime = date('m-d H:i'); $this->childPids[$c->uniqid] = $ret['pid']; } else { $c->state = Consumer::EXITED; proc_close($process); } } else { $c->state = Consumer::ERROR; } return $c; } public function display() { $location = 'http://127.0.0.1:7865'; $basePath = Http::$basePath; $scriptName = isset($_SERVER['SCRIPT_NAME']) && !empty($_SERVER['SCRIPT_NAME']) && $_SERVER['SCRIPT_NAME'] != '/' ? $_SERVER['SCRIPT_NAME'] : '/index.php'; if ($scriptName == '/index.html') { return Http::status_301($location); } $sourcePath = $basePath . $scriptName; if (!is_file($sourcePath)) { return Http::status_404(); } ob_start(); include $sourcePath; $response = ob_get_contents(); ob_clean(); return Http::status_200($response); } public function handle($uniqid, $action) { if (!empty($uniqid) && !isset($this->consumers[$uniqid])) { return; } switch ($action) { case 'refresh': break; case 'restartall': $this->killall(true); break; case 'stopall': $this->killall(); break; case 'stop': $c = &$this->consumers[$uniqid]; if ($c->state != Consumer::RUNNING) break; $c->state = Consumer::STOPING; break; case 'start': $c = &$this->consumers[$uniqid]; if ($c->state == Consumer::RUNNING) break; $c->state = Consumer::STARTING; break; case 'restart': $c = &$this->consumers[$uniqid]; $c->state = Consumer::RESTART; break; case 'copy': $c = $this->consumers[$uniqid]; $newC = clone $c; $newC->uniqid = uniqid('C'); $newC->state = Consumer::NOMINAL; $newC->pid = ''; $this->consumers[$newC->uniqid] = $newC; break; default: break; } } protected function killall($restart = false) { foreach ($this->consumers as &$c) { $c->state = $restart ? Consumer::RESTART : Consumer::STOPING; } }}$cli = new Process();$cli->run();
<?php require_once __DIR__ . '/BaseObject.php';class Consumer extends BaseObject{ /** 开启多少个消费者 */ public $numprocs = 1; /** 当前配置的唯一标志 */ public $program; /** 执行的命令 */ public $command; /** 当前工作的目录 */ public $directory; /** 通过 $qos $queueName $duplicate 生成的 $queue */ public $queue; /** 程序执行日志记录 */ public $logfile = ''; /** 消费进程的唯一ID */ public $uniqid; /** 进程IDpid */ public $pid; /** 进程状态 */ public $state = self::NOMINAL; /** 自启动 */ public $auto_restart = false; public $process; /** 启动时间 */ public $uptime; const RUNNING = 'running'; const STOP = 'stoped'; const NOMINAL = 'nominal'; const RESTART = 'restart'; const STOPING = 'stoping'; const STARTING = 'stating'; const ERROR = 'error'; const BLOCKED = 'blocked'; const EXITED = 'exited'; const FATEL = 'fatel';}
<?php class StreamConnection{ protected $socket; protected $timeout = 2; //s protected $client; public function __construct($host) { $this->socket = $this->connect($host); } public function connect($host) { $socket = stream_socket_server($host, $errno, $errstr); if (!$socket) { exit('stream error'); } stream_set_timeout($socket, $this->timeout); stream_set_chunk_size($socket, 1024); stream_set_blocking($socket, false); $this->client = [$socket]; return $socket; } public function accept(Closure $callback) { $read = $this->client; if (stream_select($read, $write, $except, 1) socket, $read)) { $cs = stream_socket_accept($this->socket); $this->client[] = $cs; } foreach ($read as $s) { if ($s == $this->socket) continue; $header = fread($s, 1024); if (empty($header)) { $index = array_search($s, $this->client); if ($index) unset($this->client[$index]); $this->close($s); continue; } Http::parse_http($header); $uniqid = isset($_GET['uniqid']) ? $_GET['uniqid'] : ''; $action = isset($_GET['action']) ? $_GET['action'] : ''; $response = $callback($uniqid, $action); $this->write($s, $response); $index = array_search($s, $this->client); if ($index) unset($this->client[$index]); $this->close($s); } } public function write($socket, $response) { $ret = fwrite($socket, $response, strlen($response)); } public function close($socket) { $flag = fclose($socket); } public function getSocket() { return $this->socket; }}
<?php class Http{ public static $basePath = __DIR__ . '/views'; public static $max_age = 120; //秒 /* * 函数: parse_http * 描述: 解析http协议 */ public static function parse_http($http) { // 初始化 $_POST = $_GET = $_COOKIE = $_REQUEST = $_SESSION = $_FILES = array(); $GLOBALS['HTTP_RAW_POST_DATA'] = ''; // 需要设置的变量名 $_SERVER = array( 'QUERY_STRING' => '', 'REQUEST_METHOD' => '', 'REQUEST_URI' => '', 'SERVER_PROTOCOL' => '', 'SERVER_SOFTWARE' => '', 'SERVER_NAME' => '', 'HTTP_HOST' => '', 'HTTP_USER_AGENT' => '', 'HTTP_ACCEPT' => '', 'HTTP_ACCEPT_LANGUAGE' => '', 'HTTP_ACCEPT_ENCODING' => '', 'HTTP_COOKIE' => '', 'HTTP_CONNECTION' => '', 'REMOTE_ADDR' => '', 'REMOTE_PORT' => '0', 'SCRIPT_NAME' => '', 'HTTP_REFERER' => '', 'CONTENT_TYPE' => '', 'HTTP_IF_NONE_MATCH' => '', ); // 将header分割成数组 list($http_header, $http_body) = explode("\r\n\r\n", $http, 2); $header_data = explode("\r\n", $http_header); list($_SERVER['REQUEST_METHOD'], $_SERVER['REQUEST_URI'], $_SERVER['SERVER_PROTOCOL']) = explode(' ', $header_data[0]); unset($header_data[0]); foreach ($header_data as $content) { // \r\n\r\n if (empty($content)) { continue; } list($key, $value) = explode(':', $content, 2); $key = strtolower($key); $value = trim($value); switch ($key) { case 'host': $_SERVER['HTTP_HOST'] = $value; $tmp = explode(':', $value); $_SERVER['SERVER_NAME'] = $tmp[0]; if (isset($tmp[1])) { $_SERVER['SERVER_PORT'] = $tmp[1]; } break; case 'cookie': $_SERVER['HTTP_COOKIE'] = $value; parse_str(str_replace('; ', '&', $_SERVER['HTTP_COOKIE']), $_COOKIE); break; case 'user-agent': $_SERVER['HTTP_USER_AGENT'] = $value; break; case 'accept': $_SERVER['HTTP_ACCEPT'] = $value; break; case 'accept-language': $_SERVER['HTTP_ACCEPT_LANGUAGE'] = $value; break; case 'accept-encoding': $_SERVER['HTTP_ACCEPT_ENCODING'] = $value; break; case 'connection': $_SERVER['HTTP_CONNECTION'] = $value; break; case 'referer': $_SERVER['HTTP_REFERER'] = $value; break; case 'if-modified-since': $_SERVER['HTTP_IF_MODIFIED_SINCE'] = $value; break; case 'if-none-match': $_SERVER['HTTP_IF_NONE_MATCH'] = $value; break; case 'content-type': if (!preg_match('/boundary="?(\S+)"?/', $value, $match)) { $_SERVER['CONTENT_TYPE'] = $value; } else { $_SERVER['CONTENT_TYPE'] = 'multipart/form-data'; $http_post_boundary = '--' . $match[1]; } break; } } // script_name $_SERVER['SCRIPT_NAME'] = parse_url($_SERVER['REQUEST_URI'], PHP_URL_PATH); // QUERY_STRING $_SERVER['QUERY_STRING'] = parse_url($_SERVER['REQUEST_URI'], PHP_URL_QUERY); if ($_SERVER['QUERY_STRING']) { // $GET parse_str($_SERVER['QUERY_STRING'], $_GET); } else { $_SERVER['QUERY_STRING'] = ''; } // REQUEST $_REQUEST = array_merge($_GET, $_POST); return array('get' => $_GET, 'post' => $_POST, 'cookie' => $_COOKIE, 'server' => $_SERVER, 'files' => $_FILES); } public static function status_404() { return 🎜실행할 스크립트: test.php🎜<pre class="brush:php;toolbar:false"><?php while(true) { file_put_contents(__DIR__ . '/test.log', date('Y-m-d H:i:s')); sleep(1);}
위 내용은 PHP는 감독자 프로세스 관리를 시뮬레이션합니다.의 상세 내용입니다. 자세한 내용은 PHP 중국어 웹사이트의 기타 관련 기사를 참조하세요!

핫 AI 도구

Undresser.AI Undress
사실적인 누드 사진을 만들기 위한 AI 기반 앱

AI Clothes Remover
사진에서 옷을 제거하는 온라인 AI 도구입니다.

Undress AI Tool
무료로 이미지를 벗다

Clothoff.io
AI 옷 제거제

AI Hentai Generator
AI Hentai를 무료로 생성하십시오.

인기 기사

뜨거운 도구

메모장++7.3.1
사용하기 쉬운 무료 코드 편집기

SublimeText3 중국어 버전
중국어 버전, 사용하기 매우 쉽습니다.

스튜디오 13.0.1 보내기
강력한 PHP 통합 개발 환경

드림위버 CS6
시각적 웹 개발 도구

SublimeText3 Mac 버전
신 수준의 코드 편집 소프트웨어(SublimeText3)

뜨거운 주제











PHP 8.4는 상당한 양의 기능 중단 및 제거를 통해 몇 가지 새로운 기능, 보안 개선 및 성능 개선을 제공합니다. 이 가이드에서는 Ubuntu, Debian 또는 해당 파생 제품에서 PHP 8.4를 설치하거나 PHP 8.4로 업그레이드하는 방법을 설명합니다.

CakePHP는 PHP용 오픈 소스 프레임워크입니다. 이는 애플리케이션을 훨씬 쉽게 개발, 배포 및 유지 관리할 수 있도록 하기 위한 것입니다. CakePHP는 강력하고 이해하기 쉬운 MVC와 유사한 아키텍처를 기반으로 합니다. 모델, 뷰 및 컨트롤러 gu

CakePHP에 로그인하는 것은 매우 쉬운 작업입니다. 한 가지 기능만 사용하면 됩니다. cronjob과 같은 백그라운드 프로세스에 대해 오류, 예외, 사용자 활동, 사용자가 취한 조치를 기록할 수 있습니다. CakePHP에 데이터를 기록하는 것은 쉽습니다. log() 함수는 다음과 같습니다.

VS Code라고도 알려진 Visual Studio Code는 모든 주요 운영 체제에서 사용할 수 있는 무료 소스 코드 편집기 또는 통합 개발 환경(IDE)입니다. 다양한 프로그래밍 언어에 대한 대규모 확장 모음을 통해 VS Code는

CakePHP는 오픈 소스 MVC 프레임워크입니다. 이를 통해 애플리케이션 개발, 배포 및 유지 관리가 훨씬 쉬워집니다. CakePHP에는 가장 일반적인 작업의 과부하를 줄이기 위한 여러 라이브러리가 있습니다.
