同時実行により、複数のタスクを互いに独立して処理できます。ゴルーチンは、複数のタスクを個別に処理する簡単な方法です。この投稿では、ファイルを受け入れる http ハンドラーを段階的に強化し、チャネルと同期パッケージを利用して Go のさまざまな同時実行パターンを探索します。
同時実行パターンに入る前に、準備を整えましょう。フォーム経由で複数のファイルを受け取り、そのファイルを何らかの方法で処理する HTTP ハンドラーがあると想像してください。
func processFile(file multipart.File) { // do something with the file fmt.Println("Processing file...") time.Sleep(100 * time.Millisecond) // Simulating file processing time } func UploadHandler(w http.ResponseWriter, r *http.Request) { // limit to 10mb if err := r.ParseMultipartForm(10 << 20); err != nil { http.Error(w, "Unable to parse form", http.StatusBadRequest) return } // iterate through all files and process them sequentially for _, file := range r.MultipartForm.File["files"] { f, err := file.Open() if err != nil { http.Error(w, "Unable to read file", http.StatusInternalServerError) return } processFile(f) f.Close() } }
上記の例では、フォームからファイルを受信し、それらを順番に処理します。 10 個のファイルがアップロードされる場合、プロセスが完了してクライアントに応答が送信されるまでに 1 秒かかります。
多くのファイルを処理する場合、これがボトルネックになる可能性がありますが、Go の同時実行サポートを使用すると、この問題を簡単に解決できます。
これを解決するには、ファイルを同時に処理できます。新しいゴルーチンを生成するには、関数呼び出しの前に go キーワードを付けることができます。 processFile(f) に進みます。ただし、ゴルーチンは非ブロックであるため、プロセスが完了する前にハンドラーが戻り、ファイルが未処理のままになったり、間違った状態が返されたりする可能性があります。すべてのファイルの処理を待つには、sync.WaitGroup.
を利用できます。
WaitGroup は、多数の goroutine が終了するのを待ちます。生成する goroutine ごとに、さらに WaitGroup 内のカウンターを増やす必要があります。これは、Add 関数を使用して行うことができます。 goroutine が終了したら、カウンターが 1 つ減るように Done を呼び出す必要があります。関数から戻る前に、WaitGroup のカウンターが 0 になるまでブロックする Wait を呼び出す必要があります。
func UploadHandler(w http.ResponseWriter, r *http.Request) { if err := r.ParseMultipartForm(10 << 20); err != nil { http.Error(w, "Unable to parse form", http.StatusBadRequest) return } // create WaitGroup var wg sync.WaitGroup for _, file := range r.MultipartForm.File["files"] { f, err := file.Open() if err != nil { http.Error(w, "Unable to read file", http.StatusInternalServerError) return } wg.Add(1) // Add goroutine to the WaitGroup by incrementing the WaitGroup counter, this should be called before starting a goroutine // Process file concurrently go func(file multipart.File) { defer wg.Done() // decrement the counter by calling Done, utilize defer to guarantee that Done is called. defer file.Close() processFile(f) }(f) } // Wait for all goroutines to complete wg.Wait() fmt.Fprintln(w, "All files processed successfully!") }
アップロードされたファイルごとに新しいゴルーチンが生成されるため、システムに負荷がかかる可能性があります。解決策の 1 つは、生成されるゴルーチンの数を制限することです。
セマフォは、複数のスレッド (この場合はゴルーチン) による共通リソースへのアクセスを制御するために使用できる単なる変数です。
Go では、バッファーされたチャネルを利用してセマフォを実装できます。
実装に入る前に、チャネルとは何か、およびバッファ付きチャネルとバッファなしチャネルの違いを見てみましょう。
チャネルは、go ルーチン間で安全に通信するためにデータを送受信できるパイプです。
チャンネルは make 関数を使用して作成する必要があります。
func processFile(file multipart.File) { // do something with the file fmt.Println("Processing file...") time.Sleep(100 * time.Millisecond) // Simulating file processing time } func UploadHandler(w http.ResponseWriter, r *http.Request) { // limit to 10mb if err := r.ParseMultipartForm(10 << 20); err != nil { http.Error(w, "Unable to parse form", http.StatusBadRequest) return } // iterate through all files and process them sequentially for _, file := range r.MultipartForm.File["files"] { f, err := file.Open() if err != nil { http.Error(w, "Unable to read file", http.StatusInternalServerError) return } processFile(f) f.Close() } }
チャネルには特別な演算子 <- があり、チャネルの送信またはチャネルからの読み取りに使用されます。
オペレータがチャネル ch
このアニメーションは、プロデューサーがバッファーなしのチャネルを通じて値 1 を送信し、コンシューマーがチャネルから読み取る様子を視覚化しています。
プロデューサーがコンシューマーが処理できるよりも速くイベントを送信できる場合は、バッファーされたチャネルを利用して、バッファーがいっぱいになるまでプロデューサーをブロックせずに複数のメッセージをキューに入れるオプションがあります。同時に、消費者は自分のペースでメッセージを処理できます。
func UploadHandler(w http.ResponseWriter, r *http.Request) { if err := r.ParseMultipartForm(10 << 20); err != nil { http.Error(w, "Unable to parse form", http.StatusBadRequest) return } // create WaitGroup var wg sync.WaitGroup for _, file := range r.MultipartForm.File["files"] { f, err := file.Open() if err != nil { http.Error(w, "Unable to read file", http.StatusInternalServerError) return } wg.Add(1) // Add goroutine to the WaitGroup by incrementing the WaitGroup counter, this should be called before starting a goroutine // Process file concurrently go func(file multipart.File) { defer wg.Done() // decrement the counter by calling Done, utilize defer to guarantee that Done is called. defer file.Close() processFile(f) }(f) } // Wait for all goroutines to complete wg.Wait() fmt.Fprintln(w, "All files processed successfully!") }
この例では、プロデューサーはブロックせずに最大 2 つのアイテムを送信できます。バッファーの容量に達すると、コンシューマーが少なくとも 1 つのメッセージを処理するまで、プロデューサーはブロックされます。
最初の問題に戻り、ファイルを同時に処理するゴルーチンの量を制限したいと思います。これを行うには、バッファリングされたチャネルを利用できます。
ch := make(chan int)
この例では、容量 5 の バッファー チャネル を追加しました。これにより、5 つのファイルを同時に処理し、システムへの負担を制限できます。
しかし、すべてのファイルが同等ではない場合はどうなるでしょうか?ファイルの種類やファイル サイズが異なると、処理により多くのリソースが必要になることを確実に予測できる場合があります。この場合、重み付きセマフォを利用できます。
簡単に言えば、重み付きセマフォを使用すると、より多くのリソースを単一のタスクに割り当てることができます。 Go はすでに、extend sync パッケージ内で重み付きセマフォの実装を提供しています。
ch := make(chan int, 2)
このバージョンでは、5 つのスロットを持つ加重セマフォを作成しました。たとえば、画像のみがアップロードされる場合、プロセスは 5 つの画像を同時に処理しますが、PDF がアップロードされる場合は 2 つのスロットが取得されるため、処理できるファイルの量が減少します。同時に。
私たちは、sync.WaitGroup とセマフォを利用して同時タスクの数を制御する、Go のいくつかの同時実行パターンを調査しました。ただし、利用可能なツールは他にもあり、チャネルを利用してワーカー プールを作成したり、タイムアウトを追加したり、ファン イン/アウト パターンを使用したりできます。
さらに、エラー処理は重要な側面ですが、簡単にするためにほとんど省略されています。
エラーを処理する 1 つの方法は、チャネルを利用してエラーを集約し、すべてのゴルーチンが完了した後にエラーを処理することです。
Go は、sync.WaitGroups に関連する errgroup.Group も提供しますが、エラーを返すタスクの処理を追加します。
このパッケージは、extend sync パッケージにあります。
以上がゴルーチンとチャネル: Go の同時実行パターンの詳細内容です。詳細については、PHP 中国語 Web サイトの他の関連記事を参照してください。