PHP的多工協程處理的分析

不言
發布: 2023-04-03 08:24:01
原創
1607 人瀏覽過

這篇文章主要介紹了關於PHP的多任務協程處理,有著一定的參考價值,現在分享給大家,有需要的朋友可以參考一下

那麼,開始吧!

PHP的多工協程處理的分析

這就是本文我們要討論的問題。不過我們會從更簡單更熟悉的範例開始。

一切從陣列開始

我們可以透過簡單的遍歷來使用陣列:

$array = ["foo", "bar", "baz"];
 
foreach ($array as $key => $value) {
    print "item: " . $key . "|" . $value . "\n";
}
 
for ($i = 0; $i <p>這是我們日常編碼所依賴的基本實作。可以透過遍歷數組來取得每個元素的鍵名和鍵值。 </p><p>當然,如果我們希望能夠知道何時可以使用陣列。 PHP 提供了一個方便的內建函數:</p><pre class="brush:php;toolbar:false">print is_array($array) ? "yes" : "no"; // yes
登入後複製

類別數組處理

有時,我們需要對一些資料使用相同的方式進行遍歷處理,但它們並非數組類型。例如對 DOMDocument 類別進行處理:

$document = new DOMDocument();
$document->loadXML("<p></p>");

$elements = $document->getElementsByTagName("p");
print_r($elements); // DOMNodeList Object ( [length] => 1 )
登入後複製

這顯然不是一個數組,但它有一個 length 屬性。我們能像遍歷數組一樣,對其進行遍歷麼?我們可以判斷它是否實作了下面這個特殊的介面:

print ($elements instanceof Traversable) ? "yes" : "no"; // yes
登入後複製

這真的太有用了。它不會導致我們在遍歷非可遍歷資料時觸發錯誤。我們僅需在處理前進行檢測即可。

不過,這會引發另一個問題:我們能否讓自訂類別也擁有這個功能呢?答案是肯定的!第一個實作方法類似如下:

class MyTraversable implements Traversable
{
    //  在这里编码...
}
登入後複製

如果我們執行這個類,我們將看到一個錯誤訊息:

PHP Fatal error: Class MyTraversable must implement interface Traversable as part of either Iterator or IteratorAggregate

Iterator(迭代器)

我們無法直接實作Traversable,但是我們可以嘗試第二種方案:

class MyTraversable implements Iterator
{
    //  在这里编码...
}
登入後複製

這個介面需要我們實作5 個方法。讓我們完善我們的迭代器:

class MyTraversable implements Iterator
{
    protected $data;

    protected $index = 0;

    public function __construct($data)
    {
        $this->data = $data;
    }

    public function current()
    {
        return $this->data[$this->index];
    }

    public function next()
    {
        return $this->data[$this->index++];
    }

    public function key()
    {
        return $this->index;
    }

    public function rewind()
    {
        $this->index = 0;
    }

    public function valid()
    {
        return $this->index data);
    }
}
登入後複製

這邊我們需要注意幾個事項:

  1. 我們需要儲存建構器方法傳入的$data 數組,以便後續我們可以從中取得它的元素。

  2. 也需要一個內部索引(或指標)來追蹤 currentnext 元素。

  3. rewind() 僅重設index# 屬性,這樣current()next() 才能正常運作。

  4. 鍵名稱並非只能是數字型別!這裡使用數組索引是為了確保範例足夠簡單。

我們可以向下面這樣運行這段程式碼:

$iterator = new MyIterator(["foo", "bar", "baz"]);
 
foreach ($iterator as $key => $value) {
    print "item: " . $key . "|" . $value . "\n";
}
登入後複製

這看起來需要處理太多工作,但是這是能夠像數組一樣使用foreach /for 功能的一個簡潔實作。

IteratorAggregate(聚合迭代器)

還記得第二個介面拋出的 Traversable 異常麼?下面來看一個比實作 Iterator 介面更快的實作吧:

class MyIteratorAggregate implements IteratorAggregate
{
    protected $data;

    public function __construct($data)
    {
        $this->data = $data;
    }

    public function getIterator()
    {
        return new ArrayIterator($this->data);
    }
}
登入後複製

這裡我們作弊了。相較於實作一個完整的 Iterator,我們透過 ArrayIterator() 裝飾。不過,這相比於透過實現完整的 Iterator 簡化了不少程式碼。

PHP的多工協程處理的分析

兄弟莫急!先讓我們比較一些程式碼。首先,我們在不使用生成器的情況下從檔案中讀取每一行資料:

$content = file_get_contents(__FILE__);

$lines = explode("\n", $content);

foreach ($lines as $i => $line) {
    print $i . ". " . $line . "\n";
}
登入後複製

這段程式碼讀取檔案自身,然後會列印出每行的行號和程式碼。那為什麼我們不使用生成器呢!

function lines($file) {
    $handle = fopen($file, 'r');

    while (!feof($handle)) {
        yield trim(fgets($handle));
    }

    fclose($handle);
}

foreach (lines(__FILE__) as $i => $line) {
    print $i . ". " . $line . "\n";
}
登入後複製

我知道這看起來更複雜。不錯,不過這是因為我們沒有使用 file_get_contents() 函數。一個生成器看起來就像是函數,但是它會在每次取得到 yield 關鍵字是停止運作。

產生器看起來有點像迭代器:

print_r(lines(__FILE__)); // Generator Object ( )
登入後複製

儘管它不是迭代器,它是一個 Generator。它的內部定義了什麼方法呢?

print_r(get_class_methods(lines(__FILE__)));
 
// Array
// (
//     [0] => rewind
//     [1] => valid
//     [2] => current
//     [3] => key
//     [4] => next
//     [5] => send
//     [6] => throw
//     [7] => __wakeup
// )
登入後複製
如果你讀取一個大文件,然後使用 memory_get_peak_usage(),你會注意到生成器的程式碼會使用固定的內存,無論這個文件有多大。它每次進度去一行。而是用 file_get_contents() 函數讀取整個文件,會使用更大的記憶體。這就是在迭代處理這類事物時,生成器的能帶給我們的優勢!

Send(傳送資料)

可以將資料傳送到生成器。看下面這個生成器:

<?php $generator = call_user_func(function() {
    yield "foo";
});

print $generator->current() . "\n"; // foo
登入後複製
注意这里我们如何在 call_user_func() 函数中封装生成器函数的?这里仅仅是一个简单的函数定义,然后立即调用它获取一个新的生成器实例...

我们已经见过 yield 的用法。我们可以通过扩展这个生成器来接收数据:

$generator = call_user_func(function() {
    $input = (yield "foo");

    print "inside: " . $input . "\n";
});

print $generator->current() . "\n";

$generator->send("bar");
登入後複製

数据通过 yield 关键字传入和返回。首先,执行 current() 代码直到遇到 yield,返回 foosend() 将输出传入到生成器打印输入的位置。你需要习惯这种用法。

抛出异常(Throw)

由于我们需要同这些函数进行交互,可能希望将异常推送到生成器中。这样这些函数就可以自行处理异常。

看看下面这个示例:

$multiply = function($x, $y) {
    yield $x * $y;
};

print $multiply(5, 6)->current(); // 30
登入後複製

现在让我们将它封装到另一个函数中:

$calculate = function ($op, $x, $y) use ($multiply) {
    if ($op === 'multiply') {
        $generator = $multiply($x, $y);

        return $generator->current();
    }
};

print $calculate("multiply", 5, 6); // 30
登入後複製

这里我们通过一个普通闭包将乘法生成器封装起来。现在让我们验证无效参数:

$calculate = function ($op, $x, $y) use ($multiply) {

    if ($op === "multiply") {
        $generator = $multiply($x, $y);

        if (!is_numeric($x) || !is_numeric($y)) {
            throw new InvalidArgumentException();
        }

        return $generator->current();
    }
};

print $calculate('multiply', 5, 'foo'); // PHP Fatal error...
登入後複製

如果我们希望能够通过生成器处理异常?我们怎样才能将异常传入生成器呢!

$multiply = function ($x, $y) {
    try {
        yield $x * $y;
    } catch (InvalidArgumentException $exception) {
        print "ERRORS!";
    }
};

$calculate = function ($op, $x, $y) use ($multiply) {

    if ($op === "multiply") {
        $generator = $multiply($x, $y);

        if (!is_numeric($x) || !is_numeric($y)) {
            $generator->throw(new InvalidArgumentException());
        }

        return $generator->current();
    }
};
print $calculate('multiply', 5, 'foo'); // PHP Fatal error...
登入後複製

棒呆了!我们不仅可以像迭代器一样使用生成器。还可以通过它们发送数据并抛出异常。它们是可中断和可恢复的函数。有些语言把这些函数叫做……

PHP的多工協程處理的分析

我们可以使用协程(coroutines)来构建异步代码。让我们来创建一个简单的任务调度程序。首先我们需要一个 Task 类:

class Task
{
    protected $generator;

    public function __construct(Generator $generator)
    {
        $this->generator = $generator;
    }

    public function run()
    {
        $this->generator->next();
    }

    public function finished()
    {
        return !$this->generator->valid();
    }
}
登入後複製

Task 是普通生成器的装饰器。我们将生成器赋值给它的成员变量以供后续使用,然后实现一个简单的 run()finished() 方法。run() 方法用于执行任务,finished() 方法用于让调度程序知道何时终止运行。

然后我们需要一个 Scheduler 类:

class Scheduler
{
    protected $queue;

    public function __construct()
    {
        $this->queue = new SplQueue();
    }

    public function enqueue(Task $task)
    {
        $this->queue->enqueue($task);
    }

    pulic function run()
    {
        while (!$this->queue->isEmpty()) {
            $task = $this->queue->dequeue();
            $task->run();

            if (!$task->finished()) {
                $this->queue->enqueue($task);
            }
        }
    }
}
登入後複製

Scheduler 用于维护一个待执行的任务队列。run() 会弹出队列中的所有任务并执行它,直到运行完整个队列任务。如果某个任务没有执行完毕,当这个任务本次运行完成后,我们将再次入列。

SplQueue 对于这个示例来讲再合适不过了。它是一种 FIFO(先进先出:fist in first out) 数据结构,能够确保每个任务都能够获取足够的处理时间。

我们可以像这样运行这段代码:

$scheduler = new Scheduler();

$task1 = new Task(call_user_func(function() {
    for ($i = 0; $i enqueue($task1);
$scheduler->enqueue($task2);

$scheduler->run();
登入後複製

运行时,我们将看到如下执行结果:

task 1: 0
task 1: 1
task 2: 0
task 2: 1
task 1: 2
task 2: 2
task 2: 3
task 2: 4
task 2: 5
登入後複製

这几乎就是我们想要的执行结果。不过有个问题发生在首次运行每个任务时,它们都执行了两次。我们可以对 Task 类稍作修改来修复这个问题:

class Task
{
    protected $generator;

    protected $run = false;

    public function __construct(Generator $generator)
    {
        $this->generator = $generator;
    }

    public function run()
    {
        if ($this->run) {
            $this->generator->next();
        } else {
            $this->generator->current();
        }

        $this->run = true;
    }

    public function finished()
    {
        return !$this->generator->valid();
    }
}
登入後複製

我们需要调整首次 run() 方法调用,从生成器当前有效的指针读取运行。后续调用可以从下一个指针读取运行...

PHP的多工協程處理的分析

有些人基于这个思路实现了一些超赞的类库。我们来看看其中的两个...

RecoilPHP

RecoilPHP 是一套基于协程的类库,它最令人印象深刻的是用于 ReactPHP 内核。可以将事件循环在 RecoilPHP 和 RecoilPHP 之间进行交换,而你的程序无需架构上的调整。

我们来看一下 ReactPHP 异步 DNS 解决方案:

function resolve($domain, $resolver) {
    $resolver
        ->resolve($domain)
        ->then(function ($ip) use ($domain) {
            print "domain: " . $domain . "\n";
            print "ip: " . $ip . "\n";
        }, function ($error) {            
            print $error . "\n";
        })
}

function run()
{
    $loop = React\EventLoop\Factory::create();
 
    $factory = new React\Dns\Resolver\Factory();
 
    $resolver = $factory->create("8.8.8.8", $loop);
 
    resolve("silverstripe.org", $resolver);
    resolve("wordpress.org", $resolver);
    resolve("wardrobecms.com", $resolver);
    resolve("pagekit.com", $resolver);
 
    $loop->run();
}
 
run();
登入後複製

resolve() 接收域名和 DNS 解析器,并使用 ReactPHP 执行标准的 DNS 查找。不用太过纠结与 resolve() 函数内部。重要的是这个函数不是生成器,而是一个函数!

run() 创建一个 ReactPHP 事件循环,DNS 解析器(这里是个工厂实例)解析若干域名。同样,这个也不是一个生成器。

想知道 RecoilPHP 到底有何不同?还希望掌握更多细节!

use Recoil\Recoil;
 
function resolve($domain, $resolver)
{
    try {
        $ip = (yield $resolver->resolve($domain));
 
        print "domain: " . $domain . "\n";
        print "ip: " . $ip . "\n";
    } catch (Exception $exception) {
        print $exception->getMessage() . "\n";
    }
}
 
function run()
{
    $loop = (yield Recoil::eventLoop());
 
    $factory = new React\Dns\Resolver\Factory();
 
    $resolver = $factory->create("8.8.8.8", $loop);
 
    yield [
        resolve("silverstripe.org", $resolver),
        resolve("wordpress.org", $resolver),
        resolve("wardrobecms.com", $resolver),
        resolve("pagekit.com", $resolver),
    ];
}
 
Recoil::run("run");
登入後複製

通过将它集成到 ReactPHP 来完成一些令人称奇的工作。每次运行 resolve() 时,RecoilPHP 会管理由 $resoler->resolve() 返回的 promise 对象,然后将数据发送给生成器。此时我们就像在编写同步代码一样。与我们在其他一步模型中使用回调代码不同,这里只有一个指令列表。

RecoilPHP 知道它应该管理一个有执行 run() 函数时返回的 yield 数组。RoceilPHP 还支持基于协程的数据库(PDO)和日志库。

IcicleIO

IcicleIO 为了一全新的方案实现 ReactPHP 一样的目标,而仅仅使用协程功能。相比 ReactPHP 它仅包含极少的组件。但是,核心的异步流、服务器、Socket、事件循环特性一个不落。

让我们看一个 socket 服务器示例:

use Icicle\Coroutine\Coroutine;
use Icicle\Loop\Loop;
use Icicle\Socket\Client\ClientInterface;
use Icicle\Socket\Server\ServerInterface;
use Icicle\Socket\Server\ServerFactory;
 
$factory = new ServerFactory();
 
$coroutine = Coroutine::call(function (ServerInterface $server) {
    $clients = new SplObjectStorage();
     
    $handler = Coroutine::async(
        function (ClientInterface $client) use (&$clients) {
            $clients->attach($client);
             
            $host = $client->getRemoteAddress();
            $port = $client->getRemotePort();
             
            $name = $host . ":" . $port;
             
            try {
                foreach ($clients as $stream) {
                    if ($client !== $stream) {
                        $stream->write($name . "connected.\n");
                    }
                }
 
                yield $client->write("Welcome " . $name . "!\n");
                 
                while ($client->isReadable()) {
                    $data = trim(yield $client->read());
                     
                    if ("/exit" === $data) {
                        yield $client->end("Goodbye!\n");
                    } else {
                        $message = $name . ":" . $data . "\n";
                        
                        foreach ($clients as $stream) {
                            if ($client !== $stream) {
                                $stream->write($message);
                            }
                        }
                    }
                }
            } catch (Exception $exception) {
                $client->close($exception);
            } finally {
                $clients->detach($client);
                foreach ($clients as $stream) {
                    $stream->write($name . "disconnected.\n");
                }
            }
        }
    );
     
    while ($server->isOpen()) {
        $handler(yield $server->accept());
    }
}, $factory->create("127.0.0.1", 6000));
 
Loop::run();
登入後複製

据我所知,这段代码所做的事情如下:

  1. 在 127.0.0.1 和 6000 端口创建一个服务器实例,然后将其传入外部生成器.

  2. 外部生成器运行,同时服务器等待新连接。当服务器接收一个连接它将其传入内部生成器。

  3. 内部生成器写入消息到 socket。当 socket 可读时运行。

  4. 每次 socket 向服务器发送消息时,内部生成器检测消息是否是退出标识。如果是,通知其他 socket。否则,其它 socket 发送这个相同的消息。

打开命令行终端输入 nc localhost 6000 查看执行结果!

该示例使用 SplObjectStorage 跟踪 socket 连接。这样我们就可以向所有 socket 发送消息。

PHP的多工協程處理的分析

这个话题可以包含很多内容。希望您能看到生成器是如何创建的,以及它们如何帮助编写迭代程序和异步代码。

如果你有问题,可以随时问我。

相关推荐:

浅谈一下PHP生成器的使用方法

以上是PHP的多工協程處理的分析的詳細內容。更多資訊請關注PHP中文網其他相關文章!

相關標籤:
來源:php.cn
本網站聲明
本文內容由網友自願投稿,版權歸原作者所有。本站不承擔相應的法律責任。如發現涉嫌抄襲或侵權的內容,請聯絡admin@php.cn
作者最新文章
最新問題
熱門教學
更多>
最新下載
更多>
網站特效
網站源碼
網站素材
前端模板