線程池作為Java中一個重要的知識點,看了很多文章,在此以Java自帶的線程池為例,記錄分析一下。本文參考了Java並發程式設計:執行緒池的使用、Java執行緒池---addWorker方法解析、執行緒池、ThreadPoolExecutor中策略的選擇與工作佇列的選擇(java執行緒池)和ThreadPoolExecutor執行緒池解析與BlockingQueue的三種實現。本文基於JDK1.8實現用例和原始碼分析,並主要著重流程。
要知道一個東西的原理,首先要知道如何使用它。所以先上一個使用線程池的範例。
要使用Java自帶的執行緒池,首先需要一個任務類,這個任務類需要實作Runnable接口,並重寫run方法(需要多執行緒執行的任務邏輯)。
package org.my.threadPoolDemo; /** * 任务类,实现Runnable接口 重写run方法 */ public class MyTask implements Runnable{ private int taskNum; public MyTask(int taskNum) { super(); this.taskNum = taskNum; } @Override public void run() { System.out.println("正在执行task"+taskNum); try { Thread.currentThread().sleep(4000);//sleep 4秒模拟执行代码过程 } catch (InterruptedException e) { e.printStackTrace(); } System.out.println("task"+taskNum+"执行完毕"); } }
有了任務類,接下來建立執行緒池,並執行多個任務。我們使用ThreadPoolExecutor來建立線程池。
package org.my.threadPoolDemo; import java.util.concurrent.LinkedBlockingDeque; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; public class ThreadPoolUseDemo { public static void main(String[] args) { ThreadPoolExecutor executor = new ThreadPoolExecutor(5, 10, 200, TimeUnit.MILLISECONDS, new LinkedBlockingDeque<Runnable>()); for (int i = 0; i < 15; i++) { MyTask myTask = new MyTask(i); executor.execute(myTask); System.out.println("线程池中线程数量:"+executor.getPoolSize()+",线程池中等待执行的任务数量:"+executor.getQueue().size()+",已执行完的任务数量:"+executor.getCompletedTaskCount()); } executor.shutdown(); } }
邏輯很簡單,建立了一個執行緒池管理器,然後使用它執行了15個任務。這裡需要解釋一下建構ThreadPoolExecutor的時候傳入的參數的意義:
5(corePoolSize)是指核心池大小。即創建的線程數量。如果執行緒池中執行緒數等於這個數量,那麼下一個來的任務就會被放在任務佇列中(稍後會詳細圖解)。
10(maximumPoolSize)是指執行緒池能建立的最大執行緒數。如果上一步的任務佇列已滿,那麼線程池將繼續建立線程,直到線程數=10,而下一個來的任務會被拒絕執行。
200(keepAliveTime)是指表示執行緒沒有任務執行時最多保持多久時間會終止。預設情況下,只有當執行緒池中的執行緒數大於corePoolSize時,keepAliveTime才會起作用,直到執行緒池中的執行緒數不大於corePoolSize,當執行緒池中的執行緒數大於corePoolSize時,如果一個執行緒空閒的時間達到keepAliveTime,則會終止,直到執行緒池中的執行緒數不超過corePoolSize。但如果呼叫了allowCoreThreadTimeOut(boolean)方法,在執行緒池中的執行緒數不大於corePoolSize時,keepAliveTime參數也會起作用,直到執行緒池中的執行緒數為0。
TimeUnit.MILLISECONDS是keepAliveTime的時間單位。
TimeUnit.DAYS; //天
TimeUnit.HOURS; //小時
TimeUnit.MINUTES;
TimeUnit.MILLISECONDS; //毫秒
TimeUnit.MICROSECONDS; //微妙
TimeUnit.NANOSECONDS; //奈秒
handler:表示拒絕執行任務時的策略,有四種取值:ThreadPoolExecutor.AbortPolicy:丟棄任務並拋出RejectedExecutionException例外。
ThreadPoolExecutor.DiscardPolicy:也是丟棄任務,但不拋出例外。
ThreadPoolExecutor.DiscardOldestPolicy:丟棄佇列最前面的任務,然後重新嘗試執行任務(重複此過程)
ThreadPoolExecutor.CallerRunsPolicy:由呼叫執行緒處理該任務
正在执行task0 线程池中线程数量:1,线程池中等待执行的任务数量:0,已执行完的任务数量:0 线程池中线程数量:2,线程池中等待执行的任务数量:0,已执行完的任务数量:0 正在执行task1 线程池中线程数量:3,线程池中等待执行的任务数量:0,已执行完的任务数量:0 正在执行task2 线程池中线程数量:4,线程池中等待执行的任务数量:0,已执行完的任务数量:0 正在执行task3 线程池中线程数量:5,线程池中等待执行的任务数量:0,已执行完的任务数量:0 正在执行task4 线程池中线程数量:5,线程池中等待执行的任务数量:1,已执行完的任务数量:0 线程池中线程数量:5,线程池中等待执行的任务数量:2,已执行完的任务数量:0 线程池中线程数量:5,线程池中等待执行的任务数量:3,已执行完的任务数量:0 线程池中线程数量:5,线程池中等待执行的任务数量:4,已执行完的任务数量:0 线程池中线程数量:5,线程池中等待执行的任务数量:5,已执行完的任务数量:0 线程池中线程数量:6,线程池中等待执行的任务数量:5,已执行完的任务数量:0 正在执行task10 线程池中线程数量:7,线程池中等待执行的任务数量:5,已执行完的任务数量:0 正在执行task11 线程池中线程数量:8,线程池中等待执行的任务数量:5,已执行完的任务数量:0 正在执行task12 正在执行task13 线程池中线程数量:9,线程池中等待执行的任务数量:5,已执行完的任务数量:0 线程池中线程数量:10,线程池中等待执行的任务数量:5,已执行完的任务数量:0 正在执行task14 task1执行完毕 task10执行完毕 正在执行task5 task11执行完毕 task13执行完毕 task4执行完毕 task3执行完毕 task2执行完毕 task0执行完毕 正在执行task9 正在执行task8 正在执行task7 task14执行完毕 task12执行完毕 正在执行task6 task5执行完毕 task9执行完毕 task7执行完毕 task8执行完毕 task6执行完毕
主要成员变量:任务队列——存放那些暂时无法执行的任务;工作线程池——存放当前启用的所有线程;线程工厂——创建线程;还有一些用来调度线程与任务并保证线程安全的成员。
了解了ThreadPoolExecutor的主要结构,再简单梳理一下“一个传入线程池的任务能够被最终正常执行需要经过的主要流程”,方法名称前面没有“XXX.”这种标注的都是ThreadPoolExecutor的方法:
简单了解下构造器,ThreadPoolExecutor的四个构造器的源码如下:
public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit,BlockingQueue<Runnable> workQueue) { this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,Executors.defaultThreadFactory(), defaultHandler); } public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit,BlockingQueue<Runnable> workQueue,ThreadFactory threadFactory) { this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,threadFactory, defaultHandler); } public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit,BlockingQueue<Runnable> workQueue,RejectedExecutionHandler handler) { this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,Executors.defaultThreadFactory(), handler); } public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit,BlockingQueue<Runnable> workQueue,ThreadFactory threadFactory,RejectedExecutionHandler handler) { if (corePoolSize < 0 ||maximumPoolSize <= 0 ||maximumPoolSize < corePoolSize ||keepAliveTime < 0) throw new IllegalArgumentException(); if (workQueue == null || threadFactory == null || handler == null) throw new NullPointerException(); this.acc = System.getSecurityManager() == null ?null :AccessController.getContext(); this.corePoolSize = corePoolSize; this.maximumPoolSize = maximumPoolSize; this.workQueue = workQueue; this.keepAliveTime = unit.toNanos(keepAliveTime); this.threadFactory = threadFactory; this.handler = handler; }
从源码中可以看出,这四个构造器都是调用最后一个构造器,只是根据开发者传入的参数的不同而填充一些默认的参数。比如如果开发者没有传入线程工厂threadFactory参数,那么构造器就使用默认的Executors.defaultThreadFactor。
在这里还要理解ThreadPoolExecutor的几个常量的含义和几个简单方法:
//Integer.SIZE是一个静态常量,值为32,也就是说COUNT_BITS是29 private static final int COUNT_BITS = Integer.SIZE - 3; //CAPACITY是最大容量536870911,因为1左移29位之后-1,导致最高三位为0,而其余29位全部为1 private static final int CAPACITY = (1 << COUNT_BITS) - 1; //ctl用于表示线程池状态和有效线程数量,最高三位表示线程池的状态,其余低位表示有效线程数量 //初始化之后ctl等于RUNNING的值,即默认状态是运行状态,线程数量为0 private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0)); //1110 0000 0000 0000 0000 0000 0000 0000最高三位为111 private static final int RUNNING = -1 << COUNT_BITS; //最高三位为000 private static final int SHUTDOWN = 0 << COUNT_BITS; //0010 0000 0000 0000 0000 0000 0000 0000最高三位为001 private static final int STOP = 1 << COUNT_BITS; //0100 0000 0000 0000 0000 0000 0000 0000最高三位为010 private static final int TIDYING = 2 << COUNT_BITS; //0110 0000 0000 0000 0000 0000 0000 0000最高三位为011 private static final int TERMINATED = 3 << COUNT_BITS; /** * 获取运行状态,入参为ctl。因为CAPACITY是最高三位为0,其余低位为1 * 所以当取反的时候,就只有最高三位为1,再经过与运算,只会取到ctl的最高三位 * 而这最高三位如上文所述,表示线程池的状态 */ private static int runStateOf(int c) { return c & ~CAPACITY; } /** * 获取工作线程数量,入参为ctl。因为CAPACITY是最高三位为0,其余低位为1 * 所以当进行与运算的时候,只会取到低29位,这29位正好表示有效线程数量 */ private static int workerCountOf(int c) { return c & CAPACITY; } private static int ctlOf(int rs, int wc) { return rs | wc; } //任务队列,用于存放待执行任务的阻塞队列 private final BlockingQueue<Runnable> workQueue; /** 判断线程池是否是运行状态,传入的参数是ctl的值 * 只有RUNNING的符号位是1,也就是只有RUNNING为负数 * 所以如果目前的ctl值<0,就是RUNNING状态 */ private static boolean isRunning(int c) { return c < SHUTDOWN; } //从任务队列中移除任务 public boolean remove(Runnable task) { boolean removed = workQueue.remove(task); tryTerminate(); // In case SHUTDOWN and now empty return removed; }
通过前面的流程图,我们知道getTask()是由runWorker方法调用的,目的是取出一个任务。
private Runnable getTask() { //记录循环体中上个循环在从阻塞队列中取任务时是否超时 boolean timedOut = false; //无条件循环,主要是在线程池运行正常情况下 //通过循环体内部的阻塞队列的阻塞时间,来控制当前线程的超时时间 for (;;) { int c = ctl.get(); int rs = runStateOf(c);//获取线程池状态 /*先获取线程池的状态,如果状态大于等于STOP,也就是STOP、TIDYING、TERMINATED之一 *这时候不管队列中有没有任务,都不用去执行了; *如果线程池的状态为SHUTDOWN且队列中没有任务了,也不用继续执行了 *将工作线程数量-1,并且返回null **/ if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) { decrementWorkerCount(); return null; } //获取工作线程数量 int wc = workerCountOf(c); //是否启用超时参数keepAliveTime boolean timed = allowCoreThreadTimeOut || wc > corePoolSize; //如果这个条件成立,如果工作线程数量-1成功,返回null,否则跳出循环 if ((wc > maximumPoolSize || (timed && timedOut)) && (wc > 1 || workQueue.isEmpty())) { if (compareAndDecrementWorkerCount(c)) return null; continue; } //如果需要采用阻塞形式获取,那么就poll设定阻塞时间,否则take无限期等待。 //这里通过阻塞时间,间接控制了超时的值,如果取值超时,意味着这个线程在超时时间内处于空闲状态 //那么下一个循环,将会return null并且线程数量-1 try { Runnable r = timed ? workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) : workQueue.take(); if (r != null) return r; timedOut = true; } catch (InterruptedException retry) { timedOut = false; } } }
通过上面流程图,可以看出runWorker(Worker w)实际上已经是在线程启动之后执行任务了,所以其主要逻辑就是获取任务,然后执行任务的run方法。
final void runWorker(Worker w) { Thread wt = Thread.currentThread(); Runnable task = w.firstTask;//获取传入的Worker对象中的任务 w.firstTask = null; w.unlock(); // allow interrupts boolean completedAbruptly = true; try { /* * 如果传入的任务为null,就从任务队列中获取任务,只要这两者有一个不为null,就进入循环体 */ while (task != null || (task = getTask()) != null) { w.lock(); //如果线程池状态已被标为停止,那么则不允许该线程继续执行任务!或者该线程已是中断状态, //也不允许执行任务,还需要中断该线程! if ((runStateAtLeast(ctl.get(), STOP) || (Thread.interrupted() && runStateAtLeast(ctl.get(), STOP))) && !wt.isInterrupted()) wt.interrupt(); try { beforeExecute(wt, task);//为子类预留的空方法(钩子) Throwable thrown = null; try { task.run();//终于真正执行传入的任务了 } catch (RuntimeException x) { thrown = x; throw x; } catch (Error x) { thrown = x; throw x; } catch (Throwable x) { thrown = x; throw new Error(x); } finally { afterExecute(task, thrown);//为子类预留的空方法(钩子) } } finally { task = null; w.completedTasks++; w.unlock(); } /* * 从这里可以看出,当一个任务执行完毕之后,循环并没有退出,此时会再次执行条件判断 * 也就是说如果执行完第一个任务之后,任务队列中还有任务,那么将会继续在这个线程执行。 * 这里也是很巧妙的地方,不需要额外开一个控制线程来看那些线程处于空闲状态,然后给他分配任务。 * 直接自己去任务队列拿任务 */ } completedAbruptly = false; } finally { /* * 执行到这里,说明getTask返回了null,要么是超时(任务队列没有任务了),要么是线程池状态有问题了 * 当前线程将被回收了 */ processWorkerExit(w, completedAbruptly); } }
runWorker是从Worker对象中获取第一个任务,然后从任务队列中一直获取任务执行。流程图中已经说过,这个Worker对象是在addWorker方法中创建的,所以新线程创建、启动的源头是在addWorker方法中。而addWorker是被execute所调用,execute根据addWorker的返回值,进行条件判断。
private boolean addWorker(Runnable firstTask, boolean core) { //上来就是retry,后面continue retry;语句执行之后都会从这里重新开始 retry: for (;;) { int c = ctl.get(); int rs = runStateOf(c);//获取线程池运行状态 /* * 获取当前线程池的状态,如果是STOP,TIDYING,TERMINATED状态的话,则会返回false * 如果现在状态是SHUTDOWN,但是firstTask不为空或者workQueue为空的话,那么直接返回false */ if (rs >= SHUTDOWN && ! (rs == SHUTDOWN && firstTask == null && ! workQueue.isEmpty())) return false; for (;;) { int wc = workerCountOf(c);//获取工作线程数量 /* * addWorker传入的第二个Boolean参数用来判别当前线程数量是否大于核心池数量 * true,代表当前线程数量小于核心池数量 */ if (wc >= CAPACITY || wc >= (core ? corePoolSize : maximumPoolSize)) return false; //增加工作线程数量 if (compareAndIncrementWorkerCount(c)) //如果成功,跳出retry break retry; c = ctl.get(); // Re-read ctl //判断线程池状态,改变了就重试一次 if (runStateOf(c) != rs) continue retry; // else CAS failed due to workerCount change; retry inner loop } } boolean workerStarted = false; boolean workerAdded = false; Worker w = null; try { /* * 前面顺利增加了工作线程数,那么这里就真正创建Worker * 上面流程图中说过,创建Worker时会创建新的线程.如果这里创建失败 * finally中会将工作线程数-1 */ w = new Worker(firstTask); final Thread t = w.thread; if (t != null) { final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { // Recheck while holding lock. // Back out on ThreadFactory failure or if // shut down before lock acquired. int rs = runStateOf(ctl.get()); if (rs < SHUTDOWN || (rs == SHUTDOWN && firstTask == null)) { if (t.isAlive()) // precheck that t is startable throw new IllegalThreadStateException(); workers.add(w);//将Worker放入工作线程池 int s = workers.size(); if (s > largestPoolSize) largestPoolSize = s; workerAdded = true; } } finally { mainLock.unlock(); } if (workerAdded) { t.start();//启动线程 workerStarted = true; } } } finally { if (! workerStarted) addWorkerFailed(w); } return workerStarted; }
execute方法其实最主要是根据线程池的策略来传递不同的参数给addWorker方法。
当调用 execute() 方法添加一个任务时,线程池会做如下判断:
a. 如果正在运行的线程数量小于 corePoolSize,那么马上创建线程运行这个任务;
b. 如果正在运行的线程数量大于或等于 corePoolSize,那么将这个任务放入队列。
c. 如果这时候队列满了,而且正在运行的线程数量小于 maximumPoolSize,那么还是要创建线程运行这个任务;
d. 如果队列满了,而且正在运行的线程数量大于或等于 maximumPoolSize,那么线程池会抛出异常,告诉调用者“我不能再接受任务了”。
方法execute主要是在控制上面四条策略的实现。
public void execute(Runnable command) { if (command == null) throw new NullPointerException(); //获取到成员变量ctl的值(线程池状态) int c = ctl.get(); //如果工作线程的数量<核心池的大小 if (workerCountOf(c) < corePoolSize) { //调用addWorker(这里传入的true代表工作线程数量<核心池大小) //如果成功,方法结束。 if (addWorker(command, true)) return; //否则,再重新获取一次ctl的值 //个人理解是防止前面这段代码执行的时候有其他线程改变了ctl的值。 c = ctl.get(); } //如果工作线程数量>=核心池的大小或者上一步调用addWorker返回false,继续走到下面 //如果线程池处于运行状态,并且成功将当前任务放入任务队列 if (isRunning(c) && workQueue.offer(command)) { //为了线程安全,重新获取ctl的值 int recheck = ctl.get(); //如果线程池不处于运行状态并且任务从任务队列移除成功 if (! isRunning(recheck) && remove(command)) //调用reject拒绝执行,根据handler的实现类抛出异常或者其他操作 reject(command); //否则,如果工作线程数量==0,调用addWorker并传入null和false else if (workerCountOf(recheck) == 0) addWorker(null, false); } //执行到这里代表当前线程已超越了核心线程且任务提交到任务队列失败。(可以注意这里的addWorker是false) //那么这里再次调用addWroker创建新线程(这时创建的线程是maximumPoolSize)。 //如果还是提交任务失败则调用reject处理失败任务 else if (!addWorker(command, false)) reject(command); }
本段引用至ThreadPoolExecutor中策略的选择与工作队列的选择(java线程池)
直接提交:
工作队列的默认选项是SynchronousQueue,它将任务直接提交给线程而不存储它们。在此,如果不存在可用于立即运行任务的线程,则试图把任务加入队列将失败,因此会构造一个新的线程。此策略可以避免在处理可能具有内部依赖性的请求集时出现锁。直接提交通常要求无界 maximumPoolSizes 以避免拒绝新提交的任务。当命令以超过队列所能处理的平均数连续到达时,此策略允许无界线程具有增长的可能性。
无界队列:
使用无界队列(例如,不具有预定义容量的LinkedBlockingQueue将导致在所有 corePoolSize 线程都忙时新任务在队列中等待。这样,创建的线程就不会超过 corePoolSize。(因此,maximumPoolSize 的值也就无效了。)当每个任务完全独立于其他任务,即任务执行互不影响时,适合于使用无界队列;例如,在 Web 页服务器中。这种排队可用于处理瞬态突发请求,当命令以超过队列所能处理的平均数连续到达时,此策略允许无界线程具有增长的可能性。
有界佇列:
當使用有限的 maximumPoolSizes 時,有界佇列(如 ArrayBlockingQueue)有助於防止資源耗盡,但可能較難調整和控制。佇列大小和最大池大小可能需要相互折衷:使用大型佇列和小型池可以最大限度地降低 CPU 使用率、作業系統資源和上下文切換開銷,但可能導致人工降低吞吐量。如果任務經常阻塞(例如,如果它們是 I/O 邊界),則系統可能會為超過您許可的更多執行緒安排時間。使用小型佇列通常要求較大的池大小,CPU 使用率較高,但是可能遇到不可接受的調度開銷,這也會降低吞吐量。
在使用ThreadPoolExecutor建立執行緒池的時候,根據不同參數,可以使用不同建構器建構不同特性的執行緒池。而實際情況中,我們一般會使用Executors類別提供的方法來建立線程池。 Executors最終也是呼叫ThreadPoolExecutor的建構器,但已經配置了相關參數。
1)固定大小的執行緒池:Executors.newFixedThreadPool
coresize和maxsize相同,超時時間為0,佇列用的LinkedBlockingQueue無界的FIFO佇列,這表示這個執行緒池總是只有coresize個線程在運行。根據前面分析的,如果執行完任務,這個執行緒會繼續從任務佇列取任務執行,當沒有任務的時候,執行緒會立即關閉。
2)單任務執行緒池:newSingleThreadExecutor
池中只有一個執行緒工作,阻塞佇列無界,它能保證按照任務提交的順序來執行任務。
3)可變尺寸的執行緒池:newCachedThreadPool
SynchronousQueue佇列,一個不儲存元素的阻塞佇列。每個插入操作必須等到另一個執行緒呼叫移除操作。所以,當我們提交第一個任務的時候,是加入不了隊列的,這就滿足了,一個線程池條件“當無法加入隊列的時候,且任務沒有達到maxsize時,我們將新開啟一個線程任務” 。所以我們的maxsize是ctl低29位的最大值。超時時間是60s,當一個執行緒沒有任務執行會暫時保存60s超時時間,如果沒有的新的任務的話,會從執行緒池中remove掉。
4)scheduled執行緒池,newScheduledThreadPool
建立一個大小無限的執行緒池。此線程池支援定時以及週期性執行任務的需求
終於差不多可以畫上句號了,感覺有很多東西還是沒有細說,尤其是其中大量的重入鎖。有機會再補上。綜合了許多網路資源,也自己畫了幾張圖,個人認為相對於許多介紹線程池的部落格來說,比較容易理解吧。如有錯誤,歡迎批評指正。
以上是Java中線程池的圖文程式碼詳解的詳細內容。更多資訊請關注PHP中文網其他相關文章!