php+kafkaを素早くインストールする方法を教えます
Jun 30, 2021 am 09:39 AMPHP について多くのことを学びました。今日は、PHP kafka を簡単にインストールする方法を説明します。方法がわからない場合は、この記事に従って学習を続けてください
1. Java をインストールし、関連する環境変数を設定します
> wget https://download.java.net/openjdk/jdk7u75/ri/openjdk-7u75-b13-linux-x64-18_dec_2014.tar.gz > tar zxvf openjdk-7u75-b13-linux-x64-18_dec_2014.tar.gz > mv java-se-7u75-ri/ /opt/ > export JAVA_HOME=/opt/java-se-7u75-ri > export PATH=$PATH:$JAVA_HOME/bin:$JAVA_HOME/jre/bin > export CLASSPATH=.:$JAVA_HOME/lib:$JAVA_HOME/lib/tools.jar #验证安装 > java -verison openjdk version "1.7.0_75" OpenJDK Runtime Environment (build 1.7.0_75-b13) OpenJDK 64-Bit Server VM (build 24.75-b04, mixed mode)
2. kafka をインストールします。ここでは例としてバージョン 0.10.2 を取り上げます
> wget http://archive.apache.org/dist/kafka/0.10.2.0/kafka_2.11-0.10.2.0.tgz > tar zxvf kafka_2.11-0.10.2.0.tgz > mv kafka_2.11-0.10.2.0/ /opt/kafka > cd /opt/kafka #启动zookeeper > bin/zookeeper-server-start.sh config/zookeeper.properties [2013-04-22 15:01:37,495] INFO Reading configuration from: config/zookeeper.properties (org.apache.zookeeper.server.quorum.QuorumPeerConfig) ... #启动kafka > bin/kafka-server-start.sh config/server.properties [2013-04-22 15:01:47,028] INFO Verifying properties (kafka.utils.VerifiableProperties) [2013-04-22 15:01:47,051] INFO Property socket.send.buffer.bytes is overridden to 1048576 (kafka.utils.VerifiableProperties) ... #尝试创建一个topic > bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test > bin/kafka-topics.sh --list --zookeeper localhost:2181 test #生产者写入消息 > bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test This is a message This is another message #消费者消费消息 > bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --from-beginning This is a message This is another message
3. kafka
> wget https://github.com/edenhill/librdkafka/archive/v1.3.0.tar.gz > tar zxvf v1.3.0.tar.gz > cd librdkafka-1.3.0 > ./configure > make && make install
の C 操作ライブラリをインストールします。php の kafka 拡張機能をインストールします。ここでは php-rdkafka 拡張機能 https://github.com/arnaud-lb/php- を選択します。 rdkafka
> wget https://github.com/arnaud-lb/php-rdkafka/archive/4.0.2.tar.gz > tar 4.0.2.tar.gz > cd php-rdkafka-4.0.2 > /opt/php7/bin/phpize > ./configure --with-php-config=/opt/php7/bin/php-config > make && make install
php.ini を変更し、extension=rdkafka.so
5. rdkafka
> composer create-project kwn/php-rdkafka-stubs php-rdkafka-stubs
の IDE コード プロンプト ファイルをインストールします。プロジェクトの外部 [ライブラリ] を右クリックし、[PHP インクルード パスの構成] を選択して、今すぐパスを追加します。
プロデューサー:
<?php $conf = new RdKafka\Conf(); $conf->set('log_level', LOG_ERR); $conf->set('debug', 'admin'); $conf->set('metadata.broker.list', 'localhost:9092'); //If you need to produce exactly once and want to keep the original produce order, uncomment the line below //$conf->set('enable.idempotence', 'true'); $producer = new RdKafka\Producer($conf); $topic = $producer->newTopic("test2"); for ($i = 0; $i < 10; $i++) { $topic->produce(RD_KAFKA_PARTITION_UA, 0, "Message $i"); $producer->poll(0); } for ($flushRetries = 0; $flushRetries < 10; $flushRetries++) { $result = $producer->flush(10000); if (RD_KAFKA_RESP_ERR_NO_ERROR === $result) { break; } } if (RD_KAFKA_RESP_ERR_NO_ERROR !== $result) { throw new \RuntimeException('Was unable to flush, messages might be lost!'); }
低レベルのコンシューマ:
<?php $conf = new RdKafka\Conf(); $conf->set('log_level', LOG_ERR); $conf->set('debug', 'admin'); // Set the group id. This is required when storing offsets on the broker $conf->set('group.id', 'myConsumerGroup'); $rk = new RdKafka\Consumer($conf); $rk->addBrokers("127.0.0.1"); $topicConf = new RdKafka\TopicConf(); $topicConf->set('auto.commit.interval.ms', 100); // Set the offset store method to 'file' $topicConf->set('offset.store.method', 'broker'); // Alternatively, set the offset store method to 'none' // $topicConf->set('offset.store.method', 'none'); // Set where to start consuming messages when there is no initial offset in // offset store or the desired offset is out of range. // 'smallest': start from the beginning $topicConf->set('auto.offset.reset', 'smallest'); $topic = $rk->newTopic("test2", $topicConf); // Start consuming partition 0 $topic->consumeStart(0, RD_KAFKA_OFFSET_STORED); while (true) { $message = $topic->consume(0, 10000); switch ($message->err) { case RD_KAFKA_RESP_ERR_NO_ERROR: print_r($message); break; case RD_KAFKA_RESP_ERR__PARTITION_EOF: echo "No more messages; will wait for more\n"; break; case RD_KAFKA_RESP_ERR__TIMED_OUT: echo "Timed out\n"; break; default: throw new \Exception($message->errstr(), $message->err); break; } }
高レベルのコンシューマ:
<?php $conf = new RdKafka\Conf(); // Set a rebalance callback to log partition assignments (optional) $conf->setRebalanceCb(function (RdKafka\KafkaConsumer $kafka, $err, array $partitions = null) { switch ($err) { case RD_KAFKA_RESP_ERR__ASSIGN_PARTITIONS: echo "Assign: "; var_dump($partitions); $kafka->assign($partitions); break; case RD_KAFKA_RESP_ERR__REVOKE_PARTITIONS: echo "Revoke: "; var_dump($partitions); $kafka->assign(NULL); break; default: throw new \Exception($err); } }); // Configure the group.id. All consumer with the same group.id will consume // different partitions. $conf->set('group.id', 'myConsumerGroup'); // Initial list of Kafka brokers $conf->set('metadata.broker.list', '127.0.0.1'); // Set where to start consuming messages when there is no initial offset in // offset store or the desired offset is out of range. // 'smallest': start from the beginning $conf->set('auto.offset.reset', 'smallest'); $consumer = new RdKafka\KafkaConsumer($conf); // Subscribe to topic 'test2' $consumer->subscribe(['test2']); echo "Waiting for partition assignment... (make take some time when\n"; echo "quickly re-joining the group after leaving it.)\n"; while (true) { $message = $consumer->consume(1000); switch ($message->err) { case RD_KAFKA_RESP_ERR_NO_ERROR: print_r($message); break; case RD_KAFKA_RESP_ERR__PARTITION_EOF: echo "No more messages; will wait for more\n"; break; case RD_KAFKA_RESP_ERR__TIMED_OUT: echo "Timed out\n"; break; default: throw new \Exception($message->errstr(), $message->err); break; } sleep(2); }
PHP ビデオ チュートリアル 」 》
以上がphp+kafkaを素早くインストールする方法を教えますの詳細内容です。詳細については、PHP 中国語 Web サイトの他の関連記事を参照してください。

人気の記事

人気の記事

ホットな記事タグ

メモ帳++7.3.1
使いやすく無料のコードエディター

SublimeText3 中国語版
中国語版、とても使いやすい

ゼンドスタジオ 13.0.1
強力な PHP 統合開発環境

ドリームウィーバー CS6
ビジュアル Web 開発ツール

SublimeText3 Mac版
神レベルのコード編集ソフト(SublimeText3)

ホットトピック











Ubuntu および Debian 用の PHP 8.4 インストールおよびアップグレード ガイド

PHP 開発用に Visual Studio Code (VS Code) をセットアップする方法
