Kafka の基礎と実際の例

Linda Hamilton
リリース: 2024-12-28 09:26:11
オリジナル
354 人が閲覧しました

ここ数週間、私は Kafka を詳しく調べ、途中でメモを取ってきました。それを整理してブログ投稿として構成することにしました。概念やヒントとは別に、Kafka で構築された実践的な例があります。 NestJS と KafkaJs。

カフカとは何ですか?

Apache Kafka は、リアルタイム イベントを処理するように設計された分散イベント ストリーミング プラットフォームです。大規模で高スループット、低遅延のデータ ストリームの保存、処理、取得が可能となり、リアルタイム データ パイプラインやイベント駆動型アプリケーションの構築に適しています。

主な特徴:

  • イベント ストリーミング: Kafka は、データを トピック に整理します。
  • トピックは、イベントの順序付けされたログです。
  • 分散アーキテクチャ: Kafka は、スケーラビリティと耐障害性を考慮して構築されています。ブローカーと呼ばれるノードのクラスター
  • として動作し、複数のサーバーにデータを分散できます。
  • パブリッシュ-サブスクライブ モデル: プロデューサートピックにメッセージを書き込み、コンシューマー
  • はトピックからメッセージを読み取ります。 Kafka は複数のコンシューマをサポートし、異なるアプリケーションが同じデータ ストリームを独立して処理できるようにします。
  • 高パフォーマンス:
  • Kafka は高スループット向けに最適化されており、1 秒あたり数百万のメッセージを低遅延で処理します。
  • 永続ストレージ:
  • Kafka は、構成可能な保持期間でメッセージをディスクに保存し、データの永続性と信頼性を確保します。
  • パーティショニングとレプリケーション: トピックはスケーラビリティのために パーティション
  • に分割され、フォールト トレランスのためにブローカー間でレプリケートされます。
  • 再生可能性: コンシューマは、オフセット
  • をリセットすることでメッセージを再読み取りでき、データの再処理または回復が可能になります。
  • 統合とエコシステム:
  • Kafka はさまざまなシステムと統合し、Kafka Connect (データ統合用) や Kafka Streams (ストリーム処理用) などのツールを備えています。

利点
  • 信頼性:
  • データの分散、レプリケーション、パーティション化を通じてフォールト トレランスを保証します。
  • スケーラビリティ:
  • Kafka は大量のデータを処理し、ダウンタイムなしで水平方向に拡張できます。
  • 耐久性:
  • メッセージは即座に保存され、回復力とデータの永続性が保証されます。
  • パフォーマンス:
  • Kafka は、極端なデータ負荷下でも高いパフォーマンスを維持し、ダウンタイムやデータ損失なしで大量のデータを処理します。

短所

これらのトレードオフは、Kafka のパフォーマンスを最大化するための意図的な設計上の選択ですが、より高い柔軟性が必要なユースケースでは課題が生じる可能性があります。
  • 柔軟性の制限: Kafka には、レポート内の特定のデータのフィルタリングなどの拡張クエリのサポートがありません。 Kafka は受信した順序でオフセットによってメッセージを取得するため、コンシューマはこれらのタスクを処理する必要があります。
  • 長期保存用に設計されていない: Kafka はデータのストリーミングには優れていますが、履歴データを長期間保存するのには適していません。データの重複により、大規模なデータセットのストレージのコストが高くなる可能性があります。
  • ワイルドカード トピックのサポートなし: Kafka では、ワイルドカード パターン (log-2024-* など) を使用して複数のトピックから消費することはできません。

ユースケース

  • リアルタイム分析: データ ストリームが発生するたびに処理および分析します。
  • イベント ソーシング: アプリケーションの状態に対するすべての変更を一連のイベントとして記録します。
  • ログ集約: 分散システムからログを収集および管理します。
  • データ パイプライン: システム間でデータを確実かつ効率的にストリーミングします。
  • IoT アプリケーション: IoT デバイスからのセンサー データをリアルタイムで処理します。

カフカはどのように機能しますか?

Kafka は、キューイング メッセージング モデルとパブリッシュ/サブスクライブ メッセージング モデルの両方の機能を統合し、消費者にそれぞれのアプローチの利点を提供します。

  • キューは、複数のコンシューマ インスタンスにタスクを分散することでスケーラブルなデータ処理を可能にしますが、従来のキューは複数のサブスクライバをサポートしません。
  • パブリッシュ-サブスクライブ モデルは複数のサブスクライバーをサポートしますが、各メッセージがすべてのサブスクライバーに送信されるため、複数のワーカー プロセスにタスクを分散することはできません。

Kafka はパーティション化されたログ システムを採用して、キューイングとパブリッシュ/サブスクライブ モデルの利点を組み合わせています。順序付けられた一連のレコードであるログはパーティションに分割され、各パーティションが異なるサブスクライバ (コンシューマ) に割り当てられます。この設定により、複数のサブスクライバがスケーラビリティを維持しながらトピックを共有できるようになります。

Kafka fundamentals with a practical example

イベント、トピック、パーティション

Kafka がリアルタイム イベントを処理するように設計されたプラットフォームであることは見てきましたが、これらのイベントがどのように処理されるかを説明する前に、それらのイベントを定義する必要があります。

イベントとは、支払い、Web サイトのクリック、温度測定など、アプリケーションに記録されたアクション、インシデント、または変更です。

Kafka の

イベント は、キー/値 ペアとしてモデル化され、キーと値の両方がバイト シーケンスにシリアル化されます。

  • は、多くの場合、シリアル化されたドメイン オブジェクト、またはセンサー出力やその他のアプリケーション データなどの生の入力を表します。これらは、Kafka イベントで送信されるコア情報をカプセル化します。
  • キー は複雑なドメイン オブジェクトにすることもできますが、多くの場合、文字列や整数などの単純なタイプになります。 (リレーショナル データベースの主キーのように) 個々のイベントを一意に識別するのではなく、通常、キーはシステム内のエンティティ (特定のユーザー、注文、接続されたデバイスなど) を識別します。

Kafka は、イベントを トピック と呼ばれる 順序付けられたログ に整理します。外部システムが Kafka にイベントを書き込むと、イベントはトピックの末尾に追加されます。メッセージは、読み取られた後でも、設定可能な期間トピック内に残ります。キューとは異なり、トピックは耐久性、複製性、耐障害性に優れており、イベント レコードを効率的に保存します。ただし、ログは順次スキャンのみ可能であり、クエリは実行できません。

トピックはログ ファイルとしてディスクに保存されますが、ディスクには有限のサイズや I/O などの制限があります。これを克服するために、Kafka ではトピックを パーティション に分割し、単一のログを複数のログに分割し、異なるサーバーに分散できるようにします。このパーティショニングにより、Kafka は水平方向に拡張できるようになり、大量のイベントと高スループットを処理する能力が強化されます。

Kafka は、キー:

があるかどうかに基づいてメッセージをパーティションに割り当てます。
  • キーなし: メッセージはすべてのパーティションにラウンドロビンで分散され、均等なデータ分散が保証されますが、メッセージの順序は維持されません。
  • キーあり: パーティションはキーのハッシュによって決定され、同じキーを持つメッセージが常に同じパーティションに送られ、順序が維持されます。

ブローカー

Kafka は、集合的に Kafka クラスターを形成する ブローカー と呼ばれるノードを使用する分散データ インフラストラクチャとして動作します。ブローカーは、ベアメタル ハードウェア、クラウド インスタンス、Kubernetes によって管理されるコンテナー、ラップトップ上の Docker、または JVM プロセスが実行できる場所ならどこでも実行できます。

ブローカーは以下に焦点を当てます:

  • 新しいイベントをパーティションに書き込みます。
  • パーティションからの読み取りを提供します。
  • ブローカー間でパーティションをレプリケートします。

メッセージの計算やトピック間のルーティングを実行せず、設計をシンプルかつ効率的に保ちます。

レプリケーション

Kafka は、複数のブローカー間でパーティション データをレプリケートすることにより、データの耐久性とフォールト トレランスを保証します。パーティションのプライマリ コピーは リーダー レプリカ であり、追加のコピーは フォロワー レプリカ です。データはリーダーに書き込まれ、更新内容が自動的にフォロワーにレプリケートされます。

このレプリケーション プロセスにより、次のことが保証されます。

  • ブローカーやストレージに障害が発生した場合でもデータを安全に保ちます。
  • 自動フェイルオーバー。現在のリーダーに障害が発生した場合、別のレプリカがリーダーとして引き継ぎます。

Kafka はレプリケーションを透過的に処理するため、開発者はレプリケーションを直接管理する必要がなく、これらの保証の恩恵を受けられます。

プロデューサー

Kafka プロデューサー は、データを Kafka トピック に送信 (パブリッシュ) するクライアント アプリケーションです。メッセージ (レコード) を作成し、Kafka クラスター に送信する役割を果たします。プロデューサーは、構成とメッセージ キーの存在に基づいて、メッセージが保存される トピックパーティション を決定します。プロデューサーは次のことに責任を負いますが、これに限定されません:

  • メッセージの構成:
    • 各メッセージは、キー (オプション)、値 (実際のデータ)、およびメタデータで構成されます。
    • キーはメッセージのパーティションを決定し、同じキーを持つメッセージの順序を保証します。
  • パーティションの割り当て:
    • キーが指定されている場合、プロデューサーはハッシュ アルゴリズムを使用してパーティションを決定します。
    • キーを使用しない場合、メッセージは負荷分散のためにラウンドロビン方式でパーティション全体に分散されます。
  • 圧縮:

    プロデューサーはメッセージを圧縮して、ネットワーク帯域幅とストレージの使用量を削減できます。

消費者

Kafka コンシューマー は、Kafka トピック からメッセージを読み取るクライアント アプリケーションです。

は独自のペースで Kafka パーティションからメッセージを取得し、データのリアルタイムまたはバッチ処理を可能にします。 。 Kafka はコンシューマーにメッセージをプッシュするのではなく、データをリクエストすることで Kafka パーティションからメッセージをプルすることに注意してください。

消費者は、処理したオフセットも追跡します。オフセットは自動的にまたは手動

でコミットでき、コンシューマーに障害が発生してもデータが失われないようにします。これにより、オフセットをリセットしてメッセージを再生するなど、柔軟な利用が可能になります。

消費者団体

コンシューマ グループは、一部のトピックからのデータを協力して消費するコンシューマのセットであり、これによりトピックのメッセージの分散処理が可能になります。

トピックのパーティションはグループ内のコンシューマー間で分割され、各メッセージがグループ内の 1 つのコンシューマーのみによって処理されるようにします。複数の消費者グループは、干渉することなく同じトピックを独立して利用できます。

新しいコンシューマがグループに参加するか、既存のコンシューマが失敗すると、Kafka はグループ内のコンシューマ間でパーティションを再割り当てし、すべてのパーティションが確実にカバーされるようにします。

シリアル化と逆シリアル化

Kafka のシリアル化と逆シリアル化は、送信および保存のために元の形式とバイト配列の間でデータを変換し、プロデューサーとコンシューマーが効率的に通信できるようにすることです。

連載

オブジェクトまたはデータ構造をバイト ストリームに変換して、送信または保存できるようにするプロセスです。プロデューサは、Kafka トピックにデータを送信する前に、データ (キーと値) をバイト配列にシリアル化します。

一般的なシリアル化形式:

  • JSON: 人間が判読可能で、幅広い互換性があります。
  • Avro: コンパクトで効率的な、スキーマベース。
  • Protobuf: コンパクト、スキーマベース、言語に依存しない。
  • String: 単純なテキストベースのシリアル化。
  • カスタムシリアル化: アプリケーション固有のニーズ用。

逆シリアル化

逆のプロセスで、バイト ストリームが元のオブジェクトまたはデータ構造に変換されます。コンシューマーが Kafka トピックからデータを読み取るとき、バイト配列をデシリアライズして処理に使用可能な形式に戻します。

圧縮

圧縮とは、メッセージを保存または送信する前にメッセージのサイズを減らすことです。プロデューサー、ブローカー、コンシューマー間でより小さなペイロードを送信することで、ストレージの使用量を最適化し、ネットワーク帯域幅の消費を削減し、全体的なパフォーマンスを向上させます。

プロデューサーが Kafka トピックにメッセージを送信するとき、送信前にメッセージを圧縮できます。圧縮されたメッセージはブローカーにそのまま保存され、コンシューマがメッセージを読み取るときに解凍されます。

メリット

  • ネットワーク帯域幅の削減: ペイロードが小さくなると、ネットワーク上で送信されるデータが少なくなります。
  • ストレージ要件の低下: 圧縮されたメッセージは、ディスク上の占有スペースを減らします。
  • スループットの向上: メッセージが小さいため、より高速なデータ転送と処理が可能になります。

いつ使用しますか?

  • メッセージ サイズが大きいユースケース: 圧縮によりデータ サイズが大幅に削減されます。
  • 高スループット システム: ネットワークとストレージ リソースの負担を軽減します。
  • バッチ処理: 圧縮は、プロデューサーが複数のメッセージをまとめてバッチ処理する場合に最適に機能します。

圧縮によりリソースが節約されますが、CPU 使用率と圧縮の利点との間のトレードオフのバランスをとり、ユースケースに合ったコーデックを選択することが重要です。

サポートされている圧縮タイプ

  • なし: 圧縮なし (デフォルト)。
  • Gzip: 圧縮率は高いですが、CPU 使用率が高くなります。
  • Snappy: 圧縮速度と CPU 使用率のバランスが取れており、リアルタイムのユースケースに適しています。
  • LZ4: 圧縮と解凍が高速化され、低遅延システム向けに最適化されています。
  • Zstd: Gzip よりも優れたパフォーマンスを備えた高い圧縮率。新しい Kafka バージョンでサポートされています。

チューニング

Apache Kafka のパフォーマンスを最適化するには、さまざまなコンポーネントを微調整してスループットとレイテンシのバランスを効果的に調整する必要があります。この記事はこの主題の表面をなぞっただけです。Kafka をチューニングする際に考慮すべきいくつかの側面を以下に示します。

  • パーティション管理:

    • パーティション数: パーティションの数を増やすと、並列処理とスループットが向上します。ただし、管理オーバーヘッドを防ぐために、過度のパーティションは避けてください。パーティションの数は、消費者の能力と希望する消費率に合わせて調整してください。
  • プロデューサー構成:

    • バッチ処理: メッセージの効率的なバッチ処理を有効にして、リクエストの数を減らし、スループットを向上させるために、batch.size と linger.ms を構成します。
    • 圧縮: 圧縮 (compression.type=snappy など) を実装してメッセージ サイズを減らし、ネットワークとストレージの使用量を削減します。圧縮によって追加の CPU オーバーヘッドが発生することに注意してください。
  • コンシューマ構成:

    • フェッチ設定: fetch.min.bytes と fetch.max.wait.ms を調整して、コンシューマがメッセージを取得する方法を制御し、アプリケーションのニーズに応じてレイテンシーとスループットのバランスをとります。

実践例

部屋の温度を記録し、このデータを Kafka を使用して送信し、別のアプリケーションがそれを処理するアプリケーションを想像してください。わかりやすくするために、プロデューサーとコンシューマーの両方が同じアプリケーション内に実装されている Kafka の側面にのみ焦点を当てます。このシナリオでは、特定の瞬間に記録された各温度がイベントを表します:

{
  temperature: 42,
  timeStamp: new Date(),
};
ログイン後にコピー
ログイン後にコピー
ログイン後にコピー

すべてのコードはこのリポジトリにあります。

まず、Kafka ブローカーが必要ですが、マシンに Kafka をインストールする代わりに、この Docker Kafka イメージだけをインストールしましょう。

その画像をプルすることから始めます:

docker pull apache/kafka

次に、マシン上の同じポートで Kafka がリッスンするポートをマッピングして実行します。

docker run -d -p 9092:9092 --name Broker apache/kafka:latest

以上です。Kafka ブローカーが実行されています。続行する前に、トピックを作成し、メッセージを送信して消費することで、Kafka ブローカーを試してみることができます。そのためには、そのイメージ ページの指示に従ってください。

アプリケーションを構築するには、NestJS と KafkaJS を使用します。まず、Nest CLI でアプリを作成します

ネスト新しい my-nest-プロジェクト

プロジェクトフォルダー内にkafkajsをインストールします

npm i kafkajs

そして次のモジュールを生成します

ネスト・ジー・モ・カフカ

ネストGモプロデューサー

nest g mo コンシューマー

巣鴨温度

Kafka モジュール は、メッセージの接続、切断、送信、受信のためのコンシューマー クラスとプロデューサー クラスの管理を含む、Kafka 固有のすべての操作を処理します。これは、kafkajs パッケージと直接対話する唯一のモジュールになります。

プロデューサー モジュールとコンシューマー モジュールは、パブリッシュ/サブスクライブ プラットフォーム (この場合は Kafka) とアプリケーションの残りの部分の間のインターフェイスとして機能し、プラットフォーム固有の詳細を抽象化します。

温度モジュールがイベントを管理します。どのパブリッシュ/サブスクライブ プラットフォームが使用されているかを知る必要はなく、コンシューマーとプロデューサーが機能することだけが必要です。

モジュールを作成したら、フォルダー src/interface も作成し、その中に次のインターフェースを追加しましょう:

{
  temperature: 42,
  timeStamp: new Date(),
};
ログイン後にコピー
ログイン後にコピー
ログイン後にコピー
// src/interfaces/producer.interface.ts

export interface IProducer {
  produce: (message: any) => Promise<void>;
  connect: () => Promise<void>;
  disconnect: () => Promise<void>;
  isConnected: () => boolean;
}
ログイン後にコピー
ログイン後にコピー

src/kafka/ フォルダーに、これらのインターフェイスを実装するプロデューサー クラスとコンシューマー クラスを追加します。

// src/interfaces/consumer.interface.ts

export type ConsumerMessage = {
  key?: string;
  value: any;
};

export type OnMessage = (message: ConsumerMessage) => Promise<void>;

export interface IConsumer {
  connect: () => Promise<void>;
  disconnect: () => Promise<void>;
  consume: (onMessage?: OnMessage) => Promise<void>;
  isConnected: () => boolean;
}
ログイン後にコピー
// src/kafka/kafka.producer.ts

export class KafkaProducer implements IProducer {
  private readonly logger = new Logger(KafkaProducer.name, { timestamp: true });
  private readonly kafka: Kafka;
  private readonly producer: Producer;
  private connected: boolean = false;

  constructor(
    private readonly broker: string,
    private readonly topic: string,
  ) {
    // The client must be configured with at least one broker
    this.kafka = new Kafka({
      brokers: [this.broker],
    });
    this.producer = this.kafka.producer();
  }

  async produce(
    message: Message,
    compression?: CompressionTypes,
    acks?: number,
    timeout?: number,
  ) {
    // To produce, at least a topic and a list of messages must be provided
    await this.producer.send({
      topic: this.topic,
      messages: [message],
      compression,
      timeout,
      acks,
    });
  }

  // To produce a message, the producer must be connected
  async connect() {
    try {
      // Just hooking up some logs in the producer events
      // And storing the connection status
      this.producer.on('producer.connect', () => {
        this.logger.log(
          `producer connected. broker: ${this.broker} topic: ${this.topic}`,
        );
        this.connected = true;
      });

      this.producer.on('producer.disconnect', () => {
        this.logger.log(
          `producer disconnected. broker: ${this.broker} topic: ${this.topic}`,
        );
        this.connected = false;
      });

      // Connect to Kafka
      await this.producer.connect();
    } catch (err) {
      this.logger.error(
        `failed to connect to kafka. broker: ${this.broker} topic: ${this.topic}`,
        err,
      );
    }
  }

  async disconnect() {
    await this.producer.disconnect();
  }

  isConnected(): boolean {
    return this.connected;
  }
}

ログイン後にコピー

これらのクラスを kafka.module.ts にエクスポートすることを忘れないでください

// src/kafka/kafka.cosumer.ts

export class KafkaConsumer implements IConsumer {
  private readonly logger = new Logger(KafkaConsumer.name, { timestamp: true });
  private readonly kafka: Kafka;
  private readonly consumer: Consumer;
  private connected: boolean = false;

  constructor(
    private readonly broker: string,
    private readonly topic: string,
    private readonly groupId: string,
  ) {
    if (this.broker && this.topic && this.groupId) {
      // The client must be configured with at least one broker
      this.kafka = new Kafka({
        brokers: [this.broker],
      });
      this.consumer = this.kafka.consumer({ groupId: this.groupId });
    } else {
      this.logger.warn('Broker, topic and groupId must be provided');
    }
  }

  // The onMessage function will be called when a message is received
  async consume(onMessage: OnMessage) {
    // Here we subscribe to the topic ...
    await this.consumer.subscribe({ topic: this.topic });

    // ... and handle the messages
    await this.consumer.run({
      eachMessage: async (payload) => {
        try {
          this.logger.log(
            `message: ${payload.message.value.toString()} (topic: ${payload.topic}, partition: ${payload.partition})`,
          );

          await onMessage({
            key: payload.message.key?.toString(),
            value: payload.message.value.toString(),
          });
        } catch (err) {
          this.logger.error('error on consuming message', err);
        }
      },
    });
  }

  // To consume, the consumer must be connected
  async connect() {
    try {
      // Just hooking up some logs in the consumer events
      // And storing the connection status
      this.consumer.on('consumer.connect', () => {
        this.logger.log(
          `consumer connected. broker: ${this.broker} topic: ${this.topic}`,
        );
        this.connected = true;
      });

      this.consumer.on('consumer.disconnect', () => {
        this.logger.log(
          `consumer disconnected. broker: ${this.broker} topic: ${this.topic}`,
        );
        this.connected = false;
      });

      await this.consumer.connect();
    } catch (err) {
      this.logger.error(
        `failed to connect to kafka. broker: ${this.broker} topic: ${this.topic}`,
        err,
      );
    }
  }

  async disconnect() {
    await this.consumer.disconnect();
  }

  isConnected(): boolean {
    return this.connected;
  }
}
ログイン後にコピー

現状では、温度モジュールに移動して、これらの Kafka クラスをインスタンス化し、使用を開始できます。ただし、温度モジュールがどのパブリッシュ/サブスクライブ プラットフォームを使用しているかを気にする必要がなければ、より良いでしょう。代わりに、基盤となるプラットフォームに関係なく、メッセージの送受信のみに焦点を当て、挿入されたプロデューサーやコンシューマーと単純に連携する必要があります。こうすることで、将来別のパブリッシュ/サブスクライブ プラットフォームに切り替えることを決定した場合でも、温度モジュールに変更を加える必要がなくなります。

この抽象化を実現するには、Kafka のプロデューサーおよびコンシューマー実装の詳細を処理するプロデューサー クラスとコンシューマー クラスを作成できます。プロデューサーから始めましょう:

// src/kafka/kafka.module.ts

@Module({
  imports: [],
  providers: [KafkaProducer, KafkaConsumer],
  exports: [KafkaProducer, KafkaConsumer],
})
export class KafkaModule {}
ログイン後にコピー
// src/producer/producer.service.ts

@Injectable()
export class ProducerService implements OnApplicationShutdown {
  // Expects any producer that implements the IProducer interface
  private readonly producer: IProducer;

  constructor(
    @Inject('broker') broker: string,
    @Inject('topic') topic: string,
  ) {
    this.producer = new KafkaProducer(broker, topic);
  }

  /** The produce() and message can receive more parameters,
   * refer to produce method in src/kafka/kafka.producer.ts
   */
  async produce(message: { key?: string; value: string }) {
    if (!this.producer.isConnected()) {
      await this.producer.connect();
    }
    await this.producer.produce(message);
  }

  async onApplicationShutdown() {
    await this.producer.disconnect();
  }
}
ログイン後にコピー

さて、消費者は:

// src/producer/producer.module.ts

@Module({
  imports: [KafkaModule],
  providers: [
    ProducerService,
    {
      provide: 'broker',
      useValue: 'default-broker-value',
    },
    {
      provide: 'topic',
      useValue: 'default-topic-value',
    },
  ],
  exports: [ProducerService],
})
export class ProducerModule {}
ログイン後にコピー
// src/consumer/consumer.service.ts

@Injectable()
export class ConsumerService implements OnApplicationShutdown {
  // Expects any consumer that implements the IConsumer interface
  private readonly consumer: IConsumer;

  constructor(
    @Inject('broker') broker: string,
    @Inject('topic') topic: string,
    @Inject('groupId') groupId: string,
  ) {
    this.consumer = new KafkaConsumer(broker, topic, groupId);
  }

  async consume(onMessage: OnMessage) {
    if (!this.consumer.isConnected()) {
      await this.consumer.connect();
    }
    await this.consumer.consume(onMessage);
  }

  async onApplicationShutdown() {
    await this.consumer.disconnect();
  }
}
ログイン後にコピー

ここで、温度モジュールの構築に集中できます。温度.service.ts ファイルで、温度を登録するメソッドを作成します。この例では、プロデューサーを使用して温度データをブローカーに送信するだけです。さらに、デモ目的で受信メッセージを処理するメソッドを実装します。

これらのメソッドは、別のサービスまたはコントローラーから呼び出すことができます。ただし、簡単にするために、この例では、アプリケーションの起動時に onModuleInit メソッドを利用して直接呼び出します。

{
  temperature: 42,
  timeStamp: new Date(),
};
ログイン後にコピー
ログイン後にコピー
ログイン後にコピー
// src/interfaces/producer.interface.ts

export interface IProducer {
  produce: (message: any) => Promise<void>;
  connect: () => Promise<void>;
  disconnect: () => Promise<void>;
  isConnected: () => boolean;
}
ログイン後にコピー
ログイン後にコピー

それだけです! Docker コンテナ内でブローカーを実行すると、アプリケーションを起動してメッセージを送受信できます。さらに、次のコマンドを使用して、ブローカー コンテナ内でシェルを開くことができます:

docker exec --workdir /opt/kafka/bin/ -it Broker sh

そこから、ブローカーと直接対話し、アプリケーションにメッセージを送信したり、アプリケーションからメッセージを受信したり、新しいトピックを作成したりすることができます。

これは、この例のコードが含まれるリポジトリです。

以上がKafka の基礎と実際の例の詳細内容です。詳細については、PHP 中国語 Web サイトの他の関連記事を参照してください。

ソース:dev.to
このウェブサイトの声明
この記事の内容はネチズンが自主的に寄稿したものであり、著作権は原著者に帰属します。このサイトは、それに相当する法的責任を負いません。盗作または侵害の疑いのあるコンテンツを見つけた場合は、admin@php.cn までご連絡ください。
著者別の最新記事
人気のチュートリアル
詳細>
最新のダウンロード
詳細>
ウェブエフェクト
公式サイト
サイト素材
フロントエンドテンプレート