Abrufen von Verbrauchergruppen-Offsets in Go mit Kafka 10
Mit der Veröffentlichung von Kafka 10 stellt die Go-Kafka-Bibliothek (sarama) jetzt Verbraucher bereit Gruppenfunktionen, ohne auf externe Bibliotheken angewiesen zu sein. Dies wirft die Frage auf, wie der aktuelle Nachrichten-Offset abgerufen werden kann, der von einer Verbrauchergruppe verarbeitet wird.
Lösung
Um den Verbrauchergruppen-Offset zu erhalten, führen Sie die folgenden Schritte aus:
Implementieren Sie eine Verbrauchergruppen-Infostruktur:
<code class="go">type gcInfo struct { offset int64 }</code>
Erstellen Sie einen Verbrauchergruppen-Info-Handler:
<code class="go">func (g *gcInfo) ConsumeClaim(_ sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error { g.offset = claim.InitialOffset() return nil }</code>
Konfigurieren und erstellen Sie die Verbrauchergruppe:
<code class="go">config := sarama.NewConfig() config.Consumer.Offsets.AutoCommit.Enable = false client, err := sarama.NewConsumerGroup(strings.Split(brokers, ","), groupName, config)</code>
Verbrauchen Sie a Nachricht innerhalb der Gruppe:
<code class="go">info := gcInfo{} if err := client.Consume(ctx, []string{topic}, &info); err != nil { return 0, err }</code>
Versatz abrufen:
<code class="go">return info.offset, nil</code>
Damit Implementierung können Sie jederzeit den Consumer-Gruppen-Offset für eine bestimmte Partition und ein bestimmtes Thema abrufen.
Das obige ist der detaillierte Inhalt vonWie kann ich mit Kafka 10 Verbrauchergruppen-Offsets in Go abrufen?. Für weitere Informationen folgen Sie bitte anderen verwandten Artikeln auf der PHP chinesischen Website!