PHPメッセージキューの詳しい説明

小云云
リリース: 2023-03-22 11:18:01
オリジナル
3930 人が閲覧しました

この記事では主に PHP メッセージキューについて詳しく説明します。まず、メッセージキューとは何かを理解してください。

1. メッセージキューとは

メッセージキュー(英語: Message queue)は、プロセス間通信、または同じプロセスの異なるスレッド間の通信方法です

2. メッセージキューを使用する理由

分散 アプリケーション間で情報を交換する技術。メッセージ キューはメモリまたはディスク上に常駐でき、アプリケーションによって読み取られるまでメッセージを保存します。メッセージ キューを使用すると、アプリケーションは、互いの位置を認識したり、続行する前に受信側プログラムがメッセージを受信するのを待つことなく、独立して実行できます。

3. メッセージ キューを使用する場合

まず、メッセージ キューとリモート プロシージャ コールの違いを理解する必要があります。多くの読者が私に相談したところ、必要なのはメッセージではなく RPC (リモート プロシージャ コール) であることがわかりました。列。

メッセージ キューは同期または非同期で実装できます。通常、メッセージ キューは非同期で使用され、リモート プロシージャ コールでは主に同期メソッドが使用されます。

MQ と RPC の違いは何ですか? MQ は通常、ユーザーによって定義され、ストアと転送を実装する不規則なプロトコルを提供します。一方、RPC は通常、専用のプロトコルであり、呼び出しプロセスが結果を返します。

4. メッセージ キューを使用する場合

同期のニーズには、リモート プロシージャ コール (PRC) が適しています。

非同期のニーズには、メッセージキューの方が適しています。

現在、多くのメッセージ キュー ソフトウェアも RPC 機能をサポートしており、多くの RPC システムを非同期で呼び出すこともできます。

メッセージ キューは、次の要件を実装するために使用されます

ストアと転送

分散トランザクション

パブリッシュとサブスクライブ

コンテンツ ベースのルーティング

ポイントツーポイント接続

5. メッセージの処理の責任者キュー

通常の方法 (規模が小さい場合) プロジェクト チームは、メッセージのプッシュ、受信、処理を含め、1 人で実装できます。チームが大規模な場合は、通常、メッセージ プロトコルを定義してから、それぞれが独自の部分を開発します。たとえば、あるチームがプッシュ プロトコル部分の作成を担当し、別のチームが受信および処理部分の作成を担当します。

それでは、メッセージキューのフレーム化について話しましょう?

フレームワークにはいくつかの利点があります:

開発者はメッセージキューインターフェイスを学ぶ必要がありません

開発者はメッセージのプッシュと受信を気にする必要がありません

開発者は統合されたAPIを通じてメッセージをプッシュします

開発者はビジネスの実装に集中しますロジック関数

6. メッセージ キュー フレームワークの実装方法

以下は、著者が開発した SOA フレームワークです。このフレームワークは、SOAP、RESTful、AMQP (RabbitMQ) の概念を理解している場合に提供されます。このフレームワークは、XML-RPC、ZeroMQ などのサポートを追加するなど、さらに簡単に拡張できます。

https://github.com/netkiller/SOA

この記事では、メッセージ キュー フレームワークの部分についてのみ説明します。

6.1. デーモンプロセス

メッセージキューフレームワークはローカルアプリケーション(コマンドラインプログラム)であり、バックグラウンドで実行できるようにするには、デーモンプロセスを実装する必要があります。

https://github.com/netkiller/SOA/blob/master/bin/rabbitmq.php

各インスタンスは、インスタンス化のために 3 つのパラメーターを指定する必要があります。$queueName = 'queue name'、 $exchangeName = 'Exchange 名', $routeKey = 'Route'

$daemon = new FrameworkRabbitDaemon($queueName = 'email', $exchangeName = 'email', $routeKey = 'email');使用する場合 root ユーザーで実行します。 実行後は一般ユーザーに切り替わり、プロセス停止時に使用するプロセス ID ファイルが作成されます。

デーモンコアコード https://github.com/netkiller/SOA/blob/master/system/rabbitdaemon.class.php

6.2. メッセージキュープロトコル

メッセージプロトコルは配列であり、配列をシリアル化または変換しますメッセージ キュー サーバーへの JSON プッシュ。ここでは json 形式のプロトコルが使用されます。

$msg = array(
'Namespace'=>'namespace',
"Class"=>"Email",
"Method"=>"smtp",
"Param" => array(
$mail, $subject, $message, null
)
);
ログイン後にコピー

シリアル化されたプロトコル

{"Namespace":"single","Class":"Email","Method":"smtp","Param":["netkiller@msn.com","Hello"," TestHelloWorld",null]}

使用json格式是考虑到通用性,这样推送端可以使用任何语言。如果不考虑兼容,建议使用二进制序列化,例如msgpack效率更好。

6.3. 消息队列处理

消息队列处理核心代码

https://github.com/netkiller/SOA/blob/master/system/rabbitmq.class.php

所以消息的处理在下面一段代码中进行

$this->queue->consume(function($envelope, $queue) {
$speed = microtime(true);
$msg = $envelope->getBody();
$result = $this->loader($msg);
$queue->ack($envelope->getDeliveryTag()); //手动发送ACK应答
//$this->logging->info(''.$msg.' '.$result)
$this->logging->debug('Protocol: '.$msg.' ');
$this->logging->debug('Result: '. $result.' ');
$this->logging->debug('Time: '. (microtime(true) - $speed) .'');
});
ログイン後にコピー

public function loader($msg = null) 负责拆解协议,然后载入对应的类文件,传递参数,运行方法,反馈结果。

Time 可以输出程序运行所花费的时间,对于后期优化十分有用。

提示

loader() 可以进一步优化,使用多线程每次调用loader将任务提交到线程池中,这样便可以多线程处理消息队列。

6.4. 测试

测试代码 https://github.com/netkiller/SOA/blob/master/test/queue/email.php

 '192.168.4.1', 
'port' => '5672', 
'vhost' => '/', 
'login' => 'guest', 
'password' => 'guest'
));
$connection->connect() or die("Cannot connect to the broker!\n");
 
$channel = new AMQPChannel($connection);
$exchange = new AMQPExchange($channel);
$exchange->setName($exchangeName);
$queue = new AMQPQueue($channel);
$queue->setName($queueName);
$queue->setFlags(AMQP_DURABLE);
$queue->declareQueue();
$msg = array(
'Namespace'=>'namespace',
"Class"=>"Email",
"Method"=>"smtp",
"Param" => array(
$mail, $subject, $message, null
)
);
$exchange->publish(json_encode($msg), $routeKey);
printf("[x] Sent %s \r\n", json_encode($msg));
$connection->disconnect();
ログイン後にコピー

这里只给出了少量测试与演示程序,如有疑问请到渎者群,或者公众号询问。

7. 多线程

上面消息队列 核心代码如下

$this->queue->consume(function($envelope, $queue) {
$msg = $envelope->getBody();
$result = $this->loader($msg);
$queue->ack($envelope->getDeliveryTag());
});
ログイン後にコピー

这段代码生产环境使用了半年,发现效率比较低。有些业务场入队非常快,但处理起来所花的时间就比较长,容易出现队列堆积现象。

增加多线程可能更有效利用硬件资源,提高业务处理能力。代码如下

<?php
namespace framework;
require_once( __DIR__.&#39;/autoload.class.php&#39; );
class RabbitThread extends \Threaded {
private $queue;
public $classspath;
protected $msg;
public function __construct($queue, $logging, $msg) {
$this->classspath = __DIR__.&#39;/../queue&#39;;
$this->msg = $msg;
$this->logging = $logging;
$this->queue = $queue;
}
public function run() {
$speed = microtime(true);
$result = $this->loader($this->msg);
$this->logging->debug(&#39;Result: &#39;. $result.&#39; &#39;);
$this->logging->debug(&#39;Time: &#39;. (microtime(true) - $speed) .&#39;&#39;);
}
// private
public  function loader($msg = null){
$protocol = json_decode($msg,true);
$namespace= $protocol[&#39;Namespace&#39;];
$class = $protocol[&#39;Class&#39;];
$method = $protocol[&#39;Method&#39;];
$param = $protocol[&#39;Param&#39;];
$result = null;
$classspath = $this->classspath.&#39;/&#39;.$this->queue.&#39;/&#39;.$namespace.&#39;/&#39;.strtolower($class)  . &#39;.class.php&#39;;
if( is_file($classspath) ){
require_once($classspath);
//$class = ucfirst(substr($request_uri, strrpos($request_uri, &#39;/&#39;)+1));
if (class_exists($class)) {
if(method_exists($class, $method)){
$obj = new $class;
if (!$param){
$tmp = $obj->$method();
$result = json_encode($tmp);
$this->logging->info($class.&#39;->&#39;.$method.&#39;()&#39;);
}else{
$tmp = call_user_func_array(array($obj, $method), $param);
$result = (json_encode($tmp));
$this->logging->info($class.&#39;->&#39;.$method.&#39;("&#39;.implode(&#39;","&#39;, $param).&#39;")&#39;);
}
}else{
$this->logging->error(&#39;Object &#39;. $class. &#39;->&#39; . $method. &#39; is not exist.&#39;);
}
}else{
$msg = sprintf("Object is not exist. (%s)", $class);
$this->logging->error($msg);
}
}else{
$msg = sprintf("Cannot loading interface! (%s)", $classspath);
$this->logging->error($msg);
}
return $result;
}
}
class RabbitMQ {
const loop = 10;
protected $queue;
protected $pool;
public function __construct($queueName = &#39;&#39;, $exchangeName = &#39;&#39;, $routeKey = &#39;&#39;) {
$this->config = new \framework\Config(&#39;rabbitmq.ini&#39;);
$this->logfile = __DIR__.&#39;/../log/rabbitmq.%s.log&#39;;
$this->logqueue = __DIR__.&#39;/../log/queue.%s.log&#39;;
$this->logging = new \framework\log\Logging($this->logfile, $debug=true);
 //.H:i:s
$this->queueName= $queueName;
$this->exchangeName= $exchangeName;
$this->routeKey= $routeKey; 
$this->pool = new \Pool($this->config->get(&#39;pool&#39;)[&#39;thread&#39;]);
}
public function main(){
$connection = new \AMQPConnection($this->config->get(&#39;rabbitmq&#39;));
try {
$connection->connect();
if (!$connection->isConnected()) {
$this->logging->exception("Cannot connect to the broker!"
.PHP_EOL);
}
$this->channel = new \AMQPChannel($connection);
$this->exchange = new \AMQPExchange($this->channel);
$this->exchange->setName($this->exchangeName);
$this->exchange->setType(AMQP_EX_TYPE_DIRECT); //direct类型
$this->exchange->setFlags(AMQP_DURABLE); //持久�?
$this->exchange->declareExchange();
$this->queue = new \AMQPQueue($this->channel);
$this->queue->setName($this->queueName);
$this->queue->setFlags(AMQP_DURABLE); //持久�?
$this->queue->declareQueue();
$this->queue->bind($this->exchangeName, $this->routeKey);
$this->queue->consume(function($envelope, $queue) {
$msg = $envelope->getBody();
$this->logging->debug(&#39;Protocol: &#39;.$msg.&#39; &#39;);
//$result = $this->loader($msg);
$this->pool->submit(new RabbitThread($this->queueName, 
new \framework\log\Logging($this->logqueue, $debug=true), $msg));
$queue->ack($envelope->getDeliveryTag()); 
});
$this->channel->qos(0,1);
}
catch(\AMQPConnectionException $e){
$this->logging->exception($e->__toString());
}
catch(\Exception $e){
$this->logging->exception($e->__toString());
$connection->disconnect();
$this->pool->shutdown();
}
}
private function fault($tag, $msg){
$this->logging->exception($msg);
throw new \Exception($tag.&#39;: &#39;.$msg);
}
public function __destruct() {
}
}
ログイン後にコピー

相关推荐:

PHP实现消息队列

php实现消息队列类实例分享

什么是消息队列?在Linux中使用消息队列

以上がPHPメッセージキューの詳しい説明の詳細内容です。詳細については、PHP 中国語 Web サイトの他の関連記事を参照してください。

関連ラベル:
ソース:php.cn
このウェブサイトの声明
この記事の内容はネチズンが自主的に寄稿したものであり、著作権は原著者に帰属します。このサイトは、それに相当する法的責任を負いません。盗作または侵害の疑いのあるコンテンツを見つけた場合は、admin@php.cn までご連絡ください。
最新の問題
人気のチュートリアル
詳細>
最新のダウンロード
詳細>
ウェブエフェクト
公式サイト
サイト素材
フロントエンドテンプレート
私たちについて 免責事項 Sitemap
PHP中国語ウェブサイト:福祉オンライン PHP トレーニング,PHP 学習者の迅速な成長を支援します!