Comment obtenir le dernier décalage dans un sujet de Kafka en utilisant la bibliothèque php ?
P粉763662390
P粉763662390 2023-09-10 11:00:45
0
1
632

J'écris un consommateur Kafka pour un projet API utilisant la bibliothèque php-rdkafka. Je dois trouver le dernier décalage dans le sujet et en tirer la valeur pour un traitement ultérieur. Par exemple, dernier décalage dans le sujet = 5, je dois alors obtenir le décalage 5 et l'envoyer via l'API jusqu'à ce qu'un nouveau décalage soit ajouté. Ce que j'essaie d'exécuter :

$conf = new RdKafka\Conf();

$settings = [
    'socket.keepalive.enable'  => true,
    'log_level'                => LOG_WARNING,
    'enable.auto.offset.store' => 'true',
    'auto.offset.reset'        => 'earliest',
    'enable.partition.eof'     => 'false',
    'enable.auto.commit'       => 'false',
    'max.poll.interval.ms'     => 300000,
    'session.timeout.ms'       => 45000,
    'group.id'                 => 'test-group',
    'group.instance.id'        => uniqid('', true),
    'metadata.broker.list'     => 'stat-kafka-1:9092,stat-kafka-2:9092,stat-kafka-3:9092',
];
foreach ($settings as $key => $value) {
    $conf->set($key, $value);
}

$topicName = 'userstatistics_12345';
$partition = 0;

$topicPartition = new RdKafka\TopicPartition($topicName, $partition);

$topicPartitionsWithOffsets = $consumer->getOffsetPositions([$topicPartition]);

var_dump($topicPartitionsWithOffsets);

Mais cela renvoie des résultats étranges avec des compensations négatives

array(1) { [0]=> object(RdKafka\TopicPartition)#6 (4) { ["topic"]=> string(20) "userstatistics_12345" ["partition"]=> int(0) ["offset"]=> int(-1001) ["err"]=> int(0) } }

Bien qu'en réalité le dernier décalage soit actuellement de 59. Mon idée est d'obtenir le dernier décalage puis d'obtenir la valeur en utilisant :

$consumer->assign([
    new RdKafka\TopicPartition($topicName, $partition, $lastOffset)
]);

Je ne veux pas non plus utiliser une boucle while (true) pour effectuer rapidement un travail de script.

C'est tout. Merci.

P粉763662390
P粉763662390

répondre à tous(1)
P粉701491897

J'ai trouvé la réponse et cela fonctionne très bien pour moi :

$conf = new RdKafka\Conf();

// Configure the group.id. All consumer with the same group.id will consume
// different partitions.
$conf->set('group.id', 'test-group');

// Initial list of Kafka brokers
$conf->set('metadata.broker.list', 'kafka-1:9092');

// Set where to start consuming messages when there is no initial offset in
// offset store or the desired offset is out of range.
// 'earliest': start from the beginning
$conf->set('auto.offset.reset', 'latest');

// Emit EOF event when reaching the end of a partition
$conf->set('enable.partition.eof', 'true');

$kafkaConsumer = new RdKafka\KafkaConsumer($conf);
$topicName = 'topic_name';
$partition = 0;


$topicPartition = new RdKafka\TopicPartition($topicName, 0);
$timeoutMs = 100000;

$low = null;
$high = null;

$wm = $kafkaConsumer->queryWatermarkOffsets($topicName,$partition,$low,$high,$timeoutMs);

$offset = $high - 1;

$kafkaConsumer->assign([new RdKafka\TopicPartition($topicName, $partition, $offset)]);

$message = $kafkaConsumer->consume(1000);

if ($message !== null) {
    // Process the message
    $payload = $message->payload;
    echo "Message at offset $offset: $payload\n";
}

$kafkaConsumer->close();
Derniers téléchargements
Plus>
effets Web
Code source du site Web
Matériel du site Web
Modèle frontal