這篇文章為大家帶來了關於thinkphp的相關知識,其中主要整理了使用think-queue實現redis訊息隊列的相關問題,下面一起來看一下,希望對大家有幫助。
推薦學習:《PHP影片教學》
訊息佇列中間件是大型系統中的重要組件,逐漸成為企業系統內部通訊的核心手段。它具有鬆散耦合、非同步訊息、流量削峰、可靠投遞、廣播、流量控制、最終一致性等一系列功能,已成為非同步RPC的主要手段之一。
訊息佇列有兩個角色和一個容器,角色分別為生產者(負責發布任務)和消費者(負責執行任務),容器這是用來存放/堆積生產者發布的任務,將發布和執行兩個步驟分開且互不影響。
生產者發布任務存放/堆積在訊息佇列中,由消費者主動去訊息佇列中取出任務並執行,先發布的先執行(隊列:先進先出),在沒有消費者的情況下任務會堆積在隊列中等待被取出執行。
訊息佇列適用於大並發或處理時間長並需要批量操作的第三方接口,可用於但不僅限於簡訊發送、郵件發送、APP推送等,支援跨系統,也就是本系統發布的消息佇列可以由自己或給其他系統執行任務,同理本系統也可以作為消費者執行自己或其他系統發佈的訊息佇列任務。
ThinkPHP的Queue內建了Redis、Database、Topthink、Sync四種驅動,這裡使用的是Redis,也建議使用Redis
think-queue 佇列訊息可以進行任務的發布、取得、執行、刪除、重新發布、延遲發布、逾時控制等操作
在extra 目錄下建立queue.php 設定檔
<?phpreturn [ 'connector' => 'Redis', 'expire' => null, // 任务过期时间,默认为60秒,若要禁用,则设置为 null 'default' => 'REDIS_QUEUE', // 默认的队列名 'host' => '127.0.0.1', // redis 主机ip 'port' => 6379, // redis 端口 'password' => '', // redis 密码 'select' => 0, // 使用哪里一个 db,默认为 db0 'timeout' => 0, // redis 连接的超时时间 'persistent' => false, // 是否是长连接];
至為什麼要放在這裡,是因為Queue 原始碼預設從extra 讀取queue 檔案取得設定信息,如果想要將設定檔放置其他地方,則需要對應修改原始程式碼中的預設取得配置,如下圖所示
建立一個測試類,寫入生產者方法
<?phpnamespace app\api\controller;use think\Controller;use think\Queue;class Test extends Controller{ // 生产者,添加消息队列 public function addQueue() { // 参数 $data = [ 'id' => rand(0, 99), 'userName' => '一起摸鱼' ]; // 消息队列名 $queueName = 'testQueue'; // 推入消息队列,注意这里的 ::class 是PHP5.5才有的写法 $isPushed = Queue::push(TestQueue::class, $data, $queueName); // PHP5.5以下的可以直接写命名空间 // $isPushed = Queue::push('app\common\queue\TestQueue', $data, $queueName); if ($isPushed !== false) { // 成功之后的业务 echo '队列加入成功'; } else { // 失败之后的业务 echo '队列加入失败'; } }}
建立一個TestQueue 類,用做消費者,執行訊息佇列中的任務
<?phpnamespace app\common\queue;use think\Log;use think\queue\Job;class TestQueue{ // 消费者执行入口 public function fire(Job $job, $data) { // 具体执行业务 $isJobDone = $this->doJob($data); if ($isJobDone) { // 消息队列执行成功,删除队列,否则会一直执行 $job->delete(); } else { // 消息队列执行失败 // 获取消息队列已经重试了几遍 $attempts = $job->attempts(); if ($attempts == 0 || $attempts == 1) { // 重新发布,参数 delay 是延时发布的时间 $job->release(2); } } } // 消息队列执行失败后会自动执行该方法 public function failed($data) { Log::error('消息队列达到最大重复执行次数后失败:' . json_encode($data)); } // 消息队列执行方法 public function doJob($data) { // 具体执行业务 $data = json_encode($data); echo '消息队列:' . $data; // 这里的判断条件以具体业务是否执行成功进行判断 if ($data) { echo "执行成功"; return true; } else { echo "执行失败"; return false; } }}
請求接口,生產者發布任務
redis 佇列存放任務
接下來就是啟用佇列的監聽模式了,因為不可能每次一有任務加進來就去手動執行一次隊列。佇列的監聽模式有兩種,設定參數如下:
專案根目錄執行
php think queue:work --queue 佇列名
開啟消費者,執行任務
redis 佇列中的任務執行後也被刪除
但是由於需要,我們還要將消費者掛起守護程序執行,以確保關掉終端也能夠啟動佇列。
nohup php think queue:listen --queue 佇列名稱&
PS:shell中輸入exit來退出終端機
PS:shell中輸入exit來退出終端機
##PS:shell中輸入exit來退出終端機
PHP影片教學》
以上是一文教你ThinkPHP使用think-queue實作redis訊息佇列的詳細內容。更多資訊請關注PHP中文網其他相關文章!