Ces dernières années, avec l'essor du big data et des communautés open source actives, de plus en plus d'entreprises ont commencé à rechercher des systèmes de traitement de données interactifs hautes performances pour répondre aux besoins croissants en matière de données. Dans cette vague de mises à niveau technologiques, le go-zero et Kafka+Avro suscitent l’attention et sont adoptés par de plus en plus d’entreprises.
go-zero est un framework de microservices développé sur la base du langage Golang. Il présente les caractéristiques de hautes performances, de facilité d'utilisation, d'extension facile et de maintenance facile. Il est conçu pour aider les entreprises à créer rapidement des systèmes d'applications de microservices efficaces. Sa croissance rapide est due aux excellentes performances et à la haute efficacité de développement de Golang lui-même, ainsi qu'à l'itération et à l'optimisation continues de l'équipe go-zero.
Kafka est un système de traitement de flux distribué développé par Apache. Il présente les caractéristiques d'une haute disponibilité et d'un débit élevé. Il s'agit de l'une des files d'attente de messages les plus populaires de l'écosystème actuel du Big Data. Avro est un outil de sérialisation de données développé par Apache, qui peut convertir les flux de données en formats binaires, améliorant ainsi la compression des données et l'efficacité de la transmission. Il peut également prendre en charge les mises à niveau et les conversions de formats de données.
Dans cet article, nous présenterons comment combiner go-zero et Kafka+Avro pour créer un système de traitement de données interactif haute performance. Le processus pratique spécifique est le suivant :
Tout d'abord, nous devons intégrer le client Kafka dans le service go-zero. go-zero fournit un package Kafka qui peut facilement interagir avec Kafka.
Il nous suffit d'introduire le package Kafka dans le projet et de configurer les paramètres Kafka dans le fichier de configuration pour obtenir la connexion et l'interaction des données avec Kafka. Voici un exemple de configuration Kafka :
[kafka] addrs = ["localhost:9092"] version = "2.0.0" maxMessageBytes = 10000000
Dans une logique métier spécifique, nous pouvons utiliser les API producteur et consommateur fournies par Kafka pour envoyer et recevoir des données. Voici un exemple de producteur Kafka :
var ( topic = "test" ) func (s *Service) Produce(msg []byte) error { p, err := kafka.NewProducer(s.cfg.Kafka) if err != nil { return err } defer p.Close() return p.Send(context.TODO(), &kafka.Message{ Key: []byte(topic), Value: msg, }) }
Dans l'exemple ci-dessus, nous avons créé un sujet Kafka nommé "test" et lorsque la méthode Produce est appelée, les données sont envoyées au sujet.
Ensuite, nous devons convertir les données au format Avro pour la sérialisation et la désérialisation. go-zero fournit un package Avro et prend en charge la génération de code. En définissant le fichier Schema, nous pouvons générer le code Go correspondant pour encoder et décoder les données Avro.
Ce qui suit est un exemple de configuration du schéma Avro :
{ "namespace": "com.example", "type": "record", "name": "User", "fields": [ { "name": "name", "type": "string" }, { "name": "age", "type": "int" } ] }
En exécutant la commande suivante, le fichier Go correspondant peut être automatiquement généré :
$ go run github.com/gogo/protobuf/protoc-gen-gogofaster --proto_path=./ example.proto --gogofaster_out
Dans le fichier Go généré, nous pouvons voir la relation entre le type de champ Avro et le Type de données Go correspondant La relation de mappage réalise la sérialisation et la désérialisation des données.
Après avoir intégré Kafka et Avro, nous pouvons commencer à construire un système de traitement de données interactif haute performance. Nous pouvons utiliser Kafka comme centre de stockage de données et y établir plusieurs partitions pour réaliser un stockage et un traitement distribués des données.
Pour chaque partition, nous pouvons créer un groupe de consommateurs pour réaliser un traitement parallèle et un équilibrage de charge des données. Dans le même temps, nous pouvons utiliser le pool de coroutines et le canal de synchronisation fournis par go-zero pour optimiser les performances de concurrence du traitement des données.
Ce qui suit est un exemple de système de traitement de données interactif :
// 创建消费组 group, err := kafka.NewGroup(s.cfg.Kafka, "test", kafka.WithGroupID("test-group")) if err != nil { return nil, err } // 创建消费者 consumer, err := group.NewConsumer(context.Background(), []string{"test"}) if err != nil { return nil, err } // 启动并发协程 for i := 0; i < s.cfg.WorkerNum; i++ { go func() { for { select { // 从同步通道中获取新消息 case msg := <-msgs: if err := s.processMsg(msg); err != nil { log.Errorf("failed to process message(%v): %v", msg.Value, err) } } } }() } // 消费数据 for { m, err := consumer.FetchMessage(context.Background()) if err != nil { log.Errorf("failed to fetch message: %v", err) continue } // 将新消息发送到同步通道中 msgs <- m }
Dans l'exemple ci-dessus, nous avons créé un groupe de consommateurs « test-group » et créé le consommateur correspondant. Pendant le traitement, nous démarrons d'abord plusieurs coroutines simultanées pour réaliser un traitement parallèle des données. Lorsqu'un nouveau message est reçu, nous l'envoyons à un canal synchrone et utilisons un pool de coroutines pour le traitement asynchrone.
Grâce à la construction ci-dessus, nous avons intégré avec succès go-zero, Kafka et Avro pour mettre en œuvre un système de traitement de données interactif haute performance. L’utilisation de ce type de système permet de gérer facilement des données massives et d’améliorer l’efficacité du traitement et de l’analyse des données.
Ce qui précède est le contenu détaillé de. pour plus d'informations, suivez d'autres articles connexes sur le site Web de PHP en chinois!