1) メッセージキューの基本クラスを確立します
<?php/** * @desc 消息队列 * @author caifangjie * @date 2016/05/03 */class Queue{ //交换机名称 protected $_exchangeName = 'ex_auto_home'; //队列名称 protected $_queueName = 'qu_auto_home'; //路由 protected $_routeKey = 'ru_auto_home'; protected $_connectHandler; protected $_channelObject; protected $_exchangeObject; protected $_queueObject; //配置信息 protected $_config = array('host' => '127.0.0.1', 'port' => '5672', 'vhost' => '/', 'login' => 'guest', 'password' => 'guest'); //构造函数,依次创建通道,交换机,队列 public function __construct() { try{ $this->_connectHandler = new AMQPConnection($this->_config); if(!$this->_connectHandler->connect()) { die('connect failed'); } $this->createChannel(); $this->createExchange(); $this->createQueue(); } catch(Exception $e) { echo $e->getMessage(); } } //创建通道 protected function createChannel() { $this->_channelObject = new AMQPChannel($this->_connectHandler); } //创建交换机 public function createExchange($exchangeName='', $exchangeType=AMQP_EX_TYPE_DIRECT) { $exName = $exchangeName?$exchangeName:$this->_exchangeName; $this->_exchangeObject = new AMQPExchange($this->_channelObject); $this->_exchangeObject->setName($exName); $this->_exchangeObject->setType($exchangeType); $this->_exchangeObject->setFlags(AMQP_DURABLE | AMQP_AUTODELETE); $this->_exchangeObject->declareExchange(); } //创建队列 public function createQueue() { $this->_queueObject = new AMQPQueue($this->_channelObject); $this->_queueObject->setName($this->_queueName); $this->_queueObject->setFlags(AMQP_DURABLE | AMQP_AUTODELETE); $this->_queueObject->declareQueue(); $this->_queueObject->bind($this->_exchangeObject->getName(), $this->_routeKey); } }
<?phprequire_once 'ExecProcess.class.php';require_once 'Queue/Queue.class.php';class Recv extends Queue{ public function __construct() { parent::__construct(); } //接受消息 public function recvMessage() { while (true) { $this->_queueObject->consume(function(AMQPEnvelope $e, AMQPQueue $q) { $requestUrl = $e->getBody(); if ($requestUrl) { // var_dump($requestUrl); $execHandler = new ExecProcess(); $execHandler->start($requestUrl); $execHandler->execSave(); unset($execHandler); $q->nack($e->getDeliveryTag()); } else { usleep(100); } }); } }}$reciver = new Recv();$reciver->recvMessage();
<?php//require_once 'ExecProcess.class.php';require_once 'Queue/Queue.class.php';class Recv extends Queue{ public function __construct() { parent::__construct(); } //接受消息 public function recvMessage() { while (true) { $this->_queueObject->consume(function(AMQPEnvelope $e, AMQPQueue $q) { $requestUrl = $e->getBody(); if ($requestUrl) { var_dump($requestUrl);// $execHandler = new ExecProcess();// $execHandler->start($requestUrl);// $execHandler->execSave();// unset($execHandler); $q->nack($e->getDeliveryTag()); } else { usleep(100); } }); } }}$reciver = new Recv();$reciver->recvMessage();
とても悲しいと思います...これが沈没と転覆のリズムでしょうか?
ExecProcess に何か問題がありますか?
require_once 'ExecProcess.class.php'; を削除して、キュー内のメッセージを出力できます...
そこにあります。 ExecProcess.class.php のメッセージ処理部分に問題があるはずです。コードのこの部分を重点的にチェックして、例外が何であるかを確認してください。
重いタスクを実行するとエラーが発生します
実行タイムアウトが原因かどうかを確認できます。