> Java > java지도 시간 > 본문

Java 스레드 풀의 그래픽 코드에 대한 자세한 설명

黄舟
풀어 주다: 2017-09-25 10:56:19
원래의
2035명이 탐색했습니다.

스레드 풀은 Java에서 중요한 지식 포인트입니다. 여기서는 Java와 함께 제공되는 스레드 풀을 예로 들어 기록하고 분석해 보겠습니다. 이 기사에서는 Java 동시 프로그래밍(스레드 풀 사용, Java 스레드 풀---addWorker 메소드 분석, 스레드 풀, ThreadPoolExecutor의 전략 선택 및 작업 대기열 선택(Java 스레드 풀), ThreadPoolExecutor 스레드 풀 분석 및 세 가지)에 대해 설명합니다. BlockingQueue 유형을 수행합니다. 본 글은 JDK1.8을 기반으로 유스케이스와 소스코드 분석을 구현하고, 주로 프로세스에 초점을 맞춘다.

1. 스레드 풀 사용하기

어떤 것의 원리를 알려면 먼저 사용법을 알아야 합니다. 그럼 스레드 풀을 사용하는 예제부터 시작해 보겠습니다.

1. 태스크 클래스

Java와 함께 제공되는 스레드 풀을 사용하려면 먼저 태스크 클래스가 필요합니다. 이 태스크 클래스는 Runnable 인터페이스를 구현하고 실행 메서드(멀티 스레드 실행이 필요한 태스크 로직)를 다시 작성해야 합니다.

package org.my.threadPoolDemo;
/**
 * 任务类,实现Runnable接口 重写run方法
 */
public class MyTask implements Runnable{
    
    private int taskNum;
    
    public MyTask(int taskNum) {
        super();
        this.taskNum = taskNum;
    }

    @Override
    public void run() {
        System.out.println("正在执行task"+taskNum);
        try {
            Thread.currentThread().sleep(4000);//sleep 4秒模拟执行代码过程
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        System.out.println("task"+taskNum+"执行完毕");
    }
}
로그인 후 복사

2. 스레드 풀을 생성하고 여러 작업을 실행합니다

다음으로 작업 클래스를 사용하여 스레드 풀을 생성하고 여러 작업을 실행합니다. ThreadPoolExecutor를 사용하여 스레드 풀을 생성합니다.

package org.my.threadPoolDemo;

import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class ThreadPoolUseDemo {

    public static void main(String[] args) {
        ThreadPoolExecutor executor = new ThreadPoolExecutor(5, 10, 200, TimeUnit.MILLISECONDS, new LinkedBlockingDeque<Runnable>());
        for (int i = 0; i < 15; i++) {
            MyTask myTask = new MyTask(i);
            executor.execute(myTask);
            System.out.println("线程池中线程数量:"+executor.getPoolSize()+",线程池中等待执行的任务数量:"+executor.getQueue().size()+",已执行完的任务数量:"+executor.getCompletedTaskCount());
        }
        executor.shutdown();
    }

}
로그인 후 복사

논리는 매우 간단합니다. 스레드 풀 관리자를 만든 다음 이를 사용하여 15개 작업을 실행합니다. 여기에서는 ThreadPoolExecutor를 생성할 때 전달된 매개변수의 의미를 설명해야 합니다.

5(corePoolSize)는 코어 풀 크기를 나타냅니다. 즉, 생성된 스레드 수입니다. 스레드 풀의 스레드 수가 이 숫자와 같으면 다음 작업이 작업 대기열에 배치됩니다(나중에 자세히 설명).

10(maximumPoolSize)은 스레드 풀이 생성할 수 있는 최대 스레드 수를 나타냅니다. 이전 단계의 작업 큐가 가득 차면 스레드 풀은 스레드 수 = 10이 될 때까지 스레드를 계속 생성하고 다음 작업의 실행이 거부됩니다.

200(keepAliveTime)은 실행할 작업이 없을 때 스레드가 종료되기 전까지 유지할 ​​수 있는 최대 시간을 나타냅니다. 기본적으로 keepAliveTime은 스레드 풀의 스레드 수가 corePoolSize보다 클 때, 그리고 스레드 풀의 스레드 수가 corePoolSize보다 크지 않을 때까지, 즉 스레드 풀의 스레드 수가 다음과 같을 때까지만 작동합니다. corePoolSize보다 큼, 스레드가 유휴 상태인 경우 시간이 keepAliveTime에 도달하면 스레드 풀의 스레드 수가 corePoolSize를 초과하지 않을 때까지 종료됩니다. 그러나 AllowCoreThreadTimeOut(boolean) 메서드가 호출되면 스레드 풀의 스레드 수가 corePoolSize보다 크지 않은 경우 스레드 풀의 스레드 수가 0이 될 때까지 keepAliveTime 매개 변수도 작동합니다.

TimeUnit.MILLISECONDS는 keepAliveTime의 시간 단위입니다.

TimeUnit.DAYS; //Milliseconds
TimeUnit.MICROSECONDS; //Subtle
TimeUnit.NANOSECONDS; //Nanosecond


ArrayBlockingQueue는 작업 대기열 workQueue를 저장하는 데 사용되는 수신 차단 대기열입니다. 즉, 위에서 언급한 작업 대기열입니다. ArrayBlockingQueue 외에도 LinkedBlockingQueue 및SynchronousQueue도 선택할 수 있습니다.

ThreadPoolExecutor에는 위에서 전달된 매개변수 외에도 전달될 수 있는 다른 생성자가 있습니다.

threadFactory: 주로 스레드를 생성하는 데 사용되는 스레드 팩토리

handler: 작업 실행을 거부하는 전략을 나타냅니다. , 4가지 값이 있습니다:

ThreadPoolExecutor.AbortPolicy: 작업을 삭제하고 RejectedExecutionException을 발생시킵니다.

ThreadPoolExecutor.DiscardPolicy: 작업도 삭제하지만 예외를 발생시키지는 않습니다.

ThreadPoolExecutor.DiscardOldestPolicy: 대기열의 맨 앞에 있는 작업을 삭제한 다음 작업을 다시 실행해 봅니다(이 프로세스를 반복).

ThreadPoolExecutor.CallerRunsPolicy: 작업은 호출 스레드에 의해 처리됩니다.


위 프로그램을 실행하면 결과는 다음과 같습니다.

正在执行task0
线程池中线程数量:1,线程池中等待执行的任务数量:0,已执行完的任务数量:0
线程池中线程数量:2,线程池中等待执行的任务数量:0,已执行完的任务数量:0
正在执行task1
线程池中线程数量:3,线程池中等待执行的任务数量:0,已执行完的任务数量:0
正在执行task2
线程池中线程数量:4,线程池中等待执行的任务数量:0,已执行完的任务数量:0
正在执行task3
线程池中线程数量:5,线程池中等待执行的任务数量:0,已执行完的任务数量:0
正在执行task4
线程池中线程数量:5,线程池中等待执行的任务数量:1,已执行完的任务数量:0
线程池中线程数量:5,线程池中等待执行的任务数量:2,已执行完的任务数量:0
线程池中线程数量:5,线程池中等待执行的任务数量:3,已执行完的任务数量:0
线程池中线程数量:5,线程池中等待执行的任务数量:4,已执行完的任务数量:0
线程池中线程数量:5,线程池中等待执行的任务数量:5,已执行完的任务数量:0
线程池中线程数量:6,线程池中等待执行的任务数量:5,已执行完的任务数量:0
正在执行task10
线程池中线程数量:7,线程池中等待执行的任务数量:5,已执行完的任务数量:0
正在执行task11
线程池中线程数量:8,线程池中等待执行的任务数量:5,已执行完的任务数量:0
正在执行task12
正在执行task13
线程池中线程数量:9,线程池中等待执行的任务数量:5,已执行完的任务数量:0
线程池中线程数量:10,线程池中等待执行的任务数量:5,已执行完的任务数量:0
正在执行task14
task1执行完毕
task10执行完毕
正在执行task5
task11执行完毕
task13执行完毕
task4执行完毕
task3执行完毕
task2执行完毕
task0执行完毕
正在执行task9
正在执行task8
正在执行task7
task14执行完毕
task12执行完毕
正在执行task6
task5执行完毕
task9执行完毕
task7执行完毕
task8执行完毕
task6执行完毕
로그인 후 복사

알겠습니다. corePoolSize가 5이므로 작업 수가 5보다 크면 다음 작업이 작업 대기열에 들어가 대기합니다. queue가 5이고 maximumPoolSize=10이면 작업 대기열이 가득 찬 후 스레드 풀 관리자는 계속해서 5개의 스레드를 추가로 생성하고 마침내 스레드 풀의 스레드 수가 10에 도달합니다. 이때 이 스레드에서는 15개의 작업을 처리할 수 있습니다. for 루프 수를 20개로 늘리는 등 작업이 추가되면 java.util.concurrent.RejectedExecutionException이 발생합니다.


2. 원리 분석

위의 스레드 풀 사용 예에서 가장 중요한 두 단계는 ThreadPoolExecutor 개체를 구성한 다음 작업이 올 때마다 ThreadPoolExecutor 개체의 실행 메서드를 호출하는 것입니다.

1. ThreadPoolExecutor 구조

ThreadPoolExecutor의 주요 구조와 상속 관계는 아래 그림과 같습니다.

ThreadPoolExecutor 구조와 상속 관계Java 스레드 풀의 그래픽 코드에 대한 자세한 설명

主要成员变量:任务队列——存放那些暂时无法执行的任务;工作线程池——存放当前启用的所有线程;线程工厂——创建线程;还有一些用来调度线程与任务并保证线程安全的成员。

了解了ThreadPoolExecutor的主要结构,再简单梳理一下“一个传入线程池的任务能够被最终正常执行需要经过的主要流程”,方法名称前面没有“XXX.”这种标注的都是ThreadPoolExecutor的方法:

Java 스레드 풀의 그래픽 코드에 대한 자세한 설명

Java 스레드 풀의 그래픽 코드에 대한 자세한 설명

2、ThreadPoolExecutor构造器及重要常量

简单了解下构造器,ThreadPoolExecutor的四个构造器的源码如下:


public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit,BlockingQueue<Runnable> workQueue) {
        this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,Executors.defaultThreadFactory(), defaultHandler);
    }
    
public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit,BlockingQueue<Runnable> workQueue,ThreadFactory threadFactory) {
        this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,threadFactory, defaultHandler);
    }
    
public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit,BlockingQueue<Runnable> workQueue,RejectedExecutionHandler handler) {
        this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,Executors.defaultThreadFactory(), handler);
    }
    
public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit,BlockingQueue<Runnable> workQueue,ThreadFactory threadFactory,RejectedExecutionHandler handler) {
        if (corePoolSize < 0 ||maximumPoolSize <= 0 ||maximumPoolSize < corePoolSize ||keepAliveTime < 0)
            throw new IllegalArgumentException();
        if (workQueue == null || threadFactory == null || handler == null)
            throw new NullPointerException();
        this.acc = System.getSecurityManager() == null ?null :AccessController.getContext();
        this.corePoolSize = corePoolSize;
        this.maximumPoolSize = maximumPoolSize;
        this.workQueue = workQueue;
        this.keepAliveTime = unit.toNanos(keepAliveTime);
        this.threadFactory = threadFactory;
        this.handler = handler;
    }
로그인 후 복사

从源码中可以看出,这四个构造器都是调用最后一个构造器,只是根据开发者传入的参数的不同而填充一些默认的参数。比如如果开发者没有传入线程工厂threadFactory参数,那么构造器就使用默认的Executors.defaultThreadFactor。

在这里还要理解ThreadPoolExecutor的几个常量的含义和几个简单方法:


//Integer.SIZE是一个静态常量,值为32,也就是说COUNT_BITS是29
private static final int COUNT_BITS = Integer.SIZE - 3;
//CAPACITY是最大容量536870911,因为1左移29位之后-1,导致最高三位为0,而其余29位全部为1
private static final int CAPACITY   = (1 << COUNT_BITS) - 1;
//ctl用于表示线程池状态和有效线程数量,最高三位表示线程池的状态,其余低位表示有效线程数量
//初始化之后ctl等于RUNNING的值,即默认状态是运行状态,线程数量为0
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
//1110 0000 0000 0000 0000 0000 0000 0000最高三位为111
private static final int RUNNING    = -1 << COUNT_BITS;
//最高三位为000
private static final int SHUTDOWN   =  0 << COUNT_BITS;
//0010 0000 0000 0000 0000 0000 0000 0000最高三位为001
private static final int STOP       =  1 << COUNT_BITS;
//0100 0000 0000 0000 0000 0000 0000 0000最高三位为010
private static final int TIDYING    =  2 << COUNT_BITS;
//0110 0000 0000 0000 0000 0000 0000 0000最高三位为011
private static final int TERMINATED =  3 << COUNT_BITS;
/**
* 获取运行状态,入参为ctl。因为CAPACITY是最高三位为0,其余低位为1
* 所以当取反的时候,就只有最高三位为1,再经过与运算,只会取到ctl的最高三位
* 而这最高三位如上文所述,表示线程池的状态
*/
private static int runStateOf(int c)     { return c & ~CAPACITY; }
/**
* 获取工作线程数量,入参为ctl。因为CAPACITY是最高三位为0,其余低位为1
* 所以当进行与运算的时候,只会取到低29位,这29位正好表示有效线程数量
*/
private static int workerCountOf(int c)  { return c & CAPACITY; }
private static int ctlOf(int rs, int wc) { return rs | wc; }
//任务队列,用于存放待执行任务的阻塞队列
private final BlockingQueue<Runnable> workQueue;
/** 判断线程池是否是运行状态,传入的参数是ctl的值
* 只有RUNNING的符号位是1,也就是只有RUNNING为负数
* 所以如果目前的ctl值<0,就是RUNNING状态
*/
private static boolean isRunning(int c) {
    return c < SHUTDOWN;
}
//从任务队列中移除任务
public boolean remove(Runnable task) {
    boolean removed = workQueue.remove(task);
    tryTerminate(); // In case SHUTDOWN and now empty
    return removed;
}
로그인 후 복사

3、getTask()源代码

通过前面的流程图,我们知道getTask()是由runWorker方法调用的,目的是取出一个任务。

private Runnable getTask() {
        //记录循环体中上个循环在从阻塞队列中取任务时是否超时
        boolean timedOut = false; 
        //无条件循环,主要是在线程池运行正常情况下
        //通过循环体内部的阻塞队列的阻塞时间,来控制当前线程的超时时间
        for (;;) {
            int c = ctl.get();
            int rs = runStateOf(c);//获取线程池状态

            /*先获取线程池的状态,如果状态大于等于STOP,也就是STOP、TIDYING、TERMINATED之一
             *这时候不管队列中有没有任务,都不用去执行了;
             *如果线程池的状态为SHUTDOWN且队列中没有任务了,也不用继续执行了
             *将工作线程数量-1,并且返回null
             **/
            if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
                decrementWorkerCount();
                return null;
            }
            //获取工作线程数量
            int wc = workerCountOf(c);

            //是否启用超时参数keepAliveTime
            boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
            //如果这个条件成立,如果工作线程数量-1成功,返回null,否则跳出循环
            if ((wc > maximumPoolSize || (timed && timedOut))
                && (wc > 1 || workQueue.isEmpty())) {
                if (compareAndDecrementWorkerCount(c))
                    return null;
                continue;
            }
            //如果需要采用阻塞形式获取,那么就poll设定阻塞时间,否则take无限期等待。
            //这里通过阻塞时间,间接控制了超时的值,如果取值超时,意味着这个线程在超时时间内处于空闲状态
            //那么下一个循环,将会return null并且线程数量-1
            try {
                Runnable r = timed ?
                    workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
                    workQueue.take();
                if (r != null)
                    return r;
                timedOut = true;
            } catch (InterruptedException retry) {
                timedOut = false;
            }
        }
    }
로그인 후 복사

4、runWorker(Worker w)源代码

通过上面流程图,可以看出runWorker(Worker w)实际上已经是在线程启动之后执行任务了,所以其主要逻辑就是获取任务,然后执行任务的run方法。


final void runWorker(Worker w) {
        Thread wt = Thread.currentThread();
        Runnable task = w.firstTask;//获取传入的Worker对象中的任务
        w.firstTask = null;
        w.unlock(); // allow interrupts
        boolean completedAbruptly = true;
        try {
                /* 
                 * 如果传入的任务为null,就从任务队列中获取任务,只要这两者有一个不为null,就进入循环体
                 */
            while (task != null || (task = getTask()) != null) {
                w.lock();
                //如果线程池状态已被标为停止,那么则不允许该线程继续执行任务!或者该线程已是中断状态,
                //也不允许执行任务,还需要中断该线程!
                if ((runStateAtLeast(ctl.get(), STOP) ||
                     (Thread.interrupted() &&
                      runStateAtLeast(ctl.get(), STOP))) &&
                    !wt.isInterrupted())
                    wt.interrupt();
                try {
                    beforeExecute(wt, task);//为子类预留的空方法(钩子)
                    Throwable thrown = null;
                    try {
                        task.run();//终于真正执行传入的任务了
                    } catch (RuntimeException x) {
                        thrown = x; throw x;
                    } catch (Error x) {
                        thrown = x; throw x;
                    } catch (Throwable x) {
                        thrown = x; throw new Error(x);
                    } finally {
                        afterExecute(task, thrown);//为子类预留的空方法(钩子)
                    }
                } finally {
                    task = null;
                    w.completedTasks++;
                    w.unlock();
                }
                /*
                 * 从这里可以看出,当一个任务执行完毕之后,循环并没有退出,此时会再次执行条件判断
                 * 也就是说如果执行完第一个任务之后,任务队列中还有任务,那么将会继续在这个线程执行。
                 * 这里也是很巧妙的地方,不需要额外开一个控制线程来看那些线程处于空闲状态,然后给他分配任务。
                 * 直接自己去任务队列拿任务
                 */
            }
            completedAbruptly = false;
        } finally {
                /*
                 * 执行到这里,说明getTask返回了null,要么是超时(任务队列没有任务了),要么是线程池状态有问题了
                 * 当前线程将被回收了
                 */
            processWorkerExit(w, completedAbruptly);
        }
    }
로그인 후 복사

5、addWorker(Runnable firstTask, boolean core)源代码

runWorker是从Worker对象中获取第一个任务,然后从任务队列中一直获取任务执行。流程图中已经说过,这个Worker对象是在addWorker方法中创建的,所以新线程创建、启动的源头是在addWorker方法中。而addWorker是被execute所调用,execute根据addWorker的返回值,进行条件判断。

private boolean addWorker(Runnable firstTask, boolean core) {
        //上来就是retry,后面continue retry;语句执行之后都会从这里重新开始
        retry:
        for (;;) {
            int c = ctl.get();
            int rs = runStateOf(c);//获取线程池运行状态

            /*
             * 获取当前线程池的状态,如果是STOP,TIDYING,TERMINATED状态的话,则会返回false
             * 如果现在状态是SHUTDOWN,但是firstTask不为空或者workQueue为空的话,那么直接返回false
             */
            if (rs >= SHUTDOWN &&
                ! (rs == SHUTDOWN &&
                   firstTask == null &&
                   ! workQueue.isEmpty()))
                return false;

            for (;;) {
                int wc = workerCountOf(c);//获取工作线程数量
                /*
                 * addWorker传入的第二个Boolean参数用来判别当前线程数量是否大于核心池数量
                 * true,代表当前线程数量小于核心池数量
                 */
                if (wc >= CAPACITY ||
                    wc >= (core ? corePoolSize : maximumPoolSize))
                    return false;
                //增加工作线程数量
                if (compareAndIncrementWorkerCount(c))
                        //如果成功,跳出retry
                    break retry;
                c = ctl.get();  // Re-read ctl
                //判断线程池状态,改变了就重试一次
                if (runStateOf(c) != rs)
                    continue retry;
                // else CAS failed due to workerCount change; retry inner loop
            }
        }

        boolean workerStarted = false;
        boolean workerAdded = false;
        Worker w = null;
        try {
                /*
                 * 前面顺利增加了工作线程数,那么这里就真正创建Worker
                 * 上面流程图中说过,创建Worker时会创建新的线程.如果这里创建失败
                 * finally中会将工作线程数-1
                 */
            w = new Worker(firstTask);
            final Thread t = w.thread;
            if (t != null) {
                final ReentrantLock mainLock = this.mainLock;
                mainLock.lock();
                try {
                    // Recheck while holding lock.
                    // Back out on ThreadFactory failure or if
                    // shut down before lock acquired.
                    int rs = runStateOf(ctl.get());

                    if (rs < SHUTDOWN ||
                        (rs == SHUTDOWN && firstTask == null)) {
                        if (t.isAlive()) // precheck that t is startable
                            throw new IllegalThreadStateException();
                        workers.add(w);//将Worker放入工作线程池
                        int s = workers.size();
                        if (s > largestPoolSize)
                            largestPoolSize = s;
                        workerAdded = true;
                    }
                } finally {
                    mainLock.unlock();
                }
                if (workerAdded) {
                    t.start();//启动线程
                    workerStarted = true;
                }
            }
        } finally {
            if (! workerStarted)
                addWorkerFailed(w);
        }
        return workerStarted;
    }
로그인 후 복사

6、execute方法

execute方法其实最主要是根据线程池的策略来传递不同的参数给addWorker方法。
当调用 execute() 方法添加一个任务时,线程池会做如下判断:

a. 如果正在运行的线程数量小于 corePoolSize,那么马上创建线程运行这个任务;
b. 如果正在运行的线程数量大于或等于 corePoolSize,那么将这个任务放入队列。
c. 如果这时候队列满了,而且正在运行的线程数量小于 maximumPoolSize,那么还是要创建线程运行这个任务;
d. 如果队列满了,而且正在运行的线程数量大于或等于 maximumPoolSize,那么线程池会抛出异常,告诉调用者“我不能再接受任务了”。

方法execute主要是在控制上面四条策略的实现。


 public void execute(Runnable command) {
        if (command == null)
            throw new NullPointerException();
        //获取到成员变量ctl的值(线程池状态)
        int c = ctl.get();
        //如果工作线程的数量<核心池的大小
        if (workerCountOf(c) < corePoolSize) {
            //调用addWorker(这里传入的true代表工作线程数量<核心池大小)
            //如果成功,方法结束。
            if (addWorker(command, true))
                return;
            //否则,再重新获取一次ctl的值
            //个人理解是防止前面这段代码执行的时候有其他线程改变了ctl的值。
            c = ctl.get();
        }
        //如果工作线程数量>=核心池的大小或者上一步调用addWorker返回false,继续走到下面
        //如果线程池处于运行状态,并且成功将当前任务放入任务队列
        if (isRunning(c) && workQueue.offer(command)) {
            //为了线程安全,重新获取ctl的值
            int recheck = ctl.get();
            //如果线程池不处于运行状态并且任务从任务队列移除成功
            if (! isRunning(recheck) && remove(command))
                //调用reject拒绝执行,根据handler的实现类抛出异常或者其他操作
                reject(command);
            //否则,如果工作线程数量==0,调用addWorker并传入null和false
            else if (workerCountOf(recheck) == 0)
                addWorker(null, false);
        }
        //执行到这里代表当前线程已超越了核心线程且任务提交到任务队列失败。(可以注意这里的addWorker是false)
        //那么这里再次调用addWroker创建新线程(这时创建的线程是maximumPoolSize)。
        //如果还是提交任务失败则调用reject处理失败任务
        else if (!addWorker(command, false))
            reject(command);
    }
로그인 후 복사

三、线程池相关影响因素

1、阻塞队列的选择

本段引用至ThreadPoolExecutor中策略的选择与工作队列的选择(java线程池)

直接提交:
工作队列的默认选项是SynchronousQueue,它将任务直接提交给线程而不存储它们。在此,如果不存在可用于立即运行任务的线程,则试图把任务加入队列将失败,因此会构造一个新的线程。此策略可以避免在处理可能具有内部依赖性的请求集时出现锁。直接提交通常要求无界 maximumPoolSizes 以避免拒绝新提交的任务。当命令以超过队列所能处理的平均数连续到达时,此策略允许无界线程具有增长的可能性。

无界队列:
使用无界队列(例如,不具有预定义容量的LinkedBlockingQueue将导致在所有 corePoolSize 线程都忙时新任务在队列中等待。这样,创建的线程就不会超过 corePoolSize。(因此,maximumPoolSize 的值也就无效了。)当每个任务完全独立于其他任务,即任务执行互不影响时,适合于使用无界队列;例如,在 Web 页服务器中。这种排队可用于处理瞬态突发请求,当命令以超过队列所能处理的平均数连续到达时,此策略允许无界线程具有增长的可能性。

제한된 대기열:
제한된 대기열(예: ArrayBlockingQueue)은 제한된 maximumPoolSize를 사용할 때 리소스 고갈을 방지하는 데 도움이 되지만 조정 및 제어가 더 어려울 수 있습니다. 대기열 크기와 최대 풀 크기에는 절충이 필요할 수 있습니다. 큰 대기열과 작은 풀을 사용하면 CPU 사용량, 운영 체제 리소스 및 컨텍스트 전환 오버헤드를 최소화할 수 있지만 인위적으로 처리량이 줄어들 수 있습니다. 작업이 자주 차단되는 경우(예: I/O 경계인 경우) 시스템은 사용자가 허용하는 것보다 더 많은 스레드를 예약할 수 있습니다. 작은 대기열을 사용하려면 일반적으로 더 큰 풀 크기와 더 높은 CPU 사용량이 필요하지만 허용할 수 없는 일정 오버헤드가 발생할 수 있으며 이로 인해 처리량이 줄어들 수도 있습니다.

2. 일반적으로 사용되는 스레드 풀

ThreadPoolExecutor를 사용하여 스레드 풀을 생성할 때 다양한 생성자를 사용하여 다양한 매개변수에 따라 다양한 특성을 가진 스레드 풀을 구성할 수 있습니다. 실제 상황에서는 일반적으로 Executors 클래스에서 제공하는 메서드를 사용하여 스레드 풀을 생성합니다. 실행자는 궁극적으로 ThreadPoolExecutor의 생성자를 호출하지만 관련 매개변수가 구성되었습니다.

1) 고정 크기 스레드 풀: Executors.newFixedThreadPool

coresize는 maxsize와 동일하고 제한 시간은 0이며 대기열은 LinkedBlockingQueue의 무제한 FIFO 대기열을 사용합니다. 즉, 코어 크기 스레드만 이 스레드에서 항상 실행되고 있음을 의미합니다. 수영장. 이전 분석에 따르면 작업이 실행되면 이 스레드는 실행을 위해 작업 대기열에서 작업을 계속 가져옵니다. 작업이 없으면 스레드가 즉시 닫힙니다.

2) 단일 작업 스레드 풀: newSingleThreadExecutor

풀에는 스레드가 하나만 작동하며 차단 대기열은 제한이 없습니다. 작업이 제출된 순서대로 실행되도록 할 수 있습니다.

3) 가변 크기 스레드 풀: newCachedThreadPool

SynchronousQueue 큐, 요소를 저장하지 않는 차단 큐입니다. 각 삽입 작업은 다른 스레드가 제거 작업을 호출할 때까지 기다려야 합니다. 따라서 첫 번째 작업을 제출하면 대기열에 참가할 수 없습니다. 이는 "큐에 참가할 수 없고 작업이 최대 크기에 도달하지 않으면 새 스레드 작업을 시작합니다."라는 스레드 풀 조건을 충족합니다. 따라서 maxsize는 ctl의 하위 29비트의 최대값입니다. 제한 시간은 60초입니다. 스레드에 실행할 작업이 없으면 60초 제한 시간이 임시로 저장됩니다. 새 작업이 없으면 스레드 풀에서 제거됩니다.

4) 예약된 스레드 풀, newScheduledThreadPool

은 무제한 크기의 스레드 풀을 생성합니다. 이 스레드 풀은 작업의 타이밍과 주기적인 실행에 대한 필요성을 지원합니다

드디어 거의 끝나가는 것 같습니다. 특히 재진입 잠금의 수가 많기 때문에 아직 세부적으로 설명되지 않은 부분이 많은 것 같습니다. 기회가 되면 보충하겠습니다. 저는 개인적으로 스레드 풀을 소개하는 많은 블로그보다 이해하기 쉽다고 생각합니다. 틀린 부분이 있으면 비판과 정정을 환영합니다.

위 내용은 Java 스레드 풀의 그래픽 코드에 대한 자세한 설명의 상세 내용입니다. 자세한 내용은 PHP 중국어 웹사이트의 기타 관련 기사를 참조하세요!

관련 라벨:
원천:php.cn
본 웹사이트의 성명
본 글의 내용은 네티즌들의 자발적인 기여로 작성되었으며, 저작권은 원저작자에게 있습니다. 본 사이트는 이에 상응하는 법적 책임을 지지 않습니다. 표절이나 침해가 의심되는 콘텐츠를 발견한 경우 admin@php.cn으로 문의하세요.
인기 튜토리얼
더>
최신 다운로드
더>
웹 효과
웹사이트 소스 코드
웹사이트 자료
프론트엔드 템플릿