PHP는 오픈 소스 Pulsar 실시간 데이터 처리를 구현합니다.

PHPz
풀어 주다: 2023-06-18 09:16:01
원래의
1835명이 탐색했습니다.

인터넷 기술의 발전과 데이터 양의 폭발적인 증가로 인해 대용량 데이터의 처리는 오늘날 인터넷 기업이 직면해야 할 문제 중 하나가 되었습니다. 기존 데이터 처리 솔루션, 특히 일괄 처리 솔루션은 더 이상 실시간 및 고가용성 요구 사항을 충족할 수 없습니다. 현재 실시간 데이터 처리는 최고의 솔루션 중 하나가 되었습니다. 개발자로서 대규모 데이터를 어떻게 우아하고 효율적으로 처리할 것인가 역시 우리가 주목해야 할 주제입니다.

Pulsar는 Yahoo에서 오픈 소스로 제공하는 실시간 데이터 처리 프레임워크로, 계층화된 아키텍처를 사용하여 데이터 처리를 더욱 효율적이고 확장 가능하게 만듭니다. Java, Python, Ruby 및 PHP를 포함한 여러 클라이언트 언어를 지원합니다. 매우 널리 사용되는 언어인 PHP는 구문이 간단하고 학습 곡선이 낮습니다. 이는 많은 기업에서 실시간 데이터 처리 애플리케이션을 개발하는 데 선호되는 언어 중 하나가 되었습니다. 이 기사에서는 PHP를 사용하여 오픈 소스 Pulsar의 실시간 데이터 처리를 구현하는 방법을 소개합니다.

준비

Pulsar를 사용하기 전에 먼저 Pulsar를 다운로드하고 설치해야 합니다. 관련 소프트웨어 패키지 및 문서는 Pulsar의 공식 웹사이트에서 얻을 수 있으며 로컬 개발 및 테스트를 위해 로컬 시스템이나 클러스터의 노드에 설치할 수 있습니다.

PHP 개발 프로세스 중에는 클라이언트 SDK pulsar-client-php를 사용해야 합니다. Composer 등의 도구를 통해 설치할 수 있습니다. 구체적인 과정은 다음과 같습니다.

// 安装pulsar-client-php
composer require apache/pulsar
로그인 후 복사

설치가 완료된 후 Pulsar 사용 방법에 대한 기본 구성은 다음과 같습니다.

use ApachePulsarAuthenticationAuthenticationFactory;
use ApachePulsarClientBuilder;
use ApachePulsarProducerConfiguration;
use ApachePulsarSerializationSerialization;

// 配置生产者的信息
$clientBuilder = new ClientBuilder();
$clientBuilder->setServiceUrl('pulsar://localhost:6650');
$clientBuilder->setAuthentication(
    AuthenticationFactory::token('your-token-string')
);

$producerConf = new ProducerConfiguration();
$producerConf->setTopic('your-topic-name');
$producerConf->setSendTimeout(3000);
$producerConf->setSerialization(Serialization::JSON);

// 创建生产者实例
$producer = $clientBuilder->build()->createProducer($producerConf);
$producer->send('your message');
로그인 후 복사

위 코드에서는 먼저 ClientBuilder 클래스를 통해 Pulsar 생산자를 생성합니다. 생산자를 생성할 때 setServiceUrl 메서드를 설정하여 Pulsar 서비스의 URL을 지정하고 setAuthentication 메서드를 설정하여 인증을 수행해야 합니다. 또한 주제, 시간 초과 등 생산자의 구성 정보를 설정해야 합니다. ClientBuilder类来创建Pulsar的生产者。在创建生产者的时候,我们需要设置setServiceUrl方法来指定Pulsar Service的URL,setAuthentication方法来进行身份验证。另外需要设置生产者的配置信息,如话题、超时等。

Pulsar的使用

Pulsar提供了Producer和Consumer两种基本的组件实现实时数据处理。Producer用于将数据发送到指定的Pulsar topic,而Consumer则从topic中消费数据。下面我们将详细介绍如何使用这两种组件来完成实时数据处理。

Producer

首先,我们通过以下步骤来创建一个Producer实例:

// 导入命名空间
use ApachePulsarClientBuilder;

// 创建Pulsar client实例
$clientBuilder = new ClientBuilder();
$client = $clientBuilder->serviceUrl('pulsar://localhost:6650')->build();

// 创建Producer对象
$producer = $client->createProducer(
    [
        'topic' => 'your-topic',
    ]
);
로그인 후 복사

在创建生产者时,需要设置生产者所属的Pulsar topic。此外,还有其他可选项,如“producerName”、“initialSequenceId”、“sendTimeout”等。这些选项可以根据需要进行配置。

下面我们来看一下如何向Pulsar topic发送消息:

// 对Pulsar topic发送消息
$result = $producer->send('your-message');
로그인 후 복사

send方法返回一个MessageId对象。如果消息之前已经发送过,则返回对应的MessageId。如果消息发送失败,则抛出PulsarClientException异常。

Consumer

与生产者一样,Pulsar Consumer的创建也是分为多个步骤。

// 导入命名空间
use ApachePulsarClientBuilder;

// 创建Pulsar client实例
$clientBuilder = new ClientBuilder();
$client = $clientBuilder->serviceUrl('pulsar://localhost:6650')->build();

// 创建Consumer对象
$consumer = $client->subscribe(
    [
        'topic' => 'your-topic',
        'subscriptionName' => 'your-subscription-name',
    ]
);
로그인 후 복사

在创建Consumer时,我们需要设置订阅的Pulsar topic和订阅名称。另外有其他可选项,如设置“receiverQueueSize”、“ackTimeout”、“subscriptionType”等。

下面我们将看到如何从指定的Pulsar topic中获取消息:

// 从topic中消费消息
$message = $consumer->receive();

// 对消息进行处理
echo 'Received message with ID: ' . $message->getMessageId() . PHP_EOL;

// markAsReceived表示通知Pulsar这条消息已经被处理
$consumer->acknowledge($message);
로그인 후 복사

在调用receive()方法时,程序会保持等待状态,直到有消息从指定的Pulsar topic中返回。当有消息返回时,程序会继续执行,对消息进行处理。

调用acknowledge()方法后,Pulsar才会将消息从该订阅的队列中删除。如果没有调用acknowledge()

Pulsar 사용

Pulsar는 실시간 데이터 처리를 달성하기 위해 생산자와 소비자라는 두 가지 기본 구성 요소를 제공합니다. 생산자는 지정된 Pulsar 주제로 데이터를 보내는 데 사용되는 반면 소비자는 주제의 데이터를 소비합니다. 아래에서는 이 두 구성요소를 사용하여 실시간 데이터 처리를 완료하는 방법을 자세히 소개합니다.

Producer

먼저 다음 단계를 통해 Producer 인스턴스를 생성합니다.

rrreee

Producer를 생성할 때 해당 프로듀서가 속한 Pulsar 주제를 설정해야 합니다. 또한 "producerName", "initialSequenceId", "sendTimeout" 등과 같은 다른 옵션도 있습니다. 이러한 옵션은 필요에 따라 구성할 수 있습니다.

Pulsar 주제에 메시지를 보내는 방법을 살펴보겠습니다. 🎜rrreee🎜 send 메서드는 MessageId 개체를 반환합니다. 이전에 메시지가 전송된 경우 해당 MessageId가 반환됩니다. 메시지 전송에 실패하면 PulsarClientException 예외가 발생합니다. 🎜

Consumer

🎜Producer와 마찬가지로 Pulsar Consumer 생성도 여러 단계로 나누어집니다. 🎜rrreee🎜Consumer를 생성할 때 구독하는 Pulsar 주제와 구독 이름을 설정해야 합니다. "receiverQueueSize", "ackTimeout", "subscriptionType" 등을 설정하는 것과 같은 다른 옵션이 있습니다. 🎜🎜 아래에서는 지정된 Pulsar 주제에서 메시지를 가져오는 방법을 살펴보겠습니다. 🎜rrreee🎜 receive() 메서드를 호출하면 프로그램은 지정된 Pulsar 주제에서 메시지가 있을 때까지 대기 상태로 유지됩니다. Pulsar 주제 반환. 메시지가 반환되면 프로그램은 계속해서 메시지를 실행하고 처리합니다. 🎜🎜Pulsar는 acknowledge() 메서드를 호출한 후에만 구독 대기열에서 메시지를 삭제합니다. acknowledge() 메서드가 호출되지 않으면 메시지는 만료될 때까지(기본값은 1시간) 대기열에 남아 있습니다. 🎜🎜요약🎜🎜이번 글에서는 PHP를 사용해 오픈소스 Pulsar의 실시간 데이터 처리를 구현하는 방법을 소개했습니다. 우리는 Pulsar 환경 설정부터 시작하여 Pulsar의 생산자 및 소비자 구성 요소를 사용하여 실시간 데이터 처리를 구현하는 방법을 단계별로 설명했습니다. 🎜🎜Pulsar는 계층화된 아키텍처를 채택하고 대규모 실시간 데이터 처리를 잘 지원할 수 있습니다. 현재 Pulsar는 Alibaba, Meituan, Baidu 등과 같은 많은 인터넷 회사에서 사용되고 있습니다. 🎜🎜이 기사에 소개된 내용을 연구하면 실시간 데이터 처리에서 PHP와 Pulsar를 사용하여 보다 효율적이고 우아하게 작업하는 방법을 이미 이해할 수 있다고 믿습니다. 🎜

위 내용은 PHP는 오픈 소스 Pulsar 실시간 데이터 처리를 구현합니다.의 상세 내용입니다. 자세한 내용은 PHP 중국어 웹사이트의 기타 관련 기사를 참조하세요!

관련 라벨:
원천:php.cn
본 웹사이트의 성명
본 글의 내용은 네티즌들의 자발적인 기여로 작성되었으며, 저작권은 원저작자에게 있습니다. 본 사이트는 이에 상응하는 법적 책임을 지지 않습니다. 표절이나 침해가 의심되는 콘텐츠를 발견한 경우 admin@php.cn으로 문의하세요.
인기 튜토리얼
더>
최신 다운로드
더>
웹 효과
웹사이트 소스 코드
웹사이트 자료
프론트엔드 템플릿