インターネット技術の発展とデータ量の爆発的な増加に伴い、大量のデータの処理は今日のインターネット企業が直面しなければならない問題の 1 つとなっています。従来のデータ処理ソリューション、特にバッチ処理ソリューションでは、リアルタイムと高可用性のニーズを満たすことができなくなり、現時点では、リアルタイム データ処理が最適なソリューションの 1 つとなっています。開発者としては、大規模なデータをどのようにエレガントかつ効率的に処理するかという点にも注意を払う必要があります。
Pulsar は、Yahoo がオープンソース化したリアルタイム データ処理フレームワークで、レイヤード アーキテクチャを使用してデータ処理をより効率的かつスケーラブルにします。 Java、Python、Ruby、PHP などの複数のクライアント言語をサポートします。 PHP は非常に人気のある言語であり、構文が単純で学習期間が短いため、多くの企業がリアルタイム データ処理アプリケーションを開発する際に好まれる言語の 1 つとなっています。この記事では、PHPを使用してオープンソースのPulsarのリアルタイムデータ処理を実装する方法を紹介します。
Pulsar の使用を開始する前に、Pulsar をダウンロードしてインストールする必要があります。関連するソフトウェア パッケージとドキュメントは Pulsar の公式 Web サイトから入手でき、ローカル マシンまたはクラスター内のノードにインストールして、ローカルでの開発とテストを行うことができます。
PHP 開発プロセスでは、クライアント SDK pulsar-client-php を使用する必要があります。 Composer などのツールを使用してインストールできますが、具体的な手順は次のとおりです:
// 安装pulsar-client-php composer require apache/pulsar
インストール完了後、Pulsar を使用するための基本的な設定は次のとおりです。
use ApachePulsarAuthenticationAuthenticationFactory; use ApachePulsarClientBuilder; use ApachePulsarProducerConfiguration; use ApachePulsarSerializationSerialization; // 配置生产者的信息 $clientBuilder = new ClientBuilder(); $clientBuilder->setServiceUrl('pulsar://localhost:6650'); $clientBuilder->setAuthentication( AuthenticationFactory::token('your-token-string') ); $producerConf = new ProducerConfiguration(); $producerConf->setTopic('your-topic-name'); $producerConf->setSendTimeout(3000); $producerConf->setSerialization(Serialization::JSON); // 创建生产者实例 $producer = $clientBuilder->build()->createProducer($producerConf); $producer->send('your message');
上記のコードでは、まず ClientBuilder
クラスを通じて Pulsar プロデューサーを作成します。プロデューサを作成するときは、setServiceUrl
メソッドを設定して Pulsar サービスの URL を指定し、setAuthentication
メソッドを設定して認証を実行する必要があります。さらに、トピック、タイムアウトなどのプロデューサーの構成情報を設定する必要があります。
Pulsar は、リアルタイム データ処理を実装するために、Producer と Consumer という 2 つの基本コンポーネントを提供します。プロデューサーは指定された Pulsar トピックにデータを送信するために使用され、コンシューマーはトピックからのデータを消費します。以下では、これら 2 つのコンポーネントを使用してリアルタイム データ処理を完了する方法を詳しく紹介します。
まず、次の手順でプロデューサー インスタンスを作成します。
// 导入命名空间 use ApachePulsarClientBuilder; // 创建Pulsar client实例 $clientBuilder = new ClientBuilder(); $client = $clientBuilder->serviceUrl('pulsar://localhost:6650')->build(); // 创建Producer对象 $producer = $client->createProducer( [ 'topic' => 'your-topic', ] );
プロデューサーを作成するときは、プロデューサーが属する Pulsar トピックを設定する必要があります。さらに、「ProducerName」、「initialSequenceId」、「sendTimeout」などの他のオプションもあります。これらのオプションは必要に応じて構成できます。
Pulsar トピックにメッセージを送信する方法を見てみましょう。
// 对Pulsar topic发送消息 $result = $producer->send('your-message');
send メソッドは MessageId
オブジェクトを返します。メッセージが以前に送信されたことがある場合は、対応する MessageId
が返されます。メッセージの送信に失敗した場合は、PulsarClientException
例外がスローされます。
プロデューサと同様に、パルサー コンシューマの作成も複数のステップに分かれています。
// 导入命名空间 use ApachePulsarClientBuilder; // 创建Pulsar client实例 $clientBuilder = new ClientBuilder(); $client = $clientBuilder->serviceUrl('pulsar://localhost:6650')->build(); // 创建Consumer对象 $consumer = $client->subscribe( [ 'topic' => 'your-topic', 'subscriptionName' => 'your-subscription-name', ] );
Consumer を作成するときは、Pulsar トピックとサブスクリプションのサブスクリプション名を設定する必要があります。 「receiverQueueSize」、「ackTimeout」、「subscriptionType」などの設定など、他のオプションもあります。
以下では、指定された Pulsar トピックからメッセージを取得する方法を説明します。
// 从topic中消费消息 $message = $consumer->receive(); // 对消息进行处理 echo 'Received message with ID: ' . $message->getMessageId() . PHP_EOL; // markAsReceived表示通知Pulsar这条消息已经被处理 $consumer->acknowledge($message);
receive()
メソッドを呼び出すと、プログラムは待機状態のままになります。指定された Pulsar トピックからメッセージが返されます。メッセージが返されると、プログラムは実行を続行し、メッセージを処理します。
acknowledge()
メソッドを呼び出した後、Pulsar はサブスクリプションのキューからメッセージを削除します。 acknowledge()
メソッドが呼び出されない場合、メッセージは有効期限が切れるまでキュー内に残ります (デフォルトは 1 時間)。
この記事では、PHP を使用してオープンソースの Pulsar のリアルタイム データ処理を実装する方法を紹介しました。まず Pulsar 環境をセットアップし、Pulsar のプロデューサ コンポーネントとコンシューマ コンポーネントを使用してリアルタイム データ処理を実装する方法を段階的に説明しました。
Pulsar は階層型アーキテクチャを採用しており、大規模なリアルタイム データ処理を十分にサポートできます。現在、Pulsar は Alibaba、Meituan、Baidu などの多くのインターネット企業で使用されています。
この記事で紹介する内容を学ぶことで、PHP と Pulsar を使用してリアルタイム データ処理をより効率的かつエレガントに行う方法をすでに理解できると思います。
以上がPHP はオープンソースの Pulsar リアルタイム データ処理を実装しますの詳細内容です。詳細については、PHP 中国語 Web サイトの他の関連記事を参照してください。