Utilisez PHP+Redis pour implémenter des tâches retardées et annuler automatiquement les commandes (tutoriel détaillé)

PHPz
Libérer: 2023-04-07 20:22:01
avant
3006 Les gens l'ont consulté

Solution de tâche planifiée simple : utilisez les notifications keyspace de redis (événement de notification après échec de clé). Veuillez noter que cette fonctionnalité a été lancée après la version 2.8 de redis, les reids sur votre serveur doivent donc être au moins de version 2.8 ou supérieure ; >

(A) Scénario métier :

1 Lorsqu'une entreprise est déclenchée, il est nécessaire de démarrer une tâche planifiée, puis d'exécuter une autre tâche dans le délai spécifié (tel que comme l'annulation automatique de la commande, la réalisation automatique des commandes et d'autres fonctions)

2. Les notifications d'espace de clé de Redis enverront un événement après l'expiration de la clé, et le client qui écoute cet événement peut recevoir la notification

(B) Préparation du service :

1. Modifier le fichier de configuration reids (redis.conf) [le fichier de configuration du système de fenêtre est : redis.windows.conf]

redis n'activera pas les notifications d'espace de clé par défaut, car après avoir été allumé, il consommera le CPU

Remarques : E : événement keyevent, l'événement est publié avec __keyevent@__ comme préfixe ;

x : Événement expiré, lorsqu'un certain Cet événement se produira lorsque la clé expirera et sera supprimée

La configuration d'origine est :

notify-keyspace-events ""
Copier après la connexion

La configuration modifiée est la suivante :

notify-keyspace-events "Ex"
Copier après la connexion

Après avoir enregistré la configuration, redémarrez le service Redis pour que la configuration prenne effet

[root@chokingwin etc]#
service redis-server restart /usr/local/redis/etc/redis.conf 
Stopping redis-server: [ OK ] 
Starting redis-server: [ OK ]
Copier après la connexion

Redémarrez Redis dans le système de fenêtres, passez d'abord au répertoire de fichiers Redis, puis fermez le service Redis (redis-server --service-stop), puis ouvrez (redis-server --service-start)

C) Code de fichier :

phpredis implémente abonnement à la notification Keyspace, qui peut automatiquement annuler les commandes et compléter automatiquement les commandes. Ce qui suit est un exemple de test

Créez 4 fichiers, puis modifiez vous-même les paramètres de configuration de la base de données et de Redis

db.class.php

<?phpclass mysql{    private $mysqli;    private $result;    /**
     * 数据库连接
     * @param $config 配置数组     */

    public function connect()
    {        $config=array(            &#39;host&#39;=>&#39;127.0.0.1&#39;,
            &#39;username&#39;=>&#39;root&#39;,
            &#39;password&#39;=>&#39;168168&#39;,
            &#39;database&#39;=>&#39;test&#39;,
            &#39;port&#39;=>3306,
        );        $host = $config[&#39;host&#39;];    //主机地址
        $username = $config[&#39;username&#39;];//用户名
        $password = $config[&#39;password&#39;];//密码
        $database = $config[&#39;database&#39;];//数据库
        $port = $config[&#39;port&#39;];    //端口号
        $this->mysqli = new mysqli($host, $username, $password, $database, $port);

    }    /**
     * 数据查询
     * @param $table 数据表
     * @param null $field 字段
     * @param null $where 条件
     * @return mixed 查询结果数目     */
    public function select($table, $field = null, $where = null)
    {        $sql = "SELECT * FROM `{$table}`";        //echo $sql;exit;
        if (!empty($field)) {            $field = &#39;`&#39; . implode(&#39;`,`&#39;, $field) . &#39;`&#39;;            $sql = str_replace(&#39;*&#39;, $field, $sql);
        }        if (!empty($where)) {            $sql = $sql . &#39; WHERE &#39; . $where;
        }        $this->result = $this->mysqli->query($sql);        return $this->result;
    }    /**
     * @return mixed 获取全部结果     */
    public function fetchAll()
    {        return $this->result->fetch_all(MYSQLI_ASSOC);
    }    /**
     * 插入数据
     * @param $table 数据表
     * @param $data 数据数组
     * @return mixed 插入ID     */
    public function insert($table, $data)
    {        foreach ($data as $key => $value) {            $data[$key] = $this->mysqli->real_escape_string($value);
        }        $keys = &#39;`&#39; . implode(&#39;`,`&#39;, array_keys($data)) . &#39;`&#39;;        $values = &#39;\&#39;&#39; . implode("&#39;,&#39;", array_values($data)) . &#39;\&#39;&#39;;        $sql = "INSERT INTO `{$table}`( {$keys} )VALUES( {$values} )";        $this->mysqli->query($sql);        return $this->mysqli->insert_id;
    }    /**
     * 更新数据
     * @param $table 数据表
     * @param $data 数据数组
     * @param $where 过滤条件
     * @return mixed 受影响记录     */
    public function update($table, $data, $where)
    {        foreach ($data as $key => $value) {            $data[$key] = $this->mysqli->real_escape_string($value);
        }        $sets = array();        foreach ($data as $key => $value) {            $kstr = &#39;`&#39; . $key . &#39;`&#39;;            $vstr = &#39;\&#39;&#39; . $value . &#39;\&#39;&#39;;            array_push($sets, $kstr . &#39;=&#39; . $vstr);
        }        $kav = implode(&#39;,&#39;, $sets);        $sql = "UPDATE `{$table}` SET {$kav} WHERE {$where}";        $this->mysqli->query($sql);        return $this->mysqli->affected_rows;
    }    /**
     * 删除数据
     * @param $table 数据表
     * @param $where 过滤条件
     * @return mixed 受影响记录     */
    public function delete($table, $where)
    {        $sql = "DELETE FROM `{$table}` WHERE {$where}";        $this->mysqli->query($sql);        return $this->mysqli->affected_rows;
    }
}
Copier après la connexion

index.php

<?php

require_once &#39;Redis2.class.php&#39;;

$redis = new \Redis2(&#39;127.0.0.1&#39;,&#39;6379&#39;,&#39;&#39;,&#39;15&#39;);
$order_sn   = &#39;SN&#39;.time().&#39;T&#39;.rand(10000000,99999999);

$use_mysql = 1;         //是否使用数据库,1使用,2不使用
if($use_mysql == 1){
   /*
    *   //数据表
    *   CREATE TABLE `order` (
    *      `ordersn` varchar(255) NOT NULL DEFAULT &#39;&#39;,
    *      `status` varchar(255) NOT NULL DEFAULT &#39;&#39;,
    *      `createtime` varchar(255) NOT NULL DEFAULT &#39;&#39;,
    *      `id` int(11) unsigned NOT NULL AUTO_INCREMENT,
    *       PRIMARY KEY (`id`)
    *   ) ENGINE=InnoDB AUTO_INCREMENT=27 DEFAULT CHARSET=utf8mb4;
   */
    require_once &#39;db.class.php&#39;;
    $mysql      = new \mysql();
    $mysql->connect();
    $data       = [&#39;ordersn&#39;=>$order_sn,&#39;status&#39;=>0,&#39;createtime&#39;=>date(&#39;Y-m-d H:i:s&#39;,time())];
    $mysql->insert(&#39;order&#39;,$data);
}

$list = [$order_sn,$use_mysql];
$key = implode(&#39;:&#39;,$list);

$redis->setex($key,3,&#39;redis延迟任务&#39;);      //3秒后回调



$test_del = false;      //测试删除缓存后是否会有过期回调。结果:没有回调
if($test_del == true){
    //sleep(1);
    $redis->delete($order_sn);
}

echo $order_sn;



/*
 *   测试其他key会不会有回调,结果:有回调
 *   $k = &#39;test&#39;;
 *   $redis2->set($k,&#39;100&#39;);
 *   $redis2->expire($k,10);
 *
*/
Copier après la connexion

psubscribe.php

<?php
ini_set(&#39;default_socket_timeout&#39;, -1);  //不超时
require_once &#39;Redis2.class.php&#39;;
$redis_db = &#39;15&#39;;
$redis = new \Redis2(&#39;127.0.0.1&#39;,&#39;6379&#39;,&#39;&#39;,$redis_db);
// 解决Redis客户端订阅时候超时情况
$redis->setOption();
//当key过期的时候就看到通知,订阅的key __keyevent@<db>__:expired 这个格式是固定的,db代表的是数据库的编号,由于订阅开启之后这个库的所有key过期时间都会被推送过来,所以最好单独使用一个数据库来进行隔离
$redis->psubscribe(array(&#39;__keyevent@&#39;.$redis_db.&#39;__:expired&#39;), &#39;keyCallback&#39;);
// 回调函数,这里写处理逻辑
function keyCallback($redis, $pattern, $channel, $msg)
{
    echo PHP_EOL;
    echo "Pattern: $pattern\n";
    echo "Channel: $channel\n";
    echo "Payload: $msg\n\n";
    $list = explode(&#39;:&#39;,$msg);

    $order_sn = isset($list[0])?$list[0]:&#39;0&#39;;
    $use_mysql = isset($list[1])?$list[1]:&#39;0&#39;;

    if($use_mysql == 1){
        require_once &#39;db.class.php&#39;;
        $mysql = new \mysql();
        $mysql->connect();
        $where = "ordersn = &#39;".$order_sn."&#39;";
        $mysql->select(&#39;order&#39;,&#39;&#39;,$where);
        $finds=$mysql->fetchAll();
        print_r($finds);
        if(isset($finds[0][&#39;status&#39;]) && $finds[0][&#39;status&#39;]==0){
            $data   = array(&#39;status&#39; => 3);
            $where  = " id = ".$finds[0][&#39;id&#39;];
            $mysql->update(&#39;order&#39;,$data,$where);
        }
    }

}


//或者
/*$redis->psubscribe(array(&#39;__keyevent@&#39;.$redis_db.&#39;__:expired&#39;), function ($redis, $pattern, $channel, $msg){
    echo PHP_EOL;
    echo "Pattern: $pattern\n";
    echo "Channel: $channel\n";
    echo "Payload: $msg\n\n";
    //................
});*/
Copier après la connexion

Redis2.class.php

<?php

class Redis2
{
    private $redis;

    public function __construct($host = &#39;127.0.0.1&#39;, $port = &#39;6379&#39;,$password = &#39;&#39;,$db = &#39;15&#39;)
    {
        $this->redis = new Redis();
        $this->redis->connect($host, $port);    //连接Redis
        $this->redis->auth($password);      //密码验证
        $this->redis->select($db);    //选择数据库
    }

    public function setex($key, $time, $val)
    {
        return $this->redis->setex($key, $time, $val);
    }

    public function set($key, $val)
    {
        return $this->redis->set($key, $val);
    }

    public function get($key)
    {
        return $this->redis->get($key);
    }

    public function expire($key = null, $time = 0)
    {
        return $this->redis->expire($key, $time);
    }

    public function psubscribe($patterns = array(), $callback)
    {
        $this->redis->psubscribe($patterns, $callback);
    }

    public function setOption()
    {
        $this->redis->setOption(\Redis::OPT_READ_TIMEOUT, -1);
    }

    public function lRange($key,$start,$end)
    {
        return $this->redis->lRange($key,$start,$end);
    }

    public function lPush($key, $value1, $value2 = null, $valueN = null ){
        return $this->redis->lPush($key, $value1, $value2 = null, $valueN = null );
    }

    public function delete($key1, $key2 = null, $key3 = null)
    {
        return $this->redis->delete($key1, $key2 = null, $key3 = null);
    }

}
Copier après la connexion

Méthode de test du système Windows : exécutez d'abord psubscribe.php dans le Interface de commande cmd, puis la page Web s'ouvre index.php.

Activer l'exécution de la surveillance en arrière-plan (abonnement)

Il y a un problème pour réaliser cette étape. En utilisant l'extension PHPREDIS, la surveillance des clés expirées a été implémentée avec succès dans le code et réalisée dans. psCallback() Gestion des rappels. Les deux exigences évoquées au début ont été remplies. Mais il y a un problème ici : une fois que Redis a effectué l'opération d'abonnement, le terminal entre dans l'état de blocage et doit y rester tout le temps. Et ce script d'abonnement doit être exécuté manuellement sur la ligne de commande, ce qui ne répond pas aux besoins réels.

En fait, notre exigence concernant le rappel d'écoute d'expiration est que nous voulons qu'il s'exécute en arrière-plan comme un processus démon et déclenche la fonction de rappel lorsqu'il y a un message d'événement d'expiration. Faites en sorte que l'auditeur fonctionne toujours en arrière-plan. J'espère qu'il sera en arrière-plan comme un démon

C'est ainsi que je l'ai implémenté.

Il existe une commande nohup sous Linux. La fonction est d'exécuter des commandes sans raccrocher. En même temps, nohup place toute la sortie du programme de script dans le fichier nohup.out dans le répertoire courant. Si le fichier n'est pas accessible en écriture, il sera placé dans le fichier /nohup.out. Ainsi, avec cette commande, notre script php peut toujours s'exécuter, que notre fenêtre de terminal soit fermée ou non.

Écrire le fichier psubscribe.php :

<?php
#! /usr/bin/env php
ini_set(&#39;default_socket_timeout&#39;, -1);  //不超时
require_once &#39;Redis2.class.php&#39;;
$redis_db = &#39;15&#39;;
$redis = new \Redis2(&#39;127.0.0.1&#39;,&#39;6379&#39;,&#39;&#39;,$redis_db);
// 解决Redis客户端订阅时候超时情况
$redis->setOption();
//当key过期的时候就看到通知,订阅的key __keyevent@<db>__:expired 这个格式是固定的,db代表的是数据库的编号,由于订阅开启之后这个库的所有key过期时间都会被推送过来,所以最好单独使用一个数据库来进行隔离
$redis->psubscribe(array(&#39;__keyevent@&#39;.$redis_db.&#39;__:expired&#39;), &#39;keyCallback&#39;);
// 回调函数,这里写处理逻辑
function keyCallback($redis, $pattern, $channel, $msg)
{
    echo PHP_EOL;
    echo "Pattern: $pattern\n";
    echo "Channel: $channel\n";
    echo "Payload: $msg\n\n";
    $list = explode(&#39;:&#39;,$msg);

    $order_sn = isset($list[0])?$list[0]:&#39;0&#39;;
    $use_mysql = isset($list[1])?$list[1]:&#39;0&#39;;

    if($use_mysql == 1){
        require_once &#39;db.class.php&#39;;
        $mysql = new \mysql();
        $mysql->connect();
        $where = "ordersn = &#39;".$order_sn."&#39;";
        $mysql->select(&#39;order&#39;,&#39;&#39;,$where);
        $finds=$mysql->fetchAll();
        print_r($finds);
        if(isset($finds[0][&#39;status&#39;]) && $finds[0][&#39;status&#39;]==0){
            $data   = array(&#39;status&#39; => 3);
            $where  = " id = ".$finds[0][&#39;id&#39;];
            $mysql->update(&#39;order&#39;,$data,$where);
        }
    }

}


//或者
/*$redis->psubscribe(array(&#39;__keyevent@&#39;.$redis_db.&#39;__:expired&#39;), function ($redis, $pattern, $channel, $msg){
    echo PHP_EOL;
    echo "Pattern: $pattern\n";
    echo "Channel: $channel\n";
    echo "Payload: $msg\n\n";
    //................
});*/
Copier après la connexion

Remarque : Au début, on déclare le chemin du compilateur php :

#! /usr/bin/env php
Copier après la connexion

Ceci est nécessaire pour exécuter des scripts php.

Ensuite, nohup exécute psubscribe.php sans le suspendre. Notez le &

[root@chokingwin HiGirl]# nohup ./psubscribe.php & 
[1] 4456 nohup: ignoring input and appending output to `nohup.out&#39;
Copier après la connexion

à la fin : le script a bien commencé à s'exécuter sur le processus n°4456.

Vérifiez nohup.out cat nohuo.out pour voir s'il y a une sortie expirée :

[root@chokingwin HiGirl]# cat nohup.out 
Pattern:__keyevent@0__:expired 
Channel: __keyevent@0__:expired 
Payload: name
Copier après la connexion

Exécutez index.php, après 3 secondes, l'effet est comme ci-dessus et il réussit

Problème : utilisez le mode ligne de commande pour ouvrir le script de surveillance. Après un certain temps, une erreur est signalée : Erreur lors de l'envoi du paquet QUERY=xxx

.

解决方法:由于等待消息队列是一个长连接,而等待回调前有个数据库连接,数据库的wait_timeout=28800,所以只要下一条消息离上一条消息超过8小时,就会出现这个错误,把wait_timeout设置成10,并且捕获异常,发现真实的报错是 MySQL server has gone away ,
所以只要处理完所有业务逻辑后主动关闭数据库连接,即数据库连接主动close掉就可以解决问题

yii解决方法如下:

Yii::$app->db->close();
Copier après la connexion

查看进程方法:

 ps -aux|grep psubscribe.php

a:显示所有程序
u:以用户为主的格式来显示
x:显示所有程序,不以终端机来区分
Copier après la connexion

查看jobs进程ID:[ jobs -l ]命令

www@iZ232eoxo41Z:~/tinywan $ jobs -l
[1]-  1365 Stopped (tty output)    sudo nohup psubscribe.php > /dev/null 2>&1 
[2]+ 1370 Stopped (tty output) sudo nohup psubscribe.php > /dev/null 2>&1
Copier après la connexion

终止后台运行的进程方法:

kill -9  进程号
Copier après la connexion

清空 nohup.out文件方法:

cat /dev/null > nohup.out
Copier après la connexion

我们在使用nohup的时候,一般都和&配合使用,但是在实际使用过程中,很多人后台挂上程序就这样不管了,其实这样有可能在当前账户非正常退出或者结束的时候,命令还是自己结束了。

所以在使用nohup命令后台运行命令之后,我们需要做以下操作:

1.先回车,退出nohup的提示。

2.然后执行exit正常退出当前账户。
3.然后再去链接终端。使得程序后台正常运行。

我们应该每次都使用exit退出,而不应该每次在nohup执行成功后直接关闭终端。这样才能保证命令一直在后台运行。

Ce qui précède est le contenu détaillé de. pour plus d'informations, suivez d'autres articles connexes sur le site Web de PHP en chinois!

Étiquettes associées:
source:cnblogs.com
Déclaration de ce site Web
Le contenu de cet article est volontairement contribué par les internautes et les droits d'auteur appartiennent à l'auteur original. Ce site n'assume aucune responsabilité légale correspondante. Si vous trouvez un contenu suspecté de plagiat ou de contrefaçon, veuillez contacter admin@php.cn
Tutoriels populaires
Plus>
Derniers téléchargements
Plus>
effets Web
Code source du site Web
Matériel du site Web
Modèle frontal