Home > php教程 > php手册 > body text

kafka安装及Kafka-PHP扩展的使用,kafkakafka-php扩展

WBOY
Release: 2016-06-13 08:46:05
Original
2284 people have browsed it

kafka安装及Kafka-PHP扩展的使用,kafkakafka-php扩展

话说用了就要有点产出,要不然过段时间又忘了,所以在这里就记录一下试用Kafka的安装过程和php扩展的试用。

实话说,如果用于队列的话,跟PHP比较配的,还是Redis。用的顺手,呵呵,只是Redis不能有多个consumer。但Kafka官方对PHP不支持,PHP扩展是爱好者或使用者写的。下面就开始讲Kafka的安装吧。我以CentOS6.4为例,64位。

一. 首先确认下jdk有没有安装

使用命令

[root@localhost ~]# java -<span>version
java version </span><span>"</span><span>1.8.0_73</span><span>"</span><span>
Java(TM) SE Runtime Environment (build </span><span>1.8</span>.0_73-<span>b02)
Java HotSpot(TM) </span><span>64</span>-Bit Server VM (build <span>25.73</span>-b02, mixed mode)
Copy after login

如果有以上信息的话,就往下安装吧,有些可能是jdk对不上,那就装到对的上的。如果没有安装,就看一下下面的jdk安装方法:

http://www.oracle.com/technetwork/java/javase/downloads/jdk8-downloads-2133151.html

到这个地址下载jdk8版本,我下载的是jdk-8u73-linux-x64.tar.gz,然后解压到/usr/local/jdk/下。

然后打开/etc/profile文件

[root@localhost ~]# vim /etc/profile
Copy after login

把下面这段代码写到文件里

export JAVA_HOME=/usr/local/jdk/jdk1.<span>8</span><span>.0_73
export CLASSPATH</span>=.:$JAVA_HOME/lib/tools.jar:$JAVA_HOME/lib/<span>dt.jar
export PATH</span>=$JAVA_HOME/bin:$PATH
Copy after login

最后

[root@localhost ~]# source /etc/profile
Copy after login

这时jdk就生效了,可以使用 java -version验证下。

二. 接下来安装Kafka

1. 下载Kafka

到http://kafka.apache.org/downloads.html下载相应的版本,我使用的是kafka_2.9.1-0.8.2.2.tgz。

2. 下载完解压到你喜欢的目录

我是解压到 /usr/local/kafka/kafka_2.9.1-0.8.2.2

3. 运行默认的Kafka

启动Zookeeper server

[root@localhost kafka_2.<span>9.1</span>-<span>0.8</span>.<span>2.2</span>]# <span>sh</span> bin/zookeeper-server-start.<span>sh</span> config/zookeeper.properties &
Copy after login

启动Kafka server

[root@localhost kafka_2.<span>9.1</span>-<span>0.8</span>.<span>2.2</span>]# <span>sh</span> bin/kafka-server-start.<span>sh</span> config/server.properties &
Copy after login

运行生产者producer

[root@localhost kafka_2.<span>9.1</span>-<span>0.8</span>.<span>2.2</span>]# <span>sh</span> bin/kafka-console-producer.<span>sh</span> --broker-list localhost:<span>9092</span> --topic test
Copy after login

运行消费者consumer

[root@localhost kafka_2.<span>9.1</span>-<span>0.8</span>.<span>2.2</span>]# <span>sh</span> bin/kafka-console-consumer.<span>sh</span> --zookeeper localhost:<span>2181</span> --topic test --from-beginning
Copy after login

这样,在producer那边输入内容,consumer马上就能接收到。

4. 当有跨机的producer或consumer连接时

需要配置config/server.properties的host.name,要不然跨机的连不上。

三. Kafka-PHP扩展

使用了一圈,就https://github.com/nmred/kafka-php可以用。

我是使用composer安装的,以下是示例:

producer.php

<?<span>php
</span><span>require</span> 'vendor/autoload.php'<span>;

</span><span>while</span> (1<span>) {
    </span><span>$part</span> = <span>mt_rand</span>(0, 1<span>);
    </span><span>$produce</span> = \Kafka\Produce::getInstance('kafka0:2181', 3000<span>);
    </span><span>//</span><span> get available partitions</span>
    <span>$partitions</span> = <span>$produce</span>->getAvailablePartitions('topic_name'<span>);
    </span><span>var_dump</span>(<span>$partitions</span><span>);
    </span><span>//</span><span> send message</span>
    <span>$produce</span>->setRequireAck(-1<span>);
    </span><span>$produce</span>->setMessages('topic_name', 0, <span>array</span>(<span>date</span>('Y-m-d H:i:s'<span>));
   
    </span><span>sleep</span>(3<span>);
}</span>
Copy after login

consumer.php

<span>require</span> 'vendor/autoload.php'<span>;

</span><span>$consumer</span> = \Kafka\Consumer::getInstance('kafka0:2181'<span>);
</span><span>$group</span> = 'topic_name'<span>;
</span><span>$consumer</span>->setGroup(<span>$group</span><span>);
</span><span>$consumer</span>->setFromOffset(<span>true</span><span>);
</span><span>$consumer</span>->setTopic('topic_name', 0<span>);
</span><span>$consumer</span>->setMaxBytes(102400<span>);
</span><span>$result</span> = <span>$consumer</span>-><span>fetch();
</span><span>print_r</span>(<span>$result</span><span>);
</span><span>foreach</span> (<span>$result</span> <span>as</span> <span>$topicName</span> => <span>$partition</span><span>) {
    </span><span>foreach</span> (<span>$partition</span> <span>as</span> <span>$partId</span> => <span>$messageSet</span><span>) {
    </span><span>var_dump</span>(<span>$partition</span>-><span>getHighOffset());
        </span><span>foreach</span> (<span>$messageSet</span> <span>as</span> <span>$message</span><span>) {
            </span><span>var_dump</span>((<span>string</span>)<span>$message</span><span>);
        }
    </span><span>var_dump</span>(<span>$partition</span>-><span>getMessageOffset());
    }
}</span>
Copy after login

 

Related labels:
source:php.cn
Statement of this Website
The content of this article is voluntarily contributed by netizens, and the copyright belongs to the original author. This site does not assume corresponding legal responsibility. If you find any content suspected of plagiarism or infringement, please contact admin@php.cn
Popular Recommendations
Popular Tutorials
More>
Latest Downloads
More>
Web Effects
Website Source Code
Website Materials
Front End Template
About us Disclaimer Sitemap
php.cn:Public welfare online PHP training,Help PHP learners grow quickly!