
Java thread pool framework core code analysis


In multi-threaded programming, it is unrealistic to allocate a thread to each task: creating a thread has a high overhead in time and resources. Thread pools were introduced to solve this and have become a powerful tool for managing threads. Through the Executor interface, Java provides a standard way to decouple task submission from task execution, using Runnable to represent tasks.

Now, let’s analyze ThreadPoolExecutor, the implementation of the Java thread pool framework.

The following analysis is based on JDK 1.7.

Life cycle

In ThreadPoolExecutor, the high 3 bits of the ctl field represent the running state, which can be:

RUNNING: accepts new tasks and processes tasks in the task queue
SHUTDOWN: does not accept new tasks, but still processes tasks in the task queue
STOP: does not accept new tasks, does not process tasks from the task queue, and interrupts all in-progress tasks
TIDYING: all tasks have terminated and the worker count is 0; on reaching this state, terminated() is executed
TERMINATED: terminated() has finished executing


ThreadPoolExecutor packs the running state and the worker count into a single atomic integer, ctl:

private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
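
For reference, the relevant constants in the JDK 1.7 source show how ctl packs the two values: the high 3 bits hold the run state and the low 29 bits hold the worker count (abridged):

private static final int COUNT_BITS = Integer.SIZE - 3;
private static final int CAPACITY   = (1 << COUNT_BITS) - 1;

// run states live in the high-order bits
private static final int RUNNING    = -1 << COUNT_BITS;
private static final int SHUTDOWN   =  0 << COUNT_BITS;
private static final int STOP       =  1 << COUNT_BITS;
private static final int TIDYING    =  2 << COUNT_BITS;
private static final int TERMINATED =  3 << COUNT_BITS;

// packing and unpacking ctl
private static int runStateOf(int c)     { return c & ~CAPACITY; }
private static int workerCountOf(int c)  { return c & CAPACITY; }
private static int ctlOf(int rs, int wc) { return rs | wc; }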

Thread pool model

Core parameters

corePoolSize: the minimum number of worker threads kept alive (0 if allowCoreThreadTimeOut is set)
maximumPoolSize: the maximum number of threads, capped by CAPACITY
keepAliveTime: how long an idle thread survives, with the time unit specified by TimeUnit
workQueue: the work queue that holds tasks waiting to be executed
RejectedExecutionHandler: the rejection policy, triggered when the thread pool is full
Maximum capacity of the thread pool: since the high 3 bits of ctl are used as state bits, the maximum number of worker threads is (2^29) - 1; a construction example wiring these parameters together follows this list
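
As a sketch of how these parameters fit together, here is a hypothetical pool with two core threads, at most four threads, a 60-second keep-alive, a bounded queue of 100 tasks and the default AbortPolicy (the values are illustrative, not recommendations):

ThreadPoolExecutor pool = new ThreadPoolExecutor(
        2,                                      // corePoolSize
        4,                                      // maximumPoolSize
        60L, TimeUnit.SECONDS,                  // keepAliveTime and its unit
        new ArrayBlockingQueue<Runnable>(100),  // workQueue (bounded)
        Executors.defaultThreadFactory(),       // thread factory
        new ThreadPoolExecutor.AbortPolicy());  // rejection policy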

Four models

CachedThreadPool: a cacheable thread pool. If the pool's current size exceeds the processing demand, idle threads are reclaimed; when demand increases, new threads are added. There is no bound on the pool's size.
FixedThreadPool: a fixed-size thread pool. Each submitted task creates a new thread until the maximum pool size is reached, after which the pool size no longer changes.
SingleThreadPool: a single-threaded pool. It has only one worker thread, which guarantees that tasks are executed serially in queue order. If that thread ends abnormally, a new thread is created to take over.
ScheduledThreadPool: a fixed-size thread pool that runs tasks after a delay or periodically, similar to Timer. The Executors factory methods that create these four pools are shown below.
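
These four models correspond to the Executors factory methods:

ExecutorService cached = Executors.newCachedThreadPool();
ExecutorService fixed = Executors.newFixedThreadPool(4);                   // 4 worker threads
ExecutorService single = Executors.newSingleThreadExecutor();
ScheduledExecutorService scheduled = Executors.newScheduledThreadPool(2);  // 2 core threads
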
Executing a task: execute

Core logic:

If the current number of threads < corePoolSize, start a new core thread directly to run the task: addWorker(command, true).
If the current number of threads >= corePoolSize and the task is added to the work queue successfully:
Check whether the pool's current state is RUNNING.
If not, remove the task from the queue and reject it.
If it is, check whether the current number of threads is 0; if so, add a worker thread.
Otherwise, start an ordinary (non-core) thread to run the task with addWorker(command, false), and reject the task if that fails. The abridged source of execute follows this list.
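
For reference, this is the execute method from the JDK 1.7 source, lightly abridged:

public void execute(Runnable command) {
    if (command == null)
        throw new NullPointerException();
    int c = ctl.get();
    // 1. fewer threads than corePoolSize: try to start a new core worker
    if (workerCountOf(c) < corePoolSize) {
        if (addWorker(command, true))
            return;
        c = ctl.get();
    }
    // 2. pool is running: queue the task, then re-check the state
    if (isRunning(c) && workQueue.offer(command)) {
        int recheck = ctl.get();
        if (!isRunning(recheck) && remove(command))
            reject(command);
        else if (workerCountOf(recheck) == 0)
            addWorker(null, false);
    }
    // 3. queue full (or pool not running): try a non-core worker, else reject
    else if (!addWorker(command, false))
        reject(command);
}
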
From the above analysis, we can summarize four stages of thread pool operation (the small demo below walks through them):

poolSize < corePoolSize and the queue is empty: a new thread is created to handle the submitted task.
poolSize == corePoolSize: submitted tasks enter the work queue, and the worker threads take tasks from the queue; the queue is neither empty nor full.
poolSize == corePoolSize and the queue is full: a new thread is created to handle the submitted task, provided poolSize < maxPoolSize.
poolSize == maxPoolSize and the queue is full: the rejection policy is triggered.
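
A minimal, hypothetical demo of these four stages (the class name StagesDemo and the parameter values are illustrative): with corePoolSize = 1, maximumPoolSize = 2 and a queue of capacity 1, four quick submissions hit each stage in turn.

import java.util.concurrent.*;

public class StagesDemo {
    public static void main(String[] args) {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 2, 60L, TimeUnit.SECONDS,
                new ArrayBlockingQueue<Runnable>(1));
        Runnable sleepy = new Runnable() {
            public void run() {
                try { Thread.sleep(1000); } catch (InterruptedException ignored) { }
            }
        };
        pool.execute(sleepy);     // stage 1: a new core thread is created
        pool.execute(sleepy);     // stage 2: the task goes into the queue
        pool.execute(sleepy);     // stage 3: queue full, a non-core thread is created
        try {
            pool.execute(sleepy); // stage 4: pool and queue full, rejection policy fires
        } catch (RejectedExecutionException e) {
            System.out.println("rejected: " + e.getMessage());
        }
        pool.shutdown();
    }
}
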
Rejection policy

Earlier we mentioned that tasks that cannot be executed are rejected. RejectedExecutionHandler is the interface for handling rejected tasks. The four built-in rejection policies are listed below, followed by an example of installing one.

AbortPolicy: the default policy; it aborts the task by throwing RejectedExecutionException.
CallerRunsPolicy: runs the rejected task in the calling thread; no exception is thrown.
DiscardPolicy: silently discards the rejected task; no exception is thrown.
DiscardOldestPolicy: discards the oldest task in the queue and retries the current task; no exception is thrown.
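
A policy can be passed to the constructor or installed later with setRejectedExecutionHandler; a custom policy only needs to implement RejectedExecutionHandler. A small sketch, assuming pool is a ThreadPoolExecutor built earlier (the logging behaviour is just an example):

// built-in policy: run rejected tasks in the submitting thread
pool.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());

// custom policy: log and drop the task
pool.setRejectedExecutionHandler(new RejectedExecutionHandler() {
    @Override
    public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
        System.err.println("Rejected task: " + r);
    }
});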

Worker in the thread pool

Worker extends AbstractQueuedSynchronizer and implements Runnable. The former gives the Worker a simple lock; the latter's run method executes the worker thread's main loop, runWorker(Worker w), which takes tasks from the task queue and executes them. Worker references are stored in the workers set, which is guarded by mainLock.

private final ReentrantLock mainLock = new ReentrantLock();
private final HashSet<Worker> workers = new HashSet<Worker>();

Core function runWorker

The following is the simplified logic. Note: the run method of every worker thread ends up executing this function.

final void runWorker(Worker w) {
    Thread wt = Thread.currentThread();
    Runnable task = w.firstTask;
    w.firstTask = null;
    boolean completedAbruptly = true;   // set to false when the loop exits normally
    while (task != null || (task = getTask()) != null) {
        w.lock();
        beforeExecute(wt, task);
        Throwable thrown = null;        // in the real code, captured from task.run()
        task.run();
        afterExecute(task, thrown);
        w.unlock();
        task = null;
    }
    completedAbruptly = false;
    processWorkerExit(w, completedAbruptly);
}

Get a task from getTask().
Lock the worker.
Call beforeExecute(wt, task), an extension hook that ThreadPoolExecutor provides for subclasses.
Run the task; if the worker was configured with a first task (firstTask), that task is run first and only once.
Call afterExecute(task, thrown).
Unlock the worker.
If the task obtained is null, shut the worker down.

Getting a task: getTask

The task queue inside the thread pool is a blocking queue; the concrete implementation is passed in at construction time.

private final BlockingQueue<Runnable> workQueue;

getTask() takes tasks from the task queue, supporting both blocking and timed waits. Four situations cause it to return null, which shuts the worker down:

The current number of threads exceeds the maximum thread count.
The thread pool is in the STOP state.
The thread pool is in the SHUTDOWN state and the work queue is empty.
The thread's wait for a task times out, and the thread count exceeds the number of threads that should be kept alive.

Core logic: depending on timed, the worker either waits on the blocking queue with a timeout or blocks until a task arrives; timing out while waiting causes the worker thread to be shut down.

boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;

Runnable r = timed ?
    workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
    workQueue.take();

A wait for a task times out in either of two situations:

Core threads are allowed to time out, i.e. allowCoreThreadTimeOut(true).
The current thread is an ordinary (non-core) thread, so wc > corePoolSize.

The work queue is a BlockingQueue, which I will not expand on here; a detailed analysis will follow in a later article.

Summary

ThreadPoolExecutor is based on the producer-consumer pattern: submitting a task acts as the producer, and the threads that execute tasks act as the consumers.
Executors provides four methods for building thread pool models on top of ThreadPoolExecutor. Beyond that, we can also subclass ThreadPoolExecutor directly and override beforeExecute and afterExecute to customize how tasks are executed, as sketched below.
Whether to use a bounded or an unbounded queue depends on the situation; the size of the work queue and the number of threads also deserve careful thought.
CallerRunsPolicy is the recommended rejection policy: it neither discards the task nor throws an exception, but pushes the task back to the calling thread for execution.
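
A minimal sketch of that customization, assuming we want to log how long each task takes (the class name TimingThreadPool and the timing behaviour are illustrative):

import java.util.concurrent.*;

public class TimingThreadPool extends ThreadPoolExecutor {
    private final ThreadLocal<Long> startTime = new ThreadLocal<Long>();

    public TimingThreadPool(int core, int max, long keepAlive, TimeUnit unit,
                            BlockingQueue<Runnable> queue) {
        super(core, max, keepAlive, unit, queue);
    }

    @Override
    protected void beforeExecute(Thread t, Runnable r) {
        super.beforeExecute(t, r);
        startTime.set(System.nanoTime());   // record the start time on the worker thread
    }

    @Override
    protected void afterExecute(Runnable r, Throwable t) {
        long elapsed = System.nanoTime() - startTime.get();
        System.out.printf("task %s finished in %d ns%n", r, elapsed);
        super.afterExecute(r, t);
    }
}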

