下面由laravel教程栏目给大家介绍laravel-swoole消息队列,希望对需要的朋友有所帮助!
这段时间用laravel8+laravel-swoole做项目,可发现laravel-swoole的扩展不兼容消息队列;
思来想去这咋办呢,这咋办呢.咋办那就自己写咯!还好thinkphp-swoole扩展已经兼容了,那不就嘿嘿嘿!
直接上修改的思路和代码!开干!
有两种思路:一种是另外增加一个启动命令来消费队列,另一种是在swoole启动的时候一起启动消息队列进行消费。我这么懒的人,一个命令能解决的,绝不写两个命令,所以选第二种。
首先重写swoole启动命令
<?php

namespace crmeb\swoole\command;

use Illuminate\Support\Arr;
use Swoole\Process;
use SwooleTW\Http\Server\Facades\Server;
use SwooleTW\Http\Server\Manager;
use crmeb\swoole\server\InteractsWithQueue;
use crmeb\swoole\server\FileWatcher;
use Swoole\Runtime;

/**
 * Console command that starts the swoole http server and, depending on
 * configuration, attaches a hot-reload process and queue worker processes
 * to the same server manager before running it.
 */
class HttpServerCommand extends \SwooleTW\Http\Commands\HttpServerCommand
{
    use InteractsWithQueue;

    /**
     * The name and signature of the console command.
     *
     * @var string
     */
    protected $signature = 'crmeb:http {action : start|stop|restart|reload|infos}';

    /**
     * Run swoole_http_server.
     *
     * Reads the feature switches from $this->config, registers the optional
     * access log, hot-reload process and queue workers on the manager, enables
     * coroutine hooks when configured, and finally calls $manager->run().
     *
     * @return void
     */
    protected function start()
    {
        if ($this->isRunning()) {
            $this->error('Failed! swoole_http_server process is already running.');

            return;
        }

        $host = Arr::get($this->config, 'server.host');
        $port = Arr::get($this->config, 'server.port');
        $hotReloadEnabled = Arr::get($this->config, 'hot_reload.enabled');
        $queueEnabled = Arr::get($this->config, 'queue.enabled');
        $accessLogEnabled = Arr::get($this->config, 'server.access_log');
        $coroutineEnable = Arr::get($this->config, 'coroutine.enable');

        // Note: both banners are printed before $manager->run() is invoked below.
        $this->info('Starting swoole http server...');
        $this->info("Swoole http server started: <http://{$host}:{$port}>");
        if ($this->isDaemon()) {
            $this->info(
                '> (You can run this command to ensure the ' .
                'swoole_http_server process is running: ps aux|grep "swoole")'
            );
        }

        $manager = $this->laravel->make(Manager::class);
        $server = $this->laravel->make(Server::class);

        if ($accessLogEnabled) {
            $this->registerAccessLog();
        }

        // Hot reload: use this class's own watcher process (getHotReloadProcessNow).
        if ($hotReloadEnabled) {
            $manager->addProcess($this->getHotReloadProcessNow($server));
        }

        // Attach the queue consumers so they are started together with the server.
        if ($queueEnabled) {
            $this->prepareQueue($manager);
        }

        if ($coroutineEnable) {
            Runtime::enableCoroutine(true, Arr::get($this->config, 'coroutine.flags', SWOOLE_HOOK_ALL));
        }

        $manager->run();
    }

    /**
     * Build the process that watches files and reloads the server on change.
     *
     * The watcher is configured from the hot_reload.include, hot_reload.exclude
     * and hot_reload.name config entries (each defaulting to an empty array).
     *
     * @param Server $server server instance whose reload() is called by the watcher callback
     * @return Process
     */
    protected function getHotReloadProcessNow($server)
    {
        return new Process(function () use ($server) {
            $watcher = new FileWatcher(
                Arr::get($this->config, 'hot_reload.include', []),
                Arr::get($this->config, 'hot_reload.exclude', []),
                Arr::get($this->config, 'hot_reload.name', [])
            );

            $watcher->watch(function () use ($server) {
                $server->reload();
            });
        }, false, 0, true);
    }
}
InteractsWithQueue trait(供启动命令 use)
<?php

namespace crmeb\swoole\server;

use crmeb\swoole\queue\Manager as QueueManager;
use SwooleTW\Http\Server\Manager;

/**
 * Trait InteractsWithQueue
 *
 * Lets a console command hook the queue manager into the swoole server manager.
 * The using class is expected to expose $this->laravel and $this->output.
 *
 * @package crmeb\swoole\server
 */
trait InteractsWithQueue
{
    /**
     * Resolve the queue manager from the container and attach it to the server.
     *
     * @param Manager $manager swoole server manager that receives the queue worker processes
     */
    public function prepareQueue(Manager $manager)
    {
        $this->laravel
            ->make(QueueManager::class)
            ->attachToServer($manager, $this->output);
    }
}
Manager类
<?php

namespace crmeb\swoole\queue;

use Illuminate\Contracts\Container\Container;
use Swoole\Constant;
use Swoole\Process;
use Swoole\Process\Pool;
use Swoole\Timer;
use Illuminate\Support\Arr;
use Illuminate\Queue\Events\JobFailed;
use Illuminate\Queue\Worker;
use crmeb\swoole\server\WithContainer;
use Illuminate\Queue\Jobs\Job;
use function Swoole\Coroutine\run;
use Illuminate\Queue\WorkerOptions;
use SwooleTW\Http\Server\Manager as ServerManager;
use Illuminate\Console\OutputStyle;

/**
 * Builds queue worker callables from the `queue.workers` configuration and
 * runs them either as processes attached to the swoole server or inside a
 * standalone process pool.
 */
class Manager
{
    use WithContainer;

    /**
     * Container.
     *
     * @var \Illuminate\Contracts\Container\Container
     */
    protected $container;

    /**
     * Console output used for job status lines; stays null when run() is the entry point.
     *
     * @var OutputStyle|null
     */
    protected $output;

    /**
     * Worker callables, one per entry of the `queue.workers` configuration.
     *
     * @var \Closure[]
     */
    protected $workers = [];

    /**
     * Manager constructor.
     *
     * @param Container $container
     */
    public function __construct(Container $container)
    {
        $this->container = $container;
    }

    /**
     * Register one swoole process per configured worker on the server manager.
     *
     * @param ServerManager $server manager that receives the worker processes
     * @param OutputStyle $output console output used to report failed jobs
     */
    public function attachToServer(ServerManager $server, OutputStyle $output)
    {
        $this->output = $output;
        $this->listenForEvents();
        $this->createWorkers();

        foreach ($this->workers as $worker) {
            $server->addProcess(new Process($worker, false, 0, true));
        }
    }

    /**
     * Run the queue workers in their own process pool (standalone mode).
     */
    public function run(): void
    {
        @cli_set_process_title("swoole queue: manager process");

        $this->listenForEvents();
        $this->createWorkers();

        $pool = new Pool(count($this->workers));

        $pool->on(Constant::EVENT_WORKER_START, function (Pool $pool, int $workerId) {
            $process = $pool->getProcess($workerId);
            run($this->workers[$workerId], $process);
        });

        $pool->start();
    }

    /**
     * Create one worker callable per entry of the `queue.workers` configuration.
     *
     * A key may be written as "queue@connection"; without "@" the connection
     * name passed to the worker is null. Recognised option keys are "sleep"
     * (default 3), "tries" (default 0) and "timeout" (default 60, in seconds).
     */
    protected function createWorkers()
    {
        $workers = $this->getConfig('queue.workers', []);

        foreach ($workers as $queue => $options) {
            if (strpos($queue, '@') !== false) {
                [$queue, $connection] = explode('@', $queue);
            } else {
                $connection = null;
            }

            $this->workers[] = function (Process $process) use ($options, $connection, $queue) {
                @cli_set_process_title("swoole queue: worker process");

                /** @var Worker $worker */
                $worker = $this->container->make('queue.worker');
                /** @var WorkerOptions $option */
                $option = $this->container->make(WorkerOptions::class);

                $option->sleep = Arr::get($options, "sleep", 3);
                $option->maxTries = Arr::get($options, "tries", 0);
                $option->timeout = Arr::get($options, "timeout", 60);

                // Watchdog: exit the process if the job outlives its timeout.
                // Only armed for a positive timeout so that 0 is never handed
                // to Timer::after().
                $timer = null;
                if ($option->timeout > 0) {
                    $timer = Timer::after($option->timeout * 1000, function () use ($process) {
                        $process->exit();
                    });
                }

                // Handles at most one job per invocation of this callable.
                // NOTE(review): presumably the hosting process is restarted once
                // the callable returns — confirm against the swoole process setup.
                $worker->runNextJob($connection, $queue, $option);

                if (is_int($timer)) {
                    Timer::clear($timer);
                }
            };
        }
    }

    /**
     * Register the listener that reports and records failed jobs.
     */
    protected function listenForEvents()
    {
        $this->container->make('events')->listen(JobFailed::class, function (JobFailed $event) {
            // writeOutput() requires the status argument; omitting it raised an
            // ArgumentCountError and prevented the failed job from being logged.
            $this->writeOutput($event->job, 'failed');

            $this->logFailedJob($event);
        });
    }

    /**
     * Record a failed job through the `queue.failer` container binding.
     *
     * @param JobFailed $event
     */
    protected function logFailedJob(JobFailed $event)
    {
        $this->container['queue.failer']->log(
            $event->connection,
            $event->job->getQueue(),
            $event->job->getRawBody(),
            $event->exception
        );
    }

    /**
     * Write the status output for the queue worker.
     *
     * Statuses other than 'starting', 'success' and 'failed' are ignored.
     *
     * @param Job $job
     * @param string $status
     */
    protected function writeOutput(Job $job, $status)
    {
        switch ($status) {
            case 'starting':
                $this->writeStatus($job, 'Processing', 'comment');
                break;
            case 'success':
                $this->writeStatus($job, 'Processed', 'info');
                break;
            case 'failed':
                $this->writeStatus($job, 'Failed', 'error');
                break;
        }
    }

    /**
     * Format the status output for the queue worker.
     *
     * @param Job $job
     * @param string $status
     * @param string $type
     * @return void
     */
    protected function writeStatus(Job $job, $status, $type)
    {
        // No console output is attached when the manager was started via run().
        if ($this->output === null) {
            return;
        }

        $this->output->writeln(sprintf(
            "<{$type}>[%s][%s] %s</{$type}> %s",
            date('Y-m-d H:i:s'),
            $job->getJobId(),
            str_pad("{$status}:", 11),
            $job->getName()
        ));
    }
}
增加CrmebServiceProvider类
<?php

namespace crmeb\swoole;

use Illuminate\Contracts\Debug\ExceptionHandler;
use Illuminate\Contracts\Http\Kernel;
use crmeb\swoole\command\HttpServerCommand;
use Illuminate\Queue\Worker;
use SwooleTW\Http\HttpServiceProvider;
use SwooleTW\Http\Middleware\AccessLog;
use SwooleTW\Http\Server\Manager;

/**
 * Class CrmebServiceProvider
 *
 * Extends the laravel-swoole service provider so that the customised
 * HttpServerCommand is registered and package files are loaded from the
 * vendor directory.
 *
 * @package crmeb\swoole
 */
class CrmebServiceProvider extends HttpServiceProvider
{
    /**
     * Bind the swoole server manager and the queue worker as singletons.
     *
     * @return void
     */
    protected function registerManager()
    {
        $this->app->singleton(Manager::class, function ($app) {
            return new Manager($app, 'laravel');
        });

        $this->app->alias(Manager::class, 'swoole.manager');

        $this->app->singleton('queue.worker', function ($app) {
            return new Worker(
                $app['queue'],
                $app['events'],
                $app[ExceptionHandler::class],
                function () {
                    return $this->app->isDownForMaintenance();
                }
            );
        });
    }

    /**
     * Load the websocket routes file shipped with laravel-swoole.
     *
     * @return void
     */
    protected function bootWebsocketRoutes()
    {
        $packageRoot = base_path('vendor/swooletw/laravel-swoole');

        require $packageRoot . '/routes/laravel_routes.php';
    }

    /**
     * Push the access log middleware onto the HTTP kernel.
     *
     * @return void
     */
    protected function pushAccessLogMiddleware()
    {
        $kernel = $this->app->make(Kernel::class);

        $kernel->pushMiddleware(AccessLog::class);
    }

    /**
     * Register the console commands provided by this package.
     */
    protected function registerCommands()
    {
        $this->commands([HttpServerCommand::class]);
    }

    /**
     * Merge the default laravel-swoole configuration files.
     */
    protected function mergeConfigs()
    {
        $configDir = base_path('vendor/swooletw/laravel-swoole') . '/config';

        $this->mergeConfigFrom($configDir . '/swoole_http.php', 'swoole_http');
        $this->mergeConfigFrom($configDir . '/swoole_websocket.php', 'swoole_websocket');
    }

    /**
     * Declare the publishable config and route files of laravel-swoole.
     */
    protected function publishFiles()
    {
        $packageRoot = base_path('vendor/swooletw/laravel-swoole');

        $publishable = [];
        $publishable[$packageRoot . '/config/swoole_http.php'] = base_path('config/swoole_http.php');
        $publishable[$packageRoot . '/config/swoole_websocket.php'] = base_path('config/swoole_websocket.php');
        $publishable[$packageRoot . '/routes/websocket.php'] = base_path('routes/websocket.php');

        $this->publishes($publishable, 'laravel-swoole');
    }
}
然后再把\crmeb\swoole\CrmebServiceProvider::class
放入config/app.php
中的providers
中加载,这样就重写了swoole的命令启动方式。
配置config/swoole_http.php
return [
    'queue' => [
        // Whether the queue is consumed automatically (read as `queue.enabled` by the start command).
        'enabled' => true,
        'workers' => [
            // Key: queue name, optionally written as "queue@connection".
            // Value: per-worker options; "sleep", "tries" and "timeout" are read, all optional.
            'CRMEB' => [],
        ],
    ],
];
输入命令:php artisan crmeb:http restart
swoole启动后就可以自动消费队列了。
相关推荐:最新的五个Laravel视频教程
以上就是"laravel8中laravel-swoole的扩展不兼容消息队列怎么办?"的详细内容,更多请关注PHP中文网其它相关文章!