Kafka est une file d'attente de messages distribuée open source qui est souvent utilisée pour créer des applications de traitement de flux de données en temps réel dans les applications Big Data. Golang est un langage de programmation développé par Google et est connu pour sa concurrence efficace, ses bibliothèques puissantes et son écosystème. Alors, comment utiliser Golang pour combiner avec Kafka ?
Tout d'abord, nous devons importer le package github.com/Shopify/sarama. Il s'agit d'une bibliothèque client Golang qui prend en charge Kafka. Pendant le processus d'installation, vous devez exécuter la commande suivante :
go get github.com/Shopify/sarama
Ensuite, nous devons créer un producteur. Tout d'abord, créez la configuration :
config := sarama.NewConfig() config.Producer.RequiredAcks = sarama.WaitForAll config.Producer.Retry.Max = 5 config.Producer.Return.Successes = true
Ici, nous configurons le producteur pour qu'il attende tous les ACK, essaie jusqu'à 5 tentatives et renvoie un message de réussite au producteur après le succès.
Ensuite, nous devons créer une instance de producteur :
producer, err := sarama.NewSyncProducer([]string{"localhost:9092"}, config) if err != nil { panic(err) } defer producer.Close()
Nous devons spécifier l'adresse d'un courtier Kafka comme point de terminaison du service pour se connecter à Kafka. Ici, nous nous connectons au serveur Kafka local. Nous appelons également la méthode .Close()
pour garantir que le producteur nettoie à sa sortie. .Close()
方法,以确保生产者退出时会清理。
现在我们已经准备好了开始向Kafka主题发布消息:
msg := &sarama.ProducerMessage{ Topic: "test", Value: sarama.StringEncoder("Hello World!"), } part, offset, err := producer.SendMessage(msg) if err != nil { fmt.Printf("Error publishing message: %v", err) } else { fmt.Printf("Message published successfully. Partition: %v, Offset: %v\n", part, offset) }
在这个例子中,我们发布了一个消息到名为“test”的主题中。如果没有错误,它会打印出成功发布的分区和偏移量。
现在我们已经创建了一个生产者,向Kafka发布了一条消息。接下来,我们来看一下如何创建一个消费者。
首先,我们需要创建消费者配置:
config := sarama.NewConfig() config.Consumer.Return.Errors = true
此处我们设定了接收错误。
接下来,我们需要创建一个消费者实例:
consumer, err := sarama.NewConsumer([]string{"localhost:9092"}, config) if err != nil { panic(err) } defer consumer.Close()
这里我们同样指定了一个Kafka broker地址。我们还需要调用.Close()
方法来确保消费者退出时会清理。
现在我们已经准备好读取Kafka主题的消息:
partitionConsumer, err := consumer.ConsumePartition("test", 0, sarama.OffsetOldest) if err != nil { panic(err) } defer partitionConsumer.Close() for { select { case msg := <-partitionConsumer.Messages(): fmt.Printf("Received message from partition %d with offset %d: %s = %s\n", msg.Partition, msg.Offset, string(msg.Key), string(msg.Value)) case err := <-partitionConsumer.Errors(): fmt.Println("Error: ", err.Error()) } }
在这个例子中,我们订阅了名为“test”的主题。然后我们读取第一个分区的偏移量。我们然后在一个循环中无限读取来自该分区的消息。循环中的select
rrreee
Dans cet exemple, nous publions un message sur le sujet nommé "test". S'il n'y a aucune erreur, il imprime la partition et le décalage publiés avec succès. 🎜🎜Maintenant, nous avons créé un producteur qui publie un message à Kafka. Voyons ensuite comment créer un consommateur. 🎜🎜Tout d'abord, nous devons créer la configuration du consommateur : 🎜rrreee🎜Ici, nous définissons l'erreur de réception. 🎜🎜Ensuite, nous devons créer une instance de consommateur : 🎜rrreee🎜Ici, nous spécifions également l'adresse d'un courtier Kafka. Nous devons également appeler la méthode.Close()
pour garantir que le consommateur nettoiera à sa sortie. 🎜🎜Nous sommes maintenant prêts à lire les messages du sujet Kafka : 🎜rrreee🎜Dans cet exemple, nous nous abonnons au sujet nommé "test". Ensuite, nous lisons le décalage de la première partition. Nous lisons ensuite les messages de cette partition à l'infini en boucle. L'instruction select
dans la boucle écoutera toujours les canaux de message et d'erreur et les imprimera respectivement. 🎜🎜Jusqu'à présent, nous avons présenté comment utiliser Golang et Kafka pour les combiner. Avec cet exemple simple, vous devriez maîtriser l'utilisation de base de Golang et Kafka. 🎜Ce qui précède est le contenu détaillé de. pour plus d'informations, suivez d'autres articles connexes sur le site Web de PHP en chinois!