This article briefly summarizes five ways to achieve concurrency in PHP:
1.curl_multi_init
The documentation says it "Allows the processing of multiple cURL handles asynchronously", and it is indeed asynchronous. The part that needs understanding is the select method, curl_multi_select, which the documentation describes as "Blocks until there is activity on any of the curl_multi connections". If you are familiar with the common asynchronous I/O models this will make sense; select and epoll are the well-known ones.
<?php
// build the individual requests, but do not execute them
$ch_1 = curl_init('http://www.jb51.net/');
$ch_2 = curl_init('http://www.jb51.net/');
curl_setopt($ch_1, CURLOPT_RETURNTRANSFER, true);
curl_setopt($ch_2, CURLOPT_RETURNTRANSFER, true);

// build the multi-curl handle, adding both $ch
$mh = curl_multi_init();
curl_multi_add_handle($mh, $ch_1);
curl_multi_add_handle($mh, $ch_2);

// execute all queries simultaneously, and stop as soon as one has completed
$running = null;
do {
    curl_multi_exec($mh, $running);
    // wait until there is activity on any of the connections
    $ch = curl_multi_select($mh);
    if ($ch !== 0) {
        $info = curl_multi_info_read($mh);
        if ($info) {
            var_dump($info);
            $response_1 = curl_multi_getcontent($info['handle']);
            echo "$response_1 \n";
            break;
        }
    }
} while ($running > 0);

// close the handles
curl_multi_remove_handle($mh, $ch_1);
curl_multi_remove_handle($mh, $ch_2);
curl_multi_close($mh);
Here I exit the loop as soon as select reports a result and then remove the cURL handles, which cancels the HTTP request that is still in flight.
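To make the select idea concrete without any network access, here is a minimal sketch that uses PHP's stream_select on local socket pairs instead of cURL. The socket pairs and the "response from B" payload are made up for illustration, and STREAM_PF_UNIX assumes a Unix-like system. curl_multi_select plays the analogous waiting role for cURL handles.

```php
<?php
// Two connected socket pairs stand in for two in-flight connections
// (hypothetical setup, not part of the original example).
$pairA = stream_socket_pair(STREAM_PF_UNIX, STREAM_SOCK_STREAM, STREAM_IPPROTO_IP);
$pairB = stream_socket_pair(STREAM_PF_UNIX, STREAM_SOCK_STREAM, STREAM_IPPROTO_IP);

// Only "connection" B has data waiting on it.
fwrite($pairB[1], "response from B");

$read   = [$pairA[0], $pairB[0]];
$write  = null;
$except = null;

// Blocks for up to 1 second until at least one stream is readable.
// On return, $read contains only the streams that have activity.
$ready = stream_select($read, $write, $except, 1);

$firstResponse = null;
if ($ready > 0) {
    foreach ($read as $stream) {
        // Take the first stream that answered and ignore the rest.
        $firstResponse = fread($stream, 1024);
        break;
    }
}
echo $firstResponse, "\n";

// Release all four endpoints.
foreach (array_merge($pairA, $pairB) as $s) {
    fclose($s);
}
```

As in the cURL loop, the script wakes up only when some connection has something to read and then handles whichever one is ready first.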
2.swoole_client
swoole_client provides an asynchronous mode, which I had actually forgotten about. The sleep method used here requires swoole 1.7.21 or later. I have not upgraded to that version yet, so the code simply exits instead.
<?php
$client = new swoole_client(SWOOLE_SOCK_TCP, SWOOLE_SOCK_ASYNC);

// set the event callbacks
$client->on("connect", function($cli) {
    $req = "GET / HTTP/1.1\r\n"
         . "Host: www.jb51.net\r\n"
         . "Connection: keep-alive\r\n"
         . "Cache-Control: no-cache\r\n"
         . "Pragma: no-cache\r\n\r\n";
    for ($i = 0; $i < 3; $i++) {
        $cli->send($req);
    }
});

$client->on("receive", function($cli, $data) {
    echo "Received: ".$data."\n";
    exit(0);        // exit on the first response
    $cli->sleep();  // swoole >= 1.7.21 (not reached because of the exit above)
});

$client->on("error", function($cli) {
    echo "Connect failed\n";
});

$client->on("close", function($cli) {
    echo "Connection close\n";
});

// initiate the network connection
$client->connect('183.207.95.145', 80, 1);
3.process
Hey, I almost forgot about swoole_process. The pcntl module is not needed here. After writing it, though, I found that this does not actually interrupt the requests: whichever response arrives first is read, and the later return values are ignored.
<?php
$workers = [];
$worker_num = 3; // number of processes to create
$finished = false;
$lock = new swoole_lock(SWOOLE_MUTEX);

for ($i = 0; $i < $worker_num; $i++) {
    $process = new swoole_process('process');
    //$process->useQueue();
    $pid = $process->start();
    $workers[$pid] = $process;
}

foreach ($workers as $pid => $process) {
    // child processes will also include this event
    swoole_event_add($process->pipe, function ($pipe) use ($process, $lock, &$finished) {
        $lock->lock();
        if (!$finished) {
            // only the first response is read; later ones are ignored
            $finished = true;
            $data = $process->read();
            echo "RECV: " . $data.PHP_EOL;
        }
        $lock->unlock();
    });
}

function process(swoole_process $process) {
    $response = 'http response';
    $process->write($response);
    echo $process->pid,"\t",$process->callback .PHP_EOL;
}

for ($i = 0; $i < $worker_num; $i++) {
    $ret = swoole_process::wait();
    $pid = $ret['pid'];
    echo "Worker Exit, PID=".$pid.PHP_EOL;
}
4.pthreads
When compiling the pthreads module, the build reports that PHP itself must be compiled with ZTS enabled, so it seems the thread-safe version is required. Many PHP builds bundled with WAMP happen to be TS, so I simply downloaded a DLL, copied it into the corresponding directory as the documentation instructs, and tested on Windows. I still do not fully understand it, but I found an article saying that PHP's pthreads and POSIX pthreads are completely different. The code is a bit rough; I need to read more documentation to get a feel for it.
<?php
class Foo extends Stackable {
    public $url;
    public $response = null;

    public function __construct() {
        $this->url = 'http://www.jb51.net';
    }

    public function run() {}
}

class Process extends Worker {
    private $text = "";

    public function __construct($text, $object) {
        $this->text = $text;
        $this->object = $object;
    }

    public function run() {
        while (is_null($this->object->response)) {
            print " Thread {$this->text} is running\n";
            $this->object->response = 'http response';
            sleep(1);
        }
    }
}

$foo = new Foo();

$a = new Process("A", $foo);
$a->start();

$b = new Process("B", $foo);
$b->start();

echo $foo->response;
5.yield
Write asynchronous code in a synchronous manner:
<?php
class AsyncServer {
    protected $handler;
    protected $socket;
    protected $tasks = [];
    protected $timers = [];

    public function __construct(callable $handler) {
        $this->handler = $handler;
        $this->socket = socket_create(AF_INET, SOCK_DGRAM, SOL_UDP);
        if (!$this->socket) {
            die(socket_strerror(socket_last_error())."\n");
        }
        if (!socket_set_nonblock($this->socket)) {
            die(socket_strerror(socket_last_error())."\n");
        }
        if (!socket_bind($this->socket, "0.0.0.0", 1234)) {
            die(socket_strerror(socket_last_error())."\n");
        }
    }

    public function Run() {
        while (true) {
            // expire the tasks whose deadline (in milliseconds) has passed
            $now = microtime(true) * 1000;
            foreach ($this->timers as $time => $sockets) {
                if ($time > $now) break;
                foreach ($sockets as $one) {
                    list($socket, $coroutine) = $this->tasks[$one];
                    unset($this->tasks[$one]);
                    socket_close($socket);
                    $coroutine->throw(new Exception("Timeout"));
                }
                unset($this->timers[$time]);
            }

            // wait on the server socket plus every pending task socket
            $reads = array($this->socket);
            foreach ($this->tasks as list($socket)) {
                $reads[] = $socket;
            }
            $writes = NULL;
            $excepts = NULL;
            if (!socket_select($reads, $writes, $excepts, 0, 1000)) {
                continue;
            }

            foreach ($reads as $one) {
                $len = socket_recvfrom($one, $data, 65535, 0, $ip, $port);
                if (!$len) {
                    //echo "socket_recvfrom fail.\n";
                    continue;
                }
                if ($one == $this->socket) {
                    // new request: start the handler coroutine and run it up to its first yield
                    //echo "[Run]request recvfrom succ. data=$data ip=$ip port=$port\n";
                    $handler = $this->handler;
                    $coroutine = $handler($one, $data, $len, $ip, $port);
                    if (!$coroutine) {
                        //echo "[Run]everything is done.\n";
                        continue;
                    }
                    $task = $coroutine->current();
                    //echo "[Run]AsyncTask recv. data=$task->data ip=$task->ip port=$task->port timeout=$task->timeout\n";
                    $socket = socket_create(AF_INET, SOCK_DGRAM, SOL_UDP);
                    if (!$socket) {
                        //echo socket_strerror(socket_last_error())."\n";
                        $coroutine->throw(new Exception(socket_strerror(socket_last_error()), socket_last_error()));
                        continue;
                    }
                    if (!socket_set_nonblock($socket)) {
                        //echo socket_strerror(socket_last_error())."\n";
                        $coroutine->throw(new Exception(socket_strerror(socket_last_error()), socket_last_error()));
                        continue;
                    }
                    socket_sendto($socket, $task->data, $task->len, 0, $task->ip, $task->port);
                    $deadline = $now + $task->timeout;
                    $this->tasks[$socket] = [$socket, $coroutine, $deadline];
                    $this->timers[$deadline][$socket] = $socket;
                } else {
                    // response for a pending task: resume its coroutine with the result
                    //echo "[Run]response recvfrom succ. data=$data ip=$ip port=$port\n";
                    list($socket, $coroutine, $deadline) = $this->tasks[$one];
                    unset($this->tasks[$one]);
                    unset($this->timers[$deadline][$one]);
                    socket_close($socket);
                    $coroutine->send(array($data, $len));
                }
            }
        }
    }
}

class AsyncTask {
    public $data;
    public $len;
    public $ip;
    public $port;
    public $timeout;

    public function __construct($data, $len, $ip, $port, $timeout) {
        $this->data = $data;
        $this->len = $len;
        $this->ip = $ip;
        $this->port = $port;
        $this->timeout = $timeout;
    }
}

function AsyncSendRecv($req_buf, $req_len, $ip, $port, $timeout) {
    return new AsyncTask($req_buf, $req_len, $ip, $port, $timeout);
}

function RequestHandler($socket, $req_buf, $req_len, $ip, $port) {
    //echo "[RequestHandler] before yield AsyncTask. REQ=$req_buf\n";
    try {
        list($rsp_buf, $rsp_len) = (yield AsyncSendRecv($req_buf, $req_len, "127.0.0.1", 2345, 3000));
    } catch (Exception $ex) {
        $rsp_buf = $ex->getMessage();
        $rsp_len = strlen($rsp_buf);
        //echo "[Exception]$rsp_buf\n";
    }
    //echo "[RequestHandler] after yield AsyncTask. RSP=$rsp_buf\n";
    socket_sendto($socket, $rsp_buf, $rsp_len, 0, $ip, $port);
}

$server = new AsyncServer('RequestHandler');
$server->Run();
?>
Code interpretation:
It uses PHP's built-in arrays to implement simple "timeout management", with millisecond precision as the time slice;
It wraps the request in the AsyncSendRecv interface, called in the form yield AsyncSendRecv(), which reads more naturally;
It uses Exception as the error-handling mechanism. A ret_code could be added as well; this is for demonstration purposes only.
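The server above relies on three Generator methods: current(), send() and throw(). The following is a minimal sketch of that mechanism on its own, with no sockets involved. The handler function, the ArrayObject log and the 'pong' reply are hypothetical names chosen for illustration, not part of the original code.

```php
<?php
// Hypothetical handler: it yields a request and waits for the driver to resume it.
function handler($name, ArrayObject $log)
{
    try {
        // Execution pauses here; the value passed to send() becomes $reply.
        $reply = (yield "request from $name");
        $log[] = "$name got: $reply";
    } catch (Exception $e) {
        // throw() resumes the generator by raising the exception at the yield.
        $log[] = "$name failed: " . $e->getMessage();
    }
}

$log = new ArrayObject();

// Successful path: run to the first yield, then resume with a result.
$ok = handler('A', $log);
$pending = $ok->current();   // the yielded value: "request from A"
$ok->send('pong');

// Timeout path: run to the first yield, then resume with an error instead.
$late = handler('B', $log);
$late->current();
$late->throw(new Exception('Timeout'));

foreach ($log as $line) {
    echo $line, "\n";
}
```

The handler itself reads like synchronous code with an ordinary try/catch, while the caller decides whether the yield returns a value or raises an exception. AsyncServer does the same thing, using the socket result and the deadline to make that decision.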