首頁 > 後端開發 > Golang > 主體

如何使用 Sarama 檢索 Golang Kafka 10 中消費者群組的目前訊息偏移量?

Susan Sarandon
發布: 2024-10-25 11:29:30
原創
376 人瀏覽過

How to Retrieve the Current Message Offset of a Consumer Group in Golang Kafka 10 Using Sarama?

使用Golang Kafka 10 取得消費者群組偏移

傳統上,使用外部函式庫來使用Kafka 管理Golang 中的消費者群組功能。然而,Kafka 10 現在原生提供了這樣的功能。這就提出了一個問題:我們如何使用 Golang Kafka 函式庫(sarama)來檢索消費者群組處理的當前訊息偏移量?

之前,kazoo-go 用於從 Zookeeper 檢索群組訊息偏移量。隨著sarama-cluster的引入,需要另一種方法。

解決方案

以下程式碼片段示範如何取得消費者群組偏移量:

<code class="go">package main

import (
    "context"
    "log"
    "strings"

    "github.com/Shopify/sarama"
)

func main() {
    groupName := "testgrp"
    topic := "topic_name"
    offset, err := GetCGOffset(context.Background(), "localhost:9092", groupName, topic)
    if err != nil {
        log.Fatal(err)
    }
    log.Printf("Consumer group %s offset for topic %s is: %d", groupName, topic, offset)
}

type gcInfo struct {
    offset int64
}

func (g *gcInfo) Setup(sarama.ConsumerGroupSession) error {
    return nil
}

func (g *gcInfo) Cleanup(sarama.ConsumerGroupSession) error {
    return nil
}

func (g *gcInfo) ConsumeClaim(_ sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error {
    g.offset = claim.InitialOffset()
    return nil
}

func GetCGOffset(ctx context.Context, brokers, groupName, topic string) (int64, error) {
    config := sarama.NewConfig()
    config.Consumer.Offsets.AutoCommit.Enable = false  // we don't want to change consumer group offsets
    client, err := sarama.NewConsumerGroup(strings.Split(brokers, ","), groupName, config)
    if err != nil {
        return 0, err
    }
    info := gcInfo{}
    if err := client.Consume(ctx, []string{topic}, &info); err != nil {
        return 0, err
    }
    return info.offset, nil
}</code>
登入後複製

以上是如何使用 Sarama 檢索 Golang Kafka 10 中消費者群組的目前訊息偏移量?的詳細內容。更多資訊請關注PHP中文網其他相關文章!

來源:php.cn
本網站聲明
本文內容由網友自願投稿,版權歸原作者所有。本站不承擔相應的法律責任。如發現涉嫌抄襲或侵權的內容,請聯絡admin@php.cn
作者最新文章
熱門教學
更多>
最新下載
更多>
網站特效
網站源碼
網站素材
前端模板