Rumah > pembangunan bahagian belakang > Golang > Concurrency dan WorkerPool dalam bahasa Go - Bahagian 1

Concurrency dan WorkerPool dalam bahasa Go - Bahagian 1

Lepaskan: 2023-07-25 14:35:02
ke hadapan
597 orang telah melayarinya

Concurrency telah menjadi ciri penting dalam bahasa pengaturcaraan moden. Kebanyakan bahasa pengaturcaraan kini mempunyai beberapa kaedah untuk mencapai konkurensi.

Sesetengah pelaksanaan ini sangat berkuasa dan boleh mengalihkan beban ke urutan sistem yang berbeza, seperti Java, dll.

Model konkurensi Golang sangat berkuasa, dipanggil CSP (Communicating Sequential Process), yang memecahkan masalah kepada proses berjujukan yang lebih kecil dan kemudian menjadualkan kejadian proses ini (dipanggil Goroutines). Proses ini berkomunikasi dengan menyampaikan maklumat melalui saluran.

Dalam artikel ini, kami akan meneroka cara memanfaatkan konkurensi golang dan cara menggunakannya dalam workerPool. Dalam artikel kedua dalam siri ini, kami akan meneroka cara membina penyelesaian serentak yang berkuasa.

Contoh mudah

Andaikan kita perlu memanggil antara muka API luaran, dan keseluruhan proses mengambil masa 100ms. Jika kita perlu memanggil antara muka ini 1000 kali serentak, ia akan mengambil masa 100s.

//// model/data.go

package model

type SimpleData struct {
 ID int
}

//// basic/basic.go

package basic

import (
 "fmt"
 "github.com/Joker666/goworkerpool/model"
 "time"
)

func Work(allData []model.SimpleData) {
 start := time.Now()
 for i, _ := range allData {
  Process(allData[i])
 }
 elapsed := time.Since(start)
 fmt.Printf("Took ===============> %s\n", elapsed)
}

func Process(data model.SimpleData) {
 fmt.Printf("Start processing %d\n", data.ID)
 time.Sleep(100 * time.Millisecond)
 fmt.Printf("Finish processing %d\n", data.ID)
}

//// main.go

package main

import (
 "fmt"
 "github.com/Joker666/goworkerpool/basic"
 "github.com/Joker666/goworkerpool/model"
 "github.com/Joker666/goworkerpool/worker"
)

func main() {
 // Prepare the data
 var allData []model.SimpleData
 for i := 0; i < 1000; i++ {
  data := model.SimpleData{ ID: i }
  allData = append(allData, data)
 }
 fmt.Printf("Start processing all work \n")

 // Process
 basic.Work(allData)
}
Salin selepas log masuk
Start processing all work
Took ===============> 1m40.226679665s
Salin selepas log masuk

Kod di atas mencipta pakej model, yang mengandungi struktur yang mempunyai hanya satu ahli jenis int. Kami memproses data secara serentak, yang jelas tidak optimum kerana tugasan ini boleh diproses secara serentak. Mari kita ubah penyelesaian dan gunakan goroutine dan saluran untuk mengendalikannya.

Asynchronous

//// worker/notPooled.go

func NotPooledWork(allData []model.SimpleData) {
 start := time.Now()
 var wg sync.WaitGroup

 dataCh := make(chan model.SimpleData, 100)

 wg.Add(1)
 go func() {
  defer wg.Done()
  for data := range dataCh {
   wg.Add(1)
   go func(data model.SimpleData) {
    defer wg.Done()
    basic.Process(data)
   }(data)
  }
 }()

 for i, _ := range allData {
  dataCh <- allData[i]
 }

 close(dataCh)
 wg.Wait()
 elapsed := time.Since(start)
 fmt.Printf("Took ===============> %s\n", elapsed)
}

//// main.go

// Process
worker.NotPooledWork(allData)
Salin selepas log masuk
Start processing all work
Took ===============> 101.191534ms
Salin selepas log masuk

Dalam kod di atas, kami mencipta saluran cache dengan kapasiti 100 dan menolak data ke dalam saluran melalui NoPooledWork(). Selepas panjang saluran mencapai 100, kami tidak boleh menambah elemen padanya sehingga elemen dibaca. Gunakan untuk julat untuk membaca saluran dan menjana pemprosesan goroutine. Di sini kami tidak mempunyai had pada bilangan goroutine yang dihasilkan, yang boleh mengendalikan seberapa banyak tugas yang mungkin. Secara teorinya, sebanyak mungkin data boleh diproses berdasarkan sumber yang diperlukan. Melaksanakan kod, ia hanya mengambil masa 100ms untuk menyelesaikan 1000 tugasan. Ia gila! Tidak sepenuhnya, baca terus.

Masalah

Melainkan kita mempunyai semua sumber di bumi, hanya terdapat begitu banyak sumber yang boleh diperuntukkan pada masa tertentu. Memori minimum yang diduduki oleh goroutine ialah 2k, tetapi ia juga boleh mencapai 1G. Penyelesaian di atas untuk melaksanakan semua tugas secara serentak, dengan mengandaikan sejuta tugas, akan cepat meletihkan memori mesin dan CPU. Kita sama ada perlu menaik taraf konfigurasi mesin atau mencari penyelesaian lain yang lebih baik.

计算机科学家很久之前就考虑过这个问题,并提出了出色的解决方案 - 使用 Thread Pool 或者 Worker Pool。这个方案是使用 worker 数量受限的工作池来处理任务,workers 会按顺序一个接一个处理任务,这样就避免了 CPU 和内存使用急速增长。

解决方案:Worker Pool

我们通过实现 worker pool 来修复之前遇到的问题。

//// worker/pooled.go

func PooledWork(allData []model.SimpleData) {
 start := time.Now()
 var wg sync.WaitGroup
 workerPoolSize := 100

 dataCh := make(chan model.SimpleData, workerPoolSize)

 for i := 0; i < workerPoolSize; i++ {
  wg.Add(1)
  go func() {
   defer wg.Done()

   for data := range dataCh {
    basic.Process(data)
   }
  }()
 }

 for i, _ := range allData {
  dataCh <- allData[i]
 }

 close(dataCh)
 wg.Wait()
 elapsed := time.Since(start)
 fmt.Printf("Took ===============> %s\n", elapsed)
}

//// main.go

// Process
worker.PooledWork(allData)
Salin selepas log masuk
Start processing all work
Took ===============> 1.002972449s
Salin selepas log masuk

上面的代码,worker 数量限制在 100,我们创建了相应数量的 goroutine 来处理任务。我们可以把 channel 看作是队列,worker goroutine 看作是消费者。多个 goroutine 可以监听同一个 channel,但是 channel 里的每一个元素只会被处理一次。

Go 语言的 channel 可以当作队列使用。

这是一个比较好的解决方案,执行代码,我们看到完成所有任务花费 1s。虽然没有 100ms 这么快,但已经能满足业务需要,而且我们得到了一个更好的解决方案,能将负载均摊在不同的时间片上。

处理错误

我们能做的还没完。上面看起来是一个完整的解决方案,但却不是的,我们没有处理错误情况。所以需要模拟出错的情形,并且看下我们需要怎么处理。

//// worker/pooledError.go

func PooledWorkError(allData []model.SimpleData) {
 start := time.Now()
 var wg sync.WaitGroup
 workerPoolSize := 100

 dataCh := make(chan model.SimpleData, workerPoolSize)
 errors := make(chan error, 1000)

 for i := 0; i < workerPoolSize; i++ {
  wg.Add(1)
  go func() {
   defer wg.Done()

   for data := range dataCh {
    process(data, errors)
   }
  }()
 }

 for i, _ := range allData {
  dataCh <- allData[i]
 }

 close(dataCh)

 wg.Add(1)
 go func() {
  defer wg.Done()
  for {
   select {
   case err := <-errors:
    fmt.Println("finished with error:", err.Error())
   case <-time.After(time.Second * 1):
    fmt.Println("Timeout: errors finished")
    return
   }
  }
 }()

 defer close(errors)
 wg.Wait()
 elapsed := time.Since(start)
 fmt.Printf("Took ===============> %s\n", elapsed)
}

func process(data model.SimpleData, errors chan<- error) {
 fmt.Printf("Start processing %d\n", data.ID)
 time.Sleep(100 * time.Millisecond)
 if data.ID % 29 == 0 {
  errors <- fmt.Errorf("error on job %v", data.ID)
 } else {
  fmt.Printf("Finish processing %d\n", data.ID)
 }
}

//// main.go

// Process
worker.PooledWorkError(allData)
Salin selepas log masuk

我们修改了 process() 函数,处理一些随机的错误并将错误 push 到 errors chnanel 里。所以,为了处理并发出现的错误,我们可以使用 errors channel 保存错误数据。在所有任务处理完成之后,可以检查错误 channel 是否有数据。错误 channel 里的元素保存了任务 ID,方便需要的时候再处理这些任务。

比之前没处理错误,很明显这是一个更好的解决方案。但我们还可以做得更好,

我们将在下篇文章讨论如何编写一个强大的 worker pool 包,并且在 worker 数量受限的情况下处理并发任务。

总结

Go 语言的并发模型足够强大给力,只需要构建一个 worker pool 就能很好地解决问题而无需做太多工作,这就是它没有包含在标准库中的原因。但是,我们自己可以构建一个满足自身需求的方案。很快,我会在下一篇文章中讲到,敬请期待!

Atas ialah kandungan terperinci Concurrency dan WorkerPool dalam bahasa Go - Bahagian 1. Untuk maklumat lanjut, sila ikut artikel berkaitan lain di laman web China PHP!

Label berkaitan:
sumber:Go语言进阶学习
Kenyataan Laman Web ini
Kandungan artikel ini disumbangkan secara sukarela oleh netizen, dan hak cipta adalah milik pengarang asal. Laman web ini tidak memikul tanggungjawab undang-undang yang sepadan. Jika anda menemui sebarang kandungan yang disyaki plagiarisme atau pelanggaran, sila hubungi admin@php.cn
Tutorial Popular
Lagi>
Muat turun terkini
Lagi>
kesan web
Kod sumber laman web
Bahan laman web
Templat hujung hadapan