首頁 > Java > java教程 > SpringBoot執行緒池和Java執行緒池怎麼使用

SpringBoot執行緒池和Java執行緒池怎麼使用

PHPz
發布: 2023-05-18 12:46:36
轉載
1024 人瀏覽過

    SpringBoot執行緒池和Java執行緒池的用法和實作原理

    使用預設的執行緒池

    方式一:透過@Async註解呼叫

    public class AsyncTest {
        @Async
        public void async(String name) throws InterruptedException {
            System.out.println("async" + name + " " + Thread.currentThread().getName());
            Thread.sleep(1000);
        }
    }
    登入後複製

    啟動類別上需要新增@EnableAsync註解,否則不會生效。

    @SpringBootApplication
    //@EnableAsync
    public class Test1Application {
       public static void main(String[] args) throws InterruptedException {
          ConfigurableApplicationContext run = SpringApplication.run(Test1Application.class, args);
          AsyncTest bean = run.getBean(AsyncTest.class);
          for(int index = 0; index <= 10; ++index){
             bean.async(String.valueOf(index));
          }
       }
    }
    登入後複製

    方式二:直接注入 ThreadPoolTask​​Executor

    #此時可不加 @EnableAsync註解

    @SpringBootTest
    class Test1ApplicationTests {
    
       @Resource
       ThreadPoolTaskExecutor threadPoolTaskExecutor;
    
       @Test
       void contextLoads() {
          Runnable runnable = () -> {
             System.out.println(Thread.currentThread().getName());
          };
    
          for(int index = 0; index <= 10; ++index){
             threadPoolTaskExecutor.submit(runnable);
          }
       }
    
    }
    登入後複製

    執行緒池預設設定資訊

    SpringBoot執行緒池的常見配置:

    spring:
      task:
        execution:
          pool:
            core-size: 8
            max-size: 16                          # 默认是 Integer.MAX_VALUE
            keep-alive: 60s                       # 当线程池中的线程数量大于 corePoolSize 时,如果某线程空闲时间超过keepAliveTime,线程将被终止
            allow-core-thread-timeout: true       # 是否允许核心线程超时,默认true
            queue-capacity: 100                   # 线程队列的大小,默认Integer.MAX_VALUE
          shutdown:
            await-termination: false              # 线程关闭等待
          thread-name-prefix: task-               # 线程名称的前缀
    登入後複製

    SpringBoot 執行緒池的實作原理

    TaskExecutionAutoConfiguration 類別中定義了 ThreadPoolTask​​Executor#,該類別的內部實作也是基於java原生的 ThreadPoolExecutor類別。 initializeExecutor()方法在其父類別中被調用,但是在父類別中 RejectedExecutionHandler 被定義為了 private RejectedExecutionHandler rejectedExecutionHandler = new Thread#private RejectedExecutionHandler rejectedExecutionHandler = new Thread#private RejectedExecutionHandler rejectedExecutionHandler = new Thread#private RejectedExecutionHandler rejectedExecutionHandler = new Thread#private RejectedExecutionHandler rejectedExecutionHandler = new Thread#private RejectedExecutionHandler rejectedExecutionHandler = new Thread#private Recutor.AbecutionHandler #Abortg; ,並透過initialize()方法將AbortPolicy傳入initializeExecutor()中。

    注意在

    TaskExecutionAutoConfiguration 類別中,ThreadPoolTask​​Executor類別的bean的名稱為: applicationTaskExecutor 和 taskExecutor##。 <div class="code" style="position:relative; padding:0px; margin:0px;"><pre class="brush:java;">// TaskExecutionAutoConfiguration#applicationTaskExecutor() @Lazy @Bean(name = { APPLICATION_TASK_EXECUTOR_BEAN_NAME, AsyncAnnotationBeanPostProcessor.DEFAUL T_TASK_EXECUTOR_BEAN_NAME }) @ConditionalOnMissingBean(Executor.class) public ThreadPoolTaskExecutor applicationTaskExecutor(TaskExecutorBuilder builder) { return builder.build(); }</pre><div class="contentsignin">登入後複製</div></div><div class="code" style="position:relative; padding:0px; margin:0px;"><pre class="brush:java;">// ThreadPoolTaskExecutor#initializeExecutor() @Override protected ExecutorService initializeExecutor( ThreadFactory threadFactory, RejectedExecutionHandler rejectedExecutionHandler) { BlockingQueue&lt;Runnable&gt; queue = createQueue(this.queueCapacity); ThreadPoolExecutor executor; if (this.taskDecorator != null) { executor = new ThreadPoolExecutor( this.corePoolSize, this.maxPoolSize, this.keepAliveSeconds, TimeUnit.SECONDS, queue, threadFactory, rejectedExecutionHandler) { @Override public void execute(Runnable command) { Runnable decorated = taskDecorator.decorate(command); if (decorated != command) { decoratedTaskMap.put(decorated, command); } super.execute(decorated); } }; } else { executor = new ThreadPoolExecutor( this.corePoolSize, this.maxPoolSize, this.keepAliveSeconds, TimeUnit.SECONDS, queue, threadFactory, rejectedExecutionHandler); } if (this.allowCoreThreadTimeOut) { executor.allowCoreThreadTimeOut(true); } this.threadPoolExecutor = executor; return executor; }</pre><div class="contentsignin">登入後複製</div></div><div class="code" style="position:relative; padding:0px; margin:0px;"><pre class="brush:java;">// ExecutorConfigurationSupport#initialize() public void initialize() { if (logger.isInfoEnabled()) { logger.info(&quot;Initializing ExecutorService&quot; + (this.beanName != null ? &quot; &amp;#39;&quot; + this.beanName + &quot;&amp;#39;&quot; : &quot;&quot;)); } if (!this.threadNamePrefixSet &amp;&amp; this.beanName != null) { setThreadNamePrefix(this.beanName + &quot;-&quot;); } this.executor = initializeExecutor(this.threadFactory, this.rejectedExecutionHandler); }</pre><div class="contentsignin">登入後複製</div></div>覆蓋預設的線程池

    覆蓋預設的 

    taskExecutor

    對象,bean的返回類型可以是ThreadPoolTask​​Executor也可以是Executor <div class="code" style="position:relative; padding:0px; margin:0px;"><pre class="brush:java;">@Configuration public class ThreadPoolConfiguration { @Bean(&quot;taskExecutor&quot;) public ThreadPoolTaskExecutor taskExecutor() { ThreadPoolTaskExecutor taskExecutor = new ThreadPoolTaskExecutor(); //设置线程池参数信息 taskExecutor.setCorePoolSize(10); taskExecutor.setMaxPoolSize(50); taskExecutor.setQueueCapacity(200); taskExecutor.setKeepAliveSeconds(60); taskExecutor.setThreadNamePrefix(&quot;myExecutor--&quot;); taskExecutor.setWaitForTasksToCompleteOnShutdown(true); taskExecutor.setAwaitTerminationSeconds(60); //修改拒绝策略为使用当前线程执行 taskExecutor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy()); //初始化线程池 taskExecutor.initialize(); return taskExecutor; } }</pre><div class="contentsignin">登入後複製</div></div>管理多個執行緒池

    如果出現了多個執行緒池,例如再定義一個執行緒池 

    taskExecutor2

    ,則直接執行會報錯。此時需要指定bean的名稱即可。 <div class="code" style="position:relative; padding:0px; margin:0px;"><pre class="brush:java;">@Bean(&quot;taskExecutor2&quot;) public ThreadPoolTaskExecutor taskExecutor2() { ThreadPoolTaskExecutor taskExecutor = new ThreadPoolTaskExecutor(); //设置线程池参数信息 taskExecutor.setCorePoolSize(10); taskExecutor.setMaxPoolSize(50); taskExecutor.setQueueCapacity(200); taskExecutor.setKeepAliveSeconds(60); taskExecutor.setThreadNamePrefix(&quot;myExecutor2--&quot;); taskExecutor.setWaitForTasksToCompleteOnShutdown(true); taskExecutor.setAwaitTerminationSeconds(60); //修改拒绝策略为使用当前线程执行 taskExecutor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy()); //初始化线程池 taskExecutor.initialize(); return taskExecutor; }</pre><div class="contentsignin">登入後複製</div></div>引用執行緒池時,需要將變數名稱變更為bean的名稱,這樣會依照名稱尋找。

    @Resource
    ThreadPoolTaskExecutor taskExecutor2;
    登入後複製

    對於使用

    @Async

    註解的多執行緒則在註解中指定bean的名字即可。 <div class="code" style="position:relative; padding:0px; margin:0px;"><pre class="brush:java;">@Async(&quot;taskExecutor2&quot;) public void async(String name) throws InterruptedException { System.out.println(&quot;async&quot; + name + &quot; &quot; + Thread.currentThread().getName()); Thread.sleep(1000); }</pre><div class="contentsignin">登入後複製</div></div>執行緒池的四個拒絕策略

    JAVA常用的四個執行緒池

    ThreadPoolExecutor

     類別的建構子如下:<div class="code" style="position:relative; padding:0px; margin:0px;"><pre class="brush:java;">public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue&lt;Runnable&gt; workQueue) { this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, Executors.defaultThreadFactory(), defaultHandler); }</pre><div class="contentsignin">登入後複製</div></div> newCachedThreadPool

    不限制最大執行緒數(

    maximumPoolSize=Integer.MAX_VALUE

    ),如果有空閒的執行緒超過需要,則回收,否則重複使用現有的執行緒。 <div class="code" style="position:relative; padding:0px; margin:0px;"><pre class="brush:java;">new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS, new SynchronousQueue&lt;Runnable&gt;());</pre><div class="contentsignin">登入後複製</div></div>newFixedThreadPool

    定長線程池,超出執行緒數的任務會在佇列中等待。

    return new ThreadPoolExecutor(nThreads, nThreads,
                                  0L, TimeUnit.MILLISECONDS,
                                  new LinkedBlockingQueue<Runnable>());
    登入後複製

    newScheduledThreadPool

    類似於

    newCachedThreadPool

    ,執行緒數無上限,但是可以指定corePoolSize。可實現延遲執行、週期執行。 <div class="code" style="position:relative; padding:0px; margin:0px;"><pre class="brush:java;">public ScheduledThreadPoolExecutor(int corePoolSize) { super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS, new DelayedWorkQueue()); }</pre><div class="contentsignin">登入後複製</div></div>週期執行:​​

    ScheduledExecutorService scheduledThreadPool = Executors.newScheduledThreadPool(5);
    scheduledThreadPool.scheduleAtFixedRate(()->{
       System.out.println("rate");
    }, 1, 1, TimeUnit.SECONDS);
    登入後複製

    延時執行:

    scheduledThreadPool.schedule(()->{
       System.out.println("delay 3 seconds");
    }, 3, TimeUnit.SECONDS);
    登入後複製

    newSingleThreadExecutor

    單執行緒執行緒池,可以實作執行緒的順序執行。

    public static ExecutorService newSingleThreadExecutor() {
        return new FinalizableDelegatedExecutorService
            (new ThreadPoolExecutor(1, 1,
                                    0L, TimeUnit.MILLISECONDS,
                                    new LinkedBlockingQueue<Runnable>()));
    }
    登入後複製

    Java 執行緒池中的四個拒絕策略

    • #CallerRunsPolicy

      :執行緒池讓呼叫者去執行。

    • AbortPolicy

      :如果執行緒池拒絕了任務,直接報錯。

    • DiscardPolicy

      :如果執行緒池拒絕了任務,直接丟棄。

    • DiscardOldestPolicy

      :如果執行緒池拒絕了任務,直接將執行緒池中最舊的,未執行的任務丟棄,將新任務入隊。

    • CallerRunsPolicy

    直接在主執行緒中執行了run方法。

    public static class CallerRunsPolicy implements RejectedExecutionHandler {
     
        public CallerRunsPolicy() { }
     
        public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
            if (!e.isShutdown()) {
                r.run();
            }
        }
    }
    登入後複製

    效果類似於:

    Runnable thread = ()->{
       System.out.println(Thread.currentThread().getName());
       try {
          Thread.sleep(0);
       } catch (InterruptedException e) {
          throw new RuntimeException(e);
       }
    };
    
    thread.run();
    登入後複製

    AbortPolicy

    #直接拋出

    RejectedExecutionException

    異常,並指示任務的訊息,線程池的資訊。 、<div class="code" style="position:relative; padding:0px; margin:0px;"><pre class="brush:java;">public static class AbortPolicy implements RejectedExecutionHandler { public AbortPolicy() { } public void rejectedExecution(Runnable r, ThreadPoolExecutor e) { throw new RejectedExecutionException(&quot;Task &quot; + r.toString() + &quot; rejected from &quot; + e.toString()); } }</pre><div class="contentsignin">登入後複製</div></div>DiscardPolicy

    什麼都不做。

    public static class DiscardPolicy implements RejectedExecutionHandler {
     
        public DiscardPolicy() { }
     
        public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
        }
    }
    登入後複製

    DiscardOldestPolicy

    • #e.getQueue().poll()

       : 取出佇列最舊的任務。

    • e.execute(r)

       : 目前任務入隊。

      public static class DiscardOldestPolicy implements RejectedExecutionHandler {
       
          public DiscardOldestPolicy() { }
       
          public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
              if (!e.isShutdown()) {
                  e.getQueue().poll();
                  e.execute(r);
              }
          }
      }
      登入後複製
    • Java 執行緒重複使用的原理

    java

    的執行緒池中保存的是 java.util.concurrent.ThreadPoolExecutor. Worker 對象,該物件在被維護在private final HashSet<Worker> workers = new HashSet<Worker>();workQueue是儲存待執行的任務的佇列,在執行緒池中加入新的任務時,會將任務加入workQueue佇列。 <div class="code" style="position:relative; padding:0px; margin:0px;"><pre class="brush:java;">private final class Worker extends AbstractQueuedSynchronizer implements Runnable { /** * This class will never be serialized, but we provide a * serialVersionUID to suppress a javac warning. */ private static final long serialVersionUID = 6138294804551838833L; /** Thread this worker is running in. Null if factory fails. */ final Thread thread; /** Initial task to run. Possibly null. */ Runnable firstTask; /** Per-thread task counter */ volatile long completedTasks; /** * Creates with given first task and thread from ThreadFactory. * @param firstTask the first task (null if none) */ Worker(Runnable firstTask) { setState(-1); // inhibit interrupts until runWorker this.firstTask = firstTask; this.thread = getThreadFactory().newThread(this); } /** Delegates main run loop to outer runWorker */ public void run() { runWorker(this); } // Lock methods // // The value 0 represents the unlocked state. // The value 1 represents the locked state. protected boolean isHeldExclusively() { return getState() != 0; } protected boolean tryAcquire(int unused) { if (compareAndSetState(0, 1)) { setExclusiveOwnerThread(Thread.currentThread()); return true; } return false; } protected boolean tryRelease(int unused) { setExclusiveOwnerThread(null); setState(0); return true; } public void lock() { acquire(1); } public boolean tryLock() { return tryAcquire(1); } public void unlock() { release(1); } public boolean isLocked() { return isHeldExclusively(); } void interruptIfStarted() { Thread t; if (getState() &gt;= 0 &amp;&amp; (t = thread) != null &amp;&amp; !t.isInterrupted()) { try { t.interrupt(); } catch (SecurityException ignore) { } } } }</pre><div class="contentsignin">登入後複製</div></div>work物件的執行依賴於 

    runWorker()

    ,與我們平時寫的執行緒不同,該執行緒處在一個循環中,並不斷地從佇列中取得新的任務執行。因此執行緒池中的執行緒才可以重複使用,而不是像我們平常使用的執行緒一樣執行完畢就結束。

    final void runWorker(Worker w) {
        Thread wt = Thread.currentThread();
        Runnable task = w.firstTask;
        w.firstTask = null;
        w.unlock(); // allow interrupts
        boolean completedAbruptly = true;
        try {
            while (task != null || (task = getTask()) != null) {
                w.lock();
                // If pool is stopping, ensure thread is interrupted;
                // if not, ensure thread is not interrupted.  This
                // requires a recheck in second case to deal with
                // shutdownNow race while clearing interrupt
                if ((runStateAtLeast(ctl.get(), STOP) ||
                     (Thread.interrupted() &&
                      runStateAtLeast(ctl.get(), STOP))) &&
                    !wt.isInterrupted())
                    wt.interrupt();
                try {
                    beforeExecute(wt, task);
                    Throwable thrown = null;
                    try {
                        task.run();
                    } catch (RuntimeException x) {
                        thrown = x; throw x;
                    } catch (Error x) {
                        thrown = x; throw x;
                    } catch (Throwable x) {
                        thrown = x; throw new Error(x);
                    } finally {
                        afterExecute(task, thrown);
                    }
                } finally {
                    task = null;
                    w.completedTasks++;
                    w.unlock();
                }
            }
            completedAbruptly = false;
        } finally {
            processWorkerExit(w, completedAbruptly);
        }
    }
    登入後複製

    以上是SpringBoot執行緒池和Java執行緒池怎麼使用的詳細內容。更多資訊請關注PHP中文網其他相關文章!

    相關標籤:
    來源:yisu.com
    本網站聲明
    本文內容由網友自願投稿,版權歸原作者所有。本站不承擔相應的法律責任。如發現涉嫌抄襲或侵權的內容,請聯絡admin@php.cn
    熱門教學
    更多>
    最新下載
    更多>
    網站特效
    網站源碼
    網站素材
    前端模板