Retrieving Consumer Group Offsets in Go with Kafka 10
With the release of Kafka 10, the Go Kafka library (sarama) now provides consumer group capabilities without relying on external libraries. This raises the question of how to retrieve the current message offset being processed by a consumer group.
Solution
To obtain the consumer group offset, follow these steps:
Implement a Consumer Group Info Struct:
<code class="go">type gcInfo struct { offset int64 }</code>
Create a Consumer Group Info Handler:
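<code class="go">// Setup and Cleanup are required by sarama.ConsumerGroupHandler; they are no-ops here.
func (g *gcInfo) Setup(sarama.ConsumerGroupSession) error   { return nil }
func (g *gcInfo) Cleanup(sarama.ConsumerGroupSession) error { return nil }
// ConsumeClaim records the offset this claim starts from, then returns immediately.
func (g *gcInfo) ConsumeClaim(_ sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error {
	g.offset = claim.InitialOffset()
	return nil
}</code>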
Configure and Create the Consumer Group:
<code class="go">config := sarama.NewConfig()
config.Consumer.Offsets.AutoCommit.Enable = false // do not auto-commit offsets
client, err := sarama.NewConsumerGroup(strings.Split(brokers, ","), groupName, config)
if err != nil { return 0, err }
defer client.Close()</code>
Join the Group and Receive a Claim:
<code class="go">info := gcInfo{}
if err := client.Consume(ctx, []string{topic}, &info); err != nil {
	return 0, err
}</code>
Retrieve the Offset:
<code class="go">return info.offset, nil</code>
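With the steps above, you can read the offset from which the consumer group starts on a claimed partition of the topic. InitialOffset reports the starting point of the claim, so the value reflects the moment the group session began rather than a live position. Because sarama calls ConsumeClaim once per claimed partition, each in its own goroutine, a single offset field holds the value for only one partition; for a topic with several partitions, record the offsets per partition and guard the writes.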
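For topics with more than one partition, a single `offset` field is not enough, since each claimed partition reports its own starting offset. The sketch below shows one way to collect them safely. To keep it runnable without a broker, it assumes a local `claim` interface that mirrors only the `Topic`, `Partition`, and `InitialOffset` methods of `sarama.ConsumerGroupClaim`; `offsetRecorder`, `topicPartition`, and `fakeClaim` are hypothetical names used for illustration.

```go
package main

import (
	"fmt"
	"sync"
)

// claim is a local stand-in for the subset of sarama.ConsumerGroupClaim
// that this sketch needs. A real claim value would satisfy it as well.
type claim interface {
	Topic() string
	Partition() int32
	InitialOffset() int64
}

// topicPartition identifies one partition of one topic.
type topicPartition struct {
	topic     string
	partition int32
}

// offsetRecorder stores one starting offset per claimed partition.
// The mutex matters because claims are handled in separate goroutines.
type offsetRecorder struct {
	mu      sync.Mutex
	offsets map[topicPartition]int64
}

func newOffsetRecorder() *offsetRecorder {
	return &offsetRecorder{offsets: make(map[topicPartition]int64)}
}

// record would be called from ConsumeClaim in place of "g.offset = ...".
func (r *offsetRecorder) record(c claim) {
	r.mu.Lock()
	defer r.mu.Unlock()
	r.offsets[topicPartition{c.Topic(), c.Partition()}] = c.InitialOffset()
}

// offset returns the recorded offset and whether the partition was claimed.
func (r *offsetRecorder) offset(topic string, partition int32) (int64, bool) {
	r.mu.Lock()
	defer r.mu.Unlock()
	off, ok := r.offsets[topicPartition{topic, partition}]
	return off, ok
}

// fakeClaim is test data standing in for claims handed out by a broker.
type fakeClaim struct {
	topic     string
	partition int32
	offset    int64
}

func (f fakeClaim) Topic() string        { return f.topic }
func (f fakeClaim) Partition() int32     { return f.partition }
func (f fakeClaim) InitialOffset() int64 { return f.offset }

func main() {
	rec := newOffsetRecorder()

	// Simulate two partitions being claimed concurrently.
	var wg sync.WaitGroup
	for _, c := range []fakeClaim{{"orders", 0, 42}, {"orders", 1, 7}} {
		wg.Add(1)
		go func(c fakeClaim) {
			defer wg.Done()
			rec.record(c)
		}(c)
	}
	wg.Wait()

	off, ok := rec.offset("orders", 1)
	fmt.Println(off, ok) // prints "7 true"
}
```

The boolean returned by `offset` distinguishes a partition that was never claimed by this group member from one whose starting offset is genuinely 0.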