Apache Kafka ist eine leistungsstarke verteilte Streaming-Plattform, die zum Aufbau von Echtzeit-Datenpipelines und Streaming-Anwendungen verwendet wird. In diesem Blogbeitrag gehen wir Schritt für Schritt durch die Einrichtung eines Kafka-Produzenten und -Konsumenten mit Golang.
Bevor wir beginnen, stellen Sie sicher, dass Folgendes auf Ihrem Computer installiert ist:
Los (1.16 oder höher)
Docker (zum lokalen Ausführen von Kafka)
Kafka
Um Kafka schnell einzurichten, verwenden wir Docker. Erstellen Sie eine docker-compose.yml-Datei in Ihrem Projektverzeichnis:
yamlCopy codeversion: '3.7' services: zookeeper: image: wurstmeister/zookeeper:3.4.6 ports: - "2181:2181" kafka: image: wurstmeister/kafka:2.13-2.7.0 ports: - "9092:9092" environment: KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://localhost:9092 KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181 depends_on: - zookeeper
Führen Sie den folgenden Befehl aus, um Kafka und Zookeeper zu starten:
docker-compose up -d
Initialisieren Sie zunächst ein neues Go-Modul:
go mod init kafka-example
Installieren Sie die Kafka-Go-Bibliothek:
go get github.com/segmentio/kafka-go
Erstellen Sie nun eine Datei „producer.go“ und fügen Sie den folgenden Code hinzu:
package main import ( "context" "fmt" "github.com/segmentio/kafka-go" "log" "time" ) func main() { writer := kafka.Writer{ Addr: kafka.TCP("localhost:9092"), Topic: "example-topic", Balancer: &kafka.LeastBytes{}, } defer writer.Close() for i := 0; i < 10; i++ { msg := kafka.Message{ Key: []byte(fmt.Sprintf("Key-%d", i)), Value: []byte(fmt.Sprintf("Hello Kafka %d", i)), } err := writer.WriteMessages(context.Background(), msg) if err != nil { log.Fatal("could not write message " + err.Error()) } time.Sleep(1 * time.Second) fmt.Printf("Produced message: %s\n", msg.Value) } }
Dieser Code richtet einen Kafka-Produzenten ein, der zehn Nachrichten an das Beispielthema-Thema sendet.
Führen Sie den Produzenten aus:
go run producer.go
Sie sollten eine Ausgabe sehen, die darauf hinweist, dass Nachrichten erstellt wurden.
Erstellen Sie eine Datei Consumer.go und fügen Sie den folgenden Code hinzu:
package main import ( "context" "fmt" "github.com/segmentio/kafka-go" "log" ) func main() { reader := kafka.NewReader(kafka.ReaderConfig{ Brokers: []string{"localhost:9092"}, Topic: "example-topic", GroupID: "example-group", }) defer reader.Close() for { msg, err := reader.ReadMessage(context.Background()) if err != nil { log.Fatal("could not read message " + err.Error()) } fmt.Printf("Consumed message: %s\n", msg.Value) } }
Dieser Verbraucher liest Nachrichten aus dem Beispielthema und gibt sie auf der Konsole aus.
Führen Sie den Verbraucher aus:
go run consumer.go
Sie sollten eine Ausgabe sehen, die darauf hinweist, dass Nachrichten verbraucht wurden.
In diesem Blogbeitrag haben wir gezeigt, wie man mit Golang einen Kafka-Produzenten und -Konsumenten einrichtet. Dieses einfache Beispiel zeigt die Grundlagen des Produzierens und Konsumierens von Nachrichten, aber die Fähigkeiten von Kafka gehen weit darüber hinaus. Mit Kafka können Sie robuste, skalierbare Echtzeit-Datenverarbeitungssysteme aufbauen.
Erkunden Sie gerne erweiterte Funktionen wie Nachrichtenpartitionierung, schlüsselbasierte Nachrichtenverteilung und Integration mit anderen Systemen. Viel Spaß beim Codieren!
Das ist es! Dieser Blogbeitrag bietet eine kurze Einführung in die Verwendung von Kafka mit Go, perfekt für Entwickler, die mit der Echtzeit-Datenverarbeitung beginnen möchten.
Das obige ist der detaillierte Inhalt vonAufbau eines Kafka-Produzenten und -Konsumenten in Go. Für weitere Informationen folgen Sie bitte anderen verwandten Artikeln auf der PHP chinesischen Website!