Bagaimana untuk mendapatkan offset terakhir dalam topik dari Kafka menggunakan perpustakaan php?
P粉763662390
P粉763662390 2023-09-10 11:00:45
0
1
620

Saya sedang menulis pengguna Kafka untuk projek API menggunakan perpustakaan php-rdkafka. Saya perlu mencari offset terakhir dalam topik dan mendapatkan nilai daripadanya untuk pemprosesan selanjutnya. Sebagai contoh, offset terakhir dalam topik = 5, maka saya perlu mendapatkan offset 5 dan menghantarnya melalui API sehingga offset baharu ditambahkan. Apa yang saya cuba jalankan:

$conf = new RdKafka\Conf();

$settings = [
    'socket.keepalive.enable'  => true,
    'log_level'                => LOG_WARNING,
    'enable.auto.offset.store' => 'true',
    'auto.offset.reset'        => 'earliest',
    'enable.partition.eof'     => 'false',
    'enable.auto.commit'       => 'false',
    'max.poll.interval.ms'     => 300000,
    'session.timeout.ms'       => 45000,
    'group.id'                 => 'test-group',
    'group.instance.id'        => uniqid('', true),
    'metadata.broker.list'     => 'stat-kafka-1:9092,stat-kafka-2:9092,stat-kafka-3:9092',
];
foreach ($settings as $key => $value) {
    $conf->set($key, $value);
}

$topicName = 'userstatistics_12345';
$partition = 0;

$topicPartition = new RdKafka\TopicPartition($topicName, $partition);

$topicPartitionsWithOffsets = $consumer->getOffsetPositions([$topicPartition]);

var_dump($topicPartitionsWithOffsets);

Tetapi ini mengembalikan hasil yang pelik dengan offset negatif

array(1) { [0]=> object(RdKafka\TopicPartition)#6 (4) { ["topic"]=> string(20) "userstatistics_12345" ["partition"]=> int(0) ["offset"]=> int(-1001) ["err"]=> int(0) } }

Walaupun sebenarnya offset terakhir pada masa ini ialah 59. Idea saya adalah untuk mendapatkan offset terakhir dan kemudian mendapatkan nilai menggunakan:

$consumer->assign([
    new RdKafka\TopicPartition($topicName, $partition, $lastOffset)
]);

Saya juga tidak mahu menggunakan gelung while(true) untuk melakukan kerja skrip dengan cepat.

Itu sahaja. terima kasih.

P粉763662390
P粉763662390

membalas semua(1)
P粉701491897

Saya menemui jawapannya dan ia sangat berkesan untuk saya:

$conf = new RdKafka\Conf();

// Configure the group.id. All consumer with the same group.id will consume
// different partitions.
$conf->set('group.id', 'test-group');

// Initial list of Kafka brokers
$conf->set('metadata.broker.list', 'kafka-1:9092');

// Set where to start consuming messages when there is no initial offset in
// offset store or the desired offset is out of range.
// 'earliest': start from the beginning
$conf->set('auto.offset.reset', 'latest');

// Emit EOF event when reaching the end of a partition
$conf->set('enable.partition.eof', 'true');

$kafkaConsumer = new RdKafka\KafkaConsumer($conf);
$topicName = 'topic_name';
$partition = 0;


$topicPartition = new RdKafka\TopicPartition($topicName, 0);
$timeoutMs = 100000;

$low = null;
$high = null;

$wm = $kafkaConsumer->queryWatermarkOffsets($topicName,$partition,$low,$high,$timeoutMs);

$offset = $high - 1;

$kafkaConsumer->assign([new RdKafka\TopicPartition($topicName, $partition, $offset)]);

$message = $kafkaConsumer->consume(1000);

if ($message !== null) {
    // Process the message
    $payload = $message->payload;
    echo "Message at offset $offset: $payload\n";
}

$kafkaConsumer->close();
Muat turun terkini
Lagi>
kesan web
Kod sumber laman web
Bahan laman web
Templat hujung hadapan