使用 mixphp 打造多進程非同步郵件傳送

不言
發布: 2023-04-02 17:46:02
原創
1809 人瀏覽過

這篇文章主要介紹了關於使用mixphp 打造多進程非同步郵件發送,有著一定的參考價值,現在分享給大家,有需要的朋友可以參考一下

郵件發送是很常見的需求,由於發送郵件的操作一般是比較耗時的,所以我們一般採用非同步處理來提升使用者體驗,而非同步通常我們使用訊息佇列來實現。

傳統MVC 框架由於缺少多進程開發能力,通常是採用同一個腳本執行多次,產生多個進程的方式,mixphp 封裝了TaskExecutor 專用於多進程開發,用戶能非常簡單的開發出功能完善的高可用多進程應用。

下面示範一個非同步郵件發送系統的開發過程,涉及知識點:

  • #異步

  • ##訊息佇列

  • 多進程

  • #守護程式

如何使用訊息佇列實作非同步

## PHP 使用訊息佇列通常是使用中間件來實現,常用的訊息中間件有:
  • redis
  • rabbitmq
  • kafka

本次我們選用redis 來實現非同步郵件發送,redis 的資料類型中有一個list 類型,可實現訊息佇列,使用下列指令:

// 入列
$redis->lpush($key, $data);
// 出列
$data = $redis->rpop($key);
// 阻塞出列
$data = $redis->brpop($key, 10);
登入後複製

架構設計

本實例由傳統MVC 框架投遞郵件傳送需求,MixPHP 多重處理執行傳送任務。

郵件發送庫選型

以往我們通常使用框架提供的郵件發送庫,或者網上下載別的用戶分享的庫,composer 出現後,https://packagist.org/上有大量優質的庫,我們只需選擇一個最好的即可,本例選擇swiftmailer。

由於發送任務是由MixPHP 執行,所以swiftmailer 是安裝在MixPHP 專案中,在專案根目錄中執行以下命令安裝:

composer require swiftmailer/swiftmailer
登入後複製

生產者開發

在郵件發送這個需求中生產者是指投遞發送任務的一方,這一方通常是一個介面或網頁,這個部分不一定需mixphp 開發,TP、CI、YII 這些都可以,只需在介面或網頁中把任務訊息投遞到訊息佇列中即可。

在傳統 MVC 框架的控制器中增加如下程式碼:
通常框架中使用 redis 會安裝一個類別庫來使用,本例使用原生程式碼,以便於理解。

// 连接
$redis = new \Redis();
if (!$redis->connect('127.0.0.1', 6379)) {
    throw new \Exception('Redis Connect Failure');
}
$redis->auth('');
$redis->select(0);
// 投递任务
$data = [
    'to'      => ['***@qq.com' => 'A name'],
    'body'    => 'Here is the message itself',
    'subject' => 'The title content',
];
$redis->lpush('queue:email', serialize($data));
登入後複製

通常在非同步開發中,投遞完成後就會立即回應一個訊息給用戶,當然此時該任務並沒有執行。

消費者開發

本例我們使用MixPHP 的多進程開發工具TaskExecutor 來完成這個需求,通常使用常駐進程來處理佇列的消費,所以我們使用TaskExecutor 的TYPE_DAEMON 類型, MODE_PUSH 模式。

TaskExecutor  的 MODE_PUSH 模式有二種程序:
  • 左進程:負責從訊息佇列取出任務數據,投放給中行程。
  • 中進程:負責執行郵件發送任務。

PushCommand.php 程式碼如下:

<?php

namespace apps\daemon\commands;

use mix\console\ExitCode;
use mix\facades\Input;
use mix\facades\Redis;
use mix\task\CenterProcess;
use mix\task\LeftProcess;
use mix\task\TaskExecutor;

/**
 * 推送模式范例
 * @author 刘健 <coder.liu@qq.com>
 */
class PushCommand extends BaseCommand
{

    // 配置信息
    const HOST = &#39;smtpdm.aliyun.com&#39;;
    const PORT = 465;
    const SECURITY = &#39;ssl&#39;;
    const USERNAME = &#39;****@email.***.com&#39;;
    const PASSWORD = &#39;****&#39;;

    // 初始化事件
    public function onInitialize()
    {
        parent::onInitialize(); // TODO: Change the autogenerated stub
        // 获取程序名称
        $this->programName = Input::getCommandName();
        // 设置pidfile
        $this->pidFile = "/var/run/{$this->programName}.pid";
    }

    /**
     * 获取服务
     * @return TaskExecutor
     */
    public function getTaskService()
    {
        return create_object(
            [
                // 类路径
                &#39;class&#39;         => &#39;mix\task\TaskExecutor&#39;,
                // 服务名称
                &#39;name&#39;          => "mix-daemon: {$this->programName}",
                // 执行类型
                &#39;type&#39;          => \mix\task\TaskExecutor::TYPE_DAEMON,
                // 执行模式
                &#39;mode&#39;          => \mix\task\TaskExecutor::MODE_PUSH,
                // 左进程数
                &#39;leftProcess&#39;   => 1,
                // 中进程数
                &#39;centerProcess&#39; => 5,
                // 任务超时时间 (秒)
                &#39;timeout&#39;       => 5,
            ]
        );
    }

    // 启动
    public function actionStart()
    {
        // 预处理
        if (!parent::actionStart()) {
            return ExitCode::UNSPECIFIED_ERROR;
        }
        // 启动服务
        $service = $this->getTaskService();
        $service->on(&#39;LeftStart&#39;, [$this, &#39;onLeftStart&#39;]);
        $service->on(&#39;CenterStart&#39;, [$this, &#39;onCenterStart&#39;]);
        $service->start();
        // 返回退出码
        return ExitCode::OK;
    }

    // 左进程启动事件回调函数
    public function onLeftStart(LeftProcess $worker)
    {
        try {
            // 模型内使用长连接版本的数据库组件,这样组件会自动帮你维护连接不断线
            $queueModel = Redis::getInstance();
            // 保持任务执行状态,循环结束后当前进程会退出,主进程会重启一个新进程继续执行任务,这样做是为了避免长时间执行内存溢出
            for ($j = 0; $j < 16000; $j++) {
                // 从消息队列中间件阻塞获取一条消息
                $data = $queueModel->brpop(&#39;queue:email&#39;, 10);
                if (empty($data)) {
                    continue;
                }
                list(, $data) = $data;
                // 将消息推送给中进程去处理,push有长度限制 (https://wiki.swoole.com/wiki/page/290.html)
                $worker->push($data, false);
            }
        } catch (\Exception $e) {
            // 休息一会,避免 CPU 出现 100%
            sleep(1);
            // 抛出错误
            throw $e;
        }
    }

    // 中进程启动事件回调函数
    public function onCenterStart(CenterProcess $worker)
    {
        // 保持任务执行状态,循环结束后当前进程会退出,主进程会重启一个新进程继续执行任务,这样做是为了避免长时间执行内存溢出
        for ($j = 0; $j < 16000; $j++) {
            // 从进程消息队列中抢占一条消息
            $data = $worker->pop();
            if (empty($data)) {
                continue;
            }
            // 处理消息
            try {
                // 处理消息,比如:发送短信、发送邮件、微信推送
                var_dump($data);
                $ret = self::sendEmail($data);
                var_dump($ret);
            } catch (\Exception $e) {
                // 回退数据到消息队列
                $worker->rollback($data);
                // 休息一会,避免 CPU 出现 100%
                sleep(1);
                // 抛出错误
                throw $e;
            }
        }
    }

    // 发送邮件
    public static function sendEmail($data)
    {
        // Create the Transport
        $transport = (new \Swift_SmtpTransport(self::HOST, self::PORT, self::SECURITY))
            ->setUsername(self::USERNAME)
            ->setPassword(self::PASSWORD);
        // Create the Mailer using your created Transport
        $mailer = new \Swift_Mailer($transport);
        // Create a message
        $message = (new \Swift_Message($data[&#39;subject&#39;]))
            ->setFrom([self::USERNAME => &#39;**网&#39;])
            ->setTo($data[&#39;to&#39;])
            ->setBody($data[&#39;body&#39;]);
        // Send the message
        $result = $mailer->send($message);
        return $result;
    }

}
登入後複製

測試
  1. 在 shell 中啟動 push 常駐程式。
    [root@localhost bin]# ./mix-daemon push start
    mix-daemon &#39;push&#39; start successed.
    登入後複製
  1. 呼叫介面往訊息佇列執行任務。
  2. 此時shell 終端將列印:

    使用 mixphp 打造多進程非同步郵件傳送

    #成功收到測試郵件:

    使用 mixphp 打造多進程非同步郵件傳送

    以上就是本文的全部內容,希望對大家的學習有所幫助,更多相關內容請關注PHP中文網!

    相關推薦:

    給PHP開啟shmop擴充實作共享記憶體

    php實作共享記憶體進程通訊函數(_shm )

    ###

    以上是使用 mixphp 打造多進程非同步郵件傳送的詳細內容。更多資訊請關注PHP中文網其他相關文章!

相關標籤:
來源:php.cn
本網站聲明
本文內容由網友自願投稿,版權歸原作者所有。本站不承擔相應的法律責任。如發現涉嫌抄襲或侵權的內容,請聯絡admin@php.cn
作者最新文章
熱門教學
更多>
最新下載
更多>
網站特效
網站源碼
網站素材
前端模板
關於我們 免責聲明 Sitemap
PHP中文網:公益線上PHP培訓,幫助PHP學習者快速成長!