Obtention de compensations de groupes de consommateurs dans Golang Kafka 10
Avec l'introduction de la prise en charge des groupes de consommateurs dans la bibliothèque Kafka de Golang (Sarama) dans Kafka 10, les développeurs ont désormais accès aux fonctionnalités liées aux groupes de consommateurs sans recourir à des bibliothèques externes. Une tâche courante consiste à récupérer le décalage du message en cours de traitement par un groupe de consommateurs. Auparavant, cela nécessitait l'utilisation de solutions basées sur Zookeeper, telles que Kazoo-go. Désormais, avec Sarama-cluster, cela peut être réalisé grâce au code suivant :
<code class="go">package main import ( "context" "log" "strings" "github.com/Shopify/sarama" ) func main() { groupName := "testgrp" topic := "topic_name" offset, err := GetCGOffset(context.Background(), "localhost:9092", groupName, topic) if err != nil { log.Fatal(err) } log.Printf("Consumer group %s offset for topic %s is: %d", groupName, topic, offset) } type gcInfo struct { offset int64 } func (g *gcInfo) Setup(sarama.ConsumerGroupSession) error { return nil } func (g *gcInfo) Cleanup(sarama.ConsumerGroupSession) error { return nil } func (g *gcInfo) ConsumeClaim(_ sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error { g.offset = claim.InitialOffset() return nil } func GetCGOffset(ctx context.Context, brokers, groupName, topic string) (int64, error) { config := sarama.NewConfig() config.Consumer.Offsets.AutoCommit.Enable = false // we're not going to update the consumer group offsets client, err := sarama.NewConsumerGroup(strings.Split(brokers, ","), groupName, config) if err != nil { return 0, err } info := gcInfo{} if err := client.Consume(ctx, []string{topic}, &info); err != nil { return 0, err } return info.offset, nil }</code>
Ce qui précède est le contenu détaillé de. pour plus d'informations, suivez d'autres articles connexes sur le site Web de PHP en chinois!