Use mixphp to create multi-process asynchronous email sending

不言
Release: 2023-04-02 17:46:02
Original
1822 people have browsed it

This article mainly introduces the use of mixphp to create multi-process asynchronous email sending. It has certain reference value. Now I share it with you. Friends in need can refer to it.

Email sending is a very common requirement. , Since the operation of sending emails is generally time-consuming, we generally use asynchronous processing to improve user experience, and asynchronous processing is usually implemented using message queues.

Due to the lack of multi-process development capabilities, the traditional MVC framework usually uses the same script to be executed multiple times to generate multiple processes. Mixphp encapsulates TaskExecutor specifically for multi-process development, and users can develop it very simply. A fully functional and highly available multi-process application.

The following demonstrates the development process of an asynchronous email sending system, involving knowledge points:

  • Asynchronous

  • Message Queue

  • Multiple processes

  • Daemon process

How to use message queue to achieve asynchronous

PHP's use of message queues is usually implemented using middleware. Commonly used message middleware are:

  • redis

  • rabbitmq

  • kafka

This time we choose redis to implement asynchronous email sending. There is a list type in the data type of redis, which can implement message queue. Use the following command:

// 入列
$redis->lpush($key, $data);
// 出列
$data = $redis->rpop($key);
// 阻塞出列
$data = $redis->brpop($key, 10);
Copy after login

Architecture Design

This example uses the traditional MVC framework to deliver email sending requirements, and MixPHP multi-process to perform the sending task.

Selection of email sending library

In the past, we usually used the email sending library provided by the framework, or downloaded the library shared by other users online. After composer appeared, https://packagist.org/ There are a lot of high-quality libraries on the Internet, we just need to choose the best one. In this case, we choose swiftmailer.

Since the sending task is executed by MixPHP, swiftmailer is installed in the MixPHP project. Execute the following command in the project root directory to install:

composer require swiftmailer/swiftmailer
Copy after login

Producer development

In In the requirement of email sending, the producer refers to the party that delivers the sending task. This party is usually an interface or web page. This part does not necessarily need to be developed with mixphp. TP, CI, and YII are all acceptable. Just add it in the interface or web page. Just post the task information to the message queue.

Add the following code to the controller of the traditional MVC framework:

Usually using redis in the framework will install a class library for use. This example uses native code for easy understanding.
// 连接
$redis = new \Redis();
if (!$redis->connect('127.0.0.1', 6379)) {
    throw new \Exception('Redis Connect Failure');
}
$redis->auth('');
$redis->select(0);
// 投递任务
$data = [
    'to'      => ['***@qq.com' => 'A name'],
    'body'    => 'Here is the message itself',
    'subject' => 'The title content',
];
$redis->lpush('queue:email', serialize($data));
Copy after login

Usually in asynchronous development, a message will be responded to the user immediately after the delivery is completed. Of course, the task is not executed at this time.

Consumer Development

In this example we use MixPHP's multi-process development tool TaskExecutor to complete this requirement. Resident processes are usually used to handle queue consumption, so we use the TYPE_DAEMON type of TaskExecutor. MODE_PUSH mode.

TaskExecutor's MODE_PUSH mode has two processes:

  • Left process: Responsible for taking out task data from the message queue and putting it into the middle process.

  • Medium process: Responsible for performing email sending tasks.

PushCommand.php code is as follows:

<?php

namespace apps\daemon\commands;

use mix\console\ExitCode;
use mix\facades\Input;
use mix\facades\Redis;
use mix\task\CenterProcess;
use mix\task\LeftProcess;
use mix\task\TaskExecutor;

/**
 * 推送模式范例
 * @author 刘健 <coder.liu@qq.com>
 */
class PushCommand extends BaseCommand
{

    // 配置信息
    const HOST = &#39;smtpdm.aliyun.com&#39;;
    const PORT = 465;
    const SECURITY = &#39;ssl&#39;;
    const USERNAME = &#39;****@email.***.com&#39;;
    const PASSWORD = &#39;****&#39;;

    // 初始化事件
    public function onInitialize()
    {
        parent::onInitialize(); // TODO: Change the autogenerated stub
        // 获取程序名称
        $this->programName = Input::getCommandName();
        // 设置pidfile
        $this->pidFile = "/var/run/{$this->programName}.pid";
    }

    /**
     * 获取服务
     * @return TaskExecutor
     */
    public function getTaskService()
    {
        return create_object(
            [
                // 类路径
                &#39;class&#39;         => &#39;mix\task\TaskExecutor&#39;,
                // 服务名称
                &#39;name&#39;          => "mix-daemon: {$this->programName}",
                // 执行类型
                &#39;type&#39;          => \mix\task\TaskExecutor::TYPE_DAEMON,
                // 执行模式
                &#39;mode&#39;          => \mix\task\TaskExecutor::MODE_PUSH,
                // 左进程数
                &#39;leftProcess&#39;   => 1,
                // 中进程数
                &#39;centerProcess&#39; => 5,
                // 任务超时时间 (秒)
                &#39;timeout&#39;       => 5,
            ]
        );
    }

    // 启动
    public function actionStart()
    {
        // 预处理
        if (!parent::actionStart()) {
            return ExitCode::UNSPECIFIED_ERROR;
        }
        // 启动服务
        $service = $this->getTaskService();
        $service->on(&#39;LeftStart&#39;, [$this, &#39;onLeftStart&#39;]);
        $service->on(&#39;CenterStart&#39;, [$this, &#39;onCenterStart&#39;]);
        $service->start();
        // 返回退出码
        return ExitCode::OK;
    }

    // 左进程启动事件回调函数
    public function onLeftStart(LeftProcess $worker)
    {
        try {
            // 模型内使用长连接版本的数据库组件,这样组件会自动帮你维护连接不断线
            $queueModel = Redis::getInstance();
            // 保持任务执行状态,循环结束后当前进程会退出,主进程会重启一个新进程继续执行任务,这样做是为了避免长时间执行内存溢出
            for ($j = 0; $j < 16000; $j++) {
                // 从消息队列中间件阻塞获取一条消息
                $data = $queueModel->brpop(&#39;queue:email&#39;, 10);
                if (empty($data)) {
                    continue;
                }
                list(, $data) = $data;
                // 将消息推送给中进程去处理,push有长度限制 (https://wiki.swoole.com/wiki/page/290.html)
                $worker->push($data, false);
            }
        } catch (\Exception $e) {
            // 休息一会,避免 CPU 出现 100%
            sleep(1);
            // 抛出错误
            throw $e;
        }
    }

    // 中进程启动事件回调函数
    public function onCenterStart(CenterProcess $worker)
    {
        // 保持任务执行状态,循环结束后当前进程会退出,主进程会重启一个新进程继续执行任务,这样做是为了避免长时间执行内存溢出
        for ($j = 0; $j < 16000; $j++) {
            // 从进程消息队列中抢占一条消息
            $data = $worker->pop();
            if (empty($data)) {
                continue;
            }
            // 处理消息
            try {
                // 处理消息,比如:发送短信、发送邮件、微信推送
                var_dump($data);
                $ret = self::sendEmail($data);
                var_dump($ret);
            } catch (\Exception $e) {
                // 回退数据到消息队列
                $worker->rollback($data);
                // 休息一会,避免 CPU 出现 100%
                sleep(1);
                // 抛出错误
                throw $e;
            }
        }
    }

    // 发送邮件
    public static function sendEmail($data)
    {
        // Create the Transport
        $transport = (new \Swift_SmtpTransport(self::HOST, self::PORT, self::SECURITY))
            ->setUsername(self::USERNAME)
            ->setPassword(self::PASSWORD);
        // Create the Mailer using your created Transport
        $mailer = new \Swift_Mailer($transport);
        // Create a message
        $message = (new \Swift_Message($data[&#39;subject&#39;]))
            ->setFrom([self::USERNAME => &#39;**网&#39;])
            ->setTo($data[&#39;to&#39;])
            ->setBody($data[&#39;body&#39;]);
        // Send the message
        $result = $mailer->send($message);
        return $result;
    }

}
Copy after login

Test

  1. Start the push resident program in the shell.

[root@localhost bin]# ./mix-daemon push start
mix-daemon &#39;push&#39; start successed.
Copy after login
  1. Call the interface to put tasks into the message queue.

At this time the shell terminal will print:

Use mixphp to create multi-process asynchronous email sending

Successfully received the test email:

Use mixphp to create multi-process asynchronous email sending

The above is the entire content of this article. I hope it will be helpful to everyone's study. For more related content, please pay attention to the PHP Chinese website!

Related recommendations:

Enable shmop extension for PHP to implement shared memory

php implements shared memory process communication function (_shm )

The above is the detailed content of Use mixphp to create multi-process asynchronous email sending. For more information, please follow other related articles on the PHP Chinese website!

Related labels:
source:php.cn
Statement of this Website
The content of this article is voluntarily contributed by netizens, and the copyright belongs to the original author. This site does not assume corresponding legal responsibility. If you find any content suspected of plagiarism or infringement, please contact admin@php.cn
Popular Tutorials
More>
Latest Downloads
More>
Web Effects
Website Source Code
Website Materials
Front End Template