目錄
執行緒池狀態
Worker的建立
Worker的執行
线程池的关闭
后言
首頁 Java java教程 Java 執行緒池執行原理分析

Java 執行緒池執行原理分析

Feb 23, 2017 am 10:42 AM
java 執行緒池

上一篇已經對線程池的創建進行了分析,了解線程池既有預設的模板,也提供多種參數支撐靈活的客製化。

本文將會圍繞著執行緒池的生命週期,分析執行緒池執行任務的過程。

執行緒池狀態

先認識兩個貫穿執行緒池程式碼的參數:

runState:執行緒池運行狀態

workerCount:工作執行緒的數量

執行緒池用一個32位元的int來同時儲存runState和workerCount,其中高3位元是runState,其餘29位元是workerCount。程式碼中會重複使用runStateOf和workerCountOf來取得runState和workerCount。

private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
private static final int COUNT_BITS = Integer.SIZE - 3;
private static final int CAPACITY   = (1 << COUNT_BITS) - 1;
登入後複製
// 线程池状态
private static final int RUNNING    = -1 << COUNT_BITS;
private static final int SHUTDOWN   =  0 << COUNT_BITS;
private static final int STOP       =  1 << COUNT_BITS;
private static final int TIDYING    =  2 << COUNT_BITS;
private static final int TERMINATED =  3 << COUNT_BITS;
登入後複製
// ctl操作
private static int runStateOf(int c)     { return c & ~CAPACITY; }
private static int workerCountOf(int c)  { return c & CAPACITY; }
private static int ctlOf(int rs, int wc) { return rs | wc; }
登入後複製

RUNNING:可接收新任務,可執行等待佇列裡的任務

SHUTDOWN:不可接收新任務,可執行等待佇列裡的任務

STOP:不可接收新任務,不可執行等待佇列裡的任務,並且嘗試終止所有在執行任務

TIDYING:所有任務已經終止,執行terminated()

TERMINATED: terminated()執行完成

執行緒池狀態預設從RUNNING開始流轉,到狀態TERMINATED結束,中間不需要經過每一種狀態,但不能讓狀態回退。以下是狀態變更可能的路徑與變更條件:

Java 執行緒池執行原理分析

圖1 執行緒池狀態變更路徑

Worker的建立

執行緒池是由Worker類別負責執行任務,Worker繼承了AbstractQueuedSynchronizer,引出了Java並發框架的核心AQS。

AbstractQueuedSynchronizer,简称AQS,是Java并发包里一系列同步工具的基础实现,原理是根据状态位来控制线程的入队阻塞、出队唤醒来处理同步。
登入後複製


AQS不會在這裡展開討論,只需要知道Worker包裝了Thread,由它去執行任務。

呼叫execute將會根據執行緒池的情況建立Worker,可以歸納出下圖四種情況:

Java 執行緒池執行原理分析

圖2 worker在執行緒池裡的四種可能

public void execute(Runnable command) {
    if (command == null)
        throw new NullPointerException();
    int c = ctl.get();
    //1
    if (workerCountOf(c) < corePoolSize) {
        if (addWorker(command, true))
            return;
        c = ctl.get();
    }
    //2
    if (isRunning(c) && workQueue.offer(command)) {
        int recheck = ctl.get();
        if (! isRunning(recheck) && remove(command))
            //3
            reject(command);
        else if (workerCountOf(recheck) == 0)
            //4
            addWorker(null, false);
    }
    //5
    else if (!addWorker(command, false))
        //6
        reject(command);
}
登入後複製

標記1對應第一種情況,要留意addWorker傳入了core,core=true為corePoolSize,core=false為maximumPoolSize,新增時需要檢查workerCount是否超過允許的最大值。

標記2對應第二種情況,檢查執行緒池是否正在執行,並且將任務加入等待佇列。標記3再檢查一次執行緒池狀態,如果執行緒池忽然處於非運作狀態,那就將等待佇列剛加的任務刪掉,再交給RejectedExecutionHandler處理。標記4發現沒有worker,就先補充一個空任務的worker。

標記5對應第三種情況,等待佇列不能再新增任務了,呼叫addWorker新增一個去處理。

標記6對應第四種情況,addWorker的core傳入false,回傳呼叫失敗,代表workerCount已經超出maximumPoolSize,那就交給RejectedExecutionHandler處理。

private boolean addWorker(Runnable firstTask, boolean core) {
        //1
        retry:
        for (;;) {
            int c = ctl.get();
            int rs = runStateOf(c);
            // Check if queue empty only if necessary.
            if (rs >= SHUTDOWN &&
                ! (rs == SHUTDOWN &&
                   firstTask == null &&
                   ! workQueue.isEmpty()))
                return false;
            for (;;) {
                int wc = workerCountOf(c);
                if (wc >= CAPACITY ||
                    wc >= (core ? corePoolSize : maximumPoolSize))
                    return false;
                if (compareAndIncrementWorkerCount(c))
                    break retry;
                c = ctl.get();  // Re-read ctl
                if (runStateOf(c) != rs)
                    continue retry;
                // else CAS failed due to workerCount change; retry inner loop
            }
        }
        //2
        boolean workerStarted = false;
        boolean workerAdded = false;
        Worker w = null;
        try {
            w = new Worker(firstTask);
            final Thread t = w.thread;
            if (t != null) {
                final ReentrantLock mainLock = this.mainLock;
                mainLock.lock();
                try {
                    // Recheck while holding lock.
                    // Back out on ThreadFactory failure or if
                    // shut down before lock acquired.
                    int rs = runStateOf(ctl.get());
                    if (rs < SHUTDOWN ||
                        (rs == SHUTDOWN && firstTask == null)) {
                        if (t.isAlive()) // precheck that t is startable
                            throw new IllegalThreadStateException();
                        workers.add(w);
                        int s = workers.size();
                        if (s > largestPoolSize)
                            largestPoolSize = s;
                        workerAdded = true;
                    }
                } finally {
                    mainLock.unlock();
                }
                if (workerAdded) {
                    t.start();
                    workerStarted = true;
                }
            }
        } finally {
            if (! workerStarted)
                addWorkerFailed(w);
        }
        return workerStarted;
    }
登入後複製

標記1的第一段程式碼,目的很簡單,是為workerCount加一。至於為什麼程式碼寫了這麼長,是因為執行緒池的狀態不斷變化,並發環境下需要保證變數的同步性。外循環判斷執行緒池狀態、任務非空和佇列非空,內循環使用CAS機制保證workerCount正確地遞增。不了解CAS可以看認識非阻塞的同步機制CAS,後續增減workerCount都會使用CAS。

標記2的第二段程式碼,就比較簡單。建立一個新Worker對象,將Worker加入workers裡(Set集合)。成功加入後,啟動worker裡的執行緒。在finally裡判斷執行緒是否啟動成功,不成功直接呼叫addWorkerFailed。

private void addWorkerFailed(Worker w) {
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            if (w != null)
                workers.remove(w);
            decrementWorkerCount();
            tryTerminate();
        } finally {
            mainLock.unlock();
        }
    }
登入後複製

addWorkerFailed將減少已經遞增的workerCount,並且呼叫tryTerminate結束執行緒池。

Worker的執行

Worker(Runnable firstTask) {
    setState(-1); // inhibit interrupts until runWorker
    this.firstTask = firstTask;
    this.thread = getThreadFactory().newThread(this);
}
public void run() {
    runWorker(this);
}
登入後複製

Worker在建構函式裡採用ThreadFactory建立Thread,在run方法裡呼叫了runWorker,看來是真正執行任務的地方。

final void runWorker(Worker w) {
    Thread wt = Thread.currentThread();
    Runnable task = w.firstTask;
    w.firstTask = null;
    w.unlock(); // allow interrupts
    boolean completedAbruptly = true;
    try {
       //1
        while (task != null || (task = getTask()) != null) {
            w.lock();
           //2
            if ((runStateAtLeast(ctl.get(), STOP) ||
                 (Thread.interrupted() &&
                  runStateAtLeast(ctl.get(), STOP))) &&
                !wt.isInterrupted())
                wt.interrupt();
            try {
               //3
                beforeExecute(wt, task);
                Throwable thrown = null;
                try {
                    task.run();
                } catch (RuntimeException x) {
                    thrown = x; throw x;
                } catch (Error x) {
                    thrown = x; throw x;
                } catch (Throwable x) {
                    thrown = x; throw new Error(x);
                } finally {
                    afterExecute(task, thrown);
                }
            } finally {
                task = null;
                 //4
                w.completedTasks++;
                w.unlock();
            }
        }
        completedAbruptly = false;       //5
    } finally {
        //6
        processWorkerExit(w, completedAbruptly);
    }
}
登入後複製

標記1進入循環,從getTask取得要執行的任務,直到返回null。這裡達到了線程復用的效果,讓線程處理多個任務。

標記2是比較複雜的判斷,保證了執行緒池在STOP狀態下執行緒是中斷的,非STOP狀態下執行緒沒有被中斷。如果你不了解Java的中斷機制,看看如何正確結束Java執行緒這篇。

標記3呼叫了run方法,真正執行了任務。執行前後提供了beforeExecute和afterExecute兩個方法,由子類別實作。

標記4裡的completedTasks統計worker執行了多少任務,最後累加進completedTaskCount變量,可以呼叫對應方法回傳一些統計資料。

標記5的變數completedAbruptly表示worker是否異常終止,執行到這裡代表執行正常,後續的方法需要這個變數。

標記6呼叫processWorkerExit結束,後面會分析。

接著來看worker從等待佇列取得任務的getTask方法:

#
private Runnable getTask() {
    boolean timedOut = false; // Did the last poll() time out?
    for (;;) {
        int c = ctl.get();
        int rs = runStateOf(c);
        //1
        // Check if queue empty only if necessary.
        if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
            decrementWorkerCount();
            return null;
        }
        int wc = workerCountOf(c);
        //2
        // Are workers subject to culling?
        boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
        if ((wc > maximumPoolSize || (timed && timedOut))
            && (wc > 1 || workQueue.isEmpty())) {
            if (compareAndDecrementWorkerCount(c))
                return null;
            continue;
        }
       //3
        try {
            Runnable r = timed ?
                workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
                workQueue.take();
            if (r != null)
                return r;
            timedOut = true;
        } catch (InterruptedException retry) {
            timedOut = false;
        }
    }
}
登入後複製

标记1检查线程池的状态,这里就体现出SHUTDOWN和STOP的区别。如果线程池是SHUTDOWN状态,还会先处理完等待队列的任务;如果是STOP状态,就不再处理等待队列里的任务了。

标记2先看allowCoreThreadTimeOut这个变量,false时worker空闲,也不会结束;true时,如果worker空闲超过keepAliveTime,就会结束。接着是一个很复杂的判断,好难转成文字描述,自己看吧。注意一下wc>maximumPoolSize,出现这种可能是在运行中调用setMaximumPoolSize,还有wc>1,在等待队列非空时,至少保留一个worker。

标记3是从等待队列取任务的逻辑,根据timed分为等待keepAliveTime或者阻塞直到有任务。

最后来看结束worker需要执行的操作:

private void processWorkerExit(Worker w, boolean completedAbruptly) {
   //1
    if (completedAbruptly) // If abrupt, then workerCount wasn&#39;t adjusted
        decrementWorkerCount();
  //2
    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();
    try {
        completedTaskCount += w.completedTasks;
        workers.remove(w);
    } finally {
        mainLock.unlock();
    }
   //3
    tryTerminate();
    int c = ctl.get();
    //4
    if (runStateLessThan(c, STOP)) {
        if (!completedAbruptly) {
            int min = allowCoreThreadTimeOut ? 0 : corePoolSize;
            if (min == 0 && ! workQueue.isEmpty())
                min = 1;
            if (workerCountOf(c) >= min)
                return; // replacement not needed
        }
        addWorker(null, false);
    }
}
登入後複製

正常情况下,在getTask里就会将workerCount减一。标记1处用变量completedAbruptly判断worker是否异常退出,如果是,需要补充对workerCount的减一。

标记2将worker处理任务的数量累加到总数,并且在集合workers中去除。

标记3尝试终止线程池,后续会研究。

标记4处理线程池还是RUNNING或SHUTDOWN状态时,如果worker是异常结束,那么会直接addWorker。如果allowCoreThreadTimeOut=true,并且等待队列有任务,至少保留一个worker;如果allowCoreThreadTimeOut=false,workerCount不少于corePoolSize。

总结一下worker:线程池启动后,worker在池内创建,包装了提交的Runnable任务并执行,执行完就等待下一个任务,不再需要时就结束。

线程池的关闭

线程池的关闭不是一关了事,worker在池里处于不同状态,必须安排好worker的”后事”,才能真正释放线程池。ThreadPoolExecutor提供两种方法关闭线程池:

shutdown:不能再提交任务,已经提交的任务可继续运行;

shutdownNow:不能再提交任务,已经提交但未执行的任务不能运行,在运行的任务可继续运行,但会被中断,返回已经提交但未执行的任务。

public void shutdown() {
    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();
    try {
        checkShutdownAccess();   //1 安全策略机制
        advanceRunState(SHUTDOWN);   //2
        interruptIdleWorkers();   //3
        onShutdown(); //4 空方法,子类实现
    } finally {
        mainLock.unlock();
    }
    tryTerminate();   //5
}
登入後複製

shutdown将线程池切换到SHUTDOWN状态,并调用interruptIdleWorkers请求中断所有空闲的worker,最后调用tryTerminate尝试结束线程池。

public List<Runnable> shutdownNow() {
    List<Runnable> tasks;
    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();
    try {
        checkShutdownAccess();
        advanceRunState(STOP);
        interruptWorkers();
        tasks = drainQueue();  //1
    } finally {
        mainLock.unlock();
    }
    tryTerminate();
    return tasks;
}
登入後複製

shutdownNow和shutdown类似,将线程池切换为STOP状态,中断目标是所有worker。drainQueue会将等待队列里未执行的任务返回。

interruptIdleWorkers和interruptWorkers实现原理都是遍历workers集合,中断条件符合的worker。

上面的代码多次出现调用tryTerminate,这是一个尝试将线程池切换到TERMINATED状态的方法。

final void tryTerminate() {
    for (;;) {
        int c = ctl.get();
        //1
        if (isRunning(c) ||
            runStateAtLeast(c, TIDYING) ||
            (runStateOf(c) == SHUTDOWN && ! workQueue.isEmpty()))
            return;
        //2
        if (workerCountOf(c) != 0) { // Eligible to terminate
            interruptIdleWorkers(ONLY_ONE);
            return;
        }
       //3
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            if (ctl.compareAndSet(c, ctlOf(TIDYING, 0))) {
                try {
                    terminated();
                } finally {
                    ctl.set(ctlOf(TERMINATED, 0));
                    termination.signalAll();
                }
                return;
            }
        } finally {
            mainLock.unlock();
        }
        // else retry on failed CAS
    }
}
登入後複製

标记1检查线程池状态,下面几种情况,后续操作都没有必要,直接return。

RUNNING(还在运行,不能停)

TIDYING或TERMINATED(已经没有在运行的worker)

SHUTDOWN并且等待队列非空(执行完才能停)

标记2在worker非空的情况下又调用了interruptIdleWorkers,你可能疑惑在shutdown时已经调用过了,为什么又调用,而且每次只中断一个空闲worker?你需要知道,shutdown时worker可能在执行中,执行完阻塞在队列的take,不知道要结束,所有要补充调用interruptIdleWorkers。每次只中断一个是因为processWorkerExit时,还会执行tryTerminate,自动中断下一个空闲的worker。

标记3是最终的状态切换。线程池会先进入TIDYING状态,再进入TERMINATED状态,中间提供了terminated这个空方法供子类实现。

调用关闭线程池方法后,需要等待线程池切换到TERMINATED状态。awaitTermination检查限定时间内线程池是否进入TERMINATED状态,代码如下:

public boolean awaitTermination(long timeout, TimeUnit unit)
    throws InterruptedException {
    long nanos = unit.toNanos(timeout);
    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();
    try {
        for (;;) {
            if (runStateAtLeast(ctl.get(), TERMINATED))
                return true;
            if (nanos <= 0)
                return false;
            nanos = termination.awaitNanos(nanos);
        }
    } finally {
        mainLock.unlock();
    }
}
登入後複製

后言

 以上就是Java 线程池执行原理分析 的内容,更多相关内容请关注PHP中文网(www.php.cn)!

本網站聲明
本文內容由網友自願投稿,版權歸原作者所有。本站不承擔相應的法律責任。如發現涉嫌抄襲或侵權的內容,請聯絡admin@php.cn

熱AI工具

Undresser.AI Undress

Undresser.AI Undress

人工智慧驅動的應用程序,用於創建逼真的裸體照片

AI Clothes Remover

AI Clothes Remover

用於從照片中去除衣服的線上人工智慧工具。

Undress AI Tool

Undress AI Tool

免費脫衣圖片

Clothoff.io

Clothoff.io

AI脫衣器

Video Face Swap

Video Face Swap

使用我們完全免費的人工智慧換臉工具,輕鬆在任何影片中換臉!

熱工具

記事本++7.3.1

記事本++7.3.1

好用且免費的程式碼編輯器

SublimeText3漢化版

SublimeText3漢化版

中文版,非常好用

禪工作室 13.0.1

禪工作室 13.0.1

強大的PHP整合開發環境

Dreamweaver CS6

Dreamweaver CS6

視覺化網頁開發工具

SublimeText3 Mac版

SublimeText3 Mac版

神級程式碼編輯軟體(SublimeText3)

Java Spring 面試題 Java Spring 面試題 Aug 30, 2024 pm 04:29 PM

在本文中,我們保留了最常被問到的 Java Spring 面試問題及其詳細答案。這樣你就可以順利通過面試。

突破或從Java 8流返回? 突破或從Java 8流返回? Feb 07, 2025 pm 12:09 PM

Java 8引入了Stream API,提供了一種強大且表達力豐富的處理數據集合的方式。然而,使用Stream時,一個常見問題是:如何從forEach操作中中斷或返回? 傳統循環允許提前中斷或返回,但Stream的forEach方法並不直接支持這種方式。本文將解釋原因,並探討在Stream處理系統中實現提前終止的替代方法。 延伸閱讀: Java Stream API改進 理解Stream forEach forEach方法是一個終端操作,它對Stream中的每個元素執行一個操作。它的設計意圖是處

Java 中的時間戳至今 Java 中的時間戳至今 Aug 30, 2024 pm 04:28 PM

Java 中的時間戳記到日期指南。這裡我們也結合範例討論了介紹以及如何在java中將時間戳記轉換為日期。

PHP:網絡開發的關鍵語言 PHP:網絡開發的關鍵語言 Apr 13, 2025 am 12:08 AM

PHP是一種廣泛應用於服務器端的腳本語言,特別適合web開發。 1.PHP可以嵌入HTML,處理HTTP請求和響應,支持多種數據庫。 2.PHP用於生成動態網頁內容,處理表單數據,訪問數據庫等,具有強大的社區支持和開源資源。 3.PHP是解釋型語言,執行過程包括詞法分析、語法分析、編譯和執行。 4.PHP可以與MySQL結合用於用戶註冊系統等高級應用。 5.調試PHP時,可使用error_reporting()和var_dump()等函數。 6.優化PHP代碼可通過緩存機制、優化數據庫查詢和使用內置函數。 7

Java程序查找膠囊的體積 Java程序查找膠囊的體積 Feb 07, 2025 am 11:37 AM

膠囊是一種三維幾何圖形,由一個圓柱體和兩端各一個半球體組成。膠囊的體積可以通過將圓柱體的體積和兩端半球體的體積相加來計算。本教程將討論如何使用不同的方法在Java中計算給定膠囊的體積。 膠囊體積公式 膠囊體積的公式如下: 膠囊體積 = 圓柱體體積 兩個半球體體積 其中, r: 半球體的半徑。 h: 圓柱體的高度(不包括半球體)。 例子 1 輸入 半徑 = 5 單位 高度 = 10 單位 輸出 體積 = 1570.8 立方單位 解釋 使用公式計算體積: 體積 = π × r2 × h (4

PHP與Python:了解差異 PHP與Python:了解差異 Apr 11, 2025 am 12:15 AM

PHP和Python各有優勢,選擇應基於項目需求。 1.PHP適合web開發,語法簡單,執行效率高。 2.Python適用於數據科學和機器學習,語法簡潔,庫豐富。

創造未來:零基礎的 Java 編程 創造未來:零基礎的 Java 編程 Oct 13, 2024 pm 01:32 PM

Java是熱門程式語言,適合初學者和經驗豐富的開發者學習。本教學從基礎概念出發,逐步深入解說進階主題。安裝Java開發工具包後,可透過建立簡單的「Hello,World!」程式來實踐程式設計。理解程式碼後,使用命令提示字元編譯並執行程序,控制台上將輸出「Hello,World!」。學習Java開啟了程式設計之旅,隨著掌握程度加深,可創建更複雜的應用程式。

如何在Spring Tool Suite中運行第一個春季啟動應用程序? 如何在Spring Tool Suite中運行第一個春季啟動應用程序? Feb 07, 2025 pm 12:11 PM

Spring Boot簡化了可靠,可擴展和生產就緒的Java應用的創建,從而徹底改變了Java開發。 它的“慣例慣例”方法(春季生態系統固有的慣例),最小化手動設置

See all articles