이 기사는 주로 PHP 메시지 대기열에 대한 자세한 설명을 공유합니다. 먼저 메시지 대기열이 무엇인지 이해하겠습니다.
1. 메시지 큐란
메시지 큐(영어: Message queue)는 프로세스 간 통신 또는 동일한 프로세스의 서로 다른 스레드 간의 통신 방법입니다
2. 메시지 큐 기술을 사용하는 이유 분산 애플리케이션 간에 정보를 교환하는 기술입니다. 메시지 큐는 메모리나 디스크에 상주할 수 있으며 애플리케이션이 메시지를 읽을 때까지 메시지를 저장합니다. 메시지 큐를 사용하면 애플리케이션이 서로의 위치를 모르거나 계속하기 전에 수신 프로그램이 메시지를 수신할 때까지 기다리지 않고도 독립적으로 실행할 수 있습니다.
3. 메시지 큐를 사용하는 경우
먼저 메시지 큐와 원격 프로시저 호출의 차이점을 알아야 합니다. 많은 독자들이 저에게 상담해 본 결과, 그들에게 필요한 것은 메시지가 아닌 RPC(Remote Procedure Call)라는 것을 알았습니다. 대기줄.
메시지 대기열은 동기식 또는 비동기식으로 구현될 수 있습니다. 일반적으로 메시지 대기열은 비동기식으로 사용되며 원격 프로시저 호출은 대부분 동기식 메서드를 사용합니다.
MQ와 RPC의 차이점은 무엇인가요? MQ는 일반적으로 사용자가 정의하고 저장 및 전달을 구현하는 불규칙한 프로토콜을 제공하는 반면, RPC는 일반적으로 전용 프로토콜이며 호출 프로세스가 결과를 반환합니다.
4. 메시지 대기열을 사용하는 경우
동기화가 필요한 경우 PRC(원격 프로시저 호출)가 더 적합합니다.
비동기 요구의 경우 메시지 대기열이 더 적합합니다.
현재 많은 메시지 대기열 소프트웨어는 RPC 기능도 지원하며 많은 RPC 시스템도 비동기식으로 호출할 수 있습니다.
메시지 대기열은 다음 요구 사항을 구현하는 데 사용됩니다.
저장 및 전달
분산 트랜잭션
게시 및 구독
콘텐츠 기반 라우팅
지점 간 연결
5. 메시지 처리 책임자는 누구입니까? queue
작은 경우 일반적인 관행 프로젝트 팀은 메시지 푸시, 수신 및 처리를 포함하여 한 사람이 이를 구현하도록 할 수 있습니다. 팀 규모가 큰 경우 일반적으로 메시지 프로토콜을 정의한 다음 각자 자신의 부분을 개발합니다. 예를 들어 한 팀은 푸시 프로토콜 부분을 작성하고 다른 팀은 수신 및 처리 부분을 작성합니다.
그럼 메시지 대기열 프레이밍에 대해 이야기해 볼까요?
프레임워크에는 여러 가지 이점이 있습니다.
개발자는 메시지 대기열 인터페이스를 배울 필요가 없습니다.
개발자는 메시지 푸시 및 수신에 신경 쓸 필요가 없습니다.
개발자는 통합 API를 통해 메시지를 푸시합니다.
개발자는 비즈니스 구현에 집중합니다. 논리 함수
6. 메시지 큐 프레임워크 구현 방법
다음은 저자가 개발한 SOA 프레임워크입니다. 이 프레임워크는 SOAP, RESTful, AMQP(RabbitMQ)라는 세 가지 인터페이스를 제공합니다. 이 프레임워크를 사용하면 XML-RPC, ZeroMQ 등에 대한 지원을 추가하는 등 쉽게 확장할 수 있습니다.
https://github.com/netkiller/SOA
이 문서에서는 메시지 대기열 프레임워크 부분에 대해서만 설명합니다.
6.1. 데몬 프로세스
메시지 큐 프레임워크는 로컬 애플리케이션(명령줄 프로그램)이므로 데몬 프로세스를 구현해야 합니다.
https://github.com/netkiller/SOA/blob/master/bin/rabbitmq.php
각 인스턴스는 인스턴스화를 위해 세 가지 매개변수($queueName = 'queue name')를 제공해야 합니다. $exchangeName = 'Exchange 이름', $routeKey = 'Route'
$daemon = new FrameworkRabbitDaemon($queueName = 'email', $exchangeName = 'email', $routeKey = 'email')
데몬 프로세스에는 다음이 필요합니다. 사용하려면 루트 사용자로 실행하세요. 실행 후 일반 사용자로 전환되어 프로세스가 중지될 때 사용할 프로세스 ID 파일을 생성합니다.
데몬 코어 코드 https://github.com/netkiller/SOA/blob/master/system/rabbitdaemon.class.php
6.2. 메시지 큐 프로토콜
메시지 프로토콜은 배열이며 배열을 직렬화하거나 변환합니다. 메시지 대기열 서버에 대한 JSON 푸시, 여기서는 json 형식 프로토콜이 사용됩니다.
$msg = array( 'Namespace'=>'namespace', "Class"=>"Email", "Method"=>"smtp", "Param" => array( $mail, $subject, $message, null ) );
직렬화된 프로토콜
{"Namespace":"single","Class":"Email","Method":"smtp","Param":["netkiller@msn.com","Hello"," TestHelloWorld",null]}
使用json格式是考虑到通用性,这样推送端可以使用任何语言。如果不考虑兼容,建议使用二进制序列化,例如msgpack效率更好。
6.3. 消息队列处理
消息队列处理核心代码
https://github.com/netkiller/SOA/blob/master/system/rabbitmq.class.php
所以消息的处理在下面一段代码中进行
$this->queue->consume(function($envelope, $queue) { $speed = microtime(true); $msg = $envelope->getBody(); $result = $this->loader($msg); $queue->ack($envelope->getDeliveryTag()); //手动发送ACK应答 //$this->logging->info(''.$msg.' '.$result) $this->logging->debug('Protocol: '.$msg.' '); $this->logging->debug('Result: '. $result.' '); $this->logging->debug('Time: '. (microtime(true) - $speed) .''); });
public function loader($msg = null) 负责拆解协议,然后载入对应的类文件,传递参数,运行方法,反馈结果。
Time 可以输出程序运行所花费的时间,对于后期优化十分有用。
提示
loader() 可以进一步优化,使用多线程每次调用loader将任务提交到线程池中,这样便可以多线程处理消息队列。
6.4. 测试
测试代码 https://github.com/netkiller/SOA/blob/master/test/queue/email.php
'192.168.4.1', 'port' => '5672', 'vhost' => '/', 'login' => 'guest', 'password' => 'guest' )); $connection->connect() or die("Cannot connect to the broker!\n"); $channel = new AMQPChannel($connection); $exchange = new AMQPExchange($channel); $exchange->setName($exchangeName); $queue = new AMQPQueue($channel); $queue->setName($queueName); $queue->setFlags(AMQP_DURABLE); $queue->declareQueue(); $msg = array( 'Namespace'=>'namespace', "Class"=>"Email", "Method"=>"smtp", "Param" => array( $mail, $subject, $message, null ) ); $exchange->publish(json_encode($msg), $routeKey); printf("[x] Sent %s \r\n", json_encode($msg)); $connection->disconnect();
这里只给出了少量测试与演示程序,如有疑问请到渎者群,或者公众号询问。
7. 多线程
上面消息队列 核心代码如下
$this->queue->consume(function($envelope, $queue) { $msg = $envelope->getBody(); $result = $this->loader($msg); $queue->ack($envelope->getDeliveryTag()); });
这段代码生产环境使用了半年,发现效率比较低。有些业务场入队非常快,但处理起来所花的时间就比较长,容易出现队列堆积现象。
增加多线程可能更有效利用硬件资源,提高业务处理能力。代码如下
<?php namespace framework; require_once( __DIR__.'/autoload.class.php' ); class RabbitThread extends \Threaded { private $queue; public $classspath; protected $msg; public function __construct($queue, $logging, $msg) { $this->classspath = __DIR__.'/../queue'; $this->msg = $msg; $this->logging = $logging; $this->queue = $queue; } public function run() { $speed = microtime(true); $result = $this->loader($this->msg); $this->logging->debug('Result: '. $result.' '); $this->logging->debug('Time: '. (microtime(true) - $speed) .''); } // private public function loader($msg = null){ $protocol = json_decode($msg,true); $namespace= $protocol['Namespace']; $class = $protocol['Class']; $method = $protocol['Method']; $param = $protocol['Param']; $result = null; $classspath = $this->classspath.'/'.$this->queue.'/'.$namespace.'/'.strtolower($class) . '.class.php'; if( is_file($classspath) ){ require_once($classspath); //$class = ucfirst(substr($request_uri, strrpos($request_uri, '/')+1)); if (class_exists($class)) { if(method_exists($class, $method)){ $obj = new $class; if (!$param){ $tmp = $obj->$method(); $result = json_encode($tmp); $this->logging->info($class.'->'.$method.'()'); }else{ $tmp = call_user_func_array(array($obj, $method), $param); $result = (json_encode($tmp)); $this->logging->info($class.'->'.$method.'("'.implode('","', $param).'")'); } }else{ $this->logging->error('Object '. $class. '->' . $method. ' is not exist.'); } }else{ $msg = sprintf("Object is not exist. (%s)", $class); $this->logging->error($msg); } }else{ $msg = sprintf("Cannot loading interface! (%s)", $classspath); $this->logging->error($msg); } return $result; } } class RabbitMQ { const loop = 10; protected $queue; protected $pool; public function __construct($queueName = '', $exchangeName = '', $routeKey = '') { $this->config = new \framework\Config('rabbitmq.ini'); $this->logfile = __DIR__.'/../log/rabbitmq.%s.log'; $this->logqueue = __DIR__.'/../log/queue.%s.log'; $this->logging = new \framework\log\Logging($this->logfile, $debug=true); //.H:i:s $this->queueName= $queueName; $this->exchangeName= $exchangeName; $this->routeKey= $routeKey; $this->pool = new \Pool($this->config->get('pool')['thread']); } public function main(){ $connection = new \AMQPConnection($this->config->get('rabbitmq')); try { $connection->connect(); if (!$connection->isConnected()) { $this->logging->exception("Cannot connect to the broker!" .PHP_EOL); } $this->channel = new \AMQPChannel($connection); $this->exchange = new \AMQPExchange($this->channel); $this->exchange->setName($this->exchangeName); $this->exchange->setType(AMQP_EX_TYPE_DIRECT); //direct类型 $this->exchange->setFlags(AMQP_DURABLE); //持久�? $this->exchange->declareExchange(); $this->queue = new \AMQPQueue($this->channel); $this->queue->setName($this->queueName); $this->queue->setFlags(AMQP_DURABLE); //持久�? $this->queue->declareQueue(); $this->queue->bind($this->exchangeName, $this->routeKey); $this->queue->consume(function($envelope, $queue) { $msg = $envelope->getBody(); $this->logging->debug('Protocol: '.$msg.' '); //$result = $this->loader($msg); $this->pool->submit(new RabbitThread($this->queueName, new \framework\log\Logging($this->logqueue, $debug=true), $msg)); $queue->ack($envelope->getDeliveryTag()); }); $this->channel->qos(0,1); } catch(\AMQPConnectionException $e){ $this->logging->exception($e->__toString()); } catch(\Exception $e){ $this->logging->exception($e->__toString()); $connection->disconnect(); $this->pool->shutdown(); } } private function fault($tag, $msg){ $this->logging->exception($msg); throw new \Exception($tag.': '.$msg); } public function __destruct() { } }
相关推荐:
위 내용은 PHP 메시지 큐에 대한 자세한 설명의 상세 내용입니다. 자세한 내용은 PHP 중국어 웹사이트의 기타 관련 기사를 참조하세요!