Au cours des dernières semaines, je me suis plongé dans Kafka et j'ai pris des notes en cours de route, ce que j'ai décidé de les organiser et de structurer un article de blog, dessus, outre les concepts et les astuces, il y a un exemple pratique construit avec NestJS et KafkaJs.
Apache Kafka est une plateforme de streaming d'événements distribuée conçue pour gérer des événements en temps réel. Il permet de stocker, de traiter et de récupérer des flux de données à grande échelle, à haut débit et à faible latence, ce qui le rend adapté à la création de pipelines de données en temps réel et d'applications basées sur les événements.
Ces compromis sont des choix de conception intentionnels visant à maximiser les performances de Kafka, mais peuvent poser des défis pour les cas d'utilisation nécessitant une plus grande flexibilité :
Kafka intègre les fonctionnalités des modèles de messagerie de mise en file d'attente et de publication-abonnement, offrant aux consommateurs les avantages de chaque approche.
Kafka utilise un système de journaux partitionnés pour combiner les avantages des modèles de mise en file d'attente et de publication-abonnement. Les journaux, qui sont des séquences ordonnées d'enregistrements, sont divisés en partitions, chaque partition étant attribuée à différents abonnés (consommateurs). Cette configuration permet à plusieurs abonnés de partager un sujet tout en conservant l'évolutivité.
Nous avons vu que Kafka est une plateforme conçue pour gérer des événements en temps réel, avant de parler de la façon dont ces événements sont gérés, nous devons en avoir une définition :
Un événement est une action, un incident ou un changement enregistré dans des applications, par exemple un paiement, un clic sur un site Web ou une lecture de température.
LesÉvénements dans Kafka sont modélisés sous forme de paires clé/valeur, où les clés et les valeurs sont sérialisées en séquences d'octets.
Kafka organise les événements en journaux ordonnés appelés sujets. Lorsqu'un système externe écrit un événement dans Kafka, il est ajouté à la fin d'un sujet. Les messages restent dans les sujets pendant une durée configurable, même après avoir été lus. Contrairement aux files d'attente, les sujets sont durables, répliqués et tolérants aux pannes, stockant efficacement les enregistrements d'événements. Cependant, les journaux ne peuvent être analysés que de manière séquentielle, et non interrogés.
Les sujets sont stockés sous forme de fichiers journaux sur le disque. Cependant, les disques ont des limitations telles qu'une taille finie et des E/S. Pour surmonter ce problème, Kafka permet de diviser les sujets en partitions, divisant ainsi un seul journal en plusieurs journaux pouvant être distribués sur différents serveurs. Ce partitionnement permet à Kafka d'évoluer horizontalement, améliorant ainsi sa capacité à gérer de gros volumes d'événements et un débit élevé.
Kafka attribue des messages aux partitions selon qu'elles possèdent ou non une clé :
Kafka fonctionne comme une infrastructure de données distribuée utilisant des nœuds appelés courtiers, qui forment collectivement un cluster Kafka. Les courtiers peuvent fonctionner sur du matériel nu, une instance cloud, dans un conteneur géré par Kubernetes, dans Docker sur votre ordinateur portable ou partout où les processus JVM peuvent s'exécuter.
Les courtiers se concentrent sur :
Ils n'effectuent pas de calcul de message ni de routage de sujet à sujet, gardant leur conception simple et efficace.
Kafka garantit la durabilité des données et la tolérance aux pannes en répliquant les données de partition sur plusieurs courtiers. La copie principale d'une partition est la réplique leader, tandis que les copies supplémentaires sont les répliques suiveuses. Les données sont écrites sur le leader, qui réplique automatiquement les mises à jour vers les abonnés.
Ce processus de réplication garantit :
Les développeurs bénéficient de ces garanties sans avoir à gérer directement la réplication, car Kafka la gère de manière transparente.
Un producteur Kafka est une application client qui envoie (publie) des données à des sujets Kafka. Il est responsable de la création et de la transmission des messages (enregistrements) au cluster Kafka. Les producteurs déterminent le sujet et la partition où les messages seront stockés en fonction de leur configuration et de la présence d'une clé de message. Les producteurs sont responsables, sans toutefois s'y limiter :
Compression :
Les producteurs peuvent compresser les messages pour réduire la bande passante du réseau et l'utilisation du stockage.
Un consommateur Kafka est une application client qui lit les messages des sujets Kafka, il récupère les messages des partitions Kafka à leur propre rythme, permettant un traitement des données en temps réel ou par lots . Notez que Kafka ne transmet pas de messages aux consommateurs, ils extraient les messages des partitions Kafka en demandant les données.
Les consommateurs gardent également une trace des compensations qu'ils ont traitées. Les compensations peuvent être validées automatiquement ou manuellement, garantissant ainsi que les données ne sont pas perdues en cas de défaillance d'un consommateur. Cela permet une consommation flexible, y compris la relecture des messages en réinitialisant le décalage.
Un groupe de consommateurs est un ensemble de consommateurs qui coopèrent pour consommer les données de certains sujets, ce qui permet un traitement distribué des messages d'un sujet.
Les partitions d'un sujet sont réparties entre les consommateurs du groupe, garantissant que chaque message est traité par un seul consommateur au sein du groupe. Plusieurs groupes de consommateurs peuvent consommer indépendamment le même sujet sans interférence.
Lorsqu'un nouveau consommateur rejoint un groupe ou qu'un consommateur existant échoue, Kafka réaffecte les partitions entre les consommateurs du groupe pour garantir que toutes les partitions sont couvertes.
La sérialisation et la désérialisation dans Kafka consistent à convertir les données entre leur format d'origine et un tableau d'octets pour la transmission et le stockage, permettant aux producteurs et aux consommateurs de communiquer efficacement.
Est le processus de conversion d'un objet ou d'une structure de données en un flux d'octets afin qu'il puisse être transmis ou stocké. Avant qu'un producteur n'envoie des données à un sujet Kafka, il sérialise les données (clé et valeur) dans des tableaux d'octets.
Formats de sérialisation courants :
Il s'agit du processus inverse, dans lequel un flux d'octets est reconverti en son objet ou sa structure de données d'origine. Lorsqu'un consommateur lit les données d'un sujet Kafka, il désérialise le tableau d'octets dans un format utilisable pour le traitement.
La compression consiste à réduire la taille des messages avant qu'ils ne soient stockés ou transmis. Il optimise l'utilisation du stockage, réduit la consommation de bande passante du réseau et améliore les performances globales en envoyant des charges utiles plus petites entre les producteurs, les courtiers et les consommateurs.
Lorsqu'un producteur envoie des messages à un sujet Kafka, il peut compresser le message avant la transmission. Le message compressé est stocké tel quel chez les courtiers et décompressé par les consommateurs lorsqu'ils lisent les messages.
Bien que la compression permette d'économiser des ressources, il est essentiel d'équilibrer le compromis entre l'utilisation du processeur et les avantages de la compression, en choisissant le codec qui convient à votre cas d'utilisation.
L'optimisation des performances d'Apache Kafka implique d'affiner divers composants pour équilibrer efficacement le débit et la latence. Cet article ne fait qu'effleurer ce sujet, voici quelques aspects à prendre en compte lors du réglage de Kafka :
Gestion des partitions :
Configuration du producteur :
Configuration du consommateur :
Imaginez une application qui enregistre la température dans une pièce et transmet ces données à l'aide de Kafka, où une autre application les traite. Pour plus de simplicité, nous nous concentrerons exclusivement sur l'aspect Kafka, avec le producteur et le consommateur implémentés au sein de la même application. Dans ce scénario, chaque température enregistrée à un moment précis représente un événement :
{ temperature: 42, timeStamp: new Date(), };
Tout le code sera dans ce référentiel.
Tout d'abord, nous avons besoin d'un courtier Kafka, mais au lieu d'installer Kafka sur notre machine, gardons simplement cette image Docker Kafka.
Commencez par extraire cette image :
docker pull apache/kafka
Ensuite, exécutez-le en mappant le port que Kafka écoute sur le même port de notre machine :
docker run -d -p 9092:9092 --name courtier apache/kafka:latest
Ça y est, nous avons un courtier Kafka en cours d'exécution, avant de continuer, vous voudrez peut-être jouer avec en créant des sujets, en envoyant et en consommant des messages, pour ce faire, suivez simplement les instructions sur cette page d'image.
Pour créer notre application, nous allons utiliser NestJS avec KafkaJS, commencez par créer l'application avec Nest CLI
nest nouveau projet mon-nest
Dans le dossier du projet, installez kafkajs
npm je kafkajs
Et générer les modules suivants
nid g mo kafka
Nest G Mo Producteur
Nest G Mo Consommateur
Nest g mo température
Le module Kafka gérera toutes les opérations spécifiques à Kafka, y compris la gestion des classes de consommateur et de producteur pour la connexion, la déconnexion, l'envoi et la réception de messages. Ce sera le seul module à interagir directement avec le package kafkajs.
Les Modules Producteur et Consommateur agiront comme des interfaces entre la plateforme pub-sub (Kafka, dans ce cas) et le reste de l'application, en résumant les détails spécifiques à la plateforme.
Le Module Température gérera les événements. Il n'est pas nécessaire de savoir quelle plateforme pub-sub est utilisée, il suffit d'un consommateur et d'un producteur pour fonctionner.
Une fois les modules créés, créons également un dossier src/interface et y ajoutons les interfaces suivantes :
{ temperature: 42, timeStamp: new Date(), };
// src/interfaces/producer.interface.ts export interface IProducer { produce: (message: any) => Promise<void>; connect: () => Promise<void>; disconnect: () => Promise<void>; isConnected: () => boolean; }
Dans le dossier src/kafka/, ajoutez les classes productrices et consommatrices qui implémentent ces interfaces :
// src/interfaces/consumer.interface.ts export type ConsumerMessage = { key?: string; value: any; }; export type OnMessage = (message: ConsumerMessage) => Promise<void>; export interface IConsumer { connect: () => Promise<void>; disconnect: () => Promise<void>; consume: (onMessage?: OnMessage) => Promise<void>; isConnected: () => boolean; }
// src/kafka/kafka.producer.ts export class KafkaProducer implements IProducer { private readonly logger = new Logger(KafkaProducer.name, { timestamp: true }); private readonly kafka: Kafka; private readonly producer: Producer; private connected: boolean = false; constructor( private readonly broker: string, private readonly topic: string, ) { // The client must be configured with at least one broker this.kafka = new Kafka({ brokers: [this.broker], }); this.producer = this.kafka.producer(); } async produce( message: Message, compression?: CompressionTypes, acks?: number, timeout?: number, ) { // To produce, at least a topic and a list of messages must be provided await this.producer.send({ topic: this.topic, messages: [message], compression, timeout, acks, }); } // To produce a message, the producer must be connected async connect() { try { // Just hooking up some logs in the producer events // And storing the connection status this.producer.on('producer.connect', () => { this.logger.log( `producer connected. broker: ${this.broker} topic: ${this.topic}`, ); this.connected = true; }); this.producer.on('producer.disconnect', () => { this.logger.log( `producer disconnected. broker: ${this.broker} topic: ${this.topic}`, ); this.connected = false; }); // Connect to Kafka await this.producer.connect(); } catch (err) { this.logger.error( `failed to connect to kafka. broker: ${this.broker} topic: ${this.topic}`, err, ); } } async disconnect() { await this.producer.disconnect(); } isConnected(): boolean { return this.connected; } }
N'oubliez pas d'exporter ces classes dans kafka.module.ts
// src/kafka/kafka.cosumer.ts export class KafkaConsumer implements IConsumer { private readonly logger = new Logger(KafkaConsumer.name, { timestamp: true }); private readonly kafka: Kafka; private readonly consumer: Consumer; private connected: boolean = false; constructor( private readonly broker: string, private readonly topic: string, private readonly groupId: string, ) { if (this.broker && this.topic && this.groupId) { // The client must be configured with at least one broker this.kafka = new Kafka({ brokers: [this.broker], }); this.consumer = this.kafka.consumer({ groupId: this.groupId }); } else { this.logger.warn('Broker, topic and groupId must be provided'); } } // The onMessage function will be called when a message is received async consume(onMessage: OnMessage) { // Here we subscribe to the topic ... await this.consumer.subscribe({ topic: this.topic }); // ... and handle the messages await this.consumer.run({ eachMessage: async (payload) => { try { this.logger.log( `message: ${payload.message.value.toString()} (topic: ${payload.topic}, partition: ${payload.partition})`, ); await onMessage({ key: payload.message.key?.toString(), value: payload.message.value.toString(), }); } catch (err) { this.logger.error('error on consuming message', err); } }, }); } // To consume, the consumer must be connected async connect() { try { // Just hooking up some logs in the consumer events // And storing the connection status this.consumer.on('consumer.connect', () => { this.logger.log( `consumer connected. broker: ${this.broker} topic: ${this.topic}`, ); this.connected = true; }); this.consumer.on('consumer.disconnect', () => { this.logger.log( `consumer disconnected. broker: ${this.broker} topic: ${this.topic}`, ); this.connected = false; }); await this.consumer.connect(); } catch (err) { this.logger.error( `failed to connect to kafka. broker: ${this.broker} topic: ${this.topic}`, err, ); } } async disconnect() { await this.consumer.disconnect(); } isConnected(): boolean { return this.connected; } }
Dans l'état actuel des choses, nous pourrions accéder au module de température, instancier ces classes Kafka et commencer à les utiliser. Cependant, il serait préférable que le module de température n'ait pas à se soucier de la plate-forme pub-sub qu'il utilise. Au lieu de cela, il devrait simplement fonctionner avec un producteur et/ou un consommateur injecté, en se concentrant uniquement sur l'envoi et la réception de messages, quelle que soit la plate-forme sous-jacente. De cette façon, si nous décidons de passer à une autre plateforme pub-sub à l’avenir, nous n’aurons pas besoin d’apporter de modifications au module de température.
Pour réaliser cette abstraction, nous pouvons créer des classes Producer et Consumer qui gèrent les spécificités des implémentations Producer et Consumer de Kafka. Commençons par le producteur :
// src/kafka/kafka.module.ts @Module({ imports: [], providers: [KafkaProducer, KafkaConsumer], exports: [KafkaProducer, KafkaConsumer], }) export class KafkaModule {}
// src/producer/producer.service.ts @Injectable() export class ProducerService implements OnApplicationShutdown { // Expects any producer that implements the IProducer interface private readonly producer: IProducer; constructor( @Inject('broker') broker: string, @Inject('topic') topic: string, ) { this.producer = new KafkaProducer(broker, topic); } /** The produce() and message can receive more parameters, * refer to produce method in src/kafka/kafka.producer.ts */ async produce(message: { key?: string; value: string }) { if (!this.producer.isConnected()) { await this.producer.connect(); } await this.producer.produce(message); } async onApplicationShutdown() { await this.producer.disconnect(); } }
Maintenant, le consommateur :
// src/producer/producer.module.ts @Module({ imports: [KafkaModule], providers: [ ProducerService, { provide: 'broker', useValue: 'default-broker-value', }, { provide: 'topic', useValue: 'default-topic-value', }, ], exports: [ProducerService], }) export class ProducerModule {}
// src/consumer/consumer.service.ts @Injectable() export class ConsumerService implements OnApplicationShutdown { // Expects any consumer that implements the IConsumer interface private readonly consumer: IConsumer; constructor( @Inject('broker') broker: string, @Inject('topic') topic: string, @Inject('groupId') groupId: string, ) { this.consumer = new KafkaConsumer(broker, topic, groupId); } async consume(onMessage: OnMessage) { if (!this.consumer.isConnected()) { await this.consumer.connect(); } await this.consumer.consume(onMessage); } async onApplicationShutdown() { await this.consumer.disconnect(); } }
Maintenant, nous pouvons nous concentrer sur la construction du module de température. Dans le fichier temperature.service.ts, nous allons créer une méthode pour enregistrer une température qui, dans cet exemple, enverra simplement les données de température au courtier en utilisant un producteur. De plus, nous mettrons en œuvre une méthode pour gérer les messages entrants à des fins de démonstration.
Ces méthodes peuvent être invoquées par un autre service ou un contrôleur. Cependant, par souci de simplicité, dans cet exemple, nous les appellerons directement au démarrage de l'application, en utilisant la méthode onModuleInit.
{ temperature: 42, timeStamp: new Date(), };
// src/interfaces/producer.interface.ts export interface IProducer { produce: (message: any) => Promise<void>; connect: () => Promise<void>; disconnect: () => Promise<void>; isConnected: () => boolean; }
C'est ça ! Avec le courtier exécuté dans le conteneur Docker, vous pouvez démarrer l'application pour envoyer et recevoir des messages. De plus, vous pouvez ouvrir un shell dans le conteneur du courtier à l'aide de la commande suivante :
docker exec --workdir /opt/kafka/bin/ -it courtier sh
De là, vous pouvez interagir directement avec le courtier et envoyer des messages à l'application, en recevoir des messages, créer de nouveaux sujets, etc.
Voici le référentiel avec le code de cet exemple.
Ce qui précède est le contenu détaillé de. pour plus d'informations, suivez d'autres articles connexes sur le site Web de PHP en chinois!