スレッド プールを作成してスレッドを使用するために最も一般的に使用される Executor 実装は、主に次のとおりです。上記のクラス図で提供されるクラスを使用します。上のクラス図には、実行をスケジュールし、一連の実行戦略呼び出しに基づいて非同期タスクを制御するフレームワークである Executor フレームワークが含まれています。その目的は、タスクの送信とタスクの実行方法を分離するメカニズムを提供することです。これには 3 つのエグゼキュータ インターフェイスが含まれています:
Executor: 新しいタスクを実行するためのシンプルなインターフェイス
ExecutorService: Executor を拡張し、エグゼキュータの寿命を管理するメソッドを追加します。サイクルとタスクのライフ サイクル
ScheduleExcutorService: ExecutorService を拡張して、今後のタスクの定期的な実行をサポートします
リソース消費の削減 - 既存のスレッドを再利用し、オブジェクトの作成と破棄のコストを削減し、優れたパフォーマンスを実現します
応答を向上させますSpeed - 同時スレッドの最大数を効果的に制御し、システム リソースの使用率を向上させ、過剰なリソースの競合やブロックを回避できます。タスクが到着すると、スレッドの作成を待たずにすぐにタスクを実行できます。
スレッド管理性の向上-スケジュール実行、定期実行、単一実行を提供します。 -スレッド、同時実行制御およびその他の機能。
新しいスレッドが新しいオブジェクトを作成するたびに、パフォーマンスが低下します
スレッドの統合管理が行われていないと、新しいスレッドが無制限に作成され、互いに競合し、システム リソースを占有しすぎて、クラッシュや OOM (メモリ不足メモリ オーバーフロー) が発生する可能性があります。この問題の原因は、単なる問題ではありません。新しいスレッドが発生しますが、プログラムのバグや設計上の欠陥が原因である可能性があり、常に新しいスレッドが発生します。
追加の実行、定期的な実行、スレッドの中断などのその他の機能がありません。
パラメータの説明: ThreadPoolExecutor には合計 7 つのパラメータがあり、これら 7 つのパラメータが連携してスレッド プールの強力な機能を形成します。
corePoolSize: コア スレッドの数
maximumPoolSize: 最大スレッド数
workQueue:実行を待っているタスクを保存するブロッキング キューは非常に重要であり、スレッド プールの実行プロセスに大きな影響を与えます。
新しいタスクをスレッド プールに送信すると、スレッド プールはプール内で現在実行されているスレッドの数に基づいて、タスクがどのように処理されるかを決定します。処理方法は 3 つあります:
1. 直接切り替え (SynchronusQueue)
2. アンバウンド キュー (LinkedBlockingQueue) で作成できるスレッドの最大数は corePoolSize です。 MaximumPoolSize は機能しません。スレッド プール内のすべてのコア スレッドが実行されている場合、新しいタスクの送信は待機キューに入れられます。
3. 制限付きキュー (ArrayBlockingQueue) の最大 MaximumPoolSize を使用するとリソースの消費を削減できますが、この方法ではスレッド プールのスケジューリングがより困難になります。スレッド プールとキューの容量が限られているためです。したがって、スレッド プールと処理タスクのスループット レートを適切な範囲に達させたい場合、またスレッドのスケジューリングを比較的シンプルにしてリソース消費を可能な限り削減したい場合は、これら 2 つの量割り当て手法を合理的に制限する必要があります。 : [ CPU 使用率、オペレーティング システムのリソース消費、コンテキスト切り替えオーバーヘッドなどのリソース消費を削減したい場合は、より大きなキュー容量とより小さなスレッド プール容量を設定できます。これにより、スレッド プールのスループットが低下します。 。送信したタスクが頻繁にブロックされる場合は、maximumPoolSize を調整できます。キューの容量が小さい場合は、CPU 使用率が相対的に高くなるように、スレッド プールのサイズを大きく設定する必要があります。ただし、スレッドプールの容量を大きくしすぎてタスク数を増やしすぎると同時実行量が増加するため、スレッド間のスケジューリングを考慮する必要があります。これにより、タスク処理のスループットが低下する可能性があります。 ]
keepAliveTime: タスクの実行がない場合 (スレッド内のスレッド数が corePoolSize より大きい場合、コアプール サイズがある場合) にスレッドが終了するまで保持される最大時間。現時点では、コア スレッド外のスレッドに新しいタスクが送信されていません。すぐには破棄されませんが、keepAliveTime を超えるまで待ちます)
unit: keepAliveTime の時間単位
threadFactory: スレッド ファクトリ、スレッドの作成に使用されます。スレッドを作成するためのデフォルト ファクトリがあるため、新しく作成されたスレッドは同じ優先順位を持ち、非デーモン スレッドであり、名前が設定されています)
rejectHandler: タスク (ブロッキング キューがいっぱい) の拒否処理戦略 (AbortPolicy デフォルト ポリシーは例外を直接スローし、CallerRunsPolicy は呼び出し元のスレッドを使用してタスクを実行し、DiscardOldestPolicy はタスクを破棄します)キューの先頭のタスクを実行して現在のタスクを実行すると、DiscardPolicy は現在のタスクを直接破棄します)
corePoolSize、maximumPoolSize、workQueue の関係: 実行中のスレッドの数が corePoolSize より少ない場合、タスクを処理するために新しいスレッドが直接作成されます。スレッド プール内の他のスレッドがアイドル状態であっても。実行中のスレッドの数が corePoolSize より大きく、maximumPoolSize より小さい場合、workQueue がいっぱいの場合にのみタスクを処理するための新しいスレッドが作成されます。 corePoolSizeとmaximumPoolSizeが同じ場合、作成されるスレッドプールのサイズは固定されます。このとき、新しいタスクがサブミットされ、workQueue が満杯でない場合、リクエストは workQueue に配置されます。空のスレッドが workQueue からタスクを削除するまで待ちます。この時点で workQueue もいっぱいである場合は、追加の拒否ポリシー パラメーターを使用して拒否ポリシーを実行します。
初期化メソッド: 7 つのパラメータから 4 つの初期化メソッドで構成されます
その他のメソッド:
execute(); //提交任务,交给线程池执行 submit();//提交任务,能够返回执行结果 execute+Future shutdown();//关闭线程池,等待任务都执行完 shutdownNow();//关闭线程池,不等待任务执行完 getTaskCount();//线程池已执行和未执行的任务总数 getCompleteTaskCount();//已完成的任务数量 getPoolSize();//线程池当前的线程数量 getActiveCount();//当前线程池中正在执行任务的线程数量
スレッド プールのライフ サイクル:
running: 新しく送信されたタスクを受け入れることができ、ブロッキング キュー内のタスクも処理できます
shutdown: 処理できません新しいタスクですが、ブロックされたキュー内のタスクの処理を続行できます
//创建newCachedThreadPool线程池源码 public static ExecutorService newCachedThreadPool() { /** *corePoolSize: 0,核心线程池的数量为0 *maximumPoolSize: Integer.MAX_VALUE,可以认为最大线程数是无限的 *keepAliveTime: 60L *unit: 秒 *workQueue: SynchronousQueue **/ return new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS, new SynchronousQueue<Runnable>()); }
public static void main(String[] args) { ExecutorService executor = Executors.newCachedThreadPool(); for (int i = 0; i < 10; i++) { final int index = i; executor.execute(new Runnable() { @Override public void run() { log.info("task:{}",index); } }); } }
//newSingleThreadExecutor创建线程池源码 public static ExecutorService newSingleThreadExecutor() { /** * corePoolSize : 1,核心线程池的数量为1 * maximumPoolSize : 1,只可以创建一个非核心线程 * keepAliveTime : 0L * unit => 秒 * workQueue => LinkedBlockingQueue **/ return new FinalizableDelegatedExecutorService (new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>())); }
//newFixedThreadPool创建线程池源码 public static ExecutorService newFixedThreadPool(int nThreads) { /** * corePoolSize : 核心线程的数量为自定义输入nThreads * maximumPoolSize : 最大线程的数量为自定义输入nThreads * keepAliveTime : 0L * unit : 秒 * workQueue : LinkedBlockingQueue **/ return new ThreadPoolExecutor(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>()); }
//newScheduledThreadPool创建线程池源码 public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) { return new ScheduledThreadPoolExecutor(corePoolSize); } public ScheduledThreadPoolExecutor(int corePoolSize) { /** * corePoolSize : 核心线程的数量为自定义输入corePoolSize * maximumPoolSize : 最大线程的数量为Integer.MAX_VALUE * keepAliveTime : 0L * unit : 纳秒 * workQueue : DelayedWorkQueue **/ super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS, new DelayedWorkQueue()); }
public static void main(String[] args) { ScheduledExecutorService executorService = Executors.newScheduledThreadPool(1); // executorService.schedule(new Runnable() { // @Override // public void run() { // log.warn("schedule run"); // } // //延迟3秒后执行 // }, 3, TimeUnit.SECONDS); // executorService.shutdown(); // executorService.scheduleWithFixedDelay(new Runnable() { // @Override // public void run() { // log.warn("scheduleWithFixedDelay run"); // } // //延迟一秒后每隔3秒执行 // }, 1, 3, TimeUnit.SECONDS); executorService.scheduleAtFixedRate(new Runnable() { @Override public void run() { log.warn("schedule run"); } //延迟一秒后每隔3秒执行 }, 1, 3, TimeUnit.SECONDS); /** * 定时器调度,不推荐使用,推荐ScheduledExecutorService调度 */ // Timer timer = new Timer(); // timer.schedule(new TimerTask() { // @Override // public void run() { // log.warn("timer run"); // } // //从当前时间每隔5秒执行 // }, new Date(), 5 * 1000); }
CachedThreadPool および newScheduledThreadPool によって作成できるスレッドの数は Integer.MAX_VALUE です。これにより、大量のスレッドが作成され、OOM 例外が発生する可能性があります
これが、その使用が禁止されている理由です。Executor がスレッド プールを作成するが、ThreadPoolExecutor を自分で作成することを推奨する理由です。
CPU 負荷が高い:スレッド プールのサイズは、CPU 数 1 にすることをお勧めします。CPU 数は、Runtime.availableProcessors メソッドに従って取得できます。 IO-集約型: CPU 数 * CPU 使用率 * (待機中の 1 スレッド)時間/スレッド CPU 時間) 混合タイプ: タスクを CPU 集中型と IO 集中型に分割し、処理に異なるスレッド プールを使用するため、各スレッド プールは独自のワークロードに応じて調整できます。 ブロッキング キュー: 境界付きキューを使用することをお勧めします。キューはリソースの枯渇を避けるのに役立ちます。 拒否ポリシー: デフォルトのポリシーは、AbortPolicy 拒否ポリシーであり、RejectedExecutionException を直接スローします。プログラム [これは実行時例外であり、キャッチを強制しないため] この処理は十分にエレガントではありません。拒否を処理するには、次の戦略をお勧めします。
プログラム内で RejectedExecutionException 例外をキャッチし、キャッチされた例外内のタスクを処理します。デフォルトの拒否ポリシーには、CallerRunsPolicy 拒否ポリシーを使用します。
このポリシーは、実行のために実行を呼び出すスレッド (通常はメイン スレッド) にタスクを渡します。メインスレッドは一定期間実行できなくなります。タスクを送信すると、ワーカースレッドが実行中のタスクを処理します。この時点で送信されたスレッドは、TCP キューに保存されます。TCP キューがいっぱいの場合、クライアントに影響します。これは、軽いパフォーマンスの低下です。
カスタム拒否ポリシーのみRejectedExecutionHandler インターフェイスを使用できます
以上がJavaスレッドプールExecutorの使用方法の詳細内容です。詳細については、PHP 中国語 Web サイトの他の関連記事を参照してください。