Maison > cadre php > PensezPHP > Parlons de thinkphp6 utilisant think-queue pour implémenter des files d'attente ordinaires et des files d'attente retardées

Parlons de thinkphp6 utilisant think-queue pour implémenter des files d'attente ordinaires et des files d'attente retardées

WBOY
Libérer: 2022-04-20 20:56:12
avant
10316 Les gens l'ont consulté

Cet article vous apporte des connaissances pertinentes sur thinkphp, qui présente principalement le contenu pertinent sur l'utilisation de think-queue pour implémenter des files d'attente ordinaires et des files d'attente retardées. think-queue est un service de file d'attente de messages officiellement fourni par thinkphp. ensemble, j'espère que cela sera utile à tout le monde.

Parlons de thinkphp6 utilisant think-queue pour implémenter des files d'attente ordinaires et des files d'attente retardées

Apprentissage recommandé : "Tutoriel vidéo PHP"

###TP6 Queue

Think-queue peut être utilisé dans TP6 pour implémenter des files d'attente ordinaires et des files d'attente retardées.

think-queue est un service de file d'attente de messages officiellement fourni par thinkphp. Il prend en charge certaines fonctionnalités de base de la file d'attente de messages :

  • Publication, acquisition, exécution, suppression, renvoi, gestion des échecs, exécution retardée, contrôle des délais d'attente, etc.
  • Files d'attente multiples, limites de mémoire, démarrage, arrêt, garde, etc.
  • La file d'attente des messages peut être rétrogradée en exécution synchrone

Processus de mise en œuvre de la file d'attente des messages

1.

2 , le service de file d'attente de messages stocke les messages reçus dans la file d'attente redis (zset)

3 Le consommateur surveille la file d'attente Lorsqu'il y a un nouveau message dans la file d'attente, il obtient le premier de la file d'attente

4. . Traite le message obtenu Le message appelle la classe affaires pour traiter les affaires associées

5 Après le traitement commercial, le message doit être supprimé de la file d'attente

installer think-queue
composer require topthink/think-queue
Copier après la connexion

Fichier de configuration

Après l'installation. think-queue, il sera généré dans le répertoire de configuration queue.php, ce fichier est le fichier de configuration de la file d'attente.

tp6 fournit une variété de méthodes d'implémentation de file d'attente de messages. Par défaut, la synchronisation est utilisée ici.

return [
    'default'     => 'redis',
    'connections' => [
        'sync'     => [
            'type' => 'sync',
        ],
        'database' => [
            'type'       => 'database',
            'queue'      => 'default',
            'table'      => 'jobs',
            'connection' => null,
        ],
        'redis'    => [
            'type'       => 'redis',
            'queue'      => 'default',
            'host'       => env('redis.host', '127.0.0.1'),
            'port'       => env('redis.port', '6379'),
            'password'   => env('redis.password','123456'),
            'select'     => 0,
            'timeout'    => 0,
            'persistent' => false,
        ],
    ],
    'failed'      => [
        'type'  => 'none',
        'table' => 'failed_jobs',
    ],
];
Copier après la connexion

Créez des répertoires et des fichiers de consommation de file d'attente

Créez le répertoire de file d'attente dans le répertoire de l'application, puis créez un nouveau fichier de classe abstraite Queue.php dans ce répertoire en tant que classe de base

<?phpnamespace app\queue;use think\facade\Cache;use think\queue\Job;use think\facade\Log;/**
 * Class Queue 队列消费基础类
 * @package app\queue
 */abstract class Queue{
    /**
     * @describe:fire是消息队列默认调用的方法
     * @param \think\queue\Job $job
     * @param $message
     */
    public function fire(Job $job, $data)
    {
        if (empty($data)) {
            Log::error(sprintf(&#39;[%s][%s] 队列无消息&#39;, __CLASS__, __FUNCTION__));
            return ;
        }

        $jobId = $job->getJobId(); // 队列的数据库id或者redis key
        // $jobClassName = $job->getName(); // 队列对象类
        // $queueName = $job->getQueue(); // 队列名称

        // 如果已经执行中或者执行完成就不再执行了
        if (!$this->checkJob($jobId, $data)) {
            $job->delete();
            Cache::store('redis')->delete($jobId);
            return ;
        }

        // 执行业务处理
        if ($this->execute($data)) {
            Log::record(sprintf('[%s][%s] 队列执行成功', __CLASS__, __FUNCTION__));
            $job->delete(); // 任务执行成功后删除
            Cache::store('redis')->delete($jobId); // 删除redis中的缓存
        } else {
            // 检查任务重试次数
            if ($job->attempts() > 3) {
                Log::error(sprintf('[%s][%s] 队列执行重试次数超过3次,执行失败', __CLASS__, __FUNCTION__));
                 // 第1种处理方式:重新发布任务,该任务延迟10秒后再执行;也可以不指定秒数立即执行
                //$job->release(10); 
                // 第2种处理方式:原任务的基础上1分钟执行一次并增加尝试次数
                //$job->failed();   
                // 第3种处理方式:删除任务
                $job->delete(); // 任务执行后删除
                Cache::store('redis')->delete($jobId); // 删除redis中的缓存
            }
        }
    }

    /**
     * 消息在到达消费者时可能已经不需要执行了
     * @param  string  $jobId
     * @param $message
     * @return bool 任务执行的结果
     * @throws \Psr\SimpleCache\InvalidArgumentException
     */
    protected function checkJob(string $jobId, $message): bool
    {
        // 查询redis
        $data = Cache::store('redis')->get($jobId);
        if (!empty($data)) {
            return false;
        }
        Cache::store('redis')->set($jobId, $message);
        return true;
    }

    /**
     * @describe: 根据消息中的数据进行实际的业务处理
     * @param $data 数据
     * @return bool 返回结果
     */
    abstract protected function execute($data): bool;}
Copier après la connexion
Toutes les classes de consommateurs réels héritent de la classe de base Classe d'abstraction

<?phpnamespace app\queue\test;use app\queue\Queue;class Test extends Queue{
    protected function execute($data): bool
    {
       // 具体消费业务逻辑
    }}
Copier après la connexion

Logique du producteur
use think\facade\Queue;

// 普通队列生成调用方式
Queue::push($job, $data, $queueName);
// 例:
Queue::push(Test::class, $data, $queueName);

// 延时队列生成调用方式
Queue::later($delay, $job, $data, $queueName);
// 例如使用延时队列 10 秒后执行:
Queue::later(10 , Test::class, $data, $queueName);
Copier après la connexion

Démarrez la tâche de surveillance du processus et exécutez
php think queue:listen
php think queue:work
Copier après la connexion

Introduction au mode commande

Mode commande

  • file d'attente: commande de travail

    commande de travail : Cette commande démarrera un travail processus pour gérer la file d’attente des messages.

    php think queue:work --queue TestQueue
    Copier après la connexion
  • queue:listen command

    listen command : Cette commande créera un processus parent d'écoute, qui sera ensuite créé par le processus parent via proc_open('php think queue:work') Un processus enfant de travail gère la file d'attente des messages et limite le temps d'exécution du processus de travail.

    php think queue:listen --queue TestQueue
    Copier après la connexion
    proc_open(‘php think queue:work’) 的方式来创建一个work 子 进程来处理消息队列,且限制该work进程的执行时间。
    php think queue:work \
    --daemon            //是否循环执行,如果不加该参数,则该命令处理完下一个消息就退出
    --queue  helloJobQueue  //要处理的队列的名称
    --delay  0 \        //如果本次任务执行抛出异常且任务未被删除时,设置其下次执行前延迟多少秒,默认为0
    --force  \          //系统处于维护状态时是否仍然处理任务,并未找到相关说明
    --memory 128 \      //该进程允许使用的内存上限,以 M 为单位
    --sleep  3 \        //如果队列中无任务,则sleep多少秒后重新检查(work+daemon模式)或者退出(listen或非daemon模式)
    --tries  2          //如果任务已经超过尝试次数上限,则触发‘任务尝试次数超限’事件,默认为0
    Copier après la connexion

命令行参数

  • Work 模式

    php think queue:listen \
    --queue  helloJobQueue \   //监听的队列的名称
    --delay  0 \         //如果本次任务执行抛出异常且任务未被删除时,设置其下次执行前延迟多少秒,默认为0
    --memory 128 \       //该进程允许使用的内存上限,以 M 为单位
    --sleep  3 \         //如果队列中无任务,则多长时间后重新检查,daemon模式下有效
    --tries  0 \         //如果任务已经超过重发次数上限,则进入失败处理逻辑,默认为0
    --timeout 60         //创建的work子进程的允许执行的最长时间,以秒为单位
    Copier après la connexion
  • Listen 模式

    php think queue:work
    Copier après la connexion

    可以看到 listen 模式下,不包含 --deamon

  • Paramètres de ligne de commande
    • Mode travail
    • php think queue:restart
      Copier après la connexion

    • Mode écoute
    • php think queue:restart 
      php think queue:work
      Copier après la connexion

      Vous pouvez voir qu'en mode écoute, le paramètre --deamon n'est pas inclus. sera expliqué ci-dessous

    • Démarrer, arrêter et redémarrer les files d'attente de messages

  • Démarrer une file d'attente de messages :
rrreee

Arrêter toutes les files d'attente de messages :
rrreee

🎜🎜Redémarrer toutes les files d'attente de messages : 🎜🎜 🎜Apprentissage recommandé : "🎜Tutoriel vidéo PHP🎜"🎜🎜

Ce qui précède est le contenu détaillé de. pour plus d'informations, suivez d'autres articles connexes sur le site Web de PHP en chinois!

Étiquettes associées:
source:csdn.net
Déclaration de ce site Web
Le contenu de cet article est volontairement contribué par les internautes et les droits d'auteur appartiennent à l'auteur original. Ce site n'assume aucune responsabilité légale correspondante. Si vous trouvez un contenu suspecté de plagiat ou de contrefaçon, veuillez contacter admin@php.cn
Tutoriels populaires
Plus>
Derniers téléchargements
Plus>
effets Web
Code source du site Web
Matériel du site Web
Modèle frontal