Maison > développement back-end > Golang > le corps du texte

Comment récupérer le décalage de message actuel d'un groupe de consommateurs dans Golang Kafka 10 à l'aide de Sarama ?

Susan Sarandon
Libérer: 2024-10-25 11:29:30
original
309 Les gens l'ont consulté

How to Retrieve the Current Message Offset of a Consumer Group in Golang Kafka 10 Using Sarama?

Obtenir des compensations de groupes de consommateurs avec Golang Kafka 10

Traditionnellement, des bibliothèques externes étaient utilisées pour gérer les capacités des groupes de consommateurs dans Golang avec Kafka. Cependant, Kafka 10 propose désormais nativement une telle fonctionnalité. Cela soulève la question : comment pouvons-nous récupérer le décalage de message actuel traité par un groupe de consommateurs à l'aide de la bibliothèque Golang Kafka (sarama) ?

Auparavant, kazoo-go était utilisé pour récupérer les décalages de message de groupe de Zookeeper. Avec l'introduction de sarama-cluster, une approche alternative est nécessaire.

Solution

L'extrait de code suivant montre comment obtenir la compensation du groupe de consommateurs :

<code class="go">package main

import (
    "context"
    "log"
    "strings"

    "github.com/Shopify/sarama"
)

func main() {
    groupName := "testgrp"
    topic := "topic_name"
    offset, err := GetCGOffset(context.Background(), "localhost:9092", groupName, topic)
    if err != nil {
        log.Fatal(err)
    }
    log.Printf("Consumer group %s offset for topic %s is: %d", groupName, topic, offset)
}

type gcInfo struct {
    offset int64
}

func (g *gcInfo) Setup(sarama.ConsumerGroupSession) error {
    return nil
}

func (g *gcInfo) Cleanup(sarama.ConsumerGroupSession) error {
    return nil
}

func (g *gcInfo) ConsumeClaim(_ sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error {
    g.offset = claim.InitialOffset()
    return nil
}

func GetCGOffset(ctx context.Context, brokers, groupName, topic string) (int64, error) {
    config := sarama.NewConfig()
    config.Consumer.Offsets.AutoCommit.Enable = false  // we don't want to change consumer group offsets
    client, err := sarama.NewConsumerGroup(strings.Split(brokers, ","), groupName, config)
    if err != nil {
        return 0, err
    }
    info := gcInfo{}
    if err := client.Consume(ctx, []string{topic}, &info); err != nil {
        return 0, err
    }
    return info.offset, nil
}</code>
Copier après la connexion

Ce qui précède est le contenu détaillé de. pour plus d'informations, suivez d'autres articles connexes sur le site Web de PHP en chinois!

source:php.cn
Déclaration de ce site Web
Le contenu de cet article est volontairement contribué par les internautes et les droits d'auteur appartiennent à l'auteur original. Ce site n'assume aucune responsabilité légale correspondante. Si vous trouvez un contenu suspecté de plagiat ou de contrefaçon, veuillez contacter admin@php.cn
Derniers articles par auteur
Tutoriels populaires
Plus>
Derniers téléchargements
Plus>
effets Web
Code source du site Web
Matériel du site Web
Modèle frontal
À propos de nous Clause de non-responsabilité Sitemap
Site Web PHP chinois:Formation PHP en ligne sur le bien-être public,Aidez les apprenants PHP à grandir rapidement!