Heim > Backend-Entwicklung > PHP-Tutorial > osx - 请教大神们:用php客户端怎么连kafka,怎么做到监控broker变化而来刷新客户端数据的?

osx - 请教大神们:用php客户端怎么连kafka,怎么做到监控broker变化而来刷新客户端数据的?

WBOY
Freigeben: 2016-06-06 20:24:29
Original
1990 Leute haben es durchsucht

最近在研究用php连kafka.
用的是githup上的nmred/kafka-php项目代码
目前:
1.已经可以连接服务器上的kafka,
2.测试:命令行执行php Produce.php,consumer端也能获取得到数据
问题:
1.consumer端怎么一直执行,难道写 while死循环?
2.kafka-php是怎么做到客户端隔段时间拉取新的信息,并刷新客户端数据的?
3.在README.md有这么一句话,又是什么意思:

<code> Watches broker state, if broker changes, the client will refresh     broker and topic metadata stored in the client?
</code>
Nach dem Login kopieren
Nach dem Login kopieren

请使用过该工具的赐教思路,不甚感激!

补充:现在我的php客户端 consumer能收到数据了,但是一直都包含乱码,比如我在producer段输入:nihao,则php consumer.php后,会输出:
�m�5����3�SNAPPY`���;����nihao

现在的疑问是:
1.为什么producer端输入的是:nihao,但在consumer端输出的信息为什么不是 nihao?
2.为什么会有一串乱码?
3.怎么解决?
4.下面附上socket读取的核心代码:

// 获取consumer端的信息,其中$this->stream是一个socket连接成功的socket对象:
$msg = $this->stream->read($messageSize, true);

read方法核心代码如下:
$readable = @stream_select($read, $null, $null, $this->recvTimeoutSec, $this->recvTimeoutUsec);

<code>    if ($readable > 0) {
        $remainingBytes = $len;
        $data = $chunk = '';
        while ($remainingBytes > 0) {
            $chunk = fread($this->stream, $remainingBytes);
            if ($chunk === false) {
                $this->close();
                throw new \Kafka\Exception\SocketEOF('Could not read '.$len.' bytes from stream (no data)');
            }
            if (strlen($chunk) === 0) {
                // Zero bytes because of EOF?
                if (feof($this->stream)) {
                    $this->close();
                    throw new \Kafka\Exception\SocketEOF('Unexpected EOF while reading '.$len.' bytes from stream (no data)');
                }
                // Otherwise wait for bytes
                $readable = @stream_select($read, $null, $null, $this->recvTimeoutSec, $this->recvTimeoutUsec);
                if ($readable !== 1) {
                    throw new \Kafka\Exception\SocketTimeout('Timed out reading socket while reading ' . $len . ' bytes with ' . $remainingBytes . ' bytes to go');
                }
                continue; // attempt another read
            }
            $data .= $chunk;
            $remainingBytes -= strlen($chunk);
        }</code>
Nach dem Login kopieren
Nach dem Login kopieren

通过这个while循环来拼接信息。重点在于 fread出来的 $chunk就包含乱码

呼高手!

回复内容:

最近在研究用php连kafka.
用的是githup上的nmred/kafka-php项目代码
目前:
1.已经可以连接服务器上的kafka,
2.测试:命令行执行php Produce.php,consumer端也能获取得到数据
问题:
1.consumer端怎么一直执行,难道写 while死循环?
2.kafka-php是怎么做到客户端隔段时间拉取新的信息,并刷新客户端数据的?
3.在README.md有这么一句话,又是什么意思:

<code> Watches broker state, if broker changes, the client will refresh     broker and topic metadata stored in the client?
</code>
Nach dem Login kopieren
Nach dem Login kopieren

请使用过该工具的赐教思路,不甚感激!

补充:现在我的php客户端 consumer能收到数据了,但是一直都包含乱码,比如我在producer段输入:nihao,则php consumer.php后,会输出:
�m�5����3�SNAPPY`���;����nihao

现在的疑问是:
1.为什么producer端输入的是:nihao,但在consumer端输出的信息为什么不是 nihao?
2.为什么会有一串乱码?
3.怎么解决?
4.下面附上socket读取的核心代码:

// 获取consumer端的信息,其中$this->stream是一个socket连接成功的socket对象:
$msg = $this->stream->read($messageSize, true);

read方法核心代码如下:
$readable = @stream_select($read, $null, $null, $this->recvTimeoutSec, $this->recvTimeoutUsec);

<code>    if ($readable > 0) {
        $remainingBytes = $len;
        $data = $chunk = '';
        while ($remainingBytes > 0) {
            $chunk = fread($this->stream, $remainingBytes);
            if ($chunk === false) {
                $this->close();
                throw new \Kafka\Exception\SocketEOF('Could not read '.$len.' bytes from stream (no data)');
            }
            if (strlen($chunk) === 0) {
                // Zero bytes because of EOF?
                if (feof($this->stream)) {
                    $this->close();
                    throw new \Kafka\Exception\SocketEOF('Unexpected EOF while reading '.$len.' bytes from stream (no data)');
                }
                // Otherwise wait for bytes
                $readable = @stream_select($read, $null, $null, $this->recvTimeoutSec, $this->recvTimeoutUsec);
                if ($readable !== 1) {
                    throw new \Kafka\Exception\SocketTimeout('Timed out reading socket while reading ' . $len . ' bytes with ' . $remainingBytes . ' bytes to go');
                }
                continue; // attempt another read
            }
            $data .= $chunk;
            $remainingBytes -= strlen($chunk);
        }</code>
Nach dem Login kopieren
Nach dem Login kopieren

通过这个while循环来拼接信息。重点在于 fread出来的 $chunk就包含乱码

呼高手!

consumer应该只能通过循环来判断broker是否有变化,然后更新说。

Verwandte Etiketten:
php
Quelle:php.cn
Erklärung dieser Website
Der Inhalt dieses Artikels wird freiwillig von Internetnutzern beigesteuert und das Urheberrecht liegt beim ursprünglichen Autor. Diese Website übernimmt keine entsprechende rechtliche Verantwortung. Wenn Sie Inhalte finden, bei denen der Verdacht eines Plagiats oder einer Rechtsverletzung besteht, wenden Sie sich bitte an admin@php.cn
Beliebte Tutorials
Mehr>
Neueste Downloads
Mehr>
Web-Effekte
Quellcode der Website
Website-Materialien
Frontend-Vorlage