kafka安装及Kafka-PHP扩展的使用,kafkakafka-php扩展
Jun 13, 2016 am 08:46 AMkafka安装及Kafka-PHP扩展的使用,kafkakafka-php扩展
话说用了就要有点产出,要不然过段时间又忘了,所以在这里就记录一下试用Kafka的安装过程和php扩展的试用。
实话说,如果用于队列的话,跟PHP比较配的,还是Redis。用的顺手,呵呵,只是Redis不能有多个consumer。但Kafka官方对PHP不支持,PHP扩展是爱好者或使用者写的。下面就开始讲Kafka的安装吧。我以CentOS6.4为例,64位。
一. 首先确认下jdk有没有安装
使用命令
[root@localhost ~]# java -<span>version java version </span><span>"</span><span>1.8.0_73</span><span>"</span><span> Java(TM) SE Runtime Environment (build </span><span>1.8</span>.0_73-<span>b02) Java HotSpot(TM) </span><span>64</span>-Bit Server VM (build <span>25.73</span>-b02, mixed mode)
如果有以上信息的话,就往下安装吧,有些可能是jdk对不上,那就装到对的上的。如果没有安装,就看一下下面的jdk安装方法:
http://www.oracle.com/technetwork/java/javase/downloads/jdk8-downloads-2133151.html
到这个地址下载jdk8版本,我下载的是jdk-8u73-linux-x64.tar.gz,然后解压到/usr/local/jdk/下。
然后打开/etc/profile文件
[root@localhost ~]# vim /etc/profile
把下面这段代码写到文件里
export JAVA_HOME=/usr/local/jdk/jdk1.<span>8</span><span>.0_73 export CLASSPATH</span>=.:$JAVA_HOME/lib/tools.jar:$JAVA_HOME/lib/<span>dt.jar export PATH</span>=$JAVA_HOME/bin:$PATH
最后
[root@localhost ~]# source /etc/profile
这时jdk就生效了,可以使用 java -version验证下。
二. 接下来安装Kafka
1. 下载Kafka
到http://kafka.apache.org/downloads.html下载相应的版本,我使用的是kafka_2.9.1-0.8.2.2.tgz。
2. 下载完解压到你喜欢的目录
我是解压到 /usr/local/kafka/kafka_2.9.1-0.8.2.2
3. 运行默认的Kafka
启动Zookeeper server
[root@localhost kafka_2.<span>9.1</span>-<span>0.8</span>.<span>2.2</span>]# <span>sh</span> bin/zookeeper-server-start.<span>sh</span> config/zookeeper.properties &
启动Kafka server
[root@localhost kafka_2.<span>9.1</span>-<span>0.8</span>.<span>2.2</span>]# <span>sh</span> bin/kafka-server-start.<span>sh</span> config/server.properties &
运行生产者producer
[root@localhost kafka_2.<span>9.1</span>-<span>0.8</span>.<span>2.2</span>]# <span>sh</span> bin/kafka-console-producer.<span>sh</span> --broker-list localhost:<span>9092</span> --topic test
运行消费者consumer
[root@localhost kafka_2.<span>9.1</span>-<span>0.8</span>.<span>2.2</span>]# <span>sh</span> bin/kafka-console-consumer.<span>sh</span> --zookeeper localhost:<span>2181</span> --topic test --from-beginning
这样,在producer那边输入内容,consumer马上就能接收到。
4. 当有跨机的producer或consumer连接时
需要配置config/server.properties的host.name,要不然跨机的连不上。
三. Kafka-PHP扩展
使用了一圈,就https://github.com/nmred/kafka-php可以用。
我是使用composer安装的,以下是示例:
producer.php
<?<span>php </span><span>require</span> 'vendor/autoload.php'<span>; </span><span>while</span> (1<span>) { </span><span>$part</span> = <span>mt_rand</span>(0, 1<span>); </span><span>$produce</span> = \Kafka\Produce::getInstance('kafka0:2181', 3000<span>); </span><span>//</span><span> get available partitions</span> <span>$partitions</span> = <span>$produce</span>->getAvailablePartitions('topic_name'<span>); </span><span>var_dump</span>(<span>$partitions</span><span>); </span><span>//</span><span> send message</span> <span>$produce</span>->setRequireAck(-1<span>); </span><span>$produce</span>->setMessages('topic_name', 0, <span>array</span>(<span>date</span>('Y-m-d H:i:s'<span>)); </span><span>sleep</span>(3<span>); }</span>
consumer.php
<span>require</span> 'vendor/autoload.php'<span>; </span><span>$consumer</span> = \Kafka\Consumer::getInstance('kafka0:2181'<span>); </span><span>$group</span> = 'topic_name'<span>; </span><span>$consumer</span>->setGroup(<span>$group</span><span>); </span><span>$consumer</span>->setFromOffset(<span>true</span><span>); </span><span>$consumer</span>->setTopic('topic_name', 0<span>); </span><span>$consumer</span>->setMaxBytes(102400<span>); </span><span>$result</span> = <span>$consumer</span>-><span>fetch(); </span><span>print_r</span>(<span>$result</span><span>); </span><span>foreach</span> (<span>$result</span> <span>as</span> <span>$topicName</span> => <span>$partition</span><span>) { </span><span>foreach</span> (<span>$partition</span> <span>as</span> <span>$partId</span> => <span>$messageSet</span><span>) { </span><span>var_dump</span>(<span>$partition</span>-><span>getHighOffset()); </span><span>foreach</span> (<span>$messageSet</span> <span>as</span> <span>$message</span><span>) { </span><span>var_dump</span>((<span>string</span>)<span>$message</span><span>); } </span><span>var_dump</span>(<span>$partition</span>-><span>getMessageOffset()); } }</span>

Hot Article

Hot tools Tags

Hot Article

Hot Article Tags

Notepad++7.3.1
Easy-to-use and free code editor

SublimeText3 Chinese version
Chinese version, very easy to use

Zend Studio 13.0.1
Powerful PHP integrated development environment

Dreamweaver CS6
Visual web development tools

SublimeText3 Mac version
God-level code editing software (SublimeText3)

Hot Topics

What software is crystaldiskmark? -How to use crystaldiskmark?

What should I do if Baidu Netdisk is downloaded successfully but cannot be installed?

How to install Android apps on Linux?

How to download foobar2000? -How to use foobar2000

BTCC tutorial: How to bind and use MetaMask wallet on BTCC exchange?

How to Install and Run the Ubuntu Notes App on Ubuntu 24.04
