Rumah > hujung hadapan web > tutorial js > Asas Kafka dengan contoh praktikal

Asas Kafka dengan contoh praktikal

Linda Hamilton
Lepaskan: 2024-12-28 09:26:11
asal
354 orang telah melayarinya

Sejak beberapa minggu kebelakangan ini, saya telah menyelami Kafka dan mengambil nota sepanjang perjalanan, yang mana saya memutuskan untuk mengatur dan menstrukturkan mereka catatan blog, di atasnya, selain daripada konsep dan petua terdapat contoh praktikal yang dibina dengan NestJS dan KafkaJs.

Apa itu Kafka?

Apache Kafka ialah platform penstriman acara teragih yang direka untuk mengendalikan acara masa nyata. Ia membolehkan menyimpan, memproses dan mendapatkan semula strim data berskala besar, pemprosesan tinggi, kependaman rendah, menjadikannya sesuai untuk membina saluran paip data masa nyata dan aplikasi dipacu peristiwa.

Ciri-ciri Utama:

  • Penstriman Acara: Kafka menyusun data ke dalam topik, yang merupakan log acara yang disusun.
  • Seni Bina Teragih: Kafka dibina untuk kebolehskalaan dan toleransi kesalahan. Ia beroperasi sebagai kelompok nod yang dipanggil broker dan boleh mengedarkan data merentas berbilang pelayan.
  • Model Terbit-Langgan: Pengeluar tulis mesej ke topik dan pengguna baca mesej daripada mereka. Kafka menyokong berbilang pengguna, membenarkan aplikasi berbeza memproses aliran data yang sama secara bebas.
  • Prestasi Tinggi: Kafka dioptimumkan untuk pemprosesan tinggi, memproses berjuta-juta mesej sesaat dengan kependaman rendah.
  • Storan Tahan Lama: Kafka menyimpan mesej pada cakera dengan tempoh pengekalan boleh dikonfigurasikan, memastikan kegigihan dan kebolehpercayaan data.
  • Pembahagian dan Replikasi: Topik dibahagikan kepada pembahagian untuk skalabiliti dan direplikasi merentas broker untuk toleransi kesalahan.
  • Kebolehmain semula: Pengguna boleh membaca semula mesej dengan menetapkan semula mengimbangi mereka, membolehkan pemprosesan semula atau pemulihan data.
  • Integrasi dan Ekosistem: Kafka menyepadukan dengan pelbagai sistem dan mempunyai alatan seperti Kafka Connect (untuk penyepaduan data) dan Kafka Streams (untuk pemprosesan strim).

Kelebihan

  • Kebolehpercayaan: Ia memastikan toleransi kesalahan melalui pengedaran data, replikasi dan pembahagian.
  • Skalabiliti: Kafka boleh memproses volum data yang besar dan menskala secara mendatar tanpa masa henti.
  • Ketahanan: Mesej disimpan dengan segera, memastikan daya tahan dan ketekalan data.
  • Prestasi: Kafka mengekalkan prestasi tinggi di bawah beban data yang melampau, mengendalikan volum data yang besar tanpa masa henti atau kehilangan data.

Keburukan

Pertukaran ini adalah pilihan reka bentuk yang disengajakan untuk memaksimumkan prestasi Kafka tetapi mungkin menimbulkan cabaran untuk kes penggunaan yang memerlukan fleksibiliti yang lebih besar:

  • Fleksibiliti Terhad: Kafka tidak mempunyai sokongan untuk pertanyaan lanjutan, seperti menapis data khusus dalam laporan. Pengguna mesti mengendalikan tugas-tugas ini, kerana Kafka mengambil semula mesej dengan mengimbangi mengikut urutan ia diterima.
  • Tidak Direka untuk Storan Jangka Panjang: Kafka cemerlang dalam penstriman data tetapi tidak sesuai untuk menyimpan data sejarah untuk tempoh yang lama. Penduaan data boleh menyebabkan storan mahal untuk set data yang besar.
  • Tiada Sokongan Topik Wildcard: Kafka tidak membenarkan penggunaan daripada berbilang topik menggunakan corak kad liar (cth., log-2024-*).

Kes guna

  • Analitis Masa Nyata: Proses dan analisis aliran data apabila ia berlaku.
  • Sumber Acara: Rekod semua perubahan pada keadaan aplikasi sebagai urutan acara.
  • Pengagregatan Log: Kumpul dan uruskan log daripada sistem yang diedarkan.
  • Saluran Paip Data: Strim data antara sistem dengan pasti dan cekap.
  • Aplikasi IoT: Kendalikan data penderia daripada peranti IoT dalam masa nyata.

Bagaimanakah Kafka berfungsi?

Kafka menyepadukan ciri kedua-dua model pemesejan beratur dan terbitkan-langgan, menawarkan pengguna kelebihan setiap pendekatan.

  • Baris gilir membolehkan pemprosesan data berskala dengan mengagihkan tugas merentas berbilang kejadian pengguna tetapi baris gilir tradisional tidak menyokong berbilang pelanggan.
  • Model terbit-langgan menyokong berbilang pelanggan tetapi tidak boleh mengagihkan tugas di antara berbilang proses pekerja kerana setiap mesej dihantar kepada semua pelanggan.

Kafka menggunakan sistem log terbahagi untuk menggabungkan faedah model beratur dan terbitkan-langganan. Log, yang disusun urutan rekod, dibahagikan kepada partition, dengan setiap partition diperuntukkan kepada pelanggan (pengguna) yang berbeza. Persediaan ini membolehkan berbilang pelanggan berkongsi topik sambil mengekalkan kebolehskalaan.

Kafka fundamentals with a practical example

Acara, Topik dan Pembahagian

Kami telah melihat bahawa Kafka ialah platform yang direka untuk mengendalikan acara masa nyata, sebelum bercakap tentang cara acara tersebut dikendalikan, kami perlu mempunyai definisi untuk mereka:

Peristiwa ialah tindakan, insiden atau perubahan yang direkodkan aplikasi, contohnya, pembayaran, klik tapak web atau bacaan suhu.

Acara dalam Kafka dimodelkan sebagai pasangan kunci/nilai, di mana kedua-dua kunci dan nilai disirikan ke dalam jujukan bait.

  • Nilai selalunya mewakili objek domain bersiri atau input mentah, seperti output penderia atau data aplikasi lain. Mereka merangkum maklumat teras yang dihantar dalam acara Kafka.
  • Kunci boleh menjadi objek domain yang kompleks, namun selalunya jenis mudah seperti rentetan atau integer. Daripada mengenal pasti peristiwa individu secara unik (seperti yang dilakukan oleh kunci utama dalam pangkalan data hubungan), kunci biasanya mengenal pasti entiti dalam sistem, seperti pengguna tertentu, pesanan atau peranti yang disambungkan.

Kafka menganjurkan acara ke dalam log tertib yang dipanggil topik. Apabila sistem luaran menulis acara kepada Kafka, ia dilampirkan pada penghujung topik. Mesej kekal dalam topik untuk tempoh yang boleh dikonfigurasikan, walaupun selepas dibaca. Tidak seperti baris gilir, topik adalah tahan lama, direplikasi dan tahan terhadap kesalahan, menyimpan rekod acara dengan cekap. Walau bagaimanapun, log hanya boleh diimbas secara berurutan, bukan ditanya.

Topik disimpan sebagai fail log pada cakera, bagaimanapun, cakera mempunyai had seperti saiz terhingga dan I/O. Untuk mengatasinya, Kafka membenarkan topik dibahagikan kepada partition, memecahkan satu log kepada berbilang log yang boleh diedarkan merentas pelayan yang berbeza. Pembahagian ini membolehkan Kafka menskala secara mendatar, meningkatkan kapasitinya untuk mengendalikan jumlah acara yang besar dan daya pemprosesan yang tinggi.

Kafka memberikan mesej kepada partition berdasarkan sama ada mereka mempunyai kunci:

  • Tiada Kunci: Mesej diedarkan round-robin merentas semua partition, memastikan pengedaran data sekata tetapi tidak mengekalkan susunan mesej.
  • Dengan Kunci: Pembahagian ditentukan dengan mencincang kekunci, memastikan mesej dengan kunci yang sama sentiasa pergi ke partition yang sama dan mengekalkan susunannya.

Broker

Kafka beroperasi sebagai infrastruktur data teragih menggunakan nod yang dipanggil broker, yang secara kolektif membentuk gugusan Kafka. Broker boleh berjalan pada perkakasan logam kosong, contoh awan, dalam bekas yang diurus oleh Kubernetes, dalam Docker pada komputer riba anda atau di mana-mana proses JVM boleh dijalankan.

Broker memberi tumpuan kepada:

  • Menulis acara baharu pada partition.
  • Menyajikan bacaan daripada partition.
  • Meneplikasi partition merentas broker.

Mereka tidak melakukan pengiraan mesej atau penghalaan topik ke topik, memastikan reka bentuk mereka mudah dan cekap.

Replikasi

Kafka memastikan ketahanan data dan toleransi kesalahan dengan mereplikasi data partition merentas berbilang broker. Salinan utama partition ialah replika pemimpin, manakala salinan tambahan ialah replika pengikut. Data ditulis kepada ketua, yang secara automatik mereplikasi kemas kini kepada pengikut.

Proses replikasi ini memastikan:

  • Keselamatan data, walaupun sekiranya berlaku kegagalan broker atau storan.
  • Failover automatik, di mana replika lain mengambil alih sebagai ketua jika pemimpin semasa gagal.

Pembangun mendapat manfaat daripada jaminan ini tanpa perlu mengurus replikasi secara langsung, kerana Kafka mengendalikannya dengan telus.

Pengeluar

Seorang Pengeluar Kafka ialah aplikasi pelanggan yang menghantar (menerbitkan) data ke topik Kafka. Ia bertanggungjawab untuk mencipta dan menghantar mesej (rekod) ke kelompok Kafka. Pengeluar menentukan topik dan partition di mana mesej akan disimpan berdasarkan konfigurasi mereka dan kehadiran kunci mesej. Pengeluar bertanggungjawab untuk, tetapi tidak terhad kepada:

  • Komposisi Mesej:
    • Setiap mesej terdiri daripada kunci (pilihan), nilai (data sebenar) dan metadata.
    • Kunci menentukan partition untuk mesej, memastikan susunan untuk mesej dengan kunci yang sama.
  • Tugasan Pembahagian:
    • Jika kunci disediakan, pengeluar menggunakan algoritma pencincangan untuk menentukan partition.
    • Tanpa kunci, mesej diedarkan merentasi partition secara round-robin untuk pengimbangan beban.
  • Mampatan:

    Pengeluar boleh memampatkan mesej untuk mengurangkan lebar jalur rangkaian dan penggunaan storan.

Pengguna

Seorang Pengguna Kafka ialah aplikasi pelanggan yang membaca mesej daripada topik Kafka, ia mendapatkan semula mesej daripada partition Kafka mengikut rentak mereka sendiri, membenarkan pemprosesan data masa nyata atau kelompok . Perhatikan bahawa Kafka tidak menolak mesej kepada pengguna, mereka menarik mesej daripada partition Kafka dengan meminta data.

Pengguna juga menjejaki offset yang telah mereka proses. Offset boleh dilakukan secara automatik atau secara manual, memastikan data tidak hilang jika pengguna gagal. Ini membolehkan penggunaan fleksibel, termasuk memainkan semula mesej dengan menetapkan semula offset.

Kumpulan pengguna

Kumpulan pengguna ialah satu set pengguna yang bekerjasama untuk menggunakan data daripada beberapa topik, yang membolehkan pemprosesan diedarkan bagi mesej topik.

Pembahagian topik dibahagikan antara pengguna dalam kumpulan, memastikan setiap mesej diproses oleh hanya seorang pengguna dalam kumpulan. Berbilang kumpulan pengguna boleh menggunakan topik yang sama secara bebas tanpa gangguan.

Apabila pengguna baharu menyertai kumpulan atau pengguna sedia ada gagal, Kafka menetapkan semula sekatan antara pengguna dalam kumpulan untuk memastikan semua sekatan dilindungi.

Serialisasi dan Deserialisasi

Pensirilan dan penyahsirilan dalam Kafka ialah tentang menukar data antara format asalnya dan tatasusunan bait untuk penghantaran dan penyimpanan, membolehkan pengeluar dan pengguna berkomunikasi dengan cekap.

Serialisasi

Adalah proses menukar objek atau struktur data kepada aliran bait supaya ia boleh dihantar atau disimpan. Sebelum pengeluar menghantar data ke topik Kafka, ia menyerikan data (kunci dan nilai) ke dalam tatasusunan bait.

Format Pensirian Biasa:

  • JSON: Boleh dibaca manusia, serasi secara meluas.
  • Avro: Padat dan cekap, berasaskan skema.
  • Protobuf: Padat, berasaskan skema dan agnostik bahasa.
  • Rentetan: Pensirian berasaskan teks ringkas.
  • Pensiri Tersuai: Untuk keperluan khusus aplikasi.

Penyahserialisasian

Merupakan proses terbalik, di mana strim bait ditukar semula kepada objek atau struktur data asalnya. Apabila pengguna membaca data daripada topik Kafka, ia menyahsiri tatasusunan bait kembali ke dalam format yang boleh digunakan untuk pemprosesan.

Mampatan

Mampatan sedang mengurangkan saiz mesej sebelum disimpan atau dihantar. Ia mengoptimumkan penggunaan storan, mengurangkan penggunaan lebar jalur rangkaian dan meningkatkan prestasi keseluruhan dengan menghantar muatan yang lebih kecil antara pengeluar, broker dan pengguna.

Apabila pengeluar menghantar mesej kepada topik Kafka, ia boleh memampatkan mesej sebelum penghantaran. Mesej dimampatkan disimpan pada broker sebagaimana adanya dan dinyahmampatkan oleh pengguna apabila mereka membaca mesej.

Kelebihan

  • Lebar Jalur Rangkaian yang Dikurangkan: Muatan yang lebih kecil bermakna kurang data dihantar melalui rangkaian.
  • Keperluan Storan Rendah: Mesej mampat mengambil lebih sedikit ruang pada cakera.
  • Keupayaan yang Dipertingkatkan: Mesej yang lebih kecil membolehkan pemindahan dan pemprosesan data yang lebih pantas.

Bila nak guna?

  • Kes penggunaan dengan saiz mesej yang besar: Pemampatan sangat mengurangkan saiz data.
  • Sistem keupayaan tinggi: Mengurangkan tekanan pada rangkaian dan sumber storan.
  • Pengumpulan: Pemampatan berfungsi paling baik apabila pengeluar menyusun berbilang mesej bersama-sama.

Walaupun pemampatan menjimatkan sumber, adalah penting untuk mengimbangi pertukaran antara penggunaan CPU dan faedah pemampatan, memilih codec yang sesuai dengan kes penggunaan anda.

Jenis Mampatan Disokong

  • Tiada: Tiada mampatan (lalai).
  • Gzip: Nisbah mampatan tinggi tetapi penggunaan CPU yang lebih tinggi.
  • Snappy: Kelajuan mampatan seimbang dan penggunaan CPU, sesuai untuk kes penggunaan masa nyata.
  • LZ4: Pemampatan dan penyahmampatan yang lebih pantas, dioptimumkan untuk sistem kependaman rendah.
  • Zstd: Nisbah mampatan tinggi dengan prestasi yang lebih baik daripada Gzip, disokong dalam versi Kafka yang lebih baharu.

Penalaan

Mengoptimumkan prestasi Apache Kafka melibatkan penalaan halus pelbagai komponen untuk mengimbangi daya pemprosesan dan kependaman dengan berkesan. Artikel ini hanya menconteng permukaan subjek ini, berikut adalah beberapa aspek yang perlu dipertimbangkan semasa menala Kafka:

  • Pengurusan Pembahagian:

    • Kira Partition: Tambahkan bilangan partition untuk meningkatkan keselarian dan daya pemprosesan. Walau bagaimanapun, elakkan partition yang berlebihan untuk mengelakkan overhed pengurusan. Selaraskan bilangan partition dengan keupayaan pengguna anda dan kadar penggunaan yang diingini.
  • Konfigurasi Pengeluar:

    • Batching: Konfigurasikan batch.size dan linger.ms untuk mendayakan kumpulan mesej yang cekap, mengurangkan bilangan permintaan dan meningkatkan daya pemprosesan.
    • Mampatan: Laksanakan pemampatan (cth., compression.type=snappy) untuk mengecilkan saiz mesej, mengurangkan penggunaan rangkaian dan storan. Berhati-hati dengan overhed CPU tambahan yang diperkenalkan melalui pemampatan.
  • Konfigurasi Pengguna:

    • Tetapan Ambil: Laraskan fetch.min.bytes dan fetch.max.wait.ms untuk mengawal cara pengguna mendapatkan semula mesej, mengimbangi kependaman dan pemprosesan mengikut keperluan aplikasi anda.

Contoh praktikal

Bayangkan aplikasi yang merekodkan suhu dalam bilik dan menghantar data ini menggunakan Kafka, di mana aplikasi lain memprosesnya. Untuk kesederhanaan, kami akan memfokuskan secara eksklusif pada aspek Kafka, dengan kedua-dua pengeluar dan pengguna dilaksanakan dalam aplikasi yang sama. Dalam senario ini, setiap suhu yang direkodkan pada masa tertentu mewakili peristiwa:

{
  temperature: 42,
  timeStamp: new Date(),
};
Salin selepas log masuk
Salin selepas log masuk
Salin selepas log masuk

Semua kod akan berada dalam repositori ini.

Mula-mula, kami memerlukan broker Kafka, tetapi bukannya memasang Kafka dalam mesin kami, kami hanya menggunakan imej Docker Kafka ini.

Mulakan dengan menarik imej itu:

docker pull apache/kafka

Kemudian jalankan ia memetakan port yang Kafka dengar pada port yang sama pada mesin kami:

larian buruh pelabuhan -d -p 9092:9092 --nama broker apache/kafka:terkini

Itu sahaja, kami mempunyai broker Kafka yang sedang beroperasi, sebelum meneruskan, anda mungkin mahu bermain-main dengannya dengan mencipta topik, menghantar dan menggunakan mesej, untuk melakukannya hanya ikut arahan pada halaman imej tersebut.

Untuk membina aplikasi kami, kami akan menggunakan NestJS dengan KafkaJS, mulakan dengan mencipta aplikasi dengan Nest CLI

projek sarang saya baharu

Di dalam folder projek pasang kafkajs

npm dan kafkaj

Dan jana modul berikut

sarang g mo kafka

pengeluar sarang g mo

pengguna sarang g mo

suhu sarang g mo

Modul Kafka akan mengendalikan semua operasi khusus Kafka, termasuk mengurus kelas pengguna dan pengeluar untuk menyambung, memutuskan sambungan, menghantar dan menerima mesej. Ini akan menjadi satu-satunya modul yang berinteraksi secara langsung dengan pakej kafkajs.

Modul Pengeluar dan Pengguna akan bertindak sebagai antara muka antara platform pub-sub (Kafka, dalam kes ini) dan aplikasi yang lain, mengabstrakkan butiran khusus platform.

Modul suhu akan menguruskan acara. Ia tidak perlu mengetahui platform pub-sub yang sedang digunakan, ia hanya memerlukan pengguna dan pengeluar untuk berfungsi.

Dengan modul yang dicipta, mari buat folder src/antara muka dan tambahkan antara muka berikut di dalamnya:

{
  temperature: 42,
  timeStamp: new Date(),
};
Salin selepas log masuk
Salin selepas log masuk
Salin selepas log masuk
// src/interfaces/producer.interface.ts

export interface IProducer {
  produce: (message: any) => Promise<void>;
  connect: () => Promise<void>;
  disconnect: () => Promise<void>;
  isConnected: () => boolean;
}
Salin selepas log masuk
Salin selepas log masuk

Dalam folder src/kafka/ tambahkan kelas pengeluar dan pengguna yang melaksanakan antara muka tersebut:

// src/interfaces/consumer.interface.ts

export type ConsumerMessage = {
  key?: string;
  value: any;
};

export type OnMessage = (message: ConsumerMessage) => Promise<void>;

export interface IConsumer {
  connect: () => Promise<void>;
  disconnect: () => Promise<void>;
  consume: (onMessage?: OnMessage) => Promise<void>;
  isConnected: () => boolean;
}
Salin selepas log masuk
// src/kafka/kafka.producer.ts

export class KafkaProducer implements IProducer {
  private readonly logger = new Logger(KafkaProducer.name, { timestamp: true });
  private readonly kafka: Kafka;
  private readonly producer: Producer;
  private connected: boolean = false;

  constructor(
    private readonly broker: string,
    private readonly topic: string,
  ) {
    // The client must be configured with at least one broker
    this.kafka = new Kafka({
      brokers: [this.broker],
    });
    this.producer = this.kafka.producer();
  }

  async produce(
    message: Message,
    compression?: CompressionTypes,
    acks?: number,
    timeout?: number,
  ) {
    // To produce, at least a topic and a list of messages must be provided
    await this.producer.send({
      topic: this.topic,
      messages: [message],
      compression,
      timeout,
      acks,
    });
  }

  // To produce a message, the producer must be connected
  async connect() {
    try {
      // Just hooking up some logs in the producer events
      // And storing the connection status
      this.producer.on('producer.connect', () => {
        this.logger.log(
          `producer connected. broker: ${this.broker} topic: ${this.topic}`,
        );
        this.connected = true;
      });

      this.producer.on('producer.disconnect', () => {
        this.logger.log(
          `producer disconnected. broker: ${this.broker} topic: ${this.topic}`,
        );
        this.connected = false;
      });

      // Connect to Kafka
      await this.producer.connect();
    } catch (err) {
      this.logger.error(
        `failed to connect to kafka. broker: ${this.broker} topic: ${this.topic}`,
        err,
      );
    }
  }

  async disconnect() {
    await this.producer.disconnect();
  }

  isConnected(): boolean {
    return this.connected;
  }
}

Salin selepas log masuk

Jangan lupa untuk mengeksport kelas ini dalam kafka.module.ts

// src/kafka/kafka.cosumer.ts

export class KafkaConsumer implements IConsumer {
  private readonly logger = new Logger(KafkaConsumer.name, { timestamp: true });
  private readonly kafka: Kafka;
  private readonly consumer: Consumer;
  private connected: boolean = false;

  constructor(
    private readonly broker: string,
    private readonly topic: string,
    private readonly groupId: string,
  ) {
    if (this.broker && this.topic && this.groupId) {
      // The client must be configured with at least one broker
      this.kafka = new Kafka({
        brokers: [this.broker],
      });
      this.consumer = this.kafka.consumer({ groupId: this.groupId });
    } else {
      this.logger.warn('Broker, topic and groupId must be provided');
    }
  }

  // The onMessage function will be called when a message is received
  async consume(onMessage: OnMessage) {
    // Here we subscribe to the topic ...
    await this.consumer.subscribe({ topic: this.topic });

    // ... and handle the messages
    await this.consumer.run({
      eachMessage: async (payload) => {
        try {
          this.logger.log(
            `message: ${payload.message.value.toString()} (topic: ${payload.topic}, partition: ${payload.partition})`,
          );

          await onMessage({
            key: payload.message.key?.toString(),
            value: payload.message.value.toString(),
          });
        } catch (err) {
          this.logger.error('error on consuming message', err);
        }
      },
    });
  }

  // To consume, the consumer must be connected
  async connect() {
    try {
      // Just hooking up some logs in the consumer events
      // And storing the connection status
      this.consumer.on('consumer.connect', () => {
        this.logger.log(
          `consumer connected. broker: ${this.broker} topic: ${this.topic}`,
        );
        this.connected = true;
      });

      this.consumer.on('consumer.disconnect', () => {
        this.logger.log(
          `consumer disconnected. broker: ${this.broker} topic: ${this.topic}`,
        );
        this.connected = false;
      });

      await this.consumer.connect();
    } catch (err) {
      this.logger.error(
        `failed to connect to kafka. broker: ${this.broker} topic: ${this.topic}`,
        err,
      );
    }
  }

  async disconnect() {
    await this.consumer.disconnect();
  }

  isConnected(): boolean {
    return this.connected;
  }
}
Salin selepas log masuk

Seperti sekarang kita boleh pergi ke modul suhu dan membuat contoh kelas Kafka tersebut dan mula menggunakannya. Walau bagaimanapun, adalah lebih baik jika modul suhu tidak perlu bimbang tentang platform pub-sub yang digunakannya. Sebaliknya, ia hanya perlu bekerja dengan pengeluar dan/atau pengguna yang disuntik, memfokuskan semata-mata pada penghantaran dan penerimaan mesej, tanpa mengira platform asas. Dengan cara ini, jika kami memutuskan untuk beralih kepada platform pub-sub yang berbeza pada masa hadapan, kami tidak perlu membuat sebarang perubahan pada modul suhu.

Untuk mencapai abstraksi ini, kami boleh mencipta kelas Pengeluar dan Pengguna yang mengendalikan spesifikasi pelaksanaan Pengeluar dan Pengguna Kafka. Mari mulakan dengan Penerbit:

// src/kafka/kafka.module.ts

@Module({
  imports: [],
  providers: [KafkaProducer, KafkaConsumer],
  exports: [KafkaProducer, KafkaConsumer],
})
export class KafkaModule {}
Salin selepas log masuk
// src/producer/producer.service.ts

@Injectable()
export class ProducerService implements OnApplicationShutdown {
  // Expects any producer that implements the IProducer interface
  private readonly producer: IProducer;

  constructor(
    @Inject('broker') broker: string,
    @Inject('topic') topic: string,
  ) {
    this.producer = new KafkaProducer(broker, topic);
  }

  /** The produce() and message can receive more parameters,
   * refer to produce method in src/kafka/kafka.producer.ts
   */
  async produce(message: { key?: string; value: string }) {
    if (!this.producer.isConnected()) {
      await this.producer.connect();
    }
    await this.producer.produce(message);
  }

  async onApplicationShutdown() {
    await this.producer.disconnect();
  }
}
Salin selepas log masuk

Kini, Pengguna:

// src/producer/producer.module.ts

@Module({
  imports: [KafkaModule],
  providers: [
    ProducerService,
    {
      provide: 'broker',
      useValue: 'default-broker-value',
    },
    {
      provide: 'topic',
      useValue: 'default-topic-value',
    },
  ],
  exports: [ProducerService],
})
export class ProducerModule {}
Salin selepas log masuk
// src/consumer/consumer.service.ts

@Injectable()
export class ConsumerService implements OnApplicationShutdown {
  // Expects any consumer that implements the IConsumer interface
  private readonly consumer: IConsumer;

  constructor(
    @Inject('broker') broker: string,
    @Inject('topic') topic: string,
    @Inject('groupId') groupId: string,
  ) {
    this.consumer = new KafkaConsumer(broker, topic, groupId);
  }

  async consume(onMessage: OnMessage) {
    if (!this.consumer.isConnected()) {
      await this.consumer.connect();
    }
    await this.consumer.consume(onMessage);
  }

  async onApplicationShutdown() {
    await this.consumer.disconnect();
  }
}
Salin selepas log masuk

Kini, kita boleh menumpukan pada membina modul suhu. Dalam fail temperature.service.ts, kami akan mencipta kaedah untuk mendaftarkan suhu, yang dalam contoh ini hanya akan menghantar data suhu kepada broker menggunakan pengeluar. Selain itu, kami akan melaksanakan kaedah untuk mengendalikan mesej masuk untuk tujuan demonstrasi.

Kaedah ini boleh digunakan oleh perkhidmatan lain atau pengawal. Walau bagaimanapun, untuk kesederhanaan, dalam contoh ini, kami akan memanggilnya terus apabila aplikasi bermula, menggunakan kaedah onModuleInit.

{
  temperature: 42,
  timeStamp: new Date(),
};
Salin selepas log masuk
Salin selepas log masuk
Salin selepas log masuk
// src/interfaces/producer.interface.ts

export interface IProducer {
  produce: (message: any) => Promise<void>;
  connect: () => Promise<void>;
  disconnect: () => Promise<void>;
  isConnected: () => boolean;
}
Salin selepas log masuk
Salin selepas log masuk

Itu sahaja! Dengan broker berjalan dalam bekas Docker, anda boleh memulakan aplikasi untuk menghantar dan menerima mesej. Selain itu, anda boleh membuka cangkerang di dalam bekas broker menggunakan arahan berikut:

docker exec --workdir /opt/kafka/bin/ -it broker sh

Dari situ, anda boleh berinteraksi dengan broker secara langsung dan menghantar mesej kepada aplikasi, menerima mesej daripadanya, mencipta topik baharu, dsb.

Ini ialah repositori dengan kod contoh ini.

Atas ialah kandungan terperinci Asas Kafka dengan contoh praktikal. Untuk maklumat lanjut, sila ikut artikel berkaitan lain di laman web China PHP!

sumber:dev.to
Kenyataan Laman Web ini
Kandungan artikel ini disumbangkan secara sukarela oleh netizen, dan hak cipta adalah milik pengarang asal. Laman web ini tidak memikul tanggungjawab undang-undang yang sepadan. Jika anda menemui sebarang kandungan yang disyaki plagiarisme atau pelanggaran, sila hubungi admin@php.cn
Artikel terbaru oleh pengarang
Tutorial Popular
Lagi>
Muat turun terkini
Lagi>
kesan web
Kod sumber laman web
Bahan laman web
Templat hujung hadapan