Der Inhalt dieses Artikels befasst sich mit der Einführung von Kafka und der Installation und dem Testen von Kafka auf Basis von PHP. Ich hoffe, dass er Ihnen weiterhelfen kann.
Einführung
Kafka ist ein verteiltes Publish-Subscribe-Messagingsystem mit hohem Durchsatz
Kafka-Rollen, die Sie kennen müssen
Produzent: Produzent .
Verbraucher: Verbraucher.
Thema: Nachrichten werden in der Themenkategorie erfasst. Kafka klassifiziert Nachrichten-Seeds (Feeds) und jeder Nachrichtentyp wird als Thema bezeichnet.
Broker: Läuft in einem Cluster und kann aus einem oder mehreren Diensten bestehen, die als Broker bezeichnet werden. Verbraucher können ein oder mehrere Themen abonnieren und Daten vom Broker abrufen, um diese veröffentlichten Nachrichten zu nutzen.
Klassisches Modell
1. Die Partition unter einem Thema kann nicht kleiner sein als die Anzahl der Verbraucher, das heißt, die Anzahl der Verbraucher unter einem Thema kann nicht größer sein als die Partition größer, es wird Leerlaufzeit verschwenden
2 Eine Partition unter einem Thema kann von einem Verbraucher in verschiedenen Verbrauchergruppen gleichzeitig genutzt werden
3 Eine Partition unter einem Thema kann nur von einem Verbraucher in der verwendet werden gleiche Verbrauchergruppe

Gemeinsame Parameterbeschreibung
request.required.acks
Die Bestätigung des Kafka-Produzenten verfügt über 3 Mechanismen . Die Producerconfig kann beim Initialisieren des Producers durch verschiedene Werte für request.required.acks konfiguriert werden.
0: Dies bedeutet, dass der Produzent nicht auf die Bestätigung des Brokers wartet, dass die Synchronisierung abgeschlossen ist, um mit dem Senden der nächsten (Batch-)Nachricht fortzufahren. Diese Option bietet die niedrigste Latenz, aber die schwächste Haltbarkeitsgarantie (einige Daten gehen verloren, wenn der Server ausfällt, z. B. wenn der Anführer tot ist, der Produzent dies jedoch nicht weiß und der Broker die gesendeten Informationen nicht empfangen kann).
1: Dies bedeutet, dass der Produzent die nächste Nachricht sendet, nachdem der Leiter die Daten erfolgreich empfangen und bestätigt hat. Diese Option bietet eine bessere Haltbarkeit, da der Client darauf wartet, dass der Server bestätigt, dass die Anfrage erfolgreich war (die einzige Nachricht, die an den toten Leader geschrieben, aber noch nicht repliziert wurde, geht verloren).
-1: Dies bedeutet, dass der Produzent eine Übertragung erst dann abschließt, wenn die Follower-Kopie bestätigt, dass sie die Daten erhalten hat.
Diese Option bietet die beste Haltbarkeit. Wir garantieren, dass keine Informationen verloren gehen, solange mindestens ein synchronisiertes Replikat aktiv bleibt.
Drei Mechanismen: Die Leistung nimmt in der Reihenfolge ab (der Herstellerdurchsatz nimmt ab) und die Datenrobustheit nimmt in der Reihenfolge zu.
auto.offset.reset
1. Der Offset wird automatisch auf den frühesten Offset zurückgesetzt.
Der Offset wird automatisch auf den neuesten Offset zurückgesetzt (Standard). 3. keine: Wenn die Verbrauchergruppe den vorherigen Offset nicht findet, wird eine Ausnahme an den Verbraucher ausgelöst.
4. Andere Parameter: Eine Ausnahme (ungültiger Parameter) an den Verbraucher auslösen
Kafka-Installation und einfacher Test
Kafka installieren (keine Installation erforderlich, einfach entpacken)
1 2 3 4 | # 官方下载地址:http:
# wget https:
tar -xzf kafka_2.12-1.1.1.tgz
cd kafka_2.12-1.1.0
|
Nach dem Login kopieren
Kafka-Server starten
1 2 3 | # 需先启动zookeeper
bin/zookeeper-server-start.sh config/zookeeper.properties
bin/kafka-server-start.sh config/server.properties
|
Nach dem Login kopieren
Kafka-Client-Test starten
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 | # 创建一个话题,test话题2个分区
bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 2 --topic test
Created topic "test" .
# 显示所有话题
bin/kafka-topics.sh --list --zookeeper localhost:2181
test
# 显示话题信息
bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic test
Topic:test PartitionCount:2 ReplicationFactor:1 Configs:
Topic: test Partition: 0 Leader: 0 Replicas: 0 Isr: 0
Topic: test Partition: 1 Leader: 0 Replicas: 0 Isr: 0
# 启动一个生产者(输入消息)
bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test
[等待输入自己的内容 出现>输入即可]
>i am a new msg !
>i am a good msg ?
# 启动一个生产者(等待消息)
# 注意这里的--from-beginning,每次都会从头开始读取,你可以尝试去掉和不去掉看下效果
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --from-beginning
[等待消息]
i am a new msg !
i am a good msg ?
|
Nach dem Login kopieren
PHP-Erweiterung von Kafka installieren
1 2 3 4 5 6 7 8 9 | git clone https:
cd php-rdkafka
phpize
./configure
make all -j 5
sudo make install
vim [php]/php.ini
extension=rdkafka.so
|
Nach dem Login kopieren
PHP-Code-Übung
Produzent
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 | <?php $conf = new RdKafka\Conf();
$conf ->setDrMsgCb( function ( $kafka , $message ) {
file_put_contents ( "./dr_cb.log" , var_export( $message , true).PHP_EOL, FILE_APPEND);
});
$conf ->setErrorCb( function ( $kafka , $err , $reason ) {
file_put_contents ( "./err_cb.log" , sprintf( "Kafka error: %s (reason: %s)" , rd_kafka_err2str( $err ), $reason ).PHP_EOL, FILE_APPEND);
});
$rk = new RdKafka\Producer( $conf );
$rk ->setLogLevel(LOG_DEBUG);
$rk ->addBrokers( "127.0.0.1" );
$cf = new RdKafka\TopicConf();
$cf ->set( 'request.required.acks' , 0);
$topic = $rk ->newTopic( "test" , $cf );
$option = 'qkl' ;
for ( $i = 0; $i produce(RD_KAFKA_PARTITION_UA, 0, "qkl . $i" , $option );
}
$len = $rk ->getOutQLen();
while ( $len > 0) {
$len = $rk ->getOutQLen();
var_dump( $len );
$rk ->poll(50);
}
|
Nach dem Login kopieren
Produzent ausführen
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 | php producer.php
# output
int(20)
int(20)
int(20)
int(20)
int(0)
# 你可以查看你刚才上面启动的消费者shell应该会输出消息
qkl . 0
qkl . 1
qkl . 2
qkl . 3
qkl . 4
qkl . 5
qkl . 6
qkl . 7
qkl . 8
qkl . 9
qkl . 10
qkl . 11
qkl . 12
qkl . 13
qkl . 14
qkl . 15
qkl . 16
qkl . 17
qkl . 18
qkl . 19
|
Nach dem Login kopieren
Consumer
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 | <?php $conf = new RdKafka\Conf();
$conf ->setDrMsgCb( function ( $kafka , $message ) {
file_put_contents ( "./c_dr_cb.log" , var_export( $message , true), FILE_APPEND);
});
$conf ->setErrorCb( function ( $kafka , $err , $reason ) {
file_put_contents ( "./err_cb.log" , sprintf( "Kafka error: %s (reason: %s)" , rd_kafka_err2str( $err ), $reason ).PHP_EOL, FILE_APPEND);
});
$conf ->set( 'group.id' , 'myConsumerGroup' );
$rk = new RdKafka\Consumer( $conf );
$rk ->addBrokers( "127.0.0.1" );
$topicConf = new RdKafka\TopicConf();
$topicConf ->set( 'request.required.acks' , 1);
$topicConf ->set( 'auto.commit.enable' , 0);
$topicConf ->set( 'auto.commit.interval.ms' , 100);
$topicConf ->set( 'offset.store.method' , 'broker' );
$topic = $rk ->newTopic( "test" , $topicConf );
$topic ->consumeStart(0, RD_KAFKA_OFFSET_BEGINNING);
while (true) {
$message = $topic ->consume(0, 12 * 1000);
switch ( $message ->err) {
case RD_KAFKA_RESP_ERR_NO_ERROR:
var_dump( $message );
break ;
case RD_KAFKA_RESP_ERR__PARTITION_EOF:
echo "No more messages; will wait for more\n" ;
break ;
case RD_KAFKA_RESP_ERR__TIMED_OUT:
echo "Timed out\n" ;
break ;
default :
throw new \Exception( $message ->errstr(), $message ->err);
break ;
}
}
|
Nach dem Login kopieren
Servermetadaten anzeigen (Thema/Partition/Broker)
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 | <?php $conf = new RdKafka\Conf();
$conf ->setDrMsgCb( function ( $kafka , $message ) {
file_put_contents ( "./xx.log" , var_export( $message , true), FILE_APPEND);
});
$conf ->setErrorCb( function ( $kafka , $err , $reason ) {
printf( "Kafka error: %s (reason: %s)\n" , rd_kafka_err2str( $err ), $reason );
});
$conf ->set( 'group.id' , 'myConsumerGroup' );
$rk = new RdKafka\Consumer( $conf );
$rk ->addBrokers( "127.0.0.1" );
$allInfo = $rk ->metadata(true, NULL, 60e3);
$topics = $allInfo ->getTopics();
echo rd_kafka_offset_tail(100);
echo "--" ;
echo count ( $topics );
echo "--" ;
foreach ( $topics as $topic ) {
$topicName = $topic ->getTopic();
if ( $topicName == "__consumer_offsets" ) {
continue ;
}
$partitions = $topic ->getPartitions();
foreach ( $partitions as $partition ) {
$topPartition = new RdKafka\TopicPartition( $topicName , $partition ->getId());
echo "当前的话题:" . ( $topPartition ->getTopic()) . " - " . $partition ->getId() . " - " ;
echo "offset:" . ( $topPartition ->getOffset()) . PHP_EOL;
}
}
|
Nach dem Login kopieren
Verwandte Empfehlungen:
kafka Installation und Verwendung der Kafka-PHP-Erweiterung, Kafkakafka-PHP-Erweiterung
Kafka-Assembly und Verwendung der Kafka-PHP-Erweiterung
Das obige ist der detaillierte Inhalt vonEinführung in Kafka und Installation und Test von Kafka auf Basis von PHP. Für weitere Informationen folgen Sie bitte anderen verwandten Artikeln auf der PHP chinesischen Website!