Javaのスレッドプールのグラフィックコードの詳細説明

黄舟
リリース: 2017-09-25 10:56:19
オリジナル
2043 人が閲覧しました

スレッドプールはJavaの重要な知識ポイントです。ここでは、Javaに付属するスレッドプールを例として記録し、分析します。この記事では、Java 並行プログラミングについて説明します。スレッド プールの使用、Java スレッド プール ---addWorker メソッド分析、スレッド プール、ThreadPoolExecutor での戦略の選択とワーク キュー (Java スレッド プール) の選択、および ThreadPoolExecutor スレッド プール分析と 3 つについて説明します。 BlockingQueue のタイプを達成します。この記事では、JDK1.8をベースにしたユースケースとソースコード解析を実装し、そのプロセスを中心に説明します。

1. スレッドプールの使用

何かの原理を知るには、まずその使用方法を知る必要があります。それでは、スレッド プールの使用例から始めましょう。

1. タスククラス

Javaに付属のスレッドプールを使用するには、まずタスククラスが必要です。このタスククラスはRunnableインターフェースを実装し、runメソッド(マルチスレッド実行を必要とするタスクロジック)を書き直す必要があります。

package org.my.threadPoolDemo;
/**
 * 任务类,实现Runnable接口 重写run方法
 */
public class MyTask implements Runnable{
    
    private int taskNum;
    
    public MyTask(int taskNum) {
        super();
        this.taskNum = taskNum;
    }

    @Override
    public void run() {
        System.out.println("正在执行task"+taskNum);
        try {
            Thread.currentThread().sleep(4000);//sleep 4秒模拟执行代码过程
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        System.out.println("task"+taskNum+"执行完毕");
    }
}
ログイン後にコピー

2. スレッドプールを作成して複数のタスクを実行します

タスククラスを使用して、次にスレッドプールを作成し、複数のタスクを実行します。 ThreadPoolExecutor を使用してスレッド プールを作成します。

package org.my.threadPoolDemo;

import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class ThreadPoolUseDemo {

    public static void main(String[] args) {
        ThreadPoolExecutor executor = new ThreadPoolExecutor(5, 10, 200, TimeUnit.MILLISECONDS, new LinkedBlockingDeque<Runnable>());
        for (int i = 0; i < 15; i++) {
            MyTask myTask = new MyTask(i);
            executor.execute(myTask);
            System.out.println("线程池中线程数量:"+executor.getPoolSize()+",线程池中等待执行的任务数量:"+executor.getQueue().size()+",已执行完的任务数量:"+executor.getCompletedTaskCount());
        }
        executor.shutdown();
    }

}
ログイン後にコピー

ロジックは非常に単純で、スレッド プール マネージャーを作成し、それを使用して 15 個のタスクを実行します。ここで、ThreadPoolExecutor を構築するときに渡されるパラメーターの意味を説明する必要があります。

5 (corePoolSize) は、コア プールのサイズを指します。つまり、作成されたスレッドの数です。スレッド プール内のスレッドの数がこの数と等しい場合、次に来るタスクはタスク キューに配置されます (後で詳しく説明します)。

10 (maximumPoolSize) は、スレッド プールが作成できるスレッドの最大数を指します。前のステップのタスク キューがいっぱいの場合、スレッド プールはスレッド数 = 10 になるまでスレッドの作成を続け、次のタスクの実行は拒否されます。

200 (keepAliveTime) は、実行するタスクがない場合にスレッドが終了するまでに保持できる最大時間を指します。デフォルトでは、keepAliveTime は、スレッド プール内のスレッドの数が corePoolSize より大きい場合、およびスレッド プール内のスレッドの数が corePoolSize 以下になるまで、つまりスレッド プール内のスレッドの数が以下の場合にのみ機能します。 corePoolSize より大きい (スレッドがアイドル状態の場合) 時間が keepAliveTime に達すると、スレッド プール内のスレッド数が corePoolSize を超えなくなるまで終了します。ただし、スレッド プール内のスレッド数が corePoolSize 以下のときに、allowCoreThreadTimeOut(boolean) メソッドが呼び出された場合、スレッド プール内のスレッド数が 0 になるまで keepAliveTime パラメータも機能します。

TimeUnit.MILLISECONDS は keepAliveTime の時間単位です。

TimeUnit.DAYS; //ミリ秒
TimeUnit.MICROSECONDS; //微妙
TimeUnit.NANOSECONDS; //Nano Second


ArrayBlockingQueue は、タスク キュー workQueue を格納するために使用されます。つまり、前述のタスクキューです。 ArrayBlockingQueue に加えて、LinkedBlockingQueue と SynchronousQueue も選択できます。

ThreadPoolExecutor には、上記で渡されるパラメーターに加えて、次のコンストラクターもあります。

threadFactory: 主にスレッドの作成に使用されるスレッド ファクトリ

handler: タスクの実行を拒否するための戦略を示します。 、値は 4 つあります:

ThreadPoolExecutor.AbortPolicy: タスクを破棄し、RejectedExecutionException をスローします。

ThreadPoolExecutor.DiscardPolicy: タスクも破棄しますが、例外はスローしません。

ThreadPoolExecutor.DiscardOldestPolicy: キュー内の最前部のタスクを破棄し、再度タスクの実行を試みます (このプロセスを繰り返します)

ThreadPoolExecutor.CallerRunsPolicy: タスクは呼び出しスレッドによって処理されます


上記のプログラムを実行すると、結果は次のようになります

正在执行task0
线程池中线程数量:1,线程池中等待执行的任务数量:0,已执行完的任务数量:0
线程池中线程数量:2,线程池中等待执行的任务数量:0,已执行完的任务数量:0
正在执行task1
线程池中线程数量:3,线程池中等待执行的任务数量:0,已执行完的任务数量:0
正在执行task2
线程池中线程数量:4,线程池中等待执行的任务数量:0,已执行完的任务数量:0
正在执行task3
线程池中线程数量:5,线程池中等待执行的任务数量:0,已执行完的任务数量:0
正在执行task4
线程池中线程数量:5,线程池中等待执行的任务数量:1,已执行完的任务数量:0
线程池中线程数量:5,线程池中等待执行的任务数量:2,已执行完的任务数量:0
线程池中线程数量:5,线程池中等待执行的任务数量:3,已执行完的任务数量:0
线程池中线程数量:5,线程池中等待执行的任务数量:4,已执行完的任务数量:0
线程池中线程数量:5,线程池中等待执行的任务数量:5,已执行完的任务数量:0
线程池中线程数量:6,线程池中等待执行的任务数量:5,已执行完的任务数量:0
正在执行task10
线程池中线程数量:7,线程池中等待执行的任务数量:5,已执行完的任务数量:0
正在执行task11
线程池中线程数量:8,线程池中等待执行的任务数量:5,已执行完的任务数量:0
正在执行task12
正在执行task13
线程池中线程数量:9,线程池中等待执行的任务数量:5,已执行完的任务数量:0
线程池中线程数量:10,线程池中等待执行的任务数量:5,已执行完的任务数量:0
正在执行task14
task1执行完毕
task10执行完毕
正在执行task5
task11执行完毕
task13执行完毕
task4执行完毕
task3执行完毕
task2执行完毕
task0执行完毕
正在执行task9
正在执行task8
正在执行task7
task14执行完毕
task12执行完毕
正在执行task6
task5执行完毕
task9执行完毕
task7执行完毕
task8执行完毕
task6执行完毕
ログイン後にコピー

OK ご覧のとおり、corePoolSize が 5 であるため、タスクの数が 5 を超えると、次のタスクがタスクキューに入れられて待機します。ただし、タスクの最大容量があるためです。 queue が 5 で、maximumPoolSize=10 の場合、タスク キューがいっぱいになった後、スレッド プール マネージャーはさらに 5 つのスレッドを作成し続け、最終的にスレッド プール内のスレッドの数は 10 に達します。現時点では、これらのスレッドで処理できるタスクは 15 個です。for ループの数を 20 に増やすなど、さらにタスクを追加すると、java.util.concurrent.RejectedExecutionException が発生します。


2. 原理分析

スレッド プールを使用する上記の例から、最も重要な 2 つの手順は、ThreadPoolExecutor オブジェクトを構築し、タスクが来るたびに ThreadPoolExecutor オブジェクトの実行メソッドを呼び出すことです。

1. ThreadPoolExecutor の構造

ThreadPoolExecutor の主な構造と継承関係を次の図に示します。

主要成员变量:任务队列——存放那些暂时无法执行的任务;工作线程池——存放当前启用的所有线程;线程工厂——创建线程;还有一些用来调度线程与任务并保证线程安全的成员。

了解了ThreadPoolExecutor的主要结构,再简单梳理一下“一个传入线程池的任务能够被最终正常执行需要经过的主要流程”,方法名称前面没有“XXX.”这种标注的都是ThreadPoolExecutor的方法:

Javaのスレッドプールのグラフィックコードの詳細説明

Javaのスレッドプールのグラフィックコードの詳細説明

2、ThreadPoolExecutor构造器及重要常量

简单了解下构造器,ThreadPoolExecutor的四个构造器的源码如下:


public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit,BlockingQueue<Runnable> workQueue) {
        this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,Executors.defaultThreadFactory(), defaultHandler);
    }
    
public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit,BlockingQueue<Runnable> workQueue,ThreadFactory threadFactory) {
        this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,threadFactory, defaultHandler);
    }
    
public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit,BlockingQueue<Runnable> workQueue,RejectedExecutionHandler handler) {
        this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,Executors.defaultThreadFactory(), handler);
    }
    
public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit,BlockingQueue<Runnable> workQueue,ThreadFactory threadFactory,RejectedExecutionHandler handler) {
        if (corePoolSize < 0 ||maximumPoolSize <= 0 ||maximumPoolSize < corePoolSize ||keepAliveTime < 0)
            throw new IllegalArgumentException();
        if (workQueue == null || threadFactory == null || handler == null)
            throw new NullPointerException();
        this.acc = System.getSecurityManager() == null ?null :AccessController.getContext();
        this.corePoolSize = corePoolSize;
        this.maximumPoolSize = maximumPoolSize;
        this.workQueue = workQueue;
        this.keepAliveTime = unit.toNanos(keepAliveTime);
        this.threadFactory = threadFactory;
        this.handler = handler;
    }
ログイン後にコピー

从源码中可以看出,这四个构造器都是调用最后一个构造器,只是根据开发者传入的参数的不同而填充一些默认的参数。比如如果开发者没有传入线程工厂threadFactory参数,那么构造器就使用默认的Executors.defaultThreadFactor。

在这里还要理解ThreadPoolExecutor的几个常量的含义和几个简单方法:


//Integer.SIZE是一个静态常量,值为32,也就是说COUNT_BITS是29
private static final int COUNT_BITS = Integer.SIZE - 3;
//CAPACITY是最大容量536870911,因为1左移29位之后-1,导致最高三位为0,而其余29位全部为1
private static final int CAPACITY   = (1 << COUNT_BITS) - 1;
//ctl用于表示线程池状态和有效线程数量,最高三位表示线程池的状态,其余低位表示有效线程数量
//初始化之后ctl等于RUNNING的值,即默认状态是运行状态,线程数量为0
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
//1110 0000 0000 0000 0000 0000 0000 0000最高三位为111
private static final int RUNNING    = -1 << COUNT_BITS;
//最高三位为000
private static final int SHUTDOWN   =  0 << COUNT_BITS;
//0010 0000 0000 0000 0000 0000 0000 0000最高三位为001
private static final int STOP       =  1 << COUNT_BITS;
//0100 0000 0000 0000 0000 0000 0000 0000最高三位为010
private static final int TIDYING    =  2 << COUNT_BITS;
//0110 0000 0000 0000 0000 0000 0000 0000最高三位为011
private static final int TERMINATED =  3 << COUNT_BITS;
/**
* 获取运行状态,入参为ctl。因为CAPACITY是最高三位为0,其余低位为1
* 所以当取反的时候,就只有最高三位为1,再经过与运算,只会取到ctl的最高三位
* 而这最高三位如上文所述,表示线程池的状态
*/
private static int runStateOf(int c)     { return c & ~CAPACITY; }
/**
* 获取工作线程数量,入参为ctl。因为CAPACITY是最高三位为0,其余低位为1
* 所以当进行与运算的时候,只会取到低29位,这29位正好表示有效线程数量
*/
private static int workerCountOf(int c)  { return c & CAPACITY; }
private static int ctlOf(int rs, int wc) { return rs | wc; }
//任务队列,用于存放待执行任务的阻塞队列
private final BlockingQueue<Runnable> workQueue;
/** 判断线程池是否是运行状态,传入的参数是ctl的值
* 只有RUNNING的符号位是1,也就是只有RUNNING为负数
* 所以如果目前的ctl值<0,就是RUNNING状态
*/
private static boolean isRunning(int c) {
    return c < SHUTDOWN;
}
//从任务队列中移除任务
public boolean remove(Runnable task) {
    boolean removed = workQueue.remove(task);
    tryTerminate(); // In case SHUTDOWN and now empty
    return removed;
}
ログイン後にコピー

3、getTask()源代码

通过前面的流程图,我们知道getTask()是由runWorker方法调用的,目的是取出一个任务。

private Runnable getTask() {
        //记录循环体中上个循环在从阻塞队列中取任务时是否超时
        boolean timedOut = false; 
        //无条件循环,主要是在线程池运行正常情况下
        //通过循环体内部的阻塞队列的阻塞时间,来控制当前线程的超时时间
        for (;;) {
            int c = ctl.get();
            int rs = runStateOf(c);//获取线程池状态

            /*先获取线程池的状态,如果状态大于等于STOP,也就是STOP、TIDYING、TERMINATED之一
             *这时候不管队列中有没有任务,都不用去执行了;
             *如果线程池的状态为SHUTDOWN且队列中没有任务了,也不用继续执行了
             *将工作线程数量-1,并且返回null
             **/
            if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
                decrementWorkerCount();
                return null;
            }
            //获取工作线程数量
            int wc = workerCountOf(c);

            //是否启用超时参数keepAliveTime
            boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
            //如果这个条件成立,如果工作线程数量-1成功,返回null,否则跳出循环
            if ((wc > maximumPoolSize || (timed && timedOut))
                && (wc > 1 || workQueue.isEmpty())) {
                if (compareAndDecrementWorkerCount(c))
                    return null;
                continue;
            }
            //如果需要采用阻塞形式获取,那么就poll设定阻塞时间,否则take无限期等待。
            //这里通过阻塞时间,间接控制了超时的值,如果取值超时,意味着这个线程在超时时间内处于空闲状态
            //那么下一个循环,将会return null并且线程数量-1
            try {
                Runnable r = timed ?
                    workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
                    workQueue.take();
                if (r != null)
                    return r;
                timedOut = true;
            } catch (InterruptedException retry) {
                timedOut = false;
            }
        }
    }
ログイン後にコピー

4、runWorker(Worker w)源代码

通过上面流程图,可以看出runWorker(Worker w)实际上已经是在线程启动之后执行任务了,所以其主要逻辑就是获取任务,然后执行任务的run方法。


final void runWorker(Worker w) {
        Thread wt = Thread.currentThread();
        Runnable task = w.firstTask;//获取传入的Worker对象中的任务
        w.firstTask = null;
        w.unlock(); // allow interrupts
        boolean completedAbruptly = true;
        try {
                /* 
                 * 如果传入的任务为null,就从任务队列中获取任务,只要这两者有一个不为null,就进入循环体
                 */
            while (task != null || (task = getTask()) != null) {
                w.lock();
                //如果线程池状态已被标为停止,那么则不允许该线程继续执行任务!或者该线程已是中断状态,
                //也不允许执行任务,还需要中断该线程!
                if ((runStateAtLeast(ctl.get(), STOP) ||
                     (Thread.interrupted() &&
                      runStateAtLeast(ctl.get(), STOP))) &&
                    !wt.isInterrupted())
                    wt.interrupt();
                try {
                    beforeExecute(wt, task);//为子类预留的空方法(钩子)
                    Throwable thrown = null;
                    try {
                        task.run();//终于真正执行传入的任务了
                    } catch (RuntimeException x) {
                        thrown = x; throw x;
                    } catch (Error x) {
                        thrown = x; throw x;
                    } catch (Throwable x) {
                        thrown = x; throw new Error(x);
                    } finally {
                        afterExecute(task, thrown);//为子类预留的空方法(钩子)
                    }
                } finally {
                    task = null;
                    w.completedTasks++;
                    w.unlock();
                }
                /*
                 * 从这里可以看出,当一个任务执行完毕之后,循环并没有退出,此时会再次执行条件判断
                 * 也就是说如果执行完第一个任务之后,任务队列中还有任务,那么将会继续在这个线程执行。
                 * 这里也是很巧妙的地方,不需要额外开一个控制线程来看那些线程处于空闲状态,然后给他分配任务。
                 * 直接自己去任务队列拿任务
                 */
            }
            completedAbruptly = false;
        } finally {
                /*
                 * 执行到这里,说明getTask返回了null,要么是超时(任务队列没有任务了),要么是线程池状态有问题了
                 * 当前线程将被回收了
                 */
            processWorkerExit(w, completedAbruptly);
        }
    }
ログイン後にコピー

5、addWorker(Runnable firstTask, boolean core)源代码

runWorker是从Worker对象中获取第一个任务,然后从任务队列中一直获取任务执行。流程图中已经说过,这个Worker对象是在addWorker方法中创建的,所以新线程创建、启动的源头是在addWorker方法中。而addWorker是被execute所调用,execute根据addWorker的返回值,进行条件判断。

private boolean addWorker(Runnable firstTask, boolean core) {
        //上来就是retry,后面continue retry;语句执行之后都会从这里重新开始
        retry:
        for (;;) {
            int c = ctl.get();
            int rs = runStateOf(c);//获取线程池运行状态

            /*
             * 获取当前线程池的状态,如果是STOP,TIDYING,TERMINATED状态的话,则会返回false
             * 如果现在状态是SHUTDOWN,但是firstTask不为空或者workQueue为空的话,那么直接返回false
             */
            if (rs >= SHUTDOWN &&
                ! (rs == SHUTDOWN &&
                   firstTask == null &&
                   ! workQueue.isEmpty()))
                return false;

            for (;;) {
                int wc = workerCountOf(c);//获取工作线程数量
                /*
                 * addWorker传入的第二个Boolean参数用来判别当前线程数量是否大于核心池数量
                 * true,代表当前线程数量小于核心池数量
                 */
                if (wc >= CAPACITY ||
                    wc >= (core ? corePoolSize : maximumPoolSize))
                    return false;
                //增加工作线程数量
                if (compareAndIncrementWorkerCount(c))
                        //如果成功,跳出retry
                    break retry;
                c = ctl.get();  // Re-read ctl
                //判断线程池状态,改变了就重试一次
                if (runStateOf(c) != rs)
                    continue retry;
                // else CAS failed due to workerCount change; retry inner loop
            }
        }

        boolean workerStarted = false;
        boolean workerAdded = false;
        Worker w = null;
        try {
                /*
                 * 前面顺利增加了工作线程数,那么这里就真正创建Worker
                 * 上面流程图中说过,创建Worker时会创建新的线程.如果这里创建失败
                 * finally中会将工作线程数-1
                 */
            w = new Worker(firstTask);
            final Thread t = w.thread;
            if (t != null) {
                final ReentrantLock mainLock = this.mainLock;
                mainLock.lock();
                try {
                    // Recheck while holding lock.
                    // Back out on ThreadFactory failure or if
                    // shut down before lock acquired.
                    int rs = runStateOf(ctl.get());

                    if (rs < SHUTDOWN ||
                        (rs == SHUTDOWN && firstTask == null)) {
                        if (t.isAlive()) // precheck that t is startable
                            throw new IllegalThreadStateException();
                        workers.add(w);//将Worker放入工作线程池
                        int s = workers.size();
                        if (s > largestPoolSize)
                            largestPoolSize = s;
                        workerAdded = true;
                    }
                } finally {
                    mainLock.unlock();
                }
                if (workerAdded) {
                    t.start();//启动线程
                    workerStarted = true;
                }
            }
        } finally {
            if (! workerStarted)
                addWorkerFailed(w);
        }
        return workerStarted;
    }
ログイン後にコピー

6、execute方法

execute方法其实最主要是根据线程池的策略来传递不同的参数给addWorker方法。
当调用 execute() 方法添加一个任务时,线程池会做如下判断:

a. 如果正在运行的线程数量小于 corePoolSize,那么马上创建线程运行这个任务;
b. 如果正在运行的线程数量大于或等于 corePoolSize,那么将这个任务放入队列。
c. 如果这时候队列满了,而且正在运行的线程数量小于 maximumPoolSize,那么还是要创建线程运行这个任务;
d. 如果队列满了,而且正在运行的线程数量大于或等于 maximumPoolSize,那么线程池会抛出异常,告诉调用者“我不能再接受任务了”。

方法execute主要是在控制上面四条策略的实现。


 public void execute(Runnable command) {
        if (command == null)
            throw new NullPointerException();
        //获取到成员变量ctl的值(线程池状态)
        int c = ctl.get();
        //如果工作线程的数量<核心池的大小
        if (workerCountOf(c) < corePoolSize) {
            //调用addWorker(这里传入的true代表工作线程数量<核心池大小)
            //如果成功,方法结束。
            if (addWorker(command, true))
                return;
            //否则,再重新获取一次ctl的值
            //个人理解是防止前面这段代码执行的时候有其他线程改变了ctl的值。
            c = ctl.get();
        }
        //如果工作线程数量>=核心池的大小或者上一步调用addWorker返回false,继续走到下面
        //如果线程池处于运行状态,并且成功将当前任务放入任务队列
        if (isRunning(c) && workQueue.offer(command)) {
            //为了线程安全,重新获取ctl的值
            int recheck = ctl.get();
            //如果线程池不处于运行状态并且任务从任务队列移除成功
            if (! isRunning(recheck) && remove(command))
                //调用reject拒绝执行,根据handler的实现类抛出异常或者其他操作
                reject(command);
            //否则,如果工作线程数量==0,调用addWorker并传入null和false
            else if (workerCountOf(recheck) == 0)
                addWorker(null, false);
        }
        //执行到这里代表当前线程已超越了核心线程且任务提交到任务队列失败。(可以注意这里的addWorker是false)
        //那么这里再次调用addWroker创建新线程(这时创建的线程是maximumPoolSize)。
        //如果还是提交任务失败则调用reject处理失败任务
        else if (!addWorker(command, false))
            reject(command);
    }
ログイン後にコピー

三、线程池相关影响因素

1、阻塞队列的选择

本段引用至ThreadPoolExecutor中策略的选择与工作队列的选择(java线程池)

直接提交:
工作队列的默认选项是SynchronousQueue,它将任务直接提交给线程而不存储它们。在此,如果不存在可用于立即运行任务的线程,则试图把任务加入队列将失败,因此会构造一个新的线程。此策略可以避免在处理可能具有内部依赖性的请求集时出现锁。直接提交通常要求无界 maximumPoolSizes 以避免拒绝新提交的任务。当命令以超过队列所能处理的平均数连续到达时,此策略允许无界线程具有增长的可能性。

无界队列:
使用无界队列(例如,不具有预定义容量的LinkedBlockingQueue将导致在所有 corePoolSize 线程都忙时新任务在队列中等待。这样,创建的线程就不会超过 corePoolSize。(因此,maximumPoolSize 的值也就无效了。)当每个任务完全独立于其他任务,即任务执行互不影响时,适合于使用无界队列;例如,在 Web 页服务器中。这种排队可用于处理瞬态突发请求,当命令以超过队列所能处理的平均数连续到达时,此策略允许无界线程具有增长的可能性。

境界付きキュー:
境界付きキュー (ArrayBlockingQueue など) は、制限された MaximumPoolSize を使用するときにリソースの枯渇を防ぐのに役立ちますが、調整と制御が難しくなる可能性があります。キュー サイズと最大プール サイズにはトレードオフが必要な場合があります。大きなキューと小さなプールを使用すると、CPU 使用率、オペレーティング システム リソース、コンテキスト スイッチングのオーバーヘッドを最小限に抑えることができますが、スループットが人為的に低下する可能性があります。タスクが頻繁にブロックされる場合 (たとえば、I/O 制限がある場合)、システムは、許可されているよりも多くのスレッドをスケジュールする可能性があります。小さなキューを使用すると、通常、より大きなプール サイズとより高い CPU 使用率が必要になりますが、許容できないスケジューリング オーバーヘッドが発生する可能性があり、これによりスループットも低下する可能性があります。

2. 一般的に使用されるスレッド プール

ThreadPoolExecutor を使用してスレッド プールを作成する場合、さまざまなコンストラクターを使用して、さまざまなパラメーターに従ってさまざまな特性を持つスレッド プールを構築できます。実際の状況では、通常、Executors クラスが提供するメソッドを使用してスレッド プールを作成します。エグゼキュータは最終的に ThreadPoolExecutor のコンストラクタを呼び出しますが、関連するパラメータはすでに構成されています。

1) 固定サイズのスレッド プール: Executors.newFixedThreadPool

coresize は maxsize と同じで、タイムアウトは 0 で、キューは LinkedBlockingQueue の無制限の FIFO キューを使用します。つまり、このスレッドではコアサイズのスレッドのみが常に実行されます。プール。前の分析によると、タスクが実行されると、このスレッドは実行のためにタスク キューからタスクをフェッチし続け、タスクがなくなると、スレッドはすぐに閉じられます。

2) シングルタスク スレッド プール: newSingleThreadExecutor

プール内で動作するスレッドは 1 つだけであり、ブロッキング キューには制限がなく、タスクが送信された順序で実行されることが保証されます。

3) 可変サイズのスレッド プール: newCachedThreadPool

SynchronousQueue キュー、要素を保存しないブロッキング キュー。各挿入操作は、別のスレッドが削除操作を呼び出すまで待機する必要があります。したがって、最初のタスクを送信すると、キューに参加できなくなり、「キューに参加できず、タスクが maxsize に達しない場合は、新しいスレッド タスクを開始します。」というスレッド プールの条件が満たされます。したがって、maxsize は ctl の下位 29 ビットの最大値です。タイムアウトは 60 秒です。スレッドに実行するタスクがない場合、60 秒のタイムアウトは一時的に保存され、新しいタスクがない場合はスレッド プールから削除されます。

4) スケジュールされたスレッド プール、newScheduledThreadPool

は、サイズ無制限のスレッド プールを作成します。このスレッド プールは、タスクのタイミングと定期的な実行の必要性をサポートします

ついに、まだ詳細に説明されていないことがたくさんあると感じています。特にリエントラント ロックの多さです。機会があれば補います。多くのオンライン リソースを統合し、自分でいくつかの図を描きましたが、個人的にはスレッド プールを紹介する多くのブログよりも理解しやすいと思います。間違いがあれば、批判や修正を歓迎します。

以上がJavaのスレッドプールのグラフィックコードの詳細説明の詳細内容です。詳細については、PHP 中国語 Web サイトの他の関連記事を参照してください。

関連ラベル:
ソース:php.cn
このウェブサイトの声明
この記事の内容はネチズンが自主的に寄稿したものであり、著作権は原著者に帰属します。このサイトは、それに相当する法的責任を負いません。盗作または侵害の疑いのあるコンテンツを見つけた場合は、admin@php.cn までご連絡ください。
人気のチュートリアル
詳細>
最新のダウンロード
詳細>
ウェブエフェクト
公式サイト
サイト素材
フロントエンドテンプレート