In general, producers generate tasks faster than consumers can process them. The details to get right are the length of the queue and how to match the production rate to the consumption rate.
A typical producer-consumer model is as follows:
In a concurrent environment, the Queue implementations provided by J.U.C (java.util.concurrent) make it easy to keep production and consumption thread-safe. Note that the Queue must be given a capacity limit. Otherwise a producer that runs too fast makes the queue grow without bound and eventually triggers an OutOfMemoryError.
Consider the usual case where production is faster than consumption. When the queue is full, we do not want any task to be dropped or left unexecuted. The producer could wait for a while and then submit the task again, but a better approach is to block the producer inside the submit method and let it continue as soon as the queue is no longer full, so no idle time is wasted. Blocking is easy to get, because BlockingQueue is built for exactly this. Both ArrayBlockingQueue and LinkedBlockingQueue accept a capacity limit in their constructors. ArrayBlockingQueue requires one, while a LinkedBlockingQueue constructed without one defaults to Integer.MAX_VALUE and is effectively unbounded. LinkedBlockingQueue checks the capacity each time it has acquired its lock for an actual queue operation.
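The capacity limit can be observed without any threads. The following is a minimal sketch (the class and method names are made up for illustration): the non-blocking offer returns false once a bounded queue is full, which is exactly the situation in which put would block instead.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

// Hypothetical demo class, not part of the original article.
class BoundedQueueSketch {
    /** Counts how many of the given number of non-blocking offers the queue accepts. */
    static int acceptedOffers(BlockingQueue<Integer> queue, int attempts) {
        int accepted = 0;
        for (int i = 0; i < attempts; i++) {
            // offer never waits: it returns false as soon as the queue is full
            if (queue.offer(i)) {
                accepted++;
            }
        }
        return accepted;
    }

    public static void main(String[] args) {
        // Both queues are bounded, so only "capacity" of the five offers succeed.
        System.out.println(acceptedOffers(new ArrayBlockingQueue<Integer>(2), 5));  // prints 2
        System.out.println(acceptedOffers(new LinkedBlockingQueue<Integer>(3), 5)); // prints 3
        // Without a capacity argument, LinkedBlockingQueue is bounded only by Integer.MAX_VALUE.
        System.out.println(new LinkedBlockingQueue<Integer>().remainingCapacity()); // prints 2147483647
    }
}
```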
Likewise, when the queue is empty, the consumer cannot get a task. It could wait for a while and try again, but a better approach is to block on the BlockingQueue's take method, so that a task is picked up and executed as soon as one arrives. Because take itself has no timeout, it is advisable to use the timed variant, poll(long timeout, TimeUnit unit), and let the thread exit when the call times out. That way, once the producer has really stopped producing, the consumer is not left waiting forever.
This gives us an efficient producer-consumer model that supports blocking.
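As a rough sketch of the model described so far (all names here are hypothetical, and the one-second timeout is an arbitrary choice), the producer blocks in put while the queue is full, and the consumer uses the timed poll so that it exits once nothing has arrived for a while:

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical demo class, not part of the original article.
class ProducerConsumerSketch {
    /** Pushes taskCount items through a bounded queue and returns how many were consumed. */
    static int run(int taskCount, int capacity) {
        final BlockingQueue<Integer> queue = new ArrayBlockingQueue<Integer>(capacity);
        final AtomicInteger consumed = new AtomicInteger();

        Thread consumer = new Thread(() -> {
            try {
                // poll returns null if nothing arrives within one second,
                // which ends the loop and lets the consumer thread exit
                while (queue.poll(1, TimeUnit.SECONDS) != null) {
                    consumed.incrementAndGet();
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        consumer.start();

        try {
            for (int i = 0; i < taskCount; i++) {
                queue.put(i); // blocks while the queue is full
            }
            consumer.join(); // returns about one second after the last item is consumed
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return consumed.get();
    }

    public static void main(String[] args) {
        System.out.println("consumed: " + run(100, 4));
    }
}
```

The timeout has to be longer than the longest pause the producer can take between two tasks. Otherwise the consumer gives up too early and the producer ends up blocked on a full queue.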
But wait: since J.U.C already implements a thread pool for us, why build all this by hand? Wouldn't it be more convenient to use ExecutorService directly?
Let’s take a look at the basic structure of ThreadPoolExecutor:
As you can see, ThreadPoolExecutor already implements the BlockingQueue and Consumer parts for us, and using the thread pool directly has many advantages, such as dynamic adjustment of the number of threads.
The problem is that even if you pass a BlockingQueue as the queue implementation when constructing a ThreadPoolExecutor, the execute method does not block when the queue is full, because ThreadPoolExecutor calls the non-blocking offer method of BlockingQueue:
    public void execute(Runnable command) {
        if (command == null)
            throw new NullPointerException();
        if (poolSize >= corePoolSize || !addIfUnderCorePoolSize(command)) {
            if (runState == RUNNING && workQueue.offer(command)) {
                if (runState != RUNNING || poolSize == 0)
                    ensureQueuedTaskHandled(command);
            }
            else if (!addIfUnderMaximumPoolSize(command))
                reject(command); // is shutdown or saturated
        }
    }
So something extra is needed to get the desired result: when the producer submits a task and the queue is full, the producer blocks until a queued task has been consumed.
The difficulty is that, in a concurrent environment, the producer cannot reliably determine whether the queue is full. Calling ThreadPoolExecutor.getQueue().size() does not help, because the result may already be stale by the time the producer acts on it.
In the thread pool implementation, when the queue is full and no more threads can be added, the RejectedExecutionHandler passed in at construction time is called to handle the rejected task. The default implementation is AbortPolicy, which simply throws a RejectedExecutionException.
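The default behaviour is easy to reproduce. The sketch below (hypothetical names, and a deliberately tiny pool with one thread and one queue slot) keeps the worker busy on a latch, so that every submission after the second one is rejected with a RejectedExecutionException instead of blocking:

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

// Hypothetical demo class, not part of the original article.
class AbortPolicySketch {
    /** Submits blocking tasks to a saturated pool and counts how many are rejected. */
    static int countRejections(int submissions) {
        // One worker thread, one queue slot, default handler (AbortPolicy).
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 1, 0L, TimeUnit.MILLISECONDS, new ArrayBlockingQueue<Runnable>(1));
        final CountDownLatch release = new CountDownLatch(1);
        int rejected = 0;
        try {
            for (int i = 0; i < submissions; i++) {
                try {
                    pool.execute(() -> {
                        try {
                            release.await(); // keep the worker busy until released
                        } catch (InterruptedException e) {
                            Thread.currentThread().interrupt();
                        }
                    });
                } catch (RejectedExecutionException e) {
                    rejected++; // execute returned immediately instead of blocking
                }
            }
        } finally {
            release.countDown(); // let the accepted tasks finish
            pool.shutdown();
        }
        return rejected;
    }

    public static void main(String[] args) {
        // The first task occupies the worker, the second fills the queue,
        // and the remaining three are rejected.
        System.out.println(countRejections(5)); // prints 3
    }
}
```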
I won't go through the other built-in policies here. The one closest to our needs is CallerRunsPolicy. When the queue is full, this policy makes the thread that submitted the task run it, which amounts to having the producer temporarily do the consumer's work. The producer is not blocked, but it cannot submit further tasks until it finishes, so submission is effectively paused.
    public static class CallerRunsPolicy implements RejectedExecutionHandler {
        /**
         * Creates a <tt>CallerRunsPolicy</tt>.
         */
        public CallerRunsPolicy() { }

        /**
         * Executes task r in the caller's thread, unless the executor
         * has been shut down, in which case the task is discarded.
         * @param r the runnable task requested to be executed
         * @param e the executor attempting to execute this task
         */
        public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
            if (!e.isShutdown()) {
                r.run();
            }
        }
    }
But this policy has a hidden risk. When there are few producers, the consumers may finish every queued task while a producer is busy running a task itself, leaving the queue empty. Production resumes only after the producer has finished that task, so the consumer threads may starve in the meantime.
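To see that the rejected task really runs on the producer's own thread, here is a small sketch (names are hypothetical, and the pool is again limited to one thread and one queue slot). The third task is rejected and records which thread executed it:

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical demo class, not part of the original article.
class CallerRunsSketch {
    /** Returns true if the task rejected by a saturated pool ran on the submitting thread. */
    static boolean overflowRunsInCaller() {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 1, 0L, TimeUnit.MILLISECONDS,
                new ArrayBlockingQueue<Runnable>(1),
                new ThreadPoolExecutor.CallerRunsPolicy());
        final CountDownLatch release = new CountDownLatch(1);
        final AtomicReference<Thread> ranOn = new AtomicReference<Thread>();
        Runnable blocker = () -> {
            try {
                release.await();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        };
        try {
            pool.execute(blocker); // occupies the single worker thread
            pool.execute(blocker); // fills the single queue slot
            // Rejected, so CallerRunsPolicy runs it synchronously right here.
            pool.execute(() -> ranOn.set(Thread.currentThread()));
        } finally {
            release.countDown();
            pool.shutdown();
        }
        return ranOn.get() == Thread.currentThread();
    }

    public static void main(String[] args) {
        System.out.println(overflowRunsInCaller()); // prints true
    }
}
```

While the submitting thread is inside that third task, it cannot hand new work to the pool, which is where the starvation risk comes from.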
Following a similar idea, the simplest way is to define a RejectedExecutionHandler directly and have it call BlockingQueue.put when the queue is full, which blocks the producer:
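    // All types used here are in java.util.concurrent.
    new RejectedExecutionHandler() {
        @Override
        public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
            if (!executor.isShutdown()) {
                try {
                    // put blocks the submitting thread until the queue has room
                    executor.getQueue().put(r);
                } catch (InterruptedException e) {
                    // restore the interrupt flag and report the task as rejected
                    // instead of silently dropping it
                    Thread.currentThread().interrupt();
                    throw new RejectedExecutionException("interrupted while waiting for queue space", e);
                }
            }
        }
    };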
With this in place, we no longer need to care about the Queue and Consumer logic. We only need to focus on the implementation logic of the producer and consumer threads and submit tasks to the thread pool.
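Wired into a pool, the idea might look like the sketch below. The class names, pool sizes and the ten-second wait are assumptions made for illustration. Unlike a handler that ignores interruption, this variant reports an interrupted or post-shutdown submission as a RejectedExecutionException.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical demo class, not part of the original article.
class BlockingSubmitSketch {
    /** Blocks the submitting thread until the work queue has room. */
    static class BlockWhenFullPolicy implements RejectedExecutionHandler {
        @Override
        public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
            if (executor.isShutdown()) {
                throw new RejectedExecutionException("executor has been shut down");
            }
            try {
                executor.getQueue().put(r); // waits for a free slot
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new RejectedExecutionException("interrupted while waiting for queue space", e);
            }
        }
    }

    /** Submits taskCount trivial tasks through a tiny pool and returns how many completed. */
    static int runAll(int taskCount) {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                2, 2, 0L, TimeUnit.MILLISECONDS,
                new ArrayBlockingQueue<Runnable>(2),
                new BlockWhenFullPolicy());
        final AtomicInteger completed = new AtomicInteger();
        try {
            for (int i = 0; i < taskCount; i++) {
                // Never throws for a full queue: the producer simply waits inside execute.
                pool.execute(() -> completed.incrementAndGet());
            }
            pool.shutdown(); // already queued tasks still run
            if (!pool.awaitTermination(10, TimeUnit.SECONDS)) {
                throw new IllegalStateException("tasks did not finish in time");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return completed.get();
    }

    public static void main(String[] args) {
        System.out.println("completed: " + runAll(50));
    }
}
```

One caveat of this approach: if the pool is shut down while a producer is blocked inside put, the task can land in the queue after the workers have exited and never run. It is therefore safest to stop all producers before calling shutdown.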
Compared with the original design, this approach needs much less code and avoids many of the problems of concurrent environments. Other approaches are possible, such as using a semaphore to limit entry at submission time, but if all you want is for the producer to block, that is needlessly complicated.
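For comparison, the semaphore variant could be sketched roughly as follows (hypothetical names again, and the fixed pool of two threads is an arbitrary choice). The producer must acquire a permit before submitting, and each task returns its permit when it finishes, so at most maxInFlight tasks are queued or running at any time:

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical demo class, not part of the original article.
class SemaphoreSubmitSketch {
    /** Submits taskCount trivial tasks with at most maxInFlight pending; returns how many completed. */
    static int runAll(int taskCount, int maxInFlight) {
        // newFixedThreadPool uses an unbounded queue; the semaphore provides the bound.
        ExecutorService pool = Executors.newFixedThreadPool(2);
        final Semaphore permits = new Semaphore(maxInFlight);
        final AtomicInteger completed = new AtomicInteger();
        try {
            for (int i = 0; i < taskCount; i++) {
                permits.acquire(); // blocks the producer when too many tasks are pending
                try {
                    pool.execute(() -> {
                        try {
                            completed.incrementAndGet();
                        } finally {
                            permits.release(); // free the slot when the task ends
                        }
                    });
                } catch (RejectedExecutionException e) {
                    permits.release(); // the task never started, so return its permit
                    throw e;
                }
            }
            pool.shutdown();
            if (!pool.awaitTermination(10, TimeUnit.SECONDS)) {
                throw new IllegalStateException("tasks did not finish in time");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } finally {
            pool.shutdownNow(); // has nothing left to cancel after a clean shutdown
        }
        return completed.get();
    }

    public static void main(String[] args) {
        System.out.println("completed: " + runAll(50, 4));
    }
}
```

The extra bookkeeping around acquire and release is the added complexity mentioned above.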