首頁 > 後端開發 > php教程 > Kafka的介紹以及基於PHP的kafka的安裝和測試

Kafka的介紹以及基於PHP的kafka的安裝和測試

不言
發布: 2023-04-03 12:38:01
原創
3690 人瀏覽過

這篇文章給大家分享的內容是關於Kafka的介紹以及基於PHP的kafka的安裝和測試,內容很詳細,有需要的朋友可以參考一下,希望可以幫助到你們。

簡介

Kafka 是一種高吞吐量的分散式發布訂閱訊息系統

kafka角色必知

producer:生產者。
consumer:消費者。
topic: 訊息以topic為類別記錄,Kafka將訊息種子(Feed)分類, 每一類的訊息稱之為一個主題(Topic)。
broker:以叢集的方式運作,可以由一個或多個服務組成,每個服務叫做一個broker;消費者可以訂閱一個或多個主題(topic), 並從Broker拉資料,從而消費這些已發布的消息。

經典模型

1. 一個主題下的分區不能小於消費者數量,即一個主題下消費者數量不能大於分區屬,大了就浪費了空閒了
2 . 一個主題下的一個分區可以同時被不同消費組其中某一個消費者消費
3. 一個主題下的一個分區只能被同一個消費組的一個消費者消費

Kafka的介紹以及基於PHP的kafka的安裝和測試

#常用參數說明

request.required.acks

Kafka producer的ack有3中機制,初始化producer時的producerconfig可以透過設定request.required.acks不同的值來實作。

0:這表示生產者producer不等待來自broker同步完成的確認繼續發送下一條(批次)訊息。此選項提供最低的延遲但最弱的耐久性保證(當伺服器發生故障時某些資料會遺失,如leader已死,但producer並不知情,發出去的信息broker就收不到)。

1:這表示producer在leader已成功收到的資料並確認後發送下一條message。此選項提供了更好的耐久性為客戶等待伺服器確認請求成功(被寫入死亡leader但尚未複製將失去了唯一的訊息)。

-1:這表示producer在follower副本確認接收到資料後才算一次發送完成。
此選項提供最好的耐久性,我們保證沒有資訊將會遺失,只要至少一個同步副本保持存活。

三種機制,效能依序遞減 (producer吞吐量降低),資料健壯性則依序遞增。

auto.offset.reset

1. earliest:自動將偏移重置為最早的偏移量
2. latest:自動將偏移量重設為最新的偏移量(預設)
3. none:如果consumer group沒有發現先前的偏移量,則向consumer拋出例外狀況。
4. 其他的參數:向consumer拋出異常(無效參數)

kafka安裝和簡單測試

安裝kafka(不需要安裝,解包即可)

# 官方下载地址:http://kafka.apache.org/downloads
# wget https://www.apache.org/dyn/closer.cgi?path=/kafka/1.1.1/kafka_2.12-1.1.1.tgz
tar -xzf kafka_2.12-1.1.1.tgz
cd kafka_2.12-1.1.0
登入後複製

啟動kafka server

# 需先启动zookeeper
bin/zookeeper-server-start.sh config/zookeeper.properties
bin/kafka-server-start.sh config/server.properties
登入後複製

啟動kafka客戶端測試

# 创建一个话题,test话题2个分区
bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 2 --topic test
Created topic "test".

# 显示所有话题
bin/kafka-topics.sh --list --zookeeper localhost:2181
test

# 显示话题信息
bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic test
Topic:test    PartitionCount:2    ReplicationFactor:1    Configs:
    Topic: test    Partition: 0    Leader: 0    Replicas: 0    Isr: 0
    Topic: test    Partition: 1    Leader: 0    Replicas: 0    Isr: 0


# 启动一个生产者(输入消息)
bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test
[等待输入自己的内容 出现>输入即可]
>i am a new msg !
>i am a good msg ?

# 启动一个生产者(等待消息) 
# 注意这里的--from-beginning,每次都会从头开始读取,你可以尝试去掉和不去掉看下效果
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --from-beginning
[等待消息]
i am a new msg !
i am a good msg ?
登入後複製

安裝kafka的php擴充

git clone https://github.com/arnaud-lb/php-rdkafka.git
cd php-rdkafka
phpize
./configure
make all -j 5
sudo make install

vim [php]/php.ini
extension=rdkafka.so
登入後複製

php程式碼實踐

生產者

<?php $conf = new RdKafka\Conf();
$conf->setDrMsgCb(function ($kafka, $message) {
    file_put_contents("./dr_cb.log", var_export($message, true).PHP_EOL, FILE_APPEND);
});
$conf->setErrorCb(function ($kafka, $err, $reason) {
    file_put_contents("./err_cb.log", sprintf("Kafka error: %s (reason: %s)", rd_kafka_err2str($err), $reason).PHP_EOL, FILE_APPEND);
});

$rk = new RdKafka\Producer($conf);
$rk->setLogLevel(LOG_DEBUG);
$rk->addBrokers("127.0.0.1");

$cf = new RdKafka\TopicConf();
$cf->set('request.required.acks', 0);
$topic = $rk->newTopic("test", $cf);

$option = 'qkl';
for ($i = 0; $i produce(RD_KAFKA_PARTITION_UA, 0, "qkl . $i", $option);
}


$len = $rk->getOutQLen();
while ($len > 0) {
    $len = $rk->getOutQLen();
    var_dump($len);
    $rk->poll(50);
}
登入後複製

運行生產者

php producer.php
# output

int(20)
int(20)
int(20)
int(20)
int(0)

# 你可以查看你刚才上面启动的消费者shell应该会输出消息
qkl . 0
qkl . 1
qkl . 2
qkl . 3
qkl . 4
qkl . 5
qkl . 6
qkl . 7
qkl . 8
qkl . 9
qkl . 10
qkl . 11
qkl . 12
qkl . 13
qkl . 14
qkl . 15
qkl . 16
qkl . 17
qkl . 18
qkl . 19
登入後複製

消費者

<?php $conf = new RdKafka\Conf();
$conf->setDrMsgCb(function ($kafka, $message) {
    file_put_contents("./c_dr_cb.log", var_export($message, true), FILE_APPEND);
});
$conf->setErrorCb(function ($kafka, $err, $reason) {
    file_put_contents("./err_cb.log", sprintf("Kafka error: %s (reason: %s)", rd_kafka_err2str($err), $reason).PHP_EOL, FILE_APPEND);
});

//设置消费组
$conf->set('group.id', 'myConsumerGroup');

$rk = new RdKafka\Consumer($conf);
$rk->addBrokers("127.0.0.1");

$topicConf = new RdKafka\TopicConf();
$topicConf->set('request.required.acks', 1);
//在interval.ms的时间内自动提交确认、建议不要启动
//$topicConf->set('auto.commit.enable', 1);
$topicConf->set('auto.commit.enable', 0);
$topicConf->set('auto.commit.interval.ms', 100);

// 设置offset的存储为file
//$topicConf->set('offset.store.method', 'file');
// 设置offset的存储为broker
 $topicConf->set('offset.store.method', 'broker');
//$topicConf->set('offset.store.path', __DIR__);

//smallest:简单理解为从头开始消费,其实等价于上面的 earliest
//largest:简单理解为从最新的开始消费,其实等价于上面的 latest
//$topicConf->set('auto.offset.reset', 'smallest');

$topic = $rk->newTopic("test", $topicConf);

// 参数1消费分区0
// RD_KAFKA_OFFSET_BEGINNING 重头开始消费
// RD_KAFKA_OFFSET_STORED 最后一条消费的offset记录开始消费
// RD_KAFKA_OFFSET_END 最后一条消费
$topic->consumeStart(0, RD_KAFKA_OFFSET_BEGINNING);
//$topic->consumeStart(0, RD_KAFKA_OFFSET_END); //
//$topic->consumeStart(0, RD_KAFKA_OFFSET_STORED);

while (true) {
    //参数1表示消费分区,这里是分区0
    //参数2表示同步阻塞多久
    $message = $topic->consume(0, 12 * 1000);
    switch ($message->err) {
        case RD_KAFKA_RESP_ERR_NO_ERROR:
            var_dump($message);
            break;
        case RD_KAFKA_RESP_ERR__PARTITION_EOF:
            echo "No more messages; will wait for more\n";
            break;
        case RD_KAFKA_RESP_ERR__TIMED_OUT:
            echo "Timed out\n";
            break;
        default:
            throw new \Exception($message->errstr(), $message->err);
            break;
    }
}
登入後複製

查看伺服器元資料(topic/partition/broker)

<?php $conf = new RdKafka\Conf();
$conf->setDrMsgCb(function ($kafka, $message) {
    file_put_contents("./xx.log", var_export($message, true), FILE_APPEND);
});
$conf->setErrorCb(function ($kafka, $err, $reason) {
    printf("Kafka error: %s (reason: %s)\n", rd_kafka_err2str($err), $reason);
});

$conf->set('group.id', 'myConsumerGroup');

$rk = new RdKafka\Consumer($conf);
$rk->addBrokers("127.0.0.1");

$allInfo = $rk->metadata(true, NULL, 60e3);

$topics = $allInfo->getTopics();

echo rd_kafka_offset_tail(100);
echo "--";

echo count($topics);
echo "--";


foreach ($topics as $topic) {

    $topicName = $topic->getTopic();
    if ($topicName == "__consumer_offsets") {
        continue ;
    }

    $partitions = $topic->getPartitions();
    foreach ($partitions as $partition) {
//        $rf = new ReflectionClass(get_class($partition));
//        foreach ($rf->getMethods() as $f) {
//            var_dump($f);
//        }
//        die();
        $topPartition = new RdKafka\TopicPartition($topicName, $partition->getId());
        echo  "当前的话题:" . ($topPartition->getTopic()) . " - " . $partition->getId() . " - ";
        echo  "offset:" . ($topPartition->getOffset()) . PHP_EOL;
    }
}
登入後複製

相關推薦:

kafka安裝及Kafka-PHP擴充的使用,kafkakafka-php擴充

kafka組裝及Kafka-PHP擴充的使用

#

以上是Kafka的介紹以及基於PHP的kafka的安裝和測試的詳細內容。更多資訊請關注PHP中文網其他相關文章!

相關標籤:
來源:php.cn
本網站聲明
本文內容由網友自願投稿,版權歸原作者所有。本站不承擔相應的法律責任。如發現涉嫌抄襲或侵權的內容,請聯絡admin@php.cn
作者最新文章
熱門教學
更多>
最新下載
更多>
網站特效
網站源碼
網站素材
前端模板