php+redis實作延遲隊列
這篇文章主要介紹了關於php redis實現延遲隊列,有著一定的參考價值,現在分享給大家,有需要的朋友可以參考一下
基於redis有序集實現延遲任務執行,例如某個時間給某個用戶發短信,訂單過期處理,等等
我是在tp5框架上寫的,實現起來很簡單,對於一些不是很複雜的應用足夠了,目前在公司項目中使用,後台進程並沒有實現多進程,
不多說,貼程式碼,不回排版,見諒
1、命令列腳本執行方法:php think delay-queue queuename(這是有序集的key)
namespace app\command; use app\common\lib\delayqueue\DelayQueue; use think\console\Command; use think\console\Input; use think\console\Output; use think\Db; class DelayQueueWorker extends Command { const COMMAND_ARGV_1 = 'queue'; protected function configure() { $this->setName('delay-queue')->setDescription('延迟队列任务进程'); $this->addArgument(self::COMMAND_ARGV_1); } protected function execute(Input $input, Output $output) { $queue = $input->getArgument(self::COMMAND_ARGV_1); //参数1 延迟队列表名,对应与redis的有序集key名 while (true) { DelayQueue::getInstance($queue)->perform(); usleep(300000); } } }
庫類別目錄結構
#config.php 裡是redis連接參數配置
RedisHandler.php只實現有序集的操作,重連機制還沒有實現
namespace app\common\lib\delayqueue; class RedisHandler { public $provider; private static $_instance = null; private function __construct() { $this->provider = new \Redis(); //host port $config = require_once 'config.php'; $this->provider->connect($config['redis_host'], $config['redis_port']); } final private function __clone() {} public static function getInstance() { if(!self::$_instance) { self::$_instance = new RedisHandler(); } return self::$_instance; } /** * @param string $key 有序集key * @param number $score 排序值 * @param string $value 格式化的数据 * @return int */ public function zAdd($key, $score, $value) { return $this->provider->zAdd($key, $score, $value); } /** * 获取有序集数据 * @param $key * @param $start * @param $end * @param null $withscores * @return array */ public function zRange($key, $start, $end, $withscores = null) { return $this->provider->zRange($key, $start, $end, $withscores); } /** * 删除有序集数据 * @param $key * @param $member * @return int */ public function zRem($key,$member) { return $this->provider->zRem($key,$member); } }
延遲隊列類別
namespace app\common\lib\delayqueue; class DelayQueue { private $prefix = 'delay_queue:'; private $queue; private static $_instance = null; private function __construct($queue) { $this->queue = $queue; } final private function __clone() {} public static function getInstance($queue = '') { if(!self::$_instance) { self::$_instance = new DelayQueue($queue); } return self::$_instance; } /** * 添加任务信息到队列 * * demo DelayQueue::getInstance('test')->addTask( * 'app\common\lib\delayqueue\job\Test', * strtotime('2018-05-02 20:55:20'), * ['abc'=>111] * ); * * @param $jobClass * @param int $runTime 执行时间 * @param array $args */ public function addTask($jobClass, $runTime, $args = null) { $key = $this->prefix.$this->queue; $params = [ 'class' => $jobClass, 'args' => $args, 'runtime' => $runTime, ]; RedisHandler::getInstance()->zAdd( $key, $runTime, serialize($params) ); } /** * 执行job * @return bool */ public function perform() { $key = $this->prefix.$this->queue; //取出有序集第一个元素 $result = RedisHandler::getInstance()->zRange($key, 0 ,0); if (!$result) { return false; } $jobInfo = unserialize($result[0]); print_r('job: '.$jobInfo['class'].' will run at: '. date('Y-m-d H:i:s',$jobInfo['runtime']).PHP_EOL); $jobClass = $jobInfo['class']; if(!@class_exists($jobClass)) { print_r($jobClass.' undefined'. PHP_EOL); RedisHandler::getInstance()->zRem($key, $result[0]); return false; } // 到时间执行 if (time() >= $jobInfo['runtime']) { $job = new $jobClass; $job->setPayload($jobInfo['args']); $jobResult = $job->preform(); if ($jobResult) { // 将任务移除 RedisHandler::getInstance()->zRem($key, $result[0]); return true; } } return false; } }
非同步任務基類:
namespace app\common\lib\delayqueue; class DelayJob { protected $payload; public function preform () { // todo return true; } public function setPayload($args = null) { $this->payload = $args; } }
所有非同步執行的任務都卸載job目錄下,且要繼承DelayJob,你可以實現任何你想延遲執行的任務
如:
namespace app\common\lib\delayqueue\job; use app\common\lib\delayqueue\DelayJob; class Test extends DelayJob { public function preform() { // payload 里应该有处理任务所需的参数,通过DelayQueue的addTask传入 print_r('test job'.PHP_EOL); return true; } }
使用方法:
假設用戶創建了一個訂單,訂單在10分鐘後失效,那麼在訂單建立後加入:
DelayQueue::getInstance('close_order')->addTask( 'app\common\lib\delayqueue\job\CloseOrder', // 自己实现的job strtotime('2018-05-02 20:55:20'), // 订单失效时间 ['order_id'=>123456] // 传递给job的参数 );
close_order 是有序集的key
命令列啟動程序
php think delay-queue close_order
相關推薦:
以上是php+redis實作延遲隊列的詳細內容。更多資訊請關注PHP中文網其他相關文章!

熱AI工具

Undresser.AI Undress
人工智慧驅動的應用程序,用於創建逼真的裸體照片

AI Clothes Remover
用於從照片中去除衣服的線上人工智慧工具。

Undress AI Tool
免費脫衣圖片

Clothoff.io
AI脫衣器

Video Face Swap
使用我們完全免費的人工智慧換臉工具,輕鬆在任何影片中換臉!

熱門文章

熱工具

記事本++7.3.1
好用且免費的程式碼編輯器

SublimeText3漢化版
中文版,非常好用

禪工作室 13.0.1
強大的PHP整合開發環境

Dreamweaver CS6
視覺化網頁開發工具

SublimeText3 Mac版
神級程式碼編輯軟體(SublimeText3)

PHP 8.4 帶來了多項新功能、安全性改進和效能改進,同時棄用和刪除了大量功能。 本指南介紹如何在 Ubuntu、Debian 或其衍生版本上安裝 PHP 8.4 或升級到 PHP 8.4

Visual Studio Code,也稱為 VS Code,是一個免費的原始碼編輯器 - 或整合開發環境 (IDE) - 可用於所有主要作業系統。 VS Code 擁有大量針對多種程式語言的擴展,可以輕鬆編寫

JWT是一種基於JSON的開放標準,用於在各方之間安全地傳輸信息,主要用於身份驗證和信息交換。 1.JWT由Header、Payload和Signature三部分組成。 2.JWT的工作原理包括生成JWT、驗證JWT和解析Payload三個步驟。 3.在PHP中使用JWT進行身份驗證時,可以生成和驗證JWT,並在高級用法中包含用戶角色和權限信息。 4.常見錯誤包括簽名驗證失敗、令牌過期和Payload過大,調試技巧包括使用調試工具和日誌記錄。 5.性能優化和最佳實踐包括使用合適的簽名算法、合理設置有效期、

字符串是由字符組成的序列,包括字母、數字和符號。本教程將學習如何使用不同的方法在PHP中計算給定字符串中元音的數量。英語中的元音是a、e、i、o、u,它們可以是大寫或小寫。 什麼是元音? 元音是代表特定語音的字母字符。英語中共有五個元音,包括大寫和小寫: a, e, i, o, u 示例 1 輸入:字符串 = "Tutorialspoint" 輸出:6 解釋 字符串 "Tutorialspoint" 中的元音是 u、o、i、a、o、i。總共有 6 個元

本教程演示瞭如何使用PHP有效地處理XML文檔。 XML(可擴展的標記語言)是一種用於人類可讀性和機器解析的多功能文本標記語言。它通常用於數據存儲

靜態綁定(static::)在PHP中實現晚期靜態綁定(LSB),允許在靜態上下文中引用調用類而非定義類。 1)解析過程在運行時進行,2)在繼承關係中向上查找調用類,3)可能帶來性能開銷。

PHP的魔法方法有哪些? PHP的魔法方法包括:1.\_\_construct,用於初始化對象;2.\_\_destruct,用於清理資源;3.\_\_call,處理不存在的方法調用;4.\_\_get,實現動態屬性訪問;5.\_\_set,實現動態屬性設置。這些方法在特定情況下自動調用,提升代碼的靈活性和效率。
