Avec le développement continu de la technologie Internet et l'expansion continue des scénarios d'application, la technologie de mise en cache en temps réel est de plus en plus devenue une compétence essentielle pour les entreprises Internet. En tant que méthode de technologie de mise en cache en temps réel, la file d'attente de messages est de plus en plus privilégiée par les développeurs dans les applications pratiques. Cet article explique principalement comment établir une technologie de mise en cache en temps réel basée sur la file d'attente de messages Kafka dans Golang.
Qu'est-ce que la file d'attente de messages Kafka ?
Kafka est un système de messagerie distribué développé par LinkedIn et peut gérer des dizaines de millions de messages. Il présente les caractéristiques d’un débit élevé, d’une faible latence, d’une durabilité et d’une fiabilité élevée. Kafka comporte trois composants principaux : les producteurs, les consommateurs et les sujets. Parmi eux, les producteurs et les consommateurs sont les éléments essentiels de Kafka.
Le producteur envoie des messages au sujet spécifié, et peut également spécifier la partition et la clé (Key). Les consommateurs reçoivent les messages correspondants du sujet. Chez Kafka, producteurs et consommateurs sont indépendants et n’ont aucune dépendance les uns envers les autres. Ils interagissent uniquement les uns avec les autres en partageant le même sujet. Cette architecture implémente la messagerie distribuée et résout efficacement les exigences de file d'attente de messages dans divers scénarios commerciaux.
La combinaison de Golang et Kafka
Golang est un langage de programmation populaire et efficace ces dernières années. Avec sa haute concurrence, ses hautes performances et d'autres caractéristiques, il est de plus en plus largement utilisé. . Il présente l'avantage inhérent de se combiner avec les files d'attente de messages, car dans Golang, le nombre de goroutines a une relation biunivoque avec le nombre de threads du noyau, ce qui signifie que Golang peut gérer des tâches simultanées à grande échelle de manière efficace et fluide, tout en Kafka peut distribuer divers messages à différents nœuds de courtier selon des règles de partition personnalisables pour réaliser une expansion horizontale.
En utilisant la bibliothèque tierce Kafka sarama dans Golang, nous pouvons facilement implémenter une interaction avec Kafka. Les étapes spécifiques de mise en œuvre sont les suivantes :
1. Introduisez la bibliothèque sarama dans le projet Golang :
import "github.com/Shopify/sarama"
2. Créez une instance d'expéditeur de message (Producteur) : #🎜🎜. #
config := sarama.NewConfig() config.Producer.Return.Successes = true producer, err := sarama.NewAsyncProducer([]string{"localhost:9092"}, config)
msg := &sarama.ProducerMessage{ Topic: "test-topic", Value: sarama.StringEncoder("hello world"), } producer.Input() <- msg
config := sarama.NewConfig() config.Consumer.Return.Errors = true consumer, err := sarama.NewConsumer([]string{"localhost:9092"}, config)
partitionConsumer, err := consumer.ConsumePartition("test-topic", 0, sarama.OffsetNewest) for msg := range partitionConsumer.Messages() { fmt.Printf("Consumed message: %s ", string(msg.Value)) partitionConsumer.MarkOffset(msg, "") // 确认消息已被消费 }
type Cache struct { Key string Value interface{} } var cache []Cache
type Message struct { Operation string // 操作类型(Add/Delete/Update) Cache Cache // 缓存内容 } func generateMessage(operation string, cache Cache) Message { return Message{ Operation: operation, Cache: cache, } }
func producer(messages chan *sarama.ProducerMessage) { config := sarama.NewConfig() config.Producer.Return.Successes = true producer, err := sarama.NewAsyncProducer([]string{"localhost:9092"}, config) if err != nil { panic(err) } for { select { case msg := <-messages: producer.Input() <- msg } } } func pushMessage(operation string, cache Cache, messages chan *sarama.ProducerMessage) { msg := sarama.ProducerMessage{ Topic: "cache-topic", Value: sarama.StringEncoder(generateMessage(operation, cache)), } messages <- &msg }
func consumer() { config := sarama.NewConfig() config.Consumer.Return.Errors = true consumer, err := sarama.NewConsumer([]string{"localhost:9092"}, config) if err != nil { panic(err) } partitionConsumer, err := consumer.ConsumePartition("cache-topic", 0, sarama.OffsetNewest) if err != nil { panic(err) } for msg := range partitionConsumer.Messages() { var message Message err := json.Unmarshal(msg.Value, &message) if err != nil { fmt.Println("Failed to unmarshal message: ", err.Error()) continue } switch message.Operation { case "Add": cache = append(cache, message.Cache) case "Delete": for i, c := range cache { if c.Key == message.Cache.Key { cache = append(cache[:i], cache[i+1:]...) break } } case "Update": for i, c := range cache { if c.Key == message.Cache.Key { cache[i] = message.Cache break } } } partitionConsumer.MarkOffset(msg, "") // 确认消息已被消费 } }
Ce qui précède est le contenu détaillé de. pour plus d'informations, suivez d'autres articles connexes sur le site Web de PHP en chinois!