Maison > développement back-end > tutoriel php > PHP implémente le traitement des données en temps réel open source Kafka Stream

PHP implémente le traitement des données en temps réel open source Kafka Stream

王林
Libérer: 2023-06-18 09:18:01
original
1486 Les gens l'ont consulté

Kafka Stream, en tant que moteur de calcul de flux, peut traiter rapidement des données en temps réel et fournir des capacités de traitement distribué prêtes à l'emploi. En tant que langage de développement populaire, PHP peut également utiliser ses bonnes fonctionnalités de langage et ses bibliothèques d'extensions pour implémenter le traitement des données Kafka Stream.

Cet article présentera comment utiliser PHP pour développer le traitement des données en temps réel de Kafka Stream et utilisera un exemple pour démontrer comment utiliser PHP pour analyser les données en temps réel générées par le mode observateur.

  1. Introduction à Kafka Stream

Kafka Stream est un moteur de calcul de flux rapide et stable qui peut traiter de manière fiable les données en temps réel et fournir des capacités de traitement distribué prêtes à l'emploi. Kafka Stream est une méthode de traitement de données efficace et flexible qui consiste à consommer des messages dans des sujets Kafka, à les envoyer à l'application pour traitement, puis à renvoyer les résultats traités au sujet Kafka.

  1. Intégration de PHP et Kafka Stream

En PHP, grâce à la bibliothèque Kafka-PHP officiellement fournie par Kafka Stream, nous pouvons facilement intégrer des applications PHP avec Kafka Stream. Voici les versions de Kafka Stream prises en charge par la bibliothèque Kafka-PHP :

  • Kafka 0.10.x
  • Kafka 0.11.x
  • Kafka 1.0.x
  • Kafka 1.1.x
  • Kafka 2.0.x
  • Kafka 2 . 1.x
  • Kafka 2.2.x

La bibliothèque Kafka-PHP fournit les fonctionnalités de base suivantes :

  • Producer : offre la possibilité de produire des messages Kafka et de les envoyer à un sujet spécifié.
  • Consumer : offre la possibilité de consommer des messages Kafka et prend en charge la soumission automatique et la soumission manuelle.
  • Manager : offre la possibilité de créer et de supprimer des sujets et des partitions Kafka.

De plus, la bibliothèque Kafka-PHP prend également en charge l'extension Swoole de PHP. Les performances des applications PHP peuvent être encore améliorées en utilisant l'extension Swoole.

  1. Modèle d'observateur

Le modèle d'observateur est un modèle de conception comportemental qui définit une relation de dépendance un-à-plusieurs entre les objets Lorsque l'état d'un objet change, tous les objets qui en dépendent seront avertis et automatiquement mis à jour. Le modèle d'observateur est largement utilisé dans la surveillance des événements, la programmation de l'interface utilisateur et d'autres domaines, et peut permettre une livraison et un traitement efficaces des messages.

  1. Implémentez le traitement des données en mode observateur de Kafka Stream

Ce qui suit utilisera un exemple de code pour démontrer comment utiliser PHP pour développer le traitement des données en temps réel de Kafka Stream et appliquer le mode observateur pour l'analyse des données.

4.1 Implémentation du producteur Kafka

Tout d'abord, nous devons créer un producteur pour envoyer des messages au sujet Kafka. Voici un exemple de code simple du producteur Kafka :

<?php
require_once __DIR__ . '/vendor/autoload.php';

use RdKafkaConf;
use RdKafkaProducer;
use RdKafkaProducerTopic;

$conf = new Conf();
$conf->set('metadata.broker.list', 'kafka:9092');
$producer = new Producer($conf);
$topic = $producer->newTopic('topic1');
for ($i = 0; $i < 10; $i++) {
    $topic->produce(RD_KAFKA_PARTITION_UA, 0, "Message $i");
}
?>
Copier après la connexion

Dans le code ci-dessus, nous utilisons la classe Producer fournie par la bibliothèque d'extension RdKafka pour implémenter le producteur Kafka et envoyer des messages au sujet Kafka nommé « topic1 ». Lors de la mise en œuvre du producteur Kafka, nous devons prêter attention à la configuration de la connexion du cluster Kafka pour garantir que le cluster Kafka peut être connecté correctement.

4.2 Implémentation du consommateur Kafka

Ensuite, nous devons créer un consommateur Kafka pour consommer les données du sujet Kafka. Ce qui suit est un simple exemple de code de consommateur Kafka :

<?php
require_once __DIR__ . '/vendor/autoload.php';

use RdKafkaConf;
use RdKafkaConsumer;
use RdKafkaTopicPartition;

$conf = new Conf();
$conf->set('metadata.broker.list', 'kafka:9092');
$consumer = new Consumer($conf);
$consumer->addBrokers('kafka:9092');
$topic = $consumer->newTopic('topic1');
$topic->consumeStart(0, RD_KAFKA_OFFSET_STORED);

while (true) {
    $message = $topic->consume(0, 1000);
    if ($message === null) {
        continue;
    }
    if ($message->err === RD_KAFKA_RESP_ERR_NO_ERROR) {
        echo "Received message: {$message->payload}
";
    }
}

$consumer->close();
?>
Copier après la connexion

Dans le code ci-dessus, nous utilisons la classe Consumer fournie par la bibliothèque d'extension RdKafka pour implémenter le consommateur Kafka, consommons les données du sujet Kafka nommé « topic1 » et les données sont imprimées. à la console. Notez que lors de l'implémentation d'un consommateur Kafka, nous devons définir le sujet de consommation et le décalage pour démarrer la consommation.

4.3 Implémentation du modèle observateur

Nous pouvons désormais consommer les données du sujet Kafka, mais comment utiliser le modèle observateur pour analyser les données ? Ce qui suit est un exemple de code simple du modèle d'observateur :

<?php
require_once __DIR__ . '/vendor/autoload.php';

use SplObserver;
use SplSubject;

class Producer implements SplSubject
{
    private array $observers = [];

    public function attach(SplObserver $observer):void
    {
        array_push($this->observers, $observer);
    }

    public function detach(SplObserver $observer):void
    {
        if (($key = array_search($observer, $this->observers, true)) !== false) {
            array_splice($this->observers, $key, 1);
        }
    }

    public function notify():void
    {
        foreach ($this->observers as $observer) {
            $observer->update($this);
        }
    }

    public function produce(string $message):void
    {
        echo "Producing message: {$message}
";
        $this->notify();
    }
}

class Consumer implements SplObserver
{
    public function update(SplSubject $subject):void
    {
        echo "Consuming message: {$subject}
";
    }
}

$producer = new Producer();
$producer->attach(new Consumer());
$producer->produce('Message 1');
?>
Copier après la connexion

Dans le code ci-dessus, nous définissons une classe principale nommée Producer, implémentons l'interface SplSubject et fournissons les méthodes de gestion des observateurs attacher, détacher, notifier et produire. Nous avons également défini une classe d'observateur nommée Consumer, implémenté l'interface SplObserver et fourni la méthode de mise à jour pour traiter les messages. Enfin, nous avons créé une instance Producer et attaché une instance Consumer en tant qu'observateur, exécuté la méthode Produce une fois et déclenché la méthode de mise à jour du Consumer.

4.4 Implémenter le traitement des données en mode observateur de Kafka Stream

Enfin, nous combinons les codes des trois étapes précédentes pour implémenter le traitement des données en mode observateur de Kafka Stream. Ce qui suit est un exemple de code simple pour le traitement des données Kafka Stream :

<?php
require_once __DIR__ . '/vendor/autoload.php';

use RdKafkaConf;
use RdKafkaConsumer;
use RdKafkaProducer;
use RdKafkaTopicPartition;
use SplSubject;
use SplObserver;

class KafkaStream implements SplSubject
{
    private array $observers;
    private Conf $conf;
    private Producer $producer;
    private Consumer $consumer;

    public function __construct(string $bootstrap_servers)
    {
        $this->conf = new Conf();
        $this->conf->set('metadata.broker.list', $bootstrap_servers);
        $this->producer = new Producer($this->conf);
        $this->consumer = new Consumer($this->conf);
        $this->observers = [];
    }

    public function attach(SplObserver $observer):void
    {
        array_push($this->observers, $observer);
    }

    public function detach(SplObserver $observer):void
    {
        if (($key = array_search($observer, $this->observers, true)) !== false) {
            array_splice($this->observers, $key, 1);
        }
    }

    public function notify():void
    {
        foreach ($this->observers as $observer) {
            $observer->update($this);
        }
    }

    public function produce(string $message, string $topic):void
    {
        echo "Producing message: {$message}
";
        $this->producer->newTopic($topic)->produce(RD_KAFKA_PARTITION_UA, 0, $message);
        $this->notify();
    }

    public function consume(string $topic):void
    {
        $topic_partition = new TopicPartition($topic, 0);
        $this->consumer->assign([$topic_partition]);
        $this->consumer->seek($topic_partition, 0);

        while (true) {
            $message = $this->consumer->consume(0, 1000);
            if ($message === null) {
                continue;
            }
            if ($message->err !== RD_KAFKA_RESP_ERR_NO_ERROR) {
                echo "Error: {$message->errstr()}, exiting.
";
                break;
            }
            echo "Consuming message: {$message->payload}
";
        }

        $this->consumer->close();
    }
}

class Consumer implements SplObserver
{
    public function update(SplSubject $subject):void
    {
        echo "Processing message: {$subject}
";
    }
}

$bootstrap_servers = 'kafka:9092';
$kafka_stream = new KafkaStream($bootstrap_servers);
$kafka_stream->attach(new Consumer());
$kafka_stream->produce('Message 1', 'topic1');
$kafka_stream->consume('topic1');
?>
Copier après la connexion

Dans le code ci-dessus, nous définissons une classe nommée KafkaStream, implémentons l'interface SplSubject et fournissons les méthodes de base de traitement Kafka Stream produire et consommer, ainsi que les méthodes de gestion des observateurs. attacher, détacher, notifier. Nous avons également défini une classe d'observateur nommée Consumer, implémenté l'interface SplObserver et fourni la méthode de mise à jour pour traiter les messages. Enfin, nous créons une instance de KafkaStream et attachons une instance de Consumer en tant qu'observateur, exécutons la méthode de production une fois pour produire un message, puis consommons et traitons le message dans la méthode de consommation.

  1. Résumé

Cet article présente comment utiliser PHP pour développer le traitement des données en temps réel de Kafka Stream et montre comment utiliser le modèle d'observateur pour analyser les données en temps réel. Kafka Stream et le modèle Observer sont une puissante combinaison d'outils qui peuvent nous aider à traiter rapidement des données en temps réel à grande échelle et à parvenir à une livraison et un traitement efficaces des messages.

Ce qui précède est le contenu détaillé de. pour plus d'informations, suivez d'autres articles connexes sur le site Web de PHP en chinois!

Étiquettes associées:
source:php.cn
Déclaration de ce site Web
Le contenu de cet article est volontairement contribué par les internautes et les droits d'auteur appartiennent à l'auteur original. Ce site n'assume aucune responsabilité légale correspondante. Si vous trouvez un contenu suspecté de plagiat ou de contrefaçon, veuillez contacter admin@php.cn
Tutoriels populaires
Plus>
Derniers téléchargements
Plus>
effets Web
Code source du site Web
Matériel du site Web
Modèle frontal