Dalam beberapa tahun kebelakangan ini, permintaan untuk pemprosesan data masa nyata terus meningkat. Teknologi permulaan sejuk dan berasaskan kelompok tidak lagi dapat memenuhi keperluan pemprosesan data masa nyata. Oleh itu, lebih banyak syarikat beralih kepada teknologi pemprosesan data masa nyata. Artikel ini akan memperkenalkan cara menggunakan PHP dan Kafka untuk mencapai pemprosesan data masa nyata.
Kafka ialah platform pemprosesan strim teragih berkemampuan tinggi yang pada asalnya dibangunkan oleh LinkedIn. Kafka boleh digunakan untuk mencipta pemprosesan strim baharu, pemprosesan kelompok, sistem pemesejan, sistem koordinasi, dsb.
PHP ialah bahasa pengaturcaraan dinamik popular yang digunakan secara meluas untuk membina aplikasi Internet. Walaupun PHP bukanlah pilihan pertama untuk pemprosesan data masa nyata, ia digunakan secara meluas dalam pembangunan web dan pemprosesan data.
Kini kami akan memperkenalkan langkah tentang cara menggunakan PHP dan Kafka untuk mencapai pemprosesan data masa nyata.
Langkah 1: Pasang dan konfigurasi PHP
Sebelum memulakan pemprosesan data masa nyata dengan PHP, kami perlu memasang persekitaran PHP dan menambah sambungan PHP yang diperlukan, seperti sambungan Kafka dan sambungan Redis.
Kafka extension boleh dimuat turun dan dipasang dari link ini kafka, pecl install kafka untuk install kafka extension.
Sambungan Redis Anda boleh memuat turun dan memasang sambungan PHP Redis dari sini, atau anda boleh menggunakan PECL untuk memasangnya, arahan: pecl install redis.
Selepas memasang dan mengkonfigurasi sambungan PHP, kami boleh mula menulis program pemprosesan data masa nyata.
Langkah 2: Sambung ke Kafka
Di Kafka, pengeluar Kafka dan pengguna Kafka digunakan untuk menyambungkan aliran data untuk menghantar data ke "saluran paip data". Dalam PHP, kita boleh menggunakan kelas KafkaProducer dan KafkaConsumer yang disediakan oleh Kafka dan membuat instantiate mereka untuk menyambung ke Kafka.
Kod sampel adalah seperti berikut:
<?php $kafkaConf = new RdKafkaConf(); $kafkaConf->set('metadata.broker.list', 'localhost:9092');//设置kafka连接信息 $kafkaProducer = new RdKafkaProducer($kafkaConf); $kafkaConsumer = new RdKafkaConsumer($kafkaConf); $topic = $kafkaProducer->newTopic('sample'); ?>
Langkah 3: Pembacaan data
Kita boleh menggunakan kelas KafkaConsumer untuk mendapatkan aliran data masa nyata. Dalam Kafka, terdapat konsep aliran, yang membahagikan aliran data kepada satu atau lebih partition, setiap partition terdiri daripada partition master dan partition slave sifar atau lebih. Dalam PHP, kita boleh menggunakan kelas KafkaConsumer untuk membuat instantiate objek pengguna dan melanggan satu atau lebih partition untuk membaca data.
Kod sampel adalah seperti berikut:
<?php $kafkaConf = new RdKafkaConf(); $kafkaConf->set('metadata.broker.list', 'localhost:9092');//设置kafka连接信息 $kafkaConsumer = new RdKafkaConsumer($kafkaConf); $topicConf = new RdKafkaTopicConf(); $topicConf->set('auto.offset.reset', 'smallest'); $topic = $kafkaConsumer->newTopic('sample', $topicConf); var_dump($topic->getMetadata(true, 10000)); $topic->consumeStart(0, RD_KAFKA_OFFSET_STORED); while (true) { $message = $topic->consume(0, 1000); if (null !== $message) { print_r($message->payload); } } ?>
Langkah 4: Pemprosesan Data
Selepas menerima data, kami boleh memproses data dan menyimpannya dalam ingatan. Kami boleh menggunakan Redis untuk menyimpan data dan menyimpannya dengan selamat dengan sentiasa menyegarkan data ke dalam pangkalan data pada masa yang sesuai.
Kod sampel adalah seperti berikut:
<?php $kafkaConf = new RdKafkaConf(); $kafkaConf->set('metadata.broker.list', 'localhost:9092');//设置kafka连接信息 $kafkaConsumer = new RdKafkaConsumer($kafkaConf); $topicConf = new RdKafkaTopicConf(); $topicConf->set('auto.offset.reset', 'smallest'); $topic = $kafkaConsumer->newTopic('sample', $topicConf); $redisClient = new Redis(); $redisClient->connect('127.0.0.1', 6379); $topic->consumeStart(0, RD_KAFKA_OFFSET_STORED); while (true) { $message = $topic->consume(0, 1000); if (null !== $message) { $data = json_decode($message->payload); $redisClient->hMSet('my_data', [ $data->key1 => $data->value1, $data->key2 => $data->value2, ]); } } ?>
Langkah 5: Penyegerakan Data
Akhir sekali, kami perlu mengepam strim data masa nyata kembali ke pangkalan data kami. Kita boleh menggunakan pemasa dan proses PHP untuk kerap membuang cache Redis kembali ke pangkalan data.
Kod sampel adalah seperti berikut:
<?php $kafkaConf = new RdKafkaConf(); $kafkaConf->set('metadata.broker.list', 'localhost:9092');//设置kafka连接信息 $kafkaConsumer = new RdKafkaConsumer($kafkaConf); $topicConf = new RdKafkaTopicConf(); $topicConf->set('auto.offset.reset', 'smallest'); $topic = $kafkaConsumer->newTopic('sample', $topicConf); $redisClient = new Redis(); $redisClient->connect('127.0.0.1', 6379); $topic->consumeStart(0, RD_KAFKA_OFFSET_STORED); $count = 0; while (true) { $message = $topic->consume(0, 1000); if (null !== $message) { $data = json_decode($message->payload); $redisClient->hMSet('my_data', [ $data->key1 => $data->value1, $data->key2 => $data->value2, ]); $count++; if ($count == 5) { $count = 0; $allData = $redisClient->hGetAll('my_data'); //将数据更新到数据库中 //... } } } ?>
Kesimpulan
Dalam artikel ini, kami memperkenalkan cara melaksanakan pemprosesan data masa nyata menggunakan PHP dan Kafka. Data masa nyata boleh disalurkan dengan mudah ke saluran paip data menggunakan Kafka, dan diproses serta disimpan menggunakan PHP. Kami juga menggunakan Redis sebagai cache dan storan dalam memori untuk mengendalikan data masa nyata. Pendekatan ini boleh menggantikan caching dan penyelesaian pemesejan dengan mudah sambil memberikan prestasi dan kebolehskalaan yang lebih besar.
Atas ialah kandungan terperinci Bagaimana untuk melaksanakan pemprosesan data masa nyata menggunakan PHP dan Kafka. Untuk maklumat lanjut, sila ikut artikel berkaitan lain di laman web China PHP!