Thread pools are an important topic in Java. After reading many articles on the subject, I use the thread pool that ships with the JDK as an example here to record and analyze how it works. This article draws on the following articles: "Java concurrent programming: the use of thread pools", "Java thread pool---addWorker method analysis", "Thread pools", "Selection of strategies in ThreadPoolExecutor and selection of work queues (java thread pool)", and "ThreadPoolExecutor thread pool analysis and three BlockingQueue implementations". The use cases and source code analysis are based on JDK 1.8 and focus mainly on the execution flow.
To know the principle of something, you must first know how to use it. So let’s start with an example of using a thread pool.
To use the thread pool that comes with Java, you first need a task class. The task class implements the Runnable interface and overrides the run method, which holds the task logic to be executed on multiple threads.
    package org.my.threadPoolDemo;

    /**
     * Task class: implements the Runnable interface and overrides run().
     */
    public class MyTask implements Runnable {
        private final int taskNum;

        public MyTask(int taskNum) {
            this.taskNum = taskNum;
        }

        @Override
        public void run() {
            System.out.println("Running task" + taskNum);
            try {
                // Sleep for 4 seconds to simulate real work.
                Thread.sleep(4000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println("task" + taskNum + " finished");
        }
    }
With the task class in place, the next step is to create a thread pool and run several tasks on it. We use ThreadPoolExecutor to create the pool.
    package org.my.threadPoolDemo;

    import java.util.concurrent.ArrayBlockingQueue;
    import java.util.concurrent.ThreadPoolExecutor;
    import java.util.concurrent.TimeUnit;

    public class ThreadPoolUseDemo {

        public static void main(String[] args) {
            // A bounded queue of capacity 5 is needed for the pool to grow past
            // corePoolSize; an unbounded queue would keep it at 5 threads.
            ThreadPoolExecutor executor = new ThreadPoolExecutor(5, 10, 200, TimeUnit.MILLISECONDS,
                    new ArrayBlockingQueue<Runnable>(5));

            for (int i = 0; i < 15; i++) {
                MyTask myTask = new MyTask(i);
                executor.execute(myTask);
                System.out.println("Threads in pool: " + executor.getPoolSize()
                        + ", tasks waiting in queue: " + executor.getQueue().size()
                        + ", completed tasks: " + executor.getCompletedTaskCount());
            }
            executor.shutdown();
        }
    }
The logic is simple: create a thread pool, then use it to execute 15 tasks. The arguments passed to the ThreadPoolExecutor constructor have the following meanings:
5 (corePoolSize) is the core pool size, that is, the number of threads the pool keeps for normal operation. While fewer than corePoolSize threads exist, each new task gets a new thread. Once the number of threads reaches this value, newly submitted tasks are placed in the task queue (described in detail later).
10 (maximumPoolSize) is the maximum number of threads the pool may create. If the task queue from the previous step is full, the pool keeps creating threads until the thread count reaches 10; any task submitted after that is rejected.
200 (keepAliveTime) is the maximum time an idle thread may wait for a new task before it terminates. By default, keepAliveTime only applies while the number of threads is greater than corePoolSize: a thread that stays idle for keepAliveTime terminates, and this continues until the thread count is no longer greater than corePoolSize. If allowCoreThreadTimeOut(true) has been called, keepAliveTime also applies to core threads, so idle threads can be terminated until the pool has 0 threads.
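TimeUnit.MILLISECONDS is the time unit of keepAliveTime. TimeUnit defines seven units:
TimeUnit.DAYS; //Days
TimeUnit.HOURS; //Hours
TimeUnit.MINUTES; //Minutes
TimeUnit.SECONDS; //Seconds
TimeUnit.MILLISECONDS; //Milliseconds
TimeUnit.MICROSECONDS; //Microseconds
TimeUnit.NANOSECONDS; //Nanoseconds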
The last argument is the blocking queue (workQueue) that stores tasks waiting to run, that is, the task queue mentioned above. The example passes an ArrayBlockingQueue; LinkedBlockingQueue and SynchronousQueue are other common choices.
ThreadPoolExecutor has four constructors. Besides the parameters used above, the other constructors accept two more:
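The effect of keepAliveTime and allowCoreThreadTimeOut can be observed directly. The following is a minimal sketch and not part of the original example: the class name KeepAliveDemo, the pool sizes, and the 50 ms timeout are assumptions chosen for illustration. It runs two short tasks and then reports how many threads remain after the pool has been idle.

```java
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

// Hypothetical demo class; names and sizes are illustrative assumptions.
class KeepAliveDemo {

    /**
     * Runs two short tasks on a pool with corePoolSize = maximumPoolSize = 2,
     * waits up to waitMillis for idle threads to exit, and returns the
     * number of threads still in the pool.
     */
    static int poolSizeAfterIdle(boolean allowCoreTimeout, long waitMillis) {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                2, 2, 50, TimeUnit.MILLISECONDS,
                new LinkedBlockingQueue<Runnable>());
        pool.allowCoreThreadTimeOut(allowCoreTimeout);
        try {
            pool.execute(() -> { });
            pool.execute(() -> { });
            long deadline = System.nanoTime()
                    + TimeUnit.MILLISECONDS.toNanos(waitMillis);
            // Poll until every thread has timed out or the deadline passes.
            while (pool.getPoolSize() > 0 && System.nanoTime() < deadline) {
                Thread.sleep(10);
            }
            return pool.getPoolSize();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return pool.getPoolSize();
        } finally {
            pool.shutdownNow();
        }
    }

    public static void main(String[] args) {
        System.out.println("core timeout off: " + poolSizeAfterIdle(false, 300));
        System.out.println("core timeout on:  " + poolSizeAfterIdle(true, 2000));
    }
}
```

With the default setting the two core threads stay alive while idle. With allowCoreThreadTimeOut(true) they exit after roughly keepAliveTime and the pool size drops to 0.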
threadFactory: Thread factory, mainly used to create threads
handler: Indicates the policy for refusing to execute tasks. There are four values:
ThreadPoolExecutor.AbortPolicy: Discard the task and throw RejectedExecutionException.
ThreadPoolExecutor.DiscardPolicy: Also discards the task, but does not throw an exception.
ThreadPoolExecutor.DiscardOldestPolicy: Discards the task at the head of the queue (the oldest one), then tries to submit the new task again (repeating this if it is rejected again).
ThreadPoolExecutor.CallerRunsPolicy: The task is processed by the calling thread
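The difference between the policies is easiest to see on a saturated pool. The sketch below is an illustration under assumed names (RejectionPolicyDemo and thirdTaskOutcome are hypothetical): it builds a pool with one thread and one queue slot, blocks both with a latch, and reports what happens to a third task under the handler passed in.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

// Hypothetical demo class; not part of the original example.
class RejectionPolicyDemo {

    /** Saturates a 1-thread, 1-slot pool and reports the fate of a third task. */
    static String thirdTaskOutcome(RejectedExecutionHandler handler) {
        CountDownLatch release = new CountDownLatch(1);
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 1, 0, TimeUnit.SECONDS,
                new ArrayBlockingQueue<Runnable>(1), handler);
        Runnable blocker = () -> {
            try {
                release.await();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        };
        String[] ranOn = { "not run" };
        try {
            pool.execute(blocker); // occupies the only thread
            pool.execute(blocker); // fills the only queue slot
            // The pool is now saturated, so this task goes to the handler.
            pool.execute(() -> ranOn[0] = Thread.currentThread().getName());
            return ranOn[0];
        } catch (RejectedExecutionException e) {
            return "rejected with exception";
        } finally {
            release.countDown(); // let the blocked tasks finish
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(thirdTaskOutcome(new ThreadPoolExecutor.AbortPolicy()));
        System.out.println(thirdTaskOutcome(new ThreadPoolExecutor.DiscardPolicy()));
        System.out.println(thirdTaskOutcome(new ThreadPoolExecutor.CallerRunsPolicy()));
    }
}
```

AbortPolicy surfaces the rejection to the caller, DiscardPolicy drops the task silently, and CallerRunsPolicy runs it on the submitting thread, which also slows the submitter down and so acts as simple back-pressure.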
Running the program above produces the following output:
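    Running task0
    Threads in pool: 1, tasks waiting in queue: 0, completed tasks: 0
    Threads in pool: 2, tasks waiting in queue: 0, completed tasks: 0
    Running task1
    Threads in pool: 3, tasks waiting in queue: 0, completed tasks: 0
    Running task2
    Threads in pool: 4, tasks waiting in queue: 0, completed tasks: 0
    Running task3
    Threads in pool: 5, tasks waiting in queue: 0, completed tasks: 0
    Running task4
    Threads in pool: 5, tasks waiting in queue: 1, completed tasks: 0
    Threads in pool: 5, tasks waiting in queue: 2, completed tasks: 0
    Threads in pool: 5, tasks waiting in queue: 3, completed tasks: 0
    Threads in pool: 5, tasks waiting in queue: 4, completed tasks: 0
    Threads in pool: 5, tasks waiting in queue: 5, completed tasks: 0
    Threads in pool: 6, tasks waiting in queue: 5, completed tasks: 0
    Running task10
    Threads in pool: 7, tasks waiting in queue: 5, completed tasks: 0
    Running task11
    Threads in pool: 8, tasks waiting in queue: 5, completed tasks: 0
    Running task12
    Running task13
    Threads in pool: 9, tasks waiting in queue: 5, completed tasks: 0
    Threads in pool: 10, tasks waiting in queue: 5, completed tasks: 0
    Running task14
    task1 finished
    task10 finished
    Running task5
    task11 finished
    task13 finished
    task4 finished
    task3 finished
    task2 finished
    task0 finished
    Running task9
    Running task8
    Running task7
    task14 finished
    task12 finished
    Running task6
    task5 finished
    task9 finished
    task7 finished
    task8 finished
    task6 finished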
2. Principle Analysis
From the above example of using the thread pool, the most important two steps are to construct the ThreadPoolExecutor object, and then call the execute method of the ThreadPoolExecutor object every time a task comes.
The main structure and inheritance relationship of ThreadPoolExecutor are shown in the following figure:
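[Figure: ThreadPoolExecutor structure and inheritance relation]
Main member variables: the task queue, which stores tasks that cannot be executed yet; the worker set, which holds all currently active threads; the thread factory, which creates threads; and several members used to schedule threads and tasks and to keep the pool thread-safe.
Having covered the main structure of ThreadPoolExecutor, let us briefly trace the main flow a submitted task goes through before it is finally executed. Method names without an "XXX." prefix are methods of ThreadPoolExecutor:
A quick look at the constructors first. The source code of the four ThreadPoolExecutor constructors is as follows: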
    public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime,
                              TimeUnit unit, BlockingQueue<Runnable> workQueue) {
        this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
             Executors.defaultThreadFactory(), defaultHandler);
    }

    public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime,
                              TimeUnit unit, BlockingQueue<Runnable> workQueue,
                              ThreadFactory threadFactory) {
        this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
             threadFactory, defaultHandler);
    }

    public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime,
                              TimeUnit unit, BlockingQueue<Runnable> workQueue,
                              RejectedExecutionHandler handler) {
        this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
             Executors.defaultThreadFactory(), handler);
    }

    public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime,
                              TimeUnit unit, BlockingQueue<Runnable> workQueue,
                              ThreadFactory threadFactory,
                              RejectedExecutionHandler handler) {
        if (corePoolSize < 0 ||
            maximumPoolSize <= 0 ||
            maximumPoolSize < corePoolSize ||
            keepAliveTime < 0)
            throw new IllegalArgumentException();
        if (workQueue == null || threadFactory == null || handler == null)
            throw new NullPointerException();
        this.acc = System.getSecurityManager() == null ?
                null :
                AccessController.getContext();
        this.corePoolSize = corePoolSize;
        this.maximumPoolSize = maximumPoolSize;
        this.workQueue = workQueue;
        this.keepAliveTime = unit.toNanos(keepAliveTime);
        this.threadFactory = threadFactory;
        this.handler = handler;
    }
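As the source shows, the first three constructors all delegate to the last one and simply fill in defaults for whatever the developer did not pass in. For example, if no threadFactory argument is supplied, the constructor uses the default Executors.defaultThreadFactory().
You also need to understand the meaning of several ThreadPoolExecutor constants and a few simple methods: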
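When the default factory is not wanted, a custom ThreadFactory can be passed to the six-argument constructor. The sketch below is only an illustration: NamedThreadFactoryDemo, named, and the "prefix-n" naming scheme are assumptions, not something the JDK or the original example defines. It gives worker threads readable names, which helps when reading thread dumps.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical demo class; the naming scheme is an illustrative assumption.
class NamedThreadFactoryDemo {

    /** Returns a factory that names its threads "<prefix>-1", "<prefix>-2", ... */
    static ThreadFactory named(String prefix) {
        AtomicInteger counter = new AtomicInteger(1);
        return runnable ->
                new Thread(runnable, prefix + "-" + counter.getAndIncrement());
    }

    /** Runs one task on a pool built with the custom factory and returns the worker's name. */
    static String nameOfFirstWorker(String prefix) {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 1, 0, TimeUnit.SECONDS,
                new LinkedBlockingQueue<Runnable>(), named(prefix));
        try {
            Callable<String> task = () -> Thread.currentThread().getName();
            return pool.submit(task).get();
        } catch (Exception e) {
            throw new IllegalStateException(e);
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(nameOfFirstWorker("demo-worker"));
    }
}
```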
    // Integer.SIZE is a static constant equal to 32, so COUNT_BITS is 29.
    private static final int COUNT_BITS = Integer.SIZE - 3;
    // CAPACITY is the maximum worker count, 536870911: (1 << 29) - 1 leaves
    // the top three bits 0 and the remaining 29 bits all 1.
    private static final int CAPACITY   = (1 << COUNT_BITS) - 1;

    // ctl packs the pool state and the number of live worker threads: the top
    // three bits hold the state, the low 29 bits hold the worker count.
    // Initially ctl equals RUNNING, i.e. the pool is running with 0 threads.
    private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));

    // 1110 0000 0000 0000 0000 0000 0000 0000, top three bits are 111
    private static final int RUNNING    = -1 << COUNT_BITS;
    // top three bits are 000
    private static final int SHUTDOWN   =  0 << COUNT_BITS;
    // 0010 0000 0000 0000 0000 0000 0000 0000, top three bits are 001
    private static final int STOP       =  1 << COUNT_BITS;
    // 0100 0000 0000 0000 0000 0000 0000 0000, top three bits are 010
    private static final int TIDYING    =  2 << COUNT_BITS;
    // 0110 0000 0000 0000 0000 0000 0000 0000, top three bits are 011
    private static final int TERMINATED =  3 << COUNT_BITS;

    /**
     * Returns the run state; the argument is ctl. CAPACITY has its top three
     * bits 0 and all lower bits 1, so ~CAPACITY has only the top three bits
     * set, and the AND keeps only the top three bits of ctl, which encode
     * the pool state as described above.
     */
    private static int runStateOf(int c)     { return c & ~CAPACITY; }

    /**
     * Returns the worker count; the argument is ctl. CAPACITY has its top
     * three bits 0 and all lower bits 1, so the AND keeps only the low
     * 29 bits, which hold the number of live worker threads.
     */
    private static int workerCountOf(int c)  { return c & CAPACITY; }

    private static int ctlOf(int rs, int wc) { return rs | wc; }

    // Task queue: the blocking queue that holds tasks waiting to run.
    private final BlockingQueue<Runnable> workQueue;

    /**
     * Tells whether the pool is running; the argument is the value of ctl.
     * Only RUNNING has the sign bit set, so only RUNNING is negative.
     * If ctl < 0 (that is, < SHUTDOWN), the pool is in the RUNNING state.
     */
    private static boolean isRunning(int c) {
        return c < SHUTDOWN;
    }

    // Removes a task from the task queue.
    public boolean remove(Runnable task) {
        boolean removed = workQueue.remove(task);
        tryTerminate(); // In case SHUTDOWN and now empty
        return removed;
    }
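To make the bit packing concrete, the constants and helper methods above can be copied into a standalone class and run on a sample value. This is a sketch for experimentation only: CtlBitsDemo is a hypothetical class that mirrors the private JDK members, which cannot be called from outside ThreadPoolExecutor.

```java
// Hypothetical standalone copy of the private ctl helpers, for experimentation.
class CtlBitsDemo {
    static final int COUNT_BITS = Integer.SIZE - 3;      // 29
    static final int CAPACITY   = (1 << COUNT_BITS) - 1; // low 29 bits set

    static final int RUNNING  = -1 << COUNT_BITS; // top bits 111
    static final int SHUTDOWN =  0 << COUNT_BITS; // top bits 000
    static final int STOP     =  1 << COUNT_BITS; // top bits 001

    static int runStateOf(int c)     { return c & ~CAPACITY; }
    static int workerCountOf(int c)  { return c & CAPACITY; }
    static int ctlOf(int rs, int wc) { return rs | wc; }

    public static void main(String[] args) {
        // Pack "RUNNING with 5 workers" into one int and unpack it again.
        int ctl = ctlOf(RUNNING, 5);
        System.out.println("ctl bits     = " + Integer.toBinaryString(ctl));
        System.out.println("worker count = " + workerCountOf(ctl));
        System.out.println("is RUNNING   = " + (runStateOf(ctl) == RUNNING));
        // States are ordered numerically, which is why the source can use
        // comparisons such as rs >= SHUTDOWN and c < SHUTDOWN.
        System.out.println("RUNNING < SHUTDOWN < STOP = "
                + (RUNNING < SHUTDOWN && SHUTDOWN < STOP));
    }
}
```

Keeping both values in one AtomicInteger lets the pool update the state and the worker count together with a single compare-and-set.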
From the flow chart earlier, we know that getTask() is called by the runWorker method in order to fetch a task.
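    private Runnable getTask() {
        // Whether the previous loop iteration timed out while polling the queue.
        boolean timedOut = false;

        // Loop indefinitely. While the pool is running normally, the blocking
        // time of the queue inside the loop controls this thread's idle timeout.
        for (;;) {
            int c = ctl.get();
            int rs = runStateOf(c); // current pool state

            /*
             * If the state is STOP or above (STOP, TIDYING or TERMINATED),
             * no more tasks are run, whether or not the queue is empty.
             * If the state is SHUTDOWN and the queue is empty, there is
             * nothing left to run either.
             * In both cases decrement the worker count and return null.
             */
            if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
                decrementWorkerCount();
                return null;
            }

            // Number of worker threads
            int wc = workerCountOf(c);

            // Whether the keepAliveTime timeout applies to this thread
            boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;

            // If this condition holds, try to decrement the worker count:
            // on success return null, otherwise retry the loop.
            if ((wc > maximumPoolSize || (timed && timedOut))
                && (wc > 1 || workQueue.isEmpty())) {
                if (compareAndDecrementWorkerCount(c))
                    return null;
                continue;
            }

            // If the timeout applies, poll() waits at most keepAliveTime;
            // otherwise take() waits indefinitely.
            // The blocking time indirectly implements the idle timeout: a timed-out
            // poll means this thread was idle for the whole keepAliveTime, so the
            // next iteration decrements the worker count and returns null.
            try {
                Runnable r = timed ?
                    workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
                    workQueue.take();
                if (r != null)
                    return r;
                timedOut = true;
            } catch (InterruptedException retry) {
                timedOut = false;
            }
        }
    }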
As the flow chart above shows, runWorker(Worker w) already runs on the started worker thread, so its main logic is to fetch a task and then call the task's run method.
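    final void runWorker(Worker w) {
        Thread wt = Thread.currentThread();
        Runnable task = w.firstTask; // the task passed in with the Worker object
        w.firstTask = null;
        w.unlock(); // allow interrupts
        boolean completedAbruptly = true;
        try {
            /*
             * If the initial task is null, fetch one from the task queue.
             * The loop body runs as long as either source yields a task.
             */
            while (task != null || (task = getTask()) != null) {
                w.lock();
                // If the pool is stopping, make sure this thread is interrupted;
                // otherwise make sure it is not. The second state check handles
                // a shutdownNow() racing with the clearing of the interrupt flag.
                if ((runStateAtLeast(ctl.get(), STOP) ||
                     (Thread.interrupted() &&
                      runStateAtLeast(ctl.get(), STOP))) &&
                    !wt.isInterrupted())
                    wt.interrupt();
                try {
                    beforeExecute(wt, task); // empty hook reserved for subclasses
                    Throwable thrown = null;
                    try {
                        task.run(); // the submitted task finally runs here
                    } catch (RuntimeException x) {
                        thrown = x; throw x;
                    } catch (Error x) {
                        thrown = x; throw x;
                    } catch (Throwable x) {
                        thrown = x; throw new Error(x);
                    } finally {
                        afterExecute(task, thrown); // empty hook reserved for subclasses
                    }
                } finally {
                    task = null;
                    w.completedTasks++;
                    w.unlock();
                }
                /*
                 * Note that the loop does not exit after a task completes; the loop
                 * condition is evaluated again. If the queue still holds tasks after
                 * the first one finishes, they run on this same thread.
                 * This is the neat part: no separate control thread is needed to find
                 * idle threads and hand them work. Each worker pulls tasks itself.
                 */
            }
            completedAbruptly = false;
        } finally {
            /*
             * Reaching here means either getTask() returned null (idle timeout
             * with an empty queue, or the pool is shutting down) or a task threw
             * an exception. Either way this worker thread is about to be retired.
             */
            processWorkerExit(w, completedAbruptly);
        }
    }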
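The two hook methods called around task.run() are protected and empty in ThreadPoolExecutor, so a subclass can override them. The sketch below illustrates the call order; HookedPool and its events list are hypothetical names introduced only for this example.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

// Hypothetical subclass that records when the runWorker hooks fire.
class HookedPool extends ThreadPoolExecutor {
    final List<String> events = new CopyOnWriteArrayList<>();

    HookedPool() {
        super(1, 1, 0, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>());
    }

    @Override
    protected void beforeExecute(Thread t, Runnable r) {
        super.beforeExecute(t, r);
        events.add("before");
    }

    @Override
    protected void afterExecute(Runnable r, Throwable thrown) {
        super.afterExecute(r, thrown);
        // thrown is non-null when the task ended with an exception.
        events.add(thrown == null ? "after" : "after with exception");
    }

    /** Runs a single task and returns the recorded event order. */
    static List<String> runOneTask() {
        HookedPool pool = new HookedPool();
        pool.execute(() -> pool.events.add("run"));
        pool.shutdown();
        try {
            // Termination happens only after the worker has left runWorker.
            pool.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return pool.events;
    }

    public static void main(String[] args) {
        System.out.println(runOneTask());
    }
}
```

Hooks like these are a common place for timing, logging, or resetting thread-local state, since they run on the worker thread itself.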
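runWorker first takes the initial task from the Worker object and then keeps fetching tasks from the task queue and running them. As noted in the flow chart, the Worker object is created in the addWorker method, so addWorker is where new threads are created and started. addWorker is in turn called by execute, which branches on addWorker's return value.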
    private boolean addWorker(Runnable firstTask, boolean core) {
        // The retry label: every "continue retry;" below restarts from here.
        retry:
        for (;;) {
            int c = ctl.get();
            int rs = runStateOf(c); // current pool state

            /*
             * If the pool is in STOP, TIDYING or TERMINATED, return false.
             * If the state is SHUTDOWN but firstTask is not null or the
             * work queue is empty, also return false.
             */
            if (rs >= SHUTDOWN &&
                ! (rs == SHUTDOWN &&
                   firstTask == null &&
                   ! workQueue.isEmpty()))
                return false;

            for (;;) {
                int wc = workerCountOf(c); // number of worker threads
                /*
                 * The boolean parameter "core" selects the limit to check against:
                 * true means the new thread counts against corePoolSize,
                 * false means it counts against maximumPoolSize.
                 */
                if (wc >= CAPACITY ||
                    wc >= (core ? corePoolSize : maximumPoolSize))
                    return false;
                // Try to increment the worker count.
                if (compareAndIncrementWorkerCount(c))
                    // On success, leave the retry loop.
                    break retry;
                c = ctl.get();  // Re-read ctl
                // If the pool state changed, start over from retry.
                if (runStateOf(c) != rs)
                    continue retry;
                // else CAS failed due to workerCount change; retry inner loop
            }
        }

        boolean workerStarted = false;
        boolean workerAdded = false;
        Worker w = null;
        try {
            /*
             * The worker count was incremented successfully, so now the Worker
             * is actually created. As the flow chart noted, creating a Worker
             * creates a new thread. If that fails, the finally block
             * decrements the worker count again.
             */
            w = new Worker(firstTask);
            final Thread t = w.thread;
            if (t != null) {
                final ReentrantLock mainLock = this.mainLock;
                mainLock.lock();
                try {
                    // Recheck while holding lock.
                    // Back out on ThreadFactory failure or if
                    // shut down before lock acquired.
                    int rs = runStateOf(ctl.get());

                    if (rs < SHUTDOWN ||
                        (rs == SHUTDOWN && firstTask == null)) {
                        if (t.isAlive()) // precheck that t is startable
                            throw new IllegalThreadStateException();
                        workers.add(w); // add the Worker to the worker set
                        int s = workers.size();
                        if (s > largestPoolSize)
                            largestPoolSize = s;
                        workerAdded = true;
                    }
                } finally {
                    mainLock.unlock();
                }
                if (workerAdded) {
                    t.start(); // start the thread
                    workerStarted = true;
                }
            }
        } finally {
            if (! workerStarted)
                addWorkerFailed(w);
        }
        return workerStarted;
    }
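The execute method mainly passes different arguments to addWorker according to the thread pool's policy.
When execute() is called to add a task, the thread pool makes the following decisions:
a. If the number of running threads is less than corePoolSize, a new thread is created immediately to run the task.
b. If the number of running threads is greater than or equal to corePoolSize, the task is put into the queue.
c. If the queue is full and the number of running threads is less than maximumPoolSize, a new thread is still created to run the task.
d. If the queue is full and the number of running threads is greater than or equal to maximumPoolSize, the task is rejected; with the default AbortPolicy the pool throws an exception that tells the caller "I cannot accept any more tasks".
The execute method is mainly responsible for implementing these four rules.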
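    public void execute(Runnable command) {
        if (command == null)
            throw new NullPointerException();
        // Read the ctl field (pool state and worker count).
        int c = ctl.get();
        // If the worker count is below the core pool size
        if (workerCountOf(c) < corePoolSize) {
            // Call addWorker; true means the thread counts against corePoolSize.
            // On success the method is done.
            if (addWorker(command, true))
                return;
            // Otherwise read ctl again. My understanding is that this guards against
            // another thread having changed ctl while the code above was running.
            c = ctl.get();
        }
        // We get here if the worker count is >= corePoolSize or addWorker returned false.
        // If the pool is running and the task was queued successfully
        if (isRunning(c) && workQueue.offer(command)) {
            // Re-read ctl for thread safety.
            int recheck = ctl.get();
            // If the pool is no longer running and the task was removed from the queue
            if (! isRunning(recheck) && remove(command))
                // Reject the task; the handler implementation throws or acts otherwise.
                reject(command);
            // Otherwise, if there are no worker threads, call addWorker with null and false.
            else if (workerCountOf(recheck) == 0)
                addWorker(null, false);
        }
        // Reaching here means the pool is not running or the task could not be queued
        // (the queue is full). Note that addWorker is called with false, so the new
        // thread is bounded by maximumPoolSize rather than corePoolSize.
        // If that also fails, reject the task.
        else if (!addWorker(command, false))
            reject(command);
    }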
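The four rules can be traced on a small pool whose tasks are held in place by a latch, so that no thread or queue slot frees up during submission. The following is a sketch under assumed values: ExecutePathDemo is a hypothetical class, and core 2, max 4, and queue capacity 2 are chosen only to keep the trace short.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

// Hypothetical demo class; pool sizes are illustrative assumptions.
class ExecutePathDemo {

    /**
     * Submits blocked tasks to a pool (core 2, max 4, queue capacity 2) and
     * records "poolSize/queueSize" after each submit, or "rejected".
     */
    static List<String> trace(int tasks) {
        CountDownLatch release = new CountDownLatch(1);
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                2, 4, 1, TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>(2));
        List<String> out = new ArrayList<>();
        try {
            for (int i = 0; i < tasks; i++) {
                try {
                    pool.execute(() -> {
                        try {
                            release.await(); // hold the thread until released
                        } catch (InterruptedException e) {
                            Thread.currentThread().interrupt();
                        }
                    });
                    out.add(pool.getPoolSize() + "/" + pool.getQueue().size());
                } catch (RejectedExecutionException e) {
                    out.add("rejected");
                }
            }
        } finally {
            release.countDown();
            pool.shutdown();
        }
        return out;
    }

    public static void main(String[] args) {
        // Rule a: 1/0, 2/0. Rule b: 2/1, 2/2. Rule c: 3/2, 4/2. Rule d: rejected.
        System.out.println(trace(7));
    }
}
```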
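This section is quoted from "Selection of strategies in ThreadPoolExecutor and selection of work queues (java thread pool)".
Direct handoffs:
A good default choice for a work queue is SynchronousQueue, which hands tasks off to threads without otherwise holding them. Here, an attempt to queue a task fails if no thread is immediately available to run it, so a new thread is constructed. This policy avoids lockups when handling sets of requests that might have internal dependencies. Direct handoffs generally require an unbounded maximumPoolSize to avoid rejecting newly submitted tasks. This in turn admits the possibility of unbounded thread growth when commands continue to arrive on average faster than they can be processed.
Unbounded queues:
Using an unbounded queue (for example, a LinkedBlockingQueue without a predefined capacity) causes new tasks to wait in the queue whenever all corePoolSize threads are busy. Thus no more than corePoolSize threads are ever created, and the value of maximumPoolSize has no effect. An unbounded queue is appropriate when each task is completely independent of the others, so that tasks cannot affect each other's execution; for example, in a web page server. This style of queuing can smooth out transient bursts of requests, but it admits the possibility of unbounded work queue growth when commands continue to arrive on average faster than they can be processed.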
Bounded queues:
A bounded queue (such as ArrayBlockingQueue) helps prevent resource exhaustion when used with a finite maximumPoolSize, but can be harder to tune and control. Queue size and maximum pool size may be traded off against each other: using large queues and small pools minimizes CPU usage, operating system resources, and context-switching overhead, but may lead to artificially low throughput. If tasks block frequently (for example, if they are I/O bound), the system may be able to schedule time for more threads than you otherwise allow. Using small queues generally requires larger pool sizes, which keeps CPUs busier but may run into unacceptable scheduling overhead, which also reduces throughput.
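The three queuing strategies can be compared by submitting the same number of long-running tasks to otherwise identical pools and reading the resulting pool size. The sketch below is an illustration with assumed values: QueueChoiceDemo is a hypothetical class, and core 2, six tasks, and a bounded capacity of 2 are arbitrary choices.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

// Hypothetical demo class; sizes are illustrative assumptions.
class QueueChoiceDemo {

    /** Submits blocked tasks to a pool with core 2 and an effectively unlimited maximum. */
    static int poolSizeAfterSubmitting(BlockingQueue<Runnable> queue, int tasks) {
        CountDownLatch release = new CountDownLatch(1);
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                2, Integer.MAX_VALUE, 60, TimeUnit.SECONDS, queue);
        try {
            for (int i = 0; i < tasks; i++) {
                pool.execute(() -> {
                    try {
                        release.await();
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    }
                });
            }
            return pool.getPoolSize();
        } finally {
            release.countDown();
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        // Direct handoff: nothing can be queued, so every task gets a thread.
        System.out.println("SynchronousQueue:      "
                + poolSizeAfterSubmitting(new SynchronousQueue<Runnable>(), 6));
        // Unbounded queue: the pool never grows past corePoolSize.
        System.out.println("LinkedBlockingQueue:   "
                + poolSizeAfterSubmitting(new LinkedBlockingQueue<Runnable>(), 6));
        // Bounded queue: extra threads appear only once the queue is full.
        System.out.println("ArrayBlockingQueue(2): "
                + poolSizeAfterSubmitting(new ArrayBlockingQueue<Runnable>(2), 6));
    }
}
```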
When creating a thread pool with ThreadPoolExecutor, you can choose among the constructors and pass different arguments to build pools with different characteristics. In practice, we usually create thread pools through the factory methods of the Executors class. Executors ultimately calls the ThreadPoolExecutor constructor, but with the relevant parameters already configured.
1) Fixed-size thread pool: Executors.newFixedThreadPool
corePoolSize equals maximumPoolSize, keepAliveTime is 0, and the work queue is an unbounded FIFO LinkedBlockingQueue, so this pool never runs more than corePoolSize threads. As analyzed earlier, a thread that finishes a task goes back to the task queue to fetch the next one. When the queue is empty, core threads are not terminated; they block in workQueue.take() until a new task arrives (the keepAliveTime of 0 would only apply to threads beyond corePoolSize, and this pool never has any).
2) Single-thread thread pool: newSingleThreadExecutor
There is only one thread working in the pool, and the blocking queue is unbounded. It can ensure that tasks are executed in the order in which they are submitted.
3) Variable-size thread pool: newCachedThreadPool
corePoolSize is 0 and the work queue is a SynchronousQueue, a blocking queue that stores no elements: each insert operation must wait for another thread to perform a matching remove operation. So when we submit the first task, it cannot be queued, which satisfies the pool's rule "when a task cannot be queued and the number of threads is below maximumPoolSize, start a new thread for it". maximumPoolSize is Integer.MAX_VALUE, so in practice the limit is CAPACITY, the largest value that fits in the low 29 bits of ctl. keepAliveTime is 60s: a thread that has no task to execute is kept around for up to 60s, and if no new task arrives in that time it is removed from the pool.
4) Scheduled thread pool: newScheduledThreadPool
Creates a pool with the given corePoolSize and a maximumPoolSize of Integer.MAX_VALUE. This pool supports delayed and periodic execution of tasks.
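The preconfigured parameters can be checked at runtime, because the fixed, cached, and scheduled factories return a ThreadPoolExecutor (or a subclass of it). The sketch below uses a hypothetical helper class, ExecutorsConfigDemo, to read the settings back. newSingleThreadExecutor is left out on purpose: it returns a wrapper that cannot be cast to ThreadPoolExecutor.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

// Hypothetical helper class for inspecting factory-made pools.
class ExecutorsConfigDemo {

    /** Reads the configuration of a ThreadPoolExecutor-backed service, then shuts it down. */
    static String describe(ExecutorService service) {
        ThreadPoolExecutor pool = (ThreadPoolExecutor) service;
        String summary = "core=" + pool.getCorePoolSize()
                + " max=" + pool.getMaximumPoolSize()
                + " keepAliveMs=" + pool.getKeepAliveTime(TimeUnit.MILLISECONDS)
                + " queue=" + pool.getQueue().getClass().getSimpleName();
        pool.shutdown();
        return summary;
    }

    public static void main(String[] args) {
        System.out.println("fixed(3):     " + describe(Executors.newFixedThreadPool(3)));
        System.out.println("cached:       " + describe(Executors.newCachedThreadPool()));
        // The scheduled pool's keepAliveTime differs between JDK versions.
        System.out.println("scheduled(2): " + describe(Executors.newScheduledThreadPool(2)));
    }
}
```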
That brings this article nearly to an end. There are still many details I have not covered, especially the heavy use of reentrant locks; I will fill those in when I get the chance. I combined many online resources and drew a few diagrams myself, and I think the result is easier to follow than many blog posts that introduce thread pools. If you find any mistakes, corrections are welcome.