최신 인터넷 애플리케이션의 지속적인 개발로 인해 점점 더 많은 애플리케이션에서 대량의 데이터 통신을 처리해야 합니다. 이러한 데이터 통신을 처리하는 전통적인 방법은 폴링 또는 차단 I/O를 사용하는 것이지만 이러한 방법은 매우 비효율적이므로 더 이상 최신 애플리케이션의 요구 사항을 충족할 수 없습니다. 이러한 문제를 해결하기 위해 업계에서는 메시지 큐 및 분산 시스템이라는 기술을 개발해 왔습니다.
메시지 대기열 및 배포 시스템에서 메시지 생산자는 메시지를 대기열로 보내고, 메시지 소비자는 대기열에서 메시지를 가져와 해당 작업을 수행합니다. 이 접근 방식은 I/O 폴링 및 차단과 같은 문제를 피할 수 있으므로 데이터 통신의 효율성을 크게 향상시킬 수 있습니다.
이 기사에서는 PHP와 Apache Kafka 통합을 사용하여 효율적인 메시지 대기열 및 배포를 달성하는 방법에 대해 설명합니다.
Apache Kafka 소개
Apache Kafka는 처리량이 높고 지연 시간이 짧으며 확장 가능한 분산 메시징 시스템입니다. 대량의 메시지를 처리하고 더 높은 로드를 수용할 수 있도록 수평으로 확장할 수 있습니다. Apache Kafka의 주요 구성 요소는 다음과 같습니다.
Apache Kafka와 통합된 PHP
Apache Kafka를 사용하려면 PHP용 Kafka 확장 프로그램을 사용해야 합니다. 이 확장은 PHP가 Kafka를 작동하는 데 필요한 모든 API를 제공합니다.
먼저 PECL에서 설치할 수 있는 Kafka 확장을 설치해야 합니다.
pecl install kafka
확장을 설치한 후 사용을 시작할 수 있습니다. 다음은 PHP와 Apache Kafka를 사용한 메시지 생성 및 소비의 간단한 예입니다.
<?php $brokers = 'kafka:9092'; // Kafka集群地址 $topic = 'test'; // Topic名称 // 创建一个Kafka生产者 $producer = new RdKafkaProducer(); $producer->setLogLevel(LOG_DEBUG); $producer->addBrokers($brokers); // 创建一个Kafka消费者 $conf = new RdKafkaConf(); $conf->set('group.id', 'myGroup'); $consumer = new RdKafkaConsumer($conf); $consumer->addBrokers($brokers); // 生产消息 $topicProducer = $producer->newTopic($topic); for ($i = 0; $i < 10; $i++) { $topicProducer->produce(RD_KAFKA_PARTITION_UA, 0, 'Message ' . $i); } // 消费消息 $topicConsumer = $consumer->newTopic($topic); $topicConsumer->consumeStart(0, RD_KAFKA_OFFSET_BEGINNING); while (true) { $message = $topicConsumer->consume(0, 1000); if (null === $message) { continue; } if ($message->err) { throw new Exception('Error occurred while consuming message'); } echo $message->payload . PHP_EOL; }
이 예에서는 먼저 Kafka 생산자와 Kafka 소비자를 만듭니다. 그런 다음 생산자에서는 지정된 주제로 10개의 메시지를 보냈고 소비자에서는 지정된 주제의 메시지를 소비하고 해당 내용을 출력했습니다.
이 시점에서 우리는 PHP와 Apache Kafka를 사용하여 간단한 메시지 생성 및 소비를 성공적으로 구현했습니다. 다음으로 PHP와 Apache Kafka를 사용하여 고급 기능을 구현하는 방법에 대해 설명하겠습니다.
고급 애플리케이션 예시
실제 애플리케이션에서는 일반적으로 다음과 같은 일부 고급 기능을 구현해야 합니다.
여기에서는 이러한 기능을 구현하는 방법에 대해 설명합니다.
메시지 배포
실제 애플리케이션에서는 일반적으로 메시지 흐름을 제어해야 합니다. 예를 들어 특정 소비자만 특정 메시지를 소비하기를 원할 수 있습니다. 이 기능을 달성하기 위해 각 소비자에 대한 대기열을 만든 다음 특정 메시지를 특정 대기열에 할당할 수 있습니다.
다음은 두 명의 소비자를 사용하여 두 가지 다른 작업을 소비하는 예입니다.
<?php $brokers = 'kafka:9092'; // Kafka集群地址 $topic = 'test'; // Topic名称 // 创建一个Kafka消费者组 $conf = new RdKafkaConf(); $conf->set('group.id', 'myGroup'); $consumer = new RdKafkaKafkaConsumer($conf); $consumer->subscribe([$topic]); // 创建两个Kafka生产者,一个生产者用于向消费者1发送消息,另一个生产者用于向消费者2发送消息 $producer1 = new RdKafkaProducer(); $producer1->addBrokers($brokers); $producer1Topic = $producer1->newTopic($topic . '_1'); $producer2 = new RdKafkaProducer(); $producer2->addBrokers($brokers); $producer2Topic = $producer2->newTopic($topic . '_2'); // 消费消息 while (true) { $message = $consumer->consume(1000); if (null === $message) { continue; } if ($message->err) { throw new Exception('Error occurred while consuming message'); } echo 'Received message: ' . $message->payload . PHP_EOL; // 根据消息内容分配给不同的生产者 if ($message->payload === 'task1') { $producer1Topic->produce(RD_KAFKA_PARTITION_UA, 0, $message->payload); } elseif ($message->payload === 'task2') { $producer2Topic->produce(RD_KAFKA_PARTITION_UA, 0, $message->payload); } }
이 예에서는 두 명의 생산자를 사용하여 두 명의 다른 소비자에게 메시지를 배포합니다. 소비자가 메시지를 받으면 메시지 내용을 기반으로 특정 생산자에게 메시지를 할당할 수 있습니다. 이 방법을 사용하면 메시지 흐름을 제어하고 메시지의 중복 처리를 방지할 수 있습니다.
Consumer Group
일반적인 Kafka Consumer에서는 같은 그룹에 속한 여러 Consumer가 동일한 주제를 함께 소비하고 동일한 메시지를 받게 됩니다. 이는 Kafka가 자동으로 파티션의 균형을 맞추고 각 파티션이 한 명의 소비자에 의해서만 처리되도록 하기 때문입니다.
PHP에서는 group.id를 사용하여 소비자를 그룹화하여 소비자 그룹의 기능을 구현할 수 있습니다.
다음은 동일한 그룹 내의 메시지를 병렬로 처리할 수 있는 Kafka 소비자 그룹의 예입니다.
<?php $brokers = 'kafka:9092'; // Kafka集群地址 $topic = 'test'; // Topic名称 // 创建一个Kafka消费者组 $conf = new RdKafkaConf(); $conf->set('group.id', 'myGroup'); $conf->set('metadata.broker.list', $brokers); $conf->set('enable.auto.commit', 'false'); $consumer = new RdKafkaKafkaConsumer($conf); // 添加需要订阅的topic $consumer->subscribe([$topic]); // 处理消息 while (true) { $message = $consumer->consume(1000); if (null === $message) { continue; } if ($message->err) { throw new Exception('Error occurred while consuming message'); } echo 'Received message: ' . $message->payload . PHP_EOL; // 处理完消息后手动提交offset $consumer->commit(); }
이 예에서는 Kafka 소비자 그룹을 생성하고 이를 구독해야 하는 주제를 추가합니다. 그런 다음 동일한 그룹 내의 메시지를 병렬로 처리할 수 있습니다.
참고: 소비자 그룹에서는 여러 소비자가 하나 이상의 파티션을 함께 소비합니다. 데이터를 소비할 때 동일한 데이터를 처리하는 멀티스레딩 문제에 주의해야 합니다.
오프셋 구성
Kafka에서는 각 파티션이 독립적인 오프셋을 갖습니다. 소비자는 파티션에서 읽는 위치와 읽는 메시지를 제어할 수 있습니다. 소비자는 마지막 메시지나 최신 메시지부터 읽기 시작할 수 있습니다.
PHP에서는 오프셋을 사용하여 메시지 읽기 위치를 제어할 수 있습니다. 다음은 오프셋 구성의 예입니다.
<?php $brokers = 'kafka:9092'; // Kafka集群地址 $topic = 'test'; // Topic名称 // 创建一个Kafka消费者 $conf = new RdKafkaConf(); $conf->set('group.id', 'myGroup'); $consumer = new RdKafkaKafkaConsumer($conf); // 订阅topic $topicConf = new RdKafkaTopicConf(); $topicConf->set('auto.offset.reset', 'earliest'); $topic = $consumer->newTopic($topic, $topicConf); $topic->consumeStart(0, RD_KAFKA_OFFSET_STORED); // 消费消息 while (true) { $message = $topic->consume(0, 1000); if (null === $message) { continue; } if ($message->err) { throw new Exception('Error occurred while consuming message'); } echo 'Received message: ' . $message->payload . PHP_EOL; }
이 예에서는 auto.offset.reset을 사용하여 오프셋 구성을 설정합니다. 이 구성은 소비자에게 가장 빠른 오프셋부터 메시지 소비를 시작하라고 지시합니다.
실제 응용 분야에서는 필요에 따라 다양한 오프셋을 구성할 수 있습니다. 예를 들어 생산자가 일부 메시지를 처리하지 못한 후 실패한 메시지가 이전에 처리된 지점부터 메시지 읽기를 다시 시작해야 할 수도 있습니다.
결론
이 기사에서는 PHP와 Apache Kafka 통합을 사용하여 효율적인 메시지 대기열 및 배포를 달성하는 방법에 대해 논의했습니다. 먼저 Apache Kafka의 기본 사항을 소개한 다음 PHP용 Kafka 확장을 사용하여 메시지 생성 및 소비를 구현하는 방법을 논의했습니다. 마지막으로 메시지 배포, 소비자 그룹, 오프셋 구성과 같은 일부 고급 기능을 구현하는 방법을 논의했습니다.
PHP와 Apache Kafka 통합을 사용하면 효율적인 메시지 대기열 및 배포를 구현하여 애플리케이션 응답 속도와 처리량을 향상시킬 수 있습니다. 대량의 데이터 통신을 처리해야 하는 애플리케이션을 개발하는 경우 Apache Kafka 및 PHP용 Kafka 확장이 좋은 선택일 수 있습니다.
위 내용은 효율적인 메시지 대기열 및 배포를 위한 PHP 및 Apache Kafka 통합의 상세 내용입니다. 자세한 내용은 PHP 중국어 웹사이트의 기타 관련 기사를 참조하세요!