使用 Kafka 10 检索 Go 中的消费者组偏移量
随着 Kafka 10 的发布,Go Kafka 库 (sarama) 现在提供消费者无需依赖外部库即可对功能进行分组。这就提出了如何检索消费者组正在处理的当前消息偏移量的问题。
解决方案
要获取消费者组偏移量,请按照以下步骤操作:
实现消费者组信息结构:
<code class="go">type gcInfo struct { offset int64 }</code>
创建消费者组信息处理程序:
<code class="go">func (g *gcInfo) ConsumeClaim(_ sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error { g.offset = claim.InitialOffset() return nil }</code>
配置并创建消费者组:
<code class="go">config := sarama.NewConfig() config.Consumer.Offsets.AutoCommit.Enable = false client, err := sarama.NewConsumerGroup(strings.Split(brokers, ","), groupName, config)</code>
消费群组内消息:
<code class="go">info := gcInfo{} if err := client.Consume(ctx, []string{topic}, &info); err != nil { return 0, err }</code>
检索偏移量:
<code class="go">return info.offset, nil</code>
以此实现后,您可以在任何给定时间检索特定分区和主题的消费者组偏移量。
以上是如何使用 Kafka 10 检索 Go 中的消费者组偏移量?的详细内容。更多信息请关注PHP中文网其他相关文章!