ここ数週間、私は Kafka を詳しく調べ、途中でメモを取ってきました。それを整理してブログ投稿として構成することにしました。概念やヒントとは別に、Kafka で構築された実践的な例があります。 NestJS と KafkaJs。
Apache Kafka は、リアルタイム イベントを処理するように設計された分散イベント ストリーミング プラットフォームです。大規模で高スループット、低遅延のデータ ストリームの保存、処理、取得が可能となり、リアルタイム データ パイプラインやイベント駆動型アプリケーションの構築に適しています。
これらのトレードオフは、Kafka のパフォーマンスを最大化するための意図的な設計上の選択ですが、より高い柔軟性が必要なユースケースでは課題が生じる可能性があります。
Kafka は、キューイング メッセージング モデルとパブリッシュ/サブスクライブ メッセージング モデルの両方の機能を統合し、消費者にそれぞれのアプローチの利点を提供します。
Kafka はパーティション化されたログ システムを採用して、キューイングとパブリッシュ/サブスクライブ モデルの利点を組み合わせています。順序付けられた一連のレコードであるログはパーティションに分割され、各パーティションが異なるサブスクライバ (コンシューマ) に割り当てられます。この設定により、複数のサブスクライバがスケーラビリティを維持しながらトピックを共有できるようになります。
Kafka がリアルタイム イベントを処理するように設計されたプラットフォームであることは見てきましたが、これらのイベントがどのように処理されるかを説明する前に、それらのイベントを定義する必要があります。
イベントとは、支払い、Web サイトのクリック、温度測定など、アプリケーションに記録されたアクション、インシデント、または変更です。
Kafka のイベント は、キー/値 ペアとしてモデル化され、キーと値の両方がバイト シーケンスにシリアル化されます。
Kafka は、イベントを トピック と呼ばれる 順序付けられたログ に整理します。外部システムが Kafka にイベントを書き込むと、イベントはトピックの末尾に追加されます。メッセージは、読み取られた後でも、設定可能な期間トピック内に残ります。キューとは異なり、トピックは耐久性、複製性、耐障害性に優れており、イベント レコードを効率的に保存します。ただし、ログは順次スキャンのみ可能であり、クエリは実行できません。
トピックはログ ファイルとしてディスクに保存されますが、ディスクには有限のサイズや I/O などの制限があります。これを克服するために、Kafka ではトピックを パーティション に分割し、単一のログを複数のログに分割し、異なるサーバーに分散できるようにします。このパーティショニングにより、Kafka は水平方向に拡張できるようになり、大量のイベントと高スループットを処理する能力が強化されます。
Kafka は、キー:
があるかどうかに基づいてメッセージをパーティションに割り当てます。Kafka は、集合的に Kafka クラスターを形成する ブローカー と呼ばれるノードを使用する分散データ インフラストラクチャとして動作します。ブローカーは、ベアメタル ハードウェア、クラウド インスタンス、Kubernetes によって管理されるコンテナー、ラップトップ上の Docker、または JVM プロセスが実行できる場所ならどこでも実行できます。
ブローカーは以下に焦点を当てます:
メッセージの計算やトピック間のルーティングを実行せず、設計をシンプルかつ効率的に保ちます。
Kafka は、複数のブローカー間でパーティション データをレプリケートすることにより、データの耐久性とフォールト トレランスを保証します。パーティションのプライマリ コピーは リーダー レプリカ であり、追加のコピーは フォロワー レプリカ です。データはリーダーに書き込まれ、更新内容が自動的にフォロワーにレプリケートされます。
このレプリケーション プロセスにより、次のことが保証されます。
Kafka はレプリケーションを透過的に処理するため、開発者はレプリケーションを直接管理する必要がなく、これらの保証の恩恵を受けられます。
Kafka プロデューサー は、データを Kafka トピック に送信 (パブリッシュ) するクライアント アプリケーションです。メッセージ (レコード) を作成し、Kafka クラスター に送信する役割を果たします。プロデューサーは、構成とメッセージ キーの存在に基づいて、メッセージが保存される トピック と パーティション を決定します。プロデューサーは次のことに責任を負いますが、これに限定されません:
圧縮:
プロデューサーはメッセージを圧縮して、ネットワーク帯域幅とストレージの使用量を削減できます。
Kafka コンシューマー は、Kafka トピック からメッセージを読み取るクライアント アプリケーションです。
は独自のペースで Kafka パーティションからメッセージを取得し、データのリアルタイムまたはバッチ処理を可能にします。 。 Kafka はコンシューマーにメッセージをプッシュするのではなく、データをリクエストすることで Kafka パーティションからメッセージをプルすることに注意してください。消費者は、処理したオフセットも追跡します。オフセットは自動的にまたは手動
でコミットでき、コンシューマーに障害が発生してもデータが失われないようにします。これにより、オフセットをリセットしてメッセージを再生するなど、柔軟な利用が可能になります。コンシューマ グループは、一部のトピックからのデータを協力して消費するコンシューマのセットであり、これによりトピックのメッセージの分散処理が可能になります。
トピックのパーティションはグループ内のコンシューマー間で分割され、各メッセージがグループ内の 1 つのコンシューマーのみによって処理されるようにします。複数の消費者グループは、干渉することなく同じトピックを独立して利用できます。
新しいコンシューマがグループに参加するか、既存のコンシューマが失敗すると、Kafka はグループ内のコンシューマ間でパーティションを再割り当てし、すべてのパーティションが確実にカバーされるようにします。
Kafka のシリアル化と逆シリアル化は、送信および保存のために元の形式とバイト配列の間でデータを変換し、プロデューサーとコンシューマーが効率的に通信できるようにすることです。
オブジェクトまたはデータ構造をバイト ストリームに変換して、送信または保存できるようにするプロセスです。プロデューサは、Kafka トピックにデータを送信する前に、データ (キーと値) をバイト配列にシリアル化します。
一般的なシリアル化形式:
逆のプロセスで、バイト ストリームが元のオブジェクトまたはデータ構造に変換されます。コンシューマーが Kafka トピックからデータを読み取るとき、バイト配列をデシリアライズして処理に使用可能な形式に戻します。
圧縮とは、メッセージを保存または送信する前にメッセージのサイズを減らすことです。プロデューサー、ブローカー、コンシューマー間でより小さなペイロードを送信することで、ストレージの使用量を最適化し、ネットワーク帯域幅の消費を削減し、全体的なパフォーマンスを向上させます。
プロデューサーが Kafka トピックにメッセージを送信するとき、送信前にメッセージを圧縮できます。圧縮されたメッセージはブローカーにそのまま保存され、コンシューマがメッセージを読み取るときに解凍されます。
圧縮によりリソースが節約されますが、CPU 使用率と圧縮の利点との間のトレードオフのバランスをとり、ユースケースに合ったコーデックを選択することが重要です。
Apache Kafka のパフォーマンスを最適化するには、さまざまなコンポーネントを微調整してスループットとレイテンシのバランスを効果的に調整する必要があります。この記事はこの主題の表面をなぞっただけです。Kafka をチューニングする際に考慮すべきいくつかの側面を以下に示します。
パーティション管理:
プロデューサー構成:
コンシューマ構成:
部屋の温度を記録し、このデータを Kafka を使用して送信し、別のアプリケーションがそれを処理するアプリケーションを想像してください。わかりやすくするために、プロデューサーとコンシューマーの両方が同じアプリケーション内に実装されている Kafka の側面にのみ焦点を当てます。このシナリオでは、特定の瞬間に記録された各温度がイベントを表します:
{ temperature: 42, timeStamp: new Date(), };
すべてのコードはこのリポジトリにあります。
まず、Kafka ブローカーが必要ですが、マシンに Kafka をインストールする代わりに、この Docker Kafka イメージだけをインストールしましょう。
その画像をプルすることから始めます:
docker pull apache/kafka
次に、マシン上の同じポートで Kafka がリッスンするポートをマッピングして実行します。
docker run -d -p 9092:9092 --name Broker apache/kafka:latest
以上です。Kafka ブローカーが実行されています。続行する前に、トピックを作成し、メッセージを送信して消費することで、Kafka ブローカーを試してみることができます。そのためには、そのイメージ ページの指示に従ってください。
アプリケーションを構築するには、NestJS と KafkaJS を使用します。まず、Nest CLI でアプリを作成します
ネスト新しい my-nest-プロジェクト
プロジェクトフォルダー内にkafkajsをインストールします
npm i kafkajs
そして次のモジュールを生成します
ネスト・ジー・モ・カフカ
ネストGモプロデューサー
nest g mo コンシューマー
巣鴨温度
Kafka モジュール は、メッセージの接続、切断、送信、受信のためのコンシューマー クラスとプロデューサー クラスの管理を含む、Kafka 固有のすべての操作を処理します。これは、kafkajs パッケージと直接対話する唯一のモジュールになります。
プロデューサー モジュールとコンシューマー モジュールは、パブリッシュ/サブスクライブ プラットフォーム (この場合は Kafka) とアプリケーションの残りの部分の間のインターフェイスとして機能し、プラットフォーム固有の詳細を抽象化します。
温度モジュールがイベントを管理します。どのパブリッシュ/サブスクライブ プラットフォームが使用されているかを知る必要はなく、コンシューマーとプロデューサーが機能することだけが必要です。
モジュールを作成したら、フォルダー src/interface も作成し、その中に次のインターフェースを追加しましょう:
{ temperature: 42, timeStamp: new Date(), };
// src/interfaces/producer.interface.ts export interface IProducer { produce: (message: any) => Promise<void>; connect: () => Promise<void>; disconnect: () => Promise<void>; isConnected: () => boolean; }
src/kafka/ フォルダーに、これらのインターフェイスを実装するプロデューサー クラスとコンシューマー クラスを追加します。
// src/interfaces/consumer.interface.ts export type ConsumerMessage = { key?: string; value: any; }; export type OnMessage = (message: ConsumerMessage) => Promise<void>; export interface IConsumer { connect: () => Promise<void>; disconnect: () => Promise<void>; consume: (onMessage?: OnMessage) => Promise<void>; isConnected: () => boolean; }
// src/kafka/kafka.producer.ts export class KafkaProducer implements IProducer { private readonly logger = new Logger(KafkaProducer.name, { timestamp: true }); private readonly kafka: Kafka; private readonly producer: Producer; private connected: boolean = false; constructor( private readonly broker: string, private readonly topic: string, ) { // The client must be configured with at least one broker this.kafka = new Kafka({ brokers: [this.broker], }); this.producer = this.kafka.producer(); } async produce( message: Message, compression?: CompressionTypes, acks?: number, timeout?: number, ) { // To produce, at least a topic and a list of messages must be provided await this.producer.send({ topic: this.topic, messages: [message], compression, timeout, acks, }); } // To produce a message, the producer must be connected async connect() { try { // Just hooking up some logs in the producer events // And storing the connection status this.producer.on('producer.connect', () => { this.logger.log( `producer connected. broker: ${this.broker} topic: ${this.topic}`, ); this.connected = true; }); this.producer.on('producer.disconnect', () => { this.logger.log( `producer disconnected. broker: ${this.broker} topic: ${this.topic}`, ); this.connected = false; }); // Connect to Kafka await this.producer.connect(); } catch (err) { this.logger.error( `failed to connect to kafka. broker: ${this.broker} topic: ${this.topic}`, err, ); } } async disconnect() { await this.producer.disconnect(); } isConnected(): boolean { return this.connected; } }
これらのクラスを kafka.module.ts にエクスポートすることを忘れないでください
// src/kafka/kafka.cosumer.ts export class KafkaConsumer implements IConsumer { private readonly logger = new Logger(KafkaConsumer.name, { timestamp: true }); private readonly kafka: Kafka; private readonly consumer: Consumer; private connected: boolean = false; constructor( private readonly broker: string, private readonly topic: string, private readonly groupId: string, ) { if (this.broker && this.topic && this.groupId) { // The client must be configured with at least one broker this.kafka = new Kafka({ brokers: [this.broker], }); this.consumer = this.kafka.consumer({ groupId: this.groupId }); } else { this.logger.warn('Broker, topic and groupId must be provided'); } } // The onMessage function will be called when a message is received async consume(onMessage: OnMessage) { // Here we subscribe to the topic ... await this.consumer.subscribe({ topic: this.topic }); // ... and handle the messages await this.consumer.run({ eachMessage: async (payload) => { try { this.logger.log( `message: ${payload.message.value.toString()} (topic: ${payload.topic}, partition: ${payload.partition})`, ); await onMessage({ key: payload.message.key?.toString(), value: payload.message.value.toString(), }); } catch (err) { this.logger.error('error on consuming message', err); } }, }); } // To consume, the consumer must be connected async connect() { try { // Just hooking up some logs in the consumer events // And storing the connection status this.consumer.on('consumer.connect', () => { this.logger.log( `consumer connected. broker: ${this.broker} topic: ${this.topic}`, ); this.connected = true; }); this.consumer.on('consumer.disconnect', () => { this.logger.log( `consumer disconnected. broker: ${this.broker} topic: ${this.topic}`, ); this.connected = false; }); await this.consumer.connect(); } catch (err) { this.logger.error( `failed to connect to kafka. broker: ${this.broker} topic: ${this.topic}`, err, ); } } async disconnect() { await this.consumer.disconnect(); } isConnected(): boolean { return this.connected; } }
現状では、温度モジュールに移動して、これらの Kafka クラスをインスタンス化し、使用を開始できます。ただし、温度モジュールがどのパブリッシュ/サブスクライブ プラットフォームを使用しているかを気にする必要がなければ、より良いでしょう。代わりに、基盤となるプラットフォームに関係なく、メッセージの送受信のみに焦点を当て、挿入されたプロデューサーやコンシューマーと単純に連携する必要があります。こうすることで、将来別のパブリッシュ/サブスクライブ プラットフォームに切り替えることを決定した場合でも、温度モジュールに変更を加える必要がなくなります。
この抽象化を実現するには、Kafka のプロデューサーおよびコンシューマー実装の詳細を処理するプロデューサー クラスとコンシューマー クラスを作成できます。プロデューサーから始めましょう:
// src/kafka/kafka.module.ts @Module({ imports: [], providers: [KafkaProducer, KafkaConsumer], exports: [KafkaProducer, KafkaConsumer], }) export class KafkaModule {}
// src/producer/producer.service.ts @Injectable() export class ProducerService implements OnApplicationShutdown { // Expects any producer that implements the IProducer interface private readonly producer: IProducer; constructor( @Inject('broker') broker: string, @Inject('topic') topic: string, ) { this.producer = new KafkaProducer(broker, topic); } /** The produce() and message can receive more parameters, * refer to produce method in src/kafka/kafka.producer.ts */ async produce(message: { key?: string; value: string }) { if (!this.producer.isConnected()) { await this.producer.connect(); } await this.producer.produce(message); } async onApplicationShutdown() { await this.producer.disconnect(); } }
さて、消費者は:
// src/producer/producer.module.ts @Module({ imports: [KafkaModule], providers: [ ProducerService, { provide: 'broker', useValue: 'default-broker-value', }, { provide: 'topic', useValue: 'default-topic-value', }, ], exports: [ProducerService], }) export class ProducerModule {}
// src/consumer/consumer.service.ts @Injectable() export class ConsumerService implements OnApplicationShutdown { // Expects any consumer that implements the IConsumer interface private readonly consumer: IConsumer; constructor( @Inject('broker') broker: string, @Inject('topic') topic: string, @Inject('groupId') groupId: string, ) { this.consumer = new KafkaConsumer(broker, topic, groupId); } async consume(onMessage: OnMessage) { if (!this.consumer.isConnected()) { await this.consumer.connect(); } await this.consumer.consume(onMessage); } async onApplicationShutdown() { await this.consumer.disconnect(); } }
ここで、温度モジュールの構築に集中できます。温度.service.ts ファイルで、温度を登録するメソッドを作成します。この例では、プロデューサーを使用して温度データをブローカーに送信するだけです。さらに、デモ目的で受信メッセージを処理するメソッドを実装します。
これらのメソッドは、別のサービスまたはコントローラーから呼び出すことができます。ただし、簡単にするために、この例では、アプリケーションの起動時に onModuleInit メソッドを利用して直接呼び出します。
{ temperature: 42, timeStamp: new Date(), };
// src/interfaces/producer.interface.ts export interface IProducer { produce: (message: any) => Promise<void>; connect: () => Promise<void>; disconnect: () => Promise<void>; isConnected: () => boolean; }
それだけです! Docker コンテナ内でブローカーを実行すると、アプリケーションを起動してメッセージを送受信できます。さらに、次のコマンドを使用して、ブローカー コンテナ内でシェルを開くことができます:
docker exec --workdir /opt/kafka/bin/ -it Broker sh
そこから、ブローカーと直接対話し、アプリケーションにメッセージを送信したり、アプリケーションからメッセージを受信したり、新しいトピックを作成したりすることができます。
これは、この例のコードが含まれるリポジトリです。
以上がKafka の基礎と実際の例の詳細内容です。詳細については、PHP 中国語 Web サイトの他の関連記事を参照してください。