PHP는 비동기 다중 프로세스 모드(코드)에서 작업 처리를 구현하기 위해 명령줄을 어떻게 사용합니까?

不言
풀어 주다: 2023-04-04 20:36:02
앞으로
2846명이 탐색했습니다.

이 기사의 내용은 PHP가 명령줄을 사용하여 비동기 다중 프로세스 모드에서 작업 처리(코드)를 구현하는 방법에 대한 내용입니다. 필요한 친구들이 참고할 수 있기를 바랍니다. .

PHP를 사용하여 비동기 작업을 구현하는 것은 항상 문제였습니다. 기존 솔루션 중: PHP의 잘 알려진 비동기 프레임워크에는 swoole 및 Workerman이 포함되어 있지만 웹 환경을 강제로 구축하더라도 웹 환경에서 직접 사용할 수 없습니다. , 비동기 호출 다중 프로세스 모드를 사용하여 구현되기도 합니다. 그러나 때로는 서비스를 시작하고 서버가 클라이언트 메시지를 기다리도록 할 필요가 없으며 서버 코드가 중간에 변경될 수 없다는 점은 말할 것도 없습니다. 이 글에서는 프레임워크나 타사 라이브러리를 사용하지 않고 CLI 환경에서 웹 환경에서 다중 프로세스 및 비동기 호출을 구현하는 방법을 소개합니다.

웹 환경에서 비동기 호출에 일반적으로 사용되는 두 가지 방법이 있습니다

1. 소켓 연결 사용

이 방법은 일반적인 C/S 아키텍처이며 서버 지원이 필요합니다.

// 1. 创建socket套接字
$socket = socket_create(AF_INET, SOCK_STREAM, SOL_TCP);
// 2. 进行socket连接
socket_connect($socket, '127.0.0.1', '3939');
//socket_set_nonblock($socket); // 以非阻塞模式运行,由于在客户端不实用,所以这里不考虑
// 3. 向服务端发送请求
socket_write($socket, $request, strlen($request));
// 4. 接受服务端的回应消息(忽略非阻塞的情况,如果服务端不是提供异步服务,那这一步可以省略)
$recv = socket_read($socket, 2048);
// 5. 关闭socket连接
socket_close($socket);
로그인 후 복사

2. popen을 사용하여 프로세스 파이프라인 열기

이 방법은 운영 체제 명령을 사용하며 운영 체제에서 직접 실행됩니다.

이 문서에서 설명하는 비동기 호출은 이 방법을 사용합니다.

$sf = '/path/to/cli_async_task.php'; //要执行的脚本文件
$op = 'call'; //脚本文件接收的参数1
$data = base64_encode(serialize(['TestTask', 'arg1', 'arg2'])); //脚本文件接收的参数2
pclose(popen("php '$sf' --op $op --data $data &", 'r')); //打开之后接着就关闭进程管道,让该进程以守护模式运行
echo PHP_EOL.'异步任务已执行。'.PHP_EOL;
로그인 후 복사

이 방법의 장점은 한 단계로 해결되며 현재 프로세스에 대한 오버헤드가 필요하지 않다는 것입니다.
단점도 분명합니다. 작업 스크립트의 실행 상태를 추적할 수 없습니다.
그래서 하이라이트는 작업을 실행하는 스크립트 파일이 될 것입니다. 다음에서는 작업 처리 및 다중 처리 구현을 소개합니다.

CLI 환경의 다중 프로세스 작업 처리

CLI 环境的多进程任务处理

注意:多进程模式仅支持Linux,不支持Windows!!

这里会从0开始(未使用任何框架和类库)介绍每一个步骤,最后会附带一份完整的代码

1. 创建脚本

  • 任何脚本不可忽视的地方就是错误处理。所以写一个任务处理脚本首先就是写错误处理方式。

在PHP中就是调用 set_exception_handler set_error_handler register_shutdown_function 这三个函数,然后写上自定义的处理方法。

  • 接着是定义自动加载函数 spl_autoload_register 免去每使用一个新类都要 require / include 的烦恼。

  • 定义日志操作方法。

  • 定义任务处理方法。

  • 读取来自命令行的参数,开始执行任务。

2. 多进程处理

PHP 创建多进程是使用 pcntl_fork 函数,该函数会 fork 一份当前进程(影分身术),于是就有了两个进程,当前进程是主进程(本体),fork 出的进程是子进程(影分身)。需要注意的是两个进程代码环境是一样的,两个进程都是执行到了 pcntl_fork 函数位置。区别就是 getmypid 获得的进程号不一样,最重要的区分是当调用 pcntl_fork函数时,子进程获得的返回值是 0,而主进程获得的是子进程的进程号 pid。

好了,当我们知道谁是子进程后,就可以让该子进程执行任务了。

那么主进程是如何得知子进程的状态呢?
使用 pcntl_wait。该函数有两个参数 $status 和 $options ,$status 是引用类型,用来存储子进程的状态,$options 有两个可选常量WNOHANG| WUNTRACED,分别表示不等待子进程结束立即返回和等待子进程结束。很明显使用WUNTRACED会阻塞主进程。(也可以使用 pcntl_waitpid 函数获取特定 pid 子进程状态)

在多进程中,主进程要做的就是管理每个子进程的状态,否则子进程很可能无法退出而变成僵尸进程。

关于多进程间的消息通信
这一块需要涉及具体的业务逻辑,所以只能简单的提一下。不考虑使用第三方比如 redis참고: 다중 프로세스 모드는 Windows가 아닌

Linux

만 지원합니다! ! 모든 단계는 프레임워크나 클래스 라이브러리를 사용하지 않고 0부터 시작하여 여기에서 소개되며 전체 코드는 마지막에 첨부됩니다.

1. 스크립트 만들기

  • 🎜모든 스크립트에서 무시할 수 없는 중요한 측면은 🎜오류 처리🎜입니다. 따라서 작업 처리 스크립트를 작성할 때 첫 번째 단계는 오류 처리 방법을 작성하는 것입니다. 🎜
🎜PHP에서는 set_Exception_handler, set_error_handler, Register_shutdown_function 세 가지 함수를 호출한 다음 사용자 정의 처리 방법을 작성하면 됩니다. 🎜
  • 🎜다음 단계는 새 클래스를 사용할 때마다 필수/포함 문제를 방지하기 위해 자동 로딩 함수 spl_autoload_register를 정의하는 것입니다. 🎜
  • 🎜로그 작업 방법을 정의합니다. 🎜
  • 🎜작업 처리 방법을 정의합니다. 🎜
  • 🎜명령줄에서 매개변수를 읽고 작업 실행을 시작하세요. 🎜

2. 다중 프로세스 처리

🎜PHP는 pcntl_fork 함수를 사용하여 다중 프로세스를 생성합니다(섀도우 복제 기술). 두 개의 A 프로세스가 있으며 현재 프로세스는 기본 프로세스(온톨로지)이고 분기된 프로세스는 하위 프로세스(섀도 클론)입니다. 두 프로세스의 코드 환경은 동일하며 두 프로세스 모두 pcntl_fork 함수 위치에서 실행되었다는 점에 유의해야 합니다. 차이점은 getmypid로 얻은 프로세스 번호가 다르다는 점입니다. 가장 중요한 차이점은 pcntl_fork 함수가 호출되면 하위 프로세스에서 얻은 반환 값은 0인 반면, 기본 프로세스는 하위 프로세스의 프로세스 번호 pid를 가져옵니다. . 🎜🎜자, 자식 프로세스가 누구인지 알고 나면 자식 프로세스가 작업을 수행하도록 할 수 있습니다. 🎜🎜그렇다면 메인 프로세스는 자식 프로세스의 상태를 어떻게 알 수 있을까요? 🎜pcntl_wait를 사용하세요. 이 함수에는 $status 및 $options라는 두 개의 매개변수가 있습니다. $status는 하위 프로세스의 상태를 저장하는 데 사용되는 참조 유형입니다. $options에는 두 개의 선택적 상수 WNOHANG|WUNTRACED가 있으며 이는 하위 프로세스가 끝날 때까지 기다리지 않고 즉시 반환됨을 의미합니다. 그리고 각각 자식 프로세스를 기다리면 프로세스가 종료됩니다. 분명히 WUNTRACED를 사용하면 기본 프로세스가 차단됩니다. (pcntl_waitpid 함수를 사용하여 특정 pid 하위 프로세스 상태를 얻을 수도 있습니다.) 🎜🎜여러 프로세스에서 기본 프로세스가 해야 할 일은 각 하위 프로세스의 상태를 관리하는 것입니다. 그렇지 않으면 하위 프로세스가 다음을 수행하지 못할 가능성이 높습니다. 종료하고 좀비 프로세스가 됩니다. 🎜🎜🎜다중 프로세스 간의 메시지 통신에 대하여🎜🎜이 영역에는 특정 비즈니스 로직이 포함되어야 하므로 간략하게만 언급하겠습니다. redis와 같은 타사 서비스 사용을 고려하지 않고도 PHP는 🎜파이프라인 통신🎜 및 🎜공유 메모리🎜를 기본적으로 구현할 수 있습니다. 구현은 비교적 간단하지만, 사용 가능한 데이터 용량이 제한되어 있고, 단순 텍스트 프로토콜을 통해서만 데이터를 교환할 수 있다는 단점이 있습니다. 🎜🎜🎜모든 프로세스 작업을 수동으로 종료하는 방법🎜🎜

如果多进程处理不当,很可能导致进程任务卡死,甚至占用过多系统资源,此时只能手动结束进程。
除了一个个的根据进程号来结束,还有一个快速的方法是首先在任务脚本里自定义进程名称,就是调用cli_set_process_title函数,然后在命令行输入:ps aux|grep cli_async_worker |grep -v grep|awk '{print $2}'|xargs kill -9 (里面的 cli_async_worker 就是自定义的进程名称),这样就可以快速结束多进程任务了。

以下是完整的任务执行脚本代码:

可能无法直接使用,需要修改的地方有:

  1. 脚本目录和日志目录常量

  2. 自动加载任务类的方法(默认是加载脚本目录中以Task结尾的文件)

  3. 其他的如:错误和日志处理方式和文本格式就随意吧...

  4. 如果命名管道文件设置有错误,可能导致进程假死,你可能需要手动删除进程管道通信的代码。

  5. 多进程的例子:execAsyncTask('multi', [ 'test' => ['a', 'b', 'c'], 'grab' => [['url' => 'https://www.baidu.com', 'callback' => 'http://localhost']] ]);。执行情况可以在日志文件中查看。execAsyncTask函数参考【__使用popen打开进程管道__】。

<?php

error_reporting(E_ALL ^ E_NOTICE ^ E_USER_WARNING);
@ini_set(&#39;display_errors&#39;, 0);
@ini_set(&#39;date.timezone&#39;, &#39;PRC&#39;);

chdir(__DIR__);

/* 任务脚本目录 */
defined(&#39;TASK_PATH&#39;) or define(&#39;TASK_PATH&#39;, realpath(__DIR__ .&#39;/tasks&#39;));
/* 任务日志目录 */
defined(&#39;TASK_LOGS_PATH&#39;) or define(&#39;TASK_LOGS_PATH&#39;, __DIR__ .&#39;/tasks/logs&#39;);

if (!is_dir(TASK_LOGS_PATH)) @mkdir(TASK_LOGS_PATH, 0777, true);

set_exception_handler(function($e) {
    $time = date(&#39;H:i:s&#39;, time());
    $msg = sprintf(&#39;&#39;. &#39;<h3>[%s] %s (%s)</h3>&#39;. "\n". &#39;<pre class="brush:php;toolbar:false">%s
', $time, $e->getMessage(), $e->getCode(), $e->getTraceAsString() ); file_put_contents(TASK_LOGS_PATH .'/exception-'.date('Ymd').'.log', $msg.PHP_EOL, FILE_APPEND|LOCK_EX); }); set_error_handler(function($errno, $errmsg, $filename, $line) { if (!(error_reporting() & $errno)) return; ob_start(); debug_print_backtrace(); $backtrace = ob_get_contents(); ob_end_clean(); $datetime = date('Y-m-d H:i:s', time()); $msg = << $header) { if (!is_numeric($_k)) $header = sprintf('%s: %s', $_k, $header); $_headers .= $header . "\r\n"; } } $headers = "Connection: close\r\n" . $_headers; $opts = array( 'http' => array( 'method' => strtoupper(@$job['method'] ?: 'get'), 'content' => @$job['data'] ?: null, 'header' => $headers, 'user_agent' => @$job['args']['user_agent'] ?: 'HTTPGRAB/1.0 (compatible)', 'proxy' => @$job['args']['proxy'] ?: null, 'timeout' => intval(@$job['args']['timeout'] ?: 120), 'protocol_version' => @$job['args']['protocol_version'] ?: '1.1', 'max_redirects' => 3, 'ignore_errors' => true ) ); $ret = @file_get_contents($url, false, stream_context_create($opts)); //debug_log($url.' -->'.strlen($ret)); if ($ret and isset($job['callback'])) { $postdata = http_build_query(array( 'msg_id' => @$job['msg_id'] ?: 0, 'url' => @$job['url'], 'result' => $ret )); $opts = array( 'http' => array( 'method' => 'POST', 'header' => 'Content-type:application/x-www-form-urlencoded'. "\r\n", 'content' => $postdata, 'timeout' => 30 ) ); file_get_contents($job['callback'], false, stream_context_create($opts)); //debug_log(json_encode(@$http_response_header)); //debug_log($job['callback'].' -->'.$ret2); } return $ret; } function clean($tmpdirs, $expires=3600*24*7) { $ret = []; foreach ((array)$tmpdirs as $tmpdir) { $ret[$tmpdir] = 0; foreach (glob($tmpdir.DIRECTORY_SEPARATOR.'*') as $_file) { if (fileatime($_file) < (time()-$expires)) { if (@unlink($_file)) $ret[$tmpdir]++; } } } return $ret; } function backup($file, $dest) { $zip = new \ZipArchive(); if (!$zip->open($file, \ZipArchive::CREATE)) { return false; } _backup_dir($zip, $dest); $zip->close(); return $file; } function _backup_dir($zip, $dest, $sub='') { $dest = rtrim($dest, DIRECTORY_SEPARATOR) . DIRECTORY_SEPARATOR; $sub = rtrim($sub, DIRECTORY_SEPARATOR) . DIRECTORY_SEPARATOR; $dir = opendir($dest); if (!$dir) return false; while (false !== ($file = readdir($dir))) { if (is_file($dest . $file)) { $zip->addFile($dest . $file, $sub . $file); } else { if ($file != '.' and $file != '..' and is_dir($dest . $file)) { //$zip->addEmptyDir($sub . $file . DIRECTORY_SEPARATOR); _backup_dir($zip, $dest . $file, $file); } } } closedir($dir); return true; } function execute_task($op, $data) { debug_log('Start...'); $t1 = microtime(true); switch($op) { case 'call': //执行任务脚本类 $cmd = $data; if (is_string($cmd) and class_exists($cmd)) $cmd = new $cmd; elseif (is_array($cmd)) { if (is_string($cmd[0]) and class_exists($cmd[0])) $cmd[0] = new $cmd[0]; } $ret = call($cmd); break; case 'grab': //抓取网页 if (is_string($data)) $data = ['url' => $data]; if (is_array($data)) $ret = grab($data); else throw new \Exception('无效的命令参数!'); break; case 'clean': //清理缓存文件夹:dirs 需要清理的文件夹列表,expires 过期时间(秒,默认7天) if (isset($data['dirs'])) { $ret = clean($data['dirs'], @$data['expires']); } else { $ret = clean($data); } break; case 'backup': //备份文件:zip 备份到哪个zip文件,dest 需要备份的文件夹 if (isset($data['zip']) and is_dir($data['dest'])) $ret = backup($data['zip'], $data['dest']); else throw new \Exception('没有指定需要备份的文件!'); break; case 'require': //加载脚本文件 if (is_file($data)) $ret = require($data); else throw new \Exception('不是可请求的文件!'); break; case 'test': sleep(rand(1, 5)); $ret = ucfirst(strval($data)). '.PID:'. getmypid(); break; case 'multi': //多进程处理模式 $results = $childs = []; $fifo = TASK_LOGS_PATH . DIRECTORY_SEPARATOR . 'pipe.'. posix_getpid(); if (!file_exists($fifo)) { if (!posix_mkfifo($fifo, 0666)) { //开启进程数据通信管道 throw new Exception('make pipe failed!'); } } //$shmid = shmop_open(ftok(__FILE__, 'h'), 'c', 0644, 4096); //共享内存 //shmop_write($shmid, serialize([]), 0); //$data = unserialize(shmop_read($shmid, 0, 4096)); //shmop_delete($shmid); //shmop_close($shmid); foreach($data as $_op => $_datas) { $_datas = (array)$_datas; //data 格式为数组表示一个 op 有多个执行数据 foreach($_datas as $_data) { $pid = pcntl_fork(); if ($pid == 0) { //子进程中执行任务 $_ret = execute_task($_op, $_data); $_pid = getmypid(); $pipe = fopen($fifo, 'w'); //写 //stream_set_blocking($pipe, false); $_ret = serialize(['pid' => $_pid, 'op' => $_op, 'args' => $_data, 'result' => $_ret]); if (strlen($_ret) > 4096) //写入管道的数据最大4K $_ret = serialize(['pid' => $_pid, 'op' => $_op, 'args' => $_data, 'result' => '[RESPONSE_TOO_LONG]']); //debug_log('write pipe: '.$_ret); fwrite($pipe, $_ret.PHP_EOL); fflush($pipe); fclose($pipe); exit(0); //退出子进程 } elseif ($pid > 0) { //主进程中记录任务 $childs[] = $pid; $results[$pid] = 0; debug_log('fork by child: '.$pid); //pcntl_wait($status, WNOHANG); } elseif ($pid == -1) { throw new Exception('could not fork at '. getmygid()); } } } $pipe = fopen($fifo, 'r+'); //读 stream_set_blocking($pipe, true); //阻塞模式,PID与读取的管道数据可能会不一致。 $n = 0; while(count($childs) > 0) { foreach($childs as $i => $pid) { $res = pcntl_waitpid($pid, $status, WNOHANG); if (-1 == $res || $res > 0) { $_ret = @unserialize(fgets($pipe)); //读取管道数据 $results[$pid] = $_ret; unset($childs[$i]); debug_log('read child: '.$pid . ' - ' . json_encode($_ret, 64|256)); } if ($n > 1000) posix_kill($pid, SIGTERM); //超时(10分钟)结束子进程 } usleep(200000); $n++; } debug_log('child process completed.'); @fclose($pipe); @unlink($fifo); $ret = json_encode($results, 64|256); break; default: throw new \Exception('没有可执行的任务!'); break; } $t2 = microtime(true); $times = round(($t2 - $t1) * 1000, 2); $log = sprintf('[%s] %s --> (%s) %sms', strtoupper($op), @json_encode($data, 64|256), @strlen($ret)<65?$ret:@strlen($ret), $times); debug_log($log); return $ret; } // 读取 CLI 命令行参数 $params = getopt('', array('op:', 'data:')); $op = $params['op']; $data = unserialize(base64_decode($params['data'])); // 开始执行任务 execute_task($op, $data); function __autoload($classname) { $parts = explode('\\', ltrim($classname, '\\')); if (false !== strpos(end($parts), '_')) { array_splice($parts, -1, 1, explode('_', current($parts))); } $filename = implode(DIRECTORY_SEPARATOR, $parts) . '.php'; if ($filename = stream_resolve_include_path($filename)) { include $filename; } else if (preg_match('/.*Task$/', $classname)) { //查找以Task结尾的任务脚本类 include TASK_PATH . DIRECTORY_SEPARATOR . $classname . '.php'; } else { return false; } }
로그인 후 복사


위 내용은 PHP는 비동기 다중 프로세스 모드(코드)에서 작업 처리를 구현하기 위해 명령줄을 어떻게 사용합니까?의 상세 내용입니다. 자세한 내용은 PHP 중국어 웹사이트의 기타 관련 기사를 참조하세요!

관련 라벨:
php
원천:segmentfault.com
본 웹사이트의 성명
본 글의 내용은 네티즌들의 자발적인 기여로 작성되었으며, 저작권은 원저작자에게 있습니다. 본 사이트는 이에 상응하는 법적 책임을 지지 않습니다. 표절이나 침해가 의심되는 콘텐츠를 발견한 경우 admin@php.cn으로 문의하세요.
인기 튜토리얼
더>
최신 다운로드
더>
웹 효과
웹사이트 소스 코드
웹사이트 자료
프론트엔드 템플릿
회사 소개 부인 성명 Sitemap
PHP 중국어 웹사이트:공공복지 온라인 PHP 교육,PHP 학습자의 빠른 성장을 도와주세요!