首页 Java java教程 Java 线程池框架核心代码分析

Java 线程池框架核心代码分析

Dec 05, 2016 am 11:44 AM
java

多线程编程中,为每个任务分配一个线程是不现实的,线程创建的开销和资源消耗都是很高的。线程池应运而生,成为我们管理线程的利器。Java 通过Executor接口,提供了一种标准的方法将任务的提交过程和执行过程解耦开来,并用Runnable表示任务。

下面,我们来分析一下 Java 线程池框架的实现ThreadPoolExecutor。

下面的分析基于JDK1.7

生命周期

ThreadPoolExecutor中,使用CAPACITY的高3位来表示运行状态,分别是:

RUNNING:接收新任务,并且处理任务队列中的任务 
SHUTDOWN:不接收新任务,但是处理任务队列的任务 
STOP:不接收新任务,不出来任务队列,同时中断所有进行中的任务 
TIDYING:所有任务已经被终止,工作线程数量为 0,到达该状态会执行terminated() 
TERMINATED:terminated()执行完毕 

2340.jpg

ThreadPoolExecutor中用原子类来表示状态位

private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
登录后复制

线程池模型

核心参数

corePoolSize:最小存活的工作线程数量(如果设置allowCoreThreadTimeOut,那么该值为 0)
maximumPoolSize:最大的线程数量,受限于CAPACITY
keepAliveTime:对应线程的存活时间,时间单位由TimeUnit指定
workQueue:工作队列,存储待执行的任务
RejectExecutionHandler:拒绝策略,线程池满后会触发
线程池的最大容量:CAPACITY中的前三位用作标志位,也就是说工作线程的最大容量为(2^29)-1

四种模型

CachedThreadPool:一个可缓存的线程池,如果线程池的当前规模超过了处理需求时,那么将回收空闲的线程,当需求增加时,则可以添加新的线程,线程池的规模不存在任何的限制。
FixedThreadPool:一个固定大小的线程池,提交一个任务时就创建一个线程,直到达到线程池的最大数量,这时线程池的大小将不再变化。
SingleThreadPool:一个单线程的线程池,它只有一个工作线程来执行任务,可以确保按照任务在队列中的顺序来串行执行,如果这个线程异常结束将创建一个新的线程来执行任务。
ScheduledThreadPool:一个固定大小的线程池,并且以延迟或者定时的方式来执行任务,类似于Timer。
执行任务 execute

核心逻辑:

当前线程数量 < corePoolSize,直接开启新的核心线程执行任务addWorker(command, true)
当前线程数量 >= corePoolSize,且任务加入工作队列成功
检查线程池当前状态是否处于RUNNING
如果否,则拒绝该任务
如果是,判断当前线程数量是否为 0,如果为 0,就增加一个工作线程。
开启普通线程执行任务addWorker(command, false),开启失败就拒绝该任务
从上面的分析可以总结出线程池运行的四个阶段:

poolSize < corePoolSize 且队列为空,此时会新建线程来处理提交的任务
poolSize == corePoolSize,此时提交的任务进入工作队列,工作线程从队列中获取任务执行,此时队列不为空且未满。
poolSize == corePoolSize,并且队列已满,此时也会新建线程来处理提交的任务,但是poolSize < maxPoolSize
poolSize == maxPoolSize,并且队列已满,此时会触发拒绝策略
拒绝策略

前面我们提到任务无法执行会被拒绝,RejectedExecutionHandler是处理被拒绝任务的接口。下面是四种拒绝策略。

AbortPolicy:默认策略,终止任务,抛出RejectedException
CallerRunsPolicy:在调用者线程执行当前任务,不抛异常
DiscardPolicy: 抛弃策略,直接丢弃任务,不抛异常
DiscardOldersPolicy:抛弃最老的任务,执行当前任务,不抛异常

线程池中的 Worker

Worker继承了AbstractQueuedSynchronizer和Runnable,前者给Worker提供锁的功能,后者执行工作线程的主要方法runWorker(Worker w)(从任务队列捞任务执行)。Worker 引用存在workers集合里面,用mainLock守护。

private final ReentrantLock mainLock = new ReentrantLock();
private final HashSet<Worker> workers = new HashSet<Worker>();
登录后复制

核心函数 runWorker

下面是简化的逻辑,注意:每个工作线程的run都执行下面的函数

final void runWorker(Worker w) {
    Thread wt = Thread.currentThread();
    Runnable task = w.firstTask;
    w.firstTask = null;    while (task != null || (task = getTask()) != null) {
        w.lock();
        beforeExecute(wt, task);        
        task.run();
        afterExecute(task, thrown);
        w.unlock();
    }
    processWorkerExit(w, completedAbruptly);
}
登录后复制

从getTask()中获取任务
锁住 worker
执行beforeExecute(wt, task),这是ThreadPoolExecutor提供给子类的扩展方法
运行任务,如果该worker有配置了首次任务,则先执行首次任务且只执行一次。
执行afterExecute(task, thrown);
解锁 worker
如果获取到的任务为 null,关闭 worker
获取任务 getTask

线程池内部的任务队列是一个阻塞队列,具体实现在构造时传入。

private final BlockingQueue<Runnable> workQueue;
登录后复制

getTask()从任务队列中获取任务,支持阻塞和超时等待任务,四种情况会导致返回null,让worker关闭。

现有的线程数量超过最大线程数量
线程池处于STOP状态
线程池处于SHUTDOWN状态且工作队列为空
线程等待任务超时,且线程数量超过保留线程数量
核心逻辑:根据timed在阻塞队列上超时等待或者阻塞等待任务,等待任务超时会导致工作线程被关闭。

timed = allowCoreThreadTimeOut || wc > corePoolSize;Runnable r = timed ?
    workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
    workQueue.take();
登录后复制

在以下两种情况下等待任务会超时:

允许核心线程等待超时,即allowCoreThreadTimeOut(true) 
当前线程是普通线程,此时wc > corePoolSize 
工作队列使用的是BlockingQueue,这里就不展开了,后面再写一篇详细的分析。

总结

ThreadPoolExecutor基于生产者-消费者模式,提交任务的操作相当于生产者,执行任务的线程相当于消费者。 
Executors提供了四种基于ThreadPoolExecutor构造线程池模型的方法,除此之外,我们还可以直接继承ThreadPoolExecutor,重写beforeExecute和afterExecute方法来定制线程池任务执行过程。 
使用有界队列还是无界队列需要根据具体情况考虑,工作队列的大小和线程的数量也是需要好好考虑的。 
拒绝策略推荐使用CallerRunsPolicy,该策略不会抛弃任务,也不会抛出异常,而是将任务回退到调用者线程中执行。


本站声明
本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系admin@php.cn

热AI工具

Undresser.AI Undress

Undresser.AI Undress

人工智能驱动的应用程序,用于创建逼真的裸体照片

AI Clothes Remover

AI Clothes Remover

用于从照片中去除衣服的在线人工智能工具。

Undress AI Tool

Undress AI Tool

免费脱衣服图片

Clothoff.io

Clothoff.io

AI脱衣机

AI Hentai Generator

AI Hentai Generator

免费生成ai无尽的。

热门文章

R.E.P.O.能量晶体解释及其做什么(黄色晶体)
3 周前 By 尊渡假赌尊渡假赌尊渡假赌
R.E.P.O.最佳图形设置
3 周前 By 尊渡假赌尊渡假赌尊渡假赌
R.E.P.O.如果您听不到任何人,如何修复音频
3 周前 By 尊渡假赌尊渡假赌尊渡假赌
WWE 2K25:如何解锁Myrise中的所有内容
3 周前 By 尊渡假赌尊渡假赌尊渡假赌

热工具

记事本++7.3.1

记事本++7.3.1

好用且免费的代码编辑器

SublimeText3汉化版

SublimeText3汉化版

中文版,非常好用

禅工作室 13.0.1

禅工作室 13.0.1

功能强大的PHP集成开发环境

Dreamweaver CS6

Dreamweaver CS6

视觉化网页开发工具

SublimeText3 Mac版

SublimeText3 Mac版

神级代码编辑软件(SublimeText3)

Java 中的平方根 Java 中的平方根 Aug 30, 2024 pm 04:26 PM

Java 中的平方根指南。下面我们分别通过例子和代码实现来讨论平方根在Java中的工作原理。

Java 中的完美数 Java 中的完美数 Aug 30, 2024 pm 04:28 PM

Java 完美数指南。这里我们讨论定义,如何在 Java 中检查完美数?,示例和代码实现。

Java 中的阿姆斯特朗数 Java 中的阿姆斯特朗数 Aug 30, 2024 pm 04:26 PM

Java 中的阿姆斯特朗数指南。这里我们讨论一下java中阿姆斯特朗数的介绍以及一些代码。

Java 中的随机数生成器 Java 中的随机数生成器 Aug 30, 2024 pm 04:27 PM

Java 随机数生成器指南。在这里,我们通过示例讨论 Java 中的函数,并通过示例讨论两个不同的生成器。

Java中的Weka Java中的Weka Aug 30, 2024 pm 04:28 PM

Java 版 Weka 指南。这里我们通过示例讨论简介、如何使用weka java、平台类型和优点。

Java 中的史密斯数 Java 中的史密斯数 Aug 30, 2024 pm 04:28 PM

Java 史密斯数指南。这里我们讨论定义,如何在Java中检查史密斯号?带有代码实现的示例。

Java Spring 面试题 Java Spring 面试题 Aug 30, 2024 pm 04:29 PM

在本文中,我们保留了最常被问到的 Java Spring 面试问题及其详细答案。这样你就可以顺利通过面试。

突破或从Java 8流返回? 突破或从Java 8流返回? Feb 07, 2025 pm 12:09 PM

Java 8引入了Stream API,提供了一种强大且表达力丰富的处理数据集合的方式。然而,使用Stream时,一个常见问题是:如何从forEach操作中中断或返回? 传统循环允许提前中断或返回,但Stream的forEach方法并不直接支持这种方式。本文将解释原因,并探讨在Stream处理系统中实现提前终止的替代方法。 延伸阅读: Java Stream API改进 理解Stream forEach forEach方法是一个终端操作,它对Stream中的每个元素执行一个操作。它的设计意图是处

See all articles