この記事では、thinkphp に関する関連知識を提供します。主に、think-queue を使用して通常のキューと遅延キューを実装することに関する関連コンテンツを紹介します。think-queue は、thinkphp によって公式に提供されているメッセージ キュー サービスです。見てみましょう。皆さんのお役に立てれば幸いです。
推奨学習: 「PHP ビデオ チュートリアル 」
1. プロデューサーを通じてメッセージ キュー サービスにメッセージをプッシュします
2. メッセージ キュー サービスは、受信したメッセージを保存しますRedis キュー (zset) へのメッセージ
3. コンシューマーはキューを監視します。キュー内の新しいメッセージを受信すると、キュー内の最初のメッセージを取得します。
4. プロセス関連ビジネス
5. ビジネス処理後、メッセージをキューから削除する必要があります
composer require topthink/think-queue
think-queueをインストールするとconfigディレクトリにqueue.phpが生成されます、このファイルがキューの設定ファイルです。
tp6 では、さまざまなメッセージ キューの実装方法が提供されています。デフォルトでは、同期が使用されます。ここでは Redis を使用することにします。
return [ 'default' => 'redis', 'connections' => [ 'sync' => [ 'type' => 'sync', ], 'database' => [ 'type' => 'database', 'queue' => 'default', 'table' => 'jobs', 'connection' => null, ], 'redis' => [ 'type' => 'redis', 'queue' => 'default', 'host' => env('redis.host', '127.0.0.1'), 'port' => env('redis.port', '6379'), 'password' => env('redis.password','123456'), 'select' => 0, 'timeout' => 0, 'persistent' => false, ], ], 'failed' => [ 'type' => 'none', 'table' => 'failed_jobs', ], ];
アプリ ディレクトリにキュー ディレクトリを作成し、このディレクトリに新しい抽象クラス Queue.php ファイルを基本クラス
<?phpnamespace app\queue;use think\facade\Cache;use think\queue\Job;use think\facade\Log;/** * Class Queue 队列消费基础类 * @package app\queue */abstract class Queue{ /** * @describe:fire是消息队列默认调用的方法 * @param \think\queue\Job $job * @param $message */ public function fire(Job $job, $data) { if (empty($data)) { Log::error(sprintf('[%s][%s] 队列无消息', __CLASS__, __FUNCTION__)); return ; } $jobId = $job->getJobId(); // 队列的数据库id或者redis key // $jobClassName = $job->getName(); // 队列对象类 // $queueName = $job->getQueue(); // 队列名称 // 如果已经执行中或者执行完成就不再执行了 if (!$this->checkJob($jobId, $data)) { $job->delete(); Cache::store('redis')->delete($jobId); return ; } // 执行业务处理 if ($this->execute($data)) { Log::record(sprintf('[%s][%s] 队列执行成功', __CLASS__, __FUNCTION__)); $job->delete(); // 任务执行成功后删除 Cache::store('redis')->delete($jobId); // 删除redis中的缓存 } else { // 检查任务重试次数 if ($job->attempts() > 3) { Log::error(sprintf('[%s][%s] 队列执行重试次数超过3次,执行失败', __CLASS__, __FUNCTION__)); // 第1种处理方式:重新发布任务,该任务延迟10秒后再执行;也可以不指定秒数立即执行 //$job->release(10); // 第2种处理方式:原任务的基础上1分钟执行一次并增加尝试次数 //$job->failed(); // 第3种处理方式:删除任务 $job->delete(); // 任务执行后删除 Cache::store('redis')->delete($jobId); // 删除redis中的缓存 } } } /** * 消息在到达消费者时可能已经不需要执行了 * @param string $jobId * @param $message * @return bool 任务执行的结果 * @throws \Psr\SimpleCache\InvalidArgumentException */ protected function checkJob(string $jobId, $message): bool { // 查询redis $data = Cache::store('redis')->get($jobId); if (!empty($data)) { return false; } Cache::store('redis')->set($jobId, $message); return true; } /** * @describe: 根据消息中的数据进行实际的业务处理 * @param $data 数据 * @return bool 返回结果 */ abstract protected function execute($data): bool;}
すべての実際のコンシューマ クラスは基本抽象クラスを継承します
<?phpnamespace app\queue\test;use app\queue\Queue;class Test extends Queue{ protected function execute($data): bool { // 具体消费业务逻辑 }}
use think\facade\Queue; // 普通队列生成调用方式 Queue::push($job, $data, $queueName); // 例: Queue::push(Test::class, $data, $queueName); // 延时队列生成调用方式 Queue::later($delay, $job, $data, $queueName); // 例如使用延时队列 10 秒后执行: Queue::later(10 , Test::class, $data, $queueName);
php think queue:listen php think queue:work
queue:work command
work コマンド: このコマンドは、メッセージキューを処理するためのワークプロセスを開始します。
php think queue:work --queue TestQueue
queue:listen command
listen コマンド: このコマンドは listen 親プロセスを作成し、親プロセスは proc_open('php think queue :work')
を使用して、メッセージ キューを処理し、ワーク プロセスの実行時間を制限するためのワーク サブプロセスを作成します。
php think queue:listen --queue TestQueue
作業モード
php think queue:work \ --daemon //是否循环执行,如果不加该参数,则该命令处理完下一个消息就退出 --queue helloJobQueue //要处理的队列的名称 --delay 0 \ //如果本次任务执行抛出异常且任务未被删除时,设置其下次执行前延迟多少秒,默认为0 --force \ //系统处于维护状态时是否仍然处理任务,并未找到相关说明 --memory 128 \ //该进程允许使用的内存上限,以 M 为单位 --sleep 3 \ //如果队列中无任务,则sleep多少秒后重新检查(work+daemon模式)或者退出(listen或非daemon模式) --tries 2 //如果任务已经超过尝试次数上限,则触发‘任务尝试次数超限’事件,默认为0
リッスン モード
php think queue:listen \ --queue helloJobQueue \ //监听的队列的名称 --delay 0 \ //如果本次任务执行抛出异常且任务未被删除时,设置其下次执行前延迟多少秒,默认为0 --memory 128 \ //该进程允许使用的内存上限,以 M 为单位 --sleep 3 \ //如果队列中无任务,则多长时间后重新检查,daemon模式下有效 --tries 0 \ //如果任务已经超过重发次数上限,则进入失败处理逻辑,默认为0 --timeout 60 //创建的work子进程的允许执行的最长时间,以秒为单位
リッスン モードでは、--deamon
パラメータが含まれていないことがわかります。理由は以下で説明します。
メッセージの開始、停止、再開queue
メッセージ キューの開始:
php think queue:work
すべてのメッセージ キューの停止:
php think queue:restart
再起動すべてのメッセージ キュー:
php think queue:restart php think queue:work
推奨学習: 「PHP ビデオ チュートリアル 」
以上がthink-queue を使用して通常のキューと遅延キューを実装する thinkphp6 について話しましょうの詳細内容です。詳細については、PHP 中国語 Web サイトの他の関連記事を参照してください。