Laravel 的隊列系統介紹

不言
發布: 2023-03-24 06:20:01
原創
2140 人瀏覽過

這篇文章主要介紹的內容是關於Laravel 的隊列系統介紹,有著一定的參考價值,現在分享給大家,有需要的朋友可以參考一下

Laravel 隊列為不同的後台隊列服務提供統一的API , 例如Beanstalk,Amazon SQS, Redis,甚至其他基於關係型資料庫的佇列。佇列的目的是將耗時的任務延時處理,例如發送郵件,從而大幅縮短Web請求和相應的時間。

佇列設定檔存放在 config/queue.php。每一種佇列驅動的配置都可以在該文件中找到, 包括資料庫, Beanstalkd, Amazon SQS, Redis, 以及同步(本地使用)驅動。其中也包含了一個null佇列驅動程式用於那些放棄佇列的任務。


連接 Vs. 隊列

在開始使用 Laravel 隊列前,弄清楚 「連接」 和 「隊列」 的差異是很重要的。在你的 config/queue.php 設定檔裡, 有一個 connections 設定選項。這個選項給 Amazon SQS, Beanstalk ,或 Redis 這樣的後端服務定義了一個獨特的連線。不管是哪一種,一個給定的連接可能會有多個“隊列”,而 “隊列”可以被認為是不同的堆疊或大量的隊列任務。

要注意的是, queue 設定檔中每個連線的設定範例中都包含一個 queue 屬性。這是預設佇列,任務被發給指定連線的時候會被分送到這個佇列。換句話說,如果你分發任務的時候沒有明確定義隊列,那麼它就會被放到連接配置中queue 屬性所定義的隊列中:

// 这个任务将被分发到默认队列...dispatch(new Job);// 这个任务将被发送到「emails」队列...dispatch((new Job)->onQueue('emails'));
登入後複製

有些應用可能不需要把任務發到不同的隊列,而只發到一個簡單的隊列就行了。但把任務推到不同的佇列仍然是非常有用的,因為 Laravel 佇列處理器允許你定義佇列的優先權,所以你能給不同的佇列劃分不同的優先權或區分不同任務的不同處理方式了。比方說,如果你把任務推到high 佇列中,你就能讓佇列處理器優先處理這些任務了:

php artisan queue:work --queue=high,default
登入後複製


驅動的必要設定

資料庫

要使用database 這個佇列驅動程式的話, 你需要建立一個資料表來儲存任務,你可以用queue:table 這個Artisan 指令來建立這個資料表的遷移。當遷移創建好以後,就可以用migrate 這條指令來建立資料表:

php artisan queue:table

php artisan migrate
登入後複製

Redis

為了使用redis 佇列驅動,你需要在你的設定檔config/database.php 中設定Redis的資料庫連接

#如果你的Redis 佇列連線使用的是Redis 集群, 你的佇列名稱必須包含key hash tag 。這是為了確保所有的redis 鍵對於一個給定的佇列都置於同一雜湊中:

'redis' => [
    'driver' => 'redis',
    'connection' => 'default',
    'queue' => '{default}',
    'retry_after' => 90,],
登入後複製

其它佇列驅動的依賴擴充包

在使用清單裡的佇列服務前,必須安裝以下依賴擴充:


  • Amazon SQS: aws/aws-sdk-php ~3.0

  • #Beanstalkd: pda/pheanstalk ~3.0

  • #Redis: predis/predis ~1.0



建立任務


產生任務類別

在你的應用程式中,佇列的任務類別都預設放在app/Jobs 目錄下,如果這個目錄不存在,那麼當你執行make:job artisan命令時目錄就會自動建立。你可以用以下的Artisan 指令來產生一個新的佇列任務:

php artisan make:job SendReminderEmail
登入後複製

產生的類別實作了Illuminate\Contracts\Queue\ShouldQueue 接口,這表示這個任務將會被推送到佇列中,而不是同步執行。


任務類別結構

任務類別的結構很簡單,一般來說只會包含一個讓佇列用來呼叫此任務的handle 方法。我們來看一個範例的任務類,這個範例裡,假設我們管理著一個播客發布服務,在發布之前需要處理上傳播客文件:

<?phpnamespace App\Jobs;use App\Podcast;use App\AudioProcessor;use Illuminate\Bus\Queueable;use Illuminate\Queue\SerializesModels;use Illuminate\Queue\InteractsWithQueue;use Illuminate\Contracts\Queue\ShouldQueue;class ProcessPodcast implements ShouldQueue{
    use InteractsWithQueue, Queueable, SerializesModels;

    protected $podcast;

    /**
     * 创建一个新的任务实例。
     *
     * @param  Podcast  $podcast
     * @return void
     */
    public function __construct(Podcast $podcast)
    {
        $this->podcast = $podcast;
    }

    /**
     * 运行任务。
     *
     * @param  AudioProcessor  $processor
     * @return void
     */
    public function handle(AudioProcessor $processor)
    {
        // Process uploaded podcast...
    }}
登入後複製

注意,在這個例子中,我們在任務類的構造器中直接傳遞了一個Eloquent 模型。因為我們在任務類別裡引用了 SerializesModels 這個
,使得 Eloquent 模型在處理任務時可以被優雅地序列化和反序列化。如果你的佇列任務類別在建構器中接收了一個 Eloquent 模型,那麼只有可辨識出該模型的屬性會被序列化到佇列裡。當任務實際運作時,佇列系統會自動從資料庫中重新取回完整的模型。這整個過程對你的應用程式來說是完全透明的,這可以避免在序列化完整的 Eloquent 模式實例時所帶來的一些問題。

在队列处理任务时,会调用 handle 方法,而这里我们也可以通过 handle 方法的参数类型提示,让 Laravel 的 服务容器 自动注入依赖对象。

{note} 像图片内容这种二进制数据, 在放入队列任务之前必须使用 base64_encode 方法转换一下。 否则,当这项任务放置到队列中时,可能无法正确序列化为 JSON。


分发任务

你写好任务类后,就能通过 dispatch 辅助函数来分发它了。唯一需要传递给 dispatch 的参数是这个任务类的实例:

<?phpnamespace App\Http\Controllers;use App\Jobs\ProcessPodcast;use Illuminate\Http\Request;use App\Http\Controllers\Controller;class PodcastController extends Controller{
    /**
     * 保存播客。
     *
     * @param  Request  $request
     * @return Response
     */
    public function store(Request $request)
    {
        // 创建播客...

        dispatch(new ProcessPodcast($podcast));
    }}
登入後複製

{tip} dispatch 提供了一种简捷、全局可用的函数,它也非常容易测试。查看下 Laravel 测试文档 来了解更多。


延迟分发

如果你想延迟执行一个队列中的任务,你可以用任务实例的 delay 方法。 这个方法是 Illuminate\Bus\Queueable trait 提供的,而这个 trait 在所有自动生成的任务类中都是默认加载了的。对于延迟任务我们可以举个例子,比如指定一个被分发10分钟后才执行的任务:

<?phpnamespace App\Http\Controllers;use Carbon\Carbon;use App\Jobs\ProcessPodcast;use Illuminate\Http\Request;use App\Http\Controllers\Controller;class PodcastController extends Controller{
    /**
     * 保存一个新的播客。
     *
     * @param  Request  $request
     * @return Response
     */
    public function store(Request $request)
    {
        // 创建播客...

        $job = (new ProcessPodcast($podcast))
                    ->delay(Carbon::now()->addMinutes(10));

        dispatch($job);
    }}
登入後複製

{note} Amazon SQS 队列服务最大延迟 15 分钟。


自定义队列 & 连接

分发任务到指定队列

通过推送任务到不同的队列,你可以给队列任务分类,甚至可以控制给不同的队列分配多少任务。记住,这个并不是要推送任务到队列配置文件中不同的 「connections」 里,而是推送到一个连接中不同的队列里。要指定队列的话,就调用任务实例的 onQueue 方法:

<?phpnamespace App\Http\Controllers;use App\Jobs\ProcessPodcast;use Illuminate\Http\Request;use App\Http\Controllers\Controller;class PodcastController extends Controller{
    /**
     * 保存一个新的播客。
     *
     * @param  Request  $request
     * @return Response
     */
    public function store(Request $request)
    {
        // 创建播客...

        $job = (new ProcessPodcast($podcast))->onQueue(&#39;processing&#39;);

        dispatch($job);
    }}
登入後複製

分发任务到指定连接

如果你使用了多个队列连接,你可以把任务推到指定连接。要指定连接的话,你可以调用任务实例的 onConnection 方法:

<?phpnamespace App\Http\Controllers;use App\Jobs\ProcessPodcast;use Illuminate\Http\Request;use App\Http\Controllers\Controller;class PodcastController extends Controller{
    /**
     * 保存一个新的播客。
     *
     * @param  Request  $request
     * @return Response
     */
    public function store(Request $request)
    {
        // 创建播客...

        $job = (new ProcessPodcast($podcast))->onConnection(&#39;sqs&#39;);

        dispatch($job);
    }}
登入後複製

当然,你可以链式调用 onConnectiononQueue 来同时指定任务的连接和队列:

$job = (new ProcessPodcast($podcast))
                ->onConnection(&#39;sqs&#39;)
                ->onQueue(&#39;processing&#39;);
登入後複製


指定任务最大尝试次数 / 超时值

最大尝试次数

在一项任务中指定最大的尝试次数可以尝试通过 Artisan 命令行 --tries 来设置:

php artisan queue:work --tries=3
登入後複製

但是,你可以采取更为精致的方法来完成这项工作比如说在任务类中定义最大尝试次数。如果在类和命令行中都定义了最大尝试次数, Laravel 会优先执行任务类中的值:

<?phpnamespace App\Jobs;class ProcessPodcast implements ShouldQueue{
    /**
     * 任务最大尝试次数
     *
     * @var int
     */
    public $tries = 5;}
登入後複製

超时

同样的,任务可以运行的最大秒数可以使用 Artisan 命令行上的 --timeout 开关指定:

php artisan queue:work --timeout=30
登入後複製

然而,你也可以在任务类中定义一个变量来设置可运行的最大描述,如果在类和命令行中都定义了最大尝试次数, Laravel 会优先执行任务类中的值:

<?phpnamespace App\Jobs;class ProcessPodcast implements ShouldQueue{
    /**
     * 任务运行的超时时间。
     *
     * @var int
     */
    public $timeout = 120;}
登入後複製


错误处理

如果任务运行的时候抛出异常,这个任务就自动被释放回队列,这样它就能被再重新运行了。如果继续抛出异常,这个任务会继续被释放回队列,直到重试次数达到你应用允许的最多次数。这个最多次数是在调用 queue:work Artisan 命令的时候通过 --tries 参数来定义的。更多队列处理器的信息可以 在下面看到 。


运行队列处理器

Laravel 包含一个队列处理器,当新任务被推到队列中时它能处理这些任务。你可以通过 queue:work Artisan 命令来运行处理器。要注意,一旦 queue:work 命令开始,它将一直运行,直到你手动停止或者你关闭控制台:

php artisan queue:work
登入後複製

{tip} 要让 queue:work 进程永久在后台运行,你应该使用进程监控工具,比如 Supervisor 来保证队列处理器没有停止运行。

一定要记得,队列处理器是长时间运行的进程,并在内存里保存着已经启动的应用状态。这样的结果就是,处理器运行后如果你修改代码那这些改变是不会应用到处理器中的。所以在你重新部署过程中,一定要 重启队列处理器 。

指定连接 & 队列

你可以指定队列处理器所使用的连接。你在 config/queue.php 配置文件里定义了多个连接,而你传递给 work 命令的连接名字要至少跟它们其中一个是一致的:

php artisan queue:work redis
登入後複製

你可以自定义队列处理器,方式是处理给定连接的特定队列。举例来说,如果你所有的邮件都是在 redis 连接中的 emails 队列中处理的,你就能通过以下命令启动一个只处理那个特定队列的队列处理器了:

php artisan queue:work redis --queue=emails
登入後複製

资源注意事项

守护程序队列不会在处理每个作业之前 「重新启动」 框架。 因此,在每个任务完成后,您应该释放任何占用过大的资源。例如,如果你使用GD库进行图像处理,你应该在完成后用 imagedestroy 释放内存。


队列优先级

有时候你希望设置处理队列的优先级。比如在 config/queue.php 里你可能设置了 redis 连接中的默认队列优先级为 low,但是你可能偶尔希望把一个任务推到 high 优先级的队列中,像这样:

dispatch((new Job)->onQueue(&#39;high&#39;));
登入後複製

要验证 high 队列中的任务都是在 low 队列中的任务之前处理的,你要启动一个队列处理器,传递给它队列名字的列表并以英文逗号,间隔:

php artisan queue:work --queue=high,low
登入後複製


队列处理器 & 部署

因为队列处理器都是 long-lived 进程,如果代码改变而队列处理器没有重启,他们是不能应用新代码的。所以最简单的方式就是重新部署过程中要重启队列处理器。你可以很优雅地只输入 queue:restart 来重启所有队列处理器。

php artisan queue:restart
登入後複製

这个命令将会告诉所有队列处理器在执行完当前任务后结束进程,这样才不会有任务丢失。因为队列处理器在执行 queue:restart 命令时对结束进程,你应该运行一个进程管理器,比如 Supervisor 来自动重新启动队列处理器。


任务过期 & 超时

任务过期

config/queue.php 配置文件里,每一个队列连接都定义了一个 retry_after 选项。这个选项指定了任务最多处理多少秒后就被当做失败重试了。比如说,如果这个选项设置为 90,那么当这个任务持续执行了 90 秒而没有被删除,那么它将被释放回队列。通常情况下,你应该把 retry_after 设置为最长耗时的任务所对应的时间。

{note} 唯一没有 retry_after 选项的连接是 Amazon SQS。当用 Amazon SQS 时,你必须通过 Amazon 命令行来配置这个重试阈值。

队列处理器超时

queue:work Artisan 命令对外有一个 --timeout 选项。这个选项指定了 Laravel 队列处理器最多执行多长时间后就应该被关闭掉。有时候一个队列的子进程会因为很多原因僵死,比如一个外部的 HTTP 请求没有响应。这个 --timeout 选项会移除超出指定事件限制的僵死进程。

php artisan queue:work --timeout=60
登入後複製

retry_after 配置选项和 --timeout 命令行选项是不一样的,但是可以同时工作来保证任务不会丢失并且不会重复执行。

{note} --timeout 应该永远都要比 retry_after 短至少几秒钟的时间。这样就能保证任务进程总能在失败重试前就被杀死了。如果你的 --timeout 选项大于 retry_after 配置选项,你的任务可能被执行两次。

队列进程睡眠时间

当队列需要处理任务时,进程将继续处理任务,它们之间没有延迟。 但是,如果没有新的工作可用,sleep 参数决定了工作进程将「睡眠」多长时间:

php artisan queue:work --sleep=3
登入後複製


Supervisor 配置

安装 Supervisor

Supervisor 是一个 Linux 操作系统上的进程监控软件,它会在 queue:listenqueue:work 命令发生失败后自动重启它们。要在 Ubuntu 安装 Supervisor,可以用以下命令:

sudo apt-get install supervisor
登入後複製

{tip} 如果自己手动配置 Supervisor 听起来有点难以应付,可以考虑使用 Laravel Forge ,它能给你的 Laravel 项目自动安装与配置 Supervisor。

配置 Supervisor

Supervisor 的配置文件一般是放在 /etc/supervisor/conf.d 目录下,在这个目录中你可以创建任意数量的配置文件来要求 Supervisor 怎样监控你的进程。例如我们创建一个 laravel-worker.conf 来启动与监控一个 queue:work 进程:

[program:laravel-worker]process_name=%(program_name)s_%(process_num)02d
command=php /home/forge/app.com/artisan queue:work sqs --sleep=3 --tries=3autostart=trueautorestart=trueuser=forge
numprocs=8redirect_stderr=truestdout_logfile=/home/forge/app.com/worker.log
登入後複製

这个例子里的 numprocs 命令会要求 Supervisor 运行并监控 8 个 queue:work 进程,并且在它们运行失败后重新启动。当然,你必须更改 command 命令的 queue:work sqs,以显示你所选择的队列驱动。

启动 Supervisor

当这个配置文件被创建后,你需要更新 Supervisor 的配置,并用以下命令来启动该进程:

sudo supervisorctl reread

sudo supervisorctl update

sudo supervisorctl start laravel-worker:*
登入後複製

更多有关 Supervisor 的设置与使用,请参考 Supervisor 官方文档。


处理失败的任务

有时候你队列中的任务会失败。不要担心,本来事情就不会一帆风顺。 Laravel 内置了一个方便的方式来指定任务重试的最大次数。当任务超出这个重试次数后,它就会被插入到 failed_jobs 数据表里面。要创建 failed_jobs 表的话,你可以用 queue:failed-table 命令:

php artisan queue:failed-table

php artisan migrate
登入後複製

然后运行队列处理器,在调用 queue:work 命令时你应该通过 --tries 参数指定任务的最大重试次数。如果不指定,任务就会永久重试:

php artisan queue:work redis --tries=3
登入後複製


清除失败任务

你可以在任务类里直接定义 failed 方法,它能在任务失败时运行任务的清除逻辑。这个地方用来发一条警告给用户或者重置任务执行的操作等再好不过了。导致任务失败的异常信息会被传递到 failed 方法:

<?phpnamespace App\Jobs;use Exception;use App\Podcast;use App\AudioProcessor;use Illuminate\Bus\Queueable;use Illuminate\Queue\SerializesModels;use Illuminate\Queue\InteractsWithQueue;use Illuminate\Contracts\Queue\ShouldQueue;class ProcessPodcast implements ShouldQueue{
    use InteractsWithQueue, Queueable, SerializesModels;

    protected $podcast;

    /**
     * 创建一个新的任务实例。
     *
     * @param  Podcast  $podcast
     * @return void
     */
    public function __construct(Podcast $podcast)
    {
        $this->podcast = $podcast;
    }

    /**
     * 执行任务。
     *
     * @param  AudioProcessor  $processor
     * @return void
     */
    public function handle(AudioProcessor $processor)
    {
        // 处理上传播客...
    }

    /**
     * 要处理的失败任务。
     *
     * @param  Exception  $exception
     * @return void
     */
    public function failed(Exception $exception)
    {
        // 给用户发送失败通知,等等...
    }}
登入後複製


任务失败事件

如果你想注册一个当队列任务失败时会被调用的事件,则可以用 Queue::failing 方法。这样你就有机会通过这个事件来用 e-mail 或 HipChat 通知你的团队。例如我们可以在 Laravel 内置的 AppServiceProvider 中对这个事件附加一个回调函数:

<?phpnamespace App\Providers;use Illuminate\Support\Facades\Queue;use Illuminate\Queue\Events\JobFailed;use Illuminate\Support\ServiceProvider;class AppServiceProvider extends ServiceProvider{
    /**
     * 启动任意应用程序的服务。
     *
     * @return void
     */
    public function boot()
    {
        Queue::failing(function (JobFailed $event) {
            // $event->connectionName
            // $event->job
            // $event->exception
        });
    }

    /**
     * 注册服务提供者。
     *
     * @return void
     */
    public function register()
    {
        //
    }}
登入後複製


重试失败任务

要查看你在 failed_jobs 数据表中的所有失败任务,则可以用 queue:failed 这个 Artisan 命令:

php artisan queue:failed
登入後複製

queue:failed 命令会列出所有任务的 ID、连接、队列以及失败时间,任务 ID 可以被用在重试失败的任务上。例如要重试一个 ID 为 5 的失败任务,其命令如下:

php artisan queue:retry 5
登入後複製

要重试所有失败的任务,可以使用 queue:retry 并使用 all 作为 ID:

php artisan queue:retry all
登入後複製

如果你想删除掉一个失败任务,可以用 queue:forget 命令:

php artisan queue:forget 5
登入後複製

queue:flush 命令可以让你删除所有失败的任务:

php artisan queue:flush
登入後複製


任务事件

使用队列的 beforeafter 方法,你能指定任务处理前和处理后的回调处理。在这些回调里正是实现额外的日志记录或者增加统计数据的好时机。通常情况下,你应该在 服务容器 中调用这些方法。例如,我们使用 Laravel 中的 AppServiceProvider:

<?phpnamespace App\Providers;use Illuminate\Support\Facades\Queue;use Illuminate\Support\ServiceProvider;use Illuminate\Queue\Events\JobProcessed;use Illuminate\Queue\Events\JobProcessing;class AppServiceProvider extends ServiceProvider{
    /**
     * 启动任意服务。
     *
     * @return void
     */
    public function boot()
    {
        Queue::before(function (JobProcessing $event) {
            // $event->connectionName
            // $event->job
            // $event->job->payload()
        });

        Queue::after(function (JobProcessed $event) {
            // $event->connectionName
            // $event->job
            // $event->job->payload()
        });
    }

    /**
     * 注册服务提供者。
     *
     * @return void
     */
    public function register()
    {
        //
    }}
登入後複製

队列 facade 中使用 looping 方法,你可以尝试在队列获取任务之前执行指定的回调方法。举个例子,你可以用闭包来回滚之前已失败任务的事务。

Queue::looping(function () {
    while (DB::transactionLevel() > 0) {
        DB::rollBack();
    }});
登入後複製

相关推荐:

Laravel延迟队列实现之Lua脚本解析

以上是Laravel 的隊列系統介紹的詳細內容。更多資訊請關注PHP中文網其他相關文章!

相關標籤:
來源:php.cn
本網站聲明
本文內容由網友自願投稿,版權歸原作者所有。本站不承擔相應的法律責任。如發現涉嫌抄襲或侵權的內容,請聯絡admin@php.cn
作者最新文章
熱門教學
更多>
最新下載
更多>
網站特效
網站源碼
網站素材
前端模板