Table of Contents
composer require topthink/think-queue
Copy after login
" >
composer require topthink/think-queue
Copy after login
Home PHP Framework ThinkPHP Let's talk about thinkphp6 using think-queue to implement ordinary queues and delayed queues

Let's talk about thinkphp6 using think-queue to implement ordinary queues and delayed queues

Apr 20, 2022 pm 01:07 PM
thinkphp

This article brings you relevant knowledge about thinkphp, which mainly introduces the relevant content about using think-queue to implement ordinary queues and delayed queues. think-queue is officially provided by thinkphp A message queue service, let’s take a look at it, I hope it will be helpful to everyone.

Let's talk about thinkphp6 using think-queue to implement ordinary queues and delayed queues

Recommended learning: "PHP Video Tutorial"

TP6 Queue

TP6 Think-queue can be used to implement ordinary queues and delayed queues.

think-queue is a message queue service officially provided by thinkphp. It supports some basic features of the message queue:
  • Message publishing, acquisition, execution, deletion, and resending, Failure handling, delayed execution, timeout control, etc.
  • Multiple queues of queues, memory limits, start, stop, guard, etc.
  • The message queue can be downgraded to synchronous execution

Message queue implementation process

1. Push messages to the message queue service through the producer

2. The message queue service stores the received messages into the redis queue (zset)

3. The consumer monitors the queue. When it hears a new message in the queue, it obtains the first message in the queue.

4. Processes the obtained message and calls the business class for processing. Related business

5. After business processing, messages need to be deleted from the queue

composer Install think-queue

composer require topthink/think-queue
Copy after login

Configuration file

After installing think-queue, queue.php will be generated in the config directory. This file is the configuration file of the queue.

tp6 provides a variety of message queue implementation methods. By default, sync is used. I choose to use Redis here.

return [
    'default'     => 'redis',
    'connections' => [
        'sync'     => [
            'type' => 'sync',
        ],
        'database' => [
            'type'       => 'database',
            'queue'      => 'default',
            'table'      => 'jobs',
            'connection' => null,
        ],
        'redis'    => [
            'type'       => 'redis',
            'queue'      => 'default',
            'host'       => env('redis.host', '127.0.0.1'),
            'port'       => env('redis.port', '6379'),
            'password'   => env('redis.password','123456'),
            'select'     => 0,
            'timeout'    => 0,
            'persistent' => false,
        ],
    ],
    'failed'      => [
        'type'  => 'none',
        'table' => 'failed_jobs',
    ],
];
Copy after login
Create directory and queue consumption files

Create the queue directory in the app directory, and then create a new abstract class Queue.php file in this directory as the basis Class

<?phpnamespace app\queue;use think\facade\Cache;use think\queue\Job;use think\facade\Log;/**
 * Class Queue 队列消费基础类
 * @package app\queue
 */abstract class Queue{
    /**
     * @describe:fire是消息队列默认调用的方法
     * @param \think\queue\Job $job
     * @param $message
     */
    public function fire(Job $job, $data)
    {
        if (empty($data)) {
            Log::error(sprintf(&#39;[%s][%s] 队列无消息&#39;, __CLASS__, __FUNCTION__));
            return ;
        }

        $jobId = $job->getJobId(); // 队列的数据库id或者redis key
        // $jobClassName = $job->getName(); // 队列对象类
        // $queueName = $job->getQueue(); // 队列名称

        // 如果已经执行中或者执行完成就不再执行了
        if (!$this->checkJob($jobId, $data)) {
            $job->delete();
            Cache::store('redis')->delete($jobId);
            return ;
        }

        // 执行业务处理
        if ($this->execute($data)) {
            Log::record(sprintf('[%s][%s] 队列执行成功', __CLASS__, __FUNCTION__));
            $job->delete(); // 任务执行成功后删除
            Cache::store('redis')->delete($jobId); // 删除redis中的缓存
        } else {
            // 检查任务重试次数
            if ($job->attempts() > 3) {
                Log::error(sprintf('[%s][%s] 队列执行重试次数超过3次,执行失败', __CLASS__, __FUNCTION__));
                 // 第1种处理方式:重新发布任务,该任务延迟10秒后再执行;也可以不指定秒数立即执行
                //$job->release(10); 
                // 第2种处理方式:原任务的基础上1分钟执行一次并增加尝试次数
                //$job->failed();   
                // 第3种处理方式:删除任务
                $job->delete(); // 任务执行后删除
                Cache::store('redis')->delete($jobId); // 删除redis中的缓存
            }
        }
    }

    /**
     * 消息在到达消费者时可能已经不需要执行了
     * @param  string  $jobId
     * @param $message
     * @return bool 任务执行的结果
     * @throws \Psr\SimpleCache\InvalidArgumentException
     */
    protected function checkJob(string $jobId, $message): bool
    {
        // 查询redis
        $data = Cache::store('redis')->get($jobId);
        if (!empty($data)) {
            return false;
        }
        Cache::store('redis')->set($jobId, $message);
        return true;
    }

    /**
     * @describe: 根据消息中的数据进行实际的业务处理
     * @param $data 数据
     * @return bool 返回结果
     */
    abstract protected function execute($data): bool;}
Copy after login
All real consumer classes inherit the basic abstract class

<?phpnamespace app\queue\test;use app\queue\Queue;class Test extends Queue{
    protected function execute($data): bool
    {
       // 具体消费业务逻辑
    }}
Copy after login
Producer logic

use think\facade\Queue;

// 普通队列生成调用方式
Queue::push($job, $data, $queueName);
// 例:
Queue::push(Test::class, $data, $queueName);

// 延时队列生成调用方式
Queue::later($delay, $job, $data, $queueName);
// 例如使用延时队列 10 秒后执行:
Queue::later(10 , Test::class, $data, $queueName);
Copy after login
Start the process monitoring task and execute it

php think queue:listen
php think queue:work
Copy after login

Command mode introduction

Command mode
  • queue:work command

    work command: This command will start a work process to handle the message queue.
php think queue:work --queue TestQueue
Copy after login
  • queue:listen command

    listen command: This command will create a listen parent process, and then the parent process will pass proc_open('php think queue :work')

    to create a work sub-process to process the message queue and limit the execution time of the work process.
  • php think queue:listen --queue TestQueue
    Copy after login

    Command line parameters
    • Work mode
    php think queue:work \
    --daemon            //是否循环执行,如果不加该参数,则该命令处理完下一个消息就退出
    --queue  helloJobQueue  //要处理的队列的名称
    --delay  0 \        //如果本次任务执行抛出异常且任务未被删除时,设置其下次执行前延迟多少秒,默认为0
    --force  \          //系统处于维护状态时是否仍然处理任务,并未找到相关说明
    --memory 128 \      //该进程允许使用的内存上限,以 M 为单位
    --sleep  3 \        //如果队列中无任务,则sleep多少秒后重新检查(work+daemon模式)或者退出(listen或非daemon模式)
    --tries  2          //如果任务已经超过尝试次数上限,则触发‘任务尝试次数超限’事件,默认为0
    Copy after login
  • Listen mode

    php think queue:listen \
    --queue  helloJobQueue \   //监听的队列的名称
    --delay  0 \         //如果本次任务执行抛出异常且任务未被删除时,设置其下次执行前延迟多少秒,默认为0
    --memory 128 \       //该进程允许使用的内存上限,以 M 为单位
    --sleep  3 \         //如果队列中无任务,则多长时间后重新检查,daemon模式下有效
    --tries  0 \         //如果任务已经超过重发次数上限,则进入失败处理逻辑,默认为0
    --timeout 60         //创建的work子进程的允许执行的最长时间,以秒为单位
    Copy after login
    You can see that in listen mode, the --deamon parameter is not included. The reason will be explained below
  • Start, stop and restart the message queue
    • Start a message queue:
    php think queue:work
    Copy after login
  • Stop all message queues:
  • php think queue:restart
    Copy after login
  • Restart all message queues :
  • php think queue:restart 
    php think queue:work
    Copy after login

    Recommended learning: "PHP Video Tutorial
    "

    ###

    The above is the detailed content of Let's talk about thinkphp6 using think-queue to implement ordinary queues and delayed queues. For more information, please follow other related articles on the PHP Chinese website!

    Statement of this Website
    The content of this article is voluntarily contributed by netizens, and the copyright belongs to the original author. This site does not assume corresponding legal responsibility. If you find any content suspected of plagiarism or infringement, please contact admin@php.cn

    Hot AI Tools

    Undresser.AI Undress

    Undresser.AI Undress

    AI-powered app for creating realistic nude photos

    AI Clothes Remover

    AI Clothes Remover

    Online AI tool for removing clothes from photos.

    Undress AI Tool

    Undress AI Tool

    Undress images for free

    Clothoff.io

    Clothoff.io

    AI clothes remover

    AI Hentai Generator

    AI Hentai Generator

    Generate AI Hentai for free.

    Hot Tools

    Notepad++7.3.1

    Notepad++7.3.1

    Easy-to-use and free code editor

    SublimeText3 Chinese version

    SublimeText3 Chinese version

    Chinese version, very easy to use

    Zend Studio 13.0.1

    Zend Studio 13.0.1

    Powerful PHP integrated development environment

    Dreamweaver CS6

    Dreamweaver CS6

    Visual web development tools

    SublimeText3 Mac version

    SublimeText3 Mac version

    God-level code editing software (SublimeText3)

    How to run thinkphp project How to run thinkphp project Apr 09, 2024 pm 05:33 PM

    To run the ThinkPHP project, you need to: install Composer; use Composer to create the project; enter the project directory and execute php bin/console serve; visit http://localhost:8000 to view the welcome page.

    There are several versions of thinkphp There are several versions of thinkphp Apr 09, 2024 pm 06:09 PM

    ThinkPHP has multiple versions designed for different PHP versions. Major versions include 3.2, 5.0, 5.1, and 6.0, while minor versions are used to fix bugs and provide new features. The latest stable version is ThinkPHP 6.0.16. When choosing a version, consider the PHP version, feature requirements, and community support. It is recommended to use the latest stable version for best performance and support.

    How to run thinkphp How to run thinkphp Apr 09, 2024 pm 05:39 PM

    Steps to run ThinkPHP Framework locally: Download and unzip ThinkPHP Framework to a local directory. Create a virtual host (optional) pointing to the ThinkPHP root directory. Configure database connection parameters. Start the web server. Initialize the ThinkPHP application. Access the ThinkPHP application URL and run it.

    Development suggestions: How to use the ThinkPHP framework to implement asynchronous tasks Development suggestions: How to use the ThinkPHP framework to implement asynchronous tasks Nov 22, 2023 pm 12:01 PM

    "Development Suggestions: How to Use the ThinkPHP Framework to Implement Asynchronous Tasks" With the rapid development of Internet technology, Web applications have increasingly higher requirements for handling a large number of concurrent requests and complex business logic. In order to improve system performance and user experience, developers often consider using asynchronous tasks to perform some time-consuming operations, such as sending emails, processing file uploads, generating reports, etc. In the field of PHP, the ThinkPHP framework, as a popular development framework, provides some convenient ways to implement asynchronous tasks.

    Which one is better, laravel or thinkphp? Which one is better, laravel or thinkphp? Apr 09, 2024 pm 03:18 PM

    Performance comparison of Laravel and ThinkPHP frameworks: ThinkPHP generally performs better than Laravel, focusing on optimization and caching. Laravel performs well, but for complex applications, ThinkPHP may be a better fit.

    How to install thinkphp How to install thinkphp Apr 09, 2024 pm 05:42 PM

    ThinkPHP installation steps: Prepare PHP, Composer, and MySQL environments. Create projects using Composer. Install the ThinkPHP framework and dependencies. Configure database connection. Generate application code. Launch the application and visit http://localhost:8000.

    How is the performance of thinkphp? How is the performance of thinkphp? Apr 09, 2024 pm 05:24 PM

    ThinkPHP is a high-performance PHP framework with advantages such as caching mechanism, code optimization, parallel processing and database optimization. Official performance tests show that it can handle more than 10,000 requests per second and is widely used in large-scale websites and enterprise systems such as JD.com and Ctrip in actual applications.

    Development suggestions: How to use the ThinkPHP framework for API development Development suggestions: How to use the ThinkPHP framework for API development Nov 22, 2023 pm 05:18 PM

    Development suggestions: How to use the ThinkPHP framework for API development. With the continuous development of the Internet, the importance of API (Application Programming Interface) has become increasingly prominent. API is a bridge for communication between different applications. It can realize data sharing, function calling and other operations, and provides developers with a relatively simple and fast development method. As an excellent PHP development framework, the ThinkPHP framework is efficient, scalable and easy to use.

    See all articles