public class AsyncTest { @Async public void async(String name) throws InterruptedException { System.out.println("async" + name + " " + Thread.currentThread().getName()); Thread.sleep(1000); } }
啟動類別上需要新增@EnableAsync
註解,否則不會生效。
@SpringBootApplication //@EnableAsync public class Test1Application { public static void main(String[] args) throws InterruptedException { ConfigurableApplicationContext run = SpringApplication.run(Test1Application.class, args); AsyncTest bean = run.getBean(AsyncTest.class); for(int index = 0; index <= 10; ++index){ bean.async(String.valueOf(index)); } } }
#此時可不加 @EnableAsync
註解
@SpringBootTest class Test1ApplicationTests { @Resource ThreadPoolTaskExecutor threadPoolTaskExecutor; @Test void contextLoads() { Runnable runnable = () -> { System.out.println(Thread.currentThread().getName()); }; for(int index = 0; index <= 10; ++index){ threadPoolTaskExecutor.submit(runnable); } } }
SpringBoot執行緒池的常見配置:
spring: task: execution: pool: core-size: 8 max-size: 16 # 默认是 Integer.MAX_VALUE keep-alive: 60s # 当线程池中的线程数量大于 corePoolSize 时,如果某线程空闲时间超过keepAliveTime,线程将被终止 allow-core-thread-timeout: true # 是否允许核心线程超时,默认true queue-capacity: 100 # 线程队列的大小,默认Integer.MAX_VALUE shutdown: await-termination: false # 线程关闭等待 thread-name-prefix: task- # 线程名称的前缀
TaskExecutionAutoConfiguration
類別中定義了 ThreadPoolTaskExecutor
#,該類別的內部實作也是基於java原生的 ThreadPoolExecutor
類別。 initializeExecutor()
方法在其父類別中被調用,但是在父類別中 RejectedExecutionHandler
被定義為了 private RejectedExecutionHandler rejectedExecutionHandler = new Thread#private RejectedExecutionHandler rejectedExecutionHandler = new Thread#private RejectedExecutionHandler rejectedExecutionHandler = new Thread#private RejectedExecutionHandler rejectedExecutionHandler = new Thread#private RejectedExecutionHandler rejectedExecutionHandler = new Thread#private RejectedExecutionHandler rejectedExecutionHandler = new Thread#private Recutor.AbecutionHandler #Abortg; ,並透過
initialize()方法將
AbortPolicy傳入
initializeExecutor()中。
TaskExecutionAutoConfiguration 類別中,
ThreadPoolTaskExecutor類別的bean的名稱為:
applicationTaskExecutor 和
taskExecutor##。 <div class="code" style="position:relative; padding:0px; margin:0px;"><pre class="brush:java;">// TaskExecutionAutoConfiguration#applicationTaskExecutor()
@Lazy
@Bean(name = { APPLICATION_TASK_EXECUTOR_BEAN_NAME,
AsyncAnnotationBeanPostProcessor.DEFAUL
T_TASK_EXECUTOR_BEAN_NAME })
@ConditionalOnMissingBean(Executor.class)
public ThreadPoolTaskExecutor applicationTaskExecutor(TaskExecutorBuilder builder) {
return builder.build();
}</pre><div class="contentsignin">登入後複製</div></div><div class="code" style="position:relative; padding:0px; margin:0px;"><pre class="brush:java;">// ThreadPoolTaskExecutor#initializeExecutor()
@Override
protected ExecutorService initializeExecutor(
ThreadFactory threadFactory, RejectedExecutionHandler rejectedExecutionHandler) {
BlockingQueue<Runnable> queue = createQueue(this.queueCapacity);
ThreadPoolExecutor executor;
if (this.taskDecorator != null) {
executor = new ThreadPoolExecutor(
this.corePoolSize, this.maxPoolSize, this.keepAliveSeconds, TimeUnit.SECONDS,
queue, threadFactory, rejectedExecutionHandler) {
@Override
public void execute(Runnable command) {
Runnable decorated = taskDecorator.decorate(command);
if (decorated != command) {
decoratedTaskMap.put(decorated, command);
}
super.execute(decorated);
}
};
}
else {
executor = new ThreadPoolExecutor(
this.corePoolSize, this.maxPoolSize, this.keepAliveSeconds, TimeUnit.SECONDS,
queue, threadFactory, rejectedExecutionHandler);
}
if (this.allowCoreThreadTimeOut) {
executor.allowCoreThreadTimeOut(true);
}
this.threadPoolExecutor = executor;
return executor;
}</pre><div class="contentsignin">登入後複製</div></div><div class="code" style="position:relative; padding:0px; margin:0px;"><pre class="brush:java;">// ExecutorConfigurationSupport#initialize()
public void initialize() {
if (logger.isInfoEnabled()) {
logger.info("Initializing ExecutorService" + (this.beanName != null ? " &#39;" + this.beanName + "&#39;" : ""));
}
if (!this.threadNamePrefixSet && this.beanName != null) {
setThreadNamePrefix(this.beanName + "-");
}
this.executor = initializeExecutor(this.threadFactory, this.rejectedExecutionHandler);
}</pre><div class="contentsignin">登入後複製</div></div>
覆蓋預設的線程池
對象,bean的返回類型可以是ThreadPoolTaskExecutor
也可以是Executor
。 <div class="code" style="position:relative; padding:0px; margin:0px;"><pre class="brush:java;">@Configuration
public class ThreadPoolConfiguration {
@Bean("taskExecutor")
public ThreadPoolTaskExecutor taskExecutor() {
ThreadPoolTaskExecutor taskExecutor = new ThreadPoolTaskExecutor();
//设置线程池参数信息
taskExecutor.setCorePoolSize(10);
taskExecutor.setMaxPoolSize(50);
taskExecutor.setQueueCapacity(200);
taskExecutor.setKeepAliveSeconds(60);
taskExecutor.setThreadNamePrefix("myExecutor--");
taskExecutor.setWaitForTasksToCompleteOnShutdown(true);
taskExecutor.setAwaitTerminationSeconds(60);
//修改拒绝策略为使用当前线程执行
taskExecutor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
//初始化线程池
taskExecutor.initialize();
return taskExecutor;
}
}</pre><div class="contentsignin">登入後複製</div></div>
管理多個執行緒池
,則直接執行會報錯。此時需要指定bean的名稱即可。 <div class="code" style="position:relative; padding:0px; margin:0px;"><pre class="brush:java;">@Bean("taskExecutor2")
public ThreadPoolTaskExecutor taskExecutor2() {
ThreadPoolTaskExecutor taskExecutor = new ThreadPoolTaskExecutor();
//设置线程池参数信息
taskExecutor.setCorePoolSize(10);
taskExecutor.setMaxPoolSize(50);
taskExecutor.setQueueCapacity(200);
taskExecutor.setKeepAliveSeconds(60);
taskExecutor.setThreadNamePrefix("myExecutor2--");
taskExecutor.setWaitForTasksToCompleteOnShutdown(true);
taskExecutor.setAwaitTerminationSeconds(60);
//修改拒绝策略为使用当前线程执行
taskExecutor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
//初始化线程池
taskExecutor.initialize();
return taskExecutor;
}</pre><div class="contentsignin">登入後複製</div></div>
引用執行緒池時,需要將變數名稱變更為bean的名稱,這樣會依照名稱尋找。
@Resource ThreadPoolTaskExecutor taskExecutor2;
對於使用
@Async註解的多執行緒則在註解中指定bean的名字即可。 <div class="code" style="position:relative; padding:0px; margin:0px;"><pre class="brush:java;">@Async("taskExecutor2")
public void async(String name) throws InterruptedException {
System.out.println("async" + name + " " + Thread.currentThread().getName());
Thread.sleep(1000);
}</pre><div class="contentsignin">登入後複製</div></div>
執行緒池的四個拒絕策略
JAVA常用的四個執行緒池
ThreadPoolExecutor 類別的建構子如下:<div class="code" style="position:relative; padding:0px; margin:0px;"><pre class="brush:java;">public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit,
BlockingQueue<Runnable> workQueue) {
this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
Executors.defaultThreadFactory(), defaultHandler);
}</pre><div class="contentsignin">登入後複製</div></div>
newCachedThreadPool
),如果有空閒的執行緒超過需要,則回收,否則重複使用現有的執行緒。 <div class="code" style="position:relative; padding:0px; margin:0px;"><pre class="brush:java;">new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>());</pre><div class="contentsignin">登入後複製</div></div>
newFixedThreadPool
return new ThreadPoolExecutor(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>());
newScheduledThreadPool
,執行緒數無上限,但是可以指定corePoolSize
。可實現延遲執行、週期執行。 <div class="code" style="position:relative; padding:0px; margin:0px;"><pre class="brush:java;">public ScheduledThreadPoolExecutor(int corePoolSize) {
super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
new DelayedWorkQueue());
}</pre><div class="contentsignin">登入後複製</div></div>
週期執行:
ScheduledExecutorService scheduledThreadPool = Executors.newScheduledThreadPool(5); scheduledThreadPool.scheduleAtFixedRate(()->{ System.out.println("rate"); }, 1, 1, TimeUnit.SECONDS);
延時執行:
scheduledThreadPool.schedule(()->{ System.out.println("delay 3 seconds"); }, 3, TimeUnit.SECONDS);
newSingleThreadExecutor
public static ExecutorService newSingleThreadExecutor() { return new FinalizableDelegatedExecutorService (new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>())); }
Java 執行緒池中的四個拒絕策略
:執行緒池讓呼叫者去執行。
:如果執行緒池拒絕了任務,直接報錯。
:如果執行緒池拒絕了任務,直接丟棄。
:如果執行緒池拒絕了任務,直接將執行緒池中最舊的,未執行的任務丟棄,將新任務入隊。
public static class CallerRunsPolicy implements RejectedExecutionHandler { public CallerRunsPolicy() { } public void rejectedExecution(Runnable r, ThreadPoolExecutor e) { if (!e.isShutdown()) { r.run(); } } }
效果類似於:
Runnable thread = ()->{ System.out.println(Thread.currentThread().getName()); try { Thread.sleep(0); } catch (InterruptedException e) { throw new RuntimeException(e); } }; thread.run();
AbortPolicy
異常,並指示任務的訊息,線程池的資訊。 、<div class="code" style="position:relative; padding:0px; margin:0px;"><pre class="brush:java;">public static class AbortPolicy implements RejectedExecutionHandler {
public AbortPolicy() { }
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
throw new RejectedExecutionException("Task " + r.toString() +
" rejected from " +
e.toString());
}
}</pre><div class="contentsignin">登入後複製</div></div>
DiscardPolicy
什麼都不做。
public static class DiscardPolicy implements RejectedExecutionHandler { public DiscardPolicy() { } public void rejectedExecution(Runnable r, ThreadPoolExecutor e) { } }
: 取出佇列最舊的任務。
: 目前任務入隊。
public static class DiscardOldestPolicy implements RejectedExecutionHandler { public DiscardOldestPolicy() { } public void rejectedExecution(Runnable r, ThreadPoolExecutor e) { if (!e.isShutdown()) { e.getQueue().poll(); e.execute(r); } } }
的執行緒池中保存的是 java.util.concurrent.ThreadPoolExecutor. Worker
對象,該物件在被維護在private final HashSet<Worker> workers = new HashSet<Worker>();
。 workQueue
是儲存待執行的任務的佇列,在執行緒池中加入新的任務時,會將任務加入workQueue
佇列。 <div class="code" style="position:relative; padding:0px; margin:0px;"><pre class="brush:java;">private final class Worker
extends AbstractQueuedSynchronizer
implements Runnable
{
/**
* This class will never be serialized, but we provide a
* serialVersionUID to suppress a javac warning.
*/
private static final long serialVersionUID = 6138294804551838833L;
/** Thread this worker is running in. Null if factory fails. */
final Thread thread;
/** Initial task to run. Possibly null. */
Runnable firstTask;
/** Per-thread task counter */
volatile long completedTasks;
/**
* Creates with given first task and thread from ThreadFactory.
* @param firstTask the first task (null if none)
*/
Worker(Runnable firstTask) {
setState(-1); // inhibit interrupts until runWorker
this.firstTask = firstTask;
this.thread = getThreadFactory().newThread(this);
}
/** Delegates main run loop to outer runWorker */
public void run() {
runWorker(this);
}
// Lock methods
//
// The value 0 represents the unlocked state.
// The value 1 represents the locked state.
protected boolean isHeldExclusively() {
return getState() != 0;
}
protected boolean tryAcquire(int unused) {
if (compareAndSetState(0, 1)) {
setExclusiveOwnerThread(Thread.currentThread());
return true;
}
return false;
}
protected boolean tryRelease(int unused) {
setExclusiveOwnerThread(null);
setState(0);
return true;
}
public void lock() { acquire(1); }
public boolean tryLock() { return tryAcquire(1); }
public void unlock() { release(1); }
public boolean isLocked() { return isHeldExclusively(); }
void interruptIfStarted() {
Thread t;
if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) {
try {
t.interrupt();
} catch (SecurityException ignore) {
}
}
}
}</pre><div class="contentsignin">登入後複製</div></div>
work物件的執行依賴於
,與我們平時寫的執行緒不同,該執行緒處在一個循環中,並不斷地從佇列中取得新的任務執行。因此執行緒池中的執行緒才可以重複使用,而不是像我們平常使用的執行緒一樣執行完畢就結束。 以上是SpringBoot執行緒池和Java執行緒池怎麼使用的詳細內容。更多資訊請關注PHP中文網其他相關文章!final void runWorker(Worker w) {
Thread wt = Thread.currentThread();
Runnable task = w.firstTask;
w.firstTask = null;
w.unlock(); // allow interrupts
boolean completedAbruptly = true;
try {
while (task != null || (task = getTask()) != null) {
w.lock();
// If pool is stopping, ensure thread is interrupted;
// if not, ensure thread is not interrupted. This
// requires a recheck in second case to deal with
// shutdownNow race while clearing interrupt
if ((runStateAtLeast(ctl.get(), STOP) ||
(Thread.interrupted() &&
runStateAtLeast(ctl.get(), STOP))) &&
!wt.isInterrupted())
wt.interrupt();
try {
beforeExecute(wt, task);
Throwable thrown = null;
try {
task.run();
} catch (RuntimeException x) {
thrown = x; throw x;
} catch (Error x) {
thrown = x; throw x;
} catch (Throwable x) {
thrown = x; throw new Error(x);
} finally {
afterExecute(task, thrown);
}
} finally {
task = null;
w.completedTasks++;
w.unlock();
}
}
completedAbruptly = false;
} finally {
processWorkerExit(w, completedAbruptly);
}
}