首頁 > Java > java教程 > 主體

Java 執行緒池執行原理分析

黄舟
發布: 2017-02-23 10:42:44
原創
1200 人瀏覽過

上一篇已經對線程池的創建進行了分析,了解線程池既有預設的模板,也提供多種參數支撐靈活的客製化。

本文將會圍繞著執行緒池的生命週期,分析執行緒池執行任務的過程。

執行緒池狀態

先認識兩個貫穿執行緒池程式碼的參數:

runState:執行緒池運行狀態

workerCount:工作執行緒的數量

執行緒池用一個32位元的int來同時儲存runState和workerCount,其中高3位元是runState,其餘29位元是workerCount。程式碼中會重複使用runStateOf和workerCountOf來取得runState和workerCount。

private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
private static final int COUNT_BITS = Integer.SIZE - 3;
private static final int CAPACITY   = (1 << COUNT_BITS) - 1;
登入後複製
// 线程池状态
private static final int RUNNING    = -1 << COUNT_BITS;
private static final int SHUTDOWN   =  0 << COUNT_BITS;
private static final int STOP       =  1 << COUNT_BITS;
private static final int TIDYING    =  2 << COUNT_BITS;
private static final int TERMINATED =  3 << COUNT_BITS;
登入後複製
// ctl操作
private static int runStateOf(int c)     { return c & ~CAPACITY; }
private static int workerCountOf(int c)  { return c & CAPACITY; }
private static int ctlOf(int rs, int wc) { return rs | wc; }
登入後複製

RUNNING:可接收新任務,可執行等待佇列裡的任務

SHUTDOWN:不可接收新任務,可執行等待佇列裡的任務

STOP:不可接收新任務,不可執行等待佇列裡的任務,並且嘗試終止所有在執行任務

TIDYING:所有任務已經終止,執行terminated()

TERMINATED: terminated()執行完成

執行緒池狀態預設從RUNNING開始流轉,到狀態TERMINATED結束,中間不需要經過每一種狀態,但不能讓狀態回退。以下是狀態變更可能的路徑與變更條件:

Java 執行緒池執行原理分析

圖1 執行緒池狀態變更路徑

Worker的建立

執行緒池是由Worker類別負責執行任務,Worker繼承了AbstractQueuedSynchronizer,引出了Java並發框架的核心AQS。

AbstractQueuedSynchronizer,简称AQS,是Java并发包里一系列同步工具的基础实现,原理是根据状态位来控制线程的入队阻塞、出队唤醒来处理同步。
登入後複製


AQS不會在這裡展開討論,只需要知道Worker包裝了Thread,由它去執行任務。

呼叫execute將會根據執行緒池的情況建立Worker,可以歸納出下圖四種情況:

Java 執行緒池執行原理分析

圖2 worker在執行緒池裡的四種可能

public void execute(Runnable command) {
    if (command == null)
        throw new NullPointerException();
    int c = ctl.get();
    //1
    if (workerCountOf(c) < corePoolSize) {
        if (addWorker(command, true))
            return;
        c = ctl.get();
    }
    //2
    if (isRunning(c) && workQueue.offer(command)) {
        int recheck = ctl.get();
        if (! isRunning(recheck) && remove(command))
            //3
            reject(command);
        else if (workerCountOf(recheck) == 0)
            //4
            addWorker(null, false);
    }
    //5
    else if (!addWorker(command, false))
        //6
        reject(command);
}
登入後複製

標記1對應第一種情況,要留意addWorker傳入了core,core=true為corePoolSize,core=false為maximumPoolSize,新增時需要檢查workerCount是否超過允許的最大值。

標記2對應第二種情況,檢查執行緒池是否正在執行,並且將任務加入等待佇列。標記3再檢查一次執行緒池狀態,如果執行緒池忽然處於非運作狀態,那就將等待佇列剛加的任務刪掉,再交給RejectedExecutionHandler處理。標記4發現沒有worker,就先補充一個空任務的worker。

標記5對應第三種情況,等待佇列不能再新增任務了,呼叫addWorker新增一個去處理。

標記6對應第四種情況,addWorker的core傳入false,回傳呼叫失敗,代表workerCount已經超出maximumPoolSize,那就交給RejectedExecutionHandler處理。

private boolean addWorker(Runnable firstTask, boolean core) {
        //1
        retry:
        for (;;) {
            int c = ctl.get();
            int rs = runStateOf(c);
            // Check if queue empty only if necessary.
            if (rs >= SHUTDOWN &&
                ! (rs == SHUTDOWN &&
                   firstTask == null &&
                   ! workQueue.isEmpty()))
                return false;
            for (;;) {
                int wc = workerCountOf(c);
                if (wc >= CAPACITY ||
                    wc >= (core ? corePoolSize : maximumPoolSize))
                    return false;
                if (compareAndIncrementWorkerCount(c))
                    break retry;
                c = ctl.get();  // Re-read ctl
                if (runStateOf(c) != rs)
                    continue retry;
                // else CAS failed due to workerCount change; retry inner loop
            }
        }
        //2
        boolean workerStarted = false;
        boolean workerAdded = false;
        Worker w = null;
        try {
            w = new Worker(firstTask);
            final Thread t = w.thread;
            if (t != null) {
                final ReentrantLock mainLock = this.mainLock;
                mainLock.lock();
                try {
                    // Recheck while holding lock.
                    // Back out on ThreadFactory failure or if
                    // shut down before lock acquired.
                    int rs = runStateOf(ctl.get());
                    if (rs < SHUTDOWN ||
                        (rs == SHUTDOWN && firstTask == null)) {
                        if (t.isAlive()) // precheck that t is startable
                            throw new IllegalThreadStateException();
                        workers.add(w);
                        int s = workers.size();
                        if (s > largestPoolSize)
                            largestPoolSize = s;
                        workerAdded = true;
                    }
                } finally {
                    mainLock.unlock();
                }
                if (workerAdded) {
                    t.start();
                    workerStarted = true;
                }
            }
        } finally {
            if (! workerStarted)
                addWorkerFailed(w);
        }
        return workerStarted;
    }
登入後複製

標記1的第一段程式碼,目的很簡單,是為workerCount加一。至於為什麼程式碼寫了這麼長,是因為執行緒池的狀態不斷變化,並發環境下需要保證變數的同步性。外循環判斷執行緒池狀態、任務非空和佇列非空,內循環使用CAS機制保證workerCount正確地遞增。不了解CAS可以看認識非阻塞的同步機制CAS,後續增減workerCount都會使用CAS。

標記2的第二段程式碼,就比較簡單。建立一個新Worker對象,將Worker加入workers裡(Set集合)。成功加入後,啟動worker裡的執行緒。在finally裡判斷執行緒是否啟動成功,不成功直接呼叫addWorkerFailed。

private void addWorkerFailed(Worker w) {
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            if (w != null)
                workers.remove(w);
            decrementWorkerCount();
            tryTerminate();
        } finally {
            mainLock.unlock();
        }
    }
登入後複製

addWorkerFailed將減少已經遞增的workerCount,並且呼叫tryTerminate結束執行緒池。

Worker的執行

Worker(Runnable firstTask) {
    setState(-1); // inhibit interrupts until runWorker
    this.firstTask = firstTask;
    this.thread = getThreadFactory().newThread(this);
}
public void run() {
    runWorker(this);
}
登入後複製

Worker在建構函式裡採用ThreadFactory建立Thread,在run方法裡呼叫了runWorker,看來是真正執行任務的地方。

final void runWorker(Worker w) {
    Thread wt = Thread.currentThread();
    Runnable task = w.firstTask;
    w.firstTask = null;
    w.unlock(); // allow interrupts
    boolean completedAbruptly = true;
    try {
       //1
        while (task != null || (task = getTask()) != null) {
            w.lock();
           //2
            if ((runStateAtLeast(ctl.get(), STOP) ||
                 (Thread.interrupted() &&
                  runStateAtLeast(ctl.get(), STOP))) &&
                !wt.isInterrupted())
                wt.interrupt();
            try {
               //3
                beforeExecute(wt, task);
                Throwable thrown = null;
                try {
                    task.run();
                } catch (RuntimeException x) {
                    thrown = x; throw x;
                } catch (Error x) {
                    thrown = x; throw x;
                } catch (Throwable x) {
                    thrown = x; throw new Error(x);
                } finally {
                    afterExecute(task, thrown);
                }
            } finally {
                task = null;
                 //4
                w.completedTasks++;
                w.unlock();
            }
        }
        completedAbruptly = false;       //5
    } finally {
        //6
        processWorkerExit(w, completedAbruptly);
    }
}
登入後複製

標記1進入循環,從getTask取得要執行的任務,直到返回null。這裡達到了線程復用的效果,讓線程處理多個任務。

標記2是比較複雜的判斷,保證了執行緒池在STOP狀態下執行緒是中斷的,非STOP狀態下執行緒沒有被中斷。如果你不了解Java的中斷機制,看看如何正確結束Java執行緒這篇。

標記3呼叫了run方法,真正執行了任務。執行前後提供了beforeExecute和afterExecute兩個方法,由子類別實作。

標記4裡的completedTasks統計worker執行了多少任務,最後累加進completedTaskCount變量,可以呼叫對應方法回傳一些統計資料。

標記5的變數completedAbruptly表示worker是否異常終止,執行到這裡代表執行正常,後續的方法需要這個變數。

標記6呼叫processWorkerExit結束,後面會分析。

接著來看worker從等待佇列取得任務的getTask方法:

#
private Runnable getTask() {
    boolean timedOut = false; // Did the last poll() time out?
    for (;;) {
        int c = ctl.get();
        int rs = runStateOf(c);
        //1
        // Check if queue empty only if necessary.
        if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
            decrementWorkerCount();
            return null;
        }
        int wc = workerCountOf(c);
        //2
        // Are workers subject to culling?
        boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
        if ((wc > maximumPoolSize || (timed && timedOut))
            && (wc > 1 || workQueue.isEmpty())) {
            if (compareAndDecrementWorkerCount(c))
                return null;
            continue;
        }
       //3
        try {
            Runnable r = timed ?
                workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
                workQueue.take();
            if (r != null)
                return r;
            timedOut = true;
        } catch (InterruptedException retry) {
            timedOut = false;
        }
    }
}
登入後複製

标记1检查线程池的状态,这里就体现出SHUTDOWN和STOP的区别。如果线程池是SHUTDOWN状态,还会先处理完等待队列的任务;如果是STOP状态,就不再处理等待队列里的任务了。

标记2先看allowCoreThreadTimeOut这个变量,false时worker空闲,也不会结束;true时,如果worker空闲超过keepAliveTime,就会结束。接着是一个很复杂的判断,好难转成文字描述,自己看吧。注意一下wc>maximumPoolSize,出现这种可能是在运行中调用setMaximumPoolSize,还有wc>1,在等待队列非空时,至少保留一个worker。

标记3是从等待队列取任务的逻辑,根据timed分为等待keepAliveTime或者阻塞直到有任务。

最后来看结束worker需要执行的操作:

private void processWorkerExit(Worker w, boolean completedAbruptly) {
   //1
    if (completedAbruptly) // If abrupt, then workerCount wasn&#39;t adjusted
        decrementWorkerCount();
  //2
    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();
    try {
        completedTaskCount += w.completedTasks;
        workers.remove(w);
    } finally {
        mainLock.unlock();
    }
   //3
    tryTerminate();
    int c = ctl.get();
    //4
    if (runStateLessThan(c, STOP)) {
        if (!completedAbruptly) {
            int min = allowCoreThreadTimeOut ? 0 : corePoolSize;
            if (min == 0 && ! workQueue.isEmpty())
                min = 1;
            if (workerCountOf(c) >= min)
                return; // replacement not needed
        }
        addWorker(null, false);
    }
}
登入後複製

正常情况下,在getTask里就会将workerCount减一。标记1处用变量completedAbruptly判断worker是否异常退出,如果是,需要补充对workerCount的减一。

标记2将worker处理任务的数量累加到总数,并且在集合workers中去除。

标记3尝试终止线程池,后续会研究。

标记4处理线程池还是RUNNING或SHUTDOWN状态时,如果worker是异常结束,那么会直接addWorker。如果allowCoreThreadTimeOut=true,并且等待队列有任务,至少保留一个worker;如果allowCoreThreadTimeOut=false,workerCount不少于corePoolSize。

总结一下worker:线程池启动后,worker在池内创建,包装了提交的Runnable任务并执行,执行完就等待下一个任务,不再需要时就结束。

线程池的关闭

线程池的关闭不是一关了事,worker在池里处于不同状态,必须安排好worker的”后事”,才能真正释放线程池。ThreadPoolExecutor提供两种方法关闭线程池:

shutdown:不能再提交任务,已经提交的任务可继续运行;

shutdownNow:不能再提交任务,已经提交但未执行的任务不能运行,在运行的任务可继续运行,但会被中断,返回已经提交但未执行的任务。

public void shutdown() {
    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();
    try {
        checkShutdownAccess();   //1 安全策略机制
        advanceRunState(SHUTDOWN);   //2
        interruptIdleWorkers();   //3
        onShutdown(); //4 空方法,子类实现
    } finally {
        mainLock.unlock();
    }
    tryTerminate();   //5
}
登入後複製

shutdown将线程池切换到SHUTDOWN状态,并调用interruptIdleWorkers请求中断所有空闲的worker,最后调用tryTerminate尝试结束线程池。

public List<Runnable> shutdownNow() {
    List<Runnable> tasks;
    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();
    try {
        checkShutdownAccess();
        advanceRunState(STOP);
        interruptWorkers();
        tasks = drainQueue();  //1
    } finally {
        mainLock.unlock();
    }
    tryTerminate();
    return tasks;
}
登入後複製

shutdownNow和shutdown类似,将线程池切换为STOP状态,中断目标是所有worker。drainQueue会将等待队列里未执行的任务返回。

interruptIdleWorkers和interruptWorkers实现原理都是遍历workers集合,中断条件符合的worker。

上面的代码多次出现调用tryTerminate,这是一个尝试将线程池切换到TERMINATED状态的方法。

final void tryTerminate() {
    for (;;) {
        int c = ctl.get();
        //1
        if (isRunning(c) ||
            runStateAtLeast(c, TIDYING) ||
            (runStateOf(c) == SHUTDOWN && ! workQueue.isEmpty()))
            return;
        //2
        if (workerCountOf(c) != 0) { // Eligible to terminate
            interruptIdleWorkers(ONLY_ONE);
            return;
        }
       //3
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            if (ctl.compareAndSet(c, ctlOf(TIDYING, 0))) {
                try {
                    terminated();
                } finally {
                    ctl.set(ctlOf(TERMINATED, 0));
                    termination.signalAll();
                }
                return;
            }
        } finally {
            mainLock.unlock();
        }
        // else retry on failed CAS
    }
}
登入後複製

标记1检查线程池状态,下面几种情况,后续操作都没有必要,直接return。

RUNNING(还在运行,不能停)

TIDYING或TERMINATED(已经没有在运行的worker)

SHUTDOWN并且等待队列非空(执行完才能停)

标记2在worker非空的情况下又调用了interruptIdleWorkers,你可能疑惑在shutdown时已经调用过了,为什么又调用,而且每次只中断一个空闲worker?你需要知道,shutdown时worker可能在执行中,执行完阻塞在队列的take,不知道要结束,所有要补充调用interruptIdleWorkers。每次只中断一个是因为processWorkerExit时,还会执行tryTerminate,自动中断下一个空闲的worker。

标记3是最终的状态切换。线程池会先进入TIDYING状态,再进入TERMINATED状态,中间提供了terminated这个空方法供子类实现。

调用关闭线程池方法后,需要等待线程池切换到TERMINATED状态。awaitTermination检查限定时间内线程池是否进入TERMINATED状态,代码如下:

public boolean awaitTermination(long timeout, TimeUnit unit)
    throws InterruptedException {
    long nanos = unit.toNanos(timeout);
    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();
    try {
        for (;;) {
            if (runStateAtLeast(ctl.get(), TERMINATED))
                return true;
            if (nanos <= 0)
                return false;
            nanos = termination.awaitNanos(nanos);
        }
    } finally {
        mainLock.unlock();
    }
}
登入後複製

后言

 以上就是Java 线程池执行原理分析 的内容,更多相关内容请关注PHP中文网(www.php.cn)!

相關標籤:
來源:php.cn
本網站聲明
本文內容由網友自願投稿,版權歸原作者所有。本站不承擔相應的法律責任。如發現涉嫌抄襲或侵權的內容,請聯絡admin@php.cn
熱門教學
更多>
最新下載
更多>
網站特效
網站源碼
網站素材
前端模板