As data volumes continue to explode, the way we use and process data has become increasingly important, and streaming data processing is now one of the main ways to handle massive data sets. As a PHP developer, you will likely need to process real-time data at some point. This article introduces how to use PHP and Google Cloud Dataflow for streaming data processing and management.
1. Introduction to Google Cloud Dataflow
Google Cloud Dataflow is a cloud service for managing large-scale data processing tasks. It can efficiently handle large-scale data streams while also allowing batch and stream processing to be mixed in the same pipeline.
Google Cloud Dataflow has the following characteristics: it is fully managed, so there is no cluster to provision or maintain; it automatically scales worker resources with the load; and it is based on the Apache Beam programming model, so the same pipeline code can run in either batch or streaming mode.
2. Create a Google Cloud Dataflow project and set up the environment
First you need to create a Google Cloud project.
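Once the gcloud CLI (installed in the next step) is available, the project can also be created and the Dataflow API enabled from the command line. The project ID my-dataflow-project below is just a placeholder:
$ gcloud projects create my-dataflow-project
$ gcloud services enable dataflow.googleapis.com --project=my-dataflow-project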
You need to install the Google Cloud SDK to use Google Cloud Dataflow. Download the installation package and follow the prompts to complete the installation.
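On Linux or macOS, one option (among the several documented install methods) is the interactive installer, followed by gcloud init to authenticate and choose default settings:
$ curl https://sdk.cloud.google.com | bash
$ exec -l $SHELL
$ gcloud init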
Then use the following command to set the active project for the gcloud CLI (replace [PROJECT_ID] with your project's ID):
$ gcloud config set project [PROJECT_ID]
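The PHP client libraries also need application default credentials, and the example later in this article assumes that a Pub/Sub input topic, a Cloud Storage bucket for staging and temporary files, and a BigQuery dataset named test_dataset already exist. A minimal setup with placeholder names could look like this:
$ gcloud auth application-default login
$ gcloud pubsub topics create my-input-topic
$ gsutil mb gs://my-dataflow-bucket
$ bq mk test_dataset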
3. Install the necessary PHP extensions
In order to use the Dataflow service in PHP, you need to install the following extensions:
Use the following command to install the gRPC extension:
$ pecl install grpc
Use the following command to install the Protobuf extension:
$ pecl install protobuf
Use the following command to install the Dataflow PHP extension:
$ pecl install google-cloud-dataflow-alpha
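The extensions also have to be enabled in php.ini (on Windows the extension files end in .dll rather than .so), and since the example below loads its client classes through Composer's autoloader, the Pub/Sub and BigQuery client packages should be required as well. A rough sketch of these steps:
; add to php.ini
extension=grpc.so
extension=protobuf.so

$ php -m | grep -E 'grpc|protobuf'
$ composer require google/cloud-pubsub google/cloud-bigquery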
4. Write data flow processing code
The following example receives messages from a Pub/Sub topic, passes them to a Dataflow processing pipeline, and, once processing is complete, writes the results to a BigQuery table:
<?php
require __DIR__ . '/vendor/autoload.php';

use Google\Cloud\BigQuery\BigQueryClient;
use Google\Cloud\Dataflow\DataflowClient;
use Google\Cloud\Dataflow\PubSub\PubSubOptions;
use Google\Cloud\PubSub\PubSubClient;
use Google\Cloud\Dataflow\Options;

// Load project-specific settings from config.php
$configs = include __DIR__ . '/config.php';
$inputTopic = $configs['input_topic'];
$outputTable = $configs['output_table'];
$project = $configs['project_id'];
$bucket = $configs['bucket'];
$stagingLocation = $configs['staging_location'];
$tempLocation = $configs['temp_location'];
$jobName = 'test-job';

// Pub/Sub client for the input topic
$pubsub = new PubSubClient([
    'projectId' => $project
]);
$pubsub_topic = $pubsub->topic($inputTopic);

// BigQuery client; create the output table with the expected schema
$bigquery = new BigQueryClient([
    'projectId' => $project
]);
$dataset = $bigquery->dataset('test_dataset');
$table = $dataset->table($outputTable);
$table->create([
    'schema' => [
        ['name' => 'id', 'type' => 'STRING'],
        ['name' => 'timestamp', 'type' => 'TIMESTAMP'],
        ['name' => 'message', 'type' => 'STRING'],
    ],
]);

// Dataflow client and pipeline options
$dataflow = new DataflowClient();
$pubsubOptions = PubSubOptions::fromArray([
    'topic' => sprintf('projects/%s/topics/%s', $project, $inputTopic),
]);
$options = [
    Options::PROJECT => $project,
    Options::STAGING_LOCATION => $stagingLocation,
    Options::TEMP_LOCATION => $tempLocation,
    Options::JOB_NAME => $jobName,
];

// Define the job: read messages from Pub/Sub, write them to BigQuery
$job = $dataflow->createJob([
    'projectId' => $project,
    'name' => $jobName,
    'environment' => [
        'tempLocation' => sprintf('gs://%s/temp', $bucket),
    ],
    'steps' => [
        [
            'name' => 'Read messages from Pub/Sub',
            'pubsubio' => (new \Google\Cloud\Dataflow\Io\Pubsub\PubsubMessage())
                ->expand($pubsubOptions)
                ->withAttributes(false)
                ->withIdAttribute('unique_id')
                ->withTimestampAttribute('publish_time')
        ],
        [
            'name' => 'Write messages to BigQuery',
            'bigquery' => (new \Google\Cloud\Dataflow\Io\BigQuery\BigQueryWrite())
                ->withJsonSchema(file_get_contents(__DIR__ . '/schema.json'))
                ->withTable($table->tableId())
        ],
    ]
]);

$operation = $job->run();

// Poll the operation until it is complete
$operation->pollUntilComplete();

if (!$operation->isComplete()) {
    exit(1);
}

if ($operation->getError()) {
    print_r($operation->getError());
    exit(1);
}

echo "Job has been launched";
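The script expects a config.php file that returns the settings it reads, plus a schema.json describing the BigQuery table. The following is only a sketch with placeholder values; replace them with your own project, topic, table, and bucket names:

<?php
// config.php - placeholder values for illustration only
return [
    'project_id'       => 'my-dataflow-project',
    'input_topic'      => 'my-input-topic',
    'output_table'     => 'messages',
    'bucket'           => 'my-dataflow-bucket',
    'staging_location' => 'gs://my-dataflow-bucket/staging',
    'temp_location'    => 'gs://my-dataflow-bucket/temp',
];

schema.json mirrors the table schema created in the script:

[
  {"name": "id", "type": "STRING"},
  {"name": "timestamp", "type": "TIMESTAMP"},
  {"name": "message", "type": "STRING"}
]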
5. Run the Dataflow processing pipeline
Use the following command to run the Dataflow processing pipeline:
$ php dataflow.php
6. Monitoring and management of the data processing pipeline
The Google Cloud Console provides a Dataflow page that can be used to view and manage data processing pipelines, including each job's status, logs, and execution graph.
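The same information is also available from the command line, which is convenient for scripting; for example (use the region your job actually runs in):
$ gcloud dataflow jobs list --region=us-central1
$ gcloud dataflow jobs describe JOB_ID --region=us-central1
$ gcloud dataflow jobs cancel JOB_ID --region=us-central1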
7. Summary
This article has introduced how to use PHP and Google Cloud Dataflow for streaming data processing and management, walking through the whole process step by step: creating a Google Cloud project and setting up the environment, installing the necessary PHP extensions, writing the data flow processing code, running the Dataflow pipeline, and monitoring and managing it. I hope it is helpful.