Blogger Information
Blog 42
fans 3
comment 2
visits 93531
Popular Tutorials
More>
Latest Downloads
More>
Web Effects
Website Source Code
Website Materials
Front End Template
YII2 框架如何使用rabbitMq
Whitney的博客
Original
4221 people have browsed it

背景简介:

首次接触yii2框架,对于rabbitMq报有学习的态度进行了推队列的应用;

博主使用rabbitMq安装的是

"php-amqplib/php-amqplib": ">=2.6.1"

首先封装了rabbitMq.php的服务文件:

实例

use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;
use Exception;

class RabbitMq
{

    public $channel;         //信道
    public $exchange;   //交换机
    public $queueName;  //队列名称
    public $route        = 'routeKey';  //路由键
    public $exchangeType = 'direct';  //交换机类型

    protected $conn;

    static protected $connection;  //静态rabbitMq连接

    //实例化该service时首先加载的方法:检测是否已经有rabbitMq连接【始终保持是同一连接】
    static public function instance($conf)
    {
        if (!self::$connection) {
            self::$connection = new self($conf);
        }

        return self::$connection;
    }

    /**
     * RabbitMq constructor.
     *
     * @param $conf  array  Mq的默认连接配置
     *               @$conf['host']  rabbitMq配置的ip地址
     *               @$conf['port']  rabbitMq配置的端口号
     *               @$conf['user']  用户名
     *               @$conf['pwd']   密码
     *               @$conf['vhost'] 虚拟host
     */
    public function __construct($conf)
    {
        try {
            $this->conn = new AMQPStreamConnection($conf['host'], $conf['port'], $conf['user'], $conf['pwd'],
                $conf['vhost']);
            $this->exchange = $conf['exchange'];
            $this->queueName = $conf['queue'];
            $this->getConnection();
        } catch (Exception $e) {
            throw new Exception('cannot connection rabbitMq:' . $e->getMessage());
        }

    }


    public function getConnection()
    {
        if (!isset($this->channel)) {
            $this->channel = $this->conn->channel();
        }

        $this->createExchange();
    }


    public function createExchange()
    {
        //passive: 消极处理, 判断是否存在队列,存在则返回,不存在直接抛出 PhpAmqpLib\Exception\AMQPProtocolChannelException 异常
        //durable:true、false true:服务器重启会保留下来Exchange。警告:仅设置此选项,不代表消息持久化。即不保证重启后消息还在
        //autoDelete:true、false.true:当已经没有消费者时,服务器是否可以删除该Exchange

        $this->channel->exchange_declare($this->exchange, $this->exchangeType, false, true, false);

        //passive: 消极处理,判断是否存在队列,存在则返回,不存在则直接抛出 PhpAmqpLib\Exception\AMQPProtocolChannelException 异常
        //durable: true/false true :在服务器重启时,能够存活
        //exclusive: 是否为当前连接的专用队列,在连接段开后,会自动删除该队列
        //autodelete: 当没有任何消费者使用时,自动删除该队列
        //arguments: 自定义规则
        $this->channel->queue_declare($this->queueName, false, true, false, false);
    }


    /**
     * 绑定消息队列
     * 博主个人看法:在创建交换机与队列的时候,可以手动在rabbitMq界面将二者绑定,没有必要每次进行发送或者消费队列时进行绑定;
     */
    public function bindQueue()
    {
        $this->channel->queue_bind($this->queueName, $this->exchange, $this->route);
    }

    /**
     * 发送消息
     *
     * @param $msgBody  string  消息类型
     */
    public function sendMsg($msgBody)
    {
        // content_type: 发送消息的类型
        // delivery_mode: 设置的属性,比如设置该消息持久化['delivery_mode' => 2]
        if (is_array($msgBody)) {
            $msgBody = json_encode($msgBody);
        }
        $msg = new AMQPMessage($msgBody, ['content_type' => 'text/plain', 'delivery_mode' => 2]); //生成消息
        $this->channel->basic_publish($msg, $this->exchange, $this->route); //推送消息到某个交换机
    }

    /**
     * 消费消息
     *
     * @param $callback callable|null  回调函数 在这里可以添加消费消息的具体逻辑
     */
    public function consumeMsg($callback)
    {
        $this->bindQueue();
        //1.队列名称
        //2.consumer_tag 消费者标签
        //3.no_local false 这个功能属于AMPQ的标准,但是rabbitMq并没有做实现
        //4.no_ack false 收到消息后,是否不需要回复确认即被认为是被消费
        //5.exclusive false 排他消费者,即这个队列只能有一个消费者消费,适用于人物不允许进行并打处理的情况下,比如系统对接
        //6.callback 回调函数
        $this->channel->basic_consume($this->queueName, '', false, false, false, false, $callback);

        //监听消息
        while (count($this->channel->callbacks)) {
            $this->channel->wait();
        }
    }

    public function __destruct()
    {
        $this->channel()->close();
        $this->conn->close();
    }

}

生产者:

实例

public function actionIndex()
{
    try {
        $params = Yii::$app->params['rabbitmq_config'];
        $MqConfig = Yii::$app->params['WuliuPushWomsMq'];
        $params['exchange'] = $MqConfig['exchange'];
        $params['queue'] = $MqConfig['queue'];

        $mq = RabbitMq::instance($params);

        for ($i = 0;$i<=100;$i++) {
            $content = [
                'order_goods_id' => rand(000, 999),
                'express'        => 'JD',
                'express_number' => 'VB52806545124535' . rand(000, 999),
                'status'         => 'Cancel',
            ];
            echo $i.' ';
            $mq->sendMsg($content);
            RequestLog::instance()->addLog('pushQueueMsg', json_encode($content, 256), 'add success');
        }

    } catch (Exception $e) {
        RequestLog::instance()->addLog('logisticStatusChangePushQueueFailed:', json_encode($content, 256),
            $e->getMessage());
        Yii::error($e->getMessage());
    }

}

消费者:

实例

public function actionTest()
{
    $params = Yii::$app->params['rabbitmq_config'];
    $MqConfig = Yii::$app->params['WuliuPushWomsMq'];
    $params['exchange'] = $MqConfig['exchange'];
    $params['queue'] = $MqConfig['queue'];
    $mq = RabbitMq::instance($params);
    $callback = function ($msg) {
        $res = RequestLog::instance()->addLog('dealQueueMsg', $msg->body, 'consume success');
        if ($res) {
            $msg->delivery_info['channel']->basic_ack($msg->delivery_info['delivery_tag']);
        }
    };

    $mq->consumeMsg($callback);
}

消费时会遇到由于mysql的长链接8小时超时自动断开的问题,所以若8个小时之内没有mysql的请求,之后再去消费的时候会出现bug;

这个解决办法可以有:

  1. 在业务逻辑里隔一段时间检测如果没有mysql链接的话就重新与数据库建立连接;

  2. 使用Java处理消费者,Java的连接池会保证数据库的连接一直是有效状态;

PS:当然如果数据库挂了就没办法了

请支持原创哦!





Statement of this Website
The copyright of this blog article belongs to the blogger. Please specify the address when reprinting! If there is any infringement or violation of the law, please contact admin@php.cn Report processing!
All comments Speak rationally on civilized internet, please comply with News Comment Service Agreement
0 comments
Author's latest blog post