首页 php框架 Laravel laravel8中laravel-swoole的扩展不兼容消息队列怎么办?

laravel8中laravel-swoole的扩展不兼容消息队列怎么办?

May 20, 2021 pm 01:48 PM
laravel8 swoole

下面由laravel教程栏目给大家介绍laravel-swoole消息队列,希望对需要的朋友有所帮助!

这段时间用laravel8+laravel-swoole做项目,可发现laravel-swoole的扩展不兼容消息队列;

思来想去这咋办呢,这咋办呢.咋办那就自己写咯!还好thinkphp-swoole扩展已经兼容了,那不就嘿嘿嘿!

直接上修改的思路和代码!开干!

一种是增加另外启动的命令或者在swoole启动的时候一起启动消息队列进行消费,我这么懒的人一个命令能解决的,绝不写两命令.

首先重写swoole启动命令

<?php

namespace crmeb\swoole\command;


use Illuminate\Support\Arr;
use Swoole\Process;
use SwooleTW\Http\Server\Facades\Server;
use SwooleTW\Http\Server\Manager;
use crmeb\swoole\server\InteractsWithQueue;
use crmeb\swoole\server\FileWatcher;
use Swoole\Runtime;

class HttpServerCommand extends \SwooleTW\Http\Commands\HttpServerCommand
{
    use InteractsWithQueue;

    /**
     * The name and signature of the console command.
     *
     * @var string
     */
    protected $signature = &#39;crmeb:http {action : start|stop|restart|reload|infos}&#39;;

    /**
     * Run swoole_http_server.
     */
    protected function start()
    {
        if ($this->isRunning()) {
            $this->error(&#39;Failed! swoole_http_server process is already running.&#39;);

            return;
        }

        $host             = Arr::get($this->config, &#39;server.host&#39;);
        $port             = Arr::get($this->config, &#39;server.port&#39;);
        $hotReloadEnabled = Arr::get($this->config, &#39;hot_reload.enabled&#39;);
        $queueEnabled     = Arr::get($this->config, &#39;queue.enabled&#39;);
        $accessLogEnabled = Arr::get($this->config, &#39;server.access_log&#39;);
        $coroutineEnable  = Arr::get($this->config, &#39;coroutine.enable&#39;);

        $this->info(&#39;Starting swoole http server...&#39;);
        $this->info("Swoole http server started: <http://{$host}:{$port}>");
        if ($this->isDaemon()) {
            $this->info(
                &#39;> (You can run this command to ensure the &#39; .
                &#39;swoole_http_server process is running: ps aux|grep "swoole")&#39;
            );
        }

        $manager = $this->laravel->make(Manager::class);
        $server  = $this->laravel->make(Server::class);

        if ($accessLogEnabled) {
            $this->registerAccessLog();
        }
        //热更新重写
        if ($hotReloadEnabled) {
            $manager->addProcess($this->getHotReloadProcessNow($server));
        }
        //启动消息队列进行消费
        if ($queueEnabled) {
            $this->prepareQueue($manager);
        }

        if ($coroutineEnable) {
            Runtime::enableCoroutine(true, Arr::get($this->config, &#39;coroutine.flags&#39;, SWOOLE_HOOK_ALL));
        }

        $manager->run();
    }

    /**
     * @param Server $server
     * @return Process|void
     */
    protected function getHotReloadProcessNow($server)
    {
        return new Process(function () use ($server) {
            $watcher = new FileWatcher(
                Arr::get($this->config, &#39;hot_reload.include&#39;, []),
                Arr::get($this->config, &#39;hot_reload.exclude&#39;, []),
                Arr::get($this->config, &#39;hot_reload.name&#39;, [])
            );

            $watcher->watch(function () use ($server) {
                $server->reload();
            });
        }, false, 0, true);
    }

}
登录后复制

InteractsWithQueue 类

<?php
namespace crmeb\swoole\server;


use crmeb\swoole\queue\Manager as QueueManager;
use SwooleTW\Http\Server\Manager;

/**
 * Trait InteractsWithQueue
 * @package crmeb\swoole\server
 */
trait InteractsWithQueue
{
    public function prepareQueue(Manager $manager)
    {
        /** @var QueueManager $queueManager */
        $queueManager = $this->laravel->make(QueueManager::class);

        $queueManager->attachToServer($manager, $this->output);
    }
}
登录后复制

Manager类

<?php
namespace crmeb\swoole\queue;


use Illuminate\Contracts\Container\Container;
use Swoole\Constant;
use Swoole\Process;
use Swoole\Process\Pool;
use Swoole\Timer;
use Illuminate\Support\Arr;
use Illuminate\Queue\Events\JobFailed;
use Illuminate\Queue\Worker;
use crmeb\swoole\server\WithContainer;
use Illuminate\Queue\Jobs\Job;
use function Swoole\Coroutine\run;
use Illuminate\Queue\WorkerOptions;
use SwooleTW\Http\Server\Manager as ServerManager;
use Illuminate\Console\OutputStyle;

class Manager
{
    use WithContainer;

    /**
     * Container.
     *
     * @var \Illuminate\Contracts\Container\Container
     */
    protected $container;

    /**
     * @var OutputStyle
     */
    protected $output;

    /**
     * @var Closure[]
     */
    protected $workers = [];

    /**
     * Manager constructor.
     * @param Container $container
     */
    public function __construct(Container $container)
    {
        $this->container = $container;
    }

    /**
     * @param ServerManager $server
     */
    public function attachToServer(ServerManager $server, OutputStyle $output)
    {
        $this->output = $output;
        $this->listenForEvents();
        $this->createWorkers();
        foreach ($this->workers as $worker) {
            $server->addProcess(new Process($worker, false, 0, true));
        }
    }

    /**
     * 运行消息队列命令
     */
    public function run(): void
    {
        @cli_set_process_title("swoole queue: manager process");

        $this->listenForEvents();
        $this->createWorkers();

        $pool = new Pool(count($this->workers));

        $pool->on(Constant::EVENT_WORKER_START, function (Pool $pool, int $workerId) {
            $process = $pool->getProcess($workerId);
            run($this->workers[$workerId], $process);
        });

        $pool->start();
    }

    /**
     * 创建执行任务
     */
    protected function createWorkers()
    {
        $workers = $this->getConfig(&#39;queue.workers&#39;, []);

        foreach ($workers as $queue => $options) {

            if (strpos($queue, &#39;@&#39;) !== false) {
                [$queue, $connection] = explode(&#39;@&#39;, $queue);
            } else {
                $connection = null;
            }

            $this->workers[] = function (Process $process) use ($options, $connection, $queue) {

                @cli_set_process_title("swoole queue: worker process");

                /** @var Worker $worker */
                $worker = $this->container->make(&#39;queue.worker&#39;);
                /** @var WorkerOptions $option */
                $option = $this->container->make(WorkerOptions::class);

                $option->sleep = Arr::get($options, "sleep", 3);
                $option->maxTries = Arr::get($options, "tries", 0);
                $option->timeout = Arr::get($options, "timeout", 60);

                $timer = Timer::after($option->timeout * 1000, function () use ($process) {
                    $process->exit();
                });

                $worker->runNextJob($connection, $queue, $option);

                Timer::clear($timer);
            };
        }
    }

    /**
     * 注册事件
     */
    protected function listenForEvents()
    {
        $this->container->make(&#39;events&#39;)->listen(JobFailed::class, function (JobFailed $event) {
            $this->writeOutput($event->job);

            $this->logFailedJob($event);
        });
    }

    /**
     * 记录失败任务
     * @param JobFailed $event
     */
    protected function logFailedJob(JobFailed $event)
    {
        $this->container[&#39;queue.failer&#39;]->log(
            $event->connection,
            $event->job->getQueue(),
            $event->job->getRawBody(),
            $event->exception
        );
    }

    /**
     * Write the status output for the queue worker.
     *
     * @param Job $job
     * @param     $status
     */
    protected function writeOutput(Job $job, $status)
    {
        switch ($status) {
            case &#39;starting&#39;:
                $this->writeStatus($job, &#39;Processing&#39;, &#39;comment&#39;);
                break;
            case &#39;success&#39;:
                $this->writeStatus($job, &#39;Processed&#39;, &#39;info&#39;);
                break;
            case &#39;failed&#39;:
                $this->writeStatus($job, &#39;Failed&#39;, &#39;error&#39;);
                break;
        }
    }

    /**
     * Format the status output for the queue worker.
     *
     * @param Job $job
     * @param string $status
     * @param string $type
     * @return void
     */
    protected function writeStatus(Job $job, $status, $type)
    {
        $this->output->writeln(sprintf(
            "<{$type}>[%s][%s] %s</{$type}> %s",
            date(&#39;Y-m-d H:i:s&#39;),
            $job->getJobId(),
            str_pad("{$status}:", 11), $job->getName()
        ));
    }

}
登录后复制

增加CrmebServiceProvider类

<?php
namespace crmeb\swoole;


use Illuminate\Contracts\Debug\ExceptionHandler;
use Illuminate\Contracts\Http\Kernel;
use crmeb\swoole\command\HttpServerCommand;
use Illuminate\Queue\Worker;
use SwooleTW\Http\HttpServiceProvider;
use SwooleTW\Http\Middleware\AccessLog;
use SwooleTW\Http\Server\Manager;

/**
 * Class CrmebServiceProvider
 * @package crmeb\swoole
 */
class CrmebServiceProvider extends HttpServiceProvider
{



    /**
     * Register manager.
     *
     * @return void
     */
    protected function registerManager()
    {
        $this->app->singleton(Manager::class, function ($app) {
            return new Manager($app, &#39;laravel&#39;);
        });

        $this->app->alias(Manager::class, &#39;swoole.manager&#39;);

        $this->app->singleton(&#39;queue.worker&#39;, function ($app) {
            $isDownForMaintenance = function () {
                return $this->app->isDownForMaintenance();
            };

            return new Worker(
                $app[&#39;queue&#39;],
                $app[&#39;events&#39;],
                $app[ExceptionHandler::class],
                $isDownForMaintenance
            );
        });
    }

    /**
     * Boot websocket routes.
     *
     * @return void
     */
    protected function bootWebsocketRoutes()
    {
        require base_path(&#39;vendor/swooletw/laravel-swoole&#39;) . &#39;/routes/laravel_routes.php&#39;;
    }

    /**
     * Register access log middleware to container.
     *
     * @return void
     */
    protected function pushAccessLogMiddleware()
    {
        $this->app->make(Kernel::class)->pushMiddleware(AccessLog::class);
    }

    /**
     * Register commands.
     */
    protected function registerCommands()
    {
        $this->commands([
            HttpServerCommand::class,
        ]);
    }

    /**
     * Merge configurations.
     */
    protected function mergeConfigs()
    {
        $this->mergeConfigFrom(base_path(&#39;vendor/swooletw/laravel-swoole&#39;) . &#39;/config/swoole_http.php&#39;, &#39;swoole_http&#39;);
        $this->mergeConfigFrom(base_path(&#39;vendor/swooletw/laravel-swoole&#39;) . &#39;/config/swoole_websocket.php&#39;, &#39;swoole_websocket&#39;);
    }

    /**
     * Publish files of this package.
     */
    protected function publishFiles()
    {
        $this->publishes([
            base_path(&#39;vendor/swooletw/laravel-swoole&#39;) . &#39;/config/swoole_http.php&#39; => base_path(&#39;config/swoole_http.php&#39;),
            base_path(&#39;vendor/swooletw/laravel-swoole&#39;) . &#39;/config/swoole_websocket.php&#39; => base_path(&#39;config/swoole_websocket.php&#39;),
            base_path(&#39;vendor/swooletw/laravel-swoole&#39;) . &#39;/routes/websocket.php&#39; => base_path(&#39;routes/websocket.php&#39;),
        ], &#39;laravel-swoole&#39;);
    }
}
登录后复制

然后再把\crmeb\swoole\CrmebServiceProvider::class放入config/app.php中的providers中加载重写了swoole的命令启动方式

配置config/swoole_http.php

return [
    &#39;queue&#39;        => [
        //是否开启自动消费队列
        &#39;enabled&#39; => true,
        &#39;workers&#39; => [
            //队列名称
            &#39;CRMEB&#39; => []
        ]
    ],];
登录后复制

输入命令:
php artisan crmeb:http restart

swoole启动后就可以自动消费队列了。

相关推荐:最新的五个Laravel视频教程

以上是laravel8中laravel-swoole的扩展不兼容消息队列怎么办?的详细内容。更多信息请关注PHP中文网其他相关文章!

本站声明
本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系admin@php.cn

热AI工具

Undresser.AI Undress

Undresser.AI Undress

人工智能驱动的应用程序,用于创建逼真的裸体照片

AI Clothes Remover

AI Clothes Remover

用于从照片中去除衣服的在线人工智能工具。

Undress AI Tool

Undress AI Tool

免费脱衣服图片

Clothoff.io

Clothoff.io

AI脱衣机

Video Face Swap

Video Face Swap

使用我们完全免费的人工智能换脸工具轻松在任何视频中换脸!

热工具

记事本++7.3.1

记事本++7.3.1

好用且免费的代码编辑器

SublimeText3汉化版

SublimeText3汉化版

中文版,非常好用

禅工作室 13.0.1

禅工作室 13.0.1

功能强大的PHP集成开发环境

Dreamweaver CS6

Dreamweaver CS6

视觉化网页开发工具

SublimeText3 Mac版

SublimeText3 Mac版

神级代码编辑软件(SublimeText3)

swoole协程如何在laravel中使用 swoole协程如何在laravel中使用 Apr 09, 2024 pm 06:48 PM

Laravel 中使用 Swoole 协程可以并发处理大量请求,优势包括:并发处理:允许同时处理多个请求。高性能:基于 Linux epoll 事件机制,高效处理请求。低资源消耗:所需服务器资源更少。易于集成:与 Laravel 框架无缝集成,使用简单。

如何使用Swoole实现高性能的HTTP反向代理服务器 如何使用Swoole实现高性能的HTTP反向代理服务器 Nov 07, 2023 am 08:18 AM

如何使用Swoole实现高性能的HTTP反向代理服务器Swoole是一款基于PHP语言的高性能、异步、并发的网络通信框架。它提供了一系列的网络功能,可以用于实现HTTP服务器、WebSocket服务器等。在本文中,我们将介绍如何使用Swoole来实现一个高性能的HTTP反向代理服务器,并提供具体的代码示例。环境配置首先,我们需要在服务器上安装Swoole扩展

swoole和workerman哪个好 swoole和workerman哪个好 Apr 09, 2024 pm 07:00 PM

Swoole 和 Workerman 都是高性能 PHP 服务器框架。Swoole 以其异步处理、出色的性能和可扩展性而闻名,适用于需要处理大量并发请求和高吞吐量的项目。Workerman 提供了异步和同步模式的灵活性,具有直观的 API,更适合易用性和处理较低并发量的项目。

swoole_process 怎么让用户切换 swoole_process 怎么让用户切换 Apr 09, 2024 pm 06:21 PM

Swoole Process 中可以让用户切换,具体操作步骤为:创建进程;设置进程用户;启动进程。

swoole框架怎么重启服务 swoole框架怎么重启服务 Apr 09, 2024 pm 06:15 PM

要重启 Swoole 服务,请按照以下步骤操作:检查服务状态并获取 PID。使用 "kill -15 PID" 停止服务。使用启动服务的相同命令重新启动服务。

swoole和java哪个性能好 swoole和java哪个性能好 Apr 09, 2024 pm 07:03 PM

性能比较:吞吐量:Swoole 凭借协程机制,吞吐量更高。延迟:Swoole 的协程上下文切换开销更低,延迟更小。内存消耗:Swoole 的协程占用内存更少。易用性:Swoole 提供更易于使用的并发编程 API。

Swoole实战:如何使用协程进行并发任务处理 Swoole实战:如何使用协程进行并发任务处理 Nov 07, 2023 pm 02:55 PM

Swoole实战:如何使用协程进行并发任务处理引言在日常的开发中,我们常常会遇到需要同时处理多个任务的情况。传统的处理方式是使用多线程或多进程来实现并发处理,但这种方式在性能和资源消耗上存在一定的问题。而PHP作为一门脚本语言,通常无法直接使用多线程或多进程的方式来处理任务。然而,借助于Swoole协程库,我们可以使用协程来实现高性能的并发任务处理。本文将介

swoole协程是怎样调度的 swoole协程是怎样调度的 Apr 09, 2024 pm 07:06 PM

Swoole协程是一种轻量级并发库,允许开发者编写并发程序。Swoole协程调度机制基于协程模式和事件循环,使用协程栈管理协程执行,在协程让出控制权后挂起它们。事件循环处理IO和定时器事件,协程让出控制权时被挂起并返回事件循环。当事件发生时,Swoole从事件循环切换到挂起的协程,通过保存和加载协程状态完成切换。协程调度使用优先级机制,支持挂起、休眠和恢复操作以灵活控制协程执行。

See all articles