#我們最常使用的Executors實作建立執行緒池使用執行緒主要是用上述類別圖中提供的類別。在上邊的類別圖中,包含了一個Executor框架,它是一個根據一組執行策略的呼叫調度執行和控制非同步任務的框架,目的是提供一個將任務提交與任務如何運行分開的機制。它包含了三個executor介面:
Executor:運行新任務的簡單介面
ExecutorService:擴充了Executor,新增了用來管理執行器生命週期和任務生命週期的方法
ScheduleExcutorService:擴展了ExecutorService,支援Future和定期執行任務
降低資源消耗-重複使用存在的線程,減少物件建立、消亡的開銷,效能好
提高反應速度 -可有效控制最大並發執行緒數,提高系統資源利用率,同時可以避免過多資源競爭,避免阻塞。當任務到達時,任務可不用等待執行緒建立就能立即執行
提高執行緒的可管理性-提供定時執行、定期執行、單執行緒、並發數控制等功能。
每次new Thread 新建對象,效能差
corePoolSize:核心執行緒數量
maximumPoolSize:執行緒最大執行緒數
workQueue:阻塞佇列,儲存等待執行的任務,很重要,會對執行緒池運行過程產生重大影響
當我們提交一個新的任務到執行緒池,執行緒池會根據目前池中正在運行的執行緒數量來決定該任務的處理方式。處理方式有三種:1、直接切換(SynchronusQueue)#2、無界佇列(LinkedBlockingQueue)能夠建立的最大執行緒數為corePoolSize,這時maximumPoolSize就不會起作用了。當執行緒池中所有的核心執行緒都是運行狀態的時候,新的任務提交就會放入等待佇列中。 3、有界隊列(ArrayBlockingQueue)最大maximumPoolSize,能夠降低資源消耗,但是這種方式使得執行緒池對執行緒調度變的更困難。因為線程池與隊列容量都是有限的。所以想讓線程池的吞吐率和處理任務達到一個合理的範圍,又想使我們的線程調度相對簡單,並且還盡可能降低資源的消耗,我們就需要合理的限制這兩個數量分配技巧: [如果想要降低資源的消耗包括降低cpu使用率、作業系統資源的消耗、上下文切換的開銷等等,可以設定一個較大的佇列容量和較小的執行緒池容量,這會降低執行緒池的吞吐量。如果我們提交的任務經常發生阻塞,我們可以調整maximumPoolSize。如果我們的佇列容量較小,我們需要把線程池大小設定的大一些,這樣cpu的使用率相對來說會高一些。但是如果執行緒池的容量設定的過大,提高任務的數量過多的時候,並發量會增加,那麼執行緒之間的調度就是一個需要考慮的問題。這樣反而可能會降低處理任務的吞吐量。 ]keepAliveTime:執行緒沒有任務執行時最多保持多久時間終止(當執行緒中的執行緒數大於corePoolSize的時候,如果這時沒有新的任務提交核心執行緒外的執行緒不會立即銷毀,而是等待,直到超過keepAliveTime)
unit:keepAliveTime的時間單位
threadFactory:執行緒工廠,用來創建線程,有一個預設的工場來創建線程,這樣新創建出來的線程有相同的優先權,是非守護線程、設定好了名稱)
rejectHandler:當拒絕處理任務時(阻塞佇列滿)的策略(AbortPolicy預設策略直接拋出例外狀況、CallerRunsPolicy用呼叫者所在的執行緒執行任務、DiscardOldestPolicy丟棄佇列中最靠前的任務並執行目前任務、DiscardPolicy直接丟棄目前任務)
corePoolSize、maximumPoolSize、workQueue 三者關係:如果執行的執行緒數小於corePoolSize的時候,直接建立新執行緒來處理任務。即使線程池中的其他線程是空閒的。如果執行中的執行緒數大於corePoolSize且小於maximumPoolSize時,那麼只有當workQueue滿的時候才會建立新的執行緒去處理任務。如果corePoolSize與maximumPoolSize是相同的,那麼建立的執行緒池大小是固定的。這時有新任務提交,當workQueue未滿時,就把請求放入workQueue中。等待空執行緒從workQueue取出任務。如果workQueue此時也滿了,那就使用另外的拒絕策略參數去執行拒絕策略。
初始化方法:由七個參數組合成四個初始化方法
#其他方法:
execute(); //提交任务,交给线程池执行 submit();//提交任务,能够返回执行结果 execute+Future shutdown();//关闭线程池,等待任务都执行完 shutdownNow();//关闭线程池,不等待任务执行完 getTaskCount();//线程池已执行和未执行的任务总数 getCompleteTaskCount();//已完成的任务数量 getPoolSize();//线程池当前的线程数量 getActiveCount();//当前线程池中正在执行任务的线程数量
執行緒池生命週期:
running:能接受新提交的任務,也能處理阻塞佇列中的任務
//创建newCachedThreadPool线程池源码 public static ExecutorService newCachedThreadPool() { /** *corePoolSize: 0,核心线程池的数量为0 *maximumPoolSize: Integer.MAX_VALUE,可以认为最大线程数是无限的 *keepAliveTime: 60L *unit: 秒 *workQueue: SynchronousQueue **/ return new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS, new SynchronousQueue<Runnable>()); }
public static void main(String[] args) { ExecutorService executor = Executors.newCachedThreadPool(); for (int i = 0; i < 10; i++) { final int index = i; executor.execute(new Runnable() { @Override public void run() { log.info("task:{}",index); } }); } }
//newSingleThreadExecutor创建线程池源码 public static ExecutorService newSingleThreadExecutor() { /** * corePoolSize : 1,核心线程池的数量为1 * maximumPoolSize : 1,只可以创建一个非核心线程 * keepAliveTime : 0L * unit => 秒 * workQueue => LinkedBlockingQueue **/ return new FinalizableDelegatedExecutorService (new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>())); }
#
//newFixedThreadPool创建线程池源码 public static ExecutorService newFixedThreadPool(int nThreads) { /** * corePoolSize : 核心线程的数量为自定义输入nThreads * maximumPoolSize : 最大线程的数量为自定义输入nThreads * keepAliveTime : 0L * unit : 秒 * workQueue : LinkedBlockingQueue **/ return new ThreadPoolExecutor(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>()); }
//newScheduledThreadPool创建线程池源码 public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) { return new ScheduledThreadPoolExecutor(corePoolSize); } public ScheduledThreadPoolExecutor(int corePoolSize) { /** * corePoolSize : 核心线程的数量为自定义输入corePoolSize * maximumPoolSize : 最大线程的数量为Integer.MAX_VALUE * keepAliveTime : 0L * unit : 纳秒 * workQueue : DelayedWorkQueue **/ super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS, new DelayedWorkQueue()); }
public static void main(String[] args) { ScheduledExecutorService executorService = Executors.newScheduledThreadPool(1); // executorService.schedule(new Runnable() { // @Override // public void run() { // log.warn("schedule run"); // } // //延迟3秒后执行 // }, 3, TimeUnit.SECONDS); // executorService.shutdown(); // executorService.scheduleWithFixedDelay(new Runnable() { // @Override // public void run() { // log.warn("scheduleWithFixedDelay run"); // } // //延迟一秒后每隔3秒执行 // }, 1, 3, TimeUnit.SECONDS); executorService.scheduleAtFixedRate(new Runnable() { @Override public void run() { log.warn("schedule run"); } //延迟一秒后每隔3秒执行 }, 1, 3, TimeUnit.SECONDS); /** * 定时器调度,不推荐使用,推荐ScheduledExecutorService调度 */ // Timer timer = new Timer(); // timer.schedule(new TimerTask() { // @Override // public void run() { // log.warn("timer run"); // } // //从当前时间每隔5秒执行 // }, new Date(), 5 * 1000); }
CachedThreadPool 和newScheduledThreadPool允許創建的線程數為Integer.MAX_VALUE,可能會創建大量的線程,從而引起OOM異常
這就是為什麼禁止使用Executors去建立執行緒池,而是推薦自己去建立ThreadPoolExecutor的原因
CPU密集型: 執行緒池的大小建議為CPU數量1,CPU數量可以根據Runtime.availableProcessors方法取得IO密集型: CPU數量* CPU利用率* (1 執行緒等待時間/執行緒CPU時間) 混合型: 將任務分為CPU密集型和IO密集型,然後分別使用不同的線程池去處理,從而使每個線程池可以根據各自的工作負載來調整阻塞隊列: 建議使用有界隊列,有界佇列有助於避免資源耗盡的情況發生拒絕策略: 預設採用的是AbortPolicy拒絕策略,直接在程式中拋出RejectedExecutionException異常【因為是執行時例外,不強制catch】,這種處理方式不夠優雅。處理拒絕策略有以下幾種比較推薦:
在程式中捕獲RejectedExecutionException異常,在捕獲異常中對任務進行處理。針對預設拒絕策略
使用CallerRunsPolicy拒絕策略,該策略會將任務交給呼叫execute的執行緒執行【一般為主執行緒】,此時主執行緒將在一段時間內不能提交任何任務,從而使工作執行緒處理正在執行的任務。此時提交的線程將被保存在TCP隊列中,TCP隊列滿將會影響客戶端,這是一種平緩的性能降低
以上是Java線程池Executor怎麼使用的詳細內容。更多資訊請關注PHP中文網其他相關文章!