この記事は主にJavaのThreadPoolExecutorの原理分析に関する関連情報を紹介します。必要な方は
JavaのThreadPoolExecutorの原理分析を参照してください
スレッドプール概要
Javaのスレッドプールはよく使われます。開発ツールでは、処理する非同期タスクや並列タスクがある場合、スレッド プールをよく使用します。また、サーバーを実装する場合、接続処理 リクエストを受信するためにスレッド プールを使用する必要もあります。
スレッドプールは、java.util.concurrent.ThreadPoolExecutor
にある
JDKで提供されるスレッドプール実装を使用します。使用する場合は、通常、ExecutorServiceインターフェイスが使用され、submit、invokeAll、shutdownなどの一般的なメソッドが提供されます。
スレッド プールの構成に関して、Executors クラスは、newFixedThreadPool、newCachedThreadPool、newSingleThreadExecutor など、いくつかの一般的なシナリオにスレッド プールを提供できるいくつかの static メソッドを提供します。これらのメソッドは、最終的に ThreadPoolExecutor に呼び出されます。 コンストラクター。すべてのパラメータを含む
ThreadPoolExecutor のコンストラクタは
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 | /**
* @param corePoolSize the number of threads to keep in the pool, even
* if they are idle, unless {@code allowCoreThreadTimeOut} is set
* @param maximumPoolSize the maximum number of threads to allow in the
* pool
* @param keepAliveTime when the number of threads is greater than
* the core, this is the maximum time that excess idle threads
* will wait for new tasks before terminating.
* @param unit the time unit for the {@code keepAliveTime} argument
* @param workQueue the queue to use for holding tasks before they are
* executed. This queue will hold only the {@code Runnable}
* tasks submitted by the {@code execute} method.
* @param threadFactory the factory to use when the executor
* creates a new thread
* @param handler the handler to use when execution is blocked
* because the thread bounds and queue capacities are reached
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler) {
if (corePoolSize < 0 ||
maximumPoolSize <= 0 ||
maximumPoolSize < corePoolSize ||
keepAliveTime < 0)
throw new IllegalArgumentException();
if (workQueue == null || threadFactory == null || handler == null)
throw new NullPointerException();
this.corePoolSize = corePoolSize;
this.maximumPoolSize = maximumPoolSize;
this.workQueue = workQueue;
this.keepAliveTime = unit.toNanos(keepAliveTime);
this.threadFactory = threadFactory;
this.handler = handler;
}
|
ログイン後にコピー
です。
corePoolSize は、新しいタスクを追加するときに、スレッド プール内のスレッド数が corePoolSize よりも少ない場合、スレッド プールが存在するかどうかに関係なく、スレッド プール内のコア スレッドの数を設定します。現在アイドル状態のスレッドの場合、タスクを実行するための新しいスレッドが作成されます。
maximunPoolSize は、スレッド プールで許可される最大スレッド数です
workQueue は、キューに入れられたタスクを保存するために使用されます
keepAliveTime は、corePoolSize を超えるスレッドのアイドル タイムアウト時間です
handler は、次の目的で使用されますタスクの実行、スレッド プールが閉じられているときのタスク処理、スレッド プールのスレッド増加戦略は、現在のスレッド数が corePoolSize 未満の場合、addthreads、スレッド数 = corePoolSize および corePoolSize の場合、それです。 workQueue が新しいタスクを保存できない場合にのみ作成されます。 新しいスレッド、余分なスレッドは、アイドル状態の keepAliveTime の後に破棄されます。
実装 (JDK1.8に基づく)
ThreadPoolExecutorに保存されるステータスには、RUNNING、SHUTDOWN、STOP、TIDYING、TERMINATEDを含む
現在のスレッドプールステータスが含まれます。
現在実行中のスレッドの有効な数。
これら 2 つのステータスを int 変数に入れます。最初の 3 桁はスレッド プールのステータス、最後の 29 桁はスレッドの数です。
たとえば、0b11100000000000000000000000000001 は、RUNNING、つまりスレッドを表します。
HashSetを介してワーカーセットを保存します。HashSetにアクセスする前に、まず保護ステータスmainLock:ReentrantLockを取得する必要があります
submit、execute
は、最初に現在のワーカー数を確認して実行されます。 corePoolSize よりも大きい場合は、コア ワーカーを追加してみてください。スレッド プールは、スレッド数の維持とステータスのチェックに関して多くのテストを実行します。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 | public void execute(Runnable command) {
int c = ctl.get();
if (workerCountOf(c) < corePoolSize) {
if (addWorker(command, true))
return ;
c = ctl.get();
}
if (isRunning(c) && workQueue.offer(command)) {
int recheck = ctl.get();
if (! isRunning(recheck) && remove(command))
reject(command);
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
else if (!addWorker(command, false))
reject(command);
}
|
ログイン後にコピー
addWorkerメソッドの実装
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 | private boolean addWorker(Runnable firstTask, boolean core) {
retry:
for (;;) {
int c = ctl.get();
int rs = runStateOf(c);
if (rs >= SHUTDOWN &&
! (rs == SHUTDOWN &&
firstTask == null &&
! workQueue.isEmpty()))
return false;
for (;;) {
int wc = workerCountOf(c);
if (wc >= CAPACITY ||
wc >= (core ? corePoolSize : maximumPoolSize))
return false;
if (compareAndIncrementWorkerCount(c))
break retry;
c = ctl.get();
if (runStateOf(c) != rs)
continue retry;
}
}
boolean workerStarted = false;
boolean workerAdded = false;
Worker w = null;
try {
w = new Worker(firstTask);
final Thread t = w.thread;
if (t != null) {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
int rs = runStateOf(ctl.get());
if (rs < SHUTDOWN ||
(rs == SHUTDOWN && firstTask == null)) {
if (t.isAlive())
throw new IllegalThreadStateException();
workers.add(w);
int s = workers.size();
if (s > largestPoolSize)
largestPoolSize = s;
workerAdded = true;
}
} finally {
mainLock.unlock();
}
if (workerAdded) {
t.start();
workerStarted = true;
}
}
} finally {
if (! workerStarted)
addWorkerFailed(w);
}
return workerStarted;
}
|
ログイン後にコピー
WorkerクラスはAbstractQueuedSynchronizerを継承して同期待ちの機能を取得します。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 | private final class Worker
extends AbstractQueuedSynchronizer
implements Runnable
{
private static final long serialVersionUID = 6138294804551838833L;
final Thread thread;
Runnable firstTask;
volatile long completedTasks;
Worker(Runnable firstTask) {
setState(-1);
this.firstTask = firstTask;
this.thread = getThreadFactory().newThread(this);
}
public void run() {
runWorker(this);
}
protected boolean isHeldExclusively() {
return getState() != 0;
}
protected boolean tryAcquire(int unused) {
if (compareAndSetState(0, 1)) {
setExclusiveOwnerThread(Thread.currentThread());
return true;
}
return false;
}
protected boolean tryRelease(int unused) {
setExclusiveOwnerThread(null);
setState(0);
return true;
}
public void lock() { acquire(1); }
public boolean tryLock() { return tryAcquire(1); }
public void unlock() { release(1); }
public boolean isLocked() { return isHeldExclusively(); }
void interruptIfStarted() {
Thread t;
if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) {
try {
t.interrupt();
} catch (SecurityException ignore) {
}
}
}
|
ログイン後にコピー
runWorker(Worker)は、ワークキューからタスクを継続的に取得して実行するWorkerのポーリング実行ロジックです。タスクの実行中にワーカーが中断されないように、各タスクを実行する前にワーカーをロックする必要があります。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 | final void runWorker(Worker w) {
Thread wt = Thread.currentThread();
Runnable task = w.firstTask;
w.firstTask = null;
w.unlock();
boolean completedAbruptly = true;
try {
while (task != null || (task = getTask()) != null) {
w.lock();
if ((runStateAtLeast(ctl.get(), STOP) ||
(Thread.interrupted() &&
runStateAtLeast(ctl.get(), STOP))) &&
!wt.isInterrupted())
wt.interrupt();
try {
beforeExecute(wt, task);
Throwable thrown = null;
try {
task.run();
} catch (RuntimeException x) {
thrown = x; throw x;
} catch (Error x) {
thrown = x; throw x;
} catch (Throwable x) {
thrown = x; throw new Error(x);
} finally {
afterExecute(task, thrown);
}
} finally {
task = null;
w.completedTasks++;
w.unlock();
}
}
completedAbruptly = false;
} finally {
processWorkerExit(w, completedAbruptly);
}
}
|
ログイン後にコピー
ThreadPoolExecutor の submit メソッドでは、Callable が FutureTask にパッケージ化されてから、execute メソッドに渡されます。
FutureTask
FutureTaskはRunnableとFutureから継承しています。FutureTaskで定義されているいくつかの状態は
NEW、まだ実行されていません
COMPLETING、実行中
NORMAL、通常の実行が完了し結果が得られます
EXCEPTIONAL、実行例外をスローします。
CANCELLED、実行がキャンセルされました
INTERRUPTING、実行が中断されています
INTERRUPTED、中断されました。
key getメソッド
1 2 3 4 5 6 | public V get() throws InterruptedException, ExecutionException {
int s = state;
if (s <= COMPLETING)
s = awaitDone(false, 0L);
return report(s);
}
|
ログイン後にコピー
は、まず現在のステータスを取得し、実行が完了しておらず正常であれば、結果待ち処理に入ります。 awaitDone を継続的にループして現在のステータスを取得します。結果がない場合は、CAS を通じて自分自身を待機リストの先頭に追加します。タイムアウトが設定されている場合は、指定された時間の間、LockSupport.parkNanos が使用されます。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 | <a href= "https://www.php.cn/code/6276.html" target= "_blank" > static final class WaitNode {
volatile Thread thread;
volatile WaitNode next;
WaitNode() { thread = Thread.currentThread(); }
}
private int awaitDone(boolean timed, long nanos)
throws InterruptedException {
final long deadline = timed ? System.nanoTime() + nanos : 0L;
WaitNode q = null;
boolean queued = false;
for (;;) {
if (Thread.interrupted()) {
removeWaiter(q);
throw new InterruptedException();
}
int s = state;
if (s > COMPLETING) {
if (q != null)
q.thread = null;
return s;
}
else if (s == COMPLETING)
Thread.yield();
else if (q == null)
q = new WaitNode();
else if (!queued)
queued = UNSAFE.compareAndSwapObject(this, waitersOffset,
q.next = waiters, q);
else if (timed) {
nanos = deadline - System.nanoTime();
if (nanos <= 0L) {
removeWaiter(q);
return state;
}
LockSupport.parkNanos(this, nanos);
}
else
LockSupport.park(this);
}
}</a>
|
FutureTask の run メソッドは、タスクを実行し、結果の位置を設定します。まず、現在の状態が NEW であるかどうかを判断し、現在のスレッドを実行スレッドとして設定します。次に、Callable の呼び出しを呼び出して結果を取得し、設定します。その結果を使用して FutureTask の状態を変更します。 りー以上がJavaのThreadPoolExecutor原理の詳細な分析(コード付き)の詳細内容です。詳細については、PHP 中国語 Web サイトの他の関連記事を参照してください。