ホームページ Java &#&チュートリアル JavaのThreadPoolExecutor原理の詳細な分析(コード付き)

JavaのThreadPoolExecutor原理の詳細な分析(コード付き)

Mar 29, 2017 am 10:31 AM

この記事は主にJavaのThreadPoolExecutorの原理分析に関する関連情報を紹介します。必要な方は

JavaのThreadPoolExecutorの原理分析を参照してください

スレッドプール概要

Javaのスレッドプールはよく使われます。開発ツールでは、処理する非同期タスクや並列タスクがある場合、スレッド プールをよく使用します。また、サーバーを実装する場合、接続処理 リクエストを受信するためにスレッド プールを使用する必要もあります。

スレッドプールは、java.util.concurrent.ThreadPoolExecutor
にある

JDKで提供されるスレッドプール実装を使用します。使用する場合は、通常、ExecutorServiceインターフェイスが使用され、submit、invokeAll、shutdownなどの一般的なメソッドが提供されます。

スレッド プールの構成に関して、Executors クラスは、newFixedThreadPool、newCachedThreadPool、newSingleThreadExecutor など、いくつかの一般的なシナリオにスレッド プールを提供できるいくつかの static メソッドを提供します。これらのメソッドは、最終的に ThreadPoolExecutor に呼び出されます。 コンストラクター。すべてのパラメータを含む

ThreadPoolExecutor のコンストラクタは

/**
   * @param corePoolSize the number of threads to keep in the pool, even
   *    if they are idle, unless {@code allowCoreThreadTimeOut} is set
   * @param maximumPoolSize the maximum number of threads to allow in the
   *    pool
   * @param keepAliveTime when the number of threads is greater than
   *    the core, this is the maximum time that excess idle threads
   *    will wait for new tasks before terminating.
   * @param unit the time unit for the {@code keepAliveTime} argument
   * @param workQueue the queue to use for holding tasks before they are
   *    executed. This queue will hold only the {@code Runnable}
   *    tasks submitted by the {@code execute} method.
   * @param threadFactory the factory to use when the executor
   *    creates a new thread
   * @param handler the handler to use when execution is blocked
   *    because the thread bounds and queue capacities are reached
  public ThreadPoolExecutor(int corePoolSize,
               int maximumPoolSize,
               long keepAliveTime,
               TimeUnit unit,
               BlockingQueue<Runnable> workQueue,
               ThreadFactory threadFactory,
               RejectedExecutionHandler handler) {
    if (corePoolSize < 0 ||
      maximumPoolSize <= 0 ||
      maximumPoolSize < corePoolSize ||
      keepAliveTime < 0)
      throw new IllegalArgumentException();
    if (workQueue == null || threadFactory == null || handler == null)
      throw new NullPointerException();
    this.corePoolSize = corePoolSize;
    this.maximumPoolSize = maximumPoolSize;
    this.workQueue = workQueue;
    this.keepAliveTime = unit.toNanos(keepAliveTime);
    this.threadFactory = threadFactory;
    this.handler = handler;
  }
ログイン後にコピー
です。
  • corePoolSize は、新しいタスクを追加するときに、スレッド プール内のスレッド数が corePoolSize よりも少ない場合、スレッド プールが存在するかどうかに関係なく、スレッド プール内のコア スレッドの数を設定します。現在アイドル状態のスレッドの場合、タスクを実行するための新しいスレッドが作成されます。

  • maximunPoolSize は、スレッド プールで許可される最大スレッド数です

  • workQueue は、キューに入れられたタスクを保存するために使用されます

  • keepAliveTime は、corePoolSize を超えるスレッドのアイドル タイムアウト時間です

  • handler は、次の目的で使用されますタスクの実行、スレッド プールが閉じられているときのタスク処理、スレッド プールのスレッド増加戦略は、現在のスレッド数が corePoolSize 未満の場合、addthreads、スレッド数 = corePoolSize および corePoolSize の場合、それです。 workQueue が新しいタスクを保存できない場合にのみ作成されます。 新しいスレッド、余分なスレッドは、アイドル状態の keepAliveTime の後に破棄されます。

実装 (JDK1.8に基づく)

ThreadPoolExecutorに保存されるステータスには、RUNNING、SHUTDOWN、STOP、TIDYING、TERMINATEDを含む

現在のスレッドプールステータスが含まれます。

現在実行中のスレッドの有効な数。

これら 2 つのステータスを int 変数に入れます。最初の 3 桁はスレッド プールのステータス、最後の 29 桁はスレッドの数です。

たとえば、0b11100000000000000000000000000001 は、RUNNING、つまりスレッドを表します。

HashSetを介してワーカーセットを保存します。HashSetにアクセスする前に、まず保護ステータスmainLock:ReentrantLockを取得する必要があります

submit、execute

は、最初に現在のワーカー数を確認して実行されます。 corePoolSize よりも大きい場合は、コア ワーカーを追加してみてください。スレッド プールは、スレッド数の維持とステータスのチェックに関して多くのテストを実行します。

public void execute(Runnable command) {
    int c = ctl.get();
    // 如果当期数量小于corePoolSize
    if (workerCountOf(c) < corePoolSize) {
      // 尝试增加worker
      if (addWorker(command, true))
        return;
      c = ctl.get();
    }
    // 如果线程池正在运行并且成功添加到工作队列中
    if (isRunning(c) && workQueue.offer(command)) {
      // 再次检查状态,如果已经关闭则执行拒绝处理
      int recheck = ctl.get();
      if (! isRunning(recheck) && remove(command))
        reject(command);
      // 如果工作线程都down了
      else if (workerCountOf(recheck) == 0)
        addWorker(null, false);
    }
    else if (!addWorker(command, false))
      reject(command);
  }
ログイン後にコピー

addWorkerメソッドの実装

private boolean addWorker(Runnable firstTask, boolean core) {
    retry:
    for (;;) {
      int c = ctl.get();
      int rs = runStateOf(c);
      // Check if queue empty only if necessary.
      if (rs >= SHUTDOWN &&
        ! (rs == SHUTDOWN &&
          firstTask == null &&
          ! workQueue.isEmpty()))
        return false;
      for (;;) {
        int wc = workerCountOf(c);
        if (wc >= CAPACITY ||
          wc >= (core ? corePoolSize : maximumPoolSize))
          return false;
        if (compareAndIncrementWorkerCount(c))
          break retry;
        c = ctl.get(); // Re-read ctl
        if (runStateOf(c) != rs)
          continue retry;
        // else CAS failed due to workerCount change; retry inner loop
      }
    }
    boolean workerStarted = false;
    boolean workerAdded = false;
    Worker w = null;
    try {
      w = new Worker(firstTask);
      final Thread t = w.thread;
      if (t != null) {
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
          // Recheck while holding lock.
          // Back out on ThreadFactory failure or if
          // shut down before lock acquired.
          int rs = runStateOf(ctl.get());
          if (rs < SHUTDOWN ||
            (rs == SHUTDOWN && firstTask == null)) {
            if (t.isAlive()) // precheck that t is startable
              throw new IllegalThreadStateException();
            workers.add(w);
            int s = workers.size();
            if (s > largestPoolSize)
              largestPoolSize = s;
            workerAdded = true;
          }
        } finally {
          mainLock.unlock();
        }
        if (workerAdded) {
          // 如果添加成功,则启动该线程,执行Worker的run方法,Worker的run方法执行外部的runWorker(Worker)
          t.start();
          workerStarted = true;
        }
      }
    } finally {
      if (! workerStarted)
        addWorkerFailed(w);
    }
    return workerStarted;
  }
ログイン後にコピー

WorkerクラスはAbstractQueuedSynchronizerを継承して同期待ちの機能を取得します。

private final class Worker
    extends AbstractQueuedSynchronizer
    implements Runnable
  {
    /**
     * This class will never be serialized, but we provide a
     * serialVersionUID to suppress a javac warning.
     */
    private static final long serialVersionUID = 6138294804551838833L;
    /** Thread this worker is running in. Null if factory fails. */
    final Thread thread;
    /** Initial task to run. Possibly null. */
    Runnable firstTask;
    /** Per-thread task counter */
    volatile long completedTasks;
    /**
     * Creates with given first task and thread from ThreadFactory.
     * @param firstTask the first task (null if none)
     */
    Worker(Runnable firstTask) {
      setState(-1); // inhibit interrupts until runWorker
      this.firstTask = firstTask;
      this.thread = getThreadFactory().newThread(this);
    }
    /** Delegates main run loop to outer runWorker */
    public void run() {
      runWorker(this);
    }
    // Lock methods
    //
    // The value 0 represents the unlocked state.
    // The value 1 represents the locked state.
    protected boolean isHeldExclusively() {
      return getState() != 0;
    }
    protected boolean tryAcquire(int unused) {
      if (compareAndSetState(0, 1)) {
        setExclusiveOwnerThread(Thread.currentThread());
        return true;
      }
      return false;
    }
    protected boolean tryRelease(int unused) {
      setExclusiveOwnerThread(null);
      setState(0);
      return true;
    }
    public void lock()    { acquire(1); }
    public boolean tryLock() { return tryAcquire(1); }
    public void unlock()   { release(1); }
    public boolean isLocked() { return isHeldExclusively(); }
    void interruptIfStarted() {
      Thread t;
      if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) {
        try {
          t.interrupt();
        } catch (SecurityException ignore) {
        }
      }
    }
ログイン後にコピー

runWorker(Worker)は、ワークキューからタスクを継続的に取得して実行するWorkerのポーリング実行ロジックです。タスクの実行中にワーカーが中断されないように、各タスクを実行する前にワーカーをロックする必要があります。

final void runWorker(Worker w) {
    Thread wt = Thread.currentThread();
    Runnable task = w.firstTask;
    w.firstTask = null;
    w.unlock(); // allow interrupts
    boolean completedAbruptly = true;
    try {
      while (task != null || (task = getTask()) != null) {
        w.lock();
        // If pool is stopping, ensure thread is interrupted;
        // if not, ensure thread is not interrupted. This
        // requires a recheck in second case to deal with
        // shutdownNow race while clearing interrupt
        if ((runStateAtLeast(ctl.get(), STOP) ||
           (Thread.interrupted() &&
           runStateAtLeast(ctl.get(), STOP))) &&
          !wt.isInterrupted())
          wt.interrupt();
        try {
          beforeExecute(wt, task);
          Throwable thrown = null;
          try {
            task.run();
          } catch (RuntimeException x) {
            thrown = x; throw x;
          } catch (Error x) {
            thrown = x; throw x;
          } catch (Throwable x) {
            thrown = x; throw new Error(x);
          } finally {
            afterExecute(task, thrown);
          }
        } finally {
          task = null;
          w.completedTasks++;
          w.unlock();
        }
      }
      completedAbruptly = false;
    } finally {
      processWorkerExit(w, completedAbruptly);
    }
  }
ログイン後にコピー

ThreadPoolExecutor の submit メソッドでは、Callable が FutureTask にパッケージ化されてから、execute メソッドに渡されます。

FutureTask

FutureTaskはRunnableとFutureから継承しています。FutureTaskで定義されているいくつかの状態は
NEW、まだ実行されていません
COMPLETING、実行中
NORMAL、通常の実行が完了し結果が得られます
EXCEPTIONAL、実行例外をスローします。
CANCELLED、実行がキャンセルされました
INTERRUPTING、実行が中断されています
INTERRUPTED、中断されました。

key getメソッド

public V get() throws InterruptedException, ExecutionException {
    int s = state;
    if (s <= COMPLETING)
      s = awaitDone(false, 0L);
    return report(s);
  }
ログイン後にコピー

は、まず現在のステータスを取得し、実行が完了しておらず正常であれば、結果待ち処理に入ります。 awaitDone を継続的にループして現在のステータスを取得します。結果がない場合は、CAS を通じて自分自身を待機リストの先頭に追加します。タイムアウトが設定されている場合は、指定された時間の間、LockSupport.parkNanos が使用されます。

FutureTask の run メソッドは、タスクを実行し、結果の位置を設定します。まず、現在の状態が NEW であるかどうかを判断し、現在のスレッドを実行スレッドとして設定します。次に、Callable の呼び出しを呼び出して結果を取得し、設定します。その結果を使用して FutureTask の状態を変更します。 りー

以上がJavaのThreadPoolExecutor原理の詳細な分析(コード付き)の詳細内容です。詳細については、PHP 中国語 Web サイトの他の関連記事を参照してください。

このウェブサイトの声明
この記事の内容はネチズンが自主的に寄稿したものであり、著作権は原著者に帰属します。このサイトは、それに相当する法的責任を負いません。盗作または侵害の疑いのあるコンテンツを見つけた場合は、admin@php.cn までご連絡ください。

ホットAIツール

Undresser.AI Undress

Undresser.AI Undress

リアルなヌード写真を作成する AI 搭載アプリ

AI Clothes Remover

AI Clothes Remover

写真から衣服を削除するオンライン AI ツール。

Undress AI Tool

Undress AI Tool

脱衣画像を無料で

Clothoff.io

Clothoff.io

AI衣類リムーバー

AI Hentai Generator

AI Hentai Generator

AIヘンタイを無料で生成します。

ホットツール

メモ帳++7.3.1

メモ帳++7.3.1

使いやすく無料のコードエディター

SublimeText3 中国語版

SublimeText3 中国語版

中国語版、とても使いやすい

ゼンドスタジオ 13.0.1

ゼンドスタジオ 13.0.1

強力な PHP 統合開発環境

ドリームウィーバー CS6

ドリームウィーバー CS6

ビジュアル Web 開発ツール

SublimeText3 Mac版

SublimeText3 Mac版

神レベルのコード編集ソフト(SublimeText3)

Javaの完全数 Javaの完全数 Aug 30, 2024 pm 04:28 PM

Java における完全数のガイド。ここでは、定義、Java で完全数を確認する方法、コード実装の例について説明します。

ジャワのウェカ ジャワのウェカ Aug 30, 2024 pm 04:28 PM

Java の Weka へのガイド。ここでは、weka java の概要、使い方、プラットフォームの種類、利点について例を交えて説明します。

Javaのスミス番号 Javaのスミス番号 Aug 30, 2024 pm 04:28 PM

Java のスミス番号のガイド。ここでは定義、Java でスミス番号を確認する方法について説明します。コード実装の例。

Java Springのインタビューの質問 Java Springのインタビューの質問 Aug 30, 2024 pm 04:29 PM

この記事では、Java Spring の面接で最もよく聞かれる質問とその詳細な回答をまとめました。面接を突破できるように。

Java 8 Stream Foreachから休憩または戻ってきますか? Java 8 Stream Foreachから休憩または戻ってきますか? Feb 07, 2025 pm 12:09 PM

Java 8は、Stream APIを導入し、データ収集を処理する強力で表現力のある方法を提供します。ただし、ストリームを使用する際の一般的な質問は次のとおりです。 従来のループにより、早期の中断やリターンが可能になりますが、StreamのForeachメソッドはこの方法を直接サポートしていません。この記事では、理由を説明し、ストリーム処理システムに早期終了を実装するための代替方法を調査します。 さらに読み取り:JavaストリームAPIの改善 ストリームを理解してください Foreachメソッドは、ストリーム内の各要素で1つの操作を実行する端末操作です。その設計意図はです

Java での日付までのタイムスタンプ Java での日付までのタイムスタンプ Aug 30, 2024 pm 04:28 PM

Java での日付までのタイムスタンプに関するガイド。ここでは、Java でタイムスタンプを日付に変換する方法とその概要について、例とともに説明します。

カプセルの量を見つけるためのJavaプログラム カプセルの量を見つけるためのJavaプログラム Feb 07, 2025 am 11:37 AM

カプセルは3次元の幾何学的図形で、両端にシリンダーと半球で構成されています。カプセルの体積は、シリンダーの体積と両端に半球の体積を追加することで計算できます。このチュートリアルでは、さまざまな方法を使用して、Javaの特定のカプセルの体積を計算する方法について説明します。 カプセルボリュームフォーミュラ カプセルボリュームの式は次のとおりです。 カプセル体積=円筒形の体積2つの半球体積 で、 R:半球の半径。 H:シリンダーの高さ(半球を除く)。 例1 入力 RADIUS = 5ユニット 高さ= 10単位 出力 ボリューム= 1570.8立方ユニット 説明する 式を使用してボリュームを計算します。 ボリューム=π×R2×H(4

Spring Tool Suiteで最初のSpring Bootアプリケーションを実行するにはどうすればよいですか? Spring Tool Suiteで最初のSpring Bootアプリケーションを実行するにはどうすればよいですか? Feb 07, 2025 pm 12:11 PM

Spring Bootは、Java開発に革命をもたらす堅牢でスケーラブルな、生産対応のJavaアプリケーションの作成を簡素化します。 スプリングエコシステムに固有の「構成に関する慣習」アプローチは、手動のセットアップを最小化します。

See all articles