Heim > php教程 > php手册 > kafka安装及Kafka-PHP扩展的使用,kafkakafka-php扩展

kafka安装及Kafka-PHP扩展的使用,kafkakafka-php扩展

WBOYWBOYWBOYWBOYWBOYWBOYWBOYWBOYWBOYWBOYWBOYWBOYWB
Freigeben: 2016-06-13 08:46:05
Original
2388 Leute haben es durchsucht

kafka安装及Kafka-PHP扩展的使用,kafkakafka-php扩展

话说用了就要有点产出,要不然过段时间又忘了,所以在这里就记录一下试用Kafka的安装过程和php扩展的试用。

实话说,如果用于队列的话,跟PHP比较配的,还是Redis。用的顺手,呵呵,只是Redis不能有多个consumer。但Kafka官方对PHP不支持,PHP扩展是爱好者或使用者写的。下面就开始讲Kafka的安装吧。我以CentOS6.4为例,64位。

一. 首先确认下jdk有没有安装

使用命令

[root@localhost ~]# java -<span>version
java version </span><span>"</span><span>1.8.0_73</span><span>"</span><span>
Java(TM) SE Runtime Environment (build </span><span>1.8</span>.0_73-<span>b02)
Java HotSpot(TM) </span><span>64</span>-Bit Server VM (build <span>25.73</span>-b02, mixed mode)
Nach dem Login kopieren

如果有以上信息的话,就往下安装吧,有些可能是jdk对不上,那就装到对的上的。如果没有安装,就看一下下面的jdk安装方法:

http://www.oracle.com/technetwork/java/javase/downloads/jdk8-downloads-2133151.html

到这个地址下载jdk8版本,我下载的是jdk-8u73-linux-x64.tar.gz,然后解压到/usr/local/jdk/下。

然后打开/etc/profile文件

[root@localhost ~]# vim /etc/profile
Nach dem Login kopieren

把下面这段代码写到文件里

export JAVA_HOME=/usr/local/jdk/jdk1.<span>8</span><span>.0_73
export CLASSPATH</span>=.:$JAVA_HOME/lib/tools.jar:$JAVA_HOME/lib/<span>dt.jar
export PATH</span>=$JAVA_HOME/bin:$PATH
Nach dem Login kopieren

最后

[root@localhost ~]# source /etc/profile
Nach dem Login kopieren

这时jdk就生效了,可以使用 java -version验证下。

二. 接下来安装Kafka

1. 下载Kafka

到http://kafka.apache.org/downloads.html下载相应的版本,我使用的是kafka_2.9.1-0.8.2.2.tgz。

2. 下载完解压到你喜欢的目录

我是解压到 /usr/local/kafka/kafka_2.9.1-0.8.2.2

3. 运行默认的Kafka

启动Zookeeper server

[root@localhost kafka_2.<span>9.1</span>-<span>0.8</span>.<span>2.2</span>]# <span>sh</span> bin/zookeeper-server-start.<span>sh</span> config/zookeeper.properties &
Nach dem Login kopieren

启动Kafka server

[root@localhost kafka_2.<span>9.1</span>-<span>0.8</span>.<span>2.2</span>]# <span>sh</span> bin/kafka-server-start.<span>sh</span> config/server.properties &
Nach dem Login kopieren

运行生产者producer

[root@localhost kafka_2.<span>9.1</span>-<span>0.8</span>.<span>2.2</span>]# <span>sh</span> bin/kafka-console-producer.<span>sh</span> --broker-list localhost:<span>9092</span> --topic test
Nach dem Login kopieren

运行消费者consumer

[root@localhost kafka_2.<span>9.1</span>-<span>0.8</span>.<span>2.2</span>]# <span>sh</span> bin/kafka-console-consumer.<span>sh</span> --zookeeper localhost:<span>2181</span> --topic test --from-beginning
Nach dem Login kopieren

这样,在producer那边输入内容,consumer马上就能接收到。

4. 当有跨机的producer或consumer连接时

需要配置config/server.properties的host.name,要不然跨机的连不上。

三. Kafka-PHP扩展

使用了一圈,就https://github.com/nmred/kafka-php可以用。

我是使用composer安装的,以下是示例:

producer.php

<?<span>php
</span><span>require</span> 'vendor/autoload.php'<span>;

</span><span>while</span> (1<span>) {
    </span><span>$part</span> = <span>mt_rand</span>(0, 1<span>);
    </span><span>$produce</span> = \Kafka\Produce::getInstance('kafka0:2181', 3000<span>);
    </span><span>//</span><span> get available partitions</span>
    <span>$partitions</span> = <span>$produce</span>->getAvailablePartitions('topic_name'<span>);
    </span><span>var_dump</span>(<span>$partitions</span><span>);
    </span><span>//</span><span> send message</span>
    <span>$produce</span>->setRequireAck(-1<span>);
    </span><span>$produce</span>->setMessages('topic_name', 0, <span>array</span>(<span>date</span>('Y-m-d H:i:s'<span>));
   
    </span><span>sleep</span>(3<span>);
}</span>
Nach dem Login kopieren

consumer.php

<span>require</span> 'vendor/autoload.php'<span>;

</span><span>$consumer</span> = \Kafka\Consumer::getInstance('kafka0:2181'<span>);
</span><span>$group</span> = 'topic_name'<span>;
</span><span>$consumer</span>->setGroup(<span>$group</span><span>);
</span><span>$consumer</span>->setFromOffset(<span>true</span><span>);
</span><span>$consumer</span>->setTopic('topic_name', 0<span>);
</span><span>$consumer</span>->setMaxBytes(102400<span>);
</span><span>$result</span> = <span>$consumer</span>-><span>fetch();
</span><span>print_r</span>(<span>$result</span><span>);
</span><span>foreach</span> (<span>$result</span> <span>as</span> <span>$topicName</span> => <span>$partition</span><span>) {
    </span><span>foreach</span> (<span>$partition</span> <span>as</span> <span>$partId</span> => <span>$messageSet</span><span>) {
    </span><span>var_dump</span>(<span>$partition</span>-><span>getHighOffset());
        </span><span>foreach</span> (<span>$messageSet</span> <span>as</span> <span>$message</span><span>) {
            </span><span>var_dump</span>((<span>string</span>)<span>$message</span><span>);
        }
    </span><span>var_dump</span>(<span>$partition</span>-><span>getMessageOffset());
    }
}</span>
Nach dem Login kopieren

 

Verwandte Etiketten:
Quelle:php.cn
Erklärung dieser Website
Der Inhalt dieses Artikels wird freiwillig von Internetnutzern beigesteuert und das Urheberrecht liegt beim ursprünglichen Autor. Diese Website übernimmt keine entsprechende rechtliche Verantwortung. Wenn Sie Inhalte finden, bei denen der Verdacht eines Plagiats oder einer Rechtsverletzung besteht, wenden Sie sich bitte an admin@php.cn
Beliebte Empfehlungen
Beliebte Tutorials
Mehr>
Neueste Downloads
Mehr>
Web-Effekte
Quellcode der Website
Website-Materialien
Frontend-Vorlage