Concurrent queue processing system based on Swoole and Redis
PHP中高级教程分享
PHP中高级教程分享 2018-11-14 11:18:20
0
2
1510

Since PHP does not support multi-threading, as a complete system, many operations need to be completed asynchronously. In order to complete these asynchronous operations, we built a Redis queue task system.

As we all know, a message queue processing system is mainly divided into two parts: consumers and producers.

In our system, the main system acts as the producer and the task system acts as the consumer.

The specific workflow is as follows: 1. The main system pushes the name of the task that needs to be processed and the task parameters into the queue. 2. The task system pops the task queue in real time. When a task pops out, it forks a sub-process, and the sub-process completes the specific task logic.

/**
 * 启动守护进程
 */
public function runAction() {
    Tools::log_message('ERROR', 'daemon/run' . ' | action: restart', 'daemon-');
    while (true) {
        $this->fork_process();
    }
    exit;
}
 
/**
 * 创建子进程
 */
private function fork_process() {
    $ppid = getmypid();
    $pid = pcntl_fork();
    if ($pid == 0) {//子进程
        $pid = posix_getpid();
        //echo "* Process {$pid} was created \n\n";
        $this->mq_process();
        exit;
    } else {//主进程
        $pid = pcntl_wait($status, WUNTRACED); //取得子进程结束状态
        if (pcntl_wifexited($status)) {
            //echo "\n\n* Sub process: {$pid} exited with {$status}";
            //Tools::log_message('INFO', 'daemon/run succ' . '|status:' . $status . '|pid:' . $ppid . '|childpid:' . $pid );
        } else {
            Tools::log_message('ERROR', 'daemon/run fail' . '|status:' . $status . '|pid:' . $ppid . '|childpid:' . $pid, 'daemon-');
        }
    }
}
 
/**
 * 业务任务队列处理
 */
private function mq_process() {
    $data_pop = $this->masterRedis->rPop($this->redis_list_key);
    $data = json_decode($data_pop, 1);
    if (!$data) {
        return FALSE;
    }
    $worker = '_task_' . $data['worker'];
    $class_name = isset($data['class']) ? $data['class'] : 'TaskproModel';
    $params = $data['params'];
    $class = new $class_name();
    $class->$worker($params);
    return TRUE;
}


PHP中高级教程分享
PHP中高级教程分享

+VX:PHPopen888PHP中高级教程分享,专注laravel,swoole,tp,分布式高并发处理教程分享

reply all(0)
Latest Downloads
More>
Web Effects
Website Source Code
Website Materials
Front End Template