Codestruktur
Wir haben ein allgemeines WorkerPool-Paket erstellt, um Worker zum Verarbeiten von Aufgaben entsprechend der vom Unternehmen geforderten Parallelität zu verwenden. Werfen wir einen Blick auf die Verzeichnisstruktur:
workerpool ├── pool.go ├── task.go └── worker.go
Das Workerpool-Verzeichnis befindet sich im Stammverzeichnis des Projekts. „Task“ ist eine einzelne Arbeitseinheit, die verarbeitet werden muss; „Worker“ ist eine einfache Worker-Funktion, die zum Ausführen von Aufgaben verwendet wird, und „Pool“ wird zum Erstellen und Verwalten von Workern verwendet.
Erster Blick auf den Task-Code:
// workerpool/task.go package workerpool import ( "fmt" ) type Task struct { Err error Data interface{} f func(interface{}) error } func NewTask(f func(interface{}) error, data interface{}) *Task { return &Task{f: f, Data: data} } func process(workerID int, task *Task) { fmt.Printf("Worker %d processes task %v\n", workerID, task.Data) task.Err = task.f(task.Data) }
Task ist eine einfache Struktur, die alle zur Verarbeitung der Aufgabe erforderlichen Daten speichert. Beim Erstellen einer Aufgabe werden Daten und die auszuführende Funktion f übergeben und die Funktion process() verarbeitet die Aufgabe. Übergeben Sie beim Verarbeiten einer Aufgabe Daten als Parameter an die Funktion f und speichern Sie das Ausführungsergebnis in Task.Err.
Werfen wir einen Blick darauf, wie Worker Aufgaben erledigt:
// workerpool/worker.go package workerpool import ( "fmt" "sync" ) // Worker handles all the work type Worker struct { ID int taskChan chan *Task } // NewWorker returns new instance of worker func NewWorker(channel chan *Task, ID int) *Worker { return &Worker{ ID: ID, taskChan: channel, } } // Start starts the worker func (wr *Worker) Start(wg *sync.WaitGroup) { fmt.Printf("Starting worker %d\n", wr.ID) wg.Add(1) go func() { defer wg.Done() for task := range wr.taskChan { process(wr.ID, task) } }() }
Wir haben eine kleine Worker-Struktur erstellt, einschließlich Worker-ID und einem Kanal zum Speichern ausstehender Aufgaben. Verwenden Sie in der Start()-Methode for range, um Aufgaben aus taskChan zu lesen und zu verarbeiten. Wie Sie sich vorstellen können, können mehrere Mitarbeiter gleichzeitig Aufgaben ausführen.
Wir erledigen Aufgaben durch die Implementierung von Task und Worker, aber es scheint etwas zu fehlen. Wer ist für die Generierung dieser Worker und das Senden von Aufgaben an sie verantwortlich? Die Antwort lautet: Worker Pool.
// workerpoo/pool.go package workerpool import ( "fmt" "sync" "time" ) // Pool is the worker pool type Pool struct { Tasks []*Task concurrency int collector chan *Task wg sync.WaitGroup } // NewPool initializes a new pool with the given tasks and // at the given concurrency. func NewPool(tasks []*Task, concurrency int) *Pool { return &Pool{ Tasks: tasks, concurrency: concurrency, collector: make(chan *Task, 1000), } } // Run runs all work within the pool and blocks until it's // finished. func (p *Pool) Run() { for i := 1; i <= p.concurrency; i++ { worker := NewWorker(p.collector, i) worker.Start(&p.wg) } for i := range p.Tasks { p.collector <- p.Tasks[i] } close(p.collector) p.wg.Wait() }
Im obigen Code speichert der Pool alle ausstehenden Aufgaben und generiert eine Reihe von Goroutinen im Einklang mit der Parallelität für die gleichzeitige Verarbeitung von Aufgaben. Gemeinsamer Cache-Kanal – Kollektor zwischen Arbeitern.
Wenn wir also diesen Arbeitspool betreiben, können wir die erforderliche Anzahl von Arbeitskräften generieren und die Sammelkanäle werden von den Arbeitskräften gemeinsam genutzt. Als nächstes verwenden Sie „for range“, um Aufgaben zu lesen und die Leseaufgaben in den Kollektor zu schreiben. Wir verwenden sync.WaitGroup, um eine Synchronisierung zwischen Coroutinen zu erreichen. Nachdem wir nun eine gute Lösung haben, testen wir sie.
// main.go package main import ( "fmt" "time" "github.com/Joker666/goworkerpool/workerpool" ) func main() { var allTask []*workerpool.Task for i := 1; i <= 100; i++ { task := workerpool.NewTask(func(data interface{}) error { taskID := data.(int) time.Sleep(100 * time.Millisecond) fmt.Printf("Task %d processed\n", taskID) return nil }, i) allTask = append(allTask, task) } pool := workerpool.NewPool(allTask, 5) pool.Run() }
Der obige Code erstellt 100 Aufgaben und verwendet 5 Parallelität, um diese Aufgaben zu verarbeiten.
输出如下:
Worker 3 processes task 98 Task 92 processed Worker 2 processes task 99 Task 98 processed Worker 5 processes task 100 Task 99 processed Task 100 processed Took ===============> 2.0056295s
处理 100 个任务花费了 2s,如何我们将并发数提高到 10,我们会看到处理完所有任务只需要大约 1s。
我们通过实现 workerPool 构建了一个健壮的解决方案,具有并发性、错误处理、数据处理等功能。这是个通用的包,不耦合具体的实现。我们可以使用它来解决一些大问题。
实际上,我们还可以进一步扩展上面的解决方案,以便 worker 可以在后台等待我们投递新的任务并处理。为此,代码需要做一些修改,Task 结构体保持不变,但是需要小改下 Worker,看下面代码:
// workerpool/worker.go // Worker handles all the work type Worker struct { ID int taskChan chan *Task quit chan bool } // NewWorker returns new instance of worker func NewWorker(channel chan *Task, ID int) *Worker { return &Worker{ ID: ID, taskChan: channel, quit: make(chan bool), } } .... // StartBackground starts the worker in background waiting func (wr *Worker) StartBackground() { fmt.Printf("Starting worker %d\n", wr.ID) for { select { case task := <-wr.taskChan: process(wr.ID, task) case <-wr.quit: return } } } // Stop quits the worker func (wr *Worker) Stop() { fmt.Printf("Closing worker %d\n", wr.ID) go func() { wr.quit <- true }() }
Worker 结构体新加 quit channel,并且新加了两个方法。StartBackgorund() 在 for 循环里使用 select-case 从 taskChan 队列读取任务并处理,如果从 quit 读取到结束信号就立即返回。Stop() 方法负责往 quit 写入结束信号。
添加完这两个新的方法之后,我们来修改下 Pool:
// workerpool/pool.go type Pool struct { Tasks []*Task Workers []*Worker concurrency int collector chan *Task runBackground chan bool wg sync.WaitGroup } // AddTask adds a task to the pool func (p *Pool) AddTask(task *Task) { p.collector <- task } // RunBackground runs the pool in background func (p *Pool) RunBackground() { go func() { for { fmt.Print("⌛ Waiting for tasks to come in ...\n") time.Sleep(10 * time.Second) } }() for i := 1; i <= p.concurrency; i++ { worker := NewWorker(p.collector, i) p.Workers = append(p.Workers, worker) go worker.StartBackground() } for i := range p.Tasks { p.collector <- p.Tasks[i] } p.runBackground = make(chan bool) <-p.runBackground } // Stop stops background workers func (p *Pool) Stop() { for i := range p.Workers { p.Workers[i].Stop() } p.runBackground <- true }
Pool 结构体添加了两个成员:Workers 和 runBackground,Workers 保存所有的 worker,runBackground 用于维持 pool 存活状态。
添加了三个新的方法,AddTask() 方法用于往 collector 添加任务;RunBackground() 方法衍生出一个无限运行的 goroutine,以便 pool 维持存活状态,因为 runBackground 信道是空,读取空的 channel 会阻塞,所以 pool 能维持运行状态。接着,在协程里面启动 worker;Stop() 方法用于停止 worker,并且给 runBackground 发送停止信号以便结束 RunBackground() 方法。
我们来看下具体是如何工作的。
如果是在现实的业务场景中,pool 将会与 HTTP 服务器一块运行并消耗任务。我们通过 for 无限循环模拟这种这种场景,如果满足某一条件,pool 将会停止。
// main.go ... pool := workerpool.NewPool(allTask, 5) go func() { for { taskID := rand.Intn(100) + 20 if taskID%7 == 0 { pool.Stop() } time.Sleep(time.Duration(rand.Intn(5)) * time.Second) task := workerpool.NewTask(func(data interface{}) error { taskID := data.(int) time.Sleep(100 * time.Millisecond) fmt.Printf("Task %d processed\n", taskID) return nil }, taskID) pool.AddTask(task) } }() pool.RunBackground()
当执行上面的代码时,我们就会看到有随机的 task 被投递到后台运行的 workers,其中某一个 worker 会读取到任务并完成处理。当满足某一条件时,程序便会停止退出。
Das obige ist der detaillierte Inhalt vonParallelität und WorkerPool in der Go-Sprache – Teil 2. Für weitere Informationen folgen Sie bitte anderen verwandten Artikeln auf der PHP chinesischen Website!