Apache Kafka ist ein Nachrichtenwarteschlangensystem, das auf dem Publish-Subscribe-Modell basiert. Es bietet einen zuverlässigen, effizienten und skalierbaren Nachrichtenübermittlungsmechanismus und wird häufig in den Bereichen Big Data, Echtzeit-Datenstromverarbeitung, Protokollerfassung und anderen Bereichen eingesetzt. Die Go-Sprache ist eine schnelle, verteilte und gleichzeitige Programmiersprache. Sie eignet sich natürlich für die Nachrichtenübermittlung und -verarbeitung in Szenarien mit hoher Parallelität. In diesem Artikel behandeln wir die Verwendung von Apache Kafka für Messaging in Go, mit einer vollständigen Anleitung und Codebeispielen.
Schritt eins: Apache Kafka installieren und konfigurieren
Zuerst müssen wir Apache Kafka installieren und konfigurieren. Sie können die neueste Kafka-Version von der offiziellen Website herunterladen, entpacken und den Kafka-Server starten:
$ tar -xzf kafka_2.13-2.8.0.tgz $ cd kafka_2.13-2.8.0 $ bin/zookeeper-server-start.sh config/zookeeper.properties
Dann starten Sie den Kafka-Server:
$ bin/kafka-server-start.sh config/server.properties
Als nächstes müssen wir ein Kafka-Thema zum Speichern und Zustellen von Nachrichten erstellen:
$ bin/kafka-topics.sh --create --topic my_topic --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1
Dieser Befehl erstellt ein Thema mit dem Namen „my_topic“ und konfiguriert einen Replikationsfaktor und 1 Partition auf dem lokalen Knoten.
Schritt 2: Einführung und Installation der Kafka Go-Bibliothek
Um Kafka in der Go-Sprache zu verwenden, müssen wir die Kafka Go-Bibliothek eines Drittanbieters einführen. Derzeit stellt die Go-Sprache offiziell keine Kafka-bezogenen Standardbibliotheken bereit, aber die Bibliotheken von Drittanbietern in der Community sind bereits sehr ausgereift und stabil.
In diesem Artikel verwenden wir die Sarama-Bibliothek. Zur Installation können Sie den folgenden Befehl verwenden:
$ go get github.com/Shopify/sarama
Hier müssen wir das Sarama-Paket einführen und die Producer- und Consumer-APIs für die Nachrichtenübermittlung verwenden.
Schritt 3: Verwenden Sie die Producer-API, um Nachrichten zu senden
Es ist sehr einfach, die Kafka-Producer-API zum Senden von Nachrichten in der Go-Sprache zu verwenden. Zuerst müssen wir ein Kafka-Produzentenobjekt erstellen:
import ( "log" "github.com/Shopify/sarama" ) func main() { config := sarama.NewConfig() producer, err := sarama.NewSyncProducer([]string{"127.0.0.1:9092"}, config) if err != nil { log.Fatalf("Failed to create producer: %s", err) } defer producer.Close() }
Hier verwenden wir die Funktion NewSyncProducer() im Sarama-Paket, um ein synchrones Produzentenobjekt zu erstellen und die Adresse und Konfigurationsinformationen des Kafka-Servers anzugeben. Nach erfolgreicher Erstellung müssen Sie die Defer-Anweisung verwenden, um sicherzustellen, dass das Producer-Objekt nach Programmende geschlossen wird.
Als nächstes können wir die Funktion Produce() verwenden, um Nachrichten an das Kafka-Thema zu senden:
msg := &sarama.ProducerMessage{ Topic: "my_topic", Value: sarama.StringEncoder("hello, kafka"), } partition, offset, err := producer.SendMessage(msg) if err != nil { log.Fatalf("Failed to send message: %s", err) } log.Printf("Message sent to partition %d at offset %d", partition, offset)
Hier erstellen Sie zunächst ein sarama.ProducerMessage-Objekt, legen den Themennamen und den Nachrichteninhalt fest und verwenden dann SendMessage() des Producer-Objekts. Die Funktion sendet die Nachricht an das Zielthema.
Schritt 4: Verwenden Sie die Consumer-API, um Nachrichten aus dem Thema zu empfangen.
Es ist auch sehr einfach, die Kafka-Consumer-API zu verwenden, um Nachrichten in der Go-Sprache zu empfangen. Zuerst müssen wir ein Kafka-Konsumentenobjekt erstellen:
config := sarama.NewConfig() consumer, err := sarama.NewConsumer([]string{"127.0.0.1:9092"}, config) if err != nil { log.Fatalf("Failed to create consumer: %s", err) } defer consumer.Close() partitionConsumer, err := consumer.ConsumePartition("my_topic", 0, sarama.OffsetOldest) if err != nil { log.Fatalf("Failed to consume partition: %s", err) } defer partitionConsumer.Close()
Hier verwenden wir die Funktion NewConsumer() im Sarama-Paket, um ein Konsumentenobjekt zu erstellen und eine Verbindung mit dem Kafka-Server herzustellen. Nach erfolgreicher Erstellung müssen Sie die Defer-Anweisung verwenden, um sicherzustellen, dass das Verbraucherobjekt nach Programmende geschlossen wird.
Als nächstes verwenden wir die Funktion ConsumePartition(), um ein bestimmtes Thema und eine bestimmte Partition zu abonnieren und den Startoffset der Nachricht festzulegen. Diese Funktion gibt ein PartitionConsumer-Objekt zurück. Wir müssen die Defer-Anweisung verwenden, um sicherzustellen, dass es nach Programmende geschlossen wird.
Schließlich können wir die Funktion Consumer.Messages() in einer for-Schleife verwenden, um die Nachrichten abzurufen und zu verarbeiten:
for { select { case msg := <-partitionConsumer.Messages(): log.Printf("Received message: %s", string(msg.Value)) case err := <-partitionConsumer.Errors(): log.Fatalf("Error while consuming: %s", err) } }
Hier verwenden wir die Funktion Messages(), um die Nachrichten vom PartitionConsumer-Objekt abzurufen, und verwenden sie dann eine for-Schleife, um sie zu verarbeiten. Da es sich bei Kafka um ein hochgradig gleichzeitiges Nachrichtensystem handelt, ist es notwendig, SELECT-Anweisungen zu verwenden, um Nachrichtenbenachrichtigungen von mehreren Kanälen zu verarbeiten. Beachten Sie, dass Sie nach der Verarbeitung der Nachricht die Funktion Ack() verwenden müssen, um manuell zu bestätigen, dass die Nachricht verbraucht wurde.
Vollständiges Codebeispiel
package main import ( "log" "github.com/Shopify/sarama" ) func main() { config := sarama.NewConfig() producer, err := sarama.NewSyncProducer([]string{"127.0.0.1:9092"}, config) if err != nil { log.Fatalf("Failed to create producer: %s", err) } defer producer.Close() msg := &sarama.ProducerMessage{ Topic: "my_topic", Value: sarama.StringEncoder("hello, kafka"), } partition, offset, err := producer.SendMessage(msg) if err != nil { log.Fatalf("Failed to send message: %s", err) } log.Printf("Message sent to partition %d at offset %d", partition, offset) consumer, err := sarama.NewConsumer([]string{"127.0.0.1:9092"}, config) if err != nil { log.Fatalf("Failed to create consumer: %s", err) } defer consumer.Close() partitionConsumer, err := consumer.ConsumePartition("my_topic", 0, sarama.OffsetOldest) if err != nil { log.Fatalf("Failed to consume partition: %s", err) } defer partitionConsumer.Close() for { select { case msg := <-partitionConsumer.Messages(): log.Printf("Received message: %s", string(msg.Value)) partitionConsumer.MarkOffset(msg, "") case err := <-partitionConsumer.Errors(): log.Fatalf("Error while consuming: %s", err) } } }
Zusammenfassung
In diesem Artikel stellen wir die Verwendung von Apache Kafka für Nachrichten in der Go-Sprache vor und bieten eine vollständige Installation, Konfiguration, Einführung abhängiger Bibliotheken und Codeimplementierung. Kafka ist ein effizientes und zuverlässiges Nachrichtensystem, das häufig in Big Data, Echtzeit-Datenstromverarbeitung, Protokollerfassung und anderen Szenarien eingesetzt wird. Bei der Verwendung von Kafka müssen Sie einige wichtige Punkte beachten, z. B. die manuelle Bestätigung des Abschlusses des Nachrichtenverbrauchs, die Verarbeitung von Nachrichtenbenachrichtigungen von mehreren Kanälen usw. Ich hoffe, dass dieser Artikel Ihnen beim Schreiben verteilter Programme mit hoher Parallelität und der Sprache Kafka und Go hilfreich sein wird.
Das obige ist der detaillierte Inhalt vonVerwendung von Apache Kafka in Go: Eine vollständige Anleitung. Für weitere Informationen folgen Sie bitte anderen verwandten Artikeln auf der PHP chinesischen Website!