The most common way to create and use a thread pool is through Executors and the classes shown in the class diagram above. The diagram includes the Executor framework, which schedules, executes, and controls asynchronous tasks according to a set of execution policies. Its purpose is to decouple task submission from how tasks are run. It contains three executor interfaces:
Executor: a simple interface to run new tasks
ExecutorService: extends Executor, adding methods for managing the executor life cycle and the task life cycle
ScheduledExecutorService: extends ExecutorService to support delayed and periodic execution of tasks
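As a rough illustration of how the three interfaces build on one another, the sketch below uses each of them once. The class name ExecutorInterfacesDemo and its helper methods are made up for this example; only the java.util.concurrent types come from the JDK.

```java
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;

class ExecutorInterfacesDemo {

    // Executor: only knows how to run a Runnable.
    static String runWithExecutor() {
        StringBuilder out = new StringBuilder();
        Executor direct = Runnable::run; // illustrative executor that runs tasks on the calling thread
        direct.execute(() -> out.append("ran"));
        return out.toString();
    }

    // ExecutorService: adds task results (Future) and life cycle control (shutdown).
    static int runWithExecutorService() throws Exception {
        ExecutorService service = Executors.newSingleThreadExecutor();
        try {
            Future<Integer> future = service.submit(() -> 1 + 2);
            return future.get();
        } finally {
            service.shutdown();
        }
    }

    // ScheduledExecutorService: adds delayed and periodic execution.
    static String runWithScheduledService() throws Exception {
        ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1);
        try {
            ScheduledFuture<String> future =
                    scheduler.schedule(() -> "delayed", 100, TimeUnit.MILLISECONDS);
            return future.get();
        } finally {
            scheduler.shutdown();
        }
    }

    public static void main(String[] args) throws Exception {
        System.out.println(runWithExecutor());         // ran
        System.out.println(runWithExecutorService());  // 3
        System.out.println(runWithScheduledService()); // delayed
    }
}
```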
Using a thread pool has three main benefits:
Reduced resource consumption: existing threads are reused, which cuts the cost of creating and destroying thread objects and improves performance.
Faster response: the pool controls the maximum number of concurrent threads, improves system resource utilization, and avoids excessive resource contention and blocking. When a task arrives, it can be executed immediately without waiting for a thread to be created.
Better thread manageability: the pool provides scheduled execution, periodic execution, single-threaded execution, concurrency control, and other functions.
By contrast, creating threads directly with new Thread has several drawbacks:
Every new Thread creates a new object, which performs poorly.
Threads lack unified management. New threads may be created without limit, compete with each other, and occupy too many system resources, leading to a crash or an OOM (out of memory) error. The cause is usually not a single new Thread, but a bug or design flaw that keeps creating new threads.
Features such as scheduled execution, periodic execution, and thread interruption are missing.
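The reuse described above can be observed with a small experiment. The following is only a sketch; ThreadReuseDemo and distinctWorkerThreads are names invented for this example. It submits many short tasks to a small pool and counts how many different threads ran them.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

class ThreadReuseDemo {

    // Runs taskCount short tasks on a pool of poolSize threads and returns
    // how many distinct worker threads actually executed them.
    static int distinctWorkerThreads(int poolSize, int taskCount) throws InterruptedException {
        Set<String> workerNames = ConcurrentHashMap.newKeySet();
        ExecutorService pool = Executors.newFixedThreadPool(poolSize);
        for (int i = 0; i < taskCount; i++) {
            pool.execute(() -> workerNames.add(Thread.currentThread().getName()));
        }
        pool.shutdown();                             // accept no more tasks, finish the queued ones
        pool.awaitTermination(10, TimeUnit.SECONDS); // wait for the queued tasks to complete
        return workerNames.size();
    }

    public static void main(String[] args) throws InterruptedException {
        // 100 tasks are executed, but the pool never holds more than 4 threads.
        System.out.println("distinct worker threads: " + distinctWorkerThreads(4, 100));
    }
}
```

With new Thread, the same 100 tasks would have created 100 thread objects.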
Parameter description: ThreadPoolExecutor has seven parameters in total. Together they determine how the thread pool behaves.
corePoolSize: Number of core threads
maximumPoolSize: Maximum number of threads
workQueue: the blocking queue that holds tasks waiting to be executed. It is very important and has a significant impact on how the thread pool runs.
When a new task is submitted, the thread pool decides how to handle it based on the number of threads currently running in the pool. There are three queuing strategies:
1. Direct handoff (SynchronousQueue)
2. Unbounded queue (LinkedBlockingQueue): the maximum number of threads that can be created is corePoolSize, so maximumPoolSize has no effect. When all core threads are busy, newly submitted tasks are placed in the waiting queue.
3. Bounded queue (ArrayBlockingQueue): the maximum number of threads is maximumPoolSize. A bounded queue can reduce resource consumption, but it makes scheduling harder because both the pool size and the queue capacity are limited. To keep throughput in a reasonable range while keeping scheduling simple and resource consumption low, the two limits have to be balanced against each other. [To reduce resource consumption (CPU usage, operating system resources, context switching overhead, and so on), use a larger queue capacity and a smaller pool size, at the cost of lower throughput. If the submitted tasks block often, increase maximumPoolSize. If the queue capacity is small, the pool size needs to be larger, which leads to higher CPU usage. However, if the pool size is set too large and the number of tasks grows too much, concurrency rises and the scheduling overhead between threads may end up reducing throughput.]
keepAliveTime: the maximum time an idle thread is kept alive before it terminates. When the number of threads in the pool is greater than corePoolSize and no new task is submitted, threads beyond the core threads are not destroyed immediately; they are destroyed only after they have been idle for longer than keepAliveTime.
unit: The time unit of keepAliveTime
threadFactory: the thread factory used to create threads. The default factory creates threads that have the same priority, are non-daemon threads, and have a generated name.
handler (RejectedExecutionHandler): the rejection policy applied when a task cannot be accepted, that is, when the blocking queue is full and the pool has reached maximumPoolSize. AbortPolicy, the default, throws an exception directly; CallerRunsPolicy runs the task on the caller's thread; DiscardOldestPolicy discards the oldest task at the head of the queue and then retries the current task; DiscardPolicy silently discards the current task.
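To make the parameter list concrete, here is a minimal sketch that passes all seven arguments to the ThreadPoolExecutor constructor. The sizes (2, 4, 30 seconds, queue capacity 100), the class name SevenParameterPool, and the namedFactory helper are illustrative assumptions, not recommended values.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

class SevenParameterPool {

    // Illustrative factory that gives worker threads a recognisable name.
    static ThreadFactory namedFactory(String prefix) {
        AtomicInteger counter = new AtomicInteger(1);
        return runnable -> {
            Thread t = new Thread(runnable, prefix + "-" + counter.getAndIncrement());
            t.setDaemon(false);
            return t;
        };
    }

    static ThreadPoolExecutor create() {
        return new ThreadPoolExecutor(
                2,                                          // corePoolSize
                4,                                          // maximumPoolSize
                30L,                                        // keepAliveTime
                TimeUnit.SECONDS,                           // unit
                new ArrayBlockingQueue<>(100),              // workQueue (bounded)
                namedFactory("demo-pool"),                  // threadFactory
                new ThreadPoolExecutor.CallerRunsPolicy()); // handler
    }

    public static void main(String[] args) throws InterruptedException {
        ThreadPoolExecutor pool = create();
        pool.execute(() -> System.out.println("running on " + Thread.currentThread().getName()));
        pool.shutdown();
        pool.awaitTermination(5, TimeUnit.SECONDS);
    }
}
```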
The relationship between corePoolSize, maximumPoolSize, and workQueue: If the number of running threads is less than corePoolSize, a new thread is created to handle the task, even if other threads in the pool are idle. If the number of running threads is at least corePoolSize but less than maximumPoolSize, the task is queued, and a new thread is created only when the workQueue is full. If corePoolSize and maximumPoolSize are equal, the pool has a fixed size: a newly submitted task is placed in the workQueue if the queue is not full, and waits there for an idle thread to take it. If the workQueue is also full and the pool has reached maximumPoolSize, the task is handled by the configured rejection policy.
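These rules can be traced with a deliberately tiny pool. The sketch below assumes corePoolSize = 1, maximumPoolSize = 2, a queue of capacity 1, and the default AbortPolicy; PoolGrowthDemo is a name chosen for this example. Every task blocks on a latch so that the pool state stays stable while it is inspected.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

class PoolGrowthDemo {

    // Submits four blocking tasks and records the pool state after each submission.
    static String trace() throws InterruptedException {
        CountDownLatch release = new CountDownLatch(1);
        Runnable blocker = () -> {
            try {
                release.await(); // keep the worker busy until the demo is over
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        };
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 2, 1L, TimeUnit.SECONDS, new ArrayBlockingQueue<>(1));
        StringBuilder out = new StringBuilder();
        for (int i = 1; i <= 4; i++) {
            try {
                pool.execute(blocker);
                out.append("task").append(i)
                   .append(": threads=").append(pool.getPoolSize())
                   .append(" queued=").append(pool.getQueue().size()).append('\n');
            } catch (RejectedExecutionException e) {
                out.append("task").append(i).append(": rejected\n");
            }
        }
        release.countDown(); // let all accepted tasks finish
        pool.shutdown();
        pool.awaitTermination(5, TimeUnit.SECONDS);
        return out.toString();
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.print(trace());
        // task1: threads=1 queued=0   (below corePoolSize: new core thread)
        // task2: threads=1 queued=1   (core thread busy: task is queued)
        // task3: threads=2 queued=1   (queue full: non-core thread is created)
        // task4: rejected             (queue full and pool at maximumPoolSize)
    }
}
```

Note that task3 starts running before task2, because the new non-core thread takes the task that triggered its creation while task2 stays in the queue.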
Constructors: the seven parameters are combined into four overloaded constructors
Other methods:
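    execute();                // submit a task for the thread pool to execute
    submit();                 // submit a task and get its result back (execute + Future)
    shutdown();               // shut down the pool; already submitted tasks still run to completion
    shutdownNow();            // shut down the pool without waiting for tasks to finish
    getTaskCount();           // total number of tasks, both executed and not yet executed
    getCompletedTaskCount();  // number of completed tasks
    getPoolSize();            // current number of threads in the pool
    getActiveCount();         // number of threads currently executing tasks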
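A short sketch of how these methods fit together is shown below. PoolMethodsDemo and its run helper are names made up for this example, and the pool configuration (two threads, unbounded queue) is only an assumption chosen to keep the demo small.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

class PoolMethodsDemo {

    // Returns { result of the submitted task, getTaskCount(), getCompletedTaskCount() }.
    static long[] run() throws Exception {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                2, 2, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>());

        pool.execute(() -> { });                    // execute: no result is returned
        Callable<Integer> sum = () -> 40 + 2;
        Future<Integer> future = pool.submit(sum);  // submit: result is available via the Future
        int result = future.get();                  // blocks until the task has finished

        pool.shutdown();                            // no new tasks; submitted tasks still complete
        pool.awaitTermination(5, TimeUnit.SECONDS); // wait until the pool has terminated

        return new long[] { result, pool.getTaskCount(), pool.getCompletedTaskCount() };
    }

    public static void main(String[] args) throws Exception {
        long[] r = run();
        System.out.println("result=" + r[0] + " taskCount=" + r[1] + " completed=" + r[2]);
        // result=42 taskCount=2 completed=2
    }
}
```

While the pool is still running, getTaskCount and getCompletedTaskCount are only approximations, because tasks and threads change state during the computation.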
Thread pool life cycle:
RUNNING: accepts newly submitted tasks and processes tasks in the blocking queue
SHUTDOWN: accepts no new tasks, but continues to process tasks already in the blocking queue
STOP: accepts no new tasks, does not process queued tasks, and interrupts tasks in progress
TIDYING: all tasks have terminated and the number of worker threads is 0
TERMINATED: the final state, reached after the terminated() hook has completed
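The internal state itself is not exposed, but the transitions can be observed through isShutdown(), isTerminating(), and isTerminated(). The sketch below is one way to watch them; LifecycleDemo and its helpers are hypothetical names, and the single-thread pool is chosen only so that one task runs while another waits in the queue.

```java
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

class LifecycleDemo {

    // One worker thread: the first task blocks until `release` is counted down
    // (or the worker is interrupted); the second task waits in the queue.
    private static ThreadPoolExecutor busyPool(CountDownLatch release) throws InterruptedException {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>());
        CountDownLatch started = new CountDownLatch(1);
        pool.execute(() -> {
            started.countDown();
            try {
                release.await();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        pool.execute(() -> { });
        started.await(); // make sure the first task is actually running
        return pool;
    }

    // RUNNING -> SHUTDOWN -> TIDYING -> TERMINATED
    static String gracefulShutdown() throws InterruptedException {
        CountDownLatch release = new CountDownLatch(1);
        ThreadPoolExecutor pool = busyPool(release);
        StringBuilder out = new StringBuilder();
        out.append("isShutdown=").append(pool.isShutdown()).append('\n');
        pool.shutdown();
        out.append("isShutdown=").append(pool.isShutdown())
           .append(" isTerminating=").append(pool.isTerminating()).append('\n');
        try {
            pool.execute(() -> { });
        } catch (RejectedExecutionException e) {
            out.append("new task rejected\n");
        }
        release.countDown(); // let the running task and the queued task finish
        pool.awaitTermination(5, TimeUnit.SECONDS);
        out.append("isTerminated=").append(pool.isTerminated()).append('\n');
        return out.toString();
    }

    // RUNNING -> STOP: queued tasks are handed back to the caller instead of being run.
    static int forcedShutdown() throws InterruptedException {
        ThreadPoolExecutor pool = busyPool(new CountDownLatch(1));
        List<Runnable> neverStarted = pool.shutdownNow(); // also interrupts the running task
        pool.awaitTermination(5, TimeUnit.SECONDS);
        return neverStarted.size();
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.print(gracefulShutdown());
        // isShutdown=false
        // isShutdown=true isTerminating=true
        // new task rejected
        // isTerminated=true
        System.out.println("tasks never started: " + forcedShutdown()); // tasks never started: 1
    }
}
```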
Executors can create four kinds of thread pools, each built on the ThreadPoolExecutor constructors described above:
newCachedThreadPool creates a thread pool that creates new threads as needed. corePoolSize is 0, so no core threads are created when a task is submitted. SynchronousQueue is a queue that stores no elements; it can be thought of as always full, so a non-core thread is created to run the task whenever no idle thread is available. Non-core threads are reclaimed after being idle for 60 seconds. Because Integer.MAX_VALUE is so large, the pool can effectively create threads without limit, which can easily cause OOM errors when resources are limited.
    // Source of newCachedThreadPool
    public static ExecutorService newCachedThreadPool() {
        /*
         * corePoolSize:    0, no core threads
         * maximumPoolSize: Integer.MAX_VALUE, effectively unlimited
         * keepAliveTime:   60L
         * unit:            seconds
         * workQueue:       SynchronousQueue
         */
        return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                      60L, TimeUnit.SECONDS,
                                      new SynchronousQueue<Runnable>());
    }
Use case:
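    // Assumes `log` is an SLF4J-style logger field on the enclosing class;
    // needs java.util.concurrent.ExecutorService and java.util.concurrent.Executors.
    public static void main(String[] args) {
        ExecutorService executor = Executors.newCachedThreadPool();
        for (int i = 0; i < 10; i++) {
            final int index = i;
            executor.execute(new Runnable() {
                @Override
                public void run() {
                    log.info("task:{}", index);
                }
            });
        }
        // Let the submitted tasks finish, then release the idle worker threads.
        executor.shutdown();
    }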
Note that newCachedThreadPool returns the ExecutorService type, which exposes only the basic thread pool methods and none of the monitoring methods. Keep this in mind when creating a pool through a method whose return type is ExecutorService.
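One possible workaround, sketched here under the assumption that the factory really returns a ThreadPoolExecutor (as the newCachedThreadPool source quoted above does), is to check the runtime type and cast. MonitoringDemo is a name made up for this example. The same cast does not apply to newSingleThreadExecutor, whose source wraps the pool in a delegating class.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadPoolExecutor;

class MonitoringDemo {

    // Returns the pool size after one task has run, or -1 if monitoring is unavailable.
    static int poolSizeAfterOneTask() throws Exception {
        ExecutorService service = Executors.newCachedThreadPool();
        service.submit(() -> { }).get(); // wait for the task, so one worker thread exists
        int poolSize = -1;
        // The instanceof check guards against factories that return a wrapper type.
        if (service instanceof ThreadPoolExecutor) {
            ThreadPoolExecutor pool = (ThreadPoolExecutor) service;
            poolSize = pool.getPoolSize();
        }
        service.shutdown();
        return poolSize;
    }

    public static void main(String[] args) throws Exception {
        System.out.println("pool size: " + poolSizeAfterOneTask());
    }
}
```

Constructing the ThreadPoolExecutor directly avoids the cast altogether, since the variable then already has the monitoring methods.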
newSingleThreadExecutor is a single-threaded pool with exactly one core thread. It uses that single shared thread to execute all tasks, which guarantees that tasks run in the specified order (FIFO, priority...).
    // Source of newSingleThreadExecutor
    public static ExecutorService newSingleThreadExecutor() {
        /*
         * corePoolSize:    1, a single core thread
         * maximumPoolSize: 1, so no non-core threads can be created
         * keepAliveTime:   0L
         * unit:            milliseconds
         * workQueue:       LinkedBlockingQueue
         */
        return new FinalizableDelegatedExecutorService
            (new ThreadPoolExecutor(1, 1,
                                    0L, TimeUnit.MILLISECONDS,
                                    new LinkedBlockingQueue<Runnable>()));
    }
When a task is submitted, the core thread is created first to execute it. Tasks submitted while the core thread is busy are placed in the queue. Because LinkedBlockingQueue has a capacity of Integer.MAX_VALUE, it can be regarded as an unbounded queue, so an effectively unlimited number of tasks can be queued, which can easily cause OOM errors when resources are limited. Because the queue is unbounded, the maximumPoolSize and keepAliveTime parameters have no effect, and no non-core threads are ever created.
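The ordering guarantee can be checked with a small sketch. SingleThreadOrderDemo and runInOrder are names invented for this example; the tasks simply record the order in which they ran.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

class SingleThreadOrderDemo {

    // Submits taskCount tasks and returns the indices in the order they were executed.
    static List<Integer> runInOrder(int taskCount) throws InterruptedException {
        List<Integer> order = Collections.synchronizedList(new ArrayList<>());
        ExecutorService executor = Executors.newSingleThreadExecutor();
        for (int i = 0; i < taskCount; i++) {
            final int index = i;
            executor.execute(() -> order.add(index));
        }
        executor.shutdown();                             // queued tasks still run
        executor.awaitTermination(5, TimeUnit.SECONDS);  // wait until they have all finished
        return order;
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(runInOrder(5)); // [0, 1, 2, 3, 4]
    }
}
```

Because a single thread drains a FIFO queue, the execution order matches the submission order.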
newFixedThreadPool is a fixed-size thread pool: the number of core threads and the maximum number of threads are both passed in by the user. This sets the maximum number of concurrent threads; tasks beyond that wait in the queue.
    // Source of newFixedThreadPool
    public static ExecutorService newFixedThreadPool(int nThreads) {
        /*
         * corePoolSize:    the user-supplied nThreads
         * maximumPoolSize: the user-supplied nThreads
         * keepAliveTime:   0L
         * unit:            milliseconds
         * workQueue:       LinkedBlockingQueue
         */
        return new ThreadPoolExecutor(nThreads, nThreads,
                                      0L, TimeUnit.MILLISECONDS,
                                      new LinkedBlockingQueue<Runnable>());
    }
newFixedThreadPool is similar to newSingleThreadExecutor. The only difference is the number of core threads. Because it also uses LinkedBlockingQueue, it can easily cause OOM errors when resources are limited.
newScheduledThreadPool is a fixed-size thread pool: the number of core threads is passed in by the user, and it supports delayed and periodic task execution.
    // Source of newScheduledThreadPool
    public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {
        return new ScheduledThreadPoolExecutor(corePoolSize);
    }

    public ScheduledThreadPoolExecutor(int corePoolSize) {
        /*
         * corePoolSize:    the user-supplied corePoolSize
         * maximumPoolSize: Integer.MAX_VALUE
         * keepAliveTime:   0
         * unit:            nanoseconds
         * workQueue:       DelayedWorkQueue
         */
        super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
              new DelayedWorkQueue());
    }
When a task is submitted, core threads are created first, up to the user-supplied corePoolSize. Although maximumPoolSize is declared as Integer.MAX_VALUE, the DelayedWorkQueue is unbounded, so once the core threads are busy, further tasks wait in the queue rather than causing non-core threads to be created. The practical OOM risk when resources are limited is therefore an ever-growing queue of pending tasks. The DelayedWorkQueue is also what makes delayed and periodic tasks possible. ScheduledExecutorService provides three methods:
schedule: execute the task once after a delay
scheduleAtFixedRate: execute the task periodically at a fixed rate
scheduleWithFixedDelay: execute the task repeatedly, with a fixed delay between the end of one run and the start of the next
Use case:
    // Assumes `log` is an SLF4J-style logger field on the enclosing class.
    public static void main(String[] args) {
        ScheduledExecutorService executorService = Executors.newScheduledThreadPool(1);

        // executorService.schedule(new Runnable() {
        //     @Override
        //     public void run() {
        //         log.warn("schedule run");
        //     }
        //     // run once after a 3-second delay
        // }, 3, TimeUnit.SECONDS);
        // executorService.shutdown();

        // executorService.scheduleWithFixedDelay(new Runnable() {
        //     @Override
        //     public void run() {
        //         log.warn("scheduleWithFixedDelay run");
        //     }
        //     // after a 1-second delay, run with 3 seconds between runs
        // }, 1, 3, TimeUnit.SECONDS);

        executorService.scheduleAtFixedRate(new Runnable() {
            @Override
            public void run() {
                log.warn("schedule run");
            }
            // after a 1-second delay, run every 3 seconds
        }, 1, 3, TimeUnit.SECONDS);

        /*
         * Timer-based scheduling is not recommended; prefer ScheduledExecutorService.
         */
        // Timer timer = new Timer();
        // timer.schedule(new TimerTask() {
        //     @Override
        //     public void run() {
        //         log.warn("timer run");
        //     }
        //     // starting now, run every 5 seconds
        // }, new Date(), 5 * 1000);
    }
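A periodic task keeps running until it is cancelled or the scheduler is shut down, so real code usually needs a way to stop it. The following sketch shows one way to do that; the names (FixedRateDemo, runThreeTimes) and the millisecond timings are assumptions chosen to keep the example short.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

class FixedRateDemo {

    // Lets a fixed-rate task run at least three times, then cancels it
    // and returns how many runs were observed.
    static int runThreeTimes() throws InterruptedException {
        ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1);
        AtomicInteger runs = new AtomicInteger();
        CountDownLatch threeRuns = new CountDownLatch(3);

        // First run after 50 ms, then one run every 100 ms.
        ScheduledFuture<?> handle = scheduler.scheduleAtFixedRate(() -> {
            runs.incrementAndGet();
            threeRuns.countDown();
        }, 50, 100, TimeUnit.MILLISECONDS);

        threeRuns.await(5, TimeUnit.SECONDS); // wait for the third run
        handle.cancel(false);                 // stop future runs without interrupting a running one
        scheduler.shutdown();
        scheduler.awaitTermination(5, TimeUnit.SECONDS);
        return runs.get();
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println("runs before cancel: " + runThreeTimes());
    }
}
```

The reported count is at least 3; it can be slightly higher if another run starts before the cancellation takes effect.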
FixedThreadPool and SingleThreadExecutor allow a request queue length of Integer.MAX_VALUE, so a large number of requests may accumulate and cause OOM errors.
CachedThreadPool and ScheduledThreadPool declare a maximum thread count of Integer.MAX_VALUE. CachedThreadPool may therefore create a large number of threads and cause OOM errors; ScheduledThreadPool, with its unbounded DelayedWorkQueue, may likewise accumulate a large number of pending tasks.
This is why creating thread pools through Executors is discouraged, and constructing the ThreadPoolExecutor yourself is recommended.
CPU-intensive: the recommended thread pool size is the number of CPUs + 1. The number of CPUs can be obtained from Runtime.getRuntime().availableProcessors().
IO-intensive: number of CPUs * target CPU utilization * (1 + thread wait time / thread CPU time).
Mixed: split the tasks into CPU-intensive and IO-intensive ones and handle them with separate thread pools, so that each pool can be tuned to its own workload.
Blocking queue: a bounded queue is recommended, since it helps avoid resource exhaustion.
Rejection policy: the default is AbortPolicy, which throws a RejectedExecutionException directly [a runtime exception, so the compiler does not force you to catch it]. This handling is not graceful enough. The following strategies are recommended for handling rejections:
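The two sizing rules above translate directly into code. This is only a sketch of the arithmetic: PoolSizing and its method names are made up, and the wait and compute times in main are illustrative figures that would have to be measured for a real workload.

```java
class PoolSizing {

    // CPU-intensive work: number of CPUs + 1.
    static int cpuBound(int cpus) {
        return cpus + 1;
    }

    // IO-intensive work: CPUs * target utilization * (1 + wait time / CPU time).
    // targetUtilization is a fraction between 0 and 1; the two times share one unit.
    static int ioBound(int cpus, double targetUtilization, double waitTime, double cpuTime) {
        double size = cpus * targetUtilization * (1 + waitTime / cpuTime);
        return Math.max(1, (int) Math.ceil(size));
    }

    public static void main(String[] args) {
        int cpus = Runtime.getRuntime().availableProcessors();
        System.out.println("CPU-bound pool size on this machine: " + cpuBound(cpus));

        // Example: 8 CPUs, 50% target utilization, tasks wait 90 ms for every 10 ms of CPU time.
        System.out.println(ioBound(8, 0.5, 90, 10)); // 40
    }
}
```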
Catch the RejectedExecutionException in the program and handle the task in the catch block.
Use CallerRunsPolicy instead of the default rejection policy. This policy hands the task to the thread that called execute [usually the main thread]. While it runs the task, that thread cannot submit any new tasks, which gives the worker threads time to work through the tasks already in progress. In a server, requests arriving in the meantime wait in the TCP queue; if the TCP queue fills up, the effect reaches the client. The result is a gradual performance degradation.
Implement a custom rejection policy; only the RejectedExecutionHandler interface needs to be implemented.
If the task is not particularly important, DiscardPolicy or DiscardOldestPolicy can be used to discard tasks.
If you create the ThreadPoolExecutor through a static Executors method, you can use a Semaphore to throttle task submission and avoid OOM errors.
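The Semaphore approach from the last item can be sketched as a thin wrapper around any Executor. BoundedExecutor is a hypothetical class written for this example, not a JDK type. It makes the submitting thread wait once a fixed number of tasks is queued or running, so that neither the queue nor the thread count can grow without limit.

```java
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical wrapper: caps the number of tasks that are queued or running.
class BoundedExecutor {
    private final Executor executor;
    private final Semaphore permits;

    BoundedExecutor(Executor executor, int maxInFlight) {
        this.executor = executor;
        this.permits = new Semaphore(maxInFlight);
    }

    // Blocks the submitting thread until a permit is available.
    void submit(Runnable task) throws InterruptedException {
        permits.acquire();
        try {
            executor.execute(() -> {
                try {
                    task.run();
                } finally {
                    permits.release(); // free the slot once the task has finished
                }
            });
        } catch (RejectedExecutionException e) {
            permits.release();         // the task never started, so return its permit
            throw e;
        }
    }

    // Runs taskCount short tasks and reports the highest number seen in flight at once.
    static int maxObservedInFlight(int bound, int taskCount) throws InterruptedException {
        ExecutorService pool = Executors.newCachedThreadPool();
        BoundedExecutor bounded = new BoundedExecutor(pool, bound);
        AtomicInteger inFlight = new AtomicInteger();
        AtomicInteger maxSeen = new AtomicInteger();
        for (int i = 0; i < taskCount; i++) {
            bounded.submit(() -> {
                maxSeen.accumulateAndGet(inFlight.incrementAndGet(), Math::max);
                try {
                    Thread.sleep(5); // simulate a little work
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
                inFlight.decrementAndGet();
            });
        }
        pool.shutdown();
        pool.awaitTermination(10, TimeUnit.SECONDS);
        return maxSeen.get();
    }

    public static void main(String[] args) throws InterruptedException {
        // Even on a cached thread pool, no more than 3 tasks are ever in flight.
        System.out.println("max in flight: " + maxObservedInFlight(3, 20));
    }
}
```

The trade-off is that submit blocks the caller, which acts as back pressure in much the same way as CallerRunsPolicy.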