PHP melaksanakan pemprosesan data masa nyata Kafka Stream sumber terbuka

王林
Lepaskan: 2023-06-18 09:18:01
asal
1432 orang telah melayarinya

Kafka Stream, sebagai enjin pengkomputeran strim, boleh memproses data masa nyata dengan cepat dan menyediakan keupayaan pemprosesan yang diedarkan di luar kotak. Sebagai bahasa pembangunan yang popular, PHP juga boleh menggunakan ciri bahasa yang baik dan perpustakaan sambungan untuk melaksanakan pemprosesan data Kafka Stream.

Artikel ini akan memperkenalkan cara menggunakan PHP untuk membangunkan pemprosesan data masa nyata Kafka Stream, dan menggunakan contoh untuk menunjukkan cara menggunakan PHP untuk menganalisis data masa nyata yang dijana oleh mod pemerhati.

  1. Pengenalan kepada Kafka Stream

Kafka Stream ialah enjin pengkomputeran aliran yang pantas dan stabil yang boleh memproses data masa nyata dengan pasti dan menyediakan pengedaran di luar kotak kuasa pemprosesan. Kafka Stream ialah kaedah pemprosesan data yang cekap dan fleksibel dengan menggunakan mesej dalam topik Kafka, menghantarnya ke aplikasi untuk diproses, dan kemudian menghantar hasil yang diproses kembali ke topik Kafka.

  1. Integrasi PHP dan Kafka Stream

Dalam PHP, melalui perpustakaan Kafka-PHP yang disediakan secara rasmi oleh Kafka Stream, kami boleh dengan mudah mengintegrasikan aplikasi PHP dengan Kafka Stream Untuk menyepadukan . Berikut ialah versi Kafka Stream yang disokong oleh pustaka Kafka-PHP:

  • Kafka 0.10.x
  • Kafka 0.11.x
  • Kafka 1.0.x
  • Kafka 1.1.x
  • Kafka 2.0.x
  • Kafka 2.1.x
  • Kafka 2.2.x

Kafka-PH perpustakaan menyediakan ciri teras berikut:

  • Pengeluar: Menyediakan keupayaan untuk menghasilkan mesej Kafka dan menghantarnya ke topik tertentu.
  • Pengguna: Menyediakan keupayaan untuk menggunakan mesej Kafka dan menyokong penyerahan automatik dan penyerahan manual.
  • Pengurus: Menyediakan keupayaan untuk mencipta dan memadam topik dan partition Kafka.

Selain itu, perpustakaan Kafka-PHP juga menyediakan sokongan untuk sambungan Swoole PHP Prestasi aplikasi PHP boleh dipertingkatkan lagi dengan menggunakan sambungan Swoole.

  1. Corak Pemerhati

Corak Pemerhati ialah corak reka bentuk tingkah laku yang mentakrifkan hubungan pergantungan satu-ke-banyak antara objek Apabila keadaannya berubah, semua objek yang bergantung padanya diberitahu dan dikemas kini secara automatik. Corak pemerhati digunakan secara meluas dalam pemantauan acara, pengaturcaraan UI dan bidang lain, dan boleh mencapai penghantaran dan pemprosesan mesej yang cekap.

  1. Melaksanakan pemprosesan data mod pemerhati Kafka Stream

Yang berikut akan menggunakan kod sampel untuk menunjukkan cara menggunakan PHP untuk membangunkan pemprosesan data masa nyata Kafka Stream dan memohon mod pemerhati untuk analisis data.

4.1 Melaksanakan penerbit Kafka

Pertama, kita perlu mencipta penerbit untuk menghantar mesej kepada topik Kafka. Berikut ialah kod contoh pengeluar Kafka yang ringkas:

<?php
require_once __DIR__ . '/vendor/autoload.php';

use RdKafkaConf;
use RdKafkaProducer;
use RdKafkaProducerTopic;

$conf = new Conf();
$conf->set('metadata.broker.list', 'kafka:9092');
$producer = new Producer($conf);
$topic = $producer->newTopic('topic1');
for ($i = 0; $i < 10; $i++) {
    $topic->produce(RD_KAFKA_PARTITION_UA, 0, "Message $i");
}
?>
Salin selepas log masuk

Dalam kod di atas, kami menggunakan kelas Producer yang disediakan oleh pustaka sambungan RdKafka untuk melaksanakan pengeluar Kafka dan menghantar mesej kepada Kafka bernama 'topic1' dalam topik. Apabila melaksanakan pengeluar Kafka, kita perlu memberi perhatian untuk menyediakan konfigurasi sambungan gugusan Kafka untuk memastikan bahawa gugusan Kafka boleh disambungkan dengan betul.

4.2 Melaksanakan pengguna Kafka

Seterusnya, kita perlu mencipta pengguna Kafka untuk menggunakan data daripada topik Kafka. Berikut ialah kod sampel pengguna Kafka yang ringkas:

<?php
require_once __DIR__ . '/vendor/autoload.php';

use RdKafkaConf;
use RdKafkaConsumer;
use RdKafkaTopicPartition;

$conf = new Conf();
$conf->set('metadata.broker.list', 'kafka:9092');
$consumer = new Consumer($conf);
$consumer->addBrokers('kafka:9092');
$topic = $consumer->newTopic('topic1');
$topic->consumeStart(0, RD_KAFKA_OFFSET_STORED);

while (true) {
    $message = $topic->consume(0, 1000);
    if ($message === null) {
        continue;
    }
    if ($message->err === RD_KAFKA_RESP_ERR_NO_ERROR) {
        echo "Received message: {$message->payload}
";
    }
}

$consumer->close();
?>
Salin selepas log masuk

Dalam kod di atas, kami menggunakan kelas Pengguna yang disediakan oleh perpustakaan sambungan RdKafka untuk melaksanakan pengguna Kafka, menggunakan daripada topik Kafka bernama data 'topic1' dan mencetak data ke konsol. Ambil perhatian bahawa apabila melaksanakan pengguna Kafka, kita perlu menetapkan topik penggunaan dan offset untuk memulakan penggunaan.

4.3 Melaksanakan Corak Pemerhati

Kini kita boleh menggunakan data daripada topik Kafka, tetapi bagaimana untuk menggunakan corak pemerhati untuk menganalisis data? Berikut ialah kod contoh corak pemerhati mudah:

<?php
require_once __DIR__ . '/vendor/autoload.php';

use SplObserver;
use SplSubject;

class Producer implements SplSubject
{
    private array $observers = [];

    public function attach(SplObserver $observer):void
    {
        array_push($this->observers, $observer);
    }

    public function detach(SplObserver $observer):void
    {
        if (($key = array_search($observer, $this->observers, true)) !== false) {
            array_splice($this->observers, $key, 1);
        }
    }

    public function notify():void
    {
        foreach ($this->observers as $observer) {
            $observer->update($this);
        }
    }

    public function produce(string $message):void
    {
        echo "Producing message: {$message}
";
        $this->notify();
    }
}

class Consumer implements SplObserver
{
    public function update(SplSubject $subject):void
    {
        echo "Consuming message: {$subject}
";
    }
}

$producer = new Producer();
$producer->attach(new Consumer());
$producer->produce('Message 1');
?>
Salin selepas log masuk

Dalam kod di atas, kami mentakrifkan kelas utama bernama Producer, melaksanakan antara muka SplSubject dan menyediakan kaedah pengurusan pemerhati yang dilampirkan, tanggalkan, maklumkan dan hasilkan. Kami juga menentukan kelas pemerhati bernama Pengguna, melaksanakan antara muka SplObserver dan menyediakan kaedah kemas kini untuk memproses mesej. Akhir sekali, kami mencipta tika Pengeluar dan melampirkan tika Pengguna sebagai pemerhati, melaksanakan kaedah hasil sekali dan mencetuskan kaedah kemas kini Pengguna.

4.4 Laksanakan pemprosesan data mod pemerhati Kafka Stream

Akhir sekali, kami menggabungkan kod dalam tiga langkah sebelumnya untuk melaksanakan pemprosesan data mod pemerhati Kafka Stream. Berikut ialah kod sampel pemprosesan data Kafka Stream yang mudah:

<?php
require_once __DIR__ . '/vendor/autoload.php';

use RdKafkaConf;
use RdKafkaConsumer;
use RdKafkaProducer;
use RdKafkaTopicPartition;
use SplSubject;
use SplObserver;

class KafkaStream implements SplSubject
{
    private array $observers;
    private Conf $conf;
    private Producer $producer;
    private Consumer $consumer;

    public function __construct(string $bootstrap_servers)
    {
        $this->conf = new Conf();
        $this->conf->set('metadata.broker.list', $bootstrap_servers);
        $this->producer = new Producer($this->conf);
        $this->consumer = new Consumer($this->conf);
        $this->observers = [];
    }

    public function attach(SplObserver $observer):void
    {
        array_push($this->observers, $observer);
    }

    public function detach(SplObserver $observer):void
    {
        if (($key = array_search($observer, $this->observers, true)) !== false) {
            array_splice($this->observers, $key, 1);
        }
    }

    public function notify():void
    {
        foreach ($this->observers as $observer) {
            $observer->update($this);
        }
    }

    public function produce(string $message, string $topic):void
    {
        echo "Producing message: {$message}
";
        $this->producer->newTopic($topic)->produce(RD_KAFKA_PARTITION_UA, 0, $message);
        $this->notify();
    }

    public function consume(string $topic):void
    {
        $topic_partition = new TopicPartition($topic, 0);
        $this->consumer->assign([$topic_partition]);
        $this->consumer->seek($topic_partition, 0);

        while (true) {
            $message = $this->consumer->consume(0, 1000);
            if ($message === null) {
                continue;
            }
            if ($message->err !== RD_KAFKA_RESP_ERR_NO_ERROR) {
                echo "Error: {$message->errstr()}, exiting.
";
                break;
            }
            echo "Consuming message: {$message->payload}
";
        }

        $this->consumer->close();
    }
}

class Consumer implements SplObserver
{
    public function update(SplSubject $subject):void
    {
        echo "Processing message: {$subject}
";
    }
}

$bootstrap_servers = 'kafka:9092';
$kafka_stream = new KafkaStream($bootstrap_servers);
$kafka_stream->attach(new Consumer());
$kafka_stream->produce('Message 1', 'topic1');
$kafka_stream->consume('topic1');
?>
Salin selepas log masuk

Dalam kod di atas, kami mentakrifkan kelas bernama KafkaStream, melaksanakan antara muka SplSubject dan menyediakan kaedah teras pemprosesan Kafka Stream menghasilkan dan menggunakan, serta sebagai kaedah pengurusan pemerhati lampirkan, tanggalkan dan maklumkan. Kami juga menentukan kelas pemerhati bernama Pengguna, melaksanakan antara muka SplObserver dan menyediakan kaedah kemas kini untuk memproses mesej. Akhir sekali, kami mencipta tika KafkaStream dan melampirkan tika Pengguna sebagai pemerhati, melaksanakan kaedah hasil sekali untuk menghasilkan mesej, dan menggunakan dan memproses mesej dalam kaedah penggunaan.

  1. Ringkasan

Artikel ini memperkenalkan cara menggunakan PHP untuk membangunkan pemprosesan data masa nyata Kafka Stream dan menunjukkan cara menggunakan corak pemerhati untuk menganalisis data masa nyata. Kafka Stream dan corak Pemerhati ialah gabungan alatan yang berkuasa yang boleh membantu kami memproses data masa nyata berskala besar dengan cepat dan mencapai penghantaran dan pemprosesan mesej yang cekap.

Atas ialah kandungan terperinci PHP melaksanakan pemprosesan data masa nyata Kafka Stream sumber terbuka. Untuk maklumat lanjut, sila ikut artikel berkaitan lain di laman web China PHP!

Label berkaitan:
sumber:php.cn
Kenyataan Laman Web ini
Kandungan artikel ini disumbangkan secara sukarela oleh netizen, dan hak cipta adalah milik pengarang asal. Laman web ini tidak memikul tanggungjawab undang-undang yang sepadan. Jika anda menemui sebarang kandungan yang disyaki plagiarisme atau pelanggaran, sila hubungi admin@php.cn
Tutorial Popular
Lagi>
Muat turun terkini
Lagi>
kesan web
Kod sumber laman web
Bahan laman web
Templat hujung hadapan
Tentang kita Penafian Sitemap
Laman web PHP Cina:Latihan PHP dalam talian kebajikan awam,Bantu pelajar PHP berkembang dengan cepat!