Heim > Web-Frontend > js-Tutorial > Kafka-Grundlagen mit einem praktischen Beispiel

Kafka-Grundlagen mit einem praktischen Beispiel

Linda Hamilton
Freigeben: 2024-12-28 09:26:11
Original
354 Leute haben es durchsucht

In den letzten Wochen habe ich mich intensiv mit Kafka beschäftigt und mir nebenbei Notizen gemacht. Ich habe beschlossen, daraus einen Blog-Beitrag zu organisieren und zu strukturieren, in dem neben Konzepten und Tipps auch ein praktisches Beispiel enthalten ist NestJS und KafkaJs.

Was ist Kafka?

Apache Kafka ist eine verteilte Event-Streaming-Plattform, die für die Verarbeitung von Echtzeitereignissen entwickelt wurde. Es ermöglicht das Speichern, Verarbeiten und Abrufen großer Datenströme mit hohem Durchsatz und geringer Latenz und eignet sich daher für den Aufbau von Echtzeit-Datenpipelines und ereignisgesteuerten Anwendungen.

Hauptmerkmale:

  • Ereignis-Streaming: Kafka organisiert Daten in Themen, bei denen es sich um geordnete Protokolle von Ereignissen handelt.
  • Verteilte Architektur: Kafka ist auf Skalierbarkeit und Fehlertoleranz ausgelegt. Es fungiert als Cluster von Knoten, die als Broker bezeichnet werden, und kann Daten auf mehrere Server verteilen.
  • Publish-Subscribe-Modell: Produzenten schreiben Nachrichten zu den Themen und Konsumenten lesen Nachrichten von ihnen. Kafka unterstützt mehrere Verbraucher, sodass verschiedene Anwendungen unabhängig voneinander denselben Datenstrom verarbeiten können.
  • Hohe Leistung: Kafka ist für hohen Durchsatz optimiert und verarbeitet Millionen von Nachrichten pro Sekunde mit geringer Latenz.
  • Dauerhafter Speicher: Kafka speichert Nachrichten auf der Festplatte mit konfigurierbaren Aufbewahrungsfristen und gewährleistet so Datenpersistenz und Zuverlässigkeit.
  • Partitionierung und Replikation: Themen werden zur Skalierbarkeit in Partitionen unterteilt und zur Fehlertoleranz über Broker hinweg repliziert.
  • Wiederspielbarkeit: Verbraucher können Nachrichten erneut lesen, indem sie ihren Offset zurücksetzen und so die Daten erneut verarbeiten oder wiederherstellen.
  • Integration und Ökosystem: Kafka lässt sich in verschiedene Systeme integrieren und verfügt über Tools wie Kafka Connect (zur Datenintegration) und Kafka Streams (zur Stream-Verarbeitung).

Vorteile

  • Zuverlässigkeit: Es gewährleistet Fehlertoleranz durch Datenverteilung, Replikation und Partitionierung.
  • Skalierbarkeit: Kafka kann riesige Datenmengen verarbeiten und ohne Ausfallzeiten horizontal skalieren.
  • Haltbarkeit:Nachrichten werden umgehend gespeichert, wodurch Ausfallsicherheit und Datenpersistenz gewährleistet sind.
  • Leistung: Kafka behält auch bei extremer Datenlast eine hohe Leistung bei und verarbeitet große Datenmengen ohne Ausfallzeiten oder Datenverlust.

Nachteile

Diese Kompromisse sind bewusste Designentscheidungen, um die Leistung von Kafka zu maximieren, können jedoch bei Anwendungsfällen, die eine größere Flexibilität erfordern, eine Herausforderung darstellen:

  • Eingeschränkte Flexibilität: Kafka bietet keine Unterstützung für erweiterte Abfragen, beispielsweise das Filtern bestimmter Daten in Berichten. Verbraucher müssen diese Aufgaben erledigen, da Kafka Nachrichten durch Offsets in der Reihenfolge abruft, in der sie empfangen werden.
  • Nicht für die Langzeitspeicherung konzipiert: Kafka zeichnet sich durch das Streamen von Daten aus, eignet sich jedoch nicht für die Speicherung historischer Daten über längere Zeiträume. Datenduplizierung kann die Speicherung großer Datenmengen kostspielig machen.
  • Keine Unterstützung für Wildcard-Themen: Kafka erlaubt nicht die Nutzung mehrerer Themen mithilfe von Wildcard-Mustern (z. B. log-2024-*).

Anwendungsfälle

  • Echtzeitanalyse: Datenströme verarbeiten und analysieren, sobald sie auftreten.
  • Ereignisbeschaffung: Zeichnen Sie alle Änderungen am Status einer Anwendung als Abfolge von Ereignissen auf.
  • Protokollaggregation: Protokolle von verteilten Systemen sammeln und verwalten.
  • Datenpipelines:Daten zwischen Systemen zuverlässig und effizient streamen.
  • IoT-Anwendungen:Verarbeiten Sie Sensordaten von IoT-Geräten in Echtzeit.

Wie funktioniert Kafka?

Kafka integriert die Funktionen von Warteschlangen- und Publish-Subscribe-Messaging-Modellen und bietet Verbrauchern die Vorteile beider Ansätze.

  • Warteschlangen ermöglichen eine skalierbare Datenverarbeitung durch die Verteilung von Aufgaben auf mehrere Verbraucherinstanzen. Herkömmliche Warteschlangen unterstützen jedoch nicht mehrere Abonnenten.
  • Das Publish-Subscribe-Modell unterstützt mehrere Abonnenten, kann Aufgaben jedoch nicht auf mehrere Arbeitsprozesse verteilen, da jede Nachricht an alle Abonnenten gesendet wird.

Kafka verwendet ein partitioniertes Protokollsystem, um die Vorteile von Warteschlangen- und Publish-Subscribe-Modellen zu kombinieren. Protokolle, bei denen es sich um geordnete Sequenzen von Datensätzen handelt, werden in Partitionen unterteilt, wobei jede Partition verschiedenen Abonnenten (Konsumenten) zugewiesen wird. Dieses Setup ermöglicht es mehreren Abonnenten, ein Thema zu teilen und gleichzeitig die Skalierbarkeit beizubehalten.

Kafka fundamentals with a practical example

Ereignisse, Themen und Partitionen

Wir haben gesehen, dass Kafka eine Plattform ist, die für die Verarbeitung von Echtzeitereignissen entwickelt wurde. Bevor wir darüber sprechen, wie mit diesen Ereignissen umgegangen wird, müssen wir eine Definition dafür haben:

Ein Ereignis ist eine Aktion, ein Vorfall oder eine Änderung, die in Anwendungen aufgezeichnet wird, zum Beispiel eine Zahlung, ein Website-Klick oder eine Temperaturmessung.

Ereignisse werden in Kafka als Schlüssel/Wert-Paare modelliert, wobei sowohl Schlüssel als auch Werte in Bytesequenzen serialisiert werden.

  • Werte stellen häufig serialisierte Domänenobjekte oder Roheingaben dar, wie z. B. Sensorausgaben oder andere Anwendungsdaten. Sie fassen die Kerninformationen zusammen, die im Kafka-Ereignis übermittelt werden.
  • Schlüssel können komplexe Domänenobjekte sein, sind jedoch häufig einfache Typen wie Zeichenfolgen oder Ganzzahlen. Anstatt ein einzelnes Ereignis eindeutig zu identifizieren (wie es ein Primärschlüssel in einer relationalen Datenbank tut), identifizieren Schlüssel normalerweise Entitäten innerhalb des Systems, wie z. B. einen bestimmten Benutzer, eine Bestellung oder ein verbundenes Gerät.

Kafka organisiert Ereignisse in geordneten Protokollen, die als Themen bezeichnet werden. Wenn ein externes System ein Ereignis in Kafka schreibt, wird es an das Ende eines Themas angehängt. Nachrichten bleiben für eine konfigurierbare Dauer in den Themen, auch nachdem sie gelesen wurden. Im Gegensatz zu Warteschlangen sind Themen dauerhaft, repliziert und fehlertolerant und speichern Ereignisdatensätze effizient. Protokolle können jedoch nur sequentiell gescannt und nicht abgefragt werden.

Themen werden als Protokolldateien auf der Festplatte gespeichert. Festplatten unterliegen jedoch Einschränkungen wie begrenzter Größe und E/A. Um dieses Problem zu lösen, ermöglicht Kafka die Unterteilung von Themen in Partitionen, wodurch ein einzelnes Protokoll in mehrere Protokolle aufgeteilt wird, die auf verschiedene Server verteilt werden können. Diese Partitionierung ermöglicht Kafka eine horizontale Skalierung und verbessert so seine Fähigkeit, große Mengen an Ereignissen und einen hohen Durchsatz zu verarbeiten.

Kafka weist Partitionen Nachrichten basierend darauf zu, ob sie einen Schlüssel haben:

  • Kein Schlüssel: Nachrichten werden im Round-Robin-Verfahren über alle Partitionen verteilt, wodurch eine gleichmäßige Datenverteilung gewährleistet wird, die Nachrichtenreihenfolge jedoch nicht erhalten bleibt.
  • Mit Schlüssel: Die Partition wird durch Hashing des Schlüssels bestimmt, wodurch sichergestellt wird, dass Nachrichten mit demselben Schlüssel immer in dieselbe Partition gelangen und ihre Reihenfolge beibehalten.

Makler

Kafka arbeitet als verteilte Dateninfrastruktur unter Verwendung von Knoten, die als Broker bezeichnet werden und zusammen einen Kafka-Cluster bilden. Broker können auf Bare-Metal-Hardware, einer Cloud-Instanz, in einem von Kubernetes verwalteten Container, in Docker auf Ihrem Laptop oder überall dort ausgeführt werden, wo JVM-Prozesse ausgeführt werden können.

Makler konzentrieren sich auf:

  • Neue Ereignisse in Partitionen schreiben.
  • Bereitstellung von Lesevorgängen aus Partitionen.
  • Partitionen über Broker hinweg replizieren.

Sie führen keine Nachrichtenberechnung oder Weiterleitung von Thema zu Thema durch, wodurch ihr Design einfach und effizient bleibt.

Replikation

Kafka gewährleistet Datenhaltbarkeit und Fehlertoleranz durch die Replikation von Partitionsdaten über mehrere Broker hinweg. Die primäre Kopie einer Partition ist das Leader-Replikat, während zusätzliche Kopien Follower-Replikate sind. Die Daten werden an den Leader geschrieben, der Aktualisierungen automatisch an die Follower repliziert.

Dieser Replikationsprozess gewährleistet:

  • Datensicherheit, auch bei Broker- oder Speicherausfällen.
  • Automatisches Failover, bei dem ein anderes Replikat die Rolle des Anführers übernimmt, wenn der aktuelle Anführer ausfällt.

Entwickler profitieren von diesen Garantien, ohne die Replikation direkt verwalten zu müssen, da Kafka sie transparent abwickelt.

Produzenten

Ein Kafka-Produzent ist eine Client-Anwendung, die Daten an Kafka-Themen sendet (veröffentlicht). Es ist für die Erstellung und Übermittlung von Nachrichten (Aufzeichnungen) an den Kafka-Cluster verantwortlich. Produzenten bestimmen das Thema und die Partition, in der Nachrichten basierend auf ihrer Konfiguration und dem Vorhandensein eines Nachrichtenschlüssels gespeichert werden. Die Hersteller sind verantwortlich für, aber nicht beschränkt auf:

  • Nachrichtenaufbau:
    • Jede Nachricht besteht aus einem Schlüssel (optional), einem Wert (den tatsächlichen Daten) und Metadaten.
    • Der Schlüssel bestimmt die Partition für die Nachricht und stellt so die Reihenfolge für Nachrichten mit demselben Schlüssel sicher.
  • Partitionszuweisung:
    • Wenn ein Schlüssel bereitgestellt wird, verwendet der Hersteller einen Hashing-Algorithmus, um die Partition zu bestimmen.
    • Ohne Schlüssel werden Nachrichten zur Lastverteilung im Round-Robin-Verfahren auf Partitionen verteilt.
  • Komprimierung:

    Produzenten können Nachrichten komprimieren, um die Netzwerkbandbreite und den Speicherverbrauch zu reduzieren.

Verbraucher

Ein Kafka-Consumer ist eine Client-Anwendung, die Nachrichten aus Kafka-Themen liest. Sie ruft Nachrichten in ihrem eigenen Tempo von Kafka-Partitionen ab und ermöglicht so eine Echtzeit- oder Stapelverarbeitung von Daten . Beachten Sie, dass Kafka keine Nachrichten an Verbraucher sendet, sondern Nachrichten von Kafka-Partitionen abruft, indem es die Daten anfordert.

Verbraucher behalten auch den Überblick über die Verrechnungen, die sie verarbeitet haben. Offsets können automatisch oder manuell festgeschrieben werden, um sicherzustellen, dass keine Daten verloren gehen, wenn ein Verbraucher ausfällt. Dies ermöglicht eine flexible Nutzung, einschließlich der Wiedergabe von Nachrichten durch Zurücksetzen des Offsets.

Verbrauchergruppen

Eine Verbrauchergruppe ist eine Gruppe von Verbrauchern, die zusammenarbeiten, um Daten aus bestimmten Themen zu konsumieren, was eine verteilte Verarbeitung der Nachrichten eines Themas ermöglicht.

Partitionen eines Themas werden auf die Verbraucher in der Gruppe aufgeteilt, um sicherzustellen, dass jede Nachricht nur von einem Verbraucher innerhalb der Gruppe verarbeitet wird. Mehrere Verbrauchergruppen können unabhängig voneinander und ohne Interferenzen dasselbe Thema konsumieren.

Wenn ein neuer Verbraucher einer Gruppe beitritt oder ein bestehender Verbraucher ausfällt, weist Kafka Partitionen unter den Verbrauchern in der Gruppe neu zu, um sicherzustellen, dass alle Partitionen abgedeckt sind.

Serialisierung und Deserialisierung

Bei der Serialisierung und Deserialisierung in Kafka geht es um die Konvertierung von Daten zwischen ihrem Originalformat und einem Byte-Array zur Übertragung und Speicherung, sodass Produzenten und Verbraucher effizient kommunizieren können.

Serialisierung

Ist der Prozess der Konvertierung eines Objekts oder einer Datenstruktur in einen Bytestrom, damit dieser übertragen oder gespeichert werden kann. Bevor ein Produzent Daten an ein Kafka-Thema sendet, serialisiert er die Daten (Schlüssel und Wert) in Byte-Arrays.

Gemeinsame Serialisierungsformate:

  • JSON: Für Menschen lesbar, weitgehend kompatibel.
  • Avro: Kompakt und effizient, schemabasiert.
  • Protobuf: Kompakt, schemabasiert und sprachunabhängig.
  • String: Einfache textbasierte Serialisierung.
  • Benutzerdefinierte Serialisierung: Für anwendungsspezifische Anforderungen.

Deserialisierung

Ist der umgekehrte Prozess, bei dem ein Bytestrom wieder in sein ursprüngliches Objekt oder seine ursprüngliche Datenstruktur konvertiert wird. Wenn ein Verbraucher Daten aus einem Kafka-Thema liest, deserialisiert er das Byte-Array wieder in ein für die Verarbeitung verwendbares Format.

Kompression

Komprimierung reduziert die Größe von Nachrichten, bevor sie gespeichert oder übertragen werden. Es optimiert die Speichernutzung, reduziert den Netzwerkbandbreitenverbrauch und verbessert die Gesamtleistung durch das Senden kleinerer Nutzlasten zwischen Produzenten, Brokern und Verbrauchern.

Wenn ein Produzent Nachrichten zu einem Kafka-Thema sendet, kann er die Nachricht vor der Übertragung komprimieren. Die komprimierte Nachricht wird unverändert auf Brokern gespeichert und von Verbrauchern dekomprimiert, wenn sie die Nachrichten lesen.

Vorteile

  • Reduzierte Netzwerkbandbreite:Kleinere Nutzlasten bedeuten, dass weniger Daten über das Netzwerk übertragen werden.
  • Geringerer Speicherbedarf:Komprimierte Nachrichten beanspruchen weniger Speicherplatz auf der Festplatte.
  • Verbesserter Durchsatz:Kleinere Nachrichten ermöglichen eine schnellere Datenübertragung und -verarbeitung.

Wann verwenden?

  • Anwendungsfälle mit großen Nachrichtengrößen: Durch die Komprimierung wird die Datengröße erheblich reduziert.
  • Systeme mit hohem Durchsatz: Reduziert die Belastung der Netzwerk- und Speicherressourcen.
  • Batching: Die Komprimierung funktioniert am besten, wenn Ersteller mehrere Nachrichten stapelweise zusammenfassen.

Während die Komprimierung Ressourcen spart, ist es wichtig, den Kompromiss zwischen CPU-Auslastung und Komprimierungsvorteilen auszugleichen und den Codec auszuwählen, der zu Ihrem Anwendungsfall passt.

Unterstützte Komprimierungstypen

  • Keine: Keine Komprimierung (Standard).
  • Gzip:Hohes Komprimierungsverhältnis, aber höhere CPU-Auslastung.
  • Schnell: Ausgewogene Komprimierungsgeschwindigkeit und CPU-Auslastung, geeignet für Echtzeit-Anwendungsfälle.
  • LZ4:Schnellere Komprimierung und Dekomprimierung, optimiert für Systeme mit geringer Latenz.
  • Zstd: Hohe Komprimierungsrate mit besserer Leistung als Gzip, unterstützt in neueren Kafka-Versionen.

Abstimmung

Zur Optimierung der Leistung von Apache Kafka gehört die Feinabstimmung verschiedener Komponenten, um Durchsatz und Latenz effektiv auszubalancieren. Dieser Artikel kratzt nur an der Oberfläche dieses Themas. Hier sind einige Aspekte, die beim Tuning von Kafka berücksichtigt werden sollten:

  • Partitionsverwaltung:

    • Partitionsanzahl: Erhöhen Sie die Anzahl der Partitionen, um Parallelität und Durchsatz zu verbessern. Vermeiden Sie jedoch übermäßig viele Partitionen, um Verwaltungsaufwand zu vermeiden. Passen Sie die Anzahl der Partitionen an Ihre Verbraucherkapazitäten und die gewünschte Verbrauchsrate an.
  • Produzentenkonfiguration:

    • Batching: Konfigurieren Sie batch.size und linger.ms, um eine effiziente Stapelverarbeitung von Nachrichten zu ermöglichen, die Anzahl der Anfragen zu reduzieren und den Durchsatz zu verbessern.
    • Komprimierung: Implementieren Sie eine Komprimierung (z. B. compress.type=snappy), um die Nachrichtengröße zu verringern und so die Netzwerk- und Speichernutzung zu reduzieren. Bedenken Sie den zusätzlichen CPU-Overhead, der durch die Komprimierung entsteht.
  • Verbraucherkonfiguration:

    • Abrufeinstellungen: Passen Sie fetch.min.bytes und fetch.max.wait.ms an, um zu steuern, wie Verbraucher Nachrichten abrufen, und gleichen Sie Latenz und Durchsatz entsprechend den Anforderungen Ihrer Anwendung aus.

Praxisbeispiel

Stellen Sie sich eine Anwendung vor, die die Temperatur in einem Raum aufzeichnet und diese Daten mithilfe von Kafka übermittelt, wo eine andere Anwendung sie verarbeitet. Der Einfachheit halber konzentrieren wir uns ausschließlich auf den Kafka-Aspekt, wobei sowohl der Produzent als auch der Verbraucher in derselben Anwendung implementiert werden. In diesem Szenario stellt jede aufgezeichnete Temperatur zu einem bestimmten Zeitpunkt ein Ereignis dar:

{
  temperature: 42,
  timeStamp: new Date(),
};
Nach dem Login kopieren
Nach dem Login kopieren
Nach dem Login kopieren

Der gesamte Code befindet sich in diesem Repository.

Zuerst brauchen wir einen Kafka-Broker, aber anstatt Kafka auf unserem Computer zu installieren, verwenden wir einfach dieses Docker-Kafka-Image.

Beginnen Sie mit dem Ziehen dieses Bildes:

Docker Pull Apache/Kafka

Dann führen Sie es aus und ordnen den Port zu, den Kafka auf demselben Port auf unserem Computer abhört:

docker run -d -p 9092:9092 --name Broker Apache/kafka:latest

Das ist es, wir haben einen Kafka-Broker im Einsatz. Bevor Sie fortfahren, möchten Sie vielleicht damit herumspielen, indem Sie Themen erstellen, Nachrichten senden und konsumieren. Befolgen Sie dazu einfach die Anweisungen auf dieser Bildseite.

Um unsere Anwendung zu erstellen, verwenden wir NestJS mit KafkaJS. Beginnen Sie mit der Erstellung der App mit Nest CLI

Nest neues My-Nest-Projekt

Im Projektordner installieren Sie kafkajs

npm i kafkajs

Und generieren Sie die folgenden Module

nest g mo kafka

Nest G MO Producer

Nest G Mo Consumer

Nest-G-Mo-Temperatur

Das Kafka-Modul übernimmt alle Kafka-spezifischen Vorgänge, einschließlich der Verwaltung von Verbraucher- und Produzentenklassen zum Verbinden, Trennen, Senden und Empfangen von Nachrichten. Dies wird das einzige Modul sein, das direkt mit dem kafkajs-Paket interagiert.

Die Produzenten- und Verbrauchermodule fungieren als Schnittstellen zwischen der Pub-Sub-Plattform (in diesem Fall Kafka) und dem Rest der Anwendung und abstrahieren plattformspezifische Details.

Das Temperaturmodul verwaltet die Ereignisse. Es muss nicht wissen, welche Pub-Sub-Plattform verwendet wird, es sind lediglich ein Verbraucher und ein Produzent erforderlich, um zu funktionieren.

Nachdem wir die Module erstellt haben, erstellen wir auch einen Ordner src/interface und fügen darin die folgenden Schnittstellen hinzu:

{
  temperature: 42,
  timeStamp: new Date(),
};
Nach dem Login kopieren
Nach dem Login kopieren
Nach dem Login kopieren
// src/interfaces/producer.interface.ts

export interface IProducer {
  produce: (message: any) => Promise<void>;
  connect: () => Promise<void>;
  disconnect: () => Promise<void>;
  isConnected: () => boolean;
}
Nach dem Login kopieren
Nach dem Login kopieren

Fügen Sie im Ordner src/kafka/ die Producer- und Consumer-Klassen hinzu, die diese Schnittstellen implementieren:

// src/interfaces/consumer.interface.ts

export type ConsumerMessage = {
  key?: string;
  value: any;
};

export type OnMessage = (message: ConsumerMessage) => Promise<void>;

export interface IConsumer {
  connect: () => Promise<void>;
  disconnect: () => Promise<void>;
  consume: (onMessage?: OnMessage) => Promise<void>;
  isConnected: () => boolean;
}
Nach dem Login kopieren
// src/kafka/kafka.producer.ts

export class KafkaProducer implements IProducer {
  private readonly logger = new Logger(KafkaProducer.name, { timestamp: true });
  private readonly kafka: Kafka;
  private readonly producer: Producer;
  private connected: boolean = false;

  constructor(
    private readonly broker: string,
    private readonly topic: string,
  ) {
    // The client must be configured with at least one broker
    this.kafka = new Kafka({
      brokers: [this.broker],
    });
    this.producer = this.kafka.producer();
  }

  async produce(
    message: Message,
    compression?: CompressionTypes,
    acks?: number,
    timeout?: number,
  ) {
    // To produce, at least a topic and a list of messages must be provided
    await this.producer.send({
      topic: this.topic,
      messages: [message],
      compression,
      timeout,
      acks,
    });
  }

  // To produce a message, the producer must be connected
  async connect() {
    try {
      // Just hooking up some logs in the producer events
      // And storing the connection status
      this.producer.on('producer.connect', () => {
        this.logger.log(
          `producer connected. broker: ${this.broker} topic: ${this.topic}`,
        );
        this.connected = true;
      });

      this.producer.on('producer.disconnect', () => {
        this.logger.log(
          `producer disconnected. broker: ${this.broker} topic: ${this.topic}`,
        );
        this.connected = false;
      });

      // Connect to Kafka
      await this.producer.connect();
    } catch (err) {
      this.logger.error(
        `failed to connect to kafka. broker: ${this.broker} topic: ${this.topic}`,
        err,
      );
    }
  }

  async disconnect() {
    await this.producer.disconnect();
  }

  isConnected(): boolean {
    return this.connected;
  }
}

Nach dem Login kopieren

Vergessen Sie nicht, diese Klassen in kafka.module.ts zu exportieren

// src/kafka/kafka.cosumer.ts

export class KafkaConsumer implements IConsumer {
  private readonly logger = new Logger(KafkaConsumer.name, { timestamp: true });
  private readonly kafka: Kafka;
  private readonly consumer: Consumer;
  private connected: boolean = false;

  constructor(
    private readonly broker: string,
    private readonly topic: string,
    private readonly groupId: string,
  ) {
    if (this.broker && this.topic && this.groupId) {
      // The client must be configured with at least one broker
      this.kafka = new Kafka({
        brokers: [this.broker],
      });
      this.consumer = this.kafka.consumer({ groupId: this.groupId });
    } else {
      this.logger.warn('Broker, topic and groupId must be provided');
    }
  }

  // The onMessage function will be called when a message is received
  async consume(onMessage: OnMessage) {
    // Here we subscribe to the topic ...
    await this.consumer.subscribe({ topic: this.topic });

    // ... and handle the messages
    await this.consumer.run({
      eachMessage: async (payload) => {
        try {
          this.logger.log(
            `message: ${payload.message.value.toString()} (topic: ${payload.topic}, partition: ${payload.partition})`,
          );

          await onMessage({
            key: payload.message.key?.toString(),
            value: payload.message.value.toString(),
          });
        } catch (err) {
          this.logger.error('error on consuming message', err);
        }
      },
    });
  }

  // To consume, the consumer must be connected
  async connect() {
    try {
      // Just hooking up some logs in the consumer events
      // And storing the connection status
      this.consumer.on('consumer.connect', () => {
        this.logger.log(
          `consumer connected. broker: ${this.broker} topic: ${this.topic}`,
        );
        this.connected = true;
      });

      this.consumer.on('consumer.disconnect', () => {
        this.logger.log(
          `consumer disconnected. broker: ${this.broker} topic: ${this.topic}`,
        );
        this.connected = false;
      });

      await this.consumer.connect();
    } catch (err) {
      this.logger.error(
        `failed to connect to kafka. broker: ${this.broker} topic: ${this.topic}`,
        err,
      );
    }
  }

  async disconnect() {
    await this.consumer.disconnect();
  }

  isConnected(): boolean {
    return this.connected;
  }
}
Nach dem Login kopieren

So wie es jetzt ist, könnten wir zum Temperaturmodul gehen und diese Kafka-Klassen instanziieren und beginnen, sie zu verwenden. Allerdings wäre es besser, wenn sich das Temperaturmodul keine Gedanken darüber machen müsste, welche Pub-Sub-Plattform es verwendet. Stattdessen sollte es einfach mit einem injizierten Produzenten und/oder Konsumenten zusammenarbeiten und sich ausschließlich auf das Senden und Empfangen von Nachrichten konzentrieren, unabhängig von der zugrunde liegenden Plattform. Wenn wir uns in Zukunft dazu entschließen, auf eine andere Pub-Sub-Plattform zu wechseln, müssen wir auf diese Weise keine Änderungen am Temperaturmodul vornehmen.

Um diese Abstraktion zu erreichen, können wir Producer- und Consumer-Klassen erstellen, die die Besonderheiten der Producer- und Consumer-Implementierungen von Kafka verarbeiten. Beginnen wir mit dem Produzenten:

// src/kafka/kafka.module.ts

@Module({
  imports: [],
  providers: [KafkaProducer, KafkaConsumer],
  exports: [KafkaProducer, KafkaConsumer],
})
export class KafkaModule {}
Nach dem Login kopieren
// src/producer/producer.service.ts

@Injectable()
export class ProducerService implements OnApplicationShutdown {
  // Expects any producer that implements the IProducer interface
  private readonly producer: IProducer;

  constructor(
    @Inject('broker') broker: string,
    @Inject('topic') topic: string,
  ) {
    this.producer = new KafkaProducer(broker, topic);
  }

  /** The produce() and message can receive more parameters,
   * refer to produce method in src/kafka/kafka.producer.ts
   */
  async produce(message: { key?: string; value: string }) {
    if (!this.producer.isConnected()) {
      await this.producer.connect();
    }
    await this.producer.produce(message);
  }

  async onApplicationShutdown() {
    await this.producer.disconnect();
  }
}
Nach dem Login kopieren

Nun der Verbraucher:

// src/producer/producer.module.ts

@Module({
  imports: [KafkaModule],
  providers: [
    ProducerService,
    {
      provide: 'broker',
      useValue: 'default-broker-value',
    },
    {
      provide: 'topic',
      useValue: 'default-topic-value',
    },
  ],
  exports: [ProducerService],
})
export class ProducerModule {}
Nach dem Login kopieren
// src/consumer/consumer.service.ts

@Injectable()
export class ConsumerService implements OnApplicationShutdown {
  // Expects any consumer that implements the IConsumer interface
  private readonly consumer: IConsumer;

  constructor(
    @Inject('broker') broker: string,
    @Inject('topic') topic: string,
    @Inject('groupId') groupId: string,
  ) {
    this.consumer = new KafkaConsumer(broker, topic, groupId);
  }

  async consume(onMessage: OnMessage) {
    if (!this.consumer.isConnected()) {
      await this.consumer.connect();
    }
    await this.consumer.consume(onMessage);
  }

  async onApplicationShutdown() {
    await this.consumer.disconnect();
  }
}
Nach dem Login kopieren

Jetzt können wir uns auf den Aufbau des Temperaturmoduls konzentrieren. In der Datei „temperature.service.ts“ erstellen wir eine Methode zum Registrieren einer Temperatur, die in diesem Beispiel einfach die Temperaturdaten über einen Produzenten an den Broker sendet. Darüber hinaus implementieren wir zu Demonstrationszwecken eine Methode zur Verarbeitung eingehender Nachrichten.

Diese Methoden können von einem anderen Dienst oder einem Controller aufgerufen werden. Der Einfachheit halber rufen wir sie in diesem Beispiel jedoch direkt beim Start der Anwendung auf, indem wir die Methode onModuleInit verwenden.

{
  temperature: 42,
  timeStamp: new Date(),
};
Nach dem Login kopieren
Nach dem Login kopieren
Nach dem Login kopieren
// src/interfaces/producer.interface.ts

export interface IProducer {
  produce: (message: any) => Promise<void>;
  connect: () => Promise<void>;
  disconnect: () => Promise<void>;
  isConnected: () => boolean;
}
Nach dem Login kopieren
Nach dem Login kopieren

Das ist es! Wenn der Broker im Docker-Container ausgeführt wird, können Sie die Anwendung zum Senden und Empfangen von Nachrichten starten. Darüber hinaus können Sie mit dem folgenden Befehl eine Shell im Broker-Container öffnen:

Docker Exec --workdir /opt/kafka/bin/ -it Broker Sh

Von dort aus können Sie direkt mit dem Broker interagieren und Nachrichten an die Anwendung senden, Nachrichten von ihr empfangen, neue Themen erstellen usw.

Dies ist das Repository mit dem Code dieses Beispiels.

Das obige ist der detaillierte Inhalt vonKafka-Grundlagen mit einem praktischen Beispiel. Für weitere Informationen folgen Sie bitte anderen verwandten Artikeln auf der PHP chinesischen Website!

Quelle:dev.to
Erklärung dieser Website
Der Inhalt dieses Artikels wird freiwillig von Internetnutzern beigesteuert und das Urheberrecht liegt beim ursprünglichen Autor. Diese Website übernimmt keine entsprechende rechtliche Verantwortung. Wenn Sie Inhalte finden, bei denen der Verdacht eines Plagiats oder einer Rechtsverletzung besteht, wenden Sie sich bitte an admin@php.cn
Neueste Artikel des Autors
Beliebte Tutorials
Mehr>
Neueste Downloads
Mehr>
Web-Effekte
Quellcode der Website
Website-Materialien
Frontend-Vorlage