ホームページ > バックエンド開発 > PHPチュートリアル > php は RabbitMQ メッセージを受信し、重い作業を実行します

php は RabbitMQ メッセージを受信し、重い作業を実行します

WBOY
リリース: 2016-06-23 13:07:03
オリジナル
1142 人が閲覧しました

1) メッセージキューの基本クラスを確立します

<?php/** * @desc 消息队列 * @author caifangjie * @date 2016/05/03 */class Queue{    //交换机名称    protected $_exchangeName = 'ex_auto_home';        //队列名称    protected $_queueName = 'qu_auto_home';        //路由    protected $_routeKey = 'ru_auto_home';        protected $_connectHandler;        protected $_channelObject;    protected $_exchangeObject;        protected $_queueObject;        //配置信息    protected $_config = array('host' => '127.0.0.1', 'port' => '5672', 'vhost' => '/', 'login' => 'guest', 'password' => 'guest');    //构造函数,依次创建通道,交换机,队列    public function __construct()    {        try{            $this->_connectHandler = new AMQPConnection($this->_config);            if(!$this->_connectHandler->connect()) {                die('connect failed');            }            $this->createChannel();            $this->createExchange();            $this->createQueue();        } catch(Exception $e) {            echo $e->getMessage();        }    }        //创建通道    protected function createChannel()    {        $this->_channelObject = new AMQPChannel($this->_connectHandler);    }        //创建交换机    public function createExchange($exchangeName='', $exchangeType=AMQP_EX_TYPE_DIRECT)    {        $exName = $exchangeName?$exchangeName:$this->_exchangeName;        $this->_exchangeObject = new AMQPExchange($this->_channelObject);        $this->_exchangeObject->setName($exName);        $this->_exchangeObject->setType($exchangeType);        $this->_exchangeObject->setFlags(AMQP_DURABLE | AMQP_AUTODELETE);        $this->_exchangeObject->declareExchange();    }        //创建队列    public function createQueue()    {        $this->_queueObject = new AMQPQueue($this->_channelObject);        $this->_queueObject->setName($this->_queueName);        $this->_queueObject->setFlags(AMQP_DURABLE | AMQP_AUTODELETE);        $this->_queueObject->declareQueue();        $this->_queueObject->bind($this->_exchangeObject->getName(), $this->_routeKey);    }  }
ログイン後にコピー


2) キューにメッセージを継続的にプッシュするタスクがあります。累積的に、キューには大量のメッセージが存在します...

3) client は継続的にキュー内のメッセージを受け入れ、対応するタスクを実行します

<?phprequire_once 'ExecProcess.class.php';require_once 'Queue/Queue.class.php';class Recv extends Queue{    public function __construct()    {        parent::__construct();    }    //接受消息    public function recvMessage()    {        while (true) {            $this->_queueObject->consume(function(AMQPEnvelope $e, AMQPQueue $q) {                $requestUrl = $e->getBody();                if ($requestUrl) {                   // var_dump($requestUrl);                    $execHandler = new ExecProcess();                    $execHandler->start($requestUrl);                    $execHandler->execSave();                    unset($execHandler);                    $q->nack($e->getDeliveryTag());                } else {                    usleep(100);                }            });        }    }}$reciver = new Recv();$reciver->recvMessage();
ログイン後にコピー


既知の require_once 'ExecProcess.class.php'; このクラスは単独で実行できますが、クライアントはメッセージ キューに追加されます。メッセージを受け取り、重いタスクを実行するときは、(php-cli モードで) 実行時にクライアントはエラーを報告せずに直接終了することに注意してください。

以下のようであれば、正常に実行でき、キュー内のメッセージを出力できます

<?php//require_once 'ExecProcess.class.php';require_once 'Queue/Queue.class.php';class Recv extends Queue{    public function __construct()    {        parent::__construct();    }    //接受消息    public function recvMessage()    {        while (true) {            $this->_queueObject->consume(function(AMQPEnvelope $e, AMQPQueue $q) {                $requestUrl = $e->getBody();                if ($requestUrl) {                    var_dump($requestUrl);//                    $execHandler = new ExecProcess();//                    $execHandler->start($requestUrl);//                    $execHandler->execSave();//                    unset($execHandler);                    $q->nack($e->getDeliveryTag());                } else {                    usleep(100);                }            });        }    }}$reciver = new Recv();$reciver->recvMessage();
ログイン後にコピー


つまり、ExecProcess.class.php'を追加すると、メッセージを受信したクライアントは自動的に終了し、Noエラーが報告されます。 。 。

逆に、require_once 'ExecProcess.class.php'; を削除して、メッセージを処理するロジックを削除すると、キュー内のメッセージを出力できます... ゴーストが何なのかはわかりません

買ったからです。本を読んだのですが、見る時間がありませんでした。

問題は明らかなはずですが、誰かアイデアやヒントを教えていただけますか? 3Q


ディスカッションへの返信(解決策)

とても悲しいと思います...これが沈没と転覆のリズムでしょうか?

ExecProcess に何か問題がありますか?

require_once 'ExecProcess.class.php'; を削除して、キュー内のメッセージを出力できます...

そこにあります。 ExecProcess.class.php のメッセージ処理部分に問題があるはずです。コードのこの部分を重点的にチェックして、例外が何であるかを確認してください。

重いタスクを実行するとエラーが発生します
実行タイムアウトが原因かどうかを確認できます。

ソース:php.cn
このウェブサイトの声明
この記事の内容はネチズンが自主的に寄稿したものであり、著作権は原著者に帰属します。このサイトは、それに相当する法的責任を負いません。盗作または侵害の疑いのあるコンテンツを見つけた場合は、admin@php.cn までご連絡ください。
人気のチュートリアル
詳細>
最新のダウンロード
詳細>
ウェブエフェクト
公式サイト
サイト素材
フロントエンドテンプレート