Apache Kafka est une puissante plateforme de streaming distribuée utilisée pour créer des pipelines de données en temps réel et des applications de streaming. Dans cet article de blog, nous expliquerons la configuration d'un producteur et d'un consommateur Kafka à l'aide de Golang.
Avant de commencer, assurez-vous que les éléments suivants sont installés sur votre ordinateur :
Go (1.16 ou supérieur)
Docker (pour exécuter Kafka localement)
Kafka
Pour configurer rapidement Kafka, nous utiliserons Docker. Créez un fichier docker-compose.yml dans le répertoire de votre projet :
yamlCopy codeversion: '3.7' services: zookeeper: image: wurstmeister/zookeeper:3.4.6 ports: - "2181:2181" kafka: image: wurstmeister/kafka:2.13-2.7.0 ports: - "9092:9092" environment: KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://localhost:9092 KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181 depends_on: - zookeeper
Exécutez la commande suivante pour démarrer Kafka et Zookeeper :
docker-compose up -d
Tout d'abord, initialisez un nouveau module Go :
go mod init kafka-example
Installez la bibliothèque kafka-go :
go get github.com/segmentio/kafka-go
Maintenant, créez un fichier producteur.go et ajoutez le code suivant :
package main import ( "context" "fmt" "github.com/segmentio/kafka-go" "log" "time" ) func main() { writer := kafka.Writer{ Addr: kafka.TCP("localhost:9092"), Topic: "example-topic", Balancer: &kafka.LeastBytes{}, } defer writer.Close() for i := 0; i < 10; i++ { msg := kafka.Message{ Key: []byte(fmt.Sprintf("Key-%d", i)), Value: []byte(fmt.Sprintf("Hello Kafka %d", i)), } err := writer.WriteMessages(context.Background(), msg) if err != nil { log.Fatal("could not write message " + err.Error()) } time.Sleep(1 * time.Second) fmt.Printf("Produced message: %s\n", msg.Value) } }
Ce code configure un producteur Kafka qui envoie dix messages au sujet exemple-sujet.
Exécutez le producteur :
go run producer.go
Vous devriez voir un résultat indiquant que des messages ont été produits.
Créez un fichier consumer.go et ajoutez le code suivant :
package main import ( "context" "fmt" "github.com/segmentio/kafka-go" "log" ) func main() { reader := kafka.NewReader(kafka.ReaderConfig{ Brokers: []string{"localhost:9092"}, Topic: "example-topic", GroupID: "example-group", }) defer reader.Close() for { msg, err := reader.ReadMessage(context.Background()) if err != nil { log.Fatal("could not read message " + err.Error()) } fmt.Printf("Consumed message: %s\n", msg.Value) } }
Ce consommateur lit les messages du sujet d'exemple et les imprime sur la console.
Exécuter le consommateur :
go run consumer.go
Vous devriez voir un résultat indiquant que les messages ont été consommés.
Dans cet article de blog, nous avons montré comment configurer un producteur et un consommateur Kafka à l'aide de Golang. Cet exemple simple montre les bases de la production et de la consommation de messages, mais les capacités de Kafka s'étendent bien au-delà. Avec Kafka, vous pouvez créer des systèmes de traitement de données en temps réel robustes et évolutifs.
N'hésitez pas à explorer des fonctionnalités plus avancées telles que le partitionnement des messages, la distribution des messages par clé et l'intégration avec d'autres systèmes. Bon codage !
C'est ça ! Cet article de blog fournit une introduction concise à l'utilisation de Kafka avec Go, parfaite pour les développeurs souhaitant se lancer dans le traitement des données en temps réel.
Ce qui précède est le contenu détaillé de. pour plus d'informations, suivez d'autres articles connexes sur le site Web de PHP en chinois!