This article shows how to implement a delay queue in PHP with Redis. It may be a useful reference if you need something similar.
Delayed task execution is implemented on top of a Redis sorted set (ordered set): each task is stored as a member, and its score is the time at which it should run. Typical uses are sending a text message to a user at a set time, or expiring unpaid orders.
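To make the idea concrete, here is a minimal in-memory sketch of the pattern, with a plain PHP array standing in for the sorted set. `FakeSortedSet` and `popDue` are hypothetical names invented for this illustration; the real code talks to Redis instead.

```php
<?php
// In-memory stand-in for a Redis sorted set: member => score (run-at timestamp).
// Hypothetical class for illustration only; real code would use ZADD / ZRANGE / ZREM.
final class FakeSortedSet
{
    /** @var array<string, int> member => score */
    private $items = [];

    // Like ZADD: insert a member or update its score.
    public function add(string $member, int $score): void
    {
        $this->items[$member] = $score;
    }

    // Like ZRANGE key 0 0: the member with the lowest score, or null if empty.
    public function first(): ?string
    {
        if (!$this->items) {
            return null;
        }
        asort($this->items); // sort by score ascending, keeping the keys
        return array_key_first($this->items);
    }

    // Score of a member, or null if the member does not exist.
    public function score(string $member): ?int
    {
        return $this->items[$member] ?? null;
    }

    // Like ZREM: remove a member; returns 1 if it existed, otherwise 0.
    public function remove(string $member): int
    {
        if (!array_key_exists($member, $this->items)) {
            return 0;
        }
        unset($this->items[$member]);
        return 1;
    }
}

// Take the head of the set only if its run-at time has been reached.
function popDue(FakeSortedSet $set, int $now): ?string
{
    $head = $set->first();
    if ($head === null || $set->score($head) > $now) {
        return null; // nothing is due yet
    }
    $set->remove($head);
    return $head;
}

$set = new FakeSortedSet();
$set->add('send_sms:42', 1000);
$set->add('close_order:7', 900);

var_dump(popDue($set, 899)); // NULL, the earliest task is not due yet
var_dump(popDue($set, 950)); // string(13) "close_order:7"
```

Because the set is ordered by run time, the worker only ever needs to look at the first member to know whether anything is due.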
I wrote it on the tp5 (ThinkPHP 5) framework. It is very simple to implement and is enough for applications that are not too complex; it is currently used in company projects. Note that the background worker is a single process: multi-process execution is not implemented.
I won't say more and will just post the code. I haven't tidied up the formatting, so please bear with me.
1. Command-line script. Run it with: php think delay-queue queuename (queuename is the queue name; DelayQueue prefixes it with delay_queue: to form the sorted set key)
    namespace app\command;

    use app\common\lib\delayqueue\DelayQueue;
    use think\console\Command;
    use think\console\Input;
    use think\console\Output;

    class DelayQueueWorker extends Command
    {
        const COMMAND_ARGV_1 = 'queue';

        protected function configure()
        {
            $this->setName('delay-queue')->setDescription('Delay queue worker process');
            $this->addArgument(self::COMMAND_ARGV_1);
        }

        protected function execute(Input $input, Output $output)
        {
            // Argument 1: the delay queue name, which maps to the Redis sorted set key
            $queue = $input->getArgument(self::COMMAND_ARGV_1);
            while (true) {
                DelayQueue::getInstance($queue)->perform();
                usleep(300000); // poll every 0.3 seconds
            }
        }
    }
Library directory structure
config.php holds the Redis connection parameters.
RedisHandler.php implements only the sorted set operations; a reconnection mechanism has not been implemented yet.
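The article does not show config.php. Judging from the keys that RedisHandler reads, it would be a file that returns an array like the one below. The host and port values are assumed defaults for a local Redis, not taken from the source.

```php
<?php
// config.php: returns the connection settings read by RedisHandler.
// The key names come from RedisHandler; the values are assumed local defaults.
return [
    'redis_host' => '127.0.0.1',
    'redis_port' => 6379,
];
```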
    namespace app\common\lib\delayqueue;

    class RedisHandler
    {
        public $provider;
        private static $_instance = null;

        private function __construct()
        {
            $this->provider = new \Redis();
            // Host and port come from config.php in this directory
            $config = require __DIR__ . '/config.php';
            $this->provider->connect($config['redis_host'], $config['redis_port']);
        }

        // Not declared final: PHP 8 warns about private methods marked final
        private function __clone() {}

        public static function getInstance()
        {
            if (!self::$_instance) {
                self::$_instance = new RedisHandler();
            }
            return self::$_instance;
        }

        /**
         * @param string $key sorted set key
         * @param number $score sort value
         * @param string $value serialized data
         * @return int
         */
        public function zAdd($key, $score, $value)
        {
            return $this->provider->zAdd($key, $score, $value);
        }

        /**
         * Get sorted set members
         * @param $key
         * @param $start
         * @param $end
         * @param null $withscores
         * @return array
         */
        public function zRange($key, $start, $end, $withscores = null)
        {
            return $this->provider->zRange($key, $start, $end, $withscores);
        }

        /**
         * Remove a sorted set member
         * @param $key
         * @param $member
         * @return int
         */
        public function zRem($key, $member)
        {
            return $this->provider->zRem($key, $member);
        }
    }
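Since reconnection is left unimplemented, a long-running worker will fail once the Redis connection drops. One possible approach, sketched here under assumptions, is to wrap each call in a small retry helper. `withReconnect` is a hypothetical function, not part of the article's library, and it treats any exception as a lost connection, which is a simplification.

```php
<?php
/**
 * Run $operation; if it throws, call $reconnect and try again.
 * Hypothetical helper for illustration only.
 *
 * @param callable $operation  the Redis call to attempt
 * @param callable $reconnect  re-establishes the connection
 * @param int      $maxRetries how many times to retry after a failure
 * @return mixed whatever $operation returns
 */
function withReconnect(callable $operation, callable $reconnect, int $maxRetries = 1)
{
    $attempt = 0;
    while (true) {
        try {
            return $operation();
        } catch (\Exception $e) {
            // phpredis reports connection errors as RedisException, an \Exception subclass.
            if ($attempt >= $maxRetries) {
                throw $e; // give up and let the caller see the error
            }
            $attempt++;
            $reconnect();
        }
    }
}

// Demo with a fake operation that fails once, then succeeds.
$calls = 0;
$result = withReconnect(
    function () use (&$calls) {
        if (++$calls === 1) {
            throw new \RuntimeException('connection lost');
        }
        return 'ok';
    },
    function () {
        // In RedisHandler this would call $this->provider->connect(...) again.
    }
);
echo $result, PHP_EOL; // prints "ok"
```

Inside RedisHandler, the body of zAdd, zRange and zRem could then be passed as the first callable, with a second callable that calls connect() again using the values from config.php. Whether one retry is enough depends on the deployment.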
Delay queue class
    namespace app\common\lib\delayqueue;

    class DelayQueue
    {
        private $prefix = 'delay_queue:';
        private $queue;
        // One instance per queue name, so different queues never share a key
        private static $_instances = [];

        private function __construct($queue)
        {
            $this->queue = $queue;
        }

        private function __clone() {}

        public static function getInstance($queue = '')
        {
            if (!isset(self::$_instances[$queue])) {
                self::$_instances[$queue] = new DelayQueue($queue);
            }
            return self::$_instances[$queue];
        }

        /**
         * Add a task to the queue
         *
         * demo DelayQueue::getInstance('test')->addTask(
         *     'app\common\lib\delayqueue\job\Test',
         *     strtotime('2018-05-02 20:55:20'),
         *     ['abc' => 111]
         * );
         *
         * @param $jobClass
         * @param int $runTime time at which to run the task (Unix timestamp)
         * @param array $args
         */
        public function addTask($jobClass, $runTime, $args = null)
        {
            $key = $this->prefix . $this->queue;
            $params = [
                'class'   => $jobClass,
                'args'    => $args,
                'runtime' => $runTime,
            ];
            // The run time is the score, so the earliest task sorts first
            RedisHandler::getInstance()->zAdd($key, $runTime, serialize($params));
        }

        /**
         * Run the next job if it is due
         * @return bool
         */
        public function perform()
        {
            $key = $this->prefix . $this->queue;
            // Fetch the first element of the sorted set (the earliest run time)
            $result = RedisHandler::getInstance()->zRange($key, 0, 0);
            if (!$result) {
                return false;
            }

            $jobInfo = unserialize($result[0]);
            print_r('job: ' . $jobInfo['class'] . ' will run at: '
                . date('Y-m-d H:i:s', $jobInfo['runtime']) . PHP_EOL);

            $jobClass = $jobInfo['class'];
            if (!@class_exists($jobClass)) {
                // Unknown job class: drop the task so it does not block the queue
                print_r($jobClass . ' undefined' . PHP_EOL);
                RedisHandler::getInstance()->zRem($key, $result[0]);
                return false;
            }

            // Run the job once its time has come
            if (time() >= $jobInfo['runtime']) {
                $job = new $jobClass;
                $job->setPayload($jobInfo['args']);
                // The method is spelled "preform" in DelayJob; keep the names in sync
                $jobResult = $job->preform();
                if ($jobResult) {
                    // Remove the task after it succeeds
                    RedisHandler::getInstance()->zRem($key, $result[0]);
                    return true;
                }
            }
            return false;
        }
    }
Asynchronous task base class:
    namespace app\common\lib\delayqueue;

    class DelayJob
    {
        protected $payload;

        public function preform()
        {
            // todo
            return true;
        }

        public function setPayload($args = null)
        {
            $this->payload = $args;
        }
    }
All asynchronously executed tasks are placed in the job directory and must extend DelayJob. You can implement any task whose execution you want to delay.
For example:
    namespace app\common\lib\delayqueue\job;

    use app\common\lib\delayqueue\DelayJob;

    class Test extends DelayJob
    {
        public function preform()
        {
            // $this->payload should hold the parameters the task needs,
            // passed in through DelayQueue's addTask
            print_r('test job' . PHP_EOL);
            return true;
        }
    }
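The usage section refers to a CloseOrder job that you implement yourself but does not show it. A rough sketch of what such a job might look like follows. The body is an assumption, and a stand-in copy of DelayJob without namespaces is included only so the example runs on its own.

```php
<?php
// Minimal stand-in for the article's DelayJob base class so this sketch is runnable.
// Namespaces are omitted for brevity.
class DelayJob
{
    protected $payload;

    public function preform()
    {
        return true;
    }

    public function setPayload($args = null)
    {
        $this->payload = $args;
    }
}

// Hypothetical job: closes an order that is still unpaid when the delay expires.
class CloseOrder extends DelayJob
{
    public function preform()
    {
        if (!is_array($this->payload) || !isset($this->payload['order_id'])) {
            // Malformed payload: return true so the broken task is dropped
            // instead of staying at the head of the queue.
            print_r('close order: missing order_id' . PHP_EOL);
            return true;
        }
        $orderId = (int) $this->payload['order_id'];

        // Placeholder for the real work, e.g. an update that only touches unpaid orders.
        print_r('closing order ' . $orderId . PHP_EOL);

        // Returning true tells DelayQueue::perform() to remove the task.
        return true;
    }
}

$job = new CloseOrder();
$job->setPayload(['order_id' => 123456]);
$job->preform(); // prints "closing order 123456"
```

Because perform() only inspects the first element of the sorted set, a job that keeps returning false stays at the head and holds up every task behind it. It is worth deciding deliberately when a job should return false.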
Usage:
Suppose a user creates an order that expires after 10 minutes. Add the task right after the order is created:
    DelayQueue::getInstance('close_order')->addTask(
        'app\common\lib\delayqueue\job\CloseOrder', // the job you implemented
        strtotime('2018-05-02 20:55:20'),           // order expiration time
        ['order_id' => 123456]                      // parameters passed to the job
    );
close_order is the queue name; together with the delay_queue: prefix it forms the key of the sorted set.
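In real code the expiration time would normally be computed from the order's creation time rather than hard-coded. A small sketch follows; `runTimeAfter` is a hypothetical helper, and the time zone is pinned to UTC only to make the printed value predictable.

```php
<?php
date_default_timezone_set('UTC'); // assumption, fixed only for a predictable demo

// Return the Unix timestamp at which a task created at $createdAt should run.
// Hypothetical helper; the default delay is the 10 minutes from the example.
function runTimeAfter(int $createdAt, int $delaySeconds = 600): int
{
    return $createdAt + $delaySeconds;
}

$createdAt = strtotime('2018-05-02 20:45:20');
echo date('Y-m-d H:i:s', runTimeAfter($createdAt)), PHP_EOL; // 2018-05-02 20:55:20
```

The second argument of addTask could then be `runTimeAfter(time())`, evaluated at the moment the order is created.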
Start the worker process from the command line:
php think delay-queue close_order