同時実行性は、最新のプログラミング言語において不可欠な機能となっています。現在、ほとんどのプログラミング言語には、同時実行性を実現するための何らかの方法が備わっています。
これらの実装の一部は非常に強力で、Java などの別のシステム スレッドに負荷をシフトできます。また、Ruby など、同じスレッド上でこの動作をシミュレートするものもあります。
Golang の同時実行モデルは、CSP (Communicating Sequential Process) と呼ばれる非常に強力で、問題を小さな逐次プロセスに分割し、これらのプロセスのインスタンス (Goroutines と呼ばれる) をスケジュールします。これらのプロセスは、チャネルを通じて情報を渡すことによって通信します。
この記事では、golang の同時実行性を活用する方法と、workerPool でそれを使用する方法について説明します。シリーズの 2 番目の記事では、強力な同時実行ソリューションを構築する方法を検討します。
外部 API インターフェイスを呼び出す必要があり、プロセス全体に 100 ミリ秒かかるとします。このインターフェイスを 1000 回同期的に呼び出す必要がある場合、数百秒かかります。
//// model/data.go package model type SimpleData struct { ID int } //// basic/basic.go package basic import ( "fmt" "github.com/Joker666/goworkerpool/model" "time" ) func Work(allData []model.SimpleData) { start := time.Now() for i, _ := range allData { Process(allData[i]) } elapsed := time.Since(start) fmt.Printf("Took ===============> %s\n", elapsed) } func Process(data model.SimpleData) { fmt.Printf("Start processing %d\n", data.ID) time.Sleep(100 * time.Millisecond) fmt.Printf("Finish processing %d\n", data.ID) } //// main.go package main import ( "fmt" "github.com/Joker666/goworkerpool/basic" "github.com/Joker666/goworkerpool/model" "github.com/Joker666/goworkerpool/worker" ) func main() { // Prepare the data var allData []model.SimpleData for i := 0; i < 1000; i++ { data := model.SimpleData{ ID: i } allData = append(allData, data) } fmt.Printf("Start processing all work \n") // Process basic.Work(allData) }
Start processing all work Took ===============> 1m40.226679665s
上記のコードは、int 型のメンバーを 1 つだけ持つ構造体を含むモデル パッケージを作成します。データは同期的に処理されますが、これらのタスクは同時に処理される可能性があるため、これは明らかに最適ではありません。ソリューションを変更し、ゴルーチンとチャネルを使用して処理してみましょう。
//// worker/notPooled.go func NotPooledWork(allData []model.SimpleData) { start := time.Now() var wg sync.WaitGroup dataCh := make(chan model.SimpleData, 100) wg.Add(1) go func() { defer wg.Done() for data := range dataCh { wg.Add(1) go func(data model.SimpleData) { defer wg.Done() basic.Process(data) }(data) } }() for i, _ := range allData { dataCh <- allData[i] } close(dataCh) wg.Wait() elapsed := time.Since(start) fmt.Printf("Took ===============> %s\n", elapsed) } //// main.go // Process worker.NotPooledWork(allData)
Start processing all work Took ===============> 101.191534ms
上記のコードでは、容量 100 のキャッシュ チャネルを作成し、NoPooledWork を通じてデータをチャネルにプッシュしました。 () 。チャネル長が 100 に達すると、要素が読み取られるまで要素を追加できません。 range に使用してチャネルを読み取り、ゴルーチン処理を生成します。ここでは、生成されるゴルーチンの数に制限がなく、できるだけ多くのタスクを処理できます。理論的には、必要なリソースがあれば、できるだけ多くのデータを処理できます。コードを実行すると、1000 個のタスクを完了するのにわずか 100 ミリ秒しかかかりませんでした。それはクレイジーです!完全ではありませんが、読み続けてください。
私たちが地球上のすべての資源を所有していない限り、一度に割り当てられる量には制限があります。 goroutine が占有する最小メモリは 2k ですが、1G に達することもあります。すべてのタスクを同時に実行する上記の解決策では、100 万個のタスクを想定すると、マシンのメモリと CPU がすぐに使い果たされてしまいます。マシンの構成をアップグレードするか、他のより良いソリューションを見つける必要があります。
计算机科学家很久之前就考虑过这个问题,并提出了出色的解决方案 - 使用 Thread Pool 或者 Worker Pool。这个方案是使用 worker 数量受限的工作池来处理任务,workers 会按顺序一个接一个处理任务,这样就避免了 CPU 和内存使用急速增长。
我们通过实现 worker pool 来修复之前遇到的问题。
//// worker/pooled.go func PooledWork(allData []model.SimpleData) { start := time.Now() var wg sync.WaitGroup workerPoolSize := 100 dataCh := make(chan model.SimpleData, workerPoolSize) for i := 0; i < workerPoolSize; i++ { wg.Add(1) go func() { defer wg.Done() for data := range dataCh { basic.Process(data) } }() } for i, _ := range allData { dataCh <- allData[i] } close(dataCh) wg.Wait() elapsed := time.Since(start) fmt.Printf("Took ===============> %s\n", elapsed) } //// main.go // Process worker.PooledWork(allData)
Start processing all work Took ===============> 1.002972449s
上面的代码,worker 数量限制在 100,我们创建了相应数量的 goroutine 来处理任务。我们可以把 channel 看作是队列,worker goroutine 看作是消费者。多个 goroutine 可以监听同一个 channel,但是 channel 里的每一个元素只会被处理一次。
Go 语言的 channel 可以当作队列使用。
这是一个比较好的解决方案,执行代码,我们看到完成所有任务花费 1s。虽然没有 100ms 这么快,但已经能满足业务需要,而且我们得到了一个更好的解决方案,能将负载均摊在不同的时间片上。
我们能做的还没完。上面看起来是一个完整的解决方案,但却不是的,我们没有处理错误情况。所以需要模拟出错的情形,并且看下我们需要怎么处理。
//// worker/pooledError.go func PooledWorkError(allData []model.SimpleData) { start := time.Now() var wg sync.WaitGroup workerPoolSize := 100 dataCh := make(chan model.SimpleData, workerPoolSize) errors := make(chan error, 1000) for i := 0; i < workerPoolSize; i++ { wg.Add(1) go func() { defer wg.Done() for data := range dataCh { process(data, errors) } }() } for i, _ := range allData { dataCh <- allData[i] } close(dataCh) wg.Add(1) go func() { defer wg.Done() for { select { case err := <-errors: fmt.Println("finished with error:", err.Error()) case <-time.After(time.Second * 1): fmt.Println("Timeout: errors finished") return } } }() defer close(errors) wg.Wait() elapsed := time.Since(start) fmt.Printf("Took ===============> %s\n", elapsed) } func process(data model.SimpleData, errors chan<- error) { fmt.Printf("Start processing %d\n", data.ID) time.Sleep(100 * time.Millisecond) if data.ID % 29 == 0 { errors <- fmt.Errorf("error on job %v", data.ID) } else { fmt.Printf("Finish processing %d\n", data.ID) } } //// main.go // Process worker.PooledWorkError(allData)
我们修改了 process() 函数,处理一些随机的错误并将错误 push 到 errors chnanel 里。所以,为了处理并发出现的错误,我们可以使用 errors channel 保存错误数据。在所有任务处理完成之后,可以检查错误 channel 是否有数据。错误 channel 里的元素保存了任务 ID,方便需要的时候再处理这些任务。
比之前没处理错误,很明显这是一个更好的解决方案。但我们还可以做得更好,
我们将在下篇文章讨论如何编写一个强大的 worker pool 包,并且在 worker 数量受限的情况下处理并发任务。
Go 语言的并发模型足够强大给力,只需要构建一个 worker pool 就能很好地解决问题而无需做太多工作,这就是它没有包含在标准库中的原因。但是,我们自己可以构建一个满足自身需求的方案。很快,我会在下一篇文章中讲到,敬请期待!
以上がGo 言語の同時実行性と WorkerPool - パート 1の詳細内容です。詳細については、PHP 中国語 Web サイトの他の関連記事を参照してください。