Utilisez Redis dans Laravel pour traiter les tâches de file d'attente. Les fonctions fournies par le framework sont très puissantes, mais récemment j'ai rencontré un problème, c'est-à-dire qu'une tâche a été exécutée plusieurs fois. .Pourquoi est-ce ?
Parlons d'abord de la raison : car dans Laravel, si le temps d'exécution d'une file d'attente (tâche) est supérieur à 60 secondes, elle sera considérée comme ayant échoué et sera ré-ajoutée à la file d'attente, ce qui entraînera l’exécution répétée de la même tâche.
La logique de cette tâche est de transmettre le contenu aux utilisateurs. Les utilisateurs doivent être récupérés et parcourus en fonction du contenu de la file d'attente, et envoyés via l'interface HTTP du backend de requête. Par exemple, s'il y a 10 000 utilisateurs, si le nombre d'utilisateurs est important ou si la vitesse de traitement de l'interface n'est pas si rapide, le temps d'exécution sera certainement supérieur à 60 secondes, donc la tâche sera réajoutée à la file d'attente. La situation est encore pire. Si les tâches précédentes ne sont pas exécutées dans les 60 secondes, elles seront réajoutées à la file d'attente, de sorte que la même tâche sera exécutée non seulement une, mais plusieurs fois.
Trouvons le coupable à partir du code source de Laravel.
Fichier de code source : supplier/laravel/framework/src/Illuminate/Queue/RedisQueue.php
/** * The expiration time of a job. * * @var int|null */ protected $expire = 60;
La variable membre $expire est une valeur fixe Laravel pense qu'une file d'attente n'a pas d'importance. comment 60 Cela devrait être terminé en quelques secondes. Comment obtenir la file d'attente :
public function pop($queue = null) { $original = $queue ?: $this->default; $queue = $this->getQueue($queue); $this->migrateExpiredJobs($queue.':delayed', $queue); if (! is_null($this->expire)) { $this->migrateExpiredJobs($queue.':reserved', $queue); } list($job, $reserved) = $this->getConnection()->eval( LuaScripts::pop(), 2, $queue, $queue.':reserved', $this->getTime() + $this->expire ); if ($reserved) { return new RedisJob($this->container, $this, $job, $reserved, $original); } }
Il y a plusieurs étapes pour obtenir la file d'attente. Parce que l'exécution de la file d'attente échoue, ou que l'exécution expire, etc., elle sera placée dans une autre collection et enregistrée pour une nouvelle tentative. Le processus est le suivant :
1. Repoussez la file d'attente qui a échoué en raison de l'exécution de la collecte retardée vers la file d'attente actuellement exécutée.
2. Repoussez la file d'attente en raison d'un délai d'exécution de la collection réservée vers la file d'attente actuellement exécutée.
3. Retirez ensuite la tâche de la file d'attente et commencez à l'exécuter, puis placez la file d'attente dans la collection ordonnée réservée.
La commande eval est utilisée pour exécuter ce processus, et plusieurs scripts Lua sont utilisés.
Récupérez la tâche de la file d'attente à exécuter :
local job = redis.call('lpop', KEYS[1]) local reserved = false if(job ~= false) then reserved = cjson.decode(job) reserved['attempts'] = reserved['attempts'] + 1 reserved = cjson.encode(reserved) redis.call('zadd', KEYS[2], ARGV[1], reserved) end return {job, reserved}
Vous pouvez voir que lorsque Laravel récupère la file d'attente à exécuter par Redis, il en mettra également une copie dans un fichier ordonné set et utilisez l’horodatage d’expiration comme score.
Ce n'est que lorsque la tâche est terminée que la tâche sera supprimée de l'ensemble commandé. Le code permettant de supprimer la file d'attente de cette collection ordonnée est omis. Voyons comment Laravel gère les files d'attente dont le temps d'exécution est supérieur à 60 secondes.
C'est ce que fait ce script Lua :
local val = redis.call('zrangebyscore', KEYS[1], '-inf', ARGV[1]) if(next(val) ~= nil) then redis.call('zremrangebyrank', KEYS[1], 0, #val - 1) for i = 1, #val, 100 do redis.call('rpush', KEYS[2], unpack(val, i, math.min(i+99, #val))) end end return true
Ici, zrangebyscore trouve les éléments dont les scores vont de l'infinitésimal à l'horodatage actuel, c'est-à-dire les tâches ajoutées à la collection il y a 60 secondes. les éléments sont ensuite supprimés de l'ensemble via zremrangebyrank et repoussés vers la file d'attente.
Vous devriez avoir une prise de conscience soudaine après avoir vu cela.
Si une file d'attente n'est pas exécutée dans les 60 secondes, le processus repoussera à nouveau les tâches de l'ensemble réservé vers la file d'attente lors de la récupération de la file d'attente.