この記事の内容は、PHP がコマンドラインを使用して非同期マルチプロセスモードのタスク処理 (コード) を実装する方法に関するものであり、一定の参考価値があります。必要な友人は参照してください。お役に立てば幸いです。あなたに、助けてください。
PHP を使用して非同期タスクを実装することは、常に問題でした。既存の解決策の中には: PHP のよく知られた非同期フレームワークには swoole や Workerman がありますが、これらは、たとえ強制 Web 環境を構築するために、マルチプロセス モードを使用して非同期呼び出しも実装されます。ただし、途中でサーバー コードを変更できないことは言うまでもなく、実際にはサービスを開始してサーバーをクライアント メッセージを待機させる必要がない場合もあります。この記事では、フレームワークやサードパーティのライブラリを使用せずに、Web 環境でのマルチプロセス呼び出しと非同期呼び出しを CLI 環境で実装する方法を紹介します。
#Web 環境での非同期呼び出し
一般的に使用される方法は 2 つあります
1. ソケット接続を使用します
この方法は典型的な C/S アーキテクチャであり、サーバーのサポートが必要です。
// 1. 创建socket套接字
$socket = socket_create(AF_INET, SOCK_STREAM, SOL_TCP);
// 2. 进行socket连接
socket_connect($socket, '127.0.0.1', '3939');
//socket_set_nonblock($socket); // 以非阻塞模式运行,由于在客户端不实用,所以这里不考虑
// 3. 向服务端发送请求
socket_write($socket, $request, strlen($request));
// 4. 接受服务端的回应消息(忽略非阻塞的情况,如果服务端不是提供异步服务,那这一步可以省略)
$recv = socket_read($socket, 2048);
// 5. 关闭socket连接
socket_close($socket);
ログイン後にコピー
2.popen を使用してプロセス パイプを開きます
このメソッドはオペレーティング システム コマンドを使用し、オペレーティング システムによって直接実行されます。
この記事で説明する非同期呼び出しでは、このメソッドが使用されます。
$sf = '/path/to/cli_async_task.php'; //要执行的脚本文件
$op = 'call'; //脚本文件接收的参数1
$data = base64_encode(serialize(['TestTask', 'arg1', 'arg2'])); //脚本文件接收的参数2
pclose(popen("php '$sf' --op $op --data $data &", 'r')); //打开之后接着就关闭进程管道,让该进程以守护模式运行
echo PHP_EOL.'异步任务已执行。'.PHP_EOL;
ログイン後にコピー
この方法の利点は、1 ステップで解決でき、現在のプロセスにオーバーヘッドが必要ないことです。
欠点も明らかです。タスク スクリプトの実行ステータスを追跡することができません。
そこで注目はタスクを実行するスクリプトファイルですが、ここではタスク処理と多重処理の実装について紹介します。
CLI
環境でのマルチプロセス タスクの処理
注
: マルチプロセス モードでは、Linux のみがサポートされており、サポートされていません。 # ##ウィンドウズ###! ! ここでは (フレームワークやクラス ライブラリを使用せずに) 0 から各ステップを紹介します。 最後に完全なコードが表示されます 。
1. スクリプトを作成します
どのスクリプトでも無視できない側面は、
エラー処理
次のステップは、新しいクラスが使用されるたびに require / include する手間を避けるために、自動ロード関数 spl_autoload_register を定義することです。
それでは、メインプロセスはどのようにして子プロセスのステータスを知るのでしょうか?
pcntl_wait を使用します。この関数には 2 つのパラメータ $status と $options があります。$status は参照型で、子プロセスのステータスを保存するために使用されます。$options には 2 つのオプションの定数 WNOHANG|WUNTRACED があり、これは子プロセスの終了を待たずにすぐに戻ることを意味しますと子プロセスをそれぞれ待機し、プロセスは終了します。明らかに、WUNTRACED を使用するとメインプロセスがブロックされます。 (pcntl_waitpid 関数を使用して、特定の pid 子プロセスのステータスを取得することもできます。)
複数のプロセスの場合、メイン プロセスが行う必要があるのは、各子プロセスのステータスを管理することです。そうでない場合、子プロセスがステータスを管理する必要がある可能性があります。終了できなくなり、ゾンビプロセスになります。
複数のプロセス間のメッセージ通信について
このセクションでは、特定のビジネス ロジックに関わる必要があるため、簡単に説明します。
redis
などのサードパーティ サービスの使用を考慮せずに、PHP は
パイプライン通信 と 共有メモリ
をネイティブに実装できます。実装は比較的簡単ですが、使用できるデータ容量が限られており、単純なテキスト プロトコルを使用してのみデータを交換できるという欠点があります。
すべてのプロセス タスクを手動で終了する方法如果多进程处理不当,很可能导致进程任务卡死,甚至占用过多系统资源,此时只能手动结束进程。
除了一个个的根据进程号来结束,还有一个快速的方法是首先在任务脚本里自定义进程名称,就是调用cli_set_process_title函数,然后在命令行输入:ps aux|grep cli_async_worker |grep -v grep|awk '{print $2}'|xargs kill -9 (里面的 cli_async_worker 就是自定义的进程名称),这样就可以快速结束多进程任务了。
以下是完整的任务执行脚本代码:
可能无法直接使用,需要修改的地方有:
脚本目录和日志目录常量
自动加载任务类的方法(默认是加载脚本目录中以Task
结尾的文件)
其他的如:错误和日志处理方式和文本格式就随意吧...
如果命名管道文件设置有错误,可能导致进程假死,你可能需要手动删除进程管道通信的代码。
多进程的例子:execAsyncTask('multi', [ 'test' => ['a', 'b', 'c'], 'grab' => [['url' => 'https://www.baidu.com', 'callback' => 'http://localhost']] ]);。执行情况可以在日志文件中查看。execAsyncTask函数参考【__使用popen打开进程管道__】。
<?php
error_reporting(E_ALL ^ E_NOTICE ^ E_USER_WARNING);
@ini_set('display_errors', 0);
@ini_set('date.timezone', 'PRC');
chdir(__DIR__);
/* 任务脚本目录 */
defined('TASK_PATH') or define('TASK_PATH', realpath(__DIR__ .'/tasks'));
/* 任务日志目录 */
defined('TASK_LOGS_PATH') or define('TASK_LOGS_PATH', __DIR__ .'/tasks/logs');
if (!is_dir(TASK_LOGS_PATH)) @mkdir(TASK_LOGS_PATH, 0777, true);
set_exception_handler(function($e) {
$time = date('H:i:s', time());
$msg = sprintf(''. '<h3>[%s] %s (%s)</h3>'. "\n". '<pre class="brush:php;toolbar:false">%s
',
$time, $e->getMessage(), $e->getCode(), $e->getTraceAsString()
);
file_put_contents(TASK_LOGS_PATH .'/exception-'.date('Ymd').'.log', $msg.PHP_EOL, FILE_APPEND|LOCK_EX);
});
set_error_handler(function($errno, $errmsg, $filename, $line) {
if (!(error_reporting() & $errno)) return;
ob_start();
debug_print_backtrace();
$backtrace = ob_get_contents(); ob_end_clean();
$datetime = date('Y-m-d H:i:s', time());
$msg = <<
$header) {
if (!is_numeric($_k))
$header = sprintf('%s: %s', $_k, $header);
$_headers .= $header . "\r\n";
}
}
$headers = "Connection: close\r\n" . $_headers;
$opts = array(
'http' => array(
'method' => strtoupper(@$job['method'] ?: 'get'),
'content' => @$job['data'] ?: null,
'header' => $headers,
'user_agent' => @$job['args']['user_agent'] ?: 'HTTPGRAB/1.0 (compatible)',
'proxy' => @$job['args']['proxy'] ?: null,
'timeout' => intval(@$job['args']['timeout'] ?: 120),
'protocol_version' => @$job['args']['protocol_version'] ?: '1.1',
'max_redirects' => 3,
'ignore_errors' => true
)
);
$ret = @file_get_contents($url, false, stream_context_create($opts));
//debug_log($url.' -->'.strlen($ret));
if ($ret and isset($job['callback'])) {
$postdata = http_build_query(array(
'msg_id' => @$job['msg_id'] ?: 0,
'url' => @$job['url'],
'result' => $ret
));
$opts = array(
'http' => array(
'method' => 'POST',
'header' => 'Content-type:application/x-www-form-urlencoded'. "\r\n",
'content' => $postdata,
'timeout' => 30
)
);
file_get_contents($job['callback'], false, stream_context_create($opts));
//debug_log(json_encode(@$http_response_header));
//debug_log($job['callback'].' -->'.$ret2);
}
return $ret;
}
function clean($tmpdirs, $expires=3600*24*7) {
$ret = [];
foreach ((array)$tmpdirs as $tmpdir) {
$ret[$tmpdir] = 0;
foreach (glob($tmpdir.DIRECTORY_SEPARATOR.'*') as $_file) {
if (fileatime($_file) < (time()-$expires)) {
if (@unlink($_file)) $ret[$tmpdir]++;
}
}
}
return $ret;
}
function backup($file, $dest) {
$zip = new \ZipArchive();
if (!$zip->open($file, \ZipArchive::CREATE)) {
return false;
}
_backup_dir($zip, $dest);
$zip->close();
return $file;
}
function _backup_dir($zip, $dest, $sub='') {
$dest = rtrim($dest, DIRECTORY_SEPARATOR) . DIRECTORY_SEPARATOR;
$sub = rtrim($sub, DIRECTORY_SEPARATOR) . DIRECTORY_SEPARATOR;
$dir = opendir($dest);
if (!$dir) return false;
while (false !== ($file = readdir($dir))) {
if (is_file($dest . $file)) {
$zip->addFile($dest . $file, $sub . $file);
} else {
if ($file != '.' and $file != '..' and is_dir($dest . $file)) {
//$zip->addEmptyDir($sub . $file . DIRECTORY_SEPARATOR);
_backup_dir($zip, $dest . $file, $file);
}
}
}
closedir($dir);
return true;
}
function execute_task($op, $data) {
debug_log('Start...');
$t1 = microtime(true);
switch($op) {
case 'call': //执行任务脚本类
$cmd = $data;
if (is_string($cmd) and class_exists($cmd)) $cmd = new $cmd;
elseif (is_array($cmd)) {
if (is_string($cmd[0]) and class_exists($cmd[0])) $cmd[0] = new $cmd[0];
}
$ret = call($cmd);
break;
case 'grab': //抓取网页
if (is_string($data)) $data = ['url' => $data];
if (is_array($data)) $ret = grab($data);
else throw new \Exception('无效的命令参数!');
break;
case 'clean': //清理缓存文件夹:dirs 需要清理的文件夹列表,expires 过期时间(秒,默认7天)
if (isset($data['dirs'])) {
$ret = clean($data['dirs'], @$data['expires']);
} else {
$ret = clean($data);
}
break;
case 'backup': //备份文件:zip 备份到哪个zip文件,dest 需要备份的文件夹
if (isset($data['zip']) and is_dir($data['dest']))
$ret = backup($data['zip'], $data['dest']);
else
throw new \Exception('没有指定需要备份的文件!');
break;
case 'require': //加载脚本文件
if (is_file($data)) $ret = require($data);
else throw new \Exception('不是可请求的文件!');
break;
case 'test':
sleep(rand(1, 5));
$ret = ucfirst(strval($data)). '.PID:'. getmypid();
break;
case 'multi': //多进程处理模式
$results = $childs = [];
$fifo = TASK_LOGS_PATH . DIRECTORY_SEPARATOR . 'pipe.'. posix_getpid();
if (!file_exists($fifo)) {
if (!posix_mkfifo($fifo, 0666)) { //开启进程数据通信管道
throw new Exception('make pipe failed!');
}
}
//$shmid = shmop_open(ftok(__FILE__, 'h'), 'c', 0644, 4096); //共享内存
//shmop_write($shmid, serialize([]), 0);
//$data = unserialize(shmop_read($shmid, 0, 4096));
//shmop_delete($shmid);
//shmop_close($shmid);
foreach($data as $_op => $_datas) {
$_datas = (array)$_datas; //data 格式为数组表示一个 op 有多个执行数据
foreach($_datas as $_data) {
$pid = pcntl_fork();
if ($pid == 0) { //子进程中执行任务
$_ret = execute_task($_op, $_data);
$_pid = getmypid();
$pipe = fopen($fifo, 'w'); //写
//stream_set_blocking($pipe, false);
$_ret = serialize(['pid' => $_pid, 'op' => $_op, 'args' => $_data, 'result' => $_ret]);
if (strlen($_ret) > 4096) //写入管道的数据最大4K
$_ret = serialize(['pid' => $_pid, 'op' => $_op, 'args' => $_data, 'result' => '[RESPONSE_TOO_LONG]']);
//debug_log('write pipe: '.$_ret);
fwrite($pipe, $_ret.PHP_EOL);
fflush($pipe);
fclose($pipe);
exit(0); //退出子进程
} elseif ($pid > 0) { //主进程中记录任务
$childs[] = $pid;
$results[$pid] = 0;
debug_log('fork by child: '.$pid);
//pcntl_wait($status, WNOHANG);
} elseif ($pid == -1) {
throw new Exception('could not fork at '. getmygid());
}
}
}
$pipe = fopen($fifo, 'r+'); //读
stream_set_blocking($pipe, true); //阻塞模式,PID与读取的管道数据可能会不一致。
$n = 0;
while(count($childs) > 0) {
foreach($childs as $i => $pid) {
$res = pcntl_waitpid($pid, $status, WNOHANG);
if (-1 == $res || $res > 0) {
$_ret = @unserialize(fgets($pipe)); //读取管道数据
$results[$pid] = $_ret;
unset($childs[$i]);
debug_log('read child: '.$pid . ' - ' . json_encode($_ret, 64|256));
}
if ($n > 1000) posix_kill($pid, SIGTERM); //超时(10分钟)结束子进程
}
usleep(200000); $n++;
}
debug_log('child process completed.');
@fclose($pipe);
@unlink($fifo);
$ret = json_encode($results, 64|256);
break;
default:
throw new \Exception('没有可执行的任务!');
break;
}
$t2 = microtime(true);
$times = round(($t2 - $t1) * 1000, 2);
$log = sprintf('[%s] %s --> (%s) %sms', strtoupper($op),
@json_encode($data, 64|256), @strlen($ret)<65?$ret:@strlen($ret), $times);
debug_log($log);
return $ret;
}
// 读取 CLI 命令行参数
$params = getopt('', array('op:', 'data:'));
$op = $params['op'];
$data = unserialize(base64_decode($params['data']));
// 开始执行任务
execute_task($op, $data);
function __autoload($classname) {
$parts = explode('\\', ltrim($classname, '\\'));
if (false !== strpos(end($parts), '_')) {
array_splice($parts, -1, 1, explode('_', current($parts)));
}
$filename = implode(DIRECTORY_SEPARATOR, $parts) . '.php';
if ($filename = stream_resolve_include_path($filename)) {
include $filename;
} else if (preg_match('/.*Task$/', $classname)) { //查找以Task结尾的任务脚本类
include TASK_PATH . DIRECTORY_SEPARATOR . $classname . '.php';
} else {
return false;
}
}ログイン後にコピー
以上がPHP はコマンド ラインを使用して非同期マルチプロセス モードでタスク処理を実装する方法 (コード)の詳細内容です。詳細については、PHP 中国語 Web サイトの他の関連記事を参照してください。