Concurrency telah menjadi ciri penting dalam bahasa pengaturcaraan moden. Kebanyakan bahasa pengaturcaraan kini mempunyai beberapa kaedah untuk mencapai konkurensi.
Sesetengah pelaksanaan ini sangat berkuasa dan boleh mengalihkan beban ke urutan sistem yang berbeza, seperti Java, dll.
Model konkurensi Golang sangat berkuasa, dipanggil CSP (Communicating Sequential Process), yang memecahkan masalah kepada proses berjujukan yang lebih kecil dan kemudian menjadualkan kejadian proses ini (dipanggil Goroutines). Proses ini berkomunikasi dengan menyampaikan maklumat melalui saluran.
Dalam artikel ini, kami akan meneroka cara memanfaatkan konkurensi golang dan cara menggunakannya dalam workerPool. Dalam artikel kedua dalam siri ini, kami akan meneroka cara membina penyelesaian serentak yang berkuasa.
Andaikan kita perlu memanggil antara muka API luaran, dan keseluruhan proses mengambil masa 100ms. Jika kita perlu memanggil antara muka ini 1000 kali serentak, ia akan mengambil masa 100s.
//// model/data.go package model type SimpleData struct { ID int } //// basic/basic.go package basic import ( "fmt" "github.com/Joker666/goworkerpool/model" "time" ) func Work(allData []model.SimpleData) { start := time.Now() for i, _ := range allData { Process(allData[i]) } elapsed := time.Since(start) fmt.Printf("Took ===============> %s\n", elapsed) } func Process(data model.SimpleData) { fmt.Printf("Start processing %d\n", data.ID) time.Sleep(100 * time.Millisecond) fmt.Printf("Finish processing %d\n", data.ID) } //// main.go package main import ( "fmt" "github.com/Joker666/goworkerpool/basic" "github.com/Joker666/goworkerpool/model" "github.com/Joker666/goworkerpool/worker" ) func main() { // Prepare the data var allData []model.SimpleData for i := 0; i < 1000; i++ { data := model.SimpleData{ ID: i } allData = append(allData, data) } fmt.Printf("Start processing all work \n") // Process basic.Work(allData) }
Start processing all work Took ===============> 1m40.226679665s
Kod di atas mencipta pakej model, yang mengandungi struktur yang mempunyai hanya satu ahli jenis int. Kami memproses data secara serentak, yang jelas tidak optimum kerana tugasan ini boleh diproses secara serentak. Mari kita ubah penyelesaian dan gunakan goroutine dan saluran untuk mengendalikannya.
//// worker/notPooled.go func NotPooledWork(allData []model.SimpleData) { start := time.Now() var wg sync.WaitGroup dataCh := make(chan model.SimpleData, 100) wg.Add(1) go func() { defer wg.Done() for data := range dataCh { wg.Add(1) go func(data model.SimpleData) { defer wg.Done() basic.Process(data) }(data) } }() for i, _ := range allData { dataCh <- allData[i] } close(dataCh) wg.Wait() elapsed := time.Since(start) fmt.Printf("Took ===============> %s\n", elapsed) } //// main.go // Process worker.NotPooledWork(allData)
Start processing all work Took ===============> 101.191534ms
Dalam kod di atas, kami mencipta saluran cache dengan kapasiti 100 dan menolak data ke dalam saluran melalui NoPooledWork(). Selepas panjang saluran mencapai 100, kami tidak boleh menambah elemen padanya sehingga elemen dibaca. Gunakan untuk julat untuk membaca saluran dan menjana pemprosesan goroutine. Di sini kami tidak mempunyai had pada bilangan goroutine yang dihasilkan, yang boleh mengendalikan seberapa banyak tugas yang mungkin. Secara teorinya, sebanyak mungkin data boleh diproses berdasarkan sumber yang diperlukan. Melaksanakan kod, ia hanya mengambil masa 100ms untuk menyelesaikan 1000 tugasan. Ia gila! Tidak sepenuhnya, baca terus.
Melainkan kita mempunyai semua sumber di bumi, hanya terdapat begitu banyak sumber yang boleh diperuntukkan pada masa tertentu. Memori minimum yang diduduki oleh goroutine ialah 2k, tetapi ia juga boleh mencapai 1G. Penyelesaian di atas untuk melaksanakan semua tugas secara serentak, dengan mengandaikan sejuta tugas, akan cepat meletihkan memori mesin dan CPU. Kita sama ada perlu menaik taraf konfigurasi mesin atau mencari penyelesaian lain yang lebih baik.
计算机科学家很久之前就考虑过这个问题,并提出了出色的解决方案 - 使用 Thread Pool 或者 Worker Pool。这个方案是使用 worker 数量受限的工作池来处理任务,workers 会按顺序一个接一个处理任务,这样就避免了 CPU 和内存使用急速增长。
我们通过实现 worker pool 来修复之前遇到的问题。
//// worker/pooled.go func PooledWork(allData []model.SimpleData) { start := time.Now() var wg sync.WaitGroup workerPoolSize := 100 dataCh := make(chan model.SimpleData, workerPoolSize) for i := 0; i < workerPoolSize; i++ { wg.Add(1) go func() { defer wg.Done() for data := range dataCh { basic.Process(data) } }() } for i, _ := range allData { dataCh <- allData[i] } close(dataCh) wg.Wait() elapsed := time.Since(start) fmt.Printf("Took ===============> %s\n", elapsed) } //// main.go // Process worker.PooledWork(allData)
Start processing all work Took ===============> 1.002972449s
上面的代码,worker 数量限制在 100,我们创建了相应数量的 goroutine 来处理任务。我们可以把 channel 看作是队列,worker goroutine 看作是消费者。多个 goroutine 可以监听同一个 channel,但是 channel 里的每一个元素只会被处理一次。
Go 语言的 channel 可以当作队列使用。
这是一个比较好的解决方案,执行代码,我们看到完成所有任务花费 1s。虽然没有 100ms 这么快,但已经能满足业务需要,而且我们得到了一个更好的解决方案,能将负载均摊在不同的时间片上。
我们能做的还没完。上面看起来是一个完整的解决方案,但却不是的,我们没有处理错误情况。所以需要模拟出错的情形,并且看下我们需要怎么处理。
//// worker/pooledError.go func PooledWorkError(allData []model.SimpleData) { start := time.Now() var wg sync.WaitGroup workerPoolSize := 100 dataCh := make(chan model.SimpleData, workerPoolSize) errors := make(chan error, 1000) for i := 0; i < workerPoolSize; i++ { wg.Add(1) go func() { defer wg.Done() for data := range dataCh { process(data, errors) } }() } for i, _ := range allData { dataCh <- allData[i] } close(dataCh) wg.Add(1) go func() { defer wg.Done() for { select { case err := <-errors: fmt.Println("finished with error:", err.Error()) case <-time.After(time.Second * 1): fmt.Println("Timeout: errors finished") return } } }() defer close(errors) wg.Wait() elapsed := time.Since(start) fmt.Printf("Took ===============> %s\n", elapsed) } func process(data model.SimpleData, errors chan<- error) { fmt.Printf("Start processing %d\n", data.ID) time.Sleep(100 * time.Millisecond) if data.ID % 29 == 0 { errors <- fmt.Errorf("error on job %v", data.ID) } else { fmt.Printf("Finish processing %d\n", data.ID) } } //// main.go // Process worker.PooledWorkError(allData)
我们修改了 process() 函数,处理一些随机的错误并将错误 push 到 errors chnanel 里。所以,为了处理并发出现的错误,我们可以使用 errors channel 保存错误数据。在所有任务处理完成之后,可以检查错误 channel 是否有数据。错误 channel 里的元素保存了任务 ID,方便需要的时候再处理这些任务。
比之前没处理错误,很明显这是一个更好的解决方案。但我们还可以做得更好,
我们将在下篇文章讨论如何编写一个强大的 worker pool 包,并且在 worker 数量受限的情况下处理并发任务。
Go 语言的并发模型足够强大给力,只需要构建一个 worker pool 就能很好地解决问题而无需做太多工作,这就是它没有包含在标准库中的原因。但是,我们自己可以构建一个满足自身需求的方案。很快,我会在下一篇文章中讲到,敬请期待!
Atas ialah kandungan terperinci Concurrency dan WorkerPool dalam bahasa Go - Bahagian 1. Untuk maklumat lanjut, sila ikut artikel berkaitan lain di laman web China PHP!