背景简介:
首次接触yii2框架,对于rabbitMq报有学习的态度进行了推队列的应用;
博主使用rabbitMq安装的是
"php-amqplib/php-amqplib": ">=2.6.1"
首先封装了rabbitMq.php的服务文件:
use PhpAmqpLib\Connection\AMQPStreamConnection; use PhpAmqpLib\Message\AMQPMessage; use Exception; class RabbitMq { public $channel; //信道 public $exchange; //交换机 public $queueName; //队列名称 public $route = 'routeKey'; //路由键 public $exchangeType = 'direct'; //交换机类型 protected $conn; static protected $connection; //静态rabbitMq连接 //实例化该service时首先加载的方法:检测是否已经有rabbitMq连接【始终保持是同一连接】 static public function instance($conf) { if (!self::$connection) { self::$connection = new self($conf); } return self::$connection; } /** * RabbitMq constructor. * * @param $conf array Mq的默认连接配置 * @$conf['host'] rabbitMq配置的ip地址 * @$conf['port'] rabbitMq配置的端口号 * @$conf['user'] 用户名 * @$conf['pwd'] 密码 * @$conf['vhost'] 虚拟host */ public function __construct($conf) { try { $this->conn = new AMQPStreamConnection($conf['host'], $conf['port'], $conf['user'], $conf['pwd'], $conf['vhost']); $this->exchange = $conf['exchange']; $this->queueName = $conf['queue']; $this->getConnection(); } catch (Exception $e) { throw new Exception('cannot connection rabbitMq:' . $e->getMessage()); } } public function getConnection() { if (!isset($this->channel)) { $this->channel = $this->conn->channel(); } $this->createExchange(); } public function createExchange() { //passive: 消极处理, 判断是否存在队列,存在则返回,不存在直接抛出 PhpAmqpLib\Exception\AMQPProtocolChannelException 异常 //durable:true、false true:服务器重启会保留下来Exchange。警告:仅设置此选项,不代表消息持久化。即不保证重启后消息还在 //autoDelete:true、false.true:当已经没有消费者时,服务器是否可以删除该Exchange $this->channel->exchange_declare($this->exchange, $this->exchangeType, false, true, false); //passive: 消极处理,判断是否存在队列,存在则返回,不存在则直接抛出 PhpAmqpLib\Exception\AMQPProtocolChannelException 异常 //durable: true/false true :在服务器重启时,能够存活 //exclusive: 是否为当前连接的专用队列,在连接段开后,会自动删除该队列 //autodelete: 当没有任何消费者使用时,自动删除该队列 //arguments: 自定义规则 $this->channel->queue_declare($this->queueName, false, true, false, false); } /** * 绑定消息队列 * 博主个人看法:在创建交换机与队列的时候,可以手动在rabbitMq界面将二者绑定,没有必要每次进行发送或者消费队列时进行绑定; */ public function bindQueue() { $this->channel->queue_bind($this->queueName, $this->exchange, $this->route); } /** * 发送消息 * * @param $msgBody string 消息类型 */ public function sendMsg($msgBody) { // content_type: 发送消息的类型 // delivery_mode: 设置的属性,比如设置该消息持久化['delivery_mode' => 2] if (is_array($msgBody)) { $msgBody = json_encode($msgBody); } $msg = new AMQPMessage($msgBody, ['content_type' => 'text/plain', 'delivery_mode' => 2]); //生成消息 $this->channel->basic_publish($msg, $this->exchange, $this->route); //推送消息到某个交换机 } /** * 消费消息 * * @param $callback callable|null 回调函数 在这里可以添加消费消息的具体逻辑 */ public function consumeMsg($callback) { $this->bindQueue(); //1.队列名称 //2.consumer_tag 消费者标签 //3.no_local false 这个功能属于AMPQ的标准,但是rabbitMq并没有做实现 //4.no_ack false 收到消息后,是否不需要回复确认即被认为是被消费 //5.exclusive false 排他消费者,即这个队列只能有一个消费者消费,适用于人物不允许进行并打处理的情况下,比如系统对接 //6.callback 回调函数 $this->channel->basic_consume($this->queueName, '', false, false, false, false, $callback); //监听消息 while (count($this->channel->callbacks)) { $this->channel->wait(); } } public function __destruct() { $this->channel()->close(); $this->conn->close(); } }
生产者:
public function actionIndex() { try { $params = Yii::$app->params['rabbitmq_config']; $MqConfig = Yii::$app->params['WuliuPushWomsMq']; $params['exchange'] = $MqConfig['exchange']; $params['queue'] = $MqConfig['queue']; $mq = RabbitMq::instance($params); for ($i = 0;$i<=100;$i++) { $content = [ 'order_goods_id' => rand(000, 999), 'express' => 'JD', 'express_number' => 'VB52806545124535' . rand(000, 999), 'status' => 'Cancel', ]; echo $i.' '; $mq->sendMsg($content); RequestLog::instance()->addLog('pushQueueMsg', json_encode($content, 256), 'add success'); } } catch (Exception $e) { RequestLog::instance()->addLog('logisticStatusChangePushQueueFailed:', json_encode($content, 256), $e->getMessage()); Yii::error($e->getMessage()); } }
消费者:
public function actionTest() { $params = Yii::$app->params['rabbitmq_config']; $MqConfig = Yii::$app->params['WuliuPushWomsMq']; $params['exchange'] = $MqConfig['exchange']; $params['queue'] = $MqConfig['queue']; $mq = RabbitMq::instance($params); $callback = function ($msg) { $res = RequestLog::instance()->addLog('dealQueueMsg', $msg->body, 'consume success'); if ($res) { $msg->delivery_info['channel']->basic_ack($msg->delivery_info['delivery_tag']); } }; $mq->consumeMsg($callback); }
消费时会遇到由于mysql的长链接8小时超时自动断开的问题,所以若8个小时之内没有mysql的请求,之后再去消费的时候会出现bug;
这个解决办法可以有:
在业务逻辑里隔一段时间检测如果没有mysql链接的话就重新与数据库建立连接;
使用Java处理消费者,Java的连接池会保证数据库的连接一直是有效状态;
PS:当然如果数据库挂了就没办法了
请支持原创哦!