멀티 스레드 프로그래밍에서는 스레드 생성에 따른 오버헤드와 리소스 소비가 매우 높습니다. 스레드 풀은 시대의 요구에 따라 등장했으며 스레드를 관리하는 강력한 도구가 되었습니다. Java는 Executor 인터페이스를 통해 작업 제출 프로세스와 실행 프로세스를 분리하는 표준 방법을 제공하고 Runnable을 사용하여 작업을 나타냅니다.
다음으로 Java 스레드 풀 프레임워크의 구현인 ThreadPoolExecutor를 분석해 보겠습니다.
다음 분석은 JDK1.7
ThreadPoolExecutor의 라이프사이클을 기반으로 하며, CAPACITY의 상위 3비트를 사용하여 각각 실행 상태를 나타냅니다.
RUNNING: 새 작업을 받고 작업 큐에 있는 작업을 처리
SHUTDOWN: 새 작업을 받지 않고 작업 큐에 있는 작업을 처리합니다.
STOP: 새 작업을 받지 않고 나오지 않습니다. 작업 대기열의 모든 진행 중인 작업을 중단합니다. 작업
TIDYING: 모든 작업이 종료되었으며 작업자 스레드 수가 0입니다. 이 상태에 도달하면 종료()가 실행됩니다.
TERMINATED: 종료()
ThreadPoolExecutor에서 상태 비트를 나타내기 위해 원자 클래스가 사용됩니다
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
스레드 풀 모델
핵심 매개변수
corePoolSize: 살아남은 작업자 스레드의 최소 수(allowCoreThreadTimeOut이 설정된 경우 값은 0)
maximumPoolSize: CAPACITY로 제한되는 최대 스레드 수
keepAliveTime: 해당 스레드의 생존 시간 스레드에서 시간 단위는 TimeUnit으로 지정됩니다.
workQueue: 작업 대기열, 실행할 작업 저장
RejectExecutionHandler: 거부 전략, 스레드 풀이 가득 차면 트리거됩니다.
스레드 풀의 최대 용량: The CAPACITY의 처음 세 자리는 플래그 비트로 사용됩니다. 즉, 작업 스레드의 최대 용량은 (2^29)-1
4개 모델
CachedThreadPool: 캐시 가능한 스레드 풀입니다. 현재 스레드 풀 크기가 처리 요구량을 초과하면 유휴 스레드가 재활용되며, 수요가 증가하면 새로운 스레드를 추가할 수 있으므로 스레드 풀 크기에는 제한이 없습니다.
FixedThreadPool: 작업이 제출되면 최대 스레드 풀 수에 도달할 때까지 스레드가 생성되며, 이때 스레드 풀의 크기는 더 이상 변경되지 않습니다.
SingleThreadPool: 작업을 실행하는 작업 스레드가 하나만 있습니다. 작업이 대기열의 순서대로 순차적으로 실행되도록 할 수 있습니다. 이 스레드가 비정상적으로 종료되면 새 스레드가 생성됩니다. 작업을 실행합니다.
ScheduledThreadPool: Timer와 유사하게 지연되거나 예약된 방식으로 작업을 수행하는 고정 크기 스레드 풀입니다.
작업 실행 실행
핵심 논리:
현재 스레드 수 < corePoolSize, 작업을 실행하기 위한 새 코어 스레드 직접 시작 addWorker(command, true)
현재 수 of thread> = corePoolSize, 작업이 작업 대기열에 성공적으로 추가되었습니다.
현재 스레드 풀 상태가 RUNNING인지 확인하세요.
그렇지 않으면 작업을 거부하세요.
그렇다면 현재 개수가 RUNNING인지 확인하세요. 스레드 수가 0이고, 0이면 늘리십시오. 작업자 스레드입니다.
일반 스레드를 활성화하여 addWorker(명령, false) 작업을 실행하고 작업 시작에 실패하면 작업을 거부합니다.
위 분석에서 스레드 풀 작업의 4단계를 요약할 수 있습니다.
poolSize < corePoolSize 및 이 때 제출된 작업을 처리하기 위해 새 스레드가 생성됩니다.
poolSize == corePoolSize. 이때 제출된 작업이 작업 대기열에 들어가게 됩니다. 작업자 스레드는 대기열에서 작업 실행을 가져옵니다. 이때 대기열은 비어 있지도 않고 가득 차지도 않습니다.
poolSize == corePoolSize이고 대기열이 가득 차면 제출된 작업을 처리하기 위해 새 스레드가 생성되지만 poolSize < maxPoolSize
poolSize == maxPoolSize이고 대기열이 가득 차면 거부 정책은 다음과 같습니다. Triggered
거부 전략
앞서 실행 불가능한 작업은 거부된다고 언급했습니다. RejectedExecutionHandler는 거부된 작업을 처리하기 위한 인터페이스입니다. 여기 네 가지 거절 전략이 있습니다.
AbortPolicy: 기본 정책, 작업 종료, RejectedException 발생
CallerRunsPolicy: 호출자 스레드에서 현재 작업 실행, 예외 발생 없음
DiscardPolicy: 정책 취소, 작업 직접 삭제, 아니요 예외 발생
DiscardOldersPolicy: 가장 오래된 작업을 취소하고 예외 발생 없이 현재 작업 실행
스레드 풀의 작업자
작업자는 AbstractQueuedSynchronizer 및 Runnable을 상속합니다. 전자는 잠금 기능을 제공합니다. 작업자 스레드를 실행하는 주요 방법은 runWorker(Worker w)(실행을 위해 작업 대기열에서 작업 검색)입니다. 작업자 참조는 작업자 컬렉션에 저장되며 mainLock으로 보호됩니다.
private final ReentrantLock mainLock = new ReentrantLock(); private final HashSet<Worker> workers = new HashSet<Worker>();
핵심 함수 runWorker
다음은 단순화된 논리입니다. 참고: 각 작업자 스레드의 실행은 다음 함수를 실행합니다
final void runWorker(Worker w) { Thread wt = Thread.currentThread(); Runnable task = w.firstTask; w.firstTask = null; while (task != null || (task = getTask()) != null) { w.lock(); beforeExecute(wt, task); task.run(); afterExecute(task, thrown); w.unlock(); } processWorkerExit(w, completedAbruptly); }
从getTask()中获取任务
锁住 worker
执行beforeExecute(wt, task),这是ThreadPoolExecutor提供给子类的扩展方法
运行任务,如果该worker有配置了首次任务,则先执行首次任务且只执行一次。
执行afterExecute(task, thrown);
解锁 worker
如果获取到的任务为 null,关闭 worker
获取任务 getTask
线程池内部的任务队列是一个阻塞队列,具体实现在构造时传入。
private final BlockingQueue<Runnable> workQueue;
getTask()从任务队列中获取任务,支持阻塞和超时等待任务,四种情况会导致返回null,让worker关闭。
现有的线程数量超过最大线程数量
线程池处于STOP状态
线程池处于SHUTDOWN状态且工作队列为空
线程等待任务超时,且线程数量超过保留线程数量
核心逻辑:根据timed在阻塞队列上超时等待或者阻塞等待任务,等待任务超时会导致工作线程被关闭。
timed = allowCoreThreadTimeOut || wc > corePoolSize;Runnable r = timed ? workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) : workQueue.take();
在以下两种情况下等待任务会超时:
允许核心线程等待超时,即allowCoreThreadTimeOut(true)
当前线程是普通线程,此时wc > corePoolSize
工作队列使用的是BlockingQueue,这里就不展开了,后面再写一篇详细的分析。
总结
ThreadPoolExecutor基于生产者-消费者模式,提交任务的操作相当于生产者,执行任务的线程相当于消费者。
Executors提供了四种基于ThreadPoolExecutor构造线程池模型的方法,除此之外,我们还可以直接继承ThreadPoolExecutor,重写beforeExecute和afterExecute方法来定制线程池任务执行过程。
使用有界队列还是无界队列需要根据具体情况考虑,工作队列的大小和线程的数量也是需要好好考虑的。
拒绝策略推荐使用CallerRunsPolicy,该策略不会抛弃任务,也不会抛出异常,而是将任务回退到调用者线程中执行。