Mit der Entwicklung der Internet-Technologie und dem explosionsartigen Wachstum des Datenvolumens ist die Verarbeitung riesiger Datenmengen zu einem der Probleme geworden, denen sich Internetunternehmen heute stellen müssen. Herkömmliche Datenverarbeitungslösungen, insbesondere Stapelverarbeitungslösungen, können den Anforderungen an Echtzeit und Hochverfügbarkeit nicht mehr gerecht werden. Derzeit ist die Echtzeit-Datenverarbeitung zu einer der besten Lösungen geworden. Als Entwickler müssen wir auch auf den eleganten und effizienten Umgang mit großen Datenmengen achten.
Pulsar ist ein Echtzeit-Datenverarbeitungs-Framework von Yahoo. Es nutzt eine mehrschichtige Architektur, um die Datenverarbeitung effizienter und skalierbarer zu gestalten. Es unterstützt mehrere Clientsprachen, darunter Java, Python, Ruby und PHP. Als sehr beliebte Sprache verfügt PHP über eine einfache Syntax und einen geringen Lernaufwand. Sie ist für viele Unternehmen zu einer der bevorzugten Sprachen für die Entwicklung von Echtzeit-Datenverarbeitungsanwendungen geworden. In diesem Artikel wird erläutert, wie Sie mit PHP die Echtzeit-Datenverarbeitung von Open-Source-Pulsar implementieren.
Bevor Sie Pulsar verwenden, müssen Sie Pulsar zunächst herunterladen und installieren. Relevante Softwarepakete und Dokumentationen können von der offiziellen Website von Pulsar bezogen und auf dem lokalen Computer oder auf einem Knoten im Cluster für lokale Entwicklung und Tests installiert werden.
Während des PHP-Entwicklungsprozesses müssen Sie das Client-SDK pulsar-client-php verwenden. Es kann über Tools wie Composer installiert werden. Der spezifische Prozess ist wie folgt:
// 安装pulsar-client-php composer require apache/pulsar
Nach Abschluss der Installation folgt die grundlegende Konfiguration für die Verwendung von Pulsar.
use ApachePulsarAuthenticationAuthenticationFactory; use ApachePulsarClientBuilder; use ApachePulsarProducerConfiguration; use ApachePulsarSerializationSerialization; // 配置生产者的信息 $clientBuilder = new ClientBuilder(); $clientBuilder->setServiceUrl('pulsar://localhost:6650'); $clientBuilder->setAuthentication( AuthenticationFactory::token('your-token-string') ); $producerConf = new ProducerConfiguration(); $producerConf->setTopic('your-topic-name'); $producerConf->setSendTimeout(3000); $producerConf->setSerialization(Serialization::JSON); // 创建生产者实例 $producer = $clientBuilder->build()->createProducer($producerConf); $producer->send('your message');
Im obigen Code erstellen wir zunächst den Pulsar-Produzenten über die Klasse ClientBuilder
. Beim Erstellen eines Produzenten müssen wir die Methode setServiceUrl
festlegen, um die URL des Pulsar-Dienstes anzugeben, und die Methode setAuthentication
, um die Authentifizierung durchzuführen. Darüber hinaus müssen Sie die Konfigurationsinformationen des Herstellers festlegen, z. B. Thema, Zeitüberschreitung usw. ClientBuilder
类来创建Pulsar的生产者。在创建生产者的时候,我们需要设置setServiceUrl
方法来指定Pulsar Service的URL,setAuthentication
方法来进行身份验证。另外需要设置生产者的配置信息,如话题、超时等。
Pulsar提供了Producer和Consumer两种基本的组件实现实时数据处理。Producer用于将数据发送到指定的Pulsar topic,而Consumer则从topic中消费数据。下面我们将详细介绍如何使用这两种组件来完成实时数据处理。
首先,我们通过以下步骤来创建一个Producer实例:
// 导入命名空间 use ApachePulsarClientBuilder; // 创建Pulsar client实例 $clientBuilder = new ClientBuilder(); $client = $clientBuilder->serviceUrl('pulsar://localhost:6650')->build(); // 创建Producer对象 $producer = $client->createProducer( [ 'topic' => 'your-topic', ] );
在创建生产者时,需要设置生产者所属的Pulsar topic。此外,还有其他可选项,如“producerName”、“initialSequenceId”、“sendTimeout”等。这些选项可以根据需要进行配置。
下面我们来看一下如何向Pulsar topic发送消息:
// 对Pulsar topic发送消息 $result = $producer->send('your-message');
send方法返回一个MessageId
对象。如果消息之前已经发送过,则返回对应的MessageId
。如果消息发送失败,则抛出PulsarClientException
异常。
与生产者一样,Pulsar Consumer的创建也是分为多个步骤。
// 导入命名空间 use ApachePulsarClientBuilder; // 创建Pulsar client实例 $clientBuilder = new ClientBuilder(); $client = $clientBuilder->serviceUrl('pulsar://localhost:6650')->build(); // 创建Consumer对象 $consumer = $client->subscribe( [ 'topic' => 'your-topic', 'subscriptionName' => 'your-subscription-name', ] );
在创建Consumer时,我们需要设置订阅的Pulsar topic和订阅名称。另外有其他可选项,如设置“receiverQueueSize”、“ackTimeout”、“subscriptionType”等。
下面我们将看到如何从指定的Pulsar topic中获取消息:
// 从topic中消费消息 $message = $consumer->receive(); // 对消息进行处理 echo 'Received message with ID: ' . $message->getMessageId() . PHP_EOL; // markAsReceived表示通知Pulsar这条消息已经被处理 $consumer->acknowledge($message);
在调用receive()
方法时,程序会保持等待状态,直到有消息从指定的Pulsar topic中返回。当有消息返回时,程序会继续执行,对消息进行处理。
调用acknowledge()
方法后,Pulsar才会将消息从该订阅的队列中删除。如果没有调用acknowledge()
rrreee
Beim Erstellen eines Produzenten müssen Sie das Pulsar-Thema festlegen, zu dem der Produzent gehört. Darüber hinaus gibt es weitere Optionen wie „producerName“, „initialSequenceId“, „sendTimeout“ usw. Diese Optionen können nach Bedarf konfiguriert werden. Sehen wir uns an, wie man eine Nachricht an das Pulsar-Thema sendet: 🎜rrreee🎜Die Sendemethode gibt einMessageId
-Objekt zurück. Wenn die Nachricht schon einmal gesendet wurde, wird die entsprechende MessageId
zurückgegeben. Wenn die Nachricht nicht gesendet werden kann, wird eine PulsarClientException
-Ausnahme ausgelöst. 🎜receive()
bleibt das Programm im Wartezustand, bis eine Nachricht vom angegebenen Thema eingeht Rückkehr zum Pulsar-Thema. Wenn eine Nachricht zurückgegeben wird, führt das Programm die Nachricht weiter aus und verarbeitet sie. 🎜🎜Pulsar löscht die Nachricht erst aus der Warteschlange des Abonnements, nachdem die Methode acknowledge()
aufgerufen wurde. Wenn die Methode acknowledge()
nicht aufgerufen wird, bleibt die Nachricht in der Warteschlange, bis die Nachricht abläuft (Standard ist 1 Stunde). 🎜🎜Zusammenfassung🎜🎜In diesem Artikel haben wir vorgestellt, wie man mit PHP die Echtzeit-Datenverarbeitung des Open-Source-Pulsar implementiert. Wir begannen mit der Einrichtung der Pulsar-Umgebung und beschrieben Schritt für Schritt, wie man die Producer- und Consumer-Komponenten von Pulsar nutzt, um eine Echtzeit-Datenverarbeitung zu implementieren. 🎜🎜Pulsar verwendet eine mehrschichtige Architektur und kann eine groß angelegte Echtzeit-Datenverarbeitung gut unterstützen. Derzeit wird Pulsar von vielen Internetunternehmen wie Alibaba, Meituan, Baidu usw. verwendet. 🎜🎜Wir glauben, dass Sie durch das Studium der in diesem Artikel vorgestellten Inhalte bereits verstehen können, wie Sie PHP und Pulsar nutzen können, um die Datenverarbeitung in Echtzeit effizienter und eleganter zu gestalten. 🎜Das obige ist der detaillierte Inhalt vonPHP implementiert die Open-Source-Pulsar-Echtzeit-Datenverarbeitung. Für weitere Informationen folgen Sie bitte anderen verwandten Artikeln auf der PHP chinesischen Website!