백엔드 개발 PHP 튜토리얼 효율적인 메시지 대기열 및 배포를 위한 PHP 및 Apache Kafka 통합

효율적인 메시지 대기열 및 배포를 위한 PHP 및 Apache Kafka 통합

Jun 25, 2023 am 09:48 AM
php 메시지 대기열 apache kafka

최신 인터넷 애플리케이션의 지속적인 개발로 인해 점점 더 많은 애플리케이션에서 대량의 데이터 통신을 처리해야 합니다. 이러한 데이터 통신을 처리하는 전통적인 방법은 폴링 또는 차단 I/O를 사용하는 것이지만 이러한 방법은 매우 비효율적이므로 더 이상 최신 애플리케이션의 요구 사항을 충족할 수 없습니다. 이러한 문제를 해결하기 위해 업계에서는 메시지 큐 및 분산 시스템이라는 기술을 개발해 왔습니다.

메시지 대기열 및 배포 시스템에서 메시지 생산자는 메시지를 대기열로 보내고, 메시지 소비자는 대기열에서 메시지를 가져와 해당 작업을 수행합니다. 이 접근 방식은 I/O 폴링 및 차단과 같은 문제를 피할 수 있으므로 데이터 통신의 효율성을 크게 향상시킬 수 있습니다.

이 기사에서는 PHP와 Apache Kafka 통합을 사용하여 효율적인 메시지 대기열 및 배포를 달성하는 방법에 대해 설명합니다.

Apache Kafka 소개

Apache Kafka는 처리량이 높고 지연 시간이 짧으며 확장 가능한 분산 메시징 시스템입니다. 대량의 메시지를 처리하고 더 높은 로드를 수용할 수 있도록 수평으로 확장할 수 있습니다. Apache Kafka의 주요 구성 요소는 다음과 같습니다.

  1. Broker: Kafka 클러스터의 각 노드는 브로커이며 메시지 저장 및 전달을 담당합니다.
  2. 주제: 각 메시지는 메시지 생산 및 소비의 논리적 개념인 주제에 할당되어야 합니다.
  3. 파티션: 각 주제는 여러 파티션으로 나눌 수 있으며, 각 파티션에는 순서가 지정된 여러 메시지가 포함되어 있습니다.
  4. Producer: 메시지 생산자, 브로커에게 메시지를 보냅니다.
  5. Consumer: 메시지 소비자, 브로커의 메시지를 읽습니다.
  6. 소비자 그룹: 소비자 그룹은 하나 이상의 파티션에서 메시지를 공동으로 소비합니다.
  7. 오프셋: 메시지를 고유하게 식별하는 데 사용되는 메시지 번호입니다.

Apache Kafka와 통합된 PHP

Apache Kafka를 사용하려면 PHP용 Kafka 확장 프로그램을 사용해야 합니다. 이 확장은 PHP가 Kafka를 작동하는 데 필요한 모든 API를 제공합니다.

먼저 PECL에서 설치할 수 있는 Kafka 확장을 설치해야 합니다.

pecl install kafka
로그인 후 복사

확장을 설치한 후 사용을 시작할 수 있습니다. 다음은 PHP와 Apache Kafka를 사용한 메시지 생성 및 소비의 간단한 예입니다.

<?php
$brokers = 'kafka:9092';    // Kafka集群地址
$topic = 'test';            // Topic名称

// 创建一个Kafka生产者
$producer = new RdKafkaProducer();
$producer->setLogLevel(LOG_DEBUG);
$producer->addBrokers($brokers);

// 创建一个Kafka消费者
$conf = new RdKafkaConf();
$conf->set('group.id', 'myGroup');
$consumer = new RdKafkaConsumer($conf);
$consumer->addBrokers($brokers);

// 生产消息
$topicProducer = $producer->newTopic($topic);
for ($i = 0; $i < 10; $i++) {
    $topicProducer->produce(RD_KAFKA_PARTITION_UA, 0, 'Message ' . $i);
}

// 消费消息
$topicConsumer = $consumer->newTopic($topic);
$topicConsumer->consumeStart(0, RD_KAFKA_OFFSET_BEGINNING);
while (true) {
    $message = $topicConsumer->consume(0, 1000);
    if (null === $message) {
        continue;
    }
    if ($message->err) {
        throw new Exception('Error occurred while consuming message');
    }
    echo $message->payload . PHP_EOL;
}
로그인 후 복사

이 예에서는 먼저 Kafka 생산자와 Kafka 소비자를 만듭니다. 그런 다음 생산자에서는 지정된 주제로 10개의 메시지를 보냈고 소비자에서는 지정된 주제의 메시지를 소비하고 해당 내용을 출력했습니다.

이 시점에서 우리는 PHP와 Apache Kafka를 사용하여 간단한 메시지 생성 및 소비를 성공적으로 구현했습니다. 다음으로 PHP와 Apache Kafka를 사용하여 고급 기능을 구현하는 방법에 대해 설명하겠습니다.

고급 애플리케이션 예시

실제 애플리케이션에서는 일반적으로 다음과 같은 일부 고급 기능을 구현해야 합니다.

  1. 메시지 배포: 지정된 소비자에게 메시지를 보냅니다.
  2. 소비자 그룹: 여러 소비자가 하나 이상의 주제에 대한 메시지를 공동으로 소비할 수 있습니다.
  3. 오프셋 구성: 메시지를 읽는 위치를 제어할 수 있습니다.

여기에서는 이러한 기능을 구현하는 방법에 대해 설명합니다.

메시지 배포

실제 애플리케이션에서는 일반적으로 메시지 흐름을 제어해야 합니다. 예를 들어 특정 소비자만 특정 메시지를 소비하기를 원할 수 있습니다. 이 기능을 달성하기 위해 각 소비자에 대한 대기열을 만든 다음 특정 메시지를 특정 대기열에 할당할 수 있습니다.

다음은 두 명의 소비자를 사용하여 두 가지 다른 작업을 소비하는 예입니다.

<?php

$brokers = 'kafka:9092';    // Kafka集群地址
$topic = 'test';            // Topic名称

// 创建一个Kafka消费者组
$conf = new RdKafkaConf();
$conf->set('group.id', 'myGroup');
$consumer = new RdKafkaKafkaConsumer($conf);
$consumer->subscribe([$topic]);

// 创建两个Kafka生产者,一个生产者用于向消费者1发送消息,另一个生产者用于向消费者2发送消息
$producer1 = new RdKafkaProducer();
$producer1->addBrokers($brokers);
$producer1Topic = $producer1->newTopic($topic . '_1');

$producer2 = new RdKafkaProducer();
$producer2->addBrokers($brokers);
$producer2Topic = $producer2->newTopic($topic . '_2');

// 消费消息
while (true) {
    $message = $consumer->consume(1000);
    if (null === $message) {
        continue;
    }
    if ($message->err) {
        throw new Exception('Error occurred while consuming message');
    }

    echo 'Received message: ' . $message->payload . PHP_EOL;

    // 根据消息内容分配给不同的生产者
    if ($message->payload === 'task1') {
        $producer1Topic->produce(RD_KAFKA_PARTITION_UA, 0, $message->payload);
    } elseif ($message->payload === 'task2') {
        $producer2Topic->produce(RD_KAFKA_PARTITION_UA, 0, $message->payload);
    }
}
로그인 후 복사

이 예에서는 두 명의 생산자를 사용하여 두 명의 다른 소비자에게 메시지를 배포합니다. 소비자가 메시지를 받으면 메시지 내용을 기반으로 특정 생산자에게 메시지를 할당할 수 있습니다. 이 방법을 사용하면 메시지 흐름을 제어하고 메시지의 중복 처리를 방지할 수 있습니다.

Consumer Group

일반적인 Kafka Consumer에서는 같은 그룹에 속한 여러 Consumer가 동일한 주제를 함께 소비하고 동일한 메시지를 받게 됩니다. 이는 Kafka가 자동으로 파티션의 균형을 맞추고 각 파티션이 한 명의 소비자에 의해서만 처리되도록 하기 때문입니다.

PHP에서는 group.id를 사용하여 소비자를 그룹화하여 소비자 그룹의 기능을 구현할 수 있습니다.

다음은 동일한 그룹 내의 메시지를 병렬로 처리할 수 있는 Kafka 소비자 그룹의 예입니다.

<?php

$brokers = 'kafka:9092';    // Kafka集群地址
$topic = 'test';            // Topic名称

// 创建一个Kafka消费者组
$conf = new RdKafkaConf();
$conf->set('group.id', 'myGroup');
$conf->set('metadata.broker.list', $brokers);
$conf->set('enable.auto.commit', 'false');
$consumer = new RdKafkaKafkaConsumer($conf);

// 添加需要订阅的topic
$consumer->subscribe([$topic]);

// 处理消息
while (true) {
    $message = $consumer->consume(1000);
    if (null === $message) {
        continue;
    }
    if ($message->err) {
        throw new Exception('Error occurred while consuming message');
    }

    echo 'Received message: ' . $message->payload . PHP_EOL;

    // 处理完消息后手动提交offset
    $consumer->commit();
}
로그인 후 복사

이 예에서는 Kafka 소비자 그룹을 생성하고 이를 구독해야 하는 주제를 추가합니다. 그런 다음 동일한 그룹 내의 메시지를 병렬로 처리할 수 있습니다.

참고: 소비자 그룹에서는 여러 소비자가 하나 이상의 파티션을 함께 소비합니다. 데이터를 소비할 때 동일한 데이터를 처리하는 멀티스레딩 문제에 주의해야 합니다.

오프셋 구성

Kafka에서는 각 파티션이 독립적인 오프셋을 갖습니다. 소비자는 파티션에서 읽는 위치와 읽는 메시지를 제어할 수 있습니다. 소비자는 마지막 메시지나 최신 메시지부터 읽기 시작할 수 있습니다.

PHP에서는 오프셋을 사용하여 메시지 읽기 위치를 제어할 수 있습니다. 다음은 오프셋 구성의 예입니다.

<?php

$brokers = 'kafka:9092';    // Kafka集群地址
$topic = 'test';            // Topic名称

// 创建一个Kafka消费者
$conf = new RdKafkaConf();
$conf->set('group.id', 'myGroup');
$consumer = new RdKafkaKafkaConsumer($conf);

// 订阅topic
$topicConf = new RdKafkaTopicConf();
$topicConf->set('auto.offset.reset', 'earliest');
$topic = $consumer->newTopic($topic, $topicConf);
$topic->consumeStart(0, RD_KAFKA_OFFSET_STORED);

// 消费消息
while (true) {
    $message = $topic->consume(0, 1000);
    if (null === $message) {
        continue;
    }
    if ($message->err) {
        throw new Exception('Error occurred while consuming message');
    }

    echo 'Received message: ' . $message->payload . PHP_EOL;
}
로그인 후 복사

이 예에서는 auto.offset.reset을 사용하여 오프셋 구성을 설정합니다. 이 구성은 소비자에게 가장 빠른 오프셋부터 메시지 소비를 시작하라고 지시합니다.

실제 응용 분야에서는 필요에 따라 다양한 오프셋을 구성할 수 있습니다. 예를 들어 생산자가 일부 메시지를 처리하지 못한 후 실패한 메시지가 이전에 처리된 지점부터 메시지 읽기를 다시 시작해야 할 수도 있습니다.

결론

이 기사에서는 PHP와 Apache Kafka 통합을 사용하여 효율적인 메시지 대기열 및 배포를 달성하는 방법에 대해 논의했습니다. 먼저 Apache Kafka의 기본 사항을 소개한 다음 PHP용 Kafka 확장을 사용하여 메시지 생성 및 소비를 구현하는 방법을 논의했습니다. 마지막으로 메시지 배포, 소비자 그룹, 오프셋 구성과 같은 일부 고급 기능을 구현하는 방법을 논의했습니다.

PHP와 Apache Kafka 통합을 사용하면 효율적인 메시지 대기열 및 배포를 구현하여 애플리케이션 응답 속도와 처리량을 향상시킬 수 있습니다. 대량의 데이터 통신을 처리해야 하는 애플리케이션을 개발하는 경우 Apache Kafka 및 PHP용 Kafka 확장이 좋은 선택일 수 있습니다.

위 내용은 효율적인 메시지 대기열 및 배포를 위한 PHP 및 Apache Kafka 통합의 상세 내용입니다. 자세한 내용은 PHP 중국어 웹사이트의 기타 관련 기사를 참조하세요!

본 웹사이트의 성명
본 글의 내용은 네티즌들의 자발적인 기여로 작성되었으며, 저작권은 원저작자에게 있습니다. 본 사이트는 이에 상응하는 법적 책임을 지지 않습니다. 표절이나 침해가 의심되는 콘텐츠를 발견한 경우 admin@php.cn으로 문의하세요.

핫 AI 도구

Undresser.AI Undress

Undresser.AI Undress

사실적인 누드 사진을 만들기 위한 AI 기반 앱

AI Clothes Remover

AI Clothes Remover

사진에서 옷을 제거하는 온라인 AI 도구입니다.

Undress AI Tool

Undress AI Tool

무료로 이미지를 벗다

Clothoff.io

Clothoff.io

AI 옷 제거제

AI Hentai Generator

AI Hentai Generator

AI Hentai를 무료로 생성하십시오.

인기 기사

R.E.P.O. 에너지 결정과 그들이하는 일 (노란색 크리스탈)
3 몇 주 전 By 尊渡假赌尊渡假赌尊渡假赌
R.E.P.O. 최고의 그래픽 설정
3 몇 주 전 By 尊渡假赌尊渡假赌尊渡假赌
R.E.P.O. 아무도들을 수없는 경우 오디오를 수정하는 방법
3 몇 주 전 By 尊渡假赌尊渡假赌尊渡假赌
WWE 2K25 : Myrise에서 모든 것을 잠금 해제하는 방법
4 몇 주 전 By 尊渡假赌尊渡假赌尊渡假赌

뜨거운 도구

메모장++7.3.1

메모장++7.3.1

사용하기 쉬운 무료 코드 편집기

SublimeText3 중국어 버전

SublimeText3 중국어 버전

중국어 버전, 사용하기 매우 쉽습니다.

스튜디오 13.0.1 보내기

스튜디오 13.0.1 보내기

강력한 PHP 통합 개발 환경

드림위버 CS6

드림위버 CS6

시각적 웹 개발 도구

SublimeText3 Mac 버전

SublimeText3 Mac 버전

신 수준의 코드 편집 소프트웨어(SublimeText3)

Ubuntu 및 Debian용 PHP 8.4 설치 및 업그레이드 가이드 Ubuntu 및 Debian용 PHP 8.4 설치 및 업그레이드 가이드 Dec 24, 2024 pm 04:42 PM

PHP 8.4는 상당한 양의 기능 중단 및 제거를 통해 몇 가지 새로운 기능, 보안 개선 및 성능 개선을 제공합니다. 이 가이드에서는 Ubuntu, Debian 또는 해당 파생 제품에서 PHP 8.4를 설치하거나 PHP 8.4로 업그레이드하는 방법을 설명합니다.

CakePHP 날짜 및 시간 CakePHP 날짜 및 시간 Sep 10, 2024 pm 05:27 PM

cakephp4에서 날짜와 시간을 다루기 위해 사용 가능한 FrozenTime 클래스를 활용하겠습니다.

CakePHP 토론 CakePHP 토론 Sep 10, 2024 pm 05:28 PM

CakePHP는 PHP용 오픈 소스 프레임워크입니다. 이는 애플리케이션을 훨씬 쉽게 개발, 배포 및 유지 관리할 수 있도록 하기 위한 것입니다. CakePHP는 강력하고 이해하기 쉬운 MVC와 유사한 아키텍처를 기반으로 합니다. 모델, 뷰 및 컨트롤러 gu

CakePHP 파일 업로드 CakePHP 파일 업로드 Sep 10, 2024 pm 05:27 PM

파일 업로드 작업을 위해 양식 도우미를 사용할 것입니다. 다음은 파일 업로드의 예입니다.

CakePHP 유효성 검사기 만들기 CakePHP 유효성 검사기 만들기 Sep 10, 2024 pm 05:26 PM

컨트롤러에 다음 두 줄을 추가하면 유효성 검사기를 만들 수 있습니다.

PHP 개발을 위해 Visual Studio Code(VS Code)를 설정하는 방법 PHP 개발을 위해 Visual Studio Code(VS Code)를 설정하는 방법 Dec 20, 2024 am 11:31 AM

VS Code라고도 알려진 Visual Studio Code는 모든 주요 운영 체제에서 사용할 수 있는 무료 소스 코드 편집기 또는 통합 개발 환경(IDE)입니다. 다양한 프로그래밍 언어에 대한 대규모 확장 모음을 통해 VS Code는

CakePHP 빠른 가이드 CakePHP 빠른 가이드 Sep 10, 2024 pm 05:27 PM

CakePHP는 오픈 소스 MVC 프레임워크입니다. 이를 통해 애플리케이션 개발, 배포 및 유지 관리가 훨씬 쉬워집니다. CakePHP에는 가장 일반적인 작업의 과부하를 줄이기 위한 여러 라이브러리가 있습니다.

PHP에서 HTML/XML을 어떻게 구문 분석하고 처리합니까? PHP에서 HTML/XML을 어떻게 구문 분석하고 처리합니까? Feb 07, 2025 am 11:57 AM

이 튜토리얼은 PHP를 사용하여 XML 문서를 효율적으로 처리하는 방법을 보여줍니다. XML (Extensible Markup Language)은 인간의 가독성과 기계 구문 분석을 위해 설계된 다목적 텍스트 기반 마크 업 언어입니다. 일반적으로 데이터 저장 AN에 사용됩니다

See all articles