這篇文章主要介紹了關於使用mixphp 打造多進程非同步郵件發送,有著一定的參考價值,現在分享給大家,有需要的朋友可以參考一下
郵件發送是很常見的需求,由於發送郵件的操作一般是比較耗時的,所以我們一般採用非同步處理來提升使用者體驗,而非同步通常我們使用訊息佇列來實現。
傳統MVC 框架由於缺少多進程開發能力,通常是採用同一個腳本執行多次,產生多個進程的方式,mixphp 封裝了TaskExecutor 專用於多進程開發,用戶能非常簡單的開發出功能完善的高可用多進程應用。
下面示範一個非同步郵件發送系統的開發過程,涉及知識點:
#異步
// 入列 $redis->lpush($key, $data); // 出列 $data = $redis->rpop($key); // 阻塞出列 $data = $redis->brpop($key, 10);
composer require swiftmailer/swiftmailer
// 连接 $redis = new \Redis(); if (!$redis->connect('127.0.0.1', 6379)) { throw new \Exception('Redis Connect Failure'); } $redis->auth(''); $redis->select(0); // 投递任务 $data = [ 'to' => ['***@qq.com' => 'A name'], 'body' => 'Here is the message itself', 'subject' => 'The title content', ]; $redis->lpush('queue:email', serialize($data));
<?php namespace apps\daemon\commands; use mix\console\ExitCode; use mix\facades\Input; use mix\facades\Redis; use mix\task\CenterProcess; use mix\task\LeftProcess; use mix\task\TaskExecutor; /** * 推送模式范例 * @author 刘健 <coder.liu@qq.com> */ class PushCommand extends BaseCommand { // 配置信息 const HOST = 'smtpdm.aliyun.com'; const PORT = 465; const SECURITY = 'ssl'; const USERNAME = '****@email.***.com'; const PASSWORD = '****'; // 初始化事件 public function onInitialize() { parent::onInitialize(); // TODO: Change the autogenerated stub // 获取程序名称 $this->programName = Input::getCommandName(); // 设置pidfile $this->pidFile = "/var/run/{$this->programName}.pid"; } /** * 获取服务 * @return TaskExecutor */ public function getTaskService() { return create_object( [ // 类路径 'class' => 'mix\task\TaskExecutor', // 服务名称 'name' => "mix-daemon: {$this->programName}", // 执行类型 'type' => \mix\task\TaskExecutor::TYPE_DAEMON, // 执行模式 'mode' => \mix\task\TaskExecutor::MODE_PUSH, // 左进程数 'leftProcess' => 1, // 中进程数 'centerProcess' => 5, // 任务超时时间 (秒) 'timeout' => 5, ] ); } // 启动 public function actionStart() { // 预处理 if (!parent::actionStart()) { return ExitCode::UNSPECIFIED_ERROR; } // 启动服务 $service = $this->getTaskService(); $service->on('LeftStart', [$this, 'onLeftStart']); $service->on('CenterStart', [$this, 'onCenterStart']); $service->start(); // 返回退出码 return ExitCode::OK; } // 左进程启动事件回调函数 public function onLeftStart(LeftProcess $worker) { try { // 模型内使用长连接版本的数据库组件,这样组件会自动帮你维护连接不断线 $queueModel = Redis::getInstance(); // 保持任务执行状态,循环结束后当前进程会退出,主进程会重启一个新进程继续执行任务,这样做是为了避免长时间执行内存溢出 for ($j = 0; $j < 16000; $j++) { // 从消息队列中间件阻塞获取一条消息 $data = $queueModel->brpop('queue:email', 10); if (empty($data)) { continue; } list(, $data) = $data; // 将消息推送给中进程去处理,push有长度限制 (https://wiki.swoole.com/wiki/page/290.html) $worker->push($data, false); } } catch (\Exception $e) { // 休息一会,避免 CPU 出现 100% sleep(1); // 抛出错误 throw $e; } } // 中进程启动事件回调函数 public function onCenterStart(CenterProcess $worker) { // 保持任务执行状态,循环结束后当前进程会退出,主进程会重启一个新进程继续执行任务,这样做是为了避免长时间执行内存溢出 for ($j = 0; $j < 16000; $j++) { // 从进程消息队列中抢占一条消息 $data = $worker->pop(); if (empty($data)) { continue; } // 处理消息 try { // 处理消息,比如:发送短信、发送邮件、微信推送 var_dump($data); $ret = self::sendEmail($data); var_dump($ret); } catch (\Exception $e) { // 回退数据到消息队列 $worker->rollback($data); // 休息一会,避免 CPU 出现 100% sleep(1); // 抛出错误 throw $e; } } } // 发送邮件 public static function sendEmail($data) { // Create the Transport $transport = (new \Swift_SmtpTransport(self::HOST, self::PORT, self::SECURITY)) ->setUsername(self::USERNAME) ->setPassword(self::PASSWORD); // Create the Mailer using your created Transport $mailer = new \Swift_Mailer($transport); // Create a message $message = (new \Swift_Message($data['subject'])) ->setFrom([self::USERNAME => '**网']) ->setTo($data['to']) ->setBody($data['body']); // Send the message $result = $mailer->send($message); return $result; } }