Wenn Sie die Grundlagen von Kafka kennen möchten, beispielsweise seine wichtigsten Funktionen, Komponenten und Vorteile, habe ich hier einen Artikel dazu. Bitte überprüfen Sie es und befolgen Sie die Schritte, bis Sie die Kafka-Installation mit Docker abgeschlossen haben, um mit den folgenden Abschnitten fortzufahren.
Ähnlich dem Beispiel im Artikel über die Verbindung von Kafka mit NodeJS enthält dieser Quellcode auch zwei Teile: Initialisieren eines Produzenten um Nachrichten an Kafka zu senden und einen Verbraucher zu verwenden, um Nachrichten von einem Thema.
Ich werde den Code zum besseren Verständnis in kleinere Teile zerlegen. Definieren wir zunächst die Variablenwerte.
package main import ( "fmt" "github.com/confluentinc/confluent-kafka-go/kafka" ) var ( broker = "localhost:9092" groupId = "group-id" topic = "topic-name" )
github.com/confluentinc/confluent-kafka-go/kafka verwendet, um eine Verbindung zu Kafka herzustellen.
- DerBroker ist die Hostadresse; Wenn Sie ZooKeeper verwenden, ersetzen Sie die Hostadresse entsprechend.
- DieGruppen-ID und das Thema können nach Bedarf geändert werden.
Als nächstes wird der Produzent initialisiert.
func startProducer() { p, err := kafka.NewProducer(&kafka.ConfigMap{"bootstrap.servers": broker}) if err != nil { panic(err) } go func() { for e := range p.Events() { switch ev := e.(type) { case *kafka.Message: if ev.TopicPartition.Error != nil { fmt.Printf("Delivery failed: %v\n", ev.TopicPartition) } else { fmt.Printf("Delivered message to %v\n", ev.TopicPartition) } } } }() for _, word := range []string{"message 1", "message 2", "message 3"} { p.Produce(&kafka.Message{ TopicPartition: kafka.TopicPartition{Topic: &topic, Partition: kafka.PartitionAny}, Value: []byte(word), }, nil) } }
{"Nachricht 1", "Nachricht 2", "Nachricht 3"🎜> an ein Thema zu senden und verwendet ein go-routine um Ereignisse mit for e := range p.Events() zu durchlaufen und das Lieferergebnis auszudrucken, unabhängig davon, ob es ein ist Erfolg oder Misserfolg. Als nächstes erstellen Sie einen
Verbraucher, um das Thema zu zu abonnieren und Nachrichten zu erhalten.
func startConsumer() { c, err := kafka.NewConsumer(&kafka.ConfigMap{ "bootstrap.servers": broker, "group.id": groupId, "auto.offset.reset": "earliest", }) if err != nil { panic(err) } c.Subscribe(topic, nil) for { msg, err := c.ReadMessage(-1) if err == nil { fmt.Printf("Message on %s: %s\n", msg.TopicPartition, string(msg.Value)) } else { fmt.Printf("Consumer error: %v (%v)\n", err, msg) break } } c.Close() }
Produzenten und den Verbraucher zur Verwendung zu erstellen. In einem realen Szenario erfolgt die Bereitstellung des Produzenten und Konsumenten typischerweise auf zwei verschiedenen Servern in einem Microservices-System.
func main() { startProducer() startConsumer() }
Viel Spaß beim Codieren!
Wenn Sie diesen Inhalt hilfreich fanden, besuchen Sie bitte den Originalartikel auf meinem Blog, um den Autor zu unterstützen und weitere interessante Inhalte zu entdecken.
Das obige ist der detaillierte Inhalt vonVerbinden Sie Kafka mit Golang. Für weitere Informationen folgen Sie bitte anderen verwandten Artikeln auf der PHP chinesischen Website!