最近、プロジェクトの同時実行機能を改善してきましたが、開発は凸凹していました。たくさんの情報を読んで、ようやく理解が深まりました。そこで、一緒にソースコードを確認して、同時プログラミングの原理をまとめてみようと思いました。
最もよく使用されるスレッド プールから開始し、作成、実行、シャットダウンを中心としたスレッド プールのライフ サイクル全体の実装原則を理解する準備をしてください。後で、アトミック変数、同時コンテナ、ブロックキュー、同期ツール、ロックなどのトピックについて学習します。 java.util.concurrent の同時実行ツールは使用するのが難しくありませんが、ただ使用するだけではだめです。ソース コードを読まなければなりません (笑)。ちなみに使用しているJDKは1.8です。
Executorは、インターフェイス内に実行可能なタスクを実行するメソッドexecuteが1つだけあります。 ExecutorService インターフェイスは Executor を拡張し、スレッドのライフサイクル管理を追加し、タスクの終了やタスクの結果の返しなどのメソッドを提供します。 AbstractExecutorService は ExecutorService を実装し、submit メソッドなどのデフォルトの実装ロジックを提供します。
それでは、今日のトピックである ThreadPoolExecutor は、AbstractExecutorService を継承し、スレッド プールの特定の実装を提供します。
以下は、最大 7 つのパラメーターを持つ ThreadPoolExecutor の最も一般的なコンストラクターです。特定のコードは投稿しません。パラメーターの確認と設定に関するいくつかのステートメントのみを掲載します。
public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler) { }
corePoolSize は、スレッド プールのターゲット サイズです。これは、スレッド プールが作成されたばかりで、実行されるタスクがないときのサイズです。 MaximumPoolSize は、スレッド プールの最大上限です。 keepAliveTime はスレッドの生存時間です。スレッド プール内のスレッドの数が corePoolSize よりも大きい場合、生存時間を超えたアイドル状態のスレッドがリサイクルされます。ユニットは言うまでもなく、残りの 3 つのパラメータは後で分析されます。
ThreadPoolExecutor は、Executor のファクトリ メソッドによって作成されたいくつかのカスタマイズされたスレッド プールをプリセットします。 newSingleThreadExecutor、newFixedThreadPool、newCachedThreadPool の作成パラメーターを分析してみましょう。
newFixedThreadPool
public static ExecutorService newFixedThreadPool(int nThreads) { return new ThreadPoolExecutor(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>()); }
newFixedThreadPool の corePoolSize と minimumPoolSize は両方とも受信固定数値に設定され、keepAliveTim は 0 に設定されます。スレッド プールの作成後、スレッドの数は固定されるため、スレッドの安定性が必要な状況に適しています。
newSingleThreadExecutor
public static ExecutorService newSingleThreadExecutor() { return new FinalizableDelegatedExecutorService (new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>())); }
newSingleThreadExecutor は、スレッド数が 1 に固定されている newFixedThreadPool のバージョンで、プール内のタスクのシリアル化を保証します。 FinalizableDelegatedExecutorService が返されることに注意してください:
static class FinalizableDelegatedExecutorService extends DelegatedExecutorService { FinalizableDelegatedExecutorService(ExecutorService executor) { super(executor); } protected void finalize() { super.shutdown(); } }
FinalizableDelegatedExecutorService は DelegatedExecutorService を継承し、gc 中にスレッド プールを閉じる操作のみを追加します。 DelegatedExecutorService のソース コードを見てみましょう:
static class DelegatedExecutorService extends AbstractExecutorService { private final ExecutorService e; DelegatedExecutorService(ExecutorService executor) { e = executor; } public void execute(Runnable command) { e.execute(command); } public void shutdown() { e.shutdown(); } public List<Runnable> shutdownNow() { return e.shutdownNow(); } public boolean isShutdown() { return e.isShutdown(); } public boolean isTerminated() { return e.isTerminated(); } //... }
コードは非常に単純で、DelegatedExecut または Service ExecutorService は ExecutorService のメソッドのみを公開するようにラップされているため、スレッド プールのパラメーターを構成できなくなります。本来、スレッドプールが作成するパラメータは調整可能であり、ThreadPoolExecutor が set メソッドを提供します。 newSingleThreadExecutor を使用する目的は、シングルスレッドのシリアル スレッド プールを生成することです。スレッド プールのサイズも構成できたら退屈でしょう。
Executors は、通常のスレッド プールを構成不可能なスレッド プールにラップする unconfigurableExecutorService メソッドも提供します。将来の未知の世代によってスレッド プールが変更されることを望まない場合は、このメソッドを呼び出すことができます。
newCachedThreadPool
public static ExecutorService newCachedThreadPool() { return new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS, new SynchronousQueue<Runnable>()); }
newCachedThreadPool は、キャッシュされたスレッド プールを生成します。スレッド数は 0 から Integer.MAX_VALUE までで、タイムアウトは 1 分です。スレッド プールを使用すると、アイドル状態のスレッドがあればそのスレッドが再利用され、アイドル状態のスレッドが 1 分以上続くと新しいスレッドが作成されます。リサイクルされた。
newScheduledThreadPool
newScheduledThreadPoolは、タスクを定期的に実行できるスレッドプールを作成します。これについてはこの記事では説明する予定はありません。後で別の記事で詳しく説明します。
newCachedThreadPool的线程上限几乎等同于无限,但系统资源是有限的,任务的处理速度总有可能比不上任务的提交速度。因此,可以为ThreadPoolExecutor提供一个阻塞队列来保存因线程不足而等待的Runnable任务,这就是BlockingQueue。
JDK为BlockingQueue提供了几种实现方式,常用的有:
ArrayBlockingQueue:数组结构的阻塞队列
LinkedBlockingQueue:链表结构的阻塞队列
PriorityBlockingQueue:有优先级的阻塞队列
SynchronousQueue:不会存储元素的阻塞队列
newFixedThreadPool和newSingleThreadExecutor在默认情况下使用一个无界的LinkedBlockingQueue。要注意的是,如果任务一直提交,但线程池又不能及时处理,等待队列将会无限制地加长,系统资源总会有消耗殆尽的一刻。所以,推荐使用有界的等待队列,避免资源耗尽。但解决一个问题,又会带来新问题:队列填满之后,再来新任务,这个时候怎么办?后文会介绍如何处理队列饱和。
newCachedThreadPool使用的SynchronousQueue十分有趣,看名称是个队列,但它却不能存储元素。要将一个任务放进队列,必须有另一个线程去接收这个任务,一个进就有一个出,队列不会存储任何东西。因此,SynchronousQueue是一种移交机制,不能算是队列。newCachedThreadPool生成的是一个没有上限的线程池,理论上提交多少任务都可以,使用SynchronousQueue作为等待队列正合适。
当有界的等待队列满了之后,就需要用到饱和策略去处理,ThreadPoolExecutor的饱和策略通过传入RejectedExecutionHandler来实现。如果没有为构造函数传入,将会使用默认的defaultHandler。
private static final RejectedExecutionHandler defaultHandler = new AbortPolicy(); public static class AbortPolicy implements RejectedExecutionHandler { public AbortPolicy() { } public void rejectedExecution(Runnable r, ThreadPoolExecutor e) { throw new RejectedExecutionException("Task " + r.toString() + " rejected from " + e.toString()); } }
AbortPolicy是默认的实现,直接抛出一个RejectedExecutionException异常,让调用者自己处理。除此之外,还有几种饱和策略,来看一下:
public static class DiscardPolicy implements RejectedExecutionHandler { public DiscardPolicy() { } public void rejectedExecution(Runnable r, ThreadPoolExecutor e) { } }
DiscardPolicy的rejectedExecution直接是空方法,什么也不干。如果队列满了,后续的任务都抛弃掉。
public static class DiscardOldestPolicy implements RejectedExecutionHandler { public DiscardOldestPolicy() { } public void rejectedExecution(Runnable r, ThreadPoolExecutor e) { if (!e.isShutdown()) { e.getQueue().poll(); e.execute(r); } } }
DiscardOldestPolicy会将等待队列里最旧的任务踢走,让新任务得以执行。
public static class CallerRunsPolicy implements RejectedExecutionHandler { public CallerRunsPolicy() { } public void rejectedExecution(Runnable r, ThreadPoolExecutor e) { if (!e.isShutdown()) { r.run(); } } }
最后一种饱和策略是CallerRunsPolicy,它既不抛弃新任务,也不抛弃旧任务,而是直接在当前线程运行这个任务。当前线程一般就是主线程啊,让主线程运行任务,说不定就阻塞了。如果不是想清楚了整套方案,还是少用这种策略为妙。
每当线程池需要创建一个新线程,都是通过线程工厂获取。如果不为ThreadPoolExecutor设定一个线程工厂,就会使用默认的defaultThreadFactory:
public static ThreadFactory defaultThreadFactory() { return new DefaultThreadFactory(); } static class DefaultThreadFactory implements ThreadFactory { private static final AtomicInteger poolNumber = new AtomicInteger(1); private final ThreadGroup group; private final AtomicInteger threadNumber = new AtomicInteger(1); private final String namePrefix; DefaultThreadFactory() { SecurityManager s = System.getSecurityManager(); group = (s != null) ? s.getThreadGroup() : Thread.currentThread().getThreadGroup(); namePrefix = "pool-" + poolNumber.getAndIncrement() + "-thread-"; } public Thread newThread(Runnable r) { Thread t = new Thread(group, r, namePrefix + threadNumber.getAndIncrement(), 0); if (t.isDaemon()) t.setDaemon(false); if (t.getPriority() != Thread.NORM_PRIORITY) t.setPriority(Thread.NORM_PRIORITY); return t; } }
平时打印线程池里线程的name时,会输出形如pool-1-thread-1之类的名称,就是在这里设置的。这个默认的线程工厂,创建的线程是普通的非守护线程,如果需要定制,实现ThreadFactory后传给ThreadPoolExecutor即可。
不看代码不总结不会知道,光是线程池的创建就可以引出很多学问。别看平时创建线程池是一句代码的事,其实ThreadPoolExecutor提供了很灵活的定制方法。
欢迎留言和转发,下一篇打算分析线程池如何执行任务。
以上就是Java 线程池的创建过程分析 的内容,更多相关内容请关注PHP中文网(www.php.cn)!