Simple scheduled task solution: use keyspace notifications of redis (notification event after key failure). Please note that this feature was launched after redis version 2.8, so the reids on your server must be at least version 2.8 or above;
(A) Business scenario:
1. When a business is triggered, a scheduled task needs to be started, and another task must be executed within the specified time (such as automatic cancellation of orders, automatic completion Orders and other functions)
2. Redis’s keyspace notifications will send an event after the key expires, and the client that listens to this event can receive the notification
(B) Service Preparation:
1. Modify the reids configuration file (redis.conf) [window system configuration file is: redis.windows.conf]
redis will not enable keyspace notifications by default, because After being turned on, it will consume the CPU
Remarks: E: keyevent event, the event is published with __keyevent@
x: Expired event, when a certain This event will occur when the key expires and is deleted;
The original configuration is:
notify-keyspace-events ""
Change the configuration as follows:
notify-keyspace-events "Ex"
After saving the configuration, restart the Redis service to make the configuration take effect
[root@chokingwin etc]# service redis-server restart /usr/local/redis/etc/redis.conf Stopping redis-server: [ OK ] Starting redis-server: [ OK ]
Restart redis in the window system, first switch to the redis file directory, and then close the redis service (redis-server --service-stop), then open (redis-server --service-start)
##C) File code:
phpredis implements subscription to Keyspace notification, which can automatically cancel orders and automatically complete orders. The following is a test exampleCreate 4 files, and then modify the database and redis configuration parameters yourselfdb.class.php<?phpclass mysql{ private $mysqli; private $result; /** * 数据库连接 * @param $config 配置数组 */ public function connect() { $config=array( 'host'=>'127.0.0.1', 'username'=>'root', 'password'=>'168168', 'database'=>'test', 'port'=>3306, ); $host = $config['host']; //主机地址 $username = $config['username'];//用户名 $password = $config['password'];//密码 $database = $config['database'];//数据库 $port = $config['port']; //端口号 $this->mysqli = new mysqli($host, $username, $password, $database, $port); } /** * 数据查询 * @param $table 数据表 * @param null $field 字段 * @param null $where 条件 * @return mixed 查询结果数目 */ public function select($table, $field = null, $where = null) { $sql = "SELECT * FROM `{$table}`"; //echo $sql;exit; if (!empty($field)) { $field = '`' . implode('`,`', $field) . '`'; $sql = str_replace('*', $field, $sql); } if (!empty($where)) { $sql = $sql . ' WHERE ' . $where; } $this->result = $this->mysqli->query($sql); return $this->result; } /** * @return mixed 获取全部结果 */ public function fetchAll() { return $this->result->fetch_all(MYSQLI_ASSOC); } /** * 插入数据 * @param $table 数据表 * @param $data 数据数组 * @return mixed 插入ID */ public function insert($table, $data) { foreach ($data as $key => $value) { $data[$key] = $this->mysqli->real_escape_string($value); } $keys = '`' . implode('`,`', array_keys($data)) . '`'; $values = '\'' . implode("','", array_values($data)) . '\''; $sql = "INSERT INTO `{$table}`( {$keys} )VALUES( {$values} )"; $this->mysqli->query($sql); return $this->mysqli->insert_id; } /** * 更新数据 * @param $table 数据表 * @param $data 数据数组 * @param $where 过滤条件 * @return mixed 受影响记录 */ public function update($table, $data, $where) { foreach ($data as $key => $value) { $data[$key] = $this->mysqli->real_escape_string($value); } $sets = array(); foreach ($data as $key => $value) { $kstr = '`' . $key . '`'; $vstr = '\'' . $value . '\''; array_push($sets, $kstr . '=' . $vstr); } $kav = implode(',', $sets); $sql = "UPDATE `{$table}` SET {$kav} WHERE {$where}"; $this->mysqli->query($sql); return $this->mysqli->affected_rows; } /** * 删除数据 * @param $table 数据表 * @param $where 过滤条件 * @return mixed 受影响记录 */ public function delete($table, $where) { $sql = "DELETE FROM `{$table}` WHERE {$where}"; $this->mysqli->query($sql); return $this->mysqli->affected_rows; } }
<?php require_once 'Redis2.class.php'; $redis = new \Redis2('127.0.0.1','6379','','15'); $order_sn = 'SN'.time().'T'.rand(10000000,99999999); $use_mysql = 1; //是否使用数据库,1使用,2不使用 if($use_mysql == 1){ /* * //数据表 * CREATE TABLE `order` ( * `ordersn` varchar(255) NOT NULL DEFAULT '', * `status` varchar(255) NOT NULL DEFAULT '', * `createtime` varchar(255) NOT NULL DEFAULT '', * `id` int(11) unsigned NOT NULL AUTO_INCREMENT, * PRIMARY KEY (`id`) * ) ENGINE=InnoDB AUTO_INCREMENT=27 DEFAULT CHARSET=utf8mb4; */ require_once 'db.class.php'; $mysql = new \mysql(); $mysql->connect(); $data = ['ordersn'=>$order_sn,'status'=>0,'createtime'=>date('Y-m-d H:i:s',time())]; $mysql->insert('order',$data); } $list = [$order_sn,$use_mysql]; $key = implode(':',$list); $redis->setex($key,3,'redis延迟任务'); //3秒后回调 $test_del = false; //测试删除缓存后是否会有过期回调。结果:没有回调 if($test_del == true){ //sleep(1); $redis->delete($order_sn); } echo $order_sn; /* * 测试其他key会不会有回调,结果:有回调 * $k = 'test'; * $redis2->set($k,'100'); * $redis2->expire($k,10); * */
<?php ini_set('default_socket_timeout', -1); //不超时 require_once 'Redis2.class.php'; $redis_db = '15'; $redis = new \Redis2('127.0.0.1','6379','',$redis_db); // 解决Redis客户端订阅时候超时情况 $redis->setOption(); //当key过期的时候就看到通知,订阅的key __keyevent@<db>__:expired 这个格式是固定的,db代表的是数据库的编号,由于订阅开启之后这个库的所有key过期时间都会被推送过来,所以最好单独使用一个数据库来进行隔离 $redis->psubscribe(array('__keyevent@'.$redis_db.'__:expired'), 'keyCallback'); // 回调函数,这里写处理逻辑 function keyCallback($redis, $pattern, $channel, $msg) { echo PHP_EOL; echo "Pattern: $pattern\n"; echo "Channel: $channel\n"; echo "Payload: $msg\n\n"; $list = explode(':',$msg); $order_sn = isset($list[0])?$list[0]:'0'; $use_mysql = isset($list[1])?$list[1]:'0'; if($use_mysql == 1){ require_once 'db.class.php'; $mysql = new \mysql(); $mysql->connect(); $where = "ordersn = '".$order_sn."'"; $mysql->select('order','',$where); $finds=$mysql->fetchAll(); print_r($finds); if(isset($finds[0]['status']) && $finds[0]['status']==0){ $data = array('status' => 3); $where = " id = ".$finds[0]['id']; $mysql->update('order',$data,$where); } } } //或者 /*$redis->psubscribe(array('__keyevent@'.$redis_db.'__:expired'), function ($redis, $pattern, $channel, $msg){ echo PHP_EOL; echo "Pattern: $pattern\n"; echo "Channel: $channel\n"; echo "Payload: $msg\n\n"; //................ });*/
<?php class Redis2 { private $redis; public function __construct($host = '127.0.0.1', $port = '6379',$password = '',$db = '15') { $this->redis = new Redis(); $this->redis->connect($host, $port); //连接Redis $this->redis->auth($password); //密码验证 $this->redis->select($db); //选择数据库 } public function setex($key, $time, $val) { return $this->redis->setex($key, $time, $val); } public function set($key, $val) { return $this->redis->set($key, $val); } public function get($key) { return $this->redis->get($key); } public function expire($key = null, $time = 0) { return $this->redis->expire($key, $time); } public function psubscribe($patterns = array(), $callback) { $this->redis->psubscribe($patterns, $callback); } public function setOption() { $this->redis->setOption(\Redis::OPT_READ_TIMEOUT, -1); } public function lRange($key,$start,$end) { return $this->redis->lRange($key,$start,$end); } public function lPush($key, $value1, $value2 = null, $valueN = null ){ return $this->redis->lPush($key, $value1, $value2 = null, $valueN = null ); } public function delete($key1, $key2 = null, $key3 = null) { return $this->redis->delete($key1, $key2 = null, $key3 = null); } }
<?php #! /usr/bin/env php ini_set('default_socket_timeout', -1); //不超时 require_once 'Redis2.class.php'; $redis_db = '15'; $redis = new \Redis2('127.0.0.1','6379','',$redis_db); // 解决Redis客户端订阅时候超时情况 $redis->setOption(); //当key过期的时候就看到通知,订阅的key __keyevent@<db>__:expired 这个格式是固定的,db代表的是数据库的编号,由于订阅开启之后这个库的所有key过期时间都会被推送过来,所以最好单独使用一个数据库来进行隔离 $redis->psubscribe(array('__keyevent@'.$redis_db.'__:expired'), 'keyCallback'); // 回调函数,这里写处理逻辑 function keyCallback($redis, $pattern, $channel, $msg) { echo PHP_EOL; echo "Pattern: $pattern\n"; echo "Channel: $channel\n"; echo "Payload: $msg\n\n"; $list = explode(':',$msg); $order_sn = isset($list[0])?$list[0]:'0'; $use_mysql = isset($list[1])?$list[1]:'0'; if($use_mysql == 1){ require_once 'db.class.php'; $mysql = new \mysql(); $mysql->connect(); $where = "ordersn = '".$order_sn."'"; $mysql->select('order','',$where); $finds=$mysql->fetchAll(); print_r($finds); if(isset($finds[0]['status']) && $finds[0]['status']==0){ $data = array('status' => 3); $where = " id = ".$finds[0]['id']; $mysql->update('order',$data,$where); } } } //或者 /*$redis->psubscribe(array('__keyevent@'.$redis_db.'__:expired'), function ($redis, $pattern, $channel, $msg){ echo PHP_EOL; echo "Pattern: $pattern\n"; echo "Channel: $channel\n"; echo "Payload: $msg\n\n"; //................ });*/
#! /usr/bin/env php
This is necessary to execute the php script. Then, nohup executes psubscribe.php without suspending it. Note the &
[root@chokingwin HiGirl]# nohup ./psubscribe.php & [1] 4456 nohup: ignoring input and appending output to `nohup.out'
[root@chokingwin HiGirl]# cat nohup.out Pattern:__keyevent@0__:expired Channel: __keyevent@0__:expired Payload: name
解决方法:由于等待消息队列是一个长连接,而等待回调前有个数据库连接,数据库的wait_timeout=28800,所以只要下一条消息离上一条消息超过8小时,就会出现这个错误,把wait_timeout设置成10,并且捕获异常,发现真实的报错是 MySQL server has gone away ,
所以只要处理完所有业务逻辑后主动关闭数据库连接,即数据库连接主动close掉就可以解决问题
yii解决方法如下:
Yii::$app->db->close();
查看进程方法:
ps -aux|grep psubscribe.php a:显示所有程序 u:以用户为主的格式来显示 x:显示所有程序,不以终端机来区分
查看jobs进程ID:[ jobs -l ]命令
www@iZ232eoxo41Z:~/tinywan $ jobs -l [1]- 1365 Stopped (tty output) sudo nohup psubscribe.php > /dev/null 2>&1 [2]+ 1370 Stopped (tty output) sudo nohup psubscribe.php > /dev/null 2>&1
终止后台运行的进程方法:
kill -9 进程号
清空 nohup.out文件方法:
cat /dev/null > nohup.out
我们在使用nohup的时候,一般都和&配合使用,但是在实际使用过程中,很多人后台挂上程序就这样不管了,其实这样有可能在当前账户非正常退出或者结束的时候,命令还是自己结束了。
所以在使用nohup命令后台运行命令之后,我们需要做以下操作:
1.先回车,退出nohup的提示。
2.然后执行exit正常退出当前账户。
3.然后再去链接终端。使得程序后台正常运行。
我们应该每次都使用exit退出,而不应该每次在nohup执行成功后直接关闭终端。这样才能保证命令一直在后台运行。
The above is the detailed content of Use PHP+Redis to implement delayed tasks and automatically cancel orders (detailed tutorial). For more information, please follow other related articles on the PHP Chinese website!