以下のエディターは、Java同時実行プログラミング_スレッドプールの使い方(詳細な説明)に関する記事をお届けします。編集者はこれがとても良いと思ったので、参考として共有します。エディターをフォローして見てみましょう
1. タスクと実行戦略の間の暗黙的な結合
エグゼキューターはタスクの送信戦略とタスク実行戦略を分離できます
タスクが同じタイプである場合のみ、および存在する場合のみです。実行時間にほとんど差がない場合、最大のパフォーマンスを達成できます。 そうしないと、長時間消費するタスクと短時間消費するタスクが同じスレッド プールに配置されると、スレッド プールがよほど大きくない限り、デッドロックなどの問題が発生します
。 1 .スレッド スターベーション デッドロック
は、2 つのタスクを単一スレッド プールに送信し、2 つのタスクが相互に依存し、一方のタスクがもう一方のタスクを待機すると、デッドロックが発生します。プールが十分ではないこと
定義: タスクはプール内の他のタスクの実行結果を待つ必要があり、飢餓デッドロックが発生する可能性があります
2. スレッドプールのサイズ
注: サイズスレッド プールのサイズには、他のリソース プールなどの制限も適用されます: データベース接続プール
各タスクが接続の場合、スレッド プールのサイズはデータベース接続プールのサイズによって制限されます
。 3. ThreadPoolExecutor スレッド プールを構成します
インスタンス:
1. Executor のファクトリ メソッドを通じていくつかのデフォルト実装を返します
2. ThreadPoolExecutor(....) をインスタンス化して実装をカスタマイズします。タスクが到着し、スレッド プールがいっぱいになると、タスクはキュー内で待機します。タスクが無限に達すると、キューは無限に拡張されます
例: これは、シングルトンと固定サイズのスレッド プールに使用されるものです2境界付きキュー:
新しいタスクが到着し、キューがいっぱいの場合は、 飽和戦略を使用します
3. 同期ハンドオーバー:
スレッド プールが大きい場合、配置後のハンドオーバーに遅延が発生します。タスクプロデューサーがすぐにタスクをキューに入れる場合、SynchronousQueueはタスクをワーカースレッドに直接渡しますメカニズム: タスクを入れると、受け入れを待機しているスレッドが存在する必要があります。そうでない場合は、
新しいスレッド、スレッドが飽和している場合はタスクを拒否します例: CacheThreadPoolが使用される戦略です
setRejectedExecutionHan
dl1. 中止中止 (デフォルト):例外をスローします呼び出し元によって処理されます
2. DiscardOldest: 最も古いタスクを破棄します。注意: 優先度キューは優先度が最も高いタスクを破棄します
4.CallerRuns:ロールバックタスク、呼び出し元のスレッドがそれ自体を処理します
4.スレッドファクトリーThreadFactoy
いつでもスレッドが作成されます: 実際には、と呼ばれます スレッド ファクトリは、完了するために使用されます
カスタム スレッド ファクトリ: ThreadFactory を実装します
スレッド ファクトリの動作 をカスタマイズできます: UncaughtException などハンドラーなど
public class MyAppThread extends Thread { public static final String DEFAULT_NAME = "MyAppThread"; private static volatile boolean debugLifecycle = false; private static final AtomicInteger created = new AtomicInteger(); private static final AtomicInteger alive = new AtomicInteger(); private static final Logger log = Logger.getAnonymousLogger(); public MyAppThread(Runnable r) { this(r, DEFAULT_NAME); } public MyAppThread(Runnable runnable, String name) { super(runnable, name + "-" + created.incrementAndGet()); //设置该线程工厂创建的线程的 未捕获异常的行为 setUncaughtExceptionHandler(new Thread.UncaughtExceptionHandler() { public void uncaughtException(Thread t, Throwable e) { log.log(Level.SEVERE, "UNCAUGHT in thread " + t.getName(), e); } }); } public void run() { // Copy debug flag to ensure consistent value throughout. boolean debug = debugLifecycle; if (debug) log.log(Level.FINE, "Created " + getName()); try { alive.incrementAndGet(); super.run(); } finally { alive.decrementAndGet(); if (debug) log.log(Level.FINE, "Exiting " + getName()); } } public static int getThreadsCreated() { return created.get(); } public static int getThreadsAlive() { return alive.get(); } public static boolean getDebug() { return debugLifecycle; } public static void setDebug(boolean b) { debugLifecycle = b; } }
5. ThreadPoolExecutorの拡張
カスタムサブクラスでオーバーライドできるメソッド:
1.afterExecute: 終了後、RuntimeExceptionがスローされた場合、メソッドは実行されません 2.beforeExecute: 開始前に、RuntimeExceptionがスローされた場合、タスクは実行されません3.terminated: スレッドプールが閉じられると、リソースの解放などに使用できます
2. 再帰アルゴリズムの並列化
1.循环
在循环中,每次循环操作都是独立的
//串行化 void processSequentially(List<Element> elements) { for (Element e : elements) process(e); } //并行化 void processInParallel(Executor exec, List<Element> elements) { for (final Element e : elements) exec.execute(new Runnable() { public void run() { process(e); } }); }
2.迭代
如果每个迭代操作是彼此独立的,则可以串行执行
如:深度优先搜索算法;注意:递归还是串行的,但是,每个节点的计算是并行的
//串行 计算compute 和串行迭代 public <T> void sequentialRecursive(List<Node<T>> nodes, Collection<T> results) { for (Node<T> n : nodes) { results.add(n.compute()); sequentialRecursive(n.getChildren(), results); } } //并行 计算compute 和串行迭代 public <T> void parallelRecursive(final Executor exec, List<Node<T>> nodes, final Collection<T> results) { for (final Node<T> n : nodes) { exec.execute(() -> results.add(n.compute())); parallelRecursive(exec, n.getChildren(), results); } } //调用并行方法的操作 public <T> Collection<T> getParallelResults(List<Node<T>> nodes) throws InterruptedException { ExecutorService exec = Executors.newCachedThreadPool(); Queue<T> resultQueue = new ConcurrentLinkedQueue<T>(); parallelRecursive(exec, nodes, resultQueue); exec.shutdown(); exec.awaitTermination(Long.MAX_VALUE, TimeUnit.SECONDS); return resultQueue; }
实例:
public class ConcurrentPuzzleSolver <P, M> { private final Puzzle<P, M> puzzle; private final ExecutorService exec; private final ConcurrentMap<P, Boolean> seen; protected final ValueLatch<PuzzleNode<P, M>> solution = new ValueLatch<PuzzleNode<P, M>>(); public ConcurrentPuzzleSolver(Puzzle<P, M> puzzle) { this.puzzle = puzzle; this.exec = initThreadPool(); this.seen = new ConcurrentHashMap<P, Boolean>(); if (exec instanceof ThreadPoolExecutor) { ThreadPoolExecutor tpe = (ThreadPoolExecutor) exec; tpe.setRejectedExecutionHandler(new ThreadPoolExecutor.DiscardPolicy()); } } private ExecutorService initThreadPool() { return Executors.newCachedThreadPool(); } public List<M> solve() throws InterruptedException { try { P p = puzzle.initialPosition(); exec.execute(newTask(p, null, null)); // 等待ValueLatch中闭锁解开,则表示已经找到答案 PuzzleNode<P, M> solnPuzzleNode = solution.getValue(); return (solnPuzzleNode == null) ? null : solnPuzzleNode.asMoveList(); } finally { exec.shutdown();//最终主线程关闭线程池 } } protected Runnable newTask(P p, M m, PuzzleNode<P, M> n) { return new SolverTask(p, m, n); } protected class SolverTask extends PuzzleNode<P, M> implements Runnable { SolverTask(P pos, M move, PuzzleNode<P, M> prev) { super(pos, move, prev); } public void run() { //如果有一个线程找到了答案,则return,通过ValueLatch中isSet CountDownlatch闭锁实现; //为类避免死锁,将已经扫描的节点放入set集合中,避免继续扫描产生死循环 if (solution.isSet() || seen.putIfAbsent(pos, true) != null){ return; // already solved or seen this position } if (puzzle.isGoal(pos)) { solution.setValue(this); } else { for (M m : puzzle.legalMoves(pos)) exec.execute(newTask(puzzle.move(pos, m), m, this)); } } } }
以上がJava並行プログラミングにおけるスレッドプールの使い方を詳しく解説の詳細内容です。詳細については、PHP 中国語 Web サイトの他の関連記事を参照してください。