Getting Consumer Group Offsets with Golang Kafka 10
Consumer groups in Go used to require an external library on top of the Kafka client. With Kafka 10, the Golang Kafka library (sarama) supports consumer groups without outside help. This raises the question: how can we retrieve the current message offset of a consumer group using sarama?
Previously, kazoo-go could be used to read a group's message offsets from ZooKeeper, where they were stored. With sarama-cluster, offsets are committed to Kafka rather than ZooKeeper, so a different approach is required.
Solution
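The following code snippet joins the consumer group with auto-commit disabled, records the initial offset of the claim it is assigned, and returns without consuming any messages. Note that it returns a single value, so for a topic with several partitions it reports the offset of only one claim: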
<code class="go">package main

import (
	"context"
	"log"
	"strings"

	"github.com/Shopify/sarama"
)

func main() {
	groupName := "testgrp"
	topic := "topic_name"
	offset, err := GetCGOffset(context.Background(), "localhost:9092", groupName, topic)
	if err != nil {
		log.Fatal(err)
	}
	log.Printf("Consumer group %s offset for topic %s is: %d", groupName, topic, offset)
}

// gcInfo is a ConsumerGroupHandler that only records the starting offset of a claim.
type gcInfo struct {
	offset int64
}

func (g *gcInfo) Setup(sarama.ConsumerGroupSession) error   { return nil }
func (g *gcInfo) Cleanup(sarama.ConsumerGroupSession) error { return nil }

// ConsumeClaim stores the claim's initial offset and returns right away,
// without reading any messages.
func (g *gcInfo) ConsumeClaim(_ sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error {
	g.offset = claim.InitialOffset()
	return nil
}

// GetCGOffset joins groupName as a temporary member and returns the offset at
// which its claim on topic would start. brokers is a comma-separated list.
func GetCGOffset(ctx context.Context, brokers, groupName, topic string) (int64, error) {
	config := sarama.NewConfig()
	// We don't want to change the consumer group's offsets.
	config.Consumer.Offsets.AutoCommit.Enable = false
	client, err := sarama.NewConsumerGroup(strings.Split(brokers, ","), groupName, config)
	if err != nil {
		return 0, err
	}
	defer client.Close() // leave the group and release connections

	info := gcInfo{}
	if err := client.Consume(ctx, []string{topic}, &info); err != nil {
		return 0, err
	}
	return info.offset, nil
}</code>
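The value returned by GetCGOffset is not always a real position in the log. The sketch below shows one way to interpret it, assuming that a claim with no committed offset falls back to the configured initial offset, and that sarama's sentinel values are OffsetNewest = -1 and OffsetOldest = -2. The helper name describeOffset and the local constants are hypothetical; they are redeclared here only so the example runs without the sarama dependency.

```go
package main

import "fmt"

// Assumed to mirror sarama.OffsetNewest and sarama.OffsetOldest.
// Redeclared locally so this sketch has no external dependency.
const (
	offsetNewest int64 = -1
	offsetOldest int64 = -2
)

// describeOffset is a hypothetical helper that turns the raw value
// returned by GetCGOffset into a human-readable description.
func describeOffset(offset int64) string {
	switch {
	case offset >= 0:
		return fmt.Sprintf("claim starts at offset %d", offset)
	case offset == offsetNewest:
		return "no committed offset; starts at the newest message"
	case offset == offsetOldest:
		return "no committed offset; starts at the oldest message"
	default:
		return fmt.Sprintf("unexpected offset value %d", offset)
	}
}

func main() {
	for _, o := range []int64{42, offsetNewest, offsetOldest} {
		fmt.Println(describeOffset(o))
	}
}
```

In real code the comparison would presumably use sarama.OffsetNewest and sarama.OffsetOldest directly instead of the local copies.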