일반적으로 생산 작업의 속도는 소비 속도보다 빠릅니다. 세부 사항은 대기열 길이와 생산 및 소비 속도를 일치시키는 방법입니다.
전형적인 생산자-소비자 모델은 다음과 같습니다.
동시 환경에서 J.U.C에서 제공하는 Queue 구현을 사용하면 스레드의 생산과 소비를 쉽게 보장할 수 있습니다. 그 과정에서 안전. 여기서 주목해야 할 점은 생산자가 너무 빨리 생산하여 대기열 길이가 급증하고 결국 OutOfMemory가 트리거되는 것을 방지하기 위해 대기열이 초기 용량을 설정해야 한다는 것입니다.
생산이 소비보다 빠른 일반적인 상황에 적합합니다. 대기열이 가득 차면 작업이 무시되거나 실행되지 않는 것을 원하지 않습니다. 이때 생산자는 작업을 제출하기 전에 잠시 기다릴 수 있습니다. , 작업이 제출될 때까지 기다립니다. 대기열이 가득 차 있지 않은 경우에도 계속 작업을 제출하므로 낭비되는 유휴 시간이 없습니다. BlockingQueue는 이를 위해 구축되었습니다. ArrayBlockingQueue와 LinkedBlockingQueue는 큐가 실제로 작동할 때 각 잠금을 획득한 후 용량을 결정합니다.
게다가 대기열이 비어 있으면 소비자는 작업을 가져올 수 없으며 작업을 가져오기 전에 잠시 기다릴 수 있습니다. 더 나은 접근 방식은 BlockingQueue의 take 메서드를 사용하여 차단하고 기다리는 것입니다. 작업을 즉시 수행할 수 있습니다. 실행하려면 시간 초과 매개변수와 함께 오버로드된 take 메서드를 호출하는 것이 좋습니다. 스레드는 시간 초과 후에 종료됩니다. 이런 식으로 생산자가 실제로 생산을 중단하더라도 소비자는 무한정 기다리지 않게 됩니다.
그래서 차단을 지원하는 효율적인 생산 및 소비 모델이 구현됩니다.
잠깐만요. J.U.C가 스레드 풀 구현을 도와줬는데 왜 이 세트를 계속 사용해야 할까요? ExecutorService를 직접 사용하는 것이 더 편리하지 않나요?
ThreadPoolExecutor의 기본 구조를 살펴보겠습니다.
보시다시피 ThreadPoolExecutor에는 BlockingQueue 및 Consumer 부분이 구현되어 있습니다. , 스레드 수를 동적으로 조정하는 등 스레드 풀 구현을 직접 사용하면 많은 이점이 있습니다.
그런데 문제는 ThreadPoolExecutor를 구성할 때 BlockingQueue를 큐 구현으로 수동으로 지정하더라도 실제로 큐가 가득 차면 실행 메서드가 차단되지 않는다는 것입니다. ThreadPoolExecutor BlockingQueue의 비차단 제안 방법은
public void execute(Runnable command) { if (command == null) throw new NullPointerException(); if (poolSize >= corePoolSize || !addIfUnderCorePoolSize(command)) { if (runState == RUNNING && workQueue.offer(command)) { if (runState != RUNNING || poolSize == 0) ensureQueuedTaskHandled(command); } else if (!addIfUnderMaximumPoolSize(command)) reject(command); // is shutdown or saturated } }
라고 합니다. 이때 결과를 얻으려면 뭔가 조치를 취해야 합니다. 생산자가 작업을 제출하고 대기열이 가득 차면 생산자가 차단될 수 있습니다. 작업이 완료될 때까지 기다립니다.
핵심은 동시 환경에서는 생산자가 대기열이 가득 찼는지 여부를 확인할 수 없으며 대기열이 가득 찼는지 확인하기 위해 ThreadPoolExecutor.getQueue().size()를 호출할 수 없다는 것입니다.
스레드 풀 구현에서 대기열이 가득 차면 생성 중에 전달된 RejectedExecutionHandler가 호출되어 작업 처리를 거부합니다. 기본 구현은 RejectedExecutionException을 직접 발생시키는 AbortPolicy입니다.
여기에서는 여러 가지 거부 전략을 설명하지 않습니다. 우리의 요구에 더 가까운 것은 CallerRunsPolicy입니다. 이 전략을 사용하면 대기열이 가득 찼을 때 작업을 제출한 스레드가 작업을 실행할 수 있습니다. 생산하기 생산자가 소비자의 작업을 일시적으로 수행하므로 생산자가 차단되지 않더라도 제출된 작업도 일시 중지됩니다.
public static class CallerRunsPolicy implements RejectedExecutionHandler { /** * Creates a <tt>CallerRunsPolicy</tt>. */ public CallerRunsPolicy() { } /** * Executes task r in the caller's thread, unless the executor * has been shut down, in which case the task is discarded. * @param r the runnable task requested to be executed * @param e the executor attempting to execute this task */ public void rejectedExecution(Runnable r, ThreadPoolExecutor e) { if (!e.isShutdown()) { r.run(); } } }
그러나 이 전략에는 생산자가 적을 때, 생산자가 작업을 소비하는 동안 소비자가 모든 작업을 소비하여 대기열이 빈 상태가 될 수도 있습니다. 생산자가 작업 실행을 마친 후에만 생산 작업을 계속할 수 있습니다. 이 프로세스로 인해 소비자 스레드가 부족해질 수 있습니다.
유사한 아이디어를 참고하면 가장 간단한 방법은 RejectedExecutionHandler를 직접 정의하고 대기열이 가득 차면 BlockingQueue.put을 호출하여 생산자 차단을 구현하는 것입니다.
new RejectedExecutionHandler() { @Override public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) { if (!executor.isShutdown()) { try { executor.getQueue().put(r); } catch (InterruptedException e) { // should not be interrupted } } } };
이런 식으로 우리는 더 이상 대기열과 소비자의 논리에 신경 쓸 필요가 없습니다. 생산자와 소비자 스레드의 구현 논리에만 집중하고 작업을 스레드 풀에 제출하기만 하면 됩니다.
이 방법은 원래 디자인에 비해 코드 양을 크게 줄일 수 있으며 동시 환경에서 많은 문제를 피할 수 있습니다. 물론 제출 시 입력을 제한하기 위해 세마포어를 사용하는 등 다른 방법을 사용할 수도 있지만 단순히 생산자가 차단하기를 원하는 경우에는 복잡해집니다.
생산 차단을 지원하는 Java 스레드 풀과 관련된 더 많은 기사를 보려면 PHP 중국어 웹사이트를 주목하세요!