In den letzten Wochen habe ich mich intensiv mit Kafka beschäftigt und mir nebenbei Notizen gemacht. Ich habe beschlossen, daraus einen Blog-Beitrag zu organisieren und zu strukturieren, in dem neben Konzepten und Tipps auch ein praktisches Beispiel enthalten ist NestJS und KafkaJs.
Apache Kafka ist eine verteilte Event-Streaming-Plattform, die für die Verarbeitung von Echtzeitereignissen entwickelt wurde. Es ermöglicht das Speichern, Verarbeiten und Abrufen großer Datenströme mit hohem Durchsatz und geringer Latenz und eignet sich daher für den Aufbau von Echtzeit-Datenpipelines und ereignisgesteuerten Anwendungen.
Diese Kompromisse sind bewusste Designentscheidungen, um die Leistung von Kafka zu maximieren, können jedoch bei Anwendungsfällen, die eine größere Flexibilität erfordern, eine Herausforderung darstellen:
Kafka integriert die Funktionen von Warteschlangen- und Publish-Subscribe-Messaging-Modellen und bietet Verbrauchern die Vorteile beider Ansätze.
Kafka verwendet ein partitioniertes Protokollsystem, um die Vorteile von Warteschlangen- und Publish-Subscribe-Modellen zu kombinieren. Protokolle, bei denen es sich um geordnete Sequenzen von Datensätzen handelt, werden in Partitionen unterteilt, wobei jede Partition verschiedenen Abonnenten (Konsumenten) zugewiesen wird. Dieses Setup ermöglicht es mehreren Abonnenten, ein Thema zu teilen und gleichzeitig die Skalierbarkeit beizubehalten.
Wir haben gesehen, dass Kafka eine Plattform ist, die für die Verarbeitung von Echtzeitereignissen entwickelt wurde. Bevor wir darüber sprechen, wie mit diesen Ereignissen umgegangen wird, müssen wir eine Definition dafür haben:
Ein Ereignis ist eine Aktion, ein Vorfall oder eine Änderung, die in Anwendungen aufgezeichnet wird, zum Beispiel eine Zahlung, ein Website-Klick oder eine Temperaturmessung.
Ereignisse werden in Kafka als Schlüssel/Wert-Paare modelliert, wobei sowohl Schlüssel als auch Werte in Bytesequenzen serialisiert werden.
Kafka organisiert Ereignisse in geordneten Protokollen, die als Themen bezeichnet werden. Wenn ein externes System ein Ereignis in Kafka schreibt, wird es an das Ende eines Themas angehängt. Nachrichten bleiben für eine konfigurierbare Dauer in den Themen, auch nachdem sie gelesen wurden. Im Gegensatz zu Warteschlangen sind Themen dauerhaft, repliziert und fehlertolerant und speichern Ereignisdatensätze effizient. Protokolle können jedoch nur sequentiell gescannt und nicht abgefragt werden.
Themen werden als Protokolldateien auf der Festplatte gespeichert. Festplatten unterliegen jedoch Einschränkungen wie begrenzter Größe und E/A. Um dieses Problem zu lösen, ermöglicht Kafka die Unterteilung von Themen in Partitionen, wodurch ein einzelnes Protokoll in mehrere Protokolle aufgeteilt wird, die auf verschiedene Server verteilt werden können. Diese Partitionierung ermöglicht Kafka eine horizontale Skalierung und verbessert so seine Fähigkeit, große Mengen an Ereignissen und einen hohen Durchsatz zu verarbeiten.
Kafka weist Partitionen Nachrichten basierend darauf zu, ob sie einen Schlüssel haben:
Kafka arbeitet als verteilte Dateninfrastruktur unter Verwendung von Knoten, die als Broker bezeichnet werden und zusammen einen Kafka-Cluster bilden. Broker können auf Bare-Metal-Hardware, einer Cloud-Instanz, in einem von Kubernetes verwalteten Container, in Docker auf Ihrem Laptop oder überall dort ausgeführt werden, wo JVM-Prozesse ausgeführt werden können.
Makler konzentrieren sich auf:
Sie führen keine Nachrichtenberechnung oder Weiterleitung von Thema zu Thema durch, wodurch ihr Design einfach und effizient bleibt.
Kafka gewährleistet Datenhaltbarkeit und Fehlertoleranz durch die Replikation von Partitionsdaten über mehrere Broker hinweg. Die primäre Kopie einer Partition ist das Leader-Replikat, während zusätzliche Kopien Follower-Replikate sind. Die Daten werden an den Leader geschrieben, der Aktualisierungen automatisch an die Follower repliziert.
Dieser Replikationsprozess gewährleistet:
Entwickler profitieren von diesen Garantien, ohne die Replikation direkt verwalten zu müssen, da Kafka sie transparent abwickelt.
Ein Kafka-Produzent ist eine Client-Anwendung, die Daten an Kafka-Themen sendet (veröffentlicht). Es ist für die Erstellung und Übermittlung von Nachrichten (Aufzeichnungen) an den Kafka-Cluster verantwortlich. Produzenten bestimmen das Thema und die Partition, in der Nachrichten basierend auf ihrer Konfiguration und dem Vorhandensein eines Nachrichtenschlüssels gespeichert werden. Die Hersteller sind verantwortlich für, aber nicht beschränkt auf:
Komprimierung:
Produzenten können Nachrichten komprimieren, um die Netzwerkbandbreite und den Speicherverbrauch zu reduzieren.
Ein Kafka-Consumer ist eine Client-Anwendung, die Nachrichten aus Kafka-Themen liest. Sie ruft Nachrichten in ihrem eigenen Tempo von Kafka-Partitionen ab und ermöglicht so eine Echtzeit- oder Stapelverarbeitung von Daten . Beachten Sie, dass Kafka keine Nachrichten an Verbraucher sendet, sondern Nachrichten von Kafka-Partitionen abruft, indem es die Daten anfordert.
Verbraucher behalten auch den Überblick über die Verrechnungen, die sie verarbeitet haben. Offsets können automatisch oder manuell festgeschrieben werden, um sicherzustellen, dass keine Daten verloren gehen, wenn ein Verbraucher ausfällt. Dies ermöglicht eine flexible Nutzung, einschließlich der Wiedergabe von Nachrichten durch Zurücksetzen des Offsets.
Eine Verbrauchergruppe ist eine Gruppe von Verbrauchern, die zusammenarbeiten, um Daten aus bestimmten Themen zu konsumieren, was eine verteilte Verarbeitung der Nachrichten eines Themas ermöglicht.
Partitionen eines Themas werden auf die Verbraucher in der Gruppe aufgeteilt, um sicherzustellen, dass jede Nachricht nur von einem Verbraucher innerhalb der Gruppe verarbeitet wird. Mehrere Verbrauchergruppen können unabhängig voneinander und ohne Interferenzen dasselbe Thema konsumieren.
Wenn ein neuer Verbraucher einer Gruppe beitritt oder ein bestehender Verbraucher ausfällt, weist Kafka Partitionen unter den Verbrauchern in der Gruppe neu zu, um sicherzustellen, dass alle Partitionen abgedeckt sind.
Bei der Serialisierung und Deserialisierung in Kafka geht es um die Konvertierung von Daten zwischen ihrem Originalformat und einem Byte-Array zur Übertragung und Speicherung, sodass Produzenten und Verbraucher effizient kommunizieren können.
Ist der Prozess der Konvertierung eines Objekts oder einer Datenstruktur in einen Bytestrom, damit dieser übertragen oder gespeichert werden kann. Bevor ein Produzent Daten an ein Kafka-Thema sendet, serialisiert er die Daten (Schlüssel und Wert) in Byte-Arrays.
Gemeinsame Serialisierungsformate:
Ist der umgekehrte Prozess, bei dem ein Bytestrom wieder in sein ursprüngliches Objekt oder seine ursprüngliche Datenstruktur konvertiert wird. Wenn ein Verbraucher Daten aus einem Kafka-Thema liest, deserialisiert er das Byte-Array wieder in ein für die Verarbeitung verwendbares Format.
Komprimierung reduziert die Größe von Nachrichten, bevor sie gespeichert oder übertragen werden. Es optimiert die Speichernutzung, reduziert den Netzwerkbandbreitenverbrauch und verbessert die Gesamtleistung durch das Senden kleinerer Nutzlasten zwischen Produzenten, Brokern und Verbrauchern.
Wenn ein Produzent Nachrichten zu einem Kafka-Thema sendet, kann er die Nachricht vor der Übertragung komprimieren. Die komprimierte Nachricht wird unverändert auf Brokern gespeichert und von Verbrauchern dekomprimiert, wenn sie die Nachrichten lesen.
Während die Komprimierung Ressourcen spart, ist es wichtig, den Kompromiss zwischen CPU-Auslastung und Komprimierungsvorteilen auszugleichen und den Codec auszuwählen, der zu Ihrem Anwendungsfall passt.
Zur Optimierung der Leistung von Apache Kafka gehört die Feinabstimmung verschiedener Komponenten, um Durchsatz und Latenz effektiv auszubalancieren. Dieser Artikel kratzt nur an der Oberfläche dieses Themas. Hier sind einige Aspekte, die beim Tuning von Kafka berücksichtigt werden sollten:
Partitionsverwaltung:
Produzentenkonfiguration:
Verbraucherkonfiguration:
Stellen Sie sich eine Anwendung vor, die die Temperatur in einem Raum aufzeichnet und diese Daten mithilfe von Kafka übermittelt, wo eine andere Anwendung sie verarbeitet. Der Einfachheit halber konzentrieren wir uns ausschließlich auf den Kafka-Aspekt, wobei sowohl der Produzent als auch der Verbraucher in derselben Anwendung implementiert werden. In diesem Szenario stellt jede aufgezeichnete Temperatur zu einem bestimmten Zeitpunkt ein Ereignis dar:
{ temperature: 42, timeStamp: new Date(), };
Der gesamte Code befindet sich in diesem Repository.
Zuerst brauchen wir einen Kafka-Broker, aber anstatt Kafka auf unserem Computer zu installieren, verwenden wir einfach dieses Docker-Kafka-Image.
Beginnen Sie mit dem Ziehen dieses Bildes:
Docker Pull Apache/Kafka
Dann führen Sie es aus und ordnen den Port zu, den Kafka auf demselben Port auf unserem Computer abhört:
docker run -d -p 9092:9092 --name Broker Apache/kafka:latest
Das ist es, wir haben einen Kafka-Broker im Einsatz. Bevor Sie fortfahren, möchten Sie vielleicht damit herumspielen, indem Sie Themen erstellen, Nachrichten senden und konsumieren. Befolgen Sie dazu einfach die Anweisungen auf dieser Bildseite.
Um unsere Anwendung zu erstellen, verwenden wir NestJS mit KafkaJS. Beginnen Sie mit der Erstellung der App mit Nest CLI
Nest neues My-Nest-Projekt
Im Projektordner installieren Sie kafkajs
npm i kafkajs
Und generieren Sie die folgenden Module
nest g mo kafka
Nest G MO Producer
Nest G Mo Consumer
Nest-G-Mo-Temperatur
Das Kafka-Modul übernimmt alle Kafka-spezifischen Vorgänge, einschließlich der Verwaltung von Verbraucher- und Produzentenklassen zum Verbinden, Trennen, Senden und Empfangen von Nachrichten. Dies wird das einzige Modul sein, das direkt mit dem kafkajs-Paket interagiert.
Die Produzenten- und Verbrauchermodule fungieren als Schnittstellen zwischen der Pub-Sub-Plattform (in diesem Fall Kafka) und dem Rest der Anwendung und abstrahieren plattformspezifische Details.
Das Temperaturmodul verwaltet die Ereignisse. Es muss nicht wissen, welche Pub-Sub-Plattform verwendet wird, es sind lediglich ein Verbraucher und ein Produzent erforderlich, um zu funktionieren.
Nachdem wir die Module erstellt haben, erstellen wir auch einen Ordner src/interface und fügen darin die folgenden Schnittstellen hinzu:
{ temperature: 42, timeStamp: new Date(), };
// src/interfaces/producer.interface.ts export interface IProducer { produce: (message: any) => Promise<void>; connect: () => Promise<void>; disconnect: () => Promise<void>; isConnected: () => boolean; }
Fügen Sie im Ordner src/kafka/ die Producer- und Consumer-Klassen hinzu, die diese Schnittstellen implementieren:
// src/interfaces/consumer.interface.ts export type ConsumerMessage = { key?: string; value: any; }; export type OnMessage = (message: ConsumerMessage) => Promise<void>; export interface IConsumer { connect: () => Promise<void>; disconnect: () => Promise<void>; consume: (onMessage?: OnMessage) => Promise<void>; isConnected: () => boolean; }
// src/kafka/kafka.producer.ts export class KafkaProducer implements IProducer { private readonly logger = new Logger(KafkaProducer.name, { timestamp: true }); private readonly kafka: Kafka; private readonly producer: Producer; private connected: boolean = false; constructor( private readonly broker: string, private readonly topic: string, ) { // The client must be configured with at least one broker this.kafka = new Kafka({ brokers: [this.broker], }); this.producer = this.kafka.producer(); } async produce( message: Message, compression?: CompressionTypes, acks?: number, timeout?: number, ) { // To produce, at least a topic and a list of messages must be provided await this.producer.send({ topic: this.topic, messages: [message], compression, timeout, acks, }); } // To produce a message, the producer must be connected async connect() { try { // Just hooking up some logs in the producer events // And storing the connection status this.producer.on('producer.connect', () => { this.logger.log( `producer connected. broker: ${this.broker} topic: ${this.topic}`, ); this.connected = true; }); this.producer.on('producer.disconnect', () => { this.logger.log( `producer disconnected. broker: ${this.broker} topic: ${this.topic}`, ); this.connected = false; }); // Connect to Kafka await this.producer.connect(); } catch (err) { this.logger.error( `failed to connect to kafka. broker: ${this.broker} topic: ${this.topic}`, err, ); } } async disconnect() { await this.producer.disconnect(); } isConnected(): boolean { return this.connected; } }
Vergessen Sie nicht, diese Klassen in kafka.module.ts zu exportieren
// src/kafka/kafka.cosumer.ts export class KafkaConsumer implements IConsumer { private readonly logger = new Logger(KafkaConsumer.name, { timestamp: true }); private readonly kafka: Kafka; private readonly consumer: Consumer; private connected: boolean = false; constructor( private readonly broker: string, private readonly topic: string, private readonly groupId: string, ) { if (this.broker && this.topic && this.groupId) { // The client must be configured with at least one broker this.kafka = new Kafka({ brokers: [this.broker], }); this.consumer = this.kafka.consumer({ groupId: this.groupId }); } else { this.logger.warn('Broker, topic and groupId must be provided'); } } // The onMessage function will be called when a message is received async consume(onMessage: OnMessage) { // Here we subscribe to the topic ... await this.consumer.subscribe({ topic: this.topic }); // ... and handle the messages await this.consumer.run({ eachMessage: async (payload) => { try { this.logger.log( `message: ${payload.message.value.toString()} (topic: ${payload.topic}, partition: ${payload.partition})`, ); await onMessage({ key: payload.message.key?.toString(), value: payload.message.value.toString(), }); } catch (err) { this.logger.error('error on consuming message', err); } }, }); } // To consume, the consumer must be connected async connect() { try { // Just hooking up some logs in the consumer events // And storing the connection status this.consumer.on('consumer.connect', () => { this.logger.log( `consumer connected. broker: ${this.broker} topic: ${this.topic}`, ); this.connected = true; }); this.consumer.on('consumer.disconnect', () => { this.logger.log( `consumer disconnected. broker: ${this.broker} topic: ${this.topic}`, ); this.connected = false; }); await this.consumer.connect(); } catch (err) { this.logger.error( `failed to connect to kafka. broker: ${this.broker} topic: ${this.topic}`, err, ); } } async disconnect() { await this.consumer.disconnect(); } isConnected(): boolean { return this.connected; } }
So wie es jetzt ist, könnten wir zum Temperaturmodul gehen und diese Kafka-Klassen instanziieren und beginnen, sie zu verwenden. Allerdings wäre es besser, wenn sich das Temperaturmodul keine Gedanken darüber machen müsste, welche Pub-Sub-Plattform es verwendet. Stattdessen sollte es einfach mit einem injizierten Produzenten und/oder Konsumenten zusammenarbeiten und sich ausschließlich auf das Senden und Empfangen von Nachrichten konzentrieren, unabhängig von der zugrunde liegenden Plattform. Wenn wir uns in Zukunft dazu entschließen, auf eine andere Pub-Sub-Plattform zu wechseln, müssen wir auf diese Weise keine Änderungen am Temperaturmodul vornehmen.
Um diese Abstraktion zu erreichen, können wir Producer- und Consumer-Klassen erstellen, die die Besonderheiten der Producer- und Consumer-Implementierungen von Kafka verarbeiten. Beginnen wir mit dem Produzenten:
// src/kafka/kafka.module.ts @Module({ imports: [], providers: [KafkaProducer, KafkaConsumer], exports: [KafkaProducer, KafkaConsumer], }) export class KafkaModule {}
// src/producer/producer.service.ts @Injectable() export class ProducerService implements OnApplicationShutdown { // Expects any producer that implements the IProducer interface private readonly producer: IProducer; constructor( @Inject('broker') broker: string, @Inject('topic') topic: string, ) { this.producer = new KafkaProducer(broker, topic); } /** The produce() and message can receive more parameters, * refer to produce method in src/kafka/kafka.producer.ts */ async produce(message: { key?: string; value: string }) { if (!this.producer.isConnected()) { await this.producer.connect(); } await this.producer.produce(message); } async onApplicationShutdown() { await this.producer.disconnect(); } }
Nun der Verbraucher:
// src/producer/producer.module.ts @Module({ imports: [KafkaModule], providers: [ ProducerService, { provide: 'broker', useValue: 'default-broker-value', }, { provide: 'topic', useValue: 'default-topic-value', }, ], exports: [ProducerService], }) export class ProducerModule {}
// src/consumer/consumer.service.ts @Injectable() export class ConsumerService implements OnApplicationShutdown { // Expects any consumer that implements the IConsumer interface private readonly consumer: IConsumer; constructor( @Inject('broker') broker: string, @Inject('topic') topic: string, @Inject('groupId') groupId: string, ) { this.consumer = new KafkaConsumer(broker, topic, groupId); } async consume(onMessage: OnMessage) { if (!this.consumer.isConnected()) { await this.consumer.connect(); } await this.consumer.consume(onMessage); } async onApplicationShutdown() { await this.consumer.disconnect(); } }
Jetzt können wir uns auf den Aufbau des Temperaturmoduls konzentrieren. In der Datei „temperature.service.ts“ erstellen wir eine Methode zum Registrieren einer Temperatur, die in diesem Beispiel einfach die Temperaturdaten über einen Produzenten an den Broker sendet. Darüber hinaus implementieren wir zu Demonstrationszwecken eine Methode zur Verarbeitung eingehender Nachrichten.
Diese Methoden können von einem anderen Dienst oder einem Controller aufgerufen werden. Der Einfachheit halber rufen wir sie in diesem Beispiel jedoch direkt beim Start der Anwendung auf, indem wir die Methode onModuleInit verwenden.
{ temperature: 42, timeStamp: new Date(), };
// src/interfaces/producer.interface.ts export interface IProducer { produce: (message: any) => Promise<void>; connect: () => Promise<void>; disconnect: () => Promise<void>; isConnected: () => boolean; }
Das ist es! Wenn der Broker im Docker-Container ausgeführt wird, können Sie die Anwendung zum Senden und Empfangen von Nachrichten starten. Darüber hinaus können Sie mit dem folgenden Befehl eine Shell im Broker-Container öffnen:
Docker Exec --workdir /opt/kafka/bin/ -it Broker Sh
Von dort aus können Sie direkt mit dem Broker interagieren und Nachrichten an die Anwendung senden, Nachrichten von ihr empfangen, neue Themen erstellen usw.
Dies ist das Repository mit dem Code dieses Beispiels.
Das obige ist der detaillierte Inhalt vonKafka-Grundlagen mit einem praktischen Beispiel. Für weitere Informationen folgen Sie bitte anderen verwandten Artikeln auf der PHP chinesischen Website!