線程,一種有助於開發現代高效能解決方案並成為不可或缺的工具。無論使用哪種語言,並行執行任務的能力都具有很大的吸引力。但顯然有本叔叔的名言:「能力越大,責任越大。」如何以最佳方式使用該解決方案,以實現性能、更好地利用資源和應用程式健康?首先,有必要了解本主題的基本概念。
執行緒是作業系統中進程執行的基本單位。它們允許程式在同一進程中同時執行多個操作。每個執行緒與主進程共享相同的記憶體空間,但可以獨立執行,這對於可以並行執行的任務非常有用,例如輸入/輸出(I/O)操作、複雜計算或資料使用者介面。 .
在許多系統上,執行緒由作業系統管理,作業系統為每個執行緒分配 CPU 時間並管理它們之間的上下文切換。在 Java、Python 和 C 等程式語言中,都有一些函式庫和框架可以更輕鬆地建立和管理執行緒。
執行緒主要用於提高程式的效率和回應能力。使用線程,尤其是後端的原因是:
平行性:執行緒讓您同時執行多個操作,從而更好地利用可用的 CPU 資源,尤其是在多核心系統上。
效能:在I/O 操作中,例如讀寫檔案或網路通信,執行緒可以透過允許程式在等待這些任務完成的同時繼續執行其他任務來幫助提高效能操作。
模組化:執行緒可用於將程式分割成更小、更易於管理的部分,每個部分執行特定的任務。
但是,仔細管理執行緒非常重要,因為不正確的使用可能會導致競爭條件、死鎖和除錯困難等問題。為了更好地管理它們,使用了執行緒池解決方案。
執行緒池是一種軟體設計模式,涉及建立和管理可重複使用來執行任務的執行緒池。線程池不會為每個任務重複建立和銷毀線程,而是維護固定數量的線程,準備根據需要執行任務。這可以顯著提高需要處理許多並發任務的應用程式的效能。使用執行緒池的優點是:
提高效能:建立和銷毀執行緒在資源方面是一項昂貴的操作。執行緒池透過重複使用現有執行緒來最小化此成本。
資源管理:控制運行的執行緒數量,避免過多的執行緒建立導致系統過載。
易於使用:簡化執行緒管理,使開發人員能夠專注於應用程式邏輯而不是執行緒管理。
可擴展性:幫助擴展應用程式以有效地處理大量並發任務。
好吧,我當然必須創建一個線程池才能更好地利用此功能,但很快出現的一個問題是:「池應該包含多少個線程?」。按照基本邏輯,越多越好,對嗎?如果所有事情都可以並行完成,那麼很快就會完成,因為速度會更快。因此,最好不要限制線程數,或設定一個較高的數字,這樣就不會出現問題。正確嗎?
這是一個公平的說法,所以讓我們測試一下。此測試的程式碼是用 Kotlin 編寫的,只是為了熟悉並且易於編寫範例。這一點與語言無關。
透過 4 個範例來探索不同的系統性質。範例 1 和範例 2 是為了使用 CPU 進行大量數學運算,即進行大量處理。範例3重點關注I/O,範例是讀取文件,最後,範例4是並行API呼叫的情況,同樣關注I/O。它們都使用不同大小的池,分別具有 1、2、4、8、16、32、50、100 和 500 個執行緒。所有過程發生超過 500 次。
import kotlinx.coroutines.* import kotlin.math.sqrt import kotlin.system.measureTimeMillis fun isPrime(number: Int): Boolean { if (number <= 1) return false for (i in 2..sqrt(number.toDouble()).toInt()) { if (number % i == 0) return false } return true } fun countPrimesInRange(start: Int, end: Int): Int { var count = 0 for (i in start..end) { if (isPrime(i)) { count++ } } return count } @OptIn(DelicateCoroutinesApi::class) fun main() = runBlocking { val rangeStart = 1 val rangeEnd = 100_000 val numberOfThreadsList = listOf(1, 2, 4, 8, 16, 32, 50, 100, 500) for (numberOfThreads in numberOfThreadsList) { val customDispatcher = newFixedThreadPoolContext(numberOfThreads, "customPool") val chunkSize = (rangeEnd - rangeStart + 1) / numberOfThreads val timeTaken = measureTimeMillis { val jobs = mutableListOf<Deferred<Int>>() for (i in 0 until numberOfThreads) { val start = rangeStart + i * chunkSize val end = if (i == numberOfThreads - 1) rangeEnd else start + chunkSize - 1 jobs.add(async(customDispatcher) { countPrimesInRange(start, end) }) } val totalPrimes = jobs.awaitAll().sum() println("Total de números primos encontrados com $numberOfThreads threads: $totalPrimes") } println("Tempo levado com $numberOfThreads threads: $timeTaken ms") customDispatcher.close() } }
Total de números primos encontrados com 1 threads: 9592 Tempo levado com 1 threads: 42 ms Total de números primos encontrados com 2 threads: 9592 Tempo levado com 2 threads: 17 ms Total de números primos encontrados com 4 threads: 9592 Tempo levado com 4 threads: 8 ms Total de números primos encontrados com 8 threads: 9592 Tempo levado com 8 threads: 8 ms Total de números primos encontrados com 16 threads: 9592 Tempo levado com 16 threads: 16 ms Total de números primos encontrados com 32 threads: 9592 Tempo levado com 32 threads: 12 ms Total de números primos encontrados com 50 threads: 9592 Tempo levado com 50 threads: 19 ms Total de números primos encontrados com 100 threads: 9592 Tempo levado com 100 threads: 36 ms Total de números primos encontrados com 500 threads: 9592 Tempo levado com 500 threads: 148 ms
import kotlinx.coroutines.DelicateCoroutinesApi import kotlinx.coroutines.launch import kotlinx.coroutines.newFixedThreadPoolContext import kotlinx.coroutines.runBlocking import kotlin.system.measureTimeMillis fun fibonacci(n: Int): Long { return if (n <= 1) n.toLong() else fibonacci(n - 1) + fibonacci(n - 2) } @OptIn(DelicateCoroutinesApi::class) fun main() = runBlocking { val numberOfThreadsList = listOf(1, 2, 4, 8, 16, 32, 50, 100, 500) for (numberOfThreads in numberOfThreadsList) { val customDispatcher = newFixedThreadPoolContext(numberOfThreads, "customPool") val numbersToCalculate = mutableListOf<Int>() for (i in 1..1000) { numbersToCalculate.add(30) } val timeTaken = measureTimeMillis { val jobs = numbersToCalculate.map { number -> launch(customDispatcher) { fibonacci(number) } } jobs.forEach { it.join() } } println("Tempo levado com $numberOfThreads threads: $timeTaken ms") customDispatcher.close() } }
Tempo levado com 1 threads: 4884 ms Tempo levado com 2 threads: 2910 ms Tempo levado com 4 threads: 1660 ms Tempo levado com 8 threads: 1204 ms Tempo levado com 16 threads: 1279 ms Tempo levado com 32 threads: 1260 ms Tempo levado com 50 threads: 1364 ms Tempo levado com 100 threads: 1400 ms Tempo levado com 500 threads: 1475 ms
import kotlinx.coroutines.* import java.io.File import kotlin.system.measureTimeMillis @OptIn(DelicateCoroutinesApi::class) fun main() = runBlocking { val file = File("numeros_aleatorios.txt") if (!file.exists()) { println("Arquivo não encontrado!") return@runBlocking } val numberOfThreadsList = listOf(1, 2, 4, 8, 16, 32, 50, 100, 500) for (numberOfThreads in numberOfThreadsList) { val customDispatcher = newFixedThreadPoolContext(numberOfThreads, "customPool") val timeTaken = measureTimeMillis { val jobs = mutableListOf<Deferred<Int>>() file.useLines { lines -> lines.forEach { line -> jobs.add(async(customDispatcher) { processLine(line) }) } } val totalSum = jobs.awaitAll().sum() println("Total da soma com $numberOfThreads threads: $totalSum") } println("Tempo levado com $numberOfThreads threads: $timeTaken ms") customDispatcher.close() } } fun processLine(line: String): Int { return line.toInt() + 10 }
Total da soma de 1201 linhas com 1 threads: 60192 Tempo levado com 1 threads: 97 ms Total da soma de 1201 linhas com 2 threads: 60192 Tempo levado com 2 threads: 28 ms Total da soma de 1201 linhas com 4 threads: 60192 Tempo levado com 4 threads: 30 ms Total da soma de 1201 linhas com 8 threads: 60192 Tempo levado com 8 threads: 26 ms Total da soma de 1201 linhas com 16 threads: 60192 Tempo levado com 16 threads: 33 ms Total da soma de 1201 linhas com 32 threads: 60192 Tempo levado com 32 threads: 35 ms Total da soma de 1201 linhas com 50 threads: 60192 Tempo levado com 50 threads: 44 ms Total da soma de 1201 linhas com 100 threads: 60192 Tempo levado com 100 threads: 66 ms Total da soma de 1201 linhas com 500 threads: 60192 Tempo levado com 500 threads: 297 ms
import io.ktor.client.* import io.ktor.client.engine.cio.* import io.ktor.client.request.* import kotlinx.coroutines.DelicateCoroutinesApi import kotlinx.coroutines.launch import kotlinx.coroutines.newFixedThreadPoolContext import kotlinx.coroutines.runBlocking import kotlin.system.measureTimeMillis @OptIn(DelicateCoroutinesApi::class) fun main() = runBlocking { val client = HttpClient(CIO) try { val numberOfThreadsList = listOf(1, 2, 4, 8, 16, 32, 50, 100, 500) for (numberOfThreads in numberOfThreadsList) { val customDispatcher = newFixedThreadPoolContext(numberOfThreads, "customPool") val timeTaken = measureTimeMillis { repeat(500) { val jobs = launch(customDispatcher) { client.get("http://127.0.0.1:5000/example") } jobs.join() } } println("Tempo levado com $numberOfThreads threads: $timeTaken ms") customDispatcher.close() } } catch (e: Exception) { println("Erro ao conectar à API: ${e.message}") } finally { client.close() } }
Tempo levado com 1 threads: 7104 ms Tempo levado com 2 threads: 4793 ms Tempo levado com 4 threads: 4170 ms Tempo levado com 8 threads: 4310 ms Tempo levado com 16 threads: 4028 ms Tempo levado com 32 threads: 4089 ms Tempo levado com 50 threads: 4066 ms Tempo levado com 100 threads: 3978 ms Tempo levado com 500 threads: 3777 ms
範例 1 到範例 3 有一個共同的行為,它們在達到 8 個執行緒時效能都變得更高,然後處理時間再次增加,但範例 4 卻沒有,這說明了什麼?總是使用盡可能多的線程不是很有趣嗎?
簡單快速的答案是不。
我的機器的處理器有 8 個核心,也就是說,它可以同時執行 8 個任務,超過這個時間會隨著管理每個執行緒狀態的時間而增加,最終導致效能下降。
好的,這回答了範例 1 到 3,但範例 4 呢?為什麼啟動的線程越多效能就會提高?
簡單,因為它是一個集成,機器沒有處理,它基本上等待響應,它保持“睡眠”直到響應到達,所以是的,這裡線程的數量可以更大。但要小心,並不意味著可以有盡可能多的線程,線程會導致資源耗盡,不加區別地使用它們會產生相反的效果,會影響服務的整體健康狀況。
因此,要定義池將擁有的執行緒數,最簡單、最安全的方法是分離將要執行的任務的性質。它們分為兩部分:
不需要處理的任務:
當任務類型不需要處理時,可以建立比機器上的處理器核心更多的執行緒。發生這種情況是因為不需要處理資訊來完成線程,基本上這種性質的線程在大多數情況下都期望來自整合的回應,例如寫入資料庫或來自 API 的回應。
需要處理的任務:
當解決方案有處理時,即機器實際在做工作時,最大執行緒數必須是機器處理器的核心數。這是因為處理器核心無法同時執行多於一件事。例如,如果執行解決方案的處理器有 4 個核心,那麼您的執行緒池必須是處理器核心的大小,也就是 4 個執行緒池。
在考慮執行緒池時,首先要定義的一點不一定是限制其大小的數量,而是所執行任務的性質。線程對服務的效能有很大幫助,但必須以最佳方式使用它們,以免產生相反的效果並降低效能,甚至更糟的是,導致整個服務的健康狀況受到影響。很明顯,較小的池最終更傾向於處理大量使用的任務,換句話說,即 CPU 受限的任務。如果您不確定將使用線程的解決方案是否具有大量使用處理的行為,請謹慎起見,將您的池限制為機器上的處理器數量,相信我,這會節省讓您很頭疼。
以上是執行緒:如何以效能為目標定義和限制執行?的詳細內容。更多資訊請關注PHP中文網其他相關文章!