> Java > java지도 시간 > Java 스레드 풀에 대한 자세한 소개

Java 스레드 풀에 대한 자세한 소개

零下一度
풀어 주다: 2017-07-26 16:58:07
원래의
1309명이 탐색했습니다.

이전 기사에서 스레드를 사용할 때 스레드를 생성했습니다. 구현하기는 매우 간단하지만 문제가 있습니다.

동시 스레드가 많고 각 스레드가 일정 시간 동안 실행되는 경우 매우 짧습니다. 작업이 종료되므로 스레드를 자주 생성하면 스레드를 자주 생성하고 삭제하는 데 시간이 걸리기 때문에 시스템 효율성이 크게 저하됩니다.

 그렇다면 스레드를 재사용할 수 있는 방법, 즉 작업을 실행한 후에는 소멸되지 않고 다른 작업을 계속해서 실행할 수 있는 방법이 있을까요?

Java에서는 스레드 풀을 통해 이 효과를 얻을 수 있습니다. 오늘은 Java의 스레드 풀에 대해 먼저 설명하고 핵심 ThreadPoolExecutor 클래스의 메소드부터 시작하여 구현 원리를 설명하고 사용 예를 제공하며 마지막으로 스레드 크기를 합리적으로 구성하는 방법에 대해 설명합니다. 스레드 풀.

1. Java의 ThreadPoolExecutor 클래스

 java.uitl.concurrent.ThreadPoolExecutor 클래스는 스레드 풀의 핵심 클래스이므로 Java의 스레드 풀을 제대로 이해하려면 먼저 이 클래스를 이해해야 합니다. ThreadPoolExecutor 클래스의 구체적인 구현 소스 코드를 살펴보겠습니다.

  ThreadPoolExecutor 클래스에는 4개의 생성자가 제공됩니다:

Java 스레드 풀에 대한 자세한 소개
Java 스레드 풀에 대한 자세한 소개
public class ThreadPoolExecutor extends AbstractExecutorService {
    .....
    public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit,
            BlockingQueue<runnable> workQueue);
 
    public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit,
            BlockingQueue<runnable> workQueue,ThreadFactory threadFactory);
 
    public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit,
            BlockingQueue<runnable> workQueue,RejectedExecutionHandler handler);
 
    public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit,
        BlockingQueue<runnable> workQueue,ThreadFactory threadFactory,RejectedExecutionHandler handler);
    ...
}</runnable></runnable></runnable></runnable>
로그인 후 복사

위의 코드에서 ThreadPoolExecutor가 AbstractExecutorService 클래스를 상속하고 4개의 생성자 장치를 제공한다는 것을 알 수 있습니다. 각 생성자의 소스 코드에 대한 구체적인 구현을 살펴보면 처음 세 생성자는 호출된 네 번째 생성자가 수행하는 초기화 작업임을 알 수 있습니다.

다음은 생성자에서 각 매개변수의 의미를 설명합니다.

  • corePoolSize: 코어 풀의 크기. 이 매개변수는 나중에 설명하는 스레드 풀의 구현 원리와 큰 관계가 있습니다. 스레드 풀이 생성된 후에는 기본적으로 스레드 풀에 스레드가 없지만, 대신 prestartAllCoreThreads() 또는 prestartCoreThread() 메서드가 호출되지 않는 한 작업이 도착할 때까지 작업을 실행하기 위해 스레드가 생성됩니다. 두 가지 방법, 이름에서 알 수 있듯이 미리 생성된 스레드, 즉 작업이 도착하지 않기 전에 corePoolSize 스레드 또는 하나의 스레드가 생성된다는 의미입니다. 기본적으로 스레드 풀이 생성된 후 스레드 풀에 있는 스레드 수는 0입니다. 작업이 오면 스레드가 생성되어 작업을 실행합니다. 스레드 풀에 있는 스레드 수가 corePoolSize에 도달하면 해당 작업을 수행합니다. 작업은 캐시 대기열에 배치됩니다.

  • maximumPoolSize: 스레드 풀의 최대 스레드 수 또한 이 매개변수는 매우 중요한 매개변수입니다.

  • keepAliveTime: 작업이 실행되지 않을 때 스레드가 지속되는 최대 시간을 나타냅니다. 기본적으로 keepAliveTime은 스레드 풀의 스레드 수가 corePoolSize보다 클 때, 그리고 스레드 풀의 스레드 수가 corePoolSize보다 크지 않을 때까지, 즉 스레드 풀의 스레드 수가 다음과 같을 때까지만 작동합니다. corePoolSize보다 큼, 스레드가 유휴 상태인 경우 시간이 keepAliveTime에 도달하면 스레드 풀의 스레드 수가 corePoolSize를 초과하지 않을 때까지 종료됩니다. 그러나 AllowCoreThreadTimeOut(boolean) 메소드가 호출되면 스레드 풀의 스레드 수가 corePoolSize보다 크지 않은 경우 스레드 풀의 스레드 수가 0이 될 때까지 keepAliveTime 매개변수도 적용됩니다. : keepAliveTime 매개변수의 시간 단위, 7가지 값이 있고 TimeUnit 클래스에는 7가지 정적 속성이 있습니다.

  • TimeUnit.DAYS;               //天
    TimeUnit.HOURS;             //小时
    TimeUnit.MINUTES;           //分钟
    TimeUnit.SECONDS;           //秒
    TimeUnit.MILLISECONDS;      //毫秒
    TimeUnit.MICROSECONDS;      //微妙
    TimeUnit.NANOSECONDS;       //纳秒
    로그인 후 복사

workQueue: 작업을 저장하는 데 사용되는 차단 대기열 실행을 기다리는 매개변수의 선택도 매우 중요하며 스레드 풀의 실행 프로세스에 상당한 영향을 미칩니다. 일반적으로 여기의 차단 대기열에는 다음과 같은 옵션이 있습니다:
  • ArrayBlockingQueue;
    LinkedBlockingQueue;
    SynchronousQueue;
    로그인 후 복사
  • ArrayBlockingQueue PriorityBlockingQueue는 덜 사용되며 LinkedBlockingQueue 및 동기식은 일반적으로 사용됩니다. 스레드 풀의 대기열 전략은 BlockingQueue와 관련이 있습니다.

threadFactory: 주로 스레드를 생성하는 데 사용되는 스레드 팩토리

  • handler: 다음 네 가지 값을 사용하여 작업 처리를 거부할 때의 전략을 나타냅니다.

  • ThreadPoolExecutor.AbortPolicy:丢弃任务并抛出RejectedExecutionException异常。 
    ThreadPoolExecutor.DiscardPolicy:也是丢弃任务,但是不抛出异常。 
    ThreadPoolExecutor.DiscardOldestPolicy:丢弃队列最前面的任务,然后重新尝试执行任务(重复此过程)
    ThreadPoolExecutor.CallerRunsPolicy:由调用线程处理该任务
    로그인 후 복사
  • 특정 매개변수 및 스레드 구성 풀 간의 관계는 다음 섹션에서 다루겠습니다.
위에 제공된 ThreadPoolExecutor 클래스의 코드에서 ThreadPoolExecutor가 AbstractExecutorService를 상속한다는 것을 알 수 있습니다. AbstractExecutorService의 구현을 살펴보겠습니다.

public abstract class AbstractExecutorService implements ExecutorService {
 
     
    protected <t> RunnableFuture<t> newTaskFor(Runnable runnable, T value) { };
    protected <t> RunnableFuture<t> newTaskFor(Callable<t> callable) { };
    public Future> submit(Runnable task) {};
    public <t> Future<t> submit(Runnable task, T result) { };
    public <t> Future<t> submit(Callable<t> task) { };
    private <t> T doInvokeAny(Collection extends Callable<t>> tasks,
                            boolean timed, long nanos)
        throws InterruptedException, ExecutionException, TimeoutException {
    };
    public <t> T invokeAny(Collection extends Callable<t>> tasks)
        throws InterruptedException, ExecutionException {
    };
    public <t> T invokeAny(Collection extends Callable<t>> tasks,
                           long timeout, TimeUnit unit)
        throws InterruptedException, ExecutionException, TimeoutException {
    };
    public <t> List<future>> invokeAll(Collection extends Callable<t>> tasks)
        throws InterruptedException {
    };
    public <t> List<future>> invokeAll(Collection extends Callable<t>> tasks,
                                         long timeout, TimeUnit unit)
        throws InterruptedException {
    };
}</t></future></t></t></future></t></t></t></t></t></t></t></t></t></t></t></t></t></t></t></t></t>
로그인 후 복사

   AbstractExecutorService是一个抽象类,它实现了ExecutorService接口。

  我们接着看ExecutorService接口的实现:

public interface ExecutorService extends Executor {
 
    void shutdown();
    boolean isShutdown();
    boolean isTerminated();
    boolean awaitTermination(long timeout, TimeUnit unit)
        throws InterruptedException;
    <t> Future<t> submit(Callable<t> task);
    <t> Future<t> submit(Runnable task, T result);
    Future> submit(Runnable task);
    <t> List<future>> invokeAll(Collection extends Callable<t>> tasks)
        throws InterruptedException;
    <t> List<future>> invokeAll(Collection extends Callable<t>> tasks,
                                  long timeout, TimeUnit unit)
        throws InterruptedException;
 
    <t> T invokeAny(Collection extends Callable<t>> tasks)
        throws InterruptedException, ExecutionException;
    <t> T invokeAny(Collection extends Callable<t>> tasks,
                    long timeout, TimeUnit unit)
        throws InterruptedException, ExecutionException, TimeoutException;
}</t></t></t></t></t></future></t></t></future></t></t></t></t></t></t>
로그인 후 복사

   而ExecutorService又是继承了Executor接口,我们看一下Executor接口的实现:

public interface Executor {
    void execute(Runnable command);
}
로그인 후 복사

 

   到这里,大家应该明白了ThreadPoolExecutor、AbstractExecutorService、ExecutorService和Executor几个之间的关系了。

  Executor是一个顶层接口,在它里面只声明了一个方法execute(Runnable),返回值为void,参数为Runnable类型,从字面意思可以理解,就是用来执行传进去的任务的;

  然后ExecutorService接口继承了Executor接口,并声明了一些方法:submit、invokeAll、invokeAny以及shutDown等;

  抽象类AbstractExecutorService实现了ExecutorService接口,基本实现了ExecutorService中声明的所有方法;

  然后ThreadPoolExecutor继承了类AbstractExecutorService。

  在ThreadPoolExecutor类中有几个非常重要的方法:

execute()
submit()
shutdown()
shutdownNow()
로그인 후 복사

   execute()方法实际上是Executor中声明的方法,在ThreadPoolExecutor进行了具体的实现,这个方法是ThreadPoolExecutor的核心方法,通过这个方法可以向线程池提交一个任务,交由线程池去执行。

  submit()方法是在ExecutorService中声明的方法,在AbstractExecutorService就已经有了具体的实现,在ThreadPoolExecutor中并没有对其进行重写,这个方法也是用来向线程池提交任务的,但是它和execute()方法不同,它能够返回任务执行的结果,去看submit()方法的实现,会发现它实际上还是调用的execute()方法,只不过它利用了Future来获取任务执行结果(Future相关内容将在下一篇讲述)。

  shutdown()和shutdownNow()是用来关闭线程池的。

  还有很多其他的方法:

  比如:getQueue() 、getPoolSize() 、getActiveCount()、getCompletedTaskCount()等获取与线程池相关属性的方法,有兴趣的朋友可以自行查阅API。

二.深入剖析线程池实现原理

  在上一节我们从宏观上介绍了ThreadPoolExecutor,下面我们来深入解析一下线程池的具体实现原理,将从下面几个方面讲解:

  1.线程池状态

  2.任务的执行

  3.线程池中的线程初始化

  4.任务缓存队列及排队策略

  5.任务拒绝策略

  6.线程池的关闭

  7.线程池容量的动态调整

 

1.线程池状态

  在ThreadPoolExecutor中定义了一个volatile变量,另外定义了几个static final变量表示线程池的各个状态:

volatile int runState;
static final int RUNNING    = 0;
static final int SHUTDOWN   = 1;
static final int STOP       = 2;
static final int TERMINATED = 3;
로그인 후 복사

 

   runState表示当前线程池的状态,它是一个volatile变量用来保证线程之间的可见性;

  下面的几个static final变量表示runState可能的几个取值。

  当创建线程池后,初始时,线程池处于RUNNING状态;

  如果调用了shutdown()方法,则线程池处于SHUTDOWN状态,此时线程池不能够接受新的任务,它会等待所有任务执行完毕;

  如果调用了shutdownNow()方法,则线程池处于STOP状态,此时线程池不能接受新的任务,并且会去尝试终止正在执行的任务;

  当线程池处于SHUTDOWN或STOP状态,并且所有工作线程已经销毁,任务缓存队列已经清空或执行结束后,线程池被设置为TERMINATED状态。

2.任务的执行

  在了解将任务提交给线程池到任务执行完毕整个过程之前,我们先来看一下ThreadPoolExecutor类中其他的一些比较重要成员变量:

Java 스레드 풀에 대한 자세한 소개
Java 스레드 풀에 대한 자세한 소개
private final BlockingQueue<runnable> workQueue;              //任务缓存队列,用来存放等待执行的任务
private final ReentrantLock mainLock = new ReentrantLock();   //线程池的主要状态锁,对线程池状态(比如线程池大小
                                                              //、runState等)的改变都要使用这个锁
private final HashSet<worker> workers = new HashSet<worker>();  //用来存放工作集
 
private volatile long  keepAliveTime;    //线程存活时间   
private volatile boolean allowCoreThreadTimeOut;   //是否允许为核心线程设置存活时间
private volatile int   corePoolSize;     //核心池的大小(即线程池中的线程数目大于这个参数时,提交的任务会被放进任务缓存队列)
private volatile int   maximumPoolSize;   //线程池最大能容忍的线程数
 
private volatile int   poolSize;       //线程池中当前的线程数
 
private volatile RejectedExecutionHandler handler; //任务拒绝策略
 
private volatile ThreadFactory threadFactory;   //线程工厂,用来创建线程
 
private int largestPoolSize;   //用来记录线程池中曾经出现过的最大线程数
 
private long completedTaskCount;   //用来记录已经执行完毕的任务个数</worker></worker></runnable>
로그인 후 복사
Java 스레드 풀에 대한 자세한 소개
Java 스레드 풀에 대한 자세한 소개

 

   每个变量的作用都已经标明出来了,这里要重点解释一下corePoolSize、maximumPoolSize、largestPoolSize三个变量。

  corePoolSize在很多地方被翻译成核心池大小,其实我的理解这个就是线程池的大小。举个简单的例子:

  假如有一个工厂,工厂里面有10个工人,每个工人同时只能做一件任务。

  因此只要当10个工人中有工人是空闲的,来了任务就分配给空闲的工人做;

  当10个工人都有任务在做时,如果还来了任务,就把任务进行排队等待;

  如果说新任务数目增长的速度远远大于工人做任务的速度,那么此时工厂主管可能会想补救措施,比如重新招4个临时工人进来;

  然后就将任务也分配给这4个临时工人做;

  如果说着14个工人做任务的速度还是不够,此时工厂主管可能就要考虑不再接收新的任务或者抛弃前面的一些任务了。

  当这14个工人当中有人空闲时,而新任务增长的速度又比较缓慢,工厂主管可能就考虑辞掉4个临时工了,只保持原来的10个工人,毕竟请额外的工人是要花钱的。

 

  这个例子中的corePoolSize就是10,而maximumPoolSize就是14(10+4)。

  也就是说corePoolSize就是线程池大小,maximumPoolSize在我看来是线程池的一种补救措施,即任务量突然过大时的一种补救措施。

  不过为了方便理解,在本文后面还是将corePoolSize翻译成核心池大小。

  largestPoolSize只是一个用来起记录作用的变量,用来记录线程池中曾经有过的最大线程数目,跟线程池的容量没有任何关系。

 

  下面我们进入正题,看一下任务从提交到最终执行完毕经历了哪些过程。

  在ThreadPoolExecutor类中,最核心的任务提交方法是execute()方法,虽然通过submit也可以提交任务,但是实际上submit方法里面最终调用的还是execute()方法,所以我们只需要研究execute()方法的实现原理即可:

Java 스레드 풀에 대한 자세한 소개
Java 스레드 풀에 대한 자세한 소개
public void execute(Runnable command) {
    if (command == null)
        throw new NullPointerException();
    if (poolSize >= corePoolSize || !addIfUnderCorePoolSize(command)) {
        if (runState == RUNNING && workQueue.offer(command)) {
            if (runState != RUNNING || poolSize == 0)
                ensureQueuedTaskHandled(command);
        }
        else if (!addIfUnderMaximumPoolSize(command))
            reject(command); // is shutdown or saturated
    }
}
로그인 후 복사
Java 스레드 풀에 대한 자세한 소개
Java 스레드 풀에 대한 자세한 소개

 

   上面的代码可能看起来不是那么容易理解,下面我们一句一句解释:

  首先,判断提交的任务command是否为null,若是null,则抛出空指针异常;

  接着是这句,这句要好好理解一下:

if (poolSize >= corePoolSize || !addIfUnderCorePoolSize(command))
로그인 후 복사

 

   由于是或条件运算符,所以先计算前半部分的值,如果线程池中当前线程数不小于核心池大小,那么就会直接进入下面的if语句块了。

  如果线程池中当前线程数小于核心池大小,则接着执行后半部分,也就是执行:

addIfUnderCorePoolSize(command)
로그인 후 복사

 

  如果执行完addIfUnderCorePoolSize这个方法返回false,则继续执行下面的if语句块,否则整个方法就直接执行完毕了。

  如果执行完addIfUnderCorePoolSize这个方法返回false,然后接着判断:

if (runState == RUNNING && workQueue.offer(command))
로그인 후 복사
로그인 후 복사

 

   如果当前线程池处于RUNNING状态,则将任务放入任务缓存队列;如果当前线程池不处于RUNNING状态或者任务放入缓存队列失败,则执行:

addIfUnderMaximumPoolSize(command)
로그인 후 복사

 

  如果执行addIfUnderMaximumPoolSize方法失败,则执行reject()方法进行任务拒绝处理。

  回到前面:

if (runState == RUNNING && workQueue.offer(command))
로그인 후 복사
로그인 후 복사

 

   这句的执行,如果说当前线程池处于RUNNING状态且将任务放入任务缓存队列成功,则继续进行判断:

if (runState != RUNNING || poolSize == 0)
로그인 후 복사

 

   这句判断是为了防止在将此任务添加进任务缓存队列的同时其他线程突然调用shutdown或者shutdownNow方法关闭了线程池的一种应急措施。如果是这样就执行:

ensureQueuedTaskHandled(command)
로그인 후 복사

 

   进行应急处理,从名字可以看出是保证 添加到任务缓存队列中的任务得到处理。

  我们接着看2个关键方法的实现:addIfUnderCorePoolSize和addIfUnderMaximumPoolSize:

Java 스레드 풀에 대한 자세한 소개
Java 스레드 풀에 대한 자세한 소개
private boolean addIfUnderCorePoolSize(Runnable firstTask) {
    Thread t = null;
    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();
    try {
        if (poolSize <div class="cnblogs_code_toolbar"><span class="cnblogs_code_copy"><img src="https://img.php.cn/upload/article/000/000/001/58ed2a422c9a14f121f5e3e7d28e9686-26.gif" alt="Java 스레드 풀에 대한 자세한 소개"></span></div><div class="cnblogs_code_toolbar"><span class="cnblogs_code_copy"><img src="https://img.php.cn/upload/article/000/000/001/1276c355cb927d82618e5054a0e1884f-27.gif" alt="Java 스레드 풀에 대한 자세한 소개"></span></div>
로그인 후 복사

 

   这个是addIfUnderCorePoolSize方法的具体实现,从名字可以看出它的意图就是当低于核心吃大小时执行的方法。下面看其具体实现,首先获取到锁,因为这地方涉及到线程池状态的变化,先通过if语句判断当前线程池中的线程数目是否小于核心池大小,有朋友也许会有疑问:前面在execute()方法中不是已经判断过了吗,只有线程池当前线程数目小于核心池大小才会执行addIfUnderCorePoolSize方法的,为何这地方还要继续判断?原因很简单,前面的判断过程中并没有加锁,因此可能在execute方法判断的时候poolSize小于corePoolSize,而判断完之后,在其他线程中又向线程池提交了任务,就可能导致poolSize不小于corePoolSize了,所以需要在这个地方继续判断。然后接着判断线程池的状态是否为RUNNING,原因也很简单,因为有可能在其他线程中调用了shutdown或者shutdownNow方法。然后就是执行

t = addThread(firstTask);
로그인 후 복사

 

   这个方法也非常关键,传进去的参数为提交的任务,返回值为Thread类型。然后接着在下面判断t是否为空,为空则表明创建线程失败(即poolSize>=corePoolSize或者runState不等于RUNNING),否则调用t.start()方法启动线程。

  我们来看一下addThread方法的实现:

Java 스레드 풀에 대한 자세한 소개
Java 스레드 풀에 대한 자세한 소개
private Thread addThread(Runnable firstTask) {
    Worker w = new Worker(firstTask);
    Thread t = threadFactory.newThread(w);  //创建一个线程,执行任务   
    if (t != null) {
        w.thread = t;            //将创建的线程的引用赋值为w的成员变量       
        workers.add(w);
        int nt = ++poolSize;     //当前线程数加1       
        if (nt > largestPoolSize)
            largestPoolSize = nt;
    }
    return t;
}
로그인 후 복사
Java 스레드 풀에 대한 자세한 소개
Java 스레드 풀에 대한 자세한 소개

 

   在addThread方法中,首先用提交的任务创建了一个Worker对象,然后调用线程工厂threadFactory创建了一个新的线程t,然后将线程t的引用赋值给了Worker对象的成员变量thread,接着通过workers.add(w)将Worker对象添加到工作集当中。

  下面我们看一下Worker类的实现:

Java 스레드 풀에 대한 자세한 소개
Java 스레드 풀에 대한 자세한 소개
private final class Worker implements Runnable {
    private final ReentrantLock runLock = new ReentrantLock();
    private Runnable firstTask;
    volatile long completedTasks;
    Thread thread;
    Worker(Runnable firstTask) {
        this.firstTask = firstTask;
    }
    boolean isActive() {
        return runLock.isLocked();
    }
    void interruptIfIdle() {
        final ReentrantLock runLock = this.runLock;
        if (runLock.tryLock()) {
            try {
        if (thread != Thread.currentThread())
        thread.interrupt();
            } finally {
                runLock.unlock();
            }
        }
    }
    void interruptNow() {
        thread.interrupt();
    }
 
    private void runTask(Runnable task) {
        final ReentrantLock runLock = this.runLock;
        runLock.lock();
        try {
            if (runState = STOP)
            boolean ran = false;
            beforeExecute(thread, task);   //beforeExecute方法是ThreadPoolExecutor类的一个方法,没有具体实现,用户可以根据
            //自己需要重载这个方法和后面的afterExecute方法来进行一些统计信息,比如某个任务的执行时间等           
            try {
                task.run();
                ran = true;
                afterExecute(task, null);
                ++completedTasks;
            } catch (RuntimeException ex) {
                if (!ran)
                    afterExecute(task, ex);
                throw ex;
            }
        } finally {
            runLock.unlock();
        }
    }
 
    public void run() {
        try {
            Runnable task = firstTask;
            firstTask = null;
            while (task != null || (task = getTask()) != null) {
                runTask(task);
                task = null;
            }
        } finally {
            workerDone(this);   //当任务队列中没有任务时,进行清理工作       
        }
    }
}
로그인 후 복사
Java 스레드 풀에 대한 자세한 소개
Java 스레드 풀에 대한 자세한 소개

 

   它实际上实现了Runnable接口,因此上面的Thread t = threadFactory.newThread(w);效果跟下面这句的效果基本一样:

Thread t = new Thread(w);
로그인 후 복사

 

   相当于传进去了一个Runnable任务,在线程t中执行这个Runnable。

  既然Worker实现了Runnable接口,那么自然最核心的方法便是run()方法了:

Java 스레드 풀에 대한 자세한 소개
Java 스레드 풀에 대한 자세한 소개
public void run() {
    try {
        Runnable task = firstTask;
        firstTask = null;
        while (task != null || (task = getTask()) != null) {
            runTask(task);
            task = null;
        }
    } finally {
        workerDone(this);
    }
}
로그인 후 복사
Java 스레드 풀에 대한 자세한 소개
Java 스레드 풀에 대한 자세한 소개

 

   从run方法的实现可以看出,它首先执行的是通过构造器传进来的任务firstTask,在调用runTask()执行完firstTask之后,在while循环里面不断通过getTask()去取新的任务来执行,那么去哪里取呢?自然是从任务缓存队列里面去取,getTask是ThreadPoolExecutor类中的方法,并不是Worker类中的方法,下面是getTask方法的实现:

Java 스레드 풀에 대한 자세한 소개
Java 스레드 풀에 대한 자세한 소개
Runnable getTask() {
    for (;;) {
        try {
            int state = runState;
            if (state > SHUTDOWN)
                return null;
            Runnable r;
            if (state == SHUTDOWN)  // Help drain queue
                r = workQueue.poll();
            else if (poolSize > corePoolSize || allowCoreThreadTimeOut) //如果线程数大于核心池大小或者允许为核心池线程设置空闲时间,
                //则通过poll取任务,若等待一定的时间取不到任务,则返回null
                r = workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS);
            else
                r = workQueue.take();
            if (r != null)
                return r;
            if (workerCanExit()) {    //如果没取到任务,即r为null,则判断当前的worker是否可以退出
                if (runState >= SHUTDOWN) // Wake up others
                    interruptIdleWorkers();   //中断处于空闲状态的worker
                return null;
            }
            // Else retry
        } catch (InterruptedException ie) {
            // On interruption, re-check runState
        }
    }
}
로그인 후 복사
Java 스레드 풀에 대한 자세한 소개
Java 스레드 풀에 대한 자세한 소개

 

   在getTask中,先判断当前线程池状态,如果runState大于SHUTDOWN(即为STOP或者TERMINATED),则直接返回null。

  如果runState为SHUTDOWN或者RUNNING,则从任务缓存队列取任务。

  如果当前线程池的线程数大于核心池大小corePoolSize或者允许为核心池中的线程设置空闲存活时间,则调用poll(time,timeUnit)来取任务,这个方法会等待一定的时间,如果取不到任务就返回null。

  然后判断取到的任务r是否为null,为null则通过调用workerCanExit()方法来判断当前worker是否可以退出,我们看一下workerCanExit()的实现:

Java 스레드 풀에 대한 자세한 소개
Java 스레드 풀에 대한 자세한 소개
private boolean workerCanExit() {
    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();
    boolean canExit;
    //如果runState大于等于STOP,或者任务缓存队列为空了
    //或者  允许为核心池线程设置空闲存活时间并且线程池中的线程数目大于1
    try {
        canExit = runState >= STOP ||
            workQueue.isEmpty() ||
            (allowCoreThreadTimeOut &&
             poolSize > Math.max(1, corePoolSize));
    } finally {
        mainLock.unlock();
    }
    return canExit;
}
로그인 후 복사
Java 스레드 풀에 대한 자세한 소개
Java 스레드 풀에 대한 자세한 소개

 

   也就是说如果线程池处于STOP状态、或者任务队列已为空或者允许为核心池线程设置空闲存活时间并且线程数大于1时,允许worker退出。如果允许worker退出,则调用interruptIdleWorkers()中断处于空闲状态的worker,我们看一下interruptIdleWorkers()的实现:

Java 스레드 풀에 대한 자세한 소개
Java 스레드 풀에 대한 자세한 소개
void interruptIdleWorkers() {
    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();
    try {
        for (Worker w : workers)  //实际上调用的是worker的interruptIfIdle()方法
            w.interruptIfIdle();
    } finally {
        mainLock.unlock();
    }
}
로그인 후 복사
Java 스레드 풀에 대한 자세한 소개
Java 스레드 풀에 대한 자세한 소개

 

   从实现可以看出,它实际上调用的是worker的interruptIfIdle()方法,在worker的interruptIfIdle()方法中:

Java 스레드 풀에 대한 자세한 소개
Java 스레드 풀에 대한 자세한 소개
void interruptIfIdle() {
    final ReentrantLock runLock = this.runLock;
    if (runLock.tryLock()) {    //注意这里,是调用tryLock()来获取锁的,因为如果当前worker正在执行任务,锁已经被获取了,是无法获取到锁的
                                //如果成功获取了锁,说明当前worker处于空闲状态
        try {
    if (thread != Thread.currentThread())  
    thread.interrupt();
        } finally {
            runLock.unlock();
        }
    }
}
로그인 후 복사
Java 스레드 풀에 대한 자세한 소개
Java 스레드 풀에 대한 자세한 소개

 

    这里有一个非常巧妙的设计方式,假如我们来设计线程池,可能会有一个任务分派线程,当发现有线程空闲时,就从任务缓存队列中取一个任务交给空闲线程执行。但是在这里,并没有采用这样的方式,因为这样会要额外地对任务分派线程进行管理,无形地会增加难度和复杂度,这里直接让执行完任务的线程去任务缓存队列里面取任务来执行。

   我们再看addIfUnderMaximumPoolSize方法的实现,这个方法的实现思想和addIfUnderCorePoolSize方法的实现思想非常相似,唯一的区别在于addIfUnderMaximumPoolSize方法是在线程池中的线程数达到了核心池大小并且往任务队列中添加任务失败的情况下执行的:

Java 스레드 풀에 대한 자세한 소개
Java 스레드 풀에 대한 자세한 소개
private boolean addIfUnderMaximumPoolSize(Runnable firstTask) {
    Thread t = null;
    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();
    try {
        if (poolSize <div class="cnblogs_code_toolbar"><span class="cnblogs_code_copy"><img src="https://img.php.cn/upload/article/000/000/001/51891deca72fd64c2c5374bd40d6f29a-58.gif" alt="Java 스레드 풀에 대한 자세한 소개"></span></div><div class="cnblogs_code_toolbar"><span class="cnblogs_code_copy"><img src="https://img.php.cn/upload/article/000/000/001/51891deca72fd64c2c5374bd40d6f29a-59.gif" alt="Java 스레드 풀에 대한 자세한 소개"></span></div>
로그인 후 복사

 

   看到没有,其实它和addIfUnderCorePoolSize方法的实现基本一模一样,只是if语句判断条件中的poolSize

  到这里,大部分朋友应该对任务提交给线程池之后到被执行的整个过程有了一个基本的了解,下面总结一下:

  1)首先,要清楚corePoolSize和maximumPoolSize的含义;

  2)其次,要知道Worker是用来起到什么作用的;

  3)要知道任务提交给线程池之后的处理策略,这里总结一下主要有4点:

  • 如果当前线程池中的线程数目小于corePoolSize,则每来一个任务,就会创建一个线程去执行这个任务;

  • 如果当前线程池中的线程数目>=corePoolSize,则每来一个任务,会尝试将其添加到任务缓存队列当中,若添加成功,则该任务会等待空闲线程将其取出去执行;若添加失败(一般来说是任务缓存队列已满),则会尝试创建新的线程去执行这个任务;

  • 如果当前线程池中的线程数目达到maximumPoolSize,则会采取任务拒绝策略进行处理;

  • 如果线程池中的线程数量大于 corePoolSize时,如果某线程空闲时间超过keepAliveTime,线程将被终止,直至线程池中的线程数目不大于corePoolSize;如果允许为核心池中的线程设置存活时间,那么核心池中的线程空闲时间超过keepAliveTime,线程也会被终止。

3.线程池中的线程初始化

  默认情况下,创建线程池之后,线程池中是没有线程的,需要提交任务之后才会创建线程。

  在实际中如果需要线程池创建之后立即创建线程,可以通过以下两个方法办到:

  • prestartCoreThread():初始化一个核心线程;

  • prestartAllCoreThreads():初始化所有核心线程

  下面是这2个方法的实现:

Java 스레드 풀에 대한 자세한 소개
Java 스레드 풀에 대한 자세한 소개
public boolean prestartCoreThread() {
    return addIfUnderCorePoolSize(null); //注意传进去的参数是null
}
 
public int prestartAllCoreThreads() {
    int n = 0;
    while (addIfUnderCorePoolSize(null))//注意传进去的参数是null
        ++n;
    return n;
}
로그인 후 복사
Java 스레드 풀에 대한 자세한 소개
Java 스레드 풀에 대한 자세한 소개

 

   注意上面传进去的参数是null,根据第2小节的分析可知如果传进去的参数为null,则最后执行线程会阻塞在getTask方法中的

r = workQueue.take();
로그인 후 복사

 

   即等待任务队列中有任务。

4.任务缓存队列及排队策略

  在前面我们多次提到了任务缓存队列,即workQueue,它用来存放等待执行的任务。

  workQueue的类型为BlockingQueue,通常可以取下面三种类型:

  1)ArrayBlockingQueue:基于数组的先进先出队列,此队列创建时必须指定大小;

  2)LinkedBlockingQueue:基于链表的先进先出队列,如果创建时没有指定此队列大小,则默认为Integer.MAX_VALUE;

  3)synchronousQueue:这个队列比较特殊,它不会保存提交的任务,而是将直接新建一个线程来执行新来的任务。

5.任务拒绝策略

  当线程池的任务缓存队列已满并且线程池中的线程数目达到maximumPoolSize,如果还有任务到来就会采取任务拒绝策略,通常有以下四种策略:

ThreadPoolExecutor.AbortPolicy:丢弃任务并抛出RejectedExecutionException异常。
ThreadPoolExecutor.DiscardPolicy:也是丢弃任务,但是不抛出异常。
ThreadPoolExecutor.DiscardOldestPolicy:丢弃队列最前面的任务,然后重新尝试执行任务(重复此过程)
ThreadPoolExecutor.CallerRunsPolicy:由调用线程处理该任务
로그인 후 복사

 

6.线程池的关闭

  ThreadPoolExecutor提供了两个方法,用于线程池的关闭,分别是shutdown()和shutdownNow(),其中:

  • shutdown():不会立即终止线程池,而是要等所有任务缓存队列中的任务都执行完后才终止,但再也不会接受新的任务

  • shutdownNow():立即终止线程池,并尝试打断正在执行的任务,并且清空任务缓存队列,返回尚未执行的任务

7.线程池容量的动态调整

  ThreadPoolExecutor提供了动态调整线程池容量大小的方法:setCorePoolSize()和setMaximumPoolSize(),

  • setCorePoolSize:设置核心池大小

  • setMaximumPoolSize:设置线程池最大能创建的线程数目大小

  当上述参数从小变大时,ThreadPoolExecutor进行线程赋值,还可能立即创建新的线程来执行任务。

三.使用示例

  前面我们讨论了关于线程池的实现原理,这一节我们来看一下它的具体使用:

Java 스레드 풀에 대한 자세한 소개
Java 스레드 풀에 대한 자세한 소개
public class Test {
     public static void main(String[] args) {   
         ThreadPoolExecutor executor = new ThreadPoolExecutor(5, 10, 200, TimeUnit.MILLISECONDS,
                 new ArrayBlockingQueue<runnable>(5));
          
         for(int i=0;i</runnable>
로그인 후 복사

위 내용은 Java 스레드 풀에 대한 자세한 소개의 상세 내용입니다. 자세한 내용은 PHP 중국어 웹사이트의 기타 관련 기사를 참조하세요!

관련 라벨:
원천:php.cn
본 웹사이트의 성명
본 글의 내용은 네티즌들의 자발적인 기여로 작성되었으며, 저작권은 원저작자에게 있습니다. 본 사이트는 이에 상응하는 법적 책임을 지지 않습니다. 표절이나 침해가 의심되는 콘텐츠를 발견한 경우 admin@php.cn으로 문의하세요.
인기 튜토리얼
더>
최신 다운로드
더>
웹 효과
웹사이트 소스 코드
웹사이트 자료
프론트엔드 템플릿