이 기사에서는 PHP 프로그래밍의 5가지 동시성 방법을 대략적으로 요약합니다.
1.curl_multi_init
문서에는 여러 cURL 핸들을 비동기식으로 처리할 수 있다고 나와 있습니다. 여기서 이해해야 할 것은 select 방법입니다. 이는 모든 컬_멀티 연결에 대한 활동이 있을 때까지 차단 문서에 설명되어 있습니다. 일반적인 비동기 모델을 이해하시면 이해하실 수 있을 텐데요, select와 epoll이 모두 유명합니다
<?php // build the individual requests as above, but do not execute them $ch_1 = curl_init('http://www.jb51.net/'); $ch_2 = curl_init('http://www.jb51.net/'); curl_setopt($ch_1, CURLOPT_RETURNTRANSFER, true); curl_setopt($ch_2, CURLOPT_RETURNTRANSFER, true); // build the multi-curl handle, adding both $ch $mh = curl_multi_init(); curl_multi_add_handle($mh, $ch_1); curl_multi_add_handle($mh, $ch_2); // execute all queries simultaneously, and continue when all are complete $running = null; do { curl_multi_exec($mh, $running); $ch = curl_multi_select($mh); if($ch !== 0){ $info = curl_multi_info_read($mh); if($info){ var_dump($info); $response_1 = curl_multi_getcontent($info['handle']); echo "$response_1 \n"; break; } } } while ($running > 0); //close the handles curl_multi_remove_handle($mh, $ch_1); curl_multi_remove_handle($mh, $ch_2); curl_multi_close($mh);
여기서 설정한 것은 select가 결과를 얻을 때 루프를 종료하고 컬 리소스를 삭제하여 http 요청을 취소한다는 것입니다.
2.swoole_client
swoole_client는 비동기 모드를 제공하는데, 사실 이 점을 잊어버렸습니다. 여기서 수면 방법을 사용하려면 swoole 버전이 1.7.21 이상이어야 합니다. 아직 이 버전으로 업그레이드하지 않았으므로 바로 종료할 수 있습니다.
<?php $client = new swoole_client(SWOOLE_SOCK_TCP, SWOOLE_SOCK_ASYNC); //设置事件回调函数 $client->on("connect", function($cli) { $req = "GET / HTTP/1.1\r\n Host: www.jb51.net\r\n Connection: keep-alive\r\n Cache-Control: no-cache\r\n Pragma: no-cache\r\n\r\n"; for ($i=0; $i < 3; $i++) { $cli->send($req); } }); $client->on("receive", function($cli, $data){ echo "Received: ".$data."\n"; exit(0); $cli->sleep(); // swoole >= 1.7.21 }); $client->on("error", function($cli){ echo "Connect failed\n"; }); $client->on("close", function($cli){ echo "Connection close\n"; }); //发起网络连接 $client->connect('183.207.95.145', 80, 1);
3.프로세스
안녕하세요. swoole_process를 잊어버릴 뻔했습니다. 여기에는 pcntl 모듈이 필요하지 않습니다. 하지만 작성한 후에는 이것이 실제로 인터럽트 요청이 아니라 먼저 오는 것이 읽혀지고 후속 반환 값을 무시한다는 것을 알았습니다.
<?php $workers = []; $worker_num = 3;//创建的进程数 $finished = false; $lock = new swoole_lock(SWOOLE_MUTEX); for($i=0;$i<$worker_num ; $i++){ $process = new swoole_process('process'); //$process->useQueue(); $pid = $process->start(); $workers[$pid] = $process; } foreach($workers as $pid => $process){ //子进程也会包含此事件 swoole_event_add($process->pipe, function ($pipe) use($process, $lock, &$finished) { $lock->lock(); if(!$finished){ $finished = true; $data = $process->read(); echo "RECV: " . $data.PHP_EOL; } $lock->unlock(); }); } function process(swoole_process $process){ $response = 'http response'; $process->write($response); echo $process->pid,"\t",$process->callback .PHP_EOL; } for($i = 0; $i < $worker_num; $i++) { $ret = swoole_process::wait(); $pid = $ret['pid']; echo "Worker Exit, PID=".$pid.PHP_EOL; }
4.p스레드
pthreads 모듈을 컴파일할 때 PHP를 컴파일할 때 ZTS를 켜야 한다는 메시지가 나오므로 thread safe 버전을 사용해야 할 것 같습니다. wamp에 있는 PHP 중 상당수가 TS인 경우가 많아서 직접 dll을 다운로드해서 복사했습니다. 문서의 지침을 해당 디렉토리로 이동하여 win에서 테스트했습니다. 아직도 잘 이해가 안가는데, PHP의 pthread와 POSIX pthread가 전혀 다르다는 기사를 발견했습니다. 코드가 약간 좋지 않습니다. 코드에 대한 느낌을 얻으려면 더 많은 문서를 읽어야 합니다.
<?php class Foo extends Stackable { public $url; public $response = null; public function __construct(){ $this->url = 'http://www.jb51.net'; } public function run(){} } class Process extends Worker { private $text = ""; public function __construct($text,$object){ $this->text = $text; $this->object = $object; } public function run(){ while (is_null($this->object->response)){ print " Thread {$this->text} is running\n"; $this->object->response = 'http response'; sleep(1); } } } $foo = new Foo(); $a = new Process("A",$foo); $a->start(); $b = new Process("B",$foo); $b->start(); echo $foo->response;
5.수익률
동기식으로 비동기 코드 작성:
<?php class AsyncServer { protected $handler; protected $socket; protected $tasks = []; protected $timers = []; public function __construct(callable $handler) { $this->handler = $handler; $this->socket = socket_create(AF_INET, SOCK_DGRAM, SOL_UDP); if(!$this->socket) { die(socket_strerror(socket_last_error())."\n"); } if (!socket_set_nonblock($this->socket)) { die(socket_strerror(socket_last_error())."\n"); } if(!socket_bind($this->socket, "0.0.0.0", 1234)) { die(socket_strerror(socket_last_error())."\n"); } } public function Run() { while (true) { $now = microtime(true) * 1000; foreach ($this->timers as $time => $sockets) { if ($time > $now) break; foreach ($sockets as $one) { list($socket, $coroutine) = $this->tasks[$one]; unset($this->tasks[$one]); socket_close($socket); $coroutine->throw(new Exception("Timeout")); } unset($this->timers[$time]); } $reads = array($this->socket); foreach ($this->tasks as list($socket)) { $reads[] = $socket; } $writes = NULL; $excepts= NULL; if (!socket_select($reads, $writes, $excepts, 0, 1000)) { continue; } foreach ($reads as $one) { $len = socket_recvfrom($one, $data, 65535, 0, $ip, $port); if (!$len) { //echo "socket_recvfrom fail.\n"; continue; } if ($one == $this->socket) { //echo "[Run]request recvfrom succ. data=$data ip=$ip port=$port\n"; $handler = $this->handler; $coroutine = $handler($one, $data, $len, $ip, $port); if (!$coroutine) { //echo "[Run]everything is done.\n"; continue; } $task = $coroutine->current(); //echo "[Run]AsyncTask recv. data=$task->data ip=$task->ip port=$task->port timeout=$task->timeout\n"; $socket = socket_create(AF_INET, SOCK_DGRAM, SOL_UDP); if(!$socket) { //echo socket_strerror(socket_last_error())."\n"; $coroutine->throw(new Exception(socket_strerror(socket_last_error()), socket_last_error())); continue; } if (!socket_set_nonblock($socket)) { //echo socket_strerror(socket_last_error())."\n"; $coroutine->throw(new Exception(socket_strerror(socket_last_error()), socket_last_error())); continue; } socket_sendto($socket, $task->data, $task->len, 0, $task->ip, $task->port); $deadline = $now + $task->timeout; $this->tasks[$socket] = [$socket, $coroutine, $deadline]; $this->timers[$deadline][$socket] = $socket; } else { //echo "[Run]response recvfrom succ. data=$data ip=$ip port=$port\n"; list($socket, $coroutine, $deadline) = $this->tasks[$one]; unset($this->tasks[$one]); unset($this->timers[$deadline][$one]); socket_close($socket); $coroutine->send(array($data, $len)); } } } } } class AsyncTask { public $data; public $len; public $ip; public $port; public $timeout; public function __construct($data, $len, $ip, $port, $timeout) { $this->data = $data; $this->len = $len; $this->ip = $ip; $this->port = $port; $this->timeout = $timeout; } } function AsyncSendRecv($req_buf, $req_len, $ip, $port, $timeout) { return new AsyncTask($req_buf, $req_len, $ip, $port, $timeout); } function RequestHandler($socket, $req_buf, $req_len, $ip, $port) { //echo "[RequestHandler] before yield AsyncTask. REQ=$req_buf\n"; try { list($rsp_buf, $rsp_len) = (yield AsyncSendRecv($req_buf, $req_len, "127.0.0.1", 2345, 3000)); } catch (Exception $ex) { $rsp_buf = $ex->getMessage(); $rsp_len = strlen($rsp_buf); //echo "[Exception]$rsp_buf\n"; } //echo "[RequestHandler] after yield AsyncTask. RSP=$rsp_buf\n"; socket_sendto($socket, $rsp_buf, $rsp_len, 0, $ip, $port); } $server = new AsyncServer(RequestHandler); $server->Run(); ?>
코드 해석:
PHP에 내장된 배열 기능을 사용하여 간단한 "시간 초과 관리"를 구현하고 밀리초 단위의 정밀도를 시간 분할로 사용합니다.
AsyncSendRecv 인터페이스를 캡슐화하고 이를 보다 자연스러운 Yield AsyncSendRecv() 형식으로 호출합니다.
오류 처리 메커니즘으로 Exception을 추가할 수도 있습니다. 이는 데모 목적으로만 사용됩니다.