如何在Golang Kafka 10 檢索消費者群組偏移量
之前,使用kazoo-go 等外部庫來擷取消費者群組訊息偏移量儲存在Zookeeper 中。然而,隨著 Golang Kafka 函式庫(sarama)在版本 10 中引入消費者群組功能,可以直接存取這些偏移量。
使用sarama-cluster
使用sarama-cluster 庫取得消費者群組偏移量:
<code class="go">import ( "context" "log" "strings" "github.com/Shopify/sarama" ) func main() { groupName := "testgrp" topic := "topic_name" offset, err := GetCGOffset(context.Background(), "localhost:9092", groupName, topic) if err != nil { log.Fatal(err) } log.Printf("Consumer group %s offset for topic %s is: %d", groupName, topic, offset) } type gcInfo struct { offset int64 } func (g *gcInfo) Setup(sarama.ConsumerGroupSession) error { return nil } func (g *gcInfo) Cleanup(sarama.ConsumerGroupSession) error { return nil } func (g *gcInfo) ConsumeClaim(_ sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error { g.offset = claim.InitialOffset() return nil } func GetCGOffset(ctx context.Context, brokers, groupName, topic string) (int64, error) { config := sarama.NewConfig() config.Consumer.Offsets.AutoCommit.Enable = false // we're not going to update the consumer group offsets client, err := sarama.NewConsumerGroup(strings.Split(brokers, ","), groupName, config) if err != nil { return 0, err } info := gcInfo{} if err := client.Consume(ctx, []string{topic}, &info); err != nil { return 0, err } return info.offset, nil }</code>
此程式碼建立一個新的ConsumerGroup 實例,並透過使用空處理程序(consumeClaim) 消費來自指定主題的訊息來檢索目前偏移量。然後,宣告的初始偏移量將記錄在 gcInfo 結構中。
以上是如何使用「sarama-cluster」檢索 Golang Kafka 10 中的消費者群組偏移量?的詳細內容。更多資訊請關注PHP中文網其他相關文章!