Avec l'avènement de l'ère de l'explosion de l'information, l'utilisation et le traitement des données sont devenus de plus en plus importants. Le traitement des données en streaming est devenu l’un des moyens importants de traiter des données massives. En tant que développeur PHP, vous devez avoir une expérience et des besoins en matière de traitement de données en temps réel. Cet article explique comment utiliser PHP et Google Cloud Dataflow pour le traitement et la gestion des données en streaming.
1. Introduction à Google Cloud Dataflow
Google Cloud Dataflow est un service cloud qui gère des tâches de traitement de données à grande échelle, tout en permettant également de mélanger le traitement par lots et le traitement par flux. utiliser.
Google Cloud Dataflow présente les caractéristiques suivantes :
2. Créez un projet Google Cloud Dataflow et configurez l'environnement
Vous devez d'abord créer un projet Google Cloud .
Vous devez installer le SDK Google Cloud pour utiliser Google Cloud Dataflow. Téléchargez le package d'installation et suivez les invites pour terminer l'installation.
Utilisez la commande suivante pour définir les variables d'environnement sur le projet Google Cloud actuel :
$ gcloud config set project [PROJECT_ID]
3. Installez les extensions PHP nécessaires
Pour utiliser le service Dataflow en PHP, vous devez installez les extensions suivantes :
$ pecl install grpc
PROTOBUF
$ pecl install protobuf
Extension PHP DAFLOW
$ pecl install google-cloud-dataflow-alpha
<?php require __DIR__ . '/vendor/autoload.php'; use GoogleCloudBigQueryBigQueryClient; use GoogleCloudDataflowDataflowClient; use GoogleCloudDataflowPubSubPubSubOptions; use GoogleCloudPubSubPubSubClient; use GoogleCloudDataflowOptions; $configs = include __DIR__ . '/config.php'; $inputTopic = $configs['input_topic']; $outputTable = $configs['output_table']; $project = $configs['project_id']; $bucket = $configs['bucket']; $stagingLocation = $configs['staging_location']; $tempLocation = $configs['temp_location']; $jobName = 'test-job'; $options = [ 'project' => $project, 'stagingLocation' => $stagingLocation, 'tempLocation' => $tempLocation, 'jobName' => $jobName, ]; $pubsub = new PubSubClient([ 'projectId' => $project ]); $pubsub_topic = $pubsub->topic($inputTopic); $bigquery = new BigQueryClient([ 'projectId' => $project ]); $dataset = $bigquery->dataset('test_dataset'); $table = $dataset->table($outputTable); $table->create([ 'schema' => [ [ 'name' => 'id', 'type' => 'STRING', ], [ 'name' => 'timestamp', 'type' => 'TIMESTAMP', ], [ 'name' => 'message', 'type' => 'STRING', ], ], ]); $dataflow = new DataflowClient(); $pubsubOptions = PubSubOptions::fromArray([ 'topic' => sprintf('projects/%s/topics/%s', $project, $inputTopic), ]); $options = [ Options::PROJECT => $project, Options::STAGING_LOCATION => $stagingLocation, Options::TEMP_LOCATION => $tempLocation, Options::JOB_NAME => $jobName, ]; $job = $dataflow->createJob([ 'projectId' => $project, 'name' => $jobName, 'environment' => [ 'tempLocation' => sprintf('gs://%s/temp', $bucket), ], 'steps' => [ [ 'name' => 'Read messages from Pub/Sub', 'pubsubio' => (new GoogleCloudDataflowIoPubsubPubsubMessage()) ->expand($pubsubOptions) ->withAttributes(false) ->withIdAttribute('unique_id') ->withTimestampAttribute('publish_time') ], [ 'name' => 'Write messages to BigQuery', 'bigquery' => (new GoogleCloudDataflowIoBigQueryBigQueryWrite()) ->withJsonSchema(file_get_contents(__DIR__ . '/schema.json')) ->withTable($table->tableId()) ], ] ]); $operation = $job->run(); # Poll the operation until it is complete $operation->pollUntilComplete(); if (!$operation->isComplete()) { exit(1); } if ($operation->getError()) { print_r($operation->getError()); exit(1); } echo "Job has been launched";
Ce qui précède est le contenu détaillé de. pour plus d'informations, suivez d'autres articles connexes sur le site Web de PHP en chinois!