Vor kurzem habe ich die Parallelitätsfunktion des Projekts verbessert, aber die Entwicklung verlief holprig. Nachdem ich viele Informationen gelesen hatte, vertiefte ich schließlich mein Verständnis. Deshalb hatte ich vor, gemeinsam den Quellcode zu überprüfen und die Prinzipien der gleichzeitigen Programmierung zusammenzufassen.
Seien Sie darauf vorbereitet, mit dem am häufigsten verwendeten Thread-Pool zu beginnen und die Implementierungsprinzipien des gesamten Lebenszyklus des Thread-Pools rund um die Erstellung, Ausführung und das Herunterfahren zu verstehen. Später werden wir Themen wie atomare Variablen, gleichzeitige Container, blockierende Warteschlangen, Synchronisierungstools, Sperren usw. untersuchen. Die Parallelitätstools in java.util.concurrent sind nicht schwer zu verwenden, aber man kann sie nicht einfach verwenden, wir müssen den verdammten Quellcode lesen, haha. Das von mir verwendete JDK ist übrigens 1.8.
Executor ist ein Thread-Pool-Management-Framework. Es gibt nur eine Methodeexecute in der Schnittstelle, die ausführbare Aufgaben ausführt. Die ExecutorService-Schnittstelle erweitert Executor, fügt Thread-Lebenszyklusverwaltung hinzu und stellt Methoden wie die Beendigung von Aufgaben und die Rückgabe von Aufgabenergebnissen bereit. AbstractExecutorService implementiert ExecutorService und stellt Standardimplementierungslogik wie die Submit-Methode bereit.
Dann erbt das heutige Thema, ThreadPoolExecutor, AbstractExecutorService und stellt die spezifische Implementierung des Thread-Pools bereit.
Das Folgende ist der häufigste Konstruktor von ThreadPoolExecutor mit bis zu sieben Parametern. Ich werde nicht den spezifischen Code veröffentlichen, sondern nur einige Anweisungen zur Parameterüberprüfung und -einstellung.
public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler) { }
corePoolSize ist die Zielgröße des Thread-Pools, also die Größe, wenn der Thread-Pool gerade erst erstellt wurde und keine Aufgaben auszuführen sind. MaximumPoolSize ist die maximale Obergrenze des Thread-Pools. keepAliveTime ist die Überlebenszeit des Threads. Wenn die Anzahl der Threads im Thread-Pool größer als corePoolSize ist, werden inaktive Threads, die die Überlebenszeit überschreiten, recycelt. Selbstverständlich werden die restlichen drei Parameter später analysiert.
ThreadPoolExecutor legt einige benutzerdefinierte Thread-Pools vor, die durch die Factory-Methode in Executors erstellt wurden. Lassen Sie uns die Erstellungsparameter von newSingleThreadExecutor, newFixedThreadPool und newCachedThreadPool analysieren.
newFixedThreadPool
public static ExecutorService newFixedThreadPool(int nThreads) { return new ThreadPoolExecutor(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>()); }
corePoolSize und maximumPoolSize von newFixedThreadPool sind beide auf die eingehende feste Zahl gesetzt, und keepAliveTim ist auf 0 gesetzt. Nachdem der Thread-Pool erstellt wurde, wird die Anzahl der Threads festgelegt, was für Situationen geeignet ist, in denen Thread-Stabilität erforderlich ist.
newSingleThreadExecutor
public static ExecutorService newSingleThreadExecutor() { return new FinalizableDelegatedExecutorService (new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>())); }
newSingleThreadExecutor ist eine Version von newFixedThreadPool mit einer festen Anzahl von Threads von 1, wodurch die Serialisierung von Aufgaben im sichergestellt wird Pool. Beachten Sie, dass FinalizableDelegatedExecutorService zurückgegeben wird. Schauen wir uns den Quellcode an:
static class FinalizableDelegatedExecutorService extends DelegatedExecutorService { FinalizableDelegatedExecutorService(ExecutorService executor) { super(executor); } protected void finalize() { super.shutdown(); } }
FinalizableDelegatedExecutorService erbt DelegatedExecutorService und fügt nur den Vorgang zum Schließen des Thread-Pools während des GC hinzu im Quellcode von DelegatedExecutorService:
static class DelegatedExecutorService extends AbstractExecutorService { private final ExecutorService e; DelegatedExecutorService(ExecutorService executor) { e = executor; } public void execute(Runnable command) { e.execute(command); } public void shutdown() { e.shutdown(); } public List<Runnable> shutdownNow() { return e.shutdownNow(); } public boolean isShutdown() { return e.isShutdown(); } public boolean isTerminated() { return e.isTerminated(); } //... }
Der Code ist sehr einfach, sodass ExecutorService nur die Methoden von ExecutorService verfügbar macht, sodass die Parameter des Thread-Pools nicht angezeigt werden nicht mehr konfiguriert werden. Ursprünglich können die vom Thread-Pool erstellten Parameter angepasst werden, und ThreadPoolExecutor stellt die Set-Methode bereit. Der Zweck der Verwendung von newSingleThreadExecutor besteht darin, einen seriellen Single-Thread-Thread-Pool zu generieren. Es wäre langweilig, wenn auch die Thread-Pool-Größe konfiguriert werden könnte.
Executors stellt außerdem die unconfigurableExecutorService-Methode bereit, die den gewöhnlichen Thread-Pool in einen nicht konfigurierbaren Thread-Pool einschließt. Wenn Sie nicht möchten, dass der Thread-Pool von unbekannten zukünftigen Generationen geändert wird, können Sie diese Methode aufrufen.
newCachedThreadPool
public static ExecutorService newCachedThreadPool() { return new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS, new SynchronousQueue<Runnable>()); }
newCachedThreadPool generiert einen zwischengespeicherten Thread-Pool. Die Anzahl der Threads kann zwischen 0 und Integer.MAX_VALUE liegen Die Zeitüberschreitung beträgt 1 Minute. Die Verwendung des Thread-Pools hat folgende Auswirkungen: Wenn ein Thread im Leerlauf vorhanden ist, wird er wiederverwendet. Wenn kein Thread im Leerlauf vorhanden ist, wird ein neuer Thread erstellt recycelt.
newScheduledThreadPool
newScheduledThreadPool erstellt einen Thread-Pool, der regelmäßig Aufgaben ausführen kann. Dies soll in diesem Artikel nicht behandelt werden und wird später in einem separaten Artikel ausführlich besprochen.
newCachedThreadPool的线程上限几乎等同于无限,但系统资源是有限的,任务的处理速度总有可能比不上任务的提交速度。因此,可以为ThreadPoolExecutor提供一个阻塞队列来保存因线程不足而等待的Runnable任务,这就是BlockingQueue。
JDK为BlockingQueue提供了几种实现方式,常用的有:
ArrayBlockingQueue:数组结构的阻塞队列
LinkedBlockingQueue:链表结构的阻塞队列
PriorityBlockingQueue:有优先级的阻塞队列
SynchronousQueue:不会存储元素的阻塞队列
newFixedThreadPool和newSingleThreadExecutor在默认情况下使用一个无界的LinkedBlockingQueue。要注意的是,如果任务一直提交,但线程池又不能及时处理,等待队列将会无限制地加长,系统资源总会有消耗殆尽的一刻。所以,推荐使用有界的等待队列,避免资源耗尽。但解决一个问题,又会带来新问题:队列填满之后,再来新任务,这个时候怎么办?后文会介绍如何处理队列饱和。
newCachedThreadPool使用的SynchronousQueue十分有趣,看名称是个队列,但它却不能存储元素。要将一个任务放进队列,必须有另一个线程去接收这个任务,一个进就有一个出,队列不会存储任何东西。因此,SynchronousQueue是一种移交机制,不能算是队列。newCachedThreadPool生成的是一个没有上限的线程池,理论上提交多少任务都可以,使用SynchronousQueue作为等待队列正合适。
当有界的等待队列满了之后,就需要用到饱和策略去处理,ThreadPoolExecutor的饱和策略通过传入RejectedExecutionHandler来实现。如果没有为构造函数传入,将会使用默认的defaultHandler。
private static final RejectedExecutionHandler defaultHandler = new AbortPolicy(); public static class AbortPolicy implements RejectedExecutionHandler { public AbortPolicy() { } public void rejectedExecution(Runnable r, ThreadPoolExecutor e) { throw new RejectedExecutionException("Task " + r.toString() + " rejected from " + e.toString()); } }
AbortPolicy是默认的实现,直接抛出一个RejectedExecutionException异常,让调用者自己处理。除此之外,还有几种饱和策略,来看一下:
public static class DiscardPolicy implements RejectedExecutionHandler { public DiscardPolicy() { } public void rejectedExecution(Runnable r, ThreadPoolExecutor e) { } }
DiscardPolicy的rejectedExecution直接是空方法,什么也不干。如果队列满了,后续的任务都抛弃掉。
public static class DiscardOldestPolicy implements RejectedExecutionHandler { public DiscardOldestPolicy() { } public void rejectedExecution(Runnable r, ThreadPoolExecutor e) { if (!e.isShutdown()) { e.getQueue().poll(); e.execute(r); } } }
DiscardOldestPolicy会将等待队列里最旧的任务踢走,让新任务得以执行。
public static class CallerRunsPolicy implements RejectedExecutionHandler { public CallerRunsPolicy() { } public void rejectedExecution(Runnable r, ThreadPoolExecutor e) { if (!e.isShutdown()) { r.run(); } } }
最后一种饱和策略是CallerRunsPolicy,它既不抛弃新任务,也不抛弃旧任务,而是直接在当前线程运行这个任务。当前线程一般就是主线程啊,让主线程运行任务,说不定就阻塞了。如果不是想清楚了整套方案,还是少用这种策略为妙。
每当线程池需要创建一个新线程,都是通过线程工厂获取。如果不为ThreadPoolExecutor设定一个线程工厂,就会使用默认的defaultThreadFactory:
public static ThreadFactory defaultThreadFactory() { return new DefaultThreadFactory(); } static class DefaultThreadFactory implements ThreadFactory { private static final AtomicInteger poolNumber = new AtomicInteger(1); private final ThreadGroup group; private final AtomicInteger threadNumber = new AtomicInteger(1); private final String namePrefix; DefaultThreadFactory() { SecurityManager s = System.getSecurityManager(); group = (s != null) ? s.getThreadGroup() : Thread.currentThread().getThreadGroup(); namePrefix = "pool-" + poolNumber.getAndIncrement() + "-thread-"; } public Thread newThread(Runnable r) { Thread t = new Thread(group, r, namePrefix + threadNumber.getAndIncrement(), 0); if (t.isDaemon()) t.setDaemon(false); if (t.getPriority() != Thread.NORM_PRIORITY) t.setPriority(Thread.NORM_PRIORITY); return t; } }
平时打印线程池里线程的name时,会输出形如pool-1-thread-1之类的名称,就是在这里设置的。这个默认的线程工厂,创建的线程是普通的非守护线程,如果需要定制,实现ThreadFactory后传给ThreadPoolExecutor即可。
不看代码不总结不会知道,光是线程池的创建就可以引出很多学问。别看平时创建线程池是一句代码的事,其实ThreadPoolExecutor提供了很灵活的定制方法。
欢迎留言和转发,下一篇打算分析线程池如何执行任务。
以上就是Java 线程池的创建过程分析 的内容,更多相关内容请关注PHP中文网(www.php.cn)!