Heim > Java > javaLernprogramm > Detaillierte Erläuterung des grafischen Codes des Thread-Pools in Java

Detaillierte Erläuterung des grafischen Codes des Thread-Pools in Java

黄舟
Freigeben: 2017-09-25 10:56:19
Original
2079 Leute haben es durchsucht

Der Thread-Pool ist ein wichtiger Wissenspunkt in Java. Hier werde ich den mit Java gelieferten Thread-Pool als Beispiel nehmen, um ihn aufzuzeichnen und zu analysieren. Dieser Artikel bezieht sich auf die gleichzeitige Java-Programmierung: Verwendung von Thread-Pools, Java-Thread-Pool---addWorker-Methodenanalyse, Thread-Pools, Auswahl von Strategien in ThreadPoolExecutor und Auswahl von Arbeitswarteschlangen (Java-Thread-Pool) sowie ThreadPoolExecutor-Thread-Pool-Analyse und drei Arten von BlockingQueue erreichen. Dieser Artikel implementiert Anwendungsfälle und Quellcodeanalysen basierend auf JDK1.8 und konzentriert sich hauptsächlich auf den Prozess.

1. Thread-Pool verwenden

Um das Prinzip von etwas zu kennen, müssen Sie zunächst wissen, wie man es verwendet. Beginnen wir also mit einem Beispiel für die Verwendung eines Thread-Pools.

1. Task-Klasse

Um den mit Java gelieferten Thread-Pool zu verwenden, benötigen Sie zunächst eine Task-Klasse, um die Runnable-Schnittstelle zu implementieren und die Ausführungsmethode zu überschreiben Multi-Thread-Ausführung) Aufgabenlogik).

package org.my.threadPoolDemo;
/**
 * 任务类,实现Runnable接口 重写run方法
 */
public class MyTask implements Runnable{
    
    private int taskNum;
    
    public MyTask(int taskNum) {
        super();
        this.taskNum = taskNum;
    }

    @Override
    public void run() {
        System.out.println("正在执行task"+taskNum);
        try {
            Thread.currentThread().sleep(4000);//sleep 4秒模拟执行代码过程
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        System.out.println("task"+taskNum+"执行完毕");
    }
}
Nach dem Login kopieren

2. Erstellen Sie einen Thread-Pool und führen Sie mehrere Aufgaben aus.

Erstellen Sie als Nächstes mit der Task-Klasse einen Thread-Pool und führen Sie mehrere Aufgaben aus. Wir verwenden ThreadPoolExecutor, um einen Thread-Pool zu erstellen.

package org.my.threadPoolDemo;

import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class ThreadPoolUseDemo {

    public static void main(String[] args) {
        ThreadPoolExecutor executor = new ThreadPoolExecutor(5, 10, 200, TimeUnit.MILLISECONDS, new LinkedBlockingDeque<Runnable>());
        for (int i = 0; i < 15; i++) {
            MyTask myTask = new MyTask(i);
            executor.execute(myTask);
            System.out.println("线程池中线程数量:"+executor.getPoolSize()+",线程池中等待执行的任务数量:"+executor.getQueue().size()+",已执行完的任务数量:"+executor.getCompletedTaskCount());
        }
        executor.shutdown();
    }

}
Nach dem Login kopieren

Die Logik ist sehr einfach: Erstellen Sie einen Thread-Pool-Manager und verwenden Sie ihn dann zum Ausführen von 15 Aufgaben. Hier müssen wir die Bedeutung der beim Erstellen von ThreadPoolExecutor übergebenen Parameter erklären:

5 (corePoolSize) bezieht sich auf die Kernpoolgröße. Das heißt, die Anzahl der erstellten Threads. Wenn die Anzahl der Threads im Thread-Pool dieser Zahl entspricht, wird die nächste kommende Aufgabe in die Aufgabenwarteschlange gestellt (wird später im Detail erläutert).

10 (maximumPoolSize) bezieht sich auf die maximale Anzahl von Threads, die der Thread-Pool erstellen kann. Wenn die Aufgabenwarteschlange des vorherigen Schritts voll ist, erstellt der Thread-Pool weiterhin Threads, bis die Anzahl der Threads = 10 ist, und die Ausführung der nächsten Aufgabe wird verweigert.

200 (keepAliveTime) bezieht sich auf die maximale Zeit, die der Thread dauern kann, bevor er beendet wird, wenn keine Aufgabe auszuführen ist. Standardmäßig funktioniert keepAliveTime nur, wenn die Anzahl der Threads im Thread-Pool größer als corePoolSize ist und bis die Anzahl der Threads im Thread-Pool nicht größer als corePoolSize ist, d größer als corePoolSize, wenn ein Thread inaktiv ist. Wenn die Zeit keepAliveTime erreicht, wird er beendet, bis die Anzahl der Threads im Thread-Pool corePoolSize nicht überschreitet. Wenn jedoch die Methode „allowCoreThreadTimeOut(boolean)“ aufgerufen wird und die Anzahl der Threads im Thread-Pool nicht größer als corePoolSize ist, funktioniert der Parameter keepAliveTime auch, bis die Anzahl der Threads im Thread-Pool 0 beträgt.

TimeUnit.MILLISECONDS ist die Zeiteinheit von keepAliveTime.

TimeUnit.DAYS; //Milliseconds
TimeUnit.MICROSECONDS; //Nanoseconds


ArrayBlockingQueue Die eingehende Blockierungswarteschlange wird zum Speichern der Aufgabenwarteschlange workQueue verwendet. Das heißt, die oben erwähnte Aufgabenwarteschlange. Neben ArrayBlockingQueue stehen auch LinkedBlockingQueue und SynchronousQueue zur Auswahl.

ThreadPoolExecutor verfügt über vier Konstruktoren. Zusätzlich zu den oben übergebenen Parametern können weitere Konstruktoren übergeben werden:

threadFactory: Thread-Factory, die hauptsächlich zum Erstellen von Threads verwendet wird

handler: Gibt die Richtlinie an, wenn die Ausführung einer Aufgabe verweigert wird. Es gibt vier Werte:

ThreadPoolExecutor.AbortPolicy: Die Aufgabe verwerfen und RejectedExecutionException auslösen.

ThreadPoolExecutor.DiscardPolicy: verwirft auch Aufgaben, löst aber keine Ausnahme aus.

ThreadPoolExecutor.DiscardOldestPolicy: Die vorderste Aufgabe der Warteschlange verwerfen und dann versuchen, die Aufgabe erneut auszuführen (diesen Vorgang wiederholen)

ThreadPoolExecutor.CallerRunsPolicy: Die Aufgabe wird vom aufrufenden Thread verarbeitet

Execute Im obigen Programm sind die Ergebnisse wie folgt:


Da corePoolSize 5 ist, können Sie sehen, dass die nächste Aufgabe ausgeführt wird, wenn die Anzahl der Aufgaben größer als 5 ist wird zum Warten in die Aufgabenwarteschlange gestellt, aber Da die maximale Kapazität der Aufgabenwarteschlange 5 und MaximumPoolSize = 10 beträgt, erstellt der Thread-Pool-Manager weiterhin 5 Threads, nachdem die Aufgabenwarteschlange voll ist, und schließlich die Anzahl der Threads in Thread-Pool erreicht 10. Zu diesem Zeitpunkt können 15 Aufgaben von diesen Threads verarbeitet werden. Wenn weitere Aufgaben hinzugefügt werden, beispielsweise die Anzahl der for-Schleifen auf 20 erhöht wird, tritt eine java.util.concurrent.RejectedExecutionException auf.


2. Prinzipanalyse

正在执行task0
线程池中线程数量:1,线程池中等待执行的任务数量:0,已执行完的任务数量:0
线程池中线程数量:2,线程池中等待执行的任务数量:0,已执行完的任务数量:0
正在执行task1
线程池中线程数量:3,线程池中等待执行的任务数量:0,已执行完的任务数量:0
正在执行task2
线程池中线程数量:4,线程池中等待执行的任务数量:0,已执行完的任务数量:0
正在执行task3
线程池中线程数量:5,线程池中等待执行的任务数量:0,已执行完的任务数量:0
正在执行task4
线程池中线程数量:5,线程池中等待执行的任务数量:1,已执行完的任务数量:0
线程池中线程数量:5,线程池中等待执行的任务数量:2,已执行完的任务数量:0
线程池中线程数量:5,线程池中等待执行的任务数量:3,已执行完的任务数量:0
线程池中线程数量:5,线程池中等待执行的任务数量:4,已执行完的任务数量:0
线程池中线程数量:5,线程池中等待执行的任务数量:5,已执行完的任务数量:0
线程池中线程数量:6,线程池中等待执行的任务数量:5,已执行完的任务数量:0
正在执行task10
线程池中线程数量:7,线程池中等待执行的任务数量:5,已执行完的任务数量:0
正在执行task11
线程池中线程数量:8,线程池中等待执行的任务数量:5,已执行完的任务数量:0
正在执行task12
正在执行task13
线程池中线程数量:9,线程池中等待执行的任务数量:5,已执行完的任务数量:0
线程池中线程数量:10,线程池中等待执行的任务数量:5,已执行完的任务数量:0
正在执行task14
task1执行完毕
task10执行完毕
正在执行task5
task11执行完毕
task13执行完毕
task4执行完毕
task3执行完毕
task2执行完毕
task0执行完毕
正在执行task9
正在执行task8
正在执行task7
task14执行完毕
task12执行完毕
正在执行task6
task5执行完毕
task9执行完毕
task7执行完毕
task8执行完毕
task6执行完毕
Nach dem Login kopieren
Aus dem obigen Beispiel der Verwendung des Thread-Pools zu urteilen, bestehen die wichtigsten zwei Schritte darin, das ThreadPoolExecutor-Objekt zu erstellen und dann jeweils die Ausführungsmethode des ThreadPoolExecutor-Objekts aufzurufen Zeit kommt eine Aufgabe.

1. ThreadPoolExecutor-Struktur

Die Hauptstruktur und Vererbungsbeziehung von ThreadPoolExecutor sind in der folgenden Abbildung dargestellt:

ThreadPoolExecutor Struktur und Vererbungsbeziehung

主要成员变量:任务队列——存放那些暂时无法执行的任务;工作线程池——存放当前启用的所有线程;线程工厂——创建线程;还有一些用来调度线程与任务并保证线程安全的成员。

了解了ThreadPoolExecutor的主要结构,再简单梳理一下“一个传入线程池的任务能够被最终正常执行需要经过的主要流程”,方法名称前面没有“XXX.”这种标注的都是ThreadPoolExecutor的方法:

Detaillierte Erläuterung des grafischen Codes des Thread-Pools in Java

Detaillierte Erläuterung des grafischen Codes des Thread-Pools in Java

2、ThreadPoolExecutor构造器及重要常量

简单了解下构造器,ThreadPoolExecutor的四个构造器的源码如下:


public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit,BlockingQueue<Runnable> workQueue) {
        this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,Executors.defaultThreadFactory(), defaultHandler);
    }
    
public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit,BlockingQueue<Runnable> workQueue,ThreadFactory threadFactory) {
        this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,threadFactory, defaultHandler);
    }
    
public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit,BlockingQueue<Runnable> workQueue,RejectedExecutionHandler handler) {
        this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,Executors.defaultThreadFactory(), handler);
    }
    
public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit,BlockingQueue<Runnable> workQueue,ThreadFactory threadFactory,RejectedExecutionHandler handler) {
        if (corePoolSize < 0 ||maximumPoolSize <= 0 ||maximumPoolSize < corePoolSize ||keepAliveTime < 0)
            throw new IllegalArgumentException();
        if (workQueue == null || threadFactory == null || handler == null)
            throw new NullPointerException();
        this.acc = System.getSecurityManager() == null ?null :AccessController.getContext();
        this.corePoolSize = corePoolSize;
        this.maximumPoolSize = maximumPoolSize;
        this.workQueue = workQueue;
        this.keepAliveTime = unit.toNanos(keepAliveTime);
        this.threadFactory = threadFactory;
        this.handler = handler;
    }
Nach dem Login kopieren

从源码中可以看出,这四个构造器都是调用最后一个构造器,只是根据开发者传入的参数的不同而填充一些默认的参数。比如如果开发者没有传入线程工厂threadFactory参数,那么构造器就使用默认的Executors.defaultThreadFactor。

在这里还要理解ThreadPoolExecutor的几个常量的含义和几个简单方法:


//Integer.SIZE是一个静态常量,值为32,也就是说COUNT_BITS是29
private static final int COUNT_BITS = Integer.SIZE - 3;
//CAPACITY是最大容量536870911,因为1左移29位之后-1,导致最高三位为0,而其余29位全部为1
private static final int CAPACITY   = (1 << COUNT_BITS) - 1;
//ctl用于表示线程池状态和有效线程数量,最高三位表示线程池的状态,其余低位表示有效线程数量
//初始化之后ctl等于RUNNING的值,即默认状态是运行状态,线程数量为0
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
//1110 0000 0000 0000 0000 0000 0000 0000最高三位为111
private static final int RUNNING    = -1 << COUNT_BITS;
//最高三位为000
private static final int SHUTDOWN   =  0 << COUNT_BITS;
//0010 0000 0000 0000 0000 0000 0000 0000最高三位为001
private static final int STOP       =  1 << COUNT_BITS;
//0100 0000 0000 0000 0000 0000 0000 0000最高三位为010
private static final int TIDYING    =  2 << COUNT_BITS;
//0110 0000 0000 0000 0000 0000 0000 0000最高三位为011
private static final int TERMINATED =  3 << COUNT_BITS;
/**
* 获取运行状态,入参为ctl。因为CAPACITY是最高三位为0,其余低位为1
* 所以当取反的时候,就只有最高三位为1,再经过与运算,只会取到ctl的最高三位
* 而这最高三位如上文所述,表示线程池的状态
*/
private static int runStateOf(int c)     { return c & ~CAPACITY; }
/**
* 获取工作线程数量,入参为ctl。因为CAPACITY是最高三位为0,其余低位为1
* 所以当进行与运算的时候,只会取到低29位,这29位正好表示有效线程数量
*/
private static int workerCountOf(int c)  { return c & CAPACITY; }
private static int ctlOf(int rs, int wc) { return rs | wc; }
//任务队列,用于存放待执行任务的阻塞队列
private final BlockingQueue<Runnable> workQueue;
/** 判断线程池是否是运行状态,传入的参数是ctl的值
* 只有RUNNING的符号位是1,也就是只有RUNNING为负数
* 所以如果目前的ctl值<0,就是RUNNING状态
*/
private static boolean isRunning(int c) {
    return c < SHUTDOWN;
}
//从任务队列中移除任务
public boolean remove(Runnable task) {
    boolean removed = workQueue.remove(task);
    tryTerminate(); // In case SHUTDOWN and now empty
    return removed;
}
Nach dem Login kopieren

3、getTask()源代码

通过前面的流程图,我们知道getTask()是由runWorker方法调用的,目的是取出一个任务。

private Runnable getTask() {
        //记录循环体中上个循环在从阻塞队列中取任务时是否超时
        boolean timedOut = false; 
        //无条件循环,主要是在线程池运行正常情况下
        //通过循环体内部的阻塞队列的阻塞时间,来控制当前线程的超时时间
        for (;;) {
            int c = ctl.get();
            int rs = runStateOf(c);//获取线程池状态

            /*先获取线程池的状态,如果状态大于等于STOP,也就是STOP、TIDYING、TERMINATED之一
             *这时候不管队列中有没有任务,都不用去执行了;
             *如果线程池的状态为SHUTDOWN且队列中没有任务了,也不用继续执行了
             *将工作线程数量-1,并且返回null
             **/
            if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
                decrementWorkerCount();
                return null;
            }
            //获取工作线程数量
            int wc = workerCountOf(c);

            //是否启用超时参数keepAliveTime
            boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
            //如果这个条件成立,如果工作线程数量-1成功,返回null,否则跳出循环
            if ((wc > maximumPoolSize || (timed && timedOut))
                && (wc > 1 || workQueue.isEmpty())) {
                if (compareAndDecrementWorkerCount(c))
                    return null;
                continue;
            }
            //如果需要采用阻塞形式获取,那么就poll设定阻塞时间,否则take无限期等待。
            //这里通过阻塞时间,间接控制了超时的值,如果取值超时,意味着这个线程在超时时间内处于空闲状态
            //那么下一个循环,将会return null并且线程数量-1
            try {
                Runnable r = timed ?
                    workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
                    workQueue.take();
                if (r != null)
                    return r;
                timedOut = true;
            } catch (InterruptedException retry) {
                timedOut = false;
            }
        }
    }
Nach dem Login kopieren

4、runWorker(Worker w)源代码

通过上面流程图,可以看出runWorker(Worker w)实际上已经是在线程启动之后执行任务了,所以其主要逻辑就是获取任务,然后执行任务的run方法。


final void runWorker(Worker w) {
        Thread wt = Thread.currentThread();
        Runnable task = w.firstTask;//获取传入的Worker对象中的任务
        w.firstTask = null;
        w.unlock(); // allow interrupts
        boolean completedAbruptly = true;
        try {
                /* 
                 * 如果传入的任务为null,就从任务队列中获取任务,只要这两者有一个不为null,就进入循环体
                 */
            while (task != null || (task = getTask()) != null) {
                w.lock();
                //如果线程池状态已被标为停止,那么则不允许该线程继续执行任务!或者该线程已是中断状态,
                //也不允许执行任务,还需要中断该线程!
                if ((runStateAtLeast(ctl.get(), STOP) ||
                     (Thread.interrupted() &&
                      runStateAtLeast(ctl.get(), STOP))) &&
                    !wt.isInterrupted())
                    wt.interrupt();
                try {
                    beforeExecute(wt, task);//为子类预留的空方法(钩子)
                    Throwable thrown = null;
                    try {
                        task.run();//终于真正执行传入的任务了
                    } catch (RuntimeException x) {
                        thrown = x; throw x;
                    } catch (Error x) {
                        thrown = x; throw x;
                    } catch (Throwable x) {
                        thrown = x; throw new Error(x);
                    } finally {
                        afterExecute(task, thrown);//为子类预留的空方法(钩子)
                    }
                } finally {
                    task = null;
                    w.completedTasks++;
                    w.unlock();
                }
                /*
                 * 从这里可以看出,当一个任务执行完毕之后,循环并没有退出,此时会再次执行条件判断
                 * 也就是说如果执行完第一个任务之后,任务队列中还有任务,那么将会继续在这个线程执行。
                 * 这里也是很巧妙的地方,不需要额外开一个控制线程来看那些线程处于空闲状态,然后给他分配任务。
                 * 直接自己去任务队列拿任务
                 */
            }
            completedAbruptly = false;
        } finally {
                /*
                 * 执行到这里,说明getTask返回了null,要么是超时(任务队列没有任务了),要么是线程池状态有问题了
                 * 当前线程将被回收了
                 */
            processWorkerExit(w, completedAbruptly);
        }
    }
Nach dem Login kopieren

5、addWorker(Runnable firstTask, boolean core)源代码

runWorker是从Worker对象中获取第一个任务,然后从任务队列中一直获取任务执行。流程图中已经说过,这个Worker对象是在addWorker方法中创建的,所以新线程创建、启动的源头是在addWorker方法中。而addWorker是被execute所调用,execute根据addWorker的返回值,进行条件判断。

private boolean addWorker(Runnable firstTask, boolean core) {
        //上来就是retry,后面continue retry;语句执行之后都会从这里重新开始
        retry:
        for (;;) {
            int c = ctl.get();
            int rs = runStateOf(c);//获取线程池运行状态

            /*
             * 获取当前线程池的状态,如果是STOP,TIDYING,TERMINATED状态的话,则会返回false
             * 如果现在状态是SHUTDOWN,但是firstTask不为空或者workQueue为空的话,那么直接返回false
             */
            if (rs >= SHUTDOWN &&
                ! (rs == SHUTDOWN &&
                   firstTask == null &&
                   ! workQueue.isEmpty()))
                return false;

            for (;;) {
                int wc = workerCountOf(c);//获取工作线程数量
                /*
                 * addWorker传入的第二个Boolean参数用来判别当前线程数量是否大于核心池数量
                 * true,代表当前线程数量小于核心池数量
                 */
                if (wc >= CAPACITY ||
                    wc >= (core ? corePoolSize : maximumPoolSize))
                    return false;
                //增加工作线程数量
                if (compareAndIncrementWorkerCount(c))
                        //如果成功,跳出retry
                    break retry;
                c = ctl.get();  // Re-read ctl
                //判断线程池状态,改变了就重试一次
                if (runStateOf(c) != rs)
                    continue retry;
                // else CAS failed due to workerCount change; retry inner loop
            }
        }

        boolean workerStarted = false;
        boolean workerAdded = false;
        Worker w = null;
        try {
                /*
                 * 前面顺利增加了工作线程数,那么这里就真正创建Worker
                 * 上面流程图中说过,创建Worker时会创建新的线程.如果这里创建失败
                 * finally中会将工作线程数-1
                 */
            w = new Worker(firstTask);
            final Thread t = w.thread;
            if (t != null) {
                final ReentrantLock mainLock = this.mainLock;
                mainLock.lock();
                try {
                    // Recheck while holding lock.
                    // Back out on ThreadFactory failure or if
                    // shut down before lock acquired.
                    int rs = runStateOf(ctl.get());

                    if (rs < SHUTDOWN ||
                        (rs == SHUTDOWN && firstTask == null)) {
                        if (t.isAlive()) // precheck that t is startable
                            throw new IllegalThreadStateException();
                        workers.add(w);//将Worker放入工作线程池
                        int s = workers.size();
                        if (s > largestPoolSize)
                            largestPoolSize = s;
                        workerAdded = true;
                    }
                } finally {
                    mainLock.unlock();
                }
                if (workerAdded) {
                    t.start();//启动线程
                    workerStarted = true;
                }
            }
        } finally {
            if (! workerStarted)
                addWorkerFailed(w);
        }
        return workerStarted;
    }
Nach dem Login kopieren

6、execute方法

execute方法其实最主要是根据线程池的策略来传递不同的参数给addWorker方法。
当调用 execute() 方法添加一个任务时,线程池会做如下判断:

a. 如果正在运行的线程数量小于 corePoolSize,那么马上创建线程运行这个任务;
b. 如果正在运行的线程数量大于或等于 corePoolSize,那么将这个任务放入队列。
c. 如果这时候队列满了,而且正在运行的线程数量小于 maximumPoolSize,那么还是要创建线程运行这个任务;
d. 如果队列满了,而且正在运行的线程数量大于或等于 maximumPoolSize,那么线程池会抛出异常,告诉调用者“我不能再接受任务了”。

方法execute主要是在控制上面四条策略的实现。


 public void execute(Runnable command) {
        if (command == null)
            throw new NullPointerException();
        //获取到成员变量ctl的值(线程池状态)
        int c = ctl.get();
        //如果工作线程的数量<核心池的大小
        if (workerCountOf(c) < corePoolSize) {
            //调用addWorker(这里传入的true代表工作线程数量<核心池大小)
            //如果成功,方法结束。
            if (addWorker(command, true))
                return;
            //否则,再重新获取一次ctl的值
            //个人理解是防止前面这段代码执行的时候有其他线程改变了ctl的值。
            c = ctl.get();
        }
        //如果工作线程数量>=核心池的大小或者上一步调用addWorker返回false,继续走到下面
        //如果线程池处于运行状态,并且成功将当前任务放入任务队列
        if (isRunning(c) && workQueue.offer(command)) {
            //为了线程安全,重新获取ctl的值
            int recheck = ctl.get();
            //如果线程池不处于运行状态并且任务从任务队列移除成功
            if (! isRunning(recheck) && remove(command))
                //调用reject拒绝执行,根据handler的实现类抛出异常或者其他操作
                reject(command);
            //否则,如果工作线程数量==0,调用addWorker并传入null和false
            else if (workerCountOf(recheck) == 0)
                addWorker(null, false);
        }
        //执行到这里代表当前线程已超越了核心线程且任务提交到任务队列失败。(可以注意这里的addWorker是false)
        //那么这里再次调用addWroker创建新线程(这时创建的线程是maximumPoolSize)。
        //如果还是提交任务失败则调用reject处理失败任务
        else if (!addWorker(command, false))
            reject(command);
    }
Nach dem Login kopieren

三、线程池相关影响因素

1、阻塞队列的选择

本段引用至ThreadPoolExecutor中策略的选择与工作队列的选择(java线程池)

直接提交:
工作队列的默认选项是SynchronousQueue,它将任务直接提交给线程而不存储它们。在此,如果不存在可用于立即运行任务的线程,则试图把任务加入队列将失败,因此会构造一个新的线程。此策略可以避免在处理可能具有内部依赖性的请求集时出现锁。直接提交通常要求无界 maximumPoolSizes 以避免拒绝新提交的任务。当命令以超过队列所能处理的平均数连续到达时,此策略允许无界线程具有增长的可能性。

无界队列:
使用无界队列(例如,不具有预定义容量的LinkedBlockingQueue将导致在所有 corePoolSize 线程都忙时新任务在队列中等待。这样,创建的线程就不会超过 corePoolSize。(因此,maximumPoolSize 的值也就无效了。)当每个任务完全独立于其他任务,即任务执行互不影响时,适合于使用无界队列;例如,在 Web 页服务器中。这种排队可用于处理瞬态突发请求,当命令以超过队列所能处理的平均数连续到达时,此策略允许无界线程具有增长的可能性。

Begrenzte Warteschlangen:
Begrenzte Warteschlangen (wie ArrayBlockingQueue) tragen dazu bei, die Erschöpfung der Ressourcen zu verhindern, wenn begrenzte maximale Poolgrößen verwendet werden, können jedoch schwieriger zu optimieren und zu kontrollieren sein. Die Warteschlangengröße und die maximale Poolgröße erfordern möglicherweise einen Kompromiss: Die Verwendung großer Warteschlangen und kleiner Pools kann die CPU-Auslastung, die Betriebssystemressourcen und den Kontextwechsel-Overhead minimieren, kann jedoch zu einem künstlich reduzierten Durchsatz führen. Wenn Aufgaben häufig blockieren (z. B. wenn es sich um E/A-Grenzen handelt), plant das System möglicherweise mehr Threads, als Sie zulassen. Die Verwendung kleiner Warteschlangen erfordert normalerweise größere Poolgrößen und eine höhere CPU-Auslastung, kann jedoch zu einem inakzeptablen Planungsaufwand führen, der auch den Durchsatz verringern kann.

2. Häufig verwendete Thread-Pools

Wenn Sie ThreadPoolExecutor zum Erstellen eines Thread-Pools verwenden, können Sie verschiedene Konstruktoren verwenden, um Thread-Pools mit unterschiedlichen Eigenschaften gemäß unterschiedlichen Parametern zu erstellen. In tatsächlichen Situationen verwenden wir im Allgemeinen die von der Executors-Klasse bereitgestellten Methoden, um Thread-Pools zu erstellen. Executors ruft letztendlich den Konstruktor von ThreadPoolExecutor auf, die relevanten Parameter wurden jedoch konfiguriert.

1) Thread-Pool mit fester Größe: Executors.newFixedThreadPool

coresize ist dasselbe wie maxsize, das Timeout ist 0 und die Warteschlange verwendet eine unbegrenzte FIFO-Warteschlange von LinkedBlockingQueue, was bedeutet, dass dieser Thread Der Pool hat immer nur die Kerngröße. Es werden Threads ausgeführt. Wenn die Aufgabe ausgeführt wird, ruft dieser Thread gemäß der vorherigen Analyse weiterhin Aufgaben zur Ausführung aus der Aufgabenwarteschlange ab. Wenn keine Aufgaben vorhanden sind, wird der Thread sofort geschlossen.

2) Einzeltask-Thread-Pool: newSingleThreadExecutor

Im Pool arbeitet nur ein Thread und die Blockierungswarteschlange ist unbegrenzt. Dadurch kann sichergestellt werden, dass Aufgaben in der Reihenfolge ausgeführt werden, in der sie ausgeführt werden sie werden eingereicht.

3) Thread-Pool variabler Größe: newCachedThreadPool

SynchronousQueue-Warteschlange, eine blockierende Warteschlange, die keine Elemente speichert. Jede Einfügungsoperation muss warten, bis ein anderer Thread eine Entfernungsoperation aufruft. Wenn wir die erste Aufgabe übermitteln, können wir daher nicht in die Warteschlange aufgenommen werden. Dies erfüllt eine Thread-Pool-Bedingung: „Wenn wir der Warteschlange nicht beitreten können und die Aufgabe die maximale Größe nicht erreicht, starten wir eine neue Thread-Aufgabe.“ Unsere maximale Größe ist also der Maximalwert der unteren 29 Bits von ctl. Die Zeitüberschreitung beträgt 60 Sekunden. Wenn für einen Thread keine Aufgaben ausgeführt werden müssen, wird die Zeitüberschreitung von 60 Sekunden vorübergehend gespeichert. Wenn keine neuen Aufgaben vorhanden sind, wird er aus dem Thread-Pool entfernt.

4) geplanter Thread-Pool, newScheduledThreadPool

erstellt einen Thread-Pool mit unbegrenzter Größe. Dieser Thread-Pool unterstützt die Notwendigkeit einer zeitlichen Abstimmung und regelmäßigen Ausführung von Aufgaben.

Endlich ist er fast zu Ende. Ich habe das Gefühl, dass noch viele Dinge nicht detailliert beschrieben wurden, insbesondere die große Anzahl wiedereintrittsfähiger Sperren. Werde es bei Gelegenheit nachholen. Ich habe viele Online-Ressourcen integriert und selbst ein paar Bilder gezeichnet. Ich persönlich denke, dass es einfacher zu verstehen ist als viele Blogs, die Thread-Pools einführen. Bei Fehlern sind Kritik und Korrekturen willkommen.

Das obige ist der detaillierte Inhalt vonDetaillierte Erläuterung des grafischen Codes des Thread-Pools in Java. Für weitere Informationen folgen Sie bitte anderen verwandten Artikeln auf der PHP chinesischen Website!

Verwandte Etiketten:
Quelle:php.cn
Erklärung dieser Website
Der Inhalt dieses Artikels wird freiwillig von Internetnutzern beigesteuert und das Urheberrecht liegt beim ursprünglichen Autor. Diese Website übernimmt keine entsprechende rechtliche Verantwortung. Wenn Sie Inhalte finden, bei denen der Verdacht eines Plagiats oder einer Rechtsverletzung besteht, wenden Sie sich bitte an admin@php.cn
Beliebte Tutorials
Mehr>
Neueste Downloads
Mehr>
Web-Effekte
Quellcode der Website
Website-Materialien
Frontend-Vorlage