Maison > interface Web > js tutoriel > Les fondamentaux de Kafka avec un exemple pratique

Les fondamentaux de Kafka avec un exemple pratique

Linda Hamilton
Libérer: 2024-12-28 09:26:11
original
342 Les gens l'ont consulté

Au cours des dernières semaines, je me suis plongé dans Kafka et j'ai pris des notes en cours de route, ce que j'ai décidé de les organiser et de structurer un article de blog, dessus, outre les concepts et les astuces, il y a un exemple pratique construit avec NestJS et KafkaJs.

Qu’est-ce que Kafka ?

Apache Kafka est une plateforme de streaming d'événements distribuée conçue pour gérer des événements en temps réel. Il permet de stocker, de traiter et de récupérer des flux de données à grande échelle, à haut débit et à faible latence, ce qui le rend adapté à la création de pipelines de données en temps réel et d'applications basées sur les événements.

Principales caractéristiques :

  • Diffusion d'événements : Kafka organise les données en sujets, qui sont des journaux d'événements ordonnés.
  • Architecture distribuée : Kafka est conçu pour l'évolutivité et la tolérance aux pannes. Il fonctionne comme un cluster de nœuds appelés courtiers et peut distribuer des données sur plusieurs serveurs.
  • Modèle de publication-abonnement : les producteurs écrivent des messages sur les sujets et les consommateurs lisent leurs messages. Kafka prend en charge plusieurs consommateurs, permettant à différentes applications de traiter indépendamment le même flux de données.
  • Hautes performances : Kafka est optimisé pour un débit élevé, traitant des millions de messages par seconde avec une faible latence.
  • Stockage durable : Kafka stocke les messages sur disque avec des périodes de conservation configurables, garantissant la persistance et la fiabilité des données.
  • Partitionnement et réplication : Les sujets sont divisés en partitions pour l'évolutivité et répliqués entre les courtiers pour la tolérance aux pannes.
  • Rejouabilité : Les consommateurs peuvent relire les messages en réinitialisant leur décalage, permettant ainsi le retraitement ou la récupération des données.
  • Intégration et écosystème : Kafka s'intègre à divers systèmes et dispose d'outils tels que Kafka Connect (pour l'intégration de données) et Kafka Streams (pour le traitement des flux).

Avantages

  • Fiabilité : Il garantit la tolérance aux pannes grâce à la distribution, à la réplication et au partitionnement des données.
  • Évolutivité : Kafka peut traiter des volumes de données massifs et évoluer horizontalement sans temps d'arrêt.
  • Durabilité : Les messages sont rapidement stockés, garantissant la résilience et la persistance des données.
  • Performances : Kafka maintient des performances élevées sous des charges de données extrêmes, gérant de gros volumes de données sans temps d'arrêt ni perte de données.

Inconvénients

Ces compromis sont des choix de conception intentionnels visant à maximiser les performances de Kafka, mais peuvent poser des défis pour les cas d'utilisation nécessitant une plus grande flexibilité :

  • Flexibilité limitée : Kafka ne prend pas en charge les requêtes étendues, telles que le filtrage de données spécifiques dans les rapports. Les consommateurs doivent gérer ces tâches, car Kafka récupère les messages par décalage dans l'ordre dans lequel ils sont reçus.
  • Non conçu pour le stockage à long terme : Kafka excelle dans le streaming de données, mais n'est pas adapté au stockage de données historiques pendant de longues périodes. La duplication des données peut rendre le stockage coûteux pour les grands ensembles de données.
  • Pas de prise en charge des sujets génériques : Kafka n'autorise pas la consommation de plusieurs sujets à l'aide de modèles génériques (par exemple, log-2024-*).

Cas d'utilisation

  • Analyse en temps réel : Traitez et analysez les flux de données au fur et à mesure qu'ils se produisent.
  • Sourcing d'événements : Enregistrez toutes les modifications apportées à l'état d'une application sous la forme d'une séquence d'événements.
  • Agrégation de journaux : Collectez et gérez les journaux des systèmes distribués.
  • Pipelines de données : Diffusez des données entre les systèmes de manière fiable et efficace.
  • Applications IoT : Gérez les données des capteurs des appareils IoT en temps réel.

Comment fonctionne Kafka ?

Kafka intègre les fonctionnalités des modèles de messagerie de mise en file d'attente et de publication-abonnement, offrant aux consommateurs les avantages de chaque approche.

  • Queuing permet un traitement de données évolutif en répartissant les tâches sur plusieurs instances de consommateur, mais les files d'attente traditionnelles ne prennent pas en charge plusieurs abonnés.
  • Le modèle publier-abonnement prend en charge plusieurs abonnés mais ne peut pas répartir les tâches entre plusieurs processus de travail car chaque message est envoyé à tous les abonnés.

Kafka utilise un système de journaux partitionnés pour combiner les avantages des modèles de mise en file d'attente et de publication-abonnement. Les journaux, qui sont des séquences ordonnées d'enregistrements, sont divisés en partitions, chaque partition étant attribuée à différents abonnés (consommateurs). Cette configuration permet à plusieurs abonnés de partager un sujet tout en conservant l'évolutivité.

Kafka fundamentals with a practical example

Événements, sujets et partitions

Nous avons vu que Kafka est une plateforme conçue pour gérer des événements en temps réel, avant de parler de la façon dont ces événements sont gérés, nous devons en avoir une définition :

Un événement est une action, un incident ou un changement enregistré dans des applications, par exemple un paiement, un clic sur un site Web ou une lecture de température.

Les

Événements dans Kafka sont modélisés sous forme de paires clé/valeur, où les clés et les valeurs sont sérialisées en séquences d'octets.

  • Les Valeurs représentent souvent des objets de domaine sérialisés ou des entrées brutes, telles que des sorties de capteurs ou d'autres données d'application. Ils encapsulent les informations essentielles transmises lors de l'événement Kafka.
  • Les Clés peuvent être des objets de domaine complexes, mais il s'agit souvent de types simples comme des chaînes ou des entiers. Au lieu d'identifier de manière unique un événement individuel (comme le fait une clé primaire dans une base de données relationnelle), les clés identifient généralement des entités au sein du système, telles qu'un utilisateur spécifique, une commande ou un appareil connecté.

Kafka organise les événements en journaux ordonnés appelés sujets. Lorsqu'un système externe écrit un événement dans Kafka, il est ajouté à la fin d'un sujet. Les messages restent dans les sujets pendant une durée configurable, même après avoir été lus. Contrairement aux files d'attente, les sujets sont durables, répliqués et tolérants aux pannes, stockant efficacement les enregistrements d'événements. Cependant, les journaux ne peuvent être analysés que de manière séquentielle, et non interrogés.

Les sujets sont stockés sous forme de fichiers journaux sur le disque. Cependant, les disques ont des limitations telles qu'une taille finie et des E/S. Pour surmonter ce problème, Kafka permet de diviser les sujets en partitions, divisant ainsi un seul journal en plusieurs journaux pouvant être distribués sur différents serveurs. Ce partitionnement permet à Kafka d'évoluer horizontalement, améliorant ainsi sa capacité à gérer de gros volumes d'événements et un débit élevé.

Kafka attribue des messages aux partitions selon qu'elles possèdent ou non une clé :

  • Pas de clé : Les messages sont distribués à tour de rôle sur toutes les partitions, garantissant une distribution uniforme des données mais ne préservant pas l'ordre des messages.
  • Avec clé : La partition est déterminée en hachant la clé, garantissant que les messages avec la même clé vont toujours vers la même partition et maintiennent leur ordre.

Courtiers

Kafka fonctionne comme une infrastructure de données distribuée utilisant des nœuds appelés courtiers, qui forment collectivement un cluster Kafka. Les courtiers peuvent fonctionner sur du matériel nu, une instance cloud, dans un conteneur géré par Kubernetes, dans Docker sur votre ordinateur portable ou partout où les processus JVM peuvent s'exécuter.

Les courtiers se concentrent sur :

  • Écrire de nouveaux événements sur des partitions.
  • Servir les lectures à partir des partitions.
  • Réplication des partitions entre les courtiers.

Ils n'effectuent pas de calcul de message ni de routage de sujet à sujet, gardant leur conception simple et efficace.

Réplication

Kafka garantit la durabilité des données et la tolérance aux pannes en répliquant les données de partition sur plusieurs courtiers. La copie principale d'une partition est la réplique leader, tandis que les copies supplémentaires sont les répliques suiveuses. Les données sont écrites sur le leader, qui réplique automatiquement les mises à jour vers les abonnés.

Ce processus de réplication garantit :

  • Sécurité des données, même en cas de panne du courtier ou du stockage.
  • Basculement automatique, où une autre réplique prend le relais en cas d'échec du leader actuel.

Les développeurs bénéficient de ces garanties sans avoir à gérer directement la réplication, car Kafka la gère de manière transparente.

Producteurs

Un producteur Kafka est une application client qui envoie (publie) des données à des sujets Kafka. Il est responsable de la création et de la transmission des messages (enregistrements) au cluster Kafka. Les producteurs déterminent le sujet et la partition où les messages seront stockés en fonction de leur configuration et de la présence d'une clé de message. Les producteurs sont responsables, sans toutefois s'y limiter :

  • Composition du message :
    • Chaque message se compose d'une clé (facultatif), d'une valeur (les données réelles) et de métadonnées.
    • La clé détermine la partition du message, garantissant l'ordre des messages avec la même clé.
  • Affectation des partitions :
    • Si une clé est fournie, le producteur utilise un algorithme de hachage pour déterminer la partition.
    • Sans clé, les messages sont distribués entre les partitions de manière circulaire pour l'équilibrage de charge.
  • Compression :

    Les producteurs peuvent compresser les messages pour réduire la bande passante du réseau et l'utilisation du stockage.

Consommateurs

Un consommateur Kafka est une application client qui lit les messages des sujets Kafka, il récupère les messages des partitions Kafka à leur propre rythme, permettant un traitement des données en temps réel ou par lots . Notez que Kafka ne transmet pas de messages aux consommateurs, ils extraient les messages des partitions Kafka en demandant les données.

Les consommateurs gardent également une trace des compensations qu'ils ont traitées. Les compensations peuvent être validées automatiquement ou manuellement, garantissant ainsi que les données ne sont pas perdues en cas de défaillance d'un consommateur. Cela permet une consommation flexible, y compris la relecture des messages en réinitialisant le décalage.

Groupes de consommateurs

Un groupe de consommateurs est un ensemble de consommateurs qui coopèrent pour consommer les données de certains sujets, ce qui permet un traitement distribué des messages d'un sujet.

Les partitions d'un sujet sont réparties entre les consommateurs du groupe, garantissant que chaque message est traité par un seul consommateur au sein du groupe. Plusieurs groupes de consommateurs peuvent consommer indépendamment le même sujet sans interférence.

Lorsqu'un nouveau consommateur rejoint un groupe ou qu'un consommateur existant échoue, Kafka réaffecte les partitions entre les consommateurs du groupe pour garantir que toutes les partitions sont couvertes.

Sérialisation et désérialisation

La sérialisation et la désérialisation dans Kafka consistent à convertir les données entre leur format d'origine et un tableau d'octets pour la transmission et le stockage, permettant aux producteurs et aux consommateurs de communiquer efficacement.

Sérialisation

Est le processus de conversion d'un objet ou d'une structure de données en un flux d'octets afin qu'il puisse être transmis ou stocké. Avant qu'un producteur n'envoie des données à un sujet Kafka, il sérialise les données (clé et valeur) dans des tableaux d'octets.

Formats de sérialisation courants :

  • JSON : lisible par l'homme, largement compatible.
  • Avro : Compact et efficace, basé sur un schéma.
  • Protobuf : Compact, basé sur un schéma et indépendant du langage.
  • String : sérialisation simple basée sur du texte.
  • Sérialisation personnalisée : pour les besoins spécifiques à une application.

Désérialisation

Il s'agit du processus inverse, dans lequel un flux d'octets est reconverti en son objet ou sa structure de données d'origine. Lorsqu'un consommateur lit les données d'un sujet Kafka, il désérialise le tableau d'octets dans un format utilisable pour le traitement.

Compression

La compression consiste à réduire la taille des messages avant qu'ils ne soient stockés ou transmis. Il optimise l'utilisation du stockage, réduit la consommation de bande passante du réseau et améliore les performances globales en envoyant des charges utiles plus petites entre les producteurs, les courtiers et les consommateurs.

Lorsqu'un producteur envoie des messages à un sujet Kafka, il peut compresser le message avant la transmission. Le message compressé est stocké tel quel chez les courtiers et décompressé par les consommateurs lorsqu'ils lisent les messages.

Avantages

  • Bande passante réseau réduite : Des charges utiles plus petites signifient que moins de données sont transmises sur le réseau.
  • Exigences de stockage inférieures : Les messages compressés occupent moins d'espace sur le disque.
  • Débit amélioré : Des messages plus petits permettent un transfert et un traitement des données plus rapides.

Quand utiliser ?

  • Cas d'utilisation avec des messages de grande taille : La compression réduit considérablement la taille des données.
  • Systèmes à haut débit : Réduit la pression sur les ressources réseau et de stockage.
  • Regroupement : La compression fonctionne mieux lorsque les producteurs regroupent plusieurs messages ensemble.

Bien que la compression permette d'économiser des ressources, il est essentiel d'équilibrer le compromis entre l'utilisation du processeur et les avantages de la compression, en choisissant le codec qui convient à votre cas d'utilisation.

Types de compression pris en charge

  • Aucun : Aucune compression (par défaut).
  • Gzip : Taux de compression élevé mais utilisation du processeur plus élevée.
  • Snappy : Vitesse de compression et utilisation du processeur équilibrées, adaptées aux cas d'utilisation en temps réel.
  • LZ4 : Compression et décompression plus rapides, optimisées pour les systèmes à faible latence.
  • Zstd : Taux de compression élevé avec de meilleures performances que Gzip, pris en charge dans les versions plus récentes de Kafka.

Réglage

L'optimisation des performances d'Apache Kafka implique d'affiner divers composants pour équilibrer efficacement le débit et la latence. Cet article ne fait qu'effleurer ce sujet, voici quelques aspects à prendre en compte lors du réglage de Kafka :

  • Gestion des partitions :

    • Nombre de partitions : Augmentez le nombre de partitions pour améliorer le parallélisme et le débit. Cependant, évitez les partitions excessives pour éviter les frais de gestion. Alignez le nombre de partitions avec vos capacités de consommateur et le taux de consommation souhaité.
  • Configuration du producteur :

    • Traitement par lots : Configurez batch.size et linger.ms pour permettre un traitement par lots efficace des messages, réduisant ainsi le nombre de requêtes et améliorant le débit.
    • Compression : Implémentez la compression (par exemple, compression.type=snappy) pour diminuer la taille des messages, réduisant ainsi l'utilisation du réseau et du stockage. Soyez conscient de la surcharge CPU supplémentaire introduite par la compression.
  • Configuration du consommateur :

    • Récupérer les paramètres : Ajustez fetch.min.bytes et fetch.max.wait.ms pour contrôler la façon dont les consommateurs récupèrent les messages, en équilibrant la latence et le débit en fonction des besoins de votre application.

Exemple pratique

Imaginez une application qui enregistre la température dans une pièce et transmet ces données à l'aide de Kafka, où une autre application les traite. Pour plus de simplicité, nous nous concentrerons exclusivement sur l'aspect Kafka, avec le producteur et le consommateur implémentés au sein de la même application. Dans ce scénario, chaque température enregistrée à un moment précis représente un événement :

{
  temperature: 42,
  timeStamp: new Date(),
};
Copier après la connexion
Copier après la connexion
Copier après la connexion

Tout le code sera dans ce référentiel.

Tout d'abord, nous avons besoin d'un courtier Kafka, mais au lieu d'installer Kafka sur notre machine, gardons simplement cette image Docker Kafka.

Commencez par extraire cette image :

docker pull apache/kafka

Ensuite, exécutez-le en mappant le port que Kafka écoute sur le même port de notre machine :

docker run -d -p 9092:9092 --name courtier apache/kafka:latest

Ça y est, nous avons un courtier Kafka en cours d'exécution, avant de continuer, vous voudrez peut-être jouer avec en créant des sujets, en envoyant et en consommant des messages, pour ce faire, suivez simplement les instructions sur cette page d'image.

Pour créer notre application, nous allons utiliser NestJS avec KafkaJS, commencez par créer l'application avec Nest CLI

nest nouveau projet mon-nest

Dans le dossier du projet, installez kafkajs

npm je kafkajs

Et générer les modules suivants

nid g mo kafka

Nest G Mo Producteur

Nest G Mo Consommateur

Nest g mo température

Le module Kafka gérera toutes les opérations spécifiques à Kafka, y compris la gestion des classes de consommateur et de producteur pour la connexion, la déconnexion, l'envoi et la réception de messages. Ce sera le seul module à interagir directement avec le package kafkajs.

Les Modules Producteur et Consommateur agiront comme des interfaces entre la plateforme pub-sub (Kafka, dans ce cas) et le reste de l'application, en résumant les détails spécifiques à la plateforme.

Le Module Température gérera les événements. Il n'est pas nécessaire de savoir quelle plateforme pub-sub est utilisée, il suffit d'un consommateur et d'un producteur pour fonctionner.

Une fois les modules créés, créons également un dossier src/interface et y ajoutons les interfaces suivantes :

{
  temperature: 42,
  timeStamp: new Date(),
};
Copier après la connexion
Copier après la connexion
Copier après la connexion
// src/interfaces/producer.interface.ts

export interface IProducer {
  produce: (message: any) => Promise<void>;
  connect: () => Promise<void>;
  disconnect: () => Promise<void>;
  isConnected: () => boolean;
}
Copier après la connexion
Copier après la connexion

Dans le dossier src/kafka/, ajoutez les classes productrices et consommatrices qui implémentent ces interfaces :

// src/interfaces/consumer.interface.ts

export type ConsumerMessage = {
  key?: string;
  value: any;
};

export type OnMessage = (message: ConsumerMessage) => Promise<void>;

export interface IConsumer {
  connect: () => Promise<void>;
  disconnect: () => Promise<void>;
  consume: (onMessage?: OnMessage) => Promise<void>;
  isConnected: () => boolean;
}
Copier après la connexion
// src/kafka/kafka.producer.ts

export class KafkaProducer implements IProducer {
  private readonly logger = new Logger(KafkaProducer.name, { timestamp: true });
  private readonly kafka: Kafka;
  private readonly producer: Producer;
  private connected: boolean = false;

  constructor(
    private readonly broker: string,
    private readonly topic: string,
  ) {
    // The client must be configured with at least one broker
    this.kafka = new Kafka({
      brokers: [this.broker],
    });
    this.producer = this.kafka.producer();
  }

  async produce(
    message: Message,
    compression?: CompressionTypes,
    acks?: number,
    timeout?: number,
  ) {
    // To produce, at least a topic and a list of messages must be provided
    await this.producer.send({
      topic: this.topic,
      messages: [message],
      compression,
      timeout,
      acks,
    });
  }

  // To produce a message, the producer must be connected
  async connect() {
    try {
      // Just hooking up some logs in the producer events
      // And storing the connection status
      this.producer.on('producer.connect', () => {
        this.logger.log(
          `producer connected. broker: ${this.broker} topic: ${this.topic}`,
        );
        this.connected = true;
      });

      this.producer.on('producer.disconnect', () => {
        this.logger.log(
          `producer disconnected. broker: ${this.broker} topic: ${this.topic}`,
        );
        this.connected = false;
      });

      // Connect to Kafka
      await this.producer.connect();
    } catch (err) {
      this.logger.error(
        `failed to connect to kafka. broker: ${this.broker} topic: ${this.topic}`,
        err,
      );
    }
  }

  async disconnect() {
    await this.producer.disconnect();
  }

  isConnected(): boolean {
    return this.connected;
  }
}

Copier après la connexion

N'oubliez pas d'exporter ces classes dans kafka.module.ts

// src/kafka/kafka.cosumer.ts

export class KafkaConsumer implements IConsumer {
  private readonly logger = new Logger(KafkaConsumer.name, { timestamp: true });
  private readonly kafka: Kafka;
  private readonly consumer: Consumer;
  private connected: boolean = false;

  constructor(
    private readonly broker: string,
    private readonly topic: string,
    private readonly groupId: string,
  ) {
    if (this.broker && this.topic && this.groupId) {
      // The client must be configured with at least one broker
      this.kafka = new Kafka({
        brokers: [this.broker],
      });
      this.consumer = this.kafka.consumer({ groupId: this.groupId });
    } else {
      this.logger.warn('Broker, topic and groupId must be provided');
    }
  }

  // The onMessage function will be called when a message is received
  async consume(onMessage: OnMessage) {
    // Here we subscribe to the topic ...
    await this.consumer.subscribe({ topic: this.topic });

    // ... and handle the messages
    await this.consumer.run({
      eachMessage: async (payload) => {
        try {
          this.logger.log(
            `message: ${payload.message.value.toString()} (topic: ${payload.topic}, partition: ${payload.partition})`,
          );

          await onMessage({
            key: payload.message.key?.toString(),
            value: payload.message.value.toString(),
          });
        } catch (err) {
          this.logger.error('error on consuming message', err);
        }
      },
    });
  }

  // To consume, the consumer must be connected
  async connect() {
    try {
      // Just hooking up some logs in the consumer events
      // And storing the connection status
      this.consumer.on('consumer.connect', () => {
        this.logger.log(
          `consumer connected. broker: ${this.broker} topic: ${this.topic}`,
        );
        this.connected = true;
      });

      this.consumer.on('consumer.disconnect', () => {
        this.logger.log(
          `consumer disconnected. broker: ${this.broker} topic: ${this.topic}`,
        );
        this.connected = false;
      });

      await this.consumer.connect();
    } catch (err) {
      this.logger.error(
        `failed to connect to kafka. broker: ${this.broker} topic: ${this.topic}`,
        err,
      );
    }
  }

  async disconnect() {
    await this.consumer.disconnect();
  }

  isConnected(): boolean {
    return this.connected;
  }
}
Copier après la connexion

Dans l'état actuel des choses, nous pourrions accéder au module de température, instancier ces classes Kafka et commencer à les utiliser. Cependant, il serait préférable que le module de température n'ait pas à se soucier de la plate-forme pub-sub qu'il utilise. Au lieu de cela, il devrait simplement fonctionner avec un producteur et/ou un consommateur injecté, en se concentrant uniquement sur l'envoi et la réception de messages, quelle que soit la plate-forme sous-jacente. De cette façon, si nous décidons de passer à une autre plateforme pub-sub à l’avenir, nous n’aurons pas besoin d’apporter de modifications au module de température.

Pour réaliser cette abstraction, nous pouvons créer des classes Producer et Consumer qui gèrent les spécificités des implémentations Producer et Consumer de Kafka. Commençons par le producteur :

// src/kafka/kafka.module.ts

@Module({
  imports: [],
  providers: [KafkaProducer, KafkaConsumer],
  exports: [KafkaProducer, KafkaConsumer],
})
export class KafkaModule {}
Copier après la connexion
// src/producer/producer.service.ts

@Injectable()
export class ProducerService implements OnApplicationShutdown {
  // Expects any producer that implements the IProducer interface
  private readonly producer: IProducer;

  constructor(
    @Inject('broker') broker: string,
    @Inject('topic') topic: string,
  ) {
    this.producer = new KafkaProducer(broker, topic);
  }

  /** The produce() and message can receive more parameters,
   * refer to produce method in src/kafka/kafka.producer.ts
   */
  async produce(message: { key?: string; value: string }) {
    if (!this.producer.isConnected()) {
      await this.producer.connect();
    }
    await this.producer.produce(message);
  }

  async onApplicationShutdown() {
    await this.producer.disconnect();
  }
}
Copier après la connexion

Maintenant, le consommateur :

// src/producer/producer.module.ts

@Module({
  imports: [KafkaModule],
  providers: [
    ProducerService,
    {
      provide: 'broker',
      useValue: 'default-broker-value',
    },
    {
      provide: 'topic',
      useValue: 'default-topic-value',
    },
  ],
  exports: [ProducerService],
})
export class ProducerModule {}
Copier après la connexion
// src/consumer/consumer.service.ts

@Injectable()
export class ConsumerService implements OnApplicationShutdown {
  // Expects any consumer that implements the IConsumer interface
  private readonly consumer: IConsumer;

  constructor(
    @Inject('broker') broker: string,
    @Inject('topic') topic: string,
    @Inject('groupId') groupId: string,
  ) {
    this.consumer = new KafkaConsumer(broker, topic, groupId);
  }

  async consume(onMessage: OnMessage) {
    if (!this.consumer.isConnected()) {
      await this.consumer.connect();
    }
    await this.consumer.consume(onMessage);
  }

  async onApplicationShutdown() {
    await this.consumer.disconnect();
  }
}
Copier après la connexion

Maintenant, nous pouvons nous concentrer sur la construction du module de température. Dans le fichier temperature.service.ts, nous allons créer une méthode pour enregistrer une température qui, dans cet exemple, enverra simplement les données de température au courtier en utilisant un producteur. De plus, nous mettrons en œuvre une méthode pour gérer les messages entrants à des fins de démonstration.

Ces méthodes peuvent être invoquées par un autre service ou un contrôleur. Cependant, par souci de simplicité, dans cet exemple, nous les appellerons directement au démarrage de l'application, en utilisant la méthode onModuleInit.

{
  temperature: 42,
  timeStamp: new Date(),
};
Copier après la connexion
Copier après la connexion
Copier après la connexion
// src/interfaces/producer.interface.ts

export interface IProducer {
  produce: (message: any) => Promise<void>;
  connect: () => Promise<void>;
  disconnect: () => Promise<void>;
  isConnected: () => boolean;
}
Copier après la connexion
Copier après la connexion

C'est ça ! Avec le courtier exécuté dans le conteneur Docker, vous pouvez démarrer l'application pour envoyer et recevoir des messages. De plus, vous pouvez ouvrir un shell dans le conteneur du courtier à l'aide de la commande suivante :

docker exec --workdir /opt/kafka/bin/ -it courtier sh

De là, vous pouvez interagir directement avec le courtier et envoyer des messages à l'application, en recevoir des messages, créer de nouveaux sujets, etc.

Voici le référentiel avec le code de cet exemple.

Ce qui précède est le contenu détaillé de. pour plus d'informations, suivez d'autres articles connexes sur le site Web de PHP en chinois!

source:dev.to
Déclaration de ce site Web
Le contenu de cet article est volontairement contribué par les internautes et les droits d'auteur appartiennent à l'auteur original. Ce site n'assume aucune responsabilité légale correspondante. Si vous trouvez un contenu suspecté de plagiat ou de contrefaçon, veuillez contacter admin@php.cn
Derniers articles par auteur
Tutoriels populaires
Plus>
Derniers téléchargements
Plus>
effets Web
Code source du site Web
Matériel du site Web
Modèle frontal