ホームページ > バックエンド開発 > PHPチュートリアル > PHP と Apache Kafka の統合による効率的なメッセージのキューイングと配布

PHP と Apache Kafka の統合による効率的なメッセージのキューイングと配布

WBOY
リリース: 2023-06-25 10:04:02
オリジナル
1873 人が閲覧しました

最新のインターネット アプリケーションの継続的な開発に伴い、大量のデータ通信を処理する必要があるアプリケーションがますます増えています。これらのデータ通信を処理する従来の方法は、ポーリングまたはブロック I/O を使用することですが、これらの方法は非常に非効率であるため、最新のアプリケーションのニーズを満たすことができなくなりました。この問題を解決するために、業界はメッセージ キューと分散システムと呼ばれるテクノロジーを開発しました。

メッセージ キューおよび配信システムでは、メッセージのプロデューサはメッセージをキューに送信し、メッセージのコンシューマはキューからメッセージを取得して、対応する操作を実行します。このアプローチにより、ポーリングや I/O のブロックなどの問題が回避されるため、データ通信の効率が大幅に向上します。

この記事では、PHP と Apache Kafka の統合を使用して効率的なメッセージ キューイングと配信を実現する方法について説明します。

Apache Kafka の概要

Apache Kafka は、高スループット、低遅延、スケーラブルな分散メッセージング システムです。大量のメッセージを処理し、より高い負荷に対応するために水平方向に拡張できます。 Apache Kafka の主なコンポーネントは次のとおりです。

  1. ブローカー: Kafka クラスター内の各ノードはブローカーであり、メッセージの保存と転送を担当します。
  2. トピック: 各メッセージは、メッセージの生成と消費の論理概念であるトピックに割り当てる必要があります。
  3. パーティション: 各トピックは複数のパーティションに分割でき、各パーティションには複数の順序付けされたメッセージが含まれます。
  4. プロデューサー: メッセージプロデューサー。メッセージをブローカーに送信します。
  5. コンシューマ: メッセージ コンシューマ。ブローカからメッセージを読み取ります。
  6. コンシューマ グループ: コンシューマのグループは、1 つ以上のパーティション内のメッセージを共同で消費します。
  7. オフセット: メッセージを一意に識別するために使用されるメッセージの番号。

PHP は Apache Kafka を統合します

Apache Kafka を使用するには、PHP の Kafka 拡張機能を使用する必要があります。この拡張機能は、Kafka を操作するために PHP で必要なすべての API を提供します。

まず、Kafka 拡張機能をインストールする必要があります。PECL からインストールできます:

pecl install kafka
ログイン後にコピー

拡張機能をインストールした後、使用を開始できます。以下は、PHP と Apache Kafka を使用したメッセージの生成と消費の簡単な例です。

<?php
$brokers = 'kafka:9092';    // Kafka集群地址
$topic = 'test';            // Topic名称

// 创建一个Kafka生产者
$producer = new RdKafkaProducer();
$producer->setLogLevel(LOG_DEBUG);
$producer->addBrokers($brokers);

// 创建一个Kafka消费者
$conf = new RdKafkaConf();
$conf->set('group.id', 'myGroup');
$consumer = new RdKafkaConsumer($conf);
$consumer->addBrokers($brokers);

// 生产消息
$topicProducer = $producer->newTopic($topic);
for ($i = 0; $i < 10; $i++) {
    $topicProducer->produce(RD_KAFKA_PARTITION_UA, 0, 'Message ' . $i);
}

// 消费消息
$topicConsumer = $consumer->newTopic($topic);
$topicConsumer->consumeStart(0, RD_KAFKA_OFFSET_BEGINNING);
while (true) {
    $message = $topicConsumer->consume(0, 1000);
    if (null === $message) {
        continue;
    }
    if ($message->err) {
        throw new Exception('Error occurred while consuming message');
    }
    echo $message->payload . PHP_EOL;
}
ログイン後にコピー

この例では、最初に Kafka プロデューサと Kafka コンシューマを作成します。次に、プロデューサーでは、指定されたトピックに 10 個のメッセージを送信し、コンシューマーでは、指定されたトピックからのメッセージを消費して、その内容を出力しました。

この時点で、PHP と Apache Kafka を使用した単純なメッセージの生成と消費が正常に実装されました。次に、PHP と Apache Kafka を使用して、より高度な機能を実装する方法について説明します。

高度なアプリケーションの例

実際のアプリケーションでは、通常、次のようないくつかの高度な機能を実装する必要があります。

  1. メッセージ配布: 指定されたコンシューマにメッセージを送信します。
  2. コンシューマ グループ: 複数のコンシューマが 1 つ以上のトピックのメッセージを共同で利用できるようにします。
  3. オフセット設定: メッセージの読み取り位置を制御できます。

ここでは、これらの関数の実装方法について説明します。

メッセージ配布

実際のアプリケーションでは、通常、メッセージのフローを制御する必要があります。たとえば、特定のコンシューマのみに特定のメッセージを消費させたい場合があります。この機能を実現するには、コンシューマごとにキューを作成し、特定のメッセージを特定のキューに割り当てることができます。

次は、2 つのコンシューマを使用して 2 つの異なるタスクを実行する例です。

<?php

$brokers = 'kafka:9092';    // Kafka集群地址
$topic = 'test';            // Topic名称

// 创建一个Kafka消费者组
$conf = new RdKafkaConf();
$conf->set('group.id', 'myGroup');
$consumer = new RdKafkaKafkaConsumer($conf);
$consumer->subscribe([$topic]);

// 创建两个Kafka生产者,一个生产者用于向消费者1发送消息,另一个生产者用于向消费者2发送消息
$producer1 = new RdKafkaProducer();
$producer1->addBrokers($brokers);
$producer1Topic = $producer1->newTopic($topic . '_1');

$producer2 = new RdKafkaProducer();
$producer2->addBrokers($brokers);
$producer2Topic = $producer2->newTopic($topic . '_2');

// 消费消息
while (true) {
    $message = $consumer->consume(1000);
    if (null === $message) {
        continue;
    }
    if ($message->err) {
        throw new Exception('Error occurred while consuming message');
    }

    echo 'Received message: ' . $message->payload . PHP_EOL;

    // 根据消息内容分配给不同的生产者
    if ($message->payload === 'task1') {
        $producer1Topic->produce(RD_KAFKA_PARTITION_UA, 0, $message->payload);
    } elseif ($message->payload === 'task2') {
        $producer2Topic->produce(RD_KAFKA_PARTITION_UA, 0, $message->payload);
    }
}
ログイン後にコピー

この例では、2 つのプロデューサを使用して 2 つの異なるコンシューマにメッセージを配布します。コンシューマーがメッセージを受信すると、メッセージの内容に基づいてそのメッセージを特定のプロデューサーに割り当てることができます。この方法は、メッセージ フローを制御し、メッセージの冗長な処理を回避するのに役立ちます。

コンシューマ グループ

通常の Kafka コンシューマでは、同じグループ内の異なるコンシューマが同じトピックを消費し、同じメッセージを受け取ります。これは、Kafka が自動的にパーティションのバランスをとり、各パーティションが 1 つのコンシューマのみによって処理されるようにするためです。

PHP では、group.id を使用してコンシューマをグループ化し、コンシューマ グループの機能を実現できます。

次は、同じグループ内のメッセージを並列処理できる Kafka コンシューマ グループの例です:

<?php

$brokers = 'kafka:9092';    // Kafka集群地址
$topic = 'test';            // Topic名称

// 创建一个Kafka消费者组
$conf = new RdKafkaConf();
$conf->set('group.id', 'myGroup');
$conf->set('metadata.broker.list', $brokers);
$conf->set('enable.auto.commit', 'false');
$consumer = new RdKafkaKafkaConsumer($conf);

// 添加需要订阅的topic
$consumer->subscribe([$topic]);

// 处理消息
while (true) {
    $message = $consumer->consume(1000);
    if (null === $message) {
        continue;
    }
    if ($message->err) {
        throw new Exception('Error occurred while consuming message');
    }

    echo 'Received message: ' . $message->payload . PHP_EOL;

    // 处理完消息后手动提交offset
    $consumer->commit();
}
ログイン後にコピー

この例では、Kafka コンシューマ グループを作成し、それを追加トピックに送信します。サブスクリプションが必要なもの。その後、同じグループ内のメッセージを並行して処理できます。

注: コンシューマ グループでは、複数のコンシューマが 1 つ以上のパーティションを一緒に消費します。データを消費するときは、同じデータのマルチスレッド処理の問題に注意する必要があります。

オフセット構成

Kafka では、各パーティションに独立したオフセットがあります。コンシューマは、パーティション内のどこを読み取るか、つまりどのメッセージを読み取るかを制御できます。コンシューマは、最後のメッセージまたは最新のメッセージから読み取りを開始できます。

PHP では、オフセットを使用してメッセージの読み取り位置を制御できます。以下はオフセット構成の例です:

<?php

$brokers = 'kafka:9092';    // Kafka集群地址
$topic = 'test';            // Topic名称

// 创建一个Kafka消费者
$conf = new RdKafkaConf();
$conf->set('group.id', 'myGroup');
$consumer = new RdKafkaKafkaConsumer($conf);

// 订阅topic
$topicConf = new RdKafkaTopicConf();
$topicConf->set('auto.offset.reset', 'earliest');
$topic = $consumer->newTopic($topic, $topicConf);
$topic->consumeStart(0, RD_KAFKA_OFFSET_STORED);

// 消费消息
while (true) {
    $message = $topic->consume(0, 1000);
    if (null === $message) {
        continue;
    }
    if ($message->err) {
        throw new Exception('Error occurred while consuming message');
    }

    echo 'Received message: ' . $message->payload . PHP_EOL;
}
ログイン後にコピー

この例では、auto.offset.reset を使用してオフセット構成を設定します。この構成は、コンシューマに最も古いオフセットからメッセージの消費を開始するように指示します。

実際のアプリケーションでは、必要に応じてさまざまなオフセットを構成できます。たとえば、プロデューサーが一部のメッセージの処理に失敗した後、失敗したメッセージが以前に処理された時点からメッセージの読み取りを再開する必要がある場合があります。

結論

この記事では、PHP と Apache Kafka の統合を使用して効率的なメッセージ キューイングと配信を実現する方法について説明しました。最初に Apache Kafka の基本を紹介し、次に PHP 用の Kafka 拡張機能を使用してメッセージの生成と消費を実装する方法について説明しました。最後に、メッセージ配布、コンシューマ グループ、オフセット構成などの高度な機能を実装する方法について説明しました。

PHP と Apache Kafka の統合を使用すると、効率的なメッセージのキューイングと配信を実装できるため、アプリケーションの応答速度とスループットが向上します。大量のデータ通信を処理する必要があるアプリケーションを開発している場合は、Apache Kafka と PHP 用の Kafka 拡張機能が適切な選択となる可能性があります。

以上がPHP と Apache Kafka の統合による効率的なメッセージのキューイングと配布の詳細内容です。詳細については、PHP 中国語 Web サイトの他の関連記事を参照してください。

ソース:php.cn
このウェブサイトの声明
この記事の内容はネチズンが自主的に寄稿したものであり、著作権は原著者に帰属します。このサイトは、それに相当する法的責任を負いません。盗作または侵害の疑いのあるコンテンツを見つけた場合は、admin@php.cn までご連絡ください。
最新の問題
人気のチュートリアル
詳細>
最新のダウンロード
詳細>
ウェブエフェクト
公式サイト
サイト素材
フロントエンドテンプレート