> 백엔드 개발 > PHP 튜토리얼 > PHP 메시지 큐에 대한 자세한 설명

PHP 메시지 큐에 대한 자세한 설명

小云云
풀어 주다: 2023-03-22 11:18:01
원래의
3985명이 탐색했습니다.

이 기사는 주로 PHP 메시지 대기열에 대한 자세한 설명을 공유합니다. 먼저 메시지 대기열이 무엇인지 이해하겠습니다.

1. 메시지 큐란

메시지 큐(영어: Message queue)는 프로세스 간 통신 또는 동일한 프로세스의 서로 다른 스레드 간의 통신 방법입니다

2. 메시지 큐 기술을 사용하는 이유 분산 애플리케이션 간에 정보를 교환하는 기술입니다. 메시지 큐는 메모리나 디스크에 상주할 수 있으며 애플리케이션이 메시지를 읽을 때까지 메시지를 저장합니다. 메시지 큐를 사용하면 애플리케이션이 서로의 위치를 ​​모르거나 계속하기 전에 수신 프로그램이 메시지를 수신할 때까지 기다리지 않고도 독립적으로 실행할 수 있습니다.

3. 메시지 큐를 사용하는 경우

먼저 메시지 큐와 원격 프로시저 호출의 차이점을 알아야 합니다. 많은 독자들이 저에게 상담해 본 결과, 그들에게 필요한 것은 메시지가 아닌 RPC(Remote Procedure Call)라는 것을 알았습니다. 대기줄.

메시지 대기열은 동기식 또는 비동기식으로 구현될 수 있습니다. 일반적으로 메시지 대기열은 비동기식으로 사용되며 원격 프로시저 호출은 대부분 동기식 메서드를 사용합니다.

MQ와 RPC의 차이점은 무엇인가요? MQ는 일반적으로 사용자가 정의하고 저장 및 전달을 구현하는 불규칙한 프로토콜을 제공하는 반면, RPC는 일반적으로 전용 프로토콜이며 호출 프로세스가 결과를 반환합니다.

4. 메시지 대기열을 사용하는 경우

동기화가 필요한 경우 PRC(원격 프로시저 호출)가 더 적합합니다.

비동기 요구의 경우 메시지 대기열이 더 적합합니다.

현재 많은 메시지 대기열 소프트웨어는 RPC 기능도 지원하며 많은 RPC 시스템도 비동기식으로 호출할 수 있습니다.

메시지 대기열은 다음 요구 사항을 구현하는 데 사용됩니다.

저장 및 전달

분산 트랜잭션

게시 및 구독

콘텐츠 기반 라우팅

지점 간 연결

5. 메시지 처리 책임자는 누구입니까? queue

작은 경우 일반적인 관행 프로젝트 팀은 메시지 푸시, 수신 및 처리를 포함하여 한 사람이 이를 구현하도록 할 수 있습니다. 팀 규모가 큰 경우 일반적으로 메시지 프로토콜을 정의한 다음 각자 자신의 부분을 개발합니다. 예를 들어 한 팀은 푸시 프로토콜 부분을 작성하고 다른 팀은 수신 및 처리 부분을 작성합니다.

그럼 메시지 대기열 프레이밍에 대해 이야기해 볼까요?

프레임워크에는 여러 가지 이점이 있습니다.

개발자는 메시지 대기열 인터페이스를 배울 필요가 없습니다.

개발자는 메시지 푸시 및 수신에 신경 쓸 필요가 없습니다.

개발자는 통합 API를 통해 메시지를 푸시합니다.

개발자는 비즈니스 구현에 집중합니다. 논리 함수

6. 메시지 큐 프레임워크 구현 방법

다음은 저자가 개발한 SOA 프레임워크입니다. 이 프레임워크는 SOAP, RESTful, AMQP(RabbitMQ)라는 세 가지 인터페이스를 제공합니다. ​이 프레임워크를 사용하면 XML-RPC, ZeroMQ 등에 대한 지원을 추가하는 등 쉽게 확장할 수 있습니다.

https://github.com/netkiller/SOA

이 문서에서는 메시지 대기열 프레임워크 부분에 대해서만 설명합니다.

6.1. 데몬 프로세스

메시지 큐 프레임워크는 로컬 애플리케이션(명령줄 프로그램)이므로 데몬 프로세스를 구현해야 합니다.

https://github.com/netkiller/SOA/blob/master/bin/rabbitmq.php

각 인스턴스는 인스턴스화를 위해 세 가지 매개변수($queueName = 'queue name')를 제공해야 합니다. $exchangeName = 'Exchange 이름', $routeKey = 'Route'

$daemon = new FrameworkRabbitDaemon($queueName = 'email', $exchangeName = 'email', $routeKey = 'email')

데몬 프로세스에는 다음이 필요합니다. 사용하려면 루트 사용자로 실행하세요. 실행 후 일반 사용자로 전환되어 프로세스가 중지될 때 사용할 프로세스 ID 파일을 생성합니다.

데몬 코어 코드 https://github.com/netkiller/SOA/blob/master/system/rabbitdaemon.class.php

6.2. 메시지 큐 프로토콜

메시지 프로토콜은 배열이며 배열을 직렬화하거나 변환합니다. 메시지 대기열 서버에 대한 JSON 푸시, 여기서는 json 형식 프로토콜이 사용됩니다.

$msg = array(
'Namespace'=>'namespace',
"Class"=>"Email",
"Method"=>"smtp",
"Param" => array(
$mail, $subject, $message, null
)
);
로그인 후 복사

직렬화된 프로토콜

{"Namespace":"single","Class":"Email","Method":"smtp","Param":["netkiller@msn.com","Hello"," TestHelloWorld",null]}

使用json格式是考虑到通用性,这样推送端可以使用任何语言。如果不考虑兼容,建议使用二进制序列化,例如msgpack效率更好。

6.3. 消息队列处理

消息队列处理核心代码

https://github.com/netkiller/SOA/blob/master/system/rabbitmq.class.php

所以消息的处理在下面一段代码中进行

$this->queue->consume(function($envelope, $queue) {
$speed = microtime(true);
$msg = $envelope->getBody();
$result = $this->loader($msg);
$queue->ack($envelope->getDeliveryTag()); //手动发送ACK应答
//$this->logging->info(''.$msg.' '.$result)
$this->logging->debug('Protocol: '.$msg.' ');
$this->logging->debug('Result: '. $result.' ');
$this->logging->debug('Time: '. (microtime(true) - $speed) .'');
});
로그인 후 복사

public function loader($msg = null) 负责拆解协议,然后载入对应的类文件,传递参数,运行方法,反馈结果。

Time 可以输出程序运行所花费的时间,对于后期优化十分有用。

提示

loader() 可以进一步优化,使用多线程每次调用loader将任务提交到线程池中,这样便可以多线程处理消息队列。

6.4. 测试

测试代码 https://github.com/netkiller/SOA/blob/master/test/queue/email.php

 '192.168.4.1', 
'port' => '5672', 
'vhost' => '/', 
'login' => 'guest', 
'password' => 'guest'
));
$connection->connect() or die("Cannot connect to the broker!\n");
 
$channel = new AMQPChannel($connection);
$exchange = new AMQPExchange($channel);
$exchange->setName($exchangeName);
$queue = new AMQPQueue($channel);
$queue->setName($queueName);
$queue->setFlags(AMQP_DURABLE);
$queue->declareQueue();
$msg = array(
'Namespace'=>'namespace',
"Class"=>"Email",
"Method"=>"smtp",
"Param" => array(
$mail, $subject, $message, null
)
);
$exchange->publish(json_encode($msg), $routeKey);
printf("[x] Sent %s \r\n", json_encode($msg));
$connection->disconnect();
로그인 후 복사

这里只给出了少量测试与演示程序,如有疑问请到渎者群,或者公众号询问。

7. 多线程

上面消息队列 核心代码如下

$this->queue->consume(function($envelope, $queue) {
$msg = $envelope->getBody();
$result = $this->loader($msg);
$queue->ack($envelope->getDeliveryTag());
});
로그인 후 복사

这段代码生产环境使用了半年,发现效率比较低。有些业务场入队非常快,但处理起来所花的时间就比较长,容易出现队列堆积现象。

增加多线程可能更有效利用硬件资源,提高业务处理能力。代码如下

<?php
namespace framework;
require_once( __DIR__.&#39;/autoload.class.php&#39; );
class RabbitThread extends \Threaded {
private $queue;
public $classspath;
protected $msg;
public function __construct($queue, $logging, $msg) {
$this->classspath = __DIR__.&#39;/../queue&#39;;
$this->msg = $msg;
$this->logging = $logging;
$this->queue = $queue;
}
public function run() {
$speed = microtime(true);
$result = $this->loader($this->msg);
$this->logging->debug(&#39;Result: &#39;. $result.&#39; &#39;);
$this->logging->debug(&#39;Time: &#39;. (microtime(true) - $speed) .&#39;&#39;);
}
// private
public  function loader($msg = null){
$protocol = json_decode($msg,true);
$namespace= $protocol[&#39;Namespace&#39;];
$class = $protocol[&#39;Class&#39;];
$method = $protocol[&#39;Method&#39;];
$param = $protocol[&#39;Param&#39;];
$result = null;
$classspath = $this->classspath.&#39;/&#39;.$this->queue.&#39;/&#39;.$namespace.&#39;/&#39;.strtolower($class)  . &#39;.class.php&#39;;
if( is_file($classspath) ){
require_once($classspath);
//$class = ucfirst(substr($request_uri, strrpos($request_uri, &#39;/&#39;)+1));
if (class_exists($class)) {
if(method_exists($class, $method)){
$obj = new $class;
if (!$param){
$tmp = $obj->$method();
$result = json_encode($tmp);
$this->logging->info($class.&#39;->&#39;.$method.&#39;()&#39;);
}else{
$tmp = call_user_func_array(array($obj, $method), $param);
$result = (json_encode($tmp));
$this->logging->info($class.&#39;->&#39;.$method.&#39;("&#39;.implode(&#39;","&#39;, $param).&#39;")&#39;);
}
}else{
$this->logging->error(&#39;Object &#39;. $class. &#39;->&#39; . $method. &#39; is not exist.&#39;);
}
}else{
$msg = sprintf("Object is not exist. (%s)", $class);
$this->logging->error($msg);
}
}else{
$msg = sprintf("Cannot loading interface! (%s)", $classspath);
$this->logging->error($msg);
}
return $result;
}
}
class RabbitMQ {
const loop = 10;
protected $queue;
protected $pool;
public function __construct($queueName = &#39;&#39;, $exchangeName = &#39;&#39;, $routeKey = &#39;&#39;) {
$this->config = new \framework\Config(&#39;rabbitmq.ini&#39;);
$this->logfile = __DIR__.&#39;/../log/rabbitmq.%s.log&#39;;
$this->logqueue = __DIR__.&#39;/../log/queue.%s.log&#39;;
$this->logging = new \framework\log\Logging($this->logfile, $debug=true);
 //.H:i:s
$this->queueName= $queueName;
$this->exchangeName= $exchangeName;
$this->routeKey= $routeKey; 
$this->pool = new \Pool($this->config->get(&#39;pool&#39;)[&#39;thread&#39;]);
}
public function main(){
$connection = new \AMQPConnection($this->config->get(&#39;rabbitmq&#39;));
try {
$connection->connect();
if (!$connection->isConnected()) {
$this->logging->exception("Cannot connect to the broker!"
.PHP_EOL);
}
$this->channel = new \AMQPChannel($connection);
$this->exchange = new \AMQPExchange($this->channel);
$this->exchange->setName($this->exchangeName);
$this->exchange->setType(AMQP_EX_TYPE_DIRECT); //direct类型
$this->exchange->setFlags(AMQP_DURABLE); //持久�?
$this->exchange->declareExchange();
$this->queue = new \AMQPQueue($this->channel);
$this->queue->setName($this->queueName);
$this->queue->setFlags(AMQP_DURABLE); //持久�?
$this->queue->declareQueue();
$this->queue->bind($this->exchangeName, $this->routeKey);
$this->queue->consume(function($envelope, $queue) {
$msg = $envelope->getBody();
$this->logging->debug(&#39;Protocol: &#39;.$msg.&#39; &#39;);
//$result = $this->loader($msg);
$this->pool->submit(new RabbitThread($this->queueName, 
new \framework\log\Logging($this->logqueue, $debug=true), $msg));
$queue->ack($envelope->getDeliveryTag()); 
});
$this->channel->qos(0,1);
}
catch(\AMQPConnectionException $e){
$this->logging->exception($e->__toString());
}
catch(\Exception $e){
$this->logging->exception($e->__toString());
$connection->disconnect();
$this->pool->shutdown();
}
}
private function fault($tag, $msg){
$this->logging->exception($msg);
throw new \Exception($tag.&#39;: &#39;.$msg);
}
public function __destruct() {
}
}
로그인 후 복사

相关推荐:

PHP实现消息队列

php实现消息队列类实例分享

什么是消息队列?在Linux中使用消息队列

위 내용은 PHP 메시지 큐에 대한 자세한 설명의 상세 내용입니다. 자세한 내용은 PHP 중국어 웹사이트의 기타 관련 기사를 참조하세요!

관련 라벨:
원천:php.cn
본 웹사이트의 성명
본 글의 내용은 네티즌들의 자발적인 기여로 작성되었으며, 저작권은 원저작자에게 있습니다. 본 사이트는 이에 상응하는 법적 책임을 지지 않습니다. 표절이나 침해가 의심되는 콘텐츠를 발견한 경우 admin@php.cn으로 문의하세요.
인기 튜토리얼
더>
최신 다운로드
더>
웹 효과
웹사이트 소스 코드
웹사이트 자료
프론트엔드 템플릿