laravel8中laravel-swoole的扩展不兼容消息队列怎么办?
下面由laravel教程栏目给大家介绍laravel-swoole消息队列,希望对需要的朋友有所帮助!
这段时间用laravel8+laravel-swoole做项目,可发现laravel-swoole的扩展不兼容消息队列;
思来想去这咋办呢,这咋办呢.咋办那就自己写咯!还好thinkphp-swoole扩展已经兼容了,那不就嘿嘿嘿!
直接上修改的思路和代码!开干!
一种是增加另外启动的命令或者在swoole启动的时候一起启动消息队列进行消费,我这么懒的人一个命令能解决的,绝不写两命令.
首先重写swoole启动命令
<?php namespace crmeb\swoole\command; use Illuminate\Support\Arr; use Swoole\Process; use SwooleTW\Http\Server\Facades\Server; use SwooleTW\Http\Server\Manager; use crmeb\swoole\server\InteractsWithQueue; use crmeb\swoole\server\FileWatcher; use Swoole\Runtime; class HttpServerCommand extends \SwooleTW\Http\Commands\HttpServerCommand { use InteractsWithQueue; /** * The name and signature of the console command. * * @var string */ protected $signature = 'crmeb:http {action : start|stop|restart|reload|infos}'; /** * Run swoole_http_server. */ protected function start() { if ($this->isRunning()) { $this->error('Failed! swoole_http_server process is already running.'); return; } $host = Arr::get($this->config, 'server.host'); $port = Arr::get($this->config, 'server.port'); $hotReloadEnabled = Arr::get($this->config, 'hot_reload.enabled'); $queueEnabled = Arr::get($this->config, 'queue.enabled'); $accessLogEnabled = Arr::get($this->config, 'server.access_log'); $coroutineEnable = Arr::get($this->config, 'coroutine.enable'); $this->info('Starting swoole http server...'); $this->info("Swoole http server started: <http://{$host}:{$port}>"); if ($this->isDaemon()) { $this->info( '> (You can run this command to ensure the ' . 'swoole_http_server process is running: ps aux|grep "swoole")' ); } $manager = $this->laravel->make(Manager::class); $server = $this->laravel->make(Server::class); if ($accessLogEnabled) { $this->registerAccessLog(); } //热更新重写 if ($hotReloadEnabled) { $manager->addProcess($this->getHotReloadProcessNow($server)); } //启动消息队列进行消费 if ($queueEnabled) { $this->prepareQueue($manager); } if ($coroutineEnable) { Runtime::enableCoroutine(true, Arr::get($this->config, 'coroutine.flags', SWOOLE_HOOK_ALL)); } $manager->run(); } /** * @param Server $server * @return Process|void */ protected function getHotReloadProcessNow($server) { return new Process(function () use ($server) { $watcher = new FileWatcher( Arr::get($this->config, 'hot_reload.include', []), Arr::get($this->config, 'hot_reload.exclude', []), Arr::get($this->config, 'hot_reload.name', []) ); $watcher->watch(function () use ($server) { $server->reload(); }); }, false, 0, true); } }
InteractsWithQueue 类
<?php namespace crmeb\swoole\server; use crmeb\swoole\queue\Manager as QueueManager; use SwooleTW\Http\Server\Manager; /** * Trait InteractsWithQueue * @package crmeb\swoole\server */ trait InteractsWithQueue { public function prepareQueue(Manager $manager) { /** @var QueueManager $queueManager */ $queueManager = $this->laravel->make(QueueManager::class); $queueManager->attachToServer($manager, $this->output); } }
Manager类
<?php namespace crmeb\swoole\queue; use Illuminate\Contracts\Container\Container; use Swoole\Constant; use Swoole\Process; use Swoole\Process\Pool; use Swoole\Timer; use Illuminate\Support\Arr; use Illuminate\Queue\Events\JobFailed; use Illuminate\Queue\Worker; use crmeb\swoole\server\WithContainer; use Illuminate\Queue\Jobs\Job; use function Swoole\Coroutine\run; use Illuminate\Queue\WorkerOptions; use SwooleTW\Http\Server\Manager as ServerManager; use Illuminate\Console\OutputStyle; class Manager { use WithContainer; /** * Container. * * @var \Illuminate\Contracts\Container\Container */ protected $container; /** * @var OutputStyle */ protected $output; /** * @var Closure[] */ protected $workers = []; /** * Manager constructor. * @param Container $container */ public function __construct(Container $container) { $this->container = $container; } /** * @param ServerManager $server */ public function attachToServer(ServerManager $server, OutputStyle $output) { $this->output = $output; $this->listenForEvents(); $this->createWorkers(); foreach ($this->workers as $worker) { $server->addProcess(new Process($worker, false, 0, true)); } } /** * 运行消息队列命令 */ public function run(): void { @cli_set_process_title("swoole queue: manager process"); $this->listenForEvents(); $this->createWorkers(); $pool = new Pool(count($this->workers)); $pool->on(Constant::EVENT_WORKER_START, function (Pool $pool, int $workerId) { $process = $pool->getProcess($workerId); run($this->workers[$workerId], $process); }); $pool->start(); } /** * 创建执行任务 */ protected function createWorkers() { $workers = $this->getConfig('queue.workers', []); foreach ($workers as $queue => $options) { if (strpos($queue, '@') !== false) { [$queue, $connection] = explode('@', $queue); } else { $connection = null; } $this->workers[] = function (Process $process) use ($options, $connection, $queue) { @cli_set_process_title("swoole queue: worker process"); /** @var Worker $worker */ $worker = $this->container->make('queue.worker'); /** @var WorkerOptions $option */ $option = $this->container->make(WorkerOptions::class); $option->sleep = Arr::get($options, "sleep", 3); $option->maxTries = Arr::get($options, "tries", 0); $option->timeout = Arr::get($options, "timeout", 60); $timer = Timer::after($option->timeout * 1000, function () use ($process) { $process->exit(); }); $worker->runNextJob($connection, $queue, $option); Timer::clear($timer); }; } } /** * 注册事件 */ protected function listenForEvents() { $this->container->make('events')->listen(JobFailed::class, function (JobFailed $event) { $this->writeOutput($event->job); $this->logFailedJob($event); }); } /** * 记录失败任务 * @param JobFailed $event */ protected function logFailedJob(JobFailed $event) { $this->container['queue.failer']->log( $event->connection, $event->job->getQueue(), $event->job->getRawBody(), $event->exception ); } /** * Write the status output for the queue worker. * * @param Job $job * @param $status */ protected function writeOutput(Job $job, $status) { switch ($status) { case 'starting': $this->writeStatus($job, 'Processing', 'comment'); break; case 'success': $this->writeStatus($job, 'Processed', 'info'); break; case 'failed': $this->writeStatus($job, 'Failed', 'error'); break; } } /** * Format the status output for the queue worker. * * @param Job $job * @param string $status * @param string $type * @return void */ protected function writeStatus(Job $job, $status, $type) { $this->output->writeln(sprintf( "<{$type}>[%s][%s] %s</{$type}> %s", date('Y-m-d H:i:s'), $job->getJobId(), str_pad("{$status}:", 11), $job->getName() )); } }
增加CrmebServiceProvider类
<?php namespace crmeb\swoole; use Illuminate\Contracts\Debug\ExceptionHandler; use Illuminate\Contracts\Http\Kernel; use crmeb\swoole\command\HttpServerCommand; use Illuminate\Queue\Worker; use SwooleTW\Http\HttpServiceProvider; use SwooleTW\Http\Middleware\AccessLog; use SwooleTW\Http\Server\Manager; /** * Class CrmebServiceProvider * @package crmeb\swoole */ class CrmebServiceProvider extends HttpServiceProvider { /** * Register manager. * * @return void */ protected function registerManager() { $this->app->singleton(Manager::class, function ($app) { return new Manager($app, 'laravel'); }); $this->app->alias(Manager::class, 'swoole.manager'); $this->app->singleton('queue.worker', function ($app) { $isDownForMaintenance = function () { return $this->app->isDownForMaintenance(); }; return new Worker( $app['queue'], $app['events'], $app[ExceptionHandler::class], $isDownForMaintenance ); }); } /** * Boot websocket routes. * * @return void */ protected function bootWebsocketRoutes() { require base_path('vendor/swooletw/laravel-swoole') . '/routes/laravel_routes.php'; } /** * Register access log middleware to container. * * @return void */ protected function pushAccessLogMiddleware() { $this->app->make(Kernel::class)->pushMiddleware(AccessLog::class); } /** * Register commands. */ protected function registerCommands() { $this->commands([ HttpServerCommand::class, ]); } /** * Merge configurations. */ protected function mergeConfigs() { $this->mergeConfigFrom(base_path('vendor/swooletw/laravel-swoole') . '/config/swoole_http.php', 'swoole_http'); $this->mergeConfigFrom(base_path('vendor/swooletw/laravel-swoole') . '/config/swoole_websocket.php', 'swoole_websocket'); } /** * Publish files of this package. */ protected function publishFiles() { $this->publishes([ base_path('vendor/swooletw/laravel-swoole') . '/config/swoole_http.php' => base_path('config/swoole_http.php'), base_path('vendor/swooletw/laravel-swoole') . '/config/swoole_websocket.php' => base_path('config/swoole_websocket.php'), base_path('vendor/swooletw/laravel-swoole') . '/routes/websocket.php' => base_path('routes/websocket.php'), ], 'laravel-swoole'); } }
然后再把\crmeb\swoole\CrmebServiceProvider::class
放入config/app.php
中的providers
中加载重写了swoole的命令启动方式
配置config/swoole_http.php
return [ 'queue' => [ //是否开启自动消费队列 'enabled' => true, 'workers' => [ //队列名称 'CRMEB' => [] ] ],];
输入命令:php artisan crmeb:http restart
swoole启动后就可以自动消费队列了。
相关推荐:最新的五个Laravel视频教程
以上是laravel8中laravel-swoole的扩展不兼容消息队列怎么办?的详细内容。更多信息请关注PHP中文网其他相关文章!

热AI工具

Undresser.AI Undress
人工智能驱动的应用程序,用于创建逼真的裸体照片

AI Clothes Remover
用于从照片中去除衣服的在线人工智能工具。

Undress AI Tool
免费脱衣服图片

Clothoff.io
AI脱衣机

Video Face Swap
使用我们完全免费的人工智能换脸工具轻松在任何视频中换脸!

热门文章

热工具

记事本++7.3.1
好用且免费的代码编辑器

SublimeText3汉化版
中文版,非常好用

禅工作室 13.0.1
功能强大的PHP集成开发环境

Dreamweaver CS6
视觉化网页开发工具

SublimeText3 Mac版
神级代码编辑软件(SublimeText3)

热门话题

Laravel 中使用 Swoole 协程可以并发处理大量请求,优势包括:并发处理:允许同时处理多个请求。高性能:基于 Linux epoll 事件机制,高效处理请求。低资源消耗:所需服务器资源更少。易于集成:与 Laravel 框架无缝集成,使用简单。

如何使用Swoole实现高性能的HTTP反向代理服务器Swoole是一款基于PHP语言的高性能、异步、并发的网络通信框架。它提供了一系列的网络功能,可以用于实现HTTP服务器、WebSocket服务器等。在本文中,我们将介绍如何使用Swoole来实现一个高性能的HTTP反向代理服务器,并提供具体的代码示例。环境配置首先,我们需要在服务器上安装Swoole扩展

Swoole 和 Workerman 都是高性能 PHP 服务器框架。Swoole 以其异步处理、出色的性能和可扩展性而闻名,适用于需要处理大量并发请求和高吞吐量的项目。Workerman 提供了异步和同步模式的灵活性,具有直观的 API,更适合易用性和处理较低并发量的项目。

要重启 Swoole 服务,请按照以下步骤操作:检查服务状态并获取 PID。使用 "kill -15 PID" 停止服务。使用启动服务的相同命令重新启动服务。

性能比较:吞吐量:Swoole 凭借协程机制,吞吐量更高。延迟:Swoole 的协程上下文切换开销更低,延迟更小。内存消耗:Swoole 的协程占用内存更少。易用性:Swoole 提供更易于使用的并发编程 API。

Swoole实战:如何使用协程进行并发任务处理引言在日常的开发中,我们常常会遇到需要同时处理多个任务的情况。传统的处理方式是使用多线程或多进程来实现并发处理,但这种方式在性能和资源消耗上存在一定的问题。而PHP作为一门脚本语言,通常无法直接使用多线程或多进程的方式来处理任务。然而,借助于Swoole协程库,我们可以使用协程来实现高性能的并发任务处理。本文将介

Swoole协程是一种轻量级并发库,允许开发者编写并发程序。Swoole协程调度机制基于协程模式和事件循环,使用协程栈管理协程执行,在协程让出控制权后挂起它们。事件循环处理IO和定时器事件,协程让出控制权时被挂起并返回事件循环。当事件发生时,Swoole从事件循环切换到挂起的协程,通过保存和加载协程状态完成切换。协程调度使用优先级机制,支持挂起、休眠和恢复操作以灵活控制协程执行。
