The principle of thread pool is as follows:
Instructions:
If the number of currently running threads is less than corePoolSize, create a new thread to perform the task.
If the running threads are equal to or more than corePoolSize, the task is added to the queue.
When the task queue is full, a new thread is created in a non-corePool to process the task.
Creating a new thread will cause the currently running threads to exceed maximumPoolSize, the task will be rejected, and the RejectedExecutionHandler.rejectedExecution() method will be called.
The thread pool provides us with four rejection policies: CallerRunsPolicy, AbortPolicy, DiscardPolicy, DiscardOldestPolicy
The default rejection strategy in ThreadPoolExecutor is that AbortPolicy directly throws an exception. The specific implementation is as follows
public static class AbortPolicy implements RejectedExecutionHandler { public AbortPolicy() { } public void rejectedExecution(Runnable r, ThreadPoolExecutor e) { throw new RejectedExecutionException("Task " + r.toString() + " rejected from " + e.toString()); } }
Description: This strategy is very simple and crude. It will directly throw a RejectedExecutionException exception and will not perform subsequent tasks. .
Example description:
public class ThreadPoolTest { public static void main(String[] args) { ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor( 2, 5, 10, TimeUnit.MICROSECONDS, new LinkedBlockingDeque<>(1), new ThreadPoolExecutor.AbortPolicy()); //异步执行 for(int i=0; i<10;i++) { System.out.println("添加第"+i+"个任务"); threadPoolExecutor.execute(new TestThread("线程"+i)); } } } public class TestThread implements Runnable { private String name; public TestThread(String name){ this.name=name; } @Override public void run() { try { Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println("thread name:"+Thread.currentThread().getName()+",执行:"+name); } }
Execution result:
Exception in thread "main" java.util.concurrent.RejectedExecutionException: Task com.skywares.fw.juc .thread.TestThread@55f96302 rejected from java.util.concurrent.ThreadPoolExecutor@3d4eac69[Running, pool size = 5, active threads = 5, queued tasks = 1, completed tasks = 0]
at java.util.concurrent. ThreadPoolExecutor$AbortPolicy.rejectedExecution(ThreadPoolExecutor.java:2047)
at java.util.concurrent.ThreadPoolExecutor.reject(ThreadPoolExecutor.java:823)
at java.util.concurrent.ThreadPoolExecutor.execute(ThreadPoolExecutor.java: 1369)
at com.skywares.fw.juc.thread.ThreadPoolTest.main(ThreadPoolTest.java:26)
thread name:pool-1-thread-5, execution: thread 5
thread name: pool-1-thread-2, execution: thread 1
thread name: pool-1-thread-4, execution: thread 4
thread name: pool-1-thread-3, execution: thread 3
thread name: pool-1-thread-1, execution: thread 0
thread name: pool-1-thread-5, execution: thread 2
We know from the execution results, Using the AbortPolicy strategy, an error will be reported directly when the task is executed to the seventh task, resulting in subsequent business logic not being executed.
After the task is rejected, CallerRunsPolicy will use the upper thread that calls the execute function to execute the rejected task.
Related examples
public class ThreadPoolTest { public static void main(String[] args) { ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor( 2, 5, 10, TimeUnit.MICROSECONDS, new LinkedBlockingDeque<>(1), new ThreadPoolExecutor.CallerRunsPolicy()); //异步执行 for(int i=0; i<10;i++) { System.out.println("添加第"+i+"个任务"); threadPoolExecutor.execute(new TestThread("线程"+i)); } } }
Execution results:
Add the 0th task
Add the 1st task
Add the 2nd task
Add the 3rd task
Add the 4th task
Add the 5th task
Add the 6th task
thread name:main, execution: thread 6
thread name:pool- 1-thread-3, execution: thread 3
thread name: pool-1-thread-1, execution: thread 0
thread name: pool-1-thread-4, execution: thread 4
thread name:pool-1-thread-2, execution: thread 1
thread name: pool-1-thread-5, execution: thread 5
Add the 7th task
Add the 8th task
thread name:main,execution:thread 8
thread name:pool-1-thread-1,execution:thread 7
thread name:pool-1-thread-3,execution:thread 2
Add The 9th task
thread name:pool-1-thread-1, execution: thread 9
From the execution results, we can know that when the 7th task is executed, due to Thread pool rejection policy, this task is executed by the main thread, and other tasks will continue to be executed when the thread pool is idle. So this strategy may block the main thread.
This rejection policy is relatively simple. Tasks rejected by the thread pool are directly discarded without throwing exceptions or executing
Modify the above code and change the rejection policy to DiscardPolicy
ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor( 2, 5, 10, TimeUnit.MICROSECONDS, new LinkedBlockingDeque<>(1), new ThreadPoolExecutor.CallerRunsPolicy());
##invoke dealStock successJudging from the execution results, only 6 tasks were executed, and the other tasks were abandoned. DiscardOldestPolicyDiscardOldestPolicy When a task refuses to be added, the task that was first added to the queue will be discarded and the new task will be added. Example descriptiongoodsId:mobile phone
thread name:pool-1- thread-1, execution: thread 0
thread name: pool-1-thread-4, execution: thread 4
thread name: pool-1-thread-5, execution: thread 5
thread name: pool-1-thread-3, execution: thread 3
thread name: pool-1-thread-2, execution: thread 1
thread name: pool-1-thread-1, execution: thread 2
ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor( 1, 2, 10, TimeUnit.MICROSECONDS, new LinkedBlockingDeque<>(2), new ThreadPoolExecutor.CallerRunsPolicy());
Add the 0th taskAdd the 1st task
Add the 2nd task
Add the 3rd task
Add the 4th task
Add the 5th task
invoke dealStock success
goodsId: mobile phone
thread name:pool-1-thread-2, execute: Thread 3
thread name:pool-1-thread-1, execution: thread 0
thread name: pool-1-thread-1, execution: thread 2
thread name:pool-1-thread- 2. Execution: Thread 1
当线程池提供的拒绝策略无法满足要求时,我们可以采用自定义的拒绝策略,只需要实现RejectedExecutionHandler接口即可
public class CustRejectedExecutionHandler implements RejectedExecutionHandler { @Override public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) { new Thread(r,"线程:"+new Random().nextInt(10)).start(); } } ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor( 1, 2, 10, TimeUnit.MICROSECONDS, new LinkedBlockingDeque<>(2), new CustRejectedExecutionHandler());
执行结果:
thread name:客户线程:6,执行:线程5
thread name:pool-1-thread-1,执行:线程0
thread name:客户线程:8,执行:线程4
thread name:pool-1-thread-2,执行:线程3
thread name:pool-1-thread-1,执行:线程1
thread name:pool-1-thread-2,执行:线程2
从执行的结果来看,被拒绝的任务都在客户的新线程中执行。
AbortPolicy:直接抛出异常,后续的任务不会执行
CallerRunsPolicy:子任务执行的时间过长,可能会阻塞主线程。
DiscardPolicy:不抛异常,任务直接丢弃
DiscardOldestPolicy;丢弃最先加入队列的任务
The above is the detailed content of How to implement Java ThreadPoolExecutor's rejection policy?. For more information, please follow other related articles on the PHP Chinese website!