Maison > développement back-end > Golang > le corps du texte

Concurrence et WorkerPool en langage Go - Partie 2

Libérer: 2023-07-21 10:47:45
avant
1115 Les gens l'ont consulté

Structure du code

Nous avons créé un package workerPool général pour utiliser des travailleurs pour traiter les tâches en fonction de la concurrence requise par l'entreprise. Jetons un coup d'œil à la structure des répertoires :

workerpool
├── pool.go
├── task.go
└── worker.go
Copier après la connexion

Le répertoire Workerpool se trouve dans le répertoire racine du projet. La tâche est une unité de travail unique qui doit être traitée ; Worker est une simple fonction de travail utilisée pour effectuer des tâches et Pool est utilisé pour créer et gérer des travailleurs.

implémentation

Premier coup d'œil au code de la tâche :

// workerpool/task.go

package workerpool

import (
 "fmt"
)

type Task struct {
 Err  error
 Data interface{}
 f    func(interface{}) error
}

func NewTask(f func(interface{}) error, data interface{}) *Task {
 return &Task{f: f, Data: data}
}

func process(workerID int, task *Task) {
 fmt.Printf("Worker %d processes task %v\n", workerID, task.Data)
 task.Err = task.f(task.Data)
}
Copier après la connexion

La tâche est une structure simple qui enregistre toutes les données nécessaires au traitement de la tâche. Lors de la création d'une tâche, les données et la fonction f à exécuter sont transmises et la fonction process() traitera la tâche. Lors du traitement d'une tâche, transmettez Data en paramètre à la fonction f et enregistrez le résultat de l'exécution dans Task.Err.

Jetons un coup d'œil à la façon dont Worker gère les tâches :

// workerpool/worker.go

package workerpool

import (
 "fmt"
 "sync"
)

// Worker handles all the work
type Worker struct {
 ID       int
 taskChan chan *Task
}

// NewWorker returns new instance of worker
func NewWorker(channel chan *Task, ID int) *Worker {
 return &Worker{
  ID:       ID,
  taskChan: channel,
 }
}

// Start starts the worker
func (wr *Worker) Start(wg *sync.WaitGroup) {
 fmt.Printf("Starting worker %d\n", wr.ID)

 wg.Add(1)
 go func() {
  defer wg.Done()
  for task := range wr.taskChan {
   process(wr.ID, task)
  }
 }()
}
Copier après la connexion

Nous avons créé une petite structure Worker, comprenant un identifiant de travailleur et un canal pour enregistrer les tâches en attente. Dans la méthode Start(), utilisez for range pour lire les tâches de taskChan et les traiter. Comme vous pouvez l’imaginer, plusieurs travailleurs peuvent effectuer des tâches simultanément.

workerPool

Nous gérons les tâches en implémentant Task et Worker, mais il semble qu'il manque quelque chose qui est responsable de la génération de ces travailleurs et de leur envoyer des tâches ? La réponse est : pool de travailleurs.

// workerpoo/pool.go

package workerpool

import (
 "fmt"
 "sync"
 "time"
)

// Pool is the worker pool
type Pool struct {
 Tasks   []*Task

 concurrency   int
 collector     chan *Task
 wg            sync.WaitGroup
}

// NewPool initializes a new pool with the given tasks and
// at the given concurrency.
func NewPool(tasks []*Task, concurrency int) *Pool {
 return &Pool{
  Tasks:       tasks,
  concurrency: concurrency,
  collector:   make(chan *Task, 1000),
 }
}

// Run runs all work within the pool and blocks until it's
// finished.
func (p *Pool) Run() {
 for i := 1; i <= p.concurrency; i++ {
  worker := NewWorker(p.collector, i)
  worker.Start(&p.wg)
 }

 for i := range p.Tasks {
  p.collector <- p.Tasks[i]
 }
 close(p.collector)

 p.wg.Wait()
}
Copier après la connexion

Dans le code ci-dessus, le pool enregistre toutes les tâches en attente et génère un certain nombre de goroutines cohérentes avec la concurrence pour le traitement simultané des tâches. Canal de cache partagé – collecteur entre les travailleurs.

Ainsi, lorsque nous gérons ce pool de travail, nous pouvons générer le nombre requis de travailleurs, et les canaux collecteurs sont partagés entre les travailleurs. Ensuite, utilisez for range pour lire les tâches et écrire les tâches de lecture dans le collecteur. Nous utilisons sync.WaitGroup pour réaliser la synchronisation entre les coroutines. Maintenant que nous avons une bonne solution, testons-la.

// main.go

package main

import (
 "fmt"
 "time"

 "github.com/Joker666/goworkerpool/workerpool"
)

func main() {
 var allTask []*workerpool.Task
 for i := 1; i <= 100; i++ {
  task := workerpool.NewTask(func(data interface{}) error {
   taskID := data.(int)
   time.Sleep(100 * time.Millisecond)
   fmt.Printf("Task %d processed\n", taskID)
   return nil
  }, i)
  allTask = append(allTask, task)
 }

 pool := workerpool.NewPool(allTask, 5)
 pool.Run()
}
Copier après la connexion

Le code ci-dessus crée 100 tâches et utilise 5 simultanéités pour traiter ces tâches.

输出如下:

Worker 3 processes task 98
Task 92 processed
Worker 2 processes task 99
Task 98 processed
Worker 5 processes task 100
Task 99 processed
Task 100 processed
Took ===============> 2.0056295s
Copier après la connexion

处理 100 个任务花费了 2s,如何我们将并发数提高到 10,我们会看到处理完所有任务只需要大约 1s。

我们通过实现 workerPool 构建了一个健壮的解决方案,具有并发性、错误处理、数据处理等功能。这是个通用的包,不耦合具体的实现。我们可以使用它来解决一些大问题。

进一步扩展:后台处理任务

实际上,我们还可以进一步扩展上面的解决方案,以便 worker 可以在后台等待我们投递新的任务并处理。为此,代码需要做一些修改,Task 结构体保持不变,但是需要小改下 Worker,看下面代码:

// workerpool/worker.go

// Worker handles all the work
type Worker struct {
 ID       int
 taskChan chan *Task
 quit     chan bool
}

// NewWorker returns new instance of worker
func NewWorker(channel chan *Task, ID int) *Worker {
 return &Worker{
  ID:       ID,
  taskChan: channel,
  quit:     make(chan bool),
 }
}

....

// StartBackground starts the worker in background waiting
func (wr *Worker) StartBackground() {
 fmt.Printf("Starting worker %d\n", wr.ID)

 for {
  select {
  case task := <-wr.taskChan:
   process(wr.ID, task)
  case <-wr.quit:
   return
  }
 }
}

// Stop quits the worker
func (wr *Worker) Stop() {
 fmt.Printf("Closing worker %d\n", wr.ID)
 go func() {
  wr.quit <- true
 }()
}
Copier après la connexion

Worker 结构体新加 quit channel,并且新加了两个方法。StartBackgorund() 在 for 循环里使用 select-case 从 taskChan 队列读取任务并处理,如果从 quit 读取到结束信号就立即返回。Stop() 方法负责往 quit 写入结束信号。

添加完这两个新的方法之后,我们来修改下 Pool:

// workerpool/pool.go

type Pool struct {
 Tasks   []*Task
 Workers []*Worker

 concurrency   int
 collector     chan *Task
 runBackground chan bool
 wg            sync.WaitGroup
}

// AddTask adds a task to the pool
func (p *Pool) AddTask(task *Task) {
 p.collector <- task
}

// RunBackground runs the pool in background
func (p *Pool) RunBackground() {
 go func() {
  for {
   fmt.Print("⌛ Waiting for tasks to come in ...\n")
   time.Sleep(10 * time.Second)
  }
 }()

 for i := 1; i <= p.concurrency; i++ {
  worker := NewWorker(p.collector, i)
  p.Workers = append(p.Workers, worker)
  go worker.StartBackground()
 }

 for i := range p.Tasks {
  p.collector <- p.Tasks[i]
 }

 p.runBackground = make(chan bool)
 <-p.runBackground
}

// Stop stops background workers
func (p *Pool) Stop() {
 for i := range p.Workers {
  p.Workers[i].Stop()
 }
 p.runBackground <- true
}
Copier après la connexion

Pool 结构体添加了两个成员:Workers 和 runBackground,Workers 保存所有的 worker,runBackground 用于维持 pool 存活状态。

添加了三个新的方法,AddTask() 方法用于往 collector 添加任务;RunBackground() 方法衍生出一个无限运行的 goroutine,以便 pool 维持存活状态,因为 runBackground 信道是空,读取空的 channel 会阻塞,所以 pool 能维持运行状态。接着,在协程里面启动 worker;Stop() 方法用于停止 worker,并且给 runBackground 发送停止信号以便结束 RunBackground() 方法。

我们来看下具体是如何工作的。

如果是在现实的业务场景中,pool 将会与 HTTP 服务器一块运行并消耗任务。我们通过 for 无限循环模拟这种这种场景,如果满足某一条件,pool 将会停止。

// main.go

...

pool := workerpool.NewPool(allTask, 5)
go func() {
 for {
  taskID := rand.Intn(100) + 20

  if taskID%7 == 0 {
   pool.Stop()
  }

  time.Sleep(time.Duration(rand.Intn(5)) * time.Second)
  task := workerpool.NewTask(func(data interface{}) error {
   taskID := data.(int)
   time.Sleep(100 * time.Millisecond)
   fmt.Printf("Task %d processed\n", taskID)
   return nil
  }, taskID)
  pool.AddTask(task)
 }
}()
pool.RunBackground()
Copier après la connexion

当执行上面的代码时,我们就会看到有随机的 task 被投递到后台运行的 workers,其中某一个 worker 会读取到任务并完成处理。当满足某一条件时,程序便会停止退出。

Ce qui précède est le contenu détaillé de. pour plus d'informations, suivez d'autres articles connexes sur le site Web de PHP en chinois!

Étiquettes associées:
source:Go语言进阶学习
Déclaration de ce site Web
Le contenu de cet article est volontairement contribué par les internautes et les droits d'auteur appartiennent à l'auteur original. Ce site n'assume aucune responsabilité légale correspondante. Si vous trouvez un contenu suspecté de plagiat ou de contrefaçon, veuillez contacter admin@php.cn
Tutoriels populaires
Plus>
Derniers téléchargements
Plus>
effets Web
Code source du site Web
Matériel du site Web
Modèle frontal