Apache Kafka est un système de file d'attente de messages basé sur le modèle de publication-abonnement. Il fournit un mécanisme de livraison de messages fiable, efficace et évolutif et est largement utilisé dans le big data, le traitement de flux de données en temps réel, la collecte de journaux et d'autres domaines. Le langage Go est un langage de programmation rapide, distribué et simultané. Il convient naturellement à la gestion de la transmission et du traitement des messages dans des scénarios à forte concurrence. Dans cet article, nous expliquerons comment utiliser Apache Kafka pour la messagerie dans Go, avec un guide complet et des exemples de code.
Première étape : installer et configurer Apache Kafka
Tout d'abord, nous devons installer et configurer Apache Kafka. Vous pouvez télécharger la dernière version de Kafka sur le site officiel, la décompresser et démarrer le serveur Kafka :
$ tar -xzf kafka_2.13-2.8.0.tgz $ cd kafka_2.13-2.8.0 $ bin/zookeeper-server-start.sh config/zookeeper.properties
Ensuite, démarrez le serveur Kafka :
$ bin/kafka-server-start.sh config/server.properties
Ensuite, nous devons créer un sujet Kafka pour stocker et transmettre les messages :
$ bin/kafka-topics.sh --create --topic my_topic --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1
Cette commande créera un sujet nommé "my_topic" et configurera un facteur de réplication et 1 partition sur le nœud local.
Étape 2 : Présenter et installer la bibliothèque Kafka Go
Pour utiliser Kafka en langage Go, nous devons introduire la bibliothèque tierce Kafka Go. Actuellement, le langage Go ne fournit officiellement pas de bibliothèques standards liées à Kafka, mais les bibliothèques tierces de la communauté sont déjà très matures et stables.
Dans cet article, nous utiliserons la bibliothèque sarama. Vous pouvez utiliser la commande suivante pour installer :
$ go get github.com/Shopify/sarama
Ici, nous devons introduire le package sarama et utiliser les API producteur et consommateur pour la transmission des messages.
Étape 3 : Utiliser l'API du producteur pour envoyer des messages
Il est très simple d'utiliser l'API du producteur Kafka pour envoyer des messages en langage Go. Tout d'abord, nous devons créer un objet producteur Kafka :
import ( "log" "github.com/Shopify/sarama" ) func main() { config := sarama.NewConfig() producer, err := sarama.NewSyncProducer([]string{"127.0.0.1:9092"}, config) if err != nil { log.Fatalf("Failed to create producer: %s", err) } defer producer.Close() }
Ici, nous utilisons la fonction NewSyncProducer() dans le package sarama pour créer un objet producteur synchrone et spécifier l'adresse et les informations de configuration du serveur Kafka. Une fois la création réussie, vous devez utiliser l'instruction defer pour vous assurer que l'objet producteur est fermé une fois le programme terminé.
Ensuite, nous pouvons utiliser la fonction Produce() pour envoyer des messages au sujet Kafka :
msg := &sarama.ProducerMessage{ Topic: "my_topic", Value: sarama.StringEncoder("hello, kafka"), } partition, offset, err := producer.SendMessage(msg) if err != nil { log.Fatalf("Failed to send message: %s", err) } log.Printf("Message sent to partition %d at offset %d", partition, offset)
Ici, créez d'abord un objet sarama.ProducerMessage, définissez le nom du sujet et le contenu du message, puis utilisez SendMessage( ) de l'objet producteur. La fonction envoie le message au sujet cible.
Étape 4 : Utiliser l'API consommateur pour recevoir des messages du sujet
Il est également très simple d'utiliser l'API consommateur Kafka pour recevoir des messages en langage Go. Tout d'abord, nous devons créer un objet consommateur Kafka :
config := sarama.NewConfig() consumer, err := sarama.NewConsumer([]string{"127.0.0.1:9092"}, config) if err != nil { log.Fatalf("Failed to create consumer: %s", err) } defer consumer.Close() partitionConsumer, err := consumer.ConsumePartition("my_topic", 0, sarama.OffsetOldest) if err != nil { log.Fatalf("Failed to consume partition: %s", err) } defer partitionConsumer.Close()
Ici, nous utilisons la fonction NewConsumer() du package sarama pour créer un objet consommateur et établir une connexion avec le serveur Kafka. Après une création réussie, vous devez utiliser l'instruction defer pour vous assurer que l'objet consommateur est fermé une fois le programme terminé.
Ensuite, nous utilisons la fonction ConsumePartition() pour nous abonner à un sujet et une partition spécifiques et définir le décalage de départ du message. Cette fonction renvoie un objet PartitionConsumer, nous devons utiliser l'instruction defer pour garantir qu'elle est fermée une fois le programme terminé.
Enfin, nous pouvons utiliser la fonction Consumer.Messages() dans une boucle for pour récupérer les messages et les traiter :
for { select { case msg := <-partitionConsumer.Messages(): log.Printf("Received message: %s", string(msg.Value)) case err := <-partitionConsumer.Errors(): log.Fatalf("Error while consuming: %s", err) } }
Ici, nous utilisons la fonction Messages() pour récupérer les messages de l'objet PartitionConsumer, puis utilisons une boucle for pour les traiter. Kafka étant un système de messagerie hautement concurrent, il est nécessaire d'utiliser des instructions select pour gérer les notifications de messages provenant de plusieurs canaux. Notez qu'après avoir traité le message, vous devez utiliser la fonction Ack() pour confirmer manuellement que le message a été consommé.
Exemple de code complet
package main import ( "log" "github.com/Shopify/sarama" ) func main() { config := sarama.NewConfig() producer, err := sarama.NewSyncProducer([]string{"127.0.0.1:9092"}, config) if err != nil { log.Fatalf("Failed to create producer: %s", err) } defer producer.Close() msg := &sarama.ProducerMessage{ Topic: "my_topic", Value: sarama.StringEncoder("hello, kafka"), } partition, offset, err := producer.SendMessage(msg) if err != nil { log.Fatalf("Failed to send message: %s", err) } log.Printf("Message sent to partition %d at offset %d", partition, offset) consumer, err := sarama.NewConsumer([]string{"127.0.0.1:9092"}, config) if err != nil { log.Fatalf("Failed to create consumer: %s", err) } defer consumer.Close() partitionConsumer, err := consumer.ConsumePartition("my_topic", 0, sarama.OffsetOldest) if err != nil { log.Fatalf("Failed to consume partition: %s", err) } defer partitionConsumer.Close() for { select { case msg := <-partitionConsumer.Messages(): log.Printf("Received message: %s", string(msg.Value)) partitionConsumer.MarkOffset(msg, "") case err := <-partitionConsumer.Errors(): log.Fatalf("Error while consuming: %s", err) } } }
Résumé
Dans cet article, nous présentons comment utiliser Apache Kafka pour la messagerie en langage Go, et fournissons l'installation complète, la configuration, l'introduction des bibliothèques dépendantes et l'implémentation du code. Kafka est un système de messagerie efficace et fiable qui a été largement utilisé dans le Big Data, le traitement des flux de données en temps réel, la collecte de journaux et d'autres scénarios. Lorsque vous utilisez Kafka, vous devez faire attention à certains points clés, tels que la confirmation manuelle de la fin de la consommation des messages, le traitement des notifications de messages provenant de plusieurs canaux, etc. J'espère que cet article vous sera utile pour écrire des programmes distribués à haute concurrence utilisant le langage Kafka et Go.
Ce qui précède est le contenu détaillé de. pour plus d'informations, suivez d'autres articles connexes sur le site Web de PHP en chinois!