Kata Pengantar
Sebelum menganalisis, sila pastikan anda memahami pelaksanaan baris gilir mesej
tp5 Baris gilir mesej adalah berdasarkan redis pangkalan data dan tp pelaksanaan rasmi Topthink
Bab ini berdasarkan analisis redis
Kunci storan:
key | 类型 | 描述 |
---|---|---|
queues:queueName |
list | 要执行的任务 |
think:queue:restart |
string | 重启队列时间戳 |
queues:queueName:delayed |
zSet | 延迟任务 |
queues:queueName:reserved |
zSet | 执行失败,等待重新执行 |
Laksanakan arahan
kerja Perbezaan dengan mendengar akan diterangkan di bawah
命令 | 描述 |
---|---|
php think queue:work |
监听队列 |
php think queue:listen |
监听队列 |
php think queue:restart |
重启队列 |
php think queue:subscribe |
暂无,可能是保留的 官方有什么其他想法但是还没实现 |
Teg tingkah laku
标签 | 描述 |
---|---|
worker_daemon_start |
守护进程开启 |
worker_memory_exceeded |
内存超出 |
worker_queue_restart |
重启守护进程 |
worker_before_process |
任务开始执行之前 |
worker_before_sleep |
任务延迟执行 |
queue_failed |
任务执行失败 |
Parameter arahan
参数 | 默认值 | 可以使用的模式 | 描述 |
---|---|---|---|
queue |
null | work,listen | 要执行的任务名称 |
daemon |
null | work | 以守护进程执行任务 |
delay |
0 | work,listen | 失败后重新执行的时间 |
force |
null | work | 失败后重新执行的时间 |
memory |
128M | work,listen | 限制最大内存 |
sleep |
3 | work,listen | 没有任务的时候等待的时间 |
tries |
0 | work,listen | 任务失败后最大尝试次数 |
Perbezaan mod
1: Prinsip pelaksanaan yang berbeza
kerja: mod pemprosesan satu proses;
Tiada parameter daemon Proses kerja akan menamatkan proses semasa secara langsung selepas memproses mesej seterusnya. Apabila tiada mesej baharu, ia akan tidur untuk tempoh masa dan kemudian keluar;
Dengan parameter daemon, proses kerja akan memproses mesej dalam baris gilir secara kitaran sehingga memori melebihi konfigurasi parameter sebelum menamatkan proses. Apabila tiada mesej baharu, ia akan tidur untuk satu tempoh masa dalam setiap gelung; proses. Proses anak kerja akan memproses mesej seterusnya dalam baris gilir melalui proses anak kerja apabila proses anak kerja keluar, proses induk di mana
2: Masa keluar adalah berbeza
kerja: lihat di atas
01: Masa pelaksanaan proses anak kerja yang dibuat melebihi konfigurasi parameter tamat masa dalam baris arahan dengar pada masa ini, proses anak kerja akan ditamatkan secara paksa, dan proses induk di mana mendengar berada yang terletak juga akan membuang pengecualian dan keluar ProcessTimeoutException ;
Pembangun boleh memilih untuk menangkap pengecualian ini dan membiarkan proses induk terus dilaksanakan;
02: Proses induk mengalami kebocoran memori atas sebab tertentu, dan apabila memori yang diduduki oleh proses induk itu sendiri melebihi baris arahan Apabila parameter --memory dikonfigurasikan dalam, kedua-dua proses ibu bapa dan anak akan keluar. Dalam keadaan biasa, ingatan yang diduduki oleh proses mendengar itu sendiri adalah stabil.
3: Prestasi berbeza
kerja: Ia bergelung di dalam skrip, dan skrip rangka kerja dimuatkan pada peringkat awal pelaksanaan perintah;
dengar: Ia membuka yang baharu selepas diproses tugasan Proses kerja, skrip rangka kerja akan dimuat semula pada masa ini
Nota: Apabila kod dikemas kini, anda perlu melaksanakan perintah php think queue:restart secara manual dalam mod kerja untuk memulakan semula baris gilir agar perubahan berkuat kuasa semasa dalam mod dengar, ia akan berkuat kuasa secara automatik tanpa yang lain operasi.
4: Keupayaan kawalan tamat masa
kerja: Pada dasarnya, ia tidak boleh mengawal masa berjalan proses itu sendiri mahupun mengehadkan masa pelaksanaan tugasan pelaksanaan
Masa maksimum subproses kerja dibenarkan dijalankan boleh dihadkan melalui parameter tamat masa yang belum tamat melebihi had masa ini akan dihadkan secara paksa ditamatkan;
Perbezaan antara tamat tempoh dan masa
tamat ditetapkan dalam fail konfigurasi dan merujuk kepada masa tamat tugasan Masa ini adalah global dan mempengaruhi semua proses kerja ditetapkan dalam parameter baris arahan dan merujuk kepada masa tamat masa sub-proses kerja Kali ini Ia hanya sah untuk perintah dengar yang dilaksanakan pada masa ini ialah sub-proses kerja. Senario penggunaan yang berbeza
01: Bilangan tugas yang besar
02: Keperluan prestasi tinggi
04: Tiada gelung tak terhingga , sleep(), exit(), die() dan logik lain yang boleh membawa kepada pepijat dalam kelas pengguna dengan mudah
dengar senario yang berkenaan ialah:
01: Bilangan tugasan adalah kecil
02: Masa pelaksanaan tugas adalah panjang
03: Masa pelaksanaan tugas perlu dihadkan dengan ketat
Operasi awam
Memandangkan kami melakukan analisis berdasarkan redis, kita hanya perlu menganalisis src/queue/connector/redis.php
01: Mula-mula panggil kaedah ajaib dalam
02: Dipanggil dalam kaedah __callStatic
03 : Mula-mula muatkan fail konfigurasi dalam buildConnector, jika tiada, ia akan dilaksanakan secara serentak
04: Buat sambungan berdasarkan fail konfigurasi dan lulus dalam konfigurasisrc/Queue.php
__callStatic
Operasi dalam pembina kelas redis.php:buildConnector
01: Semak sama ada sambungan redis dipasang
02: Gabungkan konfigurasi
03: Semak sama ada ia sambungan redis atau pRedis
04: Cipta objek sambungan
Proses penerbitan
Terbitkan parameter
立即执行
push($job, $data, $queue) Queue::push(Test::class, ['id' => 1], 'test');
一顿骚操作后返回一个数组 并且序列化后 rPush到redis中 key为 queue:queueName
数组结构:
[ 'job' => $job, // 要执行任务的类 'data' => $data, // 任务数据 'id'=>'xxxxx' //任务id ]
写入 redis并且返回队列id
至于中间的那顿骚操作太长了就没写
延迟发布
later($delay, $job, $data, $queue) Queue::later(100, Test::class, ['id' => 1], 'test');
跟上面的差不多
一顿骚操作后返回一个数组 并且序列化后 zAdd 到redis中 key为 queue:queueName:delayed
score为当前的时间戳+$delay
执行过程
执行过程有work模式和listen模式 两种 区别上面已经说了 代码逻辑由于太多等下回分解;
最后讲一下标签的使用
//守护进程开启 'worker_daemon_start' => [ \app\index\behavior\WorkerDaemonStart::class ], //内存超出 'worker_memory_exceeded' => [ \app\index\behavior\WorkerMemoryExceeded::class ], //重启守护进程 'worker_queue_restart' => [ \app\index\behavior\WorkerQueueRestart::class ], //任务开始执行之前 'worker_before_process' => [ \app\index\behavior\WorkerBeforeProcess::class ], //任务延迟执行 'worker_before_sleep' => [ \app\index\behavior\WorkerBeforeSleep::class ], //任务执行失败 'queue_failed' => [ \app\index\behavior\QueueFailed::class ]
public function run(Output $output) { $output->write('<info>任务执行失败</info>', true); }
控制台执行 php think queue:work --queue test --daemon
会在控制台一次输出
守护进程开启 任务延迟执行
失败的处理 如果有任务执行失败或者执行次数达到最大值
会触发 queue_failed
在app\index\behavior@run
方法里面写失败的逻辑 比如邮件通知 写入日志等
最后我们来说一下如何在其他框架或者项目中给tp的项目推送消息队列,例如两个项目是分开的 另一个使用的却不是tp5的框架
在其他项目中推任务
php版本
<?php class Index { private $redis = null; public function __construct() { $this->redis = new Redis(); $this->redis->connect('127.0.0.1', 6379); $this->redis->select(10); } public function push($job, $data, $queue) { $payload = $this->createPayload($job, $data); $this->redis->rPush('queues:' . $queue, $payload); } public function later($delay, $job, $data, $queue) { $payload = $this->createPayload($job, $data); $this->redis->zAdd('queues:' . $queue . ':delayed', time() + $delay, $payload); } private function createPayload($job, $data) { $payload = $this->setMeta(json_encode(['job' => $job, 'data' => $data]), 'id', $this->random(32)); return $this->setMeta($payload, 'attempts', 1); } private function setMeta($payload, $key, $value) { $payload = json_decode($payload, true); $payload[$key] = $value; $payload = json_encode($payload); if (JSON_ERROR_NONE !== json_last_error()) { throw new InvalidArgumentException('Unable to create payload: ' . json_last_error_msg()); } return $payload; } private function random(int $length = 16): string { $str = '0123456789abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ'; $randomString = ''; for ($i = 0; $i later(10, 'app\index\jobs\Test', ['id' => 1], 'test');
go版本
package main import ( "encoding/json" "github.com/garyburd/redigo/redis" "math/rand" "time" ) type Payload struct { Id string `json:"id"` Job string `json:"job"` Data interface{} `json:"data"` Attempts int `json:"attempts"` } var RedisClient *redis.Pool func init() { RedisClient = &redis.Pool{ MaxIdle: 20, MaxActive: 500, IdleTimeout: time.Second * 100, Dial: func() (conn redis.Conn, e error) { c, err := redis.Dial("tcp", "127.0.0.1:6379") if err != nil { return nil, err } _, _ = c.Do("SELECT", 10) return c, nil }, } } func main() { var data = make(map[string]interface{}) data["id"] = "1" later(10, "app\\index\\jobs\\Test", data, "test") } func push(job string, data interface{}, queue string) { payload := createPayload(job, data) queueName := "queues:" + queue _, _ = RedisClient.Get().Do("rPush", queueName, payload) } func later(delay int, job string, data interface{}, queue string) { m, _ := time.ParseDuration("+1s") currentTime := time.Now() op := currentTime.Add(time.Duration(time.Duration(delay) * m)).Unix() createPayload(job, data) payload := createPayload(job, data) queueName := "queues:" + queue + ":delayed" _, _ = RedisClient.Get().Do("zAdd", queueName, op, payload) } // 创建指定格式的数据 func createPayload(job string, data interface{}) (payload string) { payload1 := &Payload{Job: job, Data: data, Id: random(32), Attempts: 1} jsonStr, _ := json.Marshal(payload1) return string(jsonStr) } // 创建随机字符串 func random(n int) string { var str = []rune("0123456789abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ") b := make([]rune, n) for i := range b { b[i] = str[rand.Intn(len(str))] } return string(b) }