Delay queue, as the name suggests, is a message queue with delay function. So, under what circumstances do I need such a queue?
1. Background
Let’s take a look at the business scenario:
1. Send a recall notice 3 days before the membership expires
2. After the order payment is successful, check whether the downstream links are normal after 5 minutes. For example, after the user purchases a membership, whether the various membership statuses are set successfully
3. How to regularly check whether the order in refund status has been Refund successfully?
4. If the notification fails, the notification will be repeated in 1, 3, 5, and 7 minutes until the other party replies?
Usually the simplest and most direct way to solve the above problems is to scan the meter regularly.
The problems with table scanning are:
1. The table scanning is connected to the database for a long time. In the case of large quantities, the connection is prone to abnormal interruption, which requires more exception handling and the program High robustness requirements
2. When the amount of data is large, the delay is high and the processing cannot be completed within the regulations, which affects the business. Although multiple processes can be started for processing, this will bring additional maintenance costs. , cannot be fundamentally solved.
3. Each business must maintain its own table scanning logic. When the business increases, it is found that the logic of the table scanning part will be developed repeatedly, but it is very similar
The delay queue can solve the above needs very well
2. Research
We investigated some open source solutions on the market, as follows:
·Youzan Technology: Only principles, no open source code
·Github personal:
1. Based on redis implementation, only one redis can be configured. If redis hangs, the entire service will be unavailable. The usability is poor
2. The consumer side implements the pull model, and the access cost is high. Each project has to implement the access code
3. There are not many people using star. There are risks when placed in a production environment. In addition, if you don’t understand the Go language, it will be difficult to maintain if something goes wrong
·SchedulerX-Alibaba open source: Very powerful, but complex operation and maintenance, dependent on components Too many, not lightweight enough
·RabbitMQ-delayed task: It does not have a delay function itself, it needs to be implemented by itself with the help of a feature, and the company has not deployed this queue, so deploy this one separately. The cost of making a delayed queue is a bit high, and it also requires special operation and maintenance. Currently, the team does not support it. Basically, for the above reasons, I plan to write one myself. I usually use PHP mostly, and the project basically uses the zset structure of redis as storage. Implemented in PHP language, the implementation principle refers to the Youzan team:
Related recommendations: "
php tutorial3. Goal·
Lightweight: It can be run directly with less PHP extensions, without the need to introduce network frameworks, such as swoole, workman, etc.
·Stability: Using the master-work architecture, the master does not do business processing, but is only responsible for managing the child process. When the child process exits abnormally, it will automatically start up
·Availability: 1. Supports multi-instance deployment, each instance is stateless, and the failure of one instance will not affect the service
2. Supports the configuration of multiple redis, one redis Hanging up will only affect some messages
3. The business side has easy access. In the background, you only need to fill in the relevant message type and return the interface
·Extensibility: when consuming When there is a bottleneck in the process, you can configure to increase the number of consuming processes. When there is a bottleneck in writing, you can increase the number of instances. The writing performance can be improved linearly
·Real-time: a certain amount of time is allowed time error.
·Support message deletion: Business users can delete specified messages at any time.
·Message transmission reliability: After the message enters the delay queue, it is guaranteed to be consumed at least once.
·Write performance: qps>1000
4. Architecture design and descriptionOverall architecture
Insert picture description here
Adopts the master-work architecture model, which mainly includes 6 modules:
1.dq-mster: main process, responsible for managing the creation and destruction of child processes. Recycling and signal notification
2.dq-server: Responsible for message writing, reading, deletion functions and maintaining the redis connection pool
3.dq-timer-N: Responsible for zset from redis Scans the expired messages in the structure and is responsible for writing to the ready queue. The number is configurable, usually 2 is enough, because the messages in the zset structure are ordered by time
4.dq-consume-N : Responsible for reading messages from the ready queue and notifying the corresponding callback interface. The number can be configured
5.dq-redis-checker: Responsible for checking the service status of redis. If redis is down, send an alarm email
6.dq-http-server: Provides a web background interface for registering topics
5. DeploymentEnvironment dependencies: PHP 5.4 installation sockets, redis, pcntl, pdo_mysql extension
create database dq; #存放告警信息 CREATE TABLE `dq_alert` ( `id` int(11) NOT NULL AUTO_INCREMENT, `host` varchar(255) NOT NULL DEFAULT '', `port` int(11) NOT NULL DEFAULT '0', `user` varchar(255) NOT NULL DEFAULT '', `pwd` varchar(255) NOT NULL DEFAULT '', `ext` varchar(2048) NOT NULL DEFAULT '', PRIMARY KEY (`id`) ) ENGINE=InnoDB AUTO_INCREMENT=2 DEFAULT CHARSET=utf8; #存放redis信息 CREATE TABLE `dq_redis` ( `id` int(11) NOT NULL AUTO_INCREMENT, `t_name` varchar(200) NOT NULL DEFAULT '', `t_content` varchar(2048) NOT NULL DEFAULT '', PRIMARY KEY (`id`) ) ENGINE=InnoDB AUTO_INCREMENT=14 DEFAULT CHARSET=utf8; #存储注册信息 CREATE TABLE `dq_topic` ( `id` int(11) NOT NULL AUTO_INCREMENT, `t_name` varchar(1024) NOT NULL DEFAULT '', `delay` int(11) NOT NULL DEFAULT '0', `callback` varchar(1024) NOT NULL DEFAULT '', `timeout` int(11) NOT NULL DEFAULT '3000', `email` varchar(1024) NOT NULL DEFAULT '', `topic` varchar(255) NOT NULL DEFAULT '', `createor` varchar(1024) NOT NULL DEFAULT '', `status` tinyint(4) NOT NULL DEFAULT '1', `method` varchar(32) NOT NULL DEFAULT 'GET', PRIMARY KEY (`id`) ) ENGINE=InnoDB AUTO_INCREMENT=6 DEFAULT CHARSET=utf8;
step2:在DqConfg.文件中配置数据库信息: DqConf::$db
在DqConf.php文件中修改php了路径 $logPath
php DqHttpServer.php --port 8088
redis信息格式:host:post:auth 比如
php DqInit.php --port 6789
<?php include_once 'DqLoader.php'; date_default_timezone_set("PRC"); //可配置多个 $server=array( '', ); $dqClient = new DqClient(); $dqClient->addServer($server); $topic ='order_openvip_checker'; //topic在后台注册 $id = uniqid(); $data=array( 'id'=>$id, 'body'=>array( 'a'=>1, 'b'=>2, 'c'=>3, 'ext'=>str_repeat('a',64), ), //可选,设置后以这个通知时间为准,默认延时时间在注册topic的时候指定 'fix_time'=>date('Y-m-d 23:50:50'), ); //添加 $boolRet = $dqClient->add($topic, $data); echo 'add耗时:'.(msectime() - $time)."ms\n"; //查询 $time = msectime(); $result = $dqClient->get($topic, $id); echo 'get耗时:'.(msectime() - $time)."ms\n"; //删除 $time = msectime(); $boolRet = $dqClient->del($topic,$id); echo 'del耗时:'.(msectime() - $time)."ms\n";
执行php test.php
2.优雅退出命令: master检测侦听了USR2信号,收到信号后会通知所有子进程,子进程完成当前任务后会自动退出
ps -ef | grep dq-master| grep -v grep | head -n 1 | awk '{print $2}' | xargs kill -USR2
php DqBench concurrency requests concurrency:并发数 requests: 每个并发产生的请求数 测试环境:内存 8G ,8核cpu,2个redis和1个dq-server 部署在一个机器上,数据包64字节 qps:2400
1.redis multi命令:将多个对redis的操作打包成一个减少网络开销。
3.内存泄露检测有必要: 所有的内存分配在底层都是调用了brk或者mmap,只要程序只有大量brk或者mmap的系统调用,内存泄露可能性非常高 ,检测命令: strace -c -p pid | grep 'mmap| brk'。
4.检测程序的系统调用情况:strace -c -p pid ,发现某个系统函数调用是其他的数倍,可能大概率程序存在问题。
The above is the detailed content of How is PHP delay queue implemented?. For more information, please follow other related articles on the PHP Chinese website!