This article covers using think-queue with ThinkPHP 6 (TP6) to implement ordinary queues and delayed queues. think-queue is the message queue service officially provided by ThinkPHP.

TP6 Queue
In TP6, think-queue can be used to implement ordinary queues and delayed queues.
think-queue is a message queue service officially provided by ThinkPHP. It supports the basic features of a message queue:
- Publishing, fetching, executing, deleting, and resending messages; failure handling; delayed execution; timeout control; etc.
- Multiple queues, memory limits, starting and stopping workers, daemon (resident) processes, etc.
- The message queue can be downgraded to synchronous execution
Message queue implementation process
1. Push messages to the message queue service through the producer
2. The message queue service stores the received messages into the redis queue (zset)
3. The consumer listens on the queue. When it detects a new message, it fetches the first message in the queue.
4. The consumer processes the fetched message by calling the business class that handles the related business logic.
5. After the business logic finishes, the message must be deleted from the queue.
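The flow above can be sketched in plain PHP. This is a minimal in-memory illustration, not think-queue's API: the class name InMemoryQueue and its push/pop methods are hypothetical, and the array kept sorted by availability time only imitates the ordering a Redis sorted set would provide. Timestamps are passed in explicitly so the behaviour is easy to follow.

```php
<?php
declare(strict_types=1);

// Hypothetical in-memory stand-in for the queue service (not think-queue's API).
final class InMemoryQueue
{
    /** @var array[] Each entry holds an 'availableAt' timestamp and a 'payload'. */
    private $messages = [];

    // Producer side: store a message that becomes available $delay seconds after $now.
    public function push(array $payload, int $delay, int $now): void
    {
        $this->messages[] = ['availableAt' => $now + $delay, 'payload' => $payload];

        // Keep messages ordered by availability time, like scores in a sorted set.
        usort($this->messages, static function (array $a, array $b): int {
            return $a['availableAt'] <=> $b['availableAt'];
        });
    }

    // Consumer side: remove and return the first message that is due, or null.
    public function pop(int $now): ?array
    {
        if ($this->messages === [] || $this->messages[0]['availableAt'] > $now) {
            return null;
        }

        return array_shift($this->messages)['payload'];
    }
}

$queue = new InMemoryQueue();
$queue->push(['order' => 1], 0, 1000);  // ordinary message, available immediately
$queue->push(['order' => 2], 10, 1000); // delayed message, due at time 1010

$first  = $queue->pop(1000); // the ordinary message
$none   = $queue->pop(1005); // nothing is due yet
$second = $queue->pop(1010); // the delayed message is now due
```

In this sketch an ordinary message is simply a delayed message with a delay of zero, which is why one structure can serve both kinds of queue.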
Install think-queue with Composer
composer require topthink/think-queue
Configuration file
After installing think-queue, queue.php will be generated in the config directory. This file is the configuration file of the queue.
think-queue provides several queue drivers (sync, database, and redis all appear in the configuration that follows). The default is sync; Redis is used here.
<?php
// config/queue.php
return [
    // Connection used by default
    'default'     => 'redis',
    'connections' => [
        'sync'     => [
            'type' => 'sync',
        ],
        'database' => [
            'type'       => 'database',
            'queue'      => 'default',
            'table'      => 'jobs',
            'connection' => null,
        ],
        'redis'    => [
            'type'       => 'redis',
            'queue'      => 'default',
            'host'       => env('redis.host', '127.0.0.1'),
            'port'       => env('redis.port', '6379'),
            'password'   => env('redis.password', '123456'),
            'select'     => 0,
            'timeout'    => 0,
            'persistent' => false,
        ],
    ],
    'failed'      => [
        'type'  => 'none',
        'table' => 'failed_jobs',
    ],
];
Create directory and queue consumption files
Create a queue directory inside the app directory, then create Queue.php in it. This file defines an abstract class that serves as the base class for all consumers.
<?php

namespace app\queue;

use think\facade\Cache;
use think\facade\Log;
use think\queue\Job;

abstract class Queue
{
    /**
     * Entry point that think-queue calls for each job.
     * Cache::store('redis') assumes a cache store named "redis" is configured.
     */
    public function fire(Job $job, $data)
    {
        if (empty($data)) {
            Log::error(sprintf('[%s][%s] Queue message is empty', __CLASS__, __FUNCTION__));
            // An empty message can never succeed, so remove it from the queue
            $job->delete();
            return;
        }

        $jobId = $job->getJobId();

        // A job ID that is already recorded means a duplicate delivery: drop it
        if (!$this->checkJob($jobId, $data)) {
            $job->delete();
            Cache::store('redis')->delete($jobId);
            return;
        }

        if ($this->execute($data)) {
            Log::record(sprintf('[%s][%s] Queue job executed successfully', __CLASS__, __FUNCTION__));
            $job->delete();
            Cache::store('redis')->delete($jobId);
        } else {
            // Clear the marker so that a retry is not mistaken for a duplicate
            Cache::store('redis')->delete($jobId);
            if ($job->attempts() > 3) {
                Log::error(sprintf('[%s][%s] Queue job retried more than 3 times; execution failed', __CLASS__, __FUNCTION__));
                $job->delete();
            } else {
                // Put the job back on the queue for another attempt
                $job->release();
            }
        }
    }

    /**
     * Records the job ID; returns false if it was already recorded.
     */
    protected function checkJob(string $jobId, $message): bool
    {
        $data = Cache::store('redis')->get($jobId);
        if (!empty($data)) {
            return false;
        }
        Cache::store('redis')->set($jobId, $message);
        return true;
    }

    /**
     * Business logic implemented by each consumer; return true on success.
     */
    abstract protected function execute($data): bool;
}
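The duplicate check performed by checkJob can be looked at in isolation. The sketch below is only an illustration under stated assumptions: ArrayStore is a hypothetical array-backed stand-in for the Redis cache store, and claimJob is a hypothetical free function that mirrors the check-then-set logic of checkJob.

```php
<?php
declare(strict_types=1);

// Hypothetical array-backed store standing in for the Redis cache.
final class ArrayStore
{
    private $items = [];

    public function get(string $key)
    {
        return $this->items[$key] ?? null;
    }

    public function set(string $key, $value): void
    {
        $this->items[$key] = $value;
    }

    public function delete(string $key): void
    {
        unset($this->items[$key]);
    }
}

// Returns true only for the first caller that claims $jobId.
function claimJob(ArrayStore $store, string $jobId, $message): bool
{
    if (!empty($store->get($jobId))) {
        return false; // already claimed: treat as a duplicate delivery
    }
    $store->set($jobId, $message);

    return true;
}

$store = new ArrayStore();
$firstClaim  = claimJob($store, 'job-1', ['order' => 1]); // first delivery is accepted
$secondClaim = claimJob($store, 'job-1', ['order' => 1]); // same ID again is rejected
$store->delete('job-1');                                  // marker removed after processing
$afterDelete = claimJob($store, 'job-1', ['order' => 1]); // the ID can be claimed again
```

Because the check and the write are two separate steps, two workers could in principle both pass the check at the same moment. If that matters for your workload, a single atomic operation such as Redis SET with the NX option is one possible alternative.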
All concrete consumer classes extend this abstract base class
<?php

namespace app\queue\test;

use app\queue\Queue;

class Test extends Queue
{
    protected function execute($data): bool
    {
        // Business logic goes here; return true on success, false on failure
        return true;
    }
}
Producer logic
use app\queue\test\Test;
use think\facade\Queue;

// Ordinary queue: Queue::push($job, $data, $queueName)
Queue::push(Test::class, $data, $queueName);

// Delayed queue: Queue::later($delay, $job, $data, $queueName), with $delay in seconds
Queue::later(10, Test::class, $data, $queueName);
Start a process to listen for jobs and execute them
Use either of the following commands:
php think queue:listen
php think queue:work
Command mode introduction
queue:work command: starts a single work process that handles the message queue.
php think queue:work --queue TestQueue
queue:listen command: creates a listen parent process, which then uses proc_open('php think queue:work') to create a work sub-process that processes the message queue, and limits the execution time of that work process.
php think queue:listen --queue TestQueue
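The parent and child arrangement described for listen mode can be illustrated with a small standalone script. This is a sketch of the general technique only, not think-queue's actual implementation: runWithTimeout is a hypothetical helper, the child commands are placeholder one-liners rather than a real queue worker, and passing the command as an array to proc_open assumes PHP 7.4 or later.

```php
<?php
declare(strict_types=1);

// Hypothetical helper: run a child command and terminate it after $timeout seconds.
function runWithTimeout(array $command, float $timeout): array
{
    // Capture the child's stdout through a pipe.
    $descriptors = [1 => ['pipe', 'w']];
    $process = proc_open($command, $descriptors, $pipes);
    if (!is_resource($process)) {
        throw new RuntimeException('Could not start the child process');
    }

    $deadline = microtime(true) + $timeout;
    $timedOut = false;
    while (proc_get_status($process)['running']) {
        if (microtime(true) >= $deadline) {
            proc_terminate($process); // the child ran too long
            $timedOut = true;
            break;
        }
        usleep(50000); // poll every 50 ms
    }

    $output = stream_get_contents($pipes[1]);
    fclose($pipes[1]);
    proc_close($process);

    return ['timedOut' => $timedOut, 'output' => $output];
}

// A fast child finishes normally; a slow one is stopped by the parent.
$fast = runWithTimeout([PHP_BINARY, '-r', 'echo "done";'], 5.0);
$slow = runWithTimeout([PHP_BINARY, '-r', 'sleep(10);'], 0.5);
```

The point of the design is isolation: the parent stays small and long-lived, while each child can be stopped or replaced without taking the parent down.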
Command line parameters
Work mode
php think queue:work \
--daemon \
--queue helloJobQueue \
--delay 0 \
--force \
--memory 128 \
--sleep 3 \
--tries 2
Listen mode
php think queue:listen \
--queue helloJobQueue \
--delay 0 \
--memory 128 \
--sleep 3 \
--tries 0 \
--timeout 60
Note that listen mode does not take the --daemon parameter: as described above, the listen parent process is itself long-running and creates the work sub-process that handles the messages.
Start, stop and restart the message queue
Stop all message queues (queue:restart asks running workers to exit once they finish their current job):
php think queue:restart
Restart all message queues:
php think queue:restart
php think queue:work