Sejak beberapa minggu kebelakangan ini, saya telah menyelami Kafka dan mengambil nota sepanjang perjalanan, yang mana saya memutuskan untuk mengatur dan menstrukturkan mereka catatan blog, di atasnya, selain daripada konsep dan petua terdapat contoh praktikal yang dibina dengan NestJS dan KafkaJs.
Apache Kafka ialah platform penstriman acara teragih yang direka untuk mengendalikan acara masa nyata. Ia membolehkan menyimpan, memproses dan mendapatkan semula strim data berskala besar, pemprosesan tinggi, kependaman rendah, menjadikannya sesuai untuk membina saluran paip data masa nyata dan aplikasi dipacu peristiwa.
Pertukaran ini adalah pilihan reka bentuk yang disengajakan untuk memaksimumkan prestasi Kafka tetapi mungkin menimbulkan cabaran untuk kes penggunaan yang memerlukan fleksibiliti yang lebih besar:
Kafka menyepadukan ciri kedua-dua model pemesejan beratur dan terbitkan-langgan, menawarkan pengguna kelebihan setiap pendekatan.
Kafka menggunakan sistem log terbahagi untuk menggabungkan faedah model beratur dan terbitkan-langganan. Log, yang disusun urutan rekod, dibahagikan kepada partition, dengan setiap partition diperuntukkan kepada pelanggan (pengguna) yang berbeza. Persediaan ini membolehkan berbilang pelanggan berkongsi topik sambil mengekalkan kebolehskalaan.
Kami telah melihat bahawa Kafka ialah platform yang direka untuk mengendalikan acara masa nyata, sebelum bercakap tentang cara acara tersebut dikendalikan, kami perlu mempunyai definisi untuk mereka:
Peristiwa ialah tindakan, insiden atau perubahan yang direkodkan aplikasi, contohnya, pembayaran, klik tapak web atau bacaan suhu.
Acara dalam Kafka dimodelkan sebagai pasangan kunci/nilai, di mana kedua-dua kunci dan nilai disirikan ke dalam jujukan bait.
Kafka menganjurkan acara ke dalam log tertib yang dipanggil topik. Apabila sistem luaran menulis acara kepada Kafka, ia dilampirkan pada penghujung topik. Mesej kekal dalam topik untuk tempoh yang boleh dikonfigurasikan, walaupun selepas dibaca. Tidak seperti baris gilir, topik adalah tahan lama, direplikasi dan tahan terhadap kesalahan, menyimpan rekod acara dengan cekap. Walau bagaimanapun, log hanya boleh diimbas secara berurutan, bukan ditanya.
Topik disimpan sebagai fail log pada cakera, bagaimanapun, cakera mempunyai had seperti saiz terhingga dan I/O. Untuk mengatasinya, Kafka membenarkan topik dibahagikan kepada partition, memecahkan satu log kepada berbilang log yang boleh diedarkan merentas pelayan yang berbeza. Pembahagian ini membolehkan Kafka menskala secara mendatar, meningkatkan kapasitinya untuk mengendalikan jumlah acara yang besar dan daya pemprosesan yang tinggi.
Kafka memberikan mesej kepada partition berdasarkan sama ada mereka mempunyai kunci:
Kafka beroperasi sebagai infrastruktur data teragih menggunakan nod yang dipanggil broker, yang secara kolektif membentuk gugusan Kafka. Broker boleh berjalan pada perkakasan logam kosong, contoh awan, dalam bekas yang diurus oleh Kubernetes, dalam Docker pada komputer riba anda atau di mana-mana proses JVM boleh dijalankan.
Broker memberi tumpuan kepada:
Mereka tidak melakukan pengiraan mesej atau penghalaan topik ke topik, memastikan reka bentuk mereka mudah dan cekap.
Kafka memastikan ketahanan data dan toleransi kesalahan dengan mereplikasi data partition merentas berbilang broker. Salinan utama partition ialah replika pemimpin, manakala salinan tambahan ialah replika pengikut. Data ditulis kepada ketua, yang secara automatik mereplikasi kemas kini kepada pengikut.
Proses replikasi ini memastikan:
Pembangun mendapat manfaat daripada jaminan ini tanpa perlu mengurus replikasi secara langsung, kerana Kafka mengendalikannya dengan telus.
Seorang Pengeluar Kafka ialah aplikasi pelanggan yang menghantar (menerbitkan) data ke topik Kafka. Ia bertanggungjawab untuk mencipta dan menghantar mesej (rekod) ke kelompok Kafka. Pengeluar menentukan topik dan partition di mana mesej akan disimpan berdasarkan konfigurasi mereka dan kehadiran kunci mesej. Pengeluar bertanggungjawab untuk, tetapi tidak terhad kepada:
Mampatan:
Pengeluar boleh memampatkan mesej untuk mengurangkan lebar jalur rangkaian dan penggunaan storan.
Seorang Pengguna Kafka ialah aplikasi pelanggan yang membaca mesej daripada topik Kafka, ia mendapatkan semula mesej daripada partition Kafka mengikut rentak mereka sendiri, membenarkan pemprosesan data masa nyata atau kelompok . Perhatikan bahawa Kafka tidak menolak mesej kepada pengguna, mereka menarik mesej daripada partition Kafka dengan meminta data.
Pengguna juga menjejaki offset yang telah mereka proses. Offset boleh dilakukan secara automatik atau secara manual, memastikan data tidak hilang jika pengguna gagal. Ini membolehkan penggunaan fleksibel, termasuk memainkan semula mesej dengan menetapkan semula offset.
Kumpulan pengguna ialah satu set pengguna yang bekerjasama untuk menggunakan data daripada beberapa topik, yang membolehkan pemprosesan diedarkan bagi mesej topik.
Pembahagian topik dibahagikan antara pengguna dalam kumpulan, memastikan setiap mesej diproses oleh hanya seorang pengguna dalam kumpulan. Berbilang kumpulan pengguna boleh menggunakan topik yang sama secara bebas tanpa gangguan.
Apabila pengguna baharu menyertai kumpulan atau pengguna sedia ada gagal, Kafka menetapkan semula sekatan antara pengguna dalam kumpulan untuk memastikan semua sekatan dilindungi.
Pensirilan dan penyahsirilan dalam Kafka ialah tentang menukar data antara format asalnya dan tatasusunan bait untuk penghantaran dan penyimpanan, membolehkan pengeluar dan pengguna berkomunikasi dengan cekap.
Adalah proses menukar objek atau struktur data kepada aliran bait supaya ia boleh dihantar atau disimpan. Sebelum pengeluar menghantar data ke topik Kafka, ia menyerikan data (kunci dan nilai) ke dalam tatasusunan bait.
Format Pensirian Biasa:
Merupakan proses terbalik, di mana strim bait ditukar semula kepada objek atau struktur data asalnya. Apabila pengguna membaca data daripada topik Kafka, ia menyahsiri tatasusunan bait kembali ke dalam format yang boleh digunakan untuk pemprosesan.
Mampatan sedang mengurangkan saiz mesej sebelum disimpan atau dihantar. Ia mengoptimumkan penggunaan storan, mengurangkan penggunaan lebar jalur rangkaian dan meningkatkan prestasi keseluruhan dengan menghantar muatan yang lebih kecil antara pengeluar, broker dan pengguna.
Apabila pengeluar menghantar mesej kepada topik Kafka, ia boleh memampatkan mesej sebelum penghantaran. Mesej dimampatkan disimpan pada broker sebagaimana adanya dan dinyahmampatkan oleh pengguna apabila mereka membaca mesej.
Walaupun pemampatan menjimatkan sumber, adalah penting untuk mengimbangi pertukaran antara penggunaan CPU dan faedah pemampatan, memilih codec yang sesuai dengan kes penggunaan anda.
Mengoptimumkan prestasi Apache Kafka melibatkan penalaan halus pelbagai komponen untuk mengimbangi daya pemprosesan dan kependaman dengan berkesan. Artikel ini hanya menconteng permukaan subjek ini, berikut adalah beberapa aspek yang perlu dipertimbangkan semasa menala Kafka:
Pengurusan Pembahagian:
Konfigurasi Pengeluar:
Konfigurasi Pengguna:
Bayangkan aplikasi yang merekodkan suhu dalam bilik dan menghantar data ini menggunakan Kafka, di mana aplikasi lain memprosesnya. Untuk kesederhanaan, kami akan memfokuskan secara eksklusif pada aspek Kafka, dengan kedua-dua pengeluar dan pengguna dilaksanakan dalam aplikasi yang sama. Dalam senario ini, setiap suhu yang direkodkan pada masa tertentu mewakili peristiwa:
{ temperature: 42, timeStamp: new Date(), };
Semua kod akan berada dalam repositori ini.
Mula-mula, kami memerlukan broker Kafka, tetapi bukannya memasang Kafka dalam mesin kami, kami hanya menggunakan imej Docker Kafka ini.
Mulakan dengan menarik imej itu:
docker pull apache/kafka
Kemudian jalankan ia memetakan port yang Kafka dengar pada port yang sama pada mesin kami:
larian buruh pelabuhan -d -p 9092:9092 --nama broker apache/kafka:terkini
Itu sahaja, kami mempunyai broker Kafka yang sedang beroperasi, sebelum meneruskan, anda mungkin mahu bermain-main dengannya dengan mencipta topik, menghantar dan menggunakan mesej, untuk melakukannya hanya ikut arahan pada halaman imej tersebut.
Untuk membina aplikasi kami, kami akan menggunakan NestJS dengan KafkaJS, mulakan dengan mencipta aplikasi dengan Nest CLI
projek sarang saya baharu
Di dalam folder projek pasang kafkajs
npm dan kafkaj
Dan jana modul berikut
sarang g mo kafka
pengeluar sarang g mo
pengguna sarang g mo
suhu sarang g mo
Modul Kafka akan mengendalikan semua operasi khusus Kafka, termasuk mengurus kelas pengguna dan pengeluar untuk menyambung, memutuskan sambungan, menghantar dan menerima mesej. Ini akan menjadi satu-satunya modul yang berinteraksi secara langsung dengan pakej kafkajs.
Modul Pengeluar dan Pengguna akan bertindak sebagai antara muka antara platform pub-sub (Kafka, dalam kes ini) dan aplikasi yang lain, mengabstrakkan butiran khusus platform.
Modul suhu akan menguruskan acara. Ia tidak perlu mengetahui platform pub-sub yang sedang digunakan, ia hanya memerlukan pengguna dan pengeluar untuk berfungsi.
Dengan modul yang dicipta, mari buat folder src/antara muka dan tambahkan antara muka berikut di dalamnya:
{ temperature: 42, timeStamp: new Date(), };
// src/interfaces/producer.interface.ts export interface IProducer { produce: (message: any) => Promise<void>; connect: () => Promise<void>; disconnect: () => Promise<void>; isConnected: () => boolean; }
Dalam folder src/kafka/ tambahkan kelas pengeluar dan pengguna yang melaksanakan antara muka tersebut:
// src/interfaces/consumer.interface.ts export type ConsumerMessage = { key?: string; value: any; }; export type OnMessage = (message: ConsumerMessage) => Promise<void>; export interface IConsumer { connect: () => Promise<void>; disconnect: () => Promise<void>; consume: (onMessage?: OnMessage) => Promise<void>; isConnected: () => boolean; }
// src/kafka/kafka.producer.ts export class KafkaProducer implements IProducer { private readonly logger = new Logger(KafkaProducer.name, { timestamp: true }); private readonly kafka: Kafka; private readonly producer: Producer; private connected: boolean = false; constructor( private readonly broker: string, private readonly topic: string, ) { // The client must be configured with at least one broker this.kafka = new Kafka({ brokers: [this.broker], }); this.producer = this.kafka.producer(); } async produce( message: Message, compression?: CompressionTypes, acks?: number, timeout?: number, ) { // To produce, at least a topic and a list of messages must be provided await this.producer.send({ topic: this.topic, messages: [message], compression, timeout, acks, }); } // To produce a message, the producer must be connected async connect() { try { // Just hooking up some logs in the producer events // And storing the connection status this.producer.on('producer.connect', () => { this.logger.log( `producer connected. broker: ${this.broker} topic: ${this.topic}`, ); this.connected = true; }); this.producer.on('producer.disconnect', () => { this.logger.log( `producer disconnected. broker: ${this.broker} topic: ${this.topic}`, ); this.connected = false; }); // Connect to Kafka await this.producer.connect(); } catch (err) { this.logger.error( `failed to connect to kafka. broker: ${this.broker} topic: ${this.topic}`, err, ); } } async disconnect() { await this.producer.disconnect(); } isConnected(): boolean { return this.connected; } }
Jangan lupa untuk mengeksport kelas ini dalam kafka.module.ts
// src/kafka/kafka.cosumer.ts export class KafkaConsumer implements IConsumer { private readonly logger = new Logger(KafkaConsumer.name, { timestamp: true }); private readonly kafka: Kafka; private readonly consumer: Consumer; private connected: boolean = false; constructor( private readonly broker: string, private readonly topic: string, private readonly groupId: string, ) { if (this.broker && this.topic && this.groupId) { // The client must be configured with at least one broker this.kafka = new Kafka({ brokers: [this.broker], }); this.consumer = this.kafka.consumer({ groupId: this.groupId }); } else { this.logger.warn('Broker, topic and groupId must be provided'); } } // The onMessage function will be called when a message is received async consume(onMessage: OnMessage) { // Here we subscribe to the topic ... await this.consumer.subscribe({ topic: this.topic }); // ... and handle the messages await this.consumer.run({ eachMessage: async (payload) => { try { this.logger.log( `message: ${payload.message.value.toString()} (topic: ${payload.topic}, partition: ${payload.partition})`, ); await onMessage({ key: payload.message.key?.toString(), value: payload.message.value.toString(), }); } catch (err) { this.logger.error('error on consuming message', err); } }, }); } // To consume, the consumer must be connected async connect() { try { // Just hooking up some logs in the consumer events // And storing the connection status this.consumer.on('consumer.connect', () => { this.logger.log( `consumer connected. broker: ${this.broker} topic: ${this.topic}`, ); this.connected = true; }); this.consumer.on('consumer.disconnect', () => { this.logger.log( `consumer disconnected. broker: ${this.broker} topic: ${this.topic}`, ); this.connected = false; }); await this.consumer.connect(); } catch (err) { this.logger.error( `failed to connect to kafka. broker: ${this.broker} topic: ${this.topic}`, err, ); } } async disconnect() { await this.consumer.disconnect(); } isConnected(): boolean { return this.connected; } }
Seperti sekarang kita boleh pergi ke modul suhu dan membuat contoh kelas Kafka tersebut dan mula menggunakannya. Walau bagaimanapun, adalah lebih baik jika modul suhu tidak perlu bimbang tentang platform pub-sub yang digunakannya. Sebaliknya, ia hanya perlu bekerja dengan pengeluar dan/atau pengguna yang disuntik, memfokuskan semata-mata pada penghantaran dan penerimaan mesej, tanpa mengira platform asas. Dengan cara ini, jika kami memutuskan untuk beralih kepada platform pub-sub yang berbeza pada masa hadapan, kami tidak perlu membuat sebarang perubahan pada modul suhu.
Untuk mencapai abstraksi ini, kami boleh mencipta kelas Pengeluar dan Pengguna yang mengendalikan spesifikasi pelaksanaan Pengeluar dan Pengguna Kafka. Mari mulakan dengan Penerbit:
// src/kafka/kafka.module.ts @Module({ imports: [], providers: [KafkaProducer, KafkaConsumer], exports: [KafkaProducer, KafkaConsumer], }) export class KafkaModule {}
// src/producer/producer.service.ts @Injectable() export class ProducerService implements OnApplicationShutdown { // Expects any producer that implements the IProducer interface private readonly producer: IProducer; constructor( @Inject('broker') broker: string, @Inject('topic') topic: string, ) { this.producer = new KafkaProducer(broker, topic); } /** The produce() and message can receive more parameters, * refer to produce method in src/kafka/kafka.producer.ts */ async produce(message: { key?: string; value: string }) { if (!this.producer.isConnected()) { await this.producer.connect(); } await this.producer.produce(message); } async onApplicationShutdown() { await this.producer.disconnect(); } }
Kini, Pengguna:
// src/producer/producer.module.ts @Module({ imports: [KafkaModule], providers: [ ProducerService, { provide: 'broker', useValue: 'default-broker-value', }, { provide: 'topic', useValue: 'default-topic-value', }, ], exports: [ProducerService], }) export class ProducerModule {}
// src/consumer/consumer.service.ts @Injectable() export class ConsumerService implements OnApplicationShutdown { // Expects any consumer that implements the IConsumer interface private readonly consumer: IConsumer; constructor( @Inject('broker') broker: string, @Inject('topic') topic: string, @Inject('groupId') groupId: string, ) { this.consumer = new KafkaConsumer(broker, topic, groupId); } async consume(onMessage: OnMessage) { if (!this.consumer.isConnected()) { await this.consumer.connect(); } await this.consumer.consume(onMessage); } async onApplicationShutdown() { await this.consumer.disconnect(); } }
Kini, kita boleh menumpukan pada membina modul suhu. Dalam fail temperature.service.ts, kami akan mencipta kaedah untuk mendaftarkan suhu, yang dalam contoh ini hanya akan menghantar data suhu kepada broker menggunakan pengeluar. Selain itu, kami akan melaksanakan kaedah untuk mengendalikan mesej masuk untuk tujuan demonstrasi.
Kaedah ini boleh digunakan oleh perkhidmatan lain atau pengawal. Walau bagaimanapun, untuk kesederhanaan, dalam contoh ini, kami akan memanggilnya terus apabila aplikasi bermula, menggunakan kaedah onModuleInit.
{ temperature: 42, timeStamp: new Date(), };
// src/interfaces/producer.interface.ts export interface IProducer { produce: (message: any) => Promise<void>; connect: () => Promise<void>; disconnect: () => Promise<void>; isConnected: () => boolean; }
Itu sahaja! Dengan broker berjalan dalam bekas Docker, anda boleh memulakan aplikasi untuk menghantar dan menerima mesej. Selain itu, anda boleh membuka cangkerang di dalam bekas broker menggunakan arahan berikut:
docker exec --workdir /opt/kafka/bin/ -it broker sh
Dari situ, anda boleh berinteraksi dengan broker secara langsung dan menghantar mesej kepada aplikasi, menerima mesej daripadanya, mencipta topik baharu, dsb.
Ini ialah repositori dengan kod contoh ini.
Atas ialah kandungan terperinci Asas Kafka dengan contoh praktikal. Untuk maklumat lanjut, sila ikut artikel berkaitan lain di laman web China PHP!