This article covers ThinkPHP and, in particular, how to use think-queue to implement a Redis-backed message queue. I hope it is helpful.
Message queue middleware is an important component of large-scale systems and has gradually become a core means of internal communication in enterprise systems. It offers loose coupling, asynchronous messaging, traffic peak shaving, reliable delivery, broadcast, flow control, and eventual consistency, and has become one of the main ways to implement asynchronous RPC.
A message queue involves two roles and one container. The roles are the producer (responsible for publishing tasks) and the consumer (responsible for executing tasks). The container stores the tasks published by producers as they accumulate, which separates publishing from execution so that neither step affects the other.
Producers publish tasks, which are stored in the message queue. Consumers actively take tasks out of the queue and execute them, and tasks published first are executed first (a queue is first in, first out). When there is no consumer, tasks accumulate in the queue until one takes them out for execution.
A message queue suits calls to third-party interfaces that see high concurrency, take a long time to process, or need batch operations. Typical uses include, but are not limited to, sending text messages, sending email, and app push notifications. It also works across systems: tasks published by this system can be executed by this system or by other systems, and this system can likewise act as a consumer for tasks published by itself or by other systems.
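The first-in, first-out behaviour described above can be sketched with PHP's built-in SplQueue. This is only a conceptual, in-memory illustration; it does not use think-queue or Redis, and the task payload shape is made up for the example.

```php
<?php
// Conceptual sketch only: an in-memory FIFO queue, not think-queue or Redis.
$queue = new SplQueue();

// Producer: publish three tasks (hypothetical payload shape).
foreach ([1, 2, 3] as $id) {
    $queue->enqueue(['id' => $id, 'type' => 'sms']);
}

// No consumer has run yet, so the tasks simply accumulate in the queue.
$pending = count($queue);

// Consumer: take tasks out in the order in which they were published.
$processed = [];
while (!$queue->isEmpty()) {
    $task = $queue->dequeue();
    $processed[] = $task['id'];
}

echo "pending before consuming: {$pending}\n";
echo 'processed order: ' . implode(', ', $processed) . "\n";
```

The producer loop and the consumer loop never call each other; they only share the queue, which is the decoupling the text describes.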
ThinkPHP's Queue has four built-in drivers: Redis, Database, Topthink, and Sync. Redis is used here and is also the recommended choice.
Jobs in think-queue can be published, fetched, executed, deleted, re-published, and published with a delay, and the component also supports timeout control.
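To make the idea of delayed publishing concrete, here is a minimal in-memory sketch. It is not think-queue's implementation: the class DelayedQueueSketch and its methods publishAfter() and pop() are hypothetical names, and timestamps are passed in explicitly so the behaviour is easy to follow.

```php
<?php
// Conceptual sketch only: an in-memory stand-in, not think-queue's code.
class DelayedQueueSketch
{
    private $jobs = [];

    // Publish a job that becomes available $delay seconds after $now.
    public function publishAfter($now, $delay, $payload)
    {
        $this->jobs[] = ['availableAt' => $now + $delay, 'payload' => $payload];
    }

    // Return the earliest job that is due at $now, or null if none is due yet.
    public function pop($now)
    {
        usort($this->jobs, function ($a, $b) {
            return $a['availableAt'] - $b['availableAt'];
        });
        if ($this->jobs && $this->jobs[0]['availableAt'] <= $now) {
            $job = array_shift($this->jobs);
            return $job['payload'];
        }
        return null;
    }
}

$queue = new DelayedQueueSketch();
$queue->publishAfter(100, 10, 'email'); // due at t = 110
$queue->publishAfter(100, 0, 'sms');    // due immediately

$first  = $queue->pop(100); // only the undelayed job is due
$second = $queue->pop(105); // the delayed job is not due yet
$third  = $queue->pop(110); // now the delayed job is due

var_dump($first, $second, $third);
```

A delayed job is stored right away but stays invisible to consumers until its time arrives, which is also how a failed job that is re-published with a delay waits before its next attempt.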
Create the queue.php configuration file in the extra directory
<?php
return [
    'connector'  => 'Redis',
    'expire'     => null,          // job expiry time; defaults to 60 seconds, set to null to disable
    'default'    => 'REDIS_QUEUE', // default queue name
    'host'       => '127.0.0.1',   // Redis host IP
    'port'       => 6379,          // Redis port
    'password'   => '',            // Redis password
    'select'     => 0,             // which Redis database to use; defaults to db0
    'timeout'    => 0,             // Redis connection timeout
    'persistent' => false,         // whether to use a persistent connection
];
The file goes here because the Queue source code reads the queue file from the extra directory by default to obtain its configuration. If you want to keep the configuration file elsewhere, you need to change the default configuration lookup in the source code accordingly (the original article shows the relevant source in a figure).
Create a test class and write the producer method
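<?php
namespace app\api\controller;

use app\common\queue\TestQueue; // needed so that TestQueue::class resolves to the consumer class
use think\Controller;
use think\Queue;

class Test extends Controller
{
    // Producer: add a job to the message queue
    public function addQueue()
    {
        // Job data
        $data = [
            'id'       => rand(0, 99),
            'userName' => '一起摸鱼'
        ];
        // Queue name
        $queueName = 'testQueue';
        // Push the job onto the queue; note that ::class requires PHP 5.5 or later
        $isPushed = Queue::push(TestQueue::class, $data, $queueName);
        // On PHP versions below 5.5, pass the fully qualified class name as a string
        // $isPushed = Queue::push('app\common\queue\TestQueue', $data, $queueName);
        if ($isPushed !== false) {
            // Business logic after a successful push
            echo 'Job added to the queue';
        } else {
            // Business logic after a failed push
            echo 'Failed to add the job to the queue';
        }
    }
}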
Create a TestQueue class, used as a consumer, to perform tasks in the message queue
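<?php
namespace app\common\queue;

use think\Log;
use think\queue\Job;

class TestQueue
{
    // Entry point called by the consumer
    public function fire(Job $job, $data)
    {
        // Run the actual business logic
        $isJobDone = $this->doJob($data);
        if ($isJobDone) {
            // The job succeeded: delete it, otherwise it will keep being executed
            $job->delete();
        } else {
            // The job failed
            // Check how many times the job has already been attempted
            $attempts = $job->attempts();
            if ($attempts == 0 || $attempts == 1) {
                // Re-publish the job; the argument is the delay in seconds
                $job->release(2);
            }
        }
    }

    // Called automatically after the job has failed
    public function failed($data)
    {
        Log::error('Job failed after reaching the maximum number of attempts: ' . json_encode($data));
    }

    // The work performed for each job
    public function doJob($data)
    {
        // Actual business logic
        $data = json_encode($data);
        echo 'Message queue: ' . $data;
        // Replace this condition with a check of whether the business logic really succeeded
        if ($data) {
            echo "Job succeeded";
            return true;
        } else {
            echo "Job failed";
            return false;
        }
    }
}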
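The branching in fire() is easier to see when it runs outside the framework. The sketch below is an illustration under assumptions: StubJob is a hypothetical stand-in for think\queue\Job that only records which method was called, and fireSketch() copies the decision logic of fire() with the business result passed in as a flag.

```php
<?php
// Hypothetical stand-in for think\queue\Job, for illustration only.
// It records what the consumer asked for instead of talking to Redis.
class StubJob
{
    public $log = [];
    private $attempts;

    public function __construct($attempts)
    {
        $this->attempts = $attempts;
    }

    public function attempts()
    {
        return $this->attempts;
    }

    public function delete()
    {
        $this->log[] = 'delete';
    }

    public function release($delay = 0)
    {
        $this->log[] = "release({$delay})";
    }
}

// Same branching as TestQueue::fire(), with the business result passed in.
function fireSketch(StubJob $job, $isJobDone)
{
    if ($isJobDone) {
        $job->delete();
    } elseif ($job->attempts() == 0 || $job->attempts() == 1) {
        $job->release(2);
    }
}

$success = new StubJob(1);
fireSketch($success, true);        // job succeeded

$firstFailure = new StubJob(1);
fireSketch($firstFailure, false);  // first attempt failed

$secondFailure = new StubJob(2);
fireSketch($secondFailure, false); // second attempt failed

echo implode(',', $success->log) . "\n";       // delete
echo implode(',', $firstFailure->log) . "\n";  // release(2)
echo implode(',', $secondFailure->log) . "\n"; // nothing was recorded
```

On the second failed attempt the consumer neither deletes nor re-publishes the job. What happens to it next depends on the worker options and the expire setting, which this sketch does not model.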
Call the endpoint so that the producer publishes a task.
The task is now stored in the Redis queue.
The next step is to start the queue listener, because manually running the queue every time a task is added is impractical. The queue has two listening modes, queue:work and queue:listen; both are used in the commands that follow.
Run the following in the project root directory, replacing testQueue with the name of your queue:
php think queue:work --queue testQueue
The consumer starts and executes the task.
Once executed, the task is also deleted from the Redis queue.
In practice, the consumer also needs to run in the background as a daemon so that the queue keeps being processed after the terminal is closed. Start it with nohup, again passing the name of your queue (testQueue in this example):
nohup php think queue:listen --queue testQueue &
PS: Type exit in the shell to leave the terminal rather than clicking the window's close button. If you close the terminal window directly after nohup has started successfully, the session that ran the command is disconnected and the process started with nohup is notified to shut down along with it.
That completes the whole message queue workflow.