Java java지도 시간 Java의 ThreadPoolExecutor 원리에 대한 자세한 분석(코드 포함)

Java의 ThreadPoolExecutor 원리에 대한 자세한 분석(코드 포함)

Mar 29, 2017 am 10:31 AM

본 글은 주로 Java의 ThreadPoolExecutor 원리 분석 관련 정보를 소개하고 있으며, 필요하신 분들은

Java의 ThreadPoolExecutor 원리 분석

을 참고하시기 바랍니다. 🎜> Thread Pool 소개

Java Thread Pool은 개발 시 흔히 사용하는 도구로, 비동기식, 병렬 작업을 처리할 때 자주 사용됩니다. 또는 서버를 구현할 때

연결 처리 요청을 받기 위해 스레드 풀도 사용해야 합니다.

스레드 풀은

을 사용합니다. JDK에서 제공하는 스레드 풀 구현은

java.util.concurrent.ThreadPoolExecutor에 있습니다. 사용 시 일반적으로 submit, InvokeAll, shutdown 등의 공통 메소드를 제공하는 ExecutorService인터페이스를 사용한다.

스레드 풀 구성 측면에서 Executors 클래스는

newFixedThreadPool, newCachedThreadPool, newSingleThreadExecutor정적 메서드를 제공합니다. 🎜> 등, 이러한 메서드는 결국 ThreadPoolExecutor생성자를 호출합니다. 모든 매개변수를 포함하는 ThreadPoolExecutor의 생성자는

/**
   * @param corePoolSize the number of threads to keep in the pool, even
   *    if they are idle, unless {@code allowCoreThreadTimeOut} is set
   * @param maximumPoolSize the maximum number of threads to allow in the
   *    pool
   * @param keepAliveTime when the number of threads is greater than
   *    the core, this is the maximum time that excess idle threads
   *    will wait for new tasks before terminating.
   * @param unit the time unit for the {@code keepAliveTime} argument
   * @param workQueue the queue to use for holding tasks before they are
   *    executed. This queue will hold only the {@code Runnable}
   *    tasks submitted by the {@code execute} method.
   * @param threadFactory the factory to use when the executor
   *    creates a new thread
   * @param handler the handler to use when execution is blocked
   *    because the thread bounds and queue capacities are reached
  public ThreadPoolExecutor(int corePoolSize,
               int maximumPoolSize,
               long keepAliveTime,
               TimeUnit unit,
               BlockingQueue<Runnable> workQueue,
               ThreadFactory threadFactory,
               RejectedExecutionHandler handler) {
    if (corePoolSize < 0 ||
      maximumPoolSize <= 0 ||
      maximumPoolSize < corePoolSize ||
      keepAliveTime < 0)
      throw new IllegalArgumentException();
    if (workQueue == null || threadFactory == null || handler == null)
      throw new NullPointerException();
    this.corePoolSize = corePoolSize;
    this.maximumPoolSize = maximumPoolSize;
    this.workQueue = workQueue;
    this.keepAliveTime = unit.toNanos(keepAliveTime);
    this.threadFactory = threadFactory;
    this.handler = handler;
  }
로그인 후 복사

    corePoolSize입니다. 새 작업을 추가할 때 스레드 풀의 코어 스레드 수를 설정합니다. 스레드 풀의 스레드 corePoolSize보다 작으면 현재 유휴 스레드가 있는지 여부에 관계없이 작업을 수행하기 위해 새 스레드가 생성됩니다.
  • maximunPoolSize는 스레드 풀에 허용되는 최대 스레드 수입니다.
  • workQueue는 대기 중인 작업을 저장하는 데 사용됩니다
  • keepAliveTime은 corePoolSize
  • 보다 큰 스레드에 대한 유휴 시간 초과 시간입니다. 작업이 이스케이프되고 스레드 풀이 닫힐 때 작업 처리에 사용됩니다. 스레드 풀은 현재 스레드 수가 corePoolSize보다 적으면
  • 에서 새

    스레드를 추가합니다. 스레드 수 = corePoolSize 및 corePoolSize인 경우 workQueue가 새 작업을 저장할 수 없는 경우에만 새 스레드가 생성됩니다. 초과 스레드는 유휴 keepAliveTime 후에 삭제됩니다.

구현(JDK1.8 기준)


ThreadPoolExecutor에 저장된 상태는

입니다.
RUNNING, SHUTDOWN, STOP, TIDYING, TERMINATED를 포함한 현재 스레드 풀 상태입니다.


현재 실행 중인 스레드의 유효 개수입니다.


이 두 상태를 int 변수에 넣으면 처음 3자리는 스레드 풀 상태, 마지막 29자리는 스레드 개수입니다.


예를 들어 0b11100000000000000000000000000001은 스레드인 RUNNING을 나타냅니다.

HashSet을 통해 작업자 세트를 저장합니다. HashSet에 액세스하기 전에 먼저 보호된 mainLock:ReentrantLock

제출, 실행


execute 실행 방법은 먼저 현재 워커 개수를 확인하는 것인데, corePoolSize보다 작다면 코어 워커를 추가해 보세요. 스레드 풀은 스레드 수 유지 및 상태 확인에 대해 많은 테스트를 수행합니다.

public void execute(Runnable command) {
    int c = ctl.get();
    // 如果当期数量小于corePoolSize
    if (workerCountOf(c) < corePoolSize) {
      // 尝试增加worker
      if (addWorker(command, true))
        return;
      c = ctl.get();
    }
    // 如果线程池正在运行并且成功添加到工作队列中
    if (isRunning(c) && workQueue.offer(command)) {
      // 再次检查状态,如果已经关闭则执行拒绝处理
      int recheck = ctl.get();
      if (! isRunning(recheck) && remove(command))
        reject(command);
      // 如果工作线程都down了
      else if (workerCountOf(recheck) == 0)
        addWorker(null, false);
    }
    else if (!addWorker(command, false))
      reject(command);
  }
로그인 후 복사

addWorker 메소드 구현

private boolean addWorker(Runnable firstTask, boolean core) {
    retry:
    for (;;) {
      int c = ctl.get();
      int rs = runStateOf(c);
      // Check if queue empty only if necessary.
      if (rs >= SHUTDOWN &&
        ! (rs == SHUTDOWN &&
          firstTask == null &&
          ! workQueue.isEmpty()))
        return false;
      for (;;) {
        int wc = workerCountOf(c);
        if (wc >= CAPACITY ||
          wc >= (core ? corePoolSize : maximumPoolSize))
          return false;
        if (compareAndIncrementWorkerCount(c))
          break retry;
        c = ctl.get(); // Re-read ctl
        if (runStateOf(c) != rs)
          continue retry;
        // else CAS failed due to workerCount change; retry inner loop
      }
    }
    boolean workerStarted = false;
    boolean workerAdded = false;
    Worker w = null;
    try {
      w = new Worker(firstTask);
      final Thread t = w.thread;
      if (t != null) {
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
          // Recheck while holding lock.
          // Back out on ThreadFactory failure or if
          // shut down before lock acquired.
          int rs = runStateOf(ctl.get());
          if (rs < SHUTDOWN ||
            (rs == SHUTDOWN && firstTask == null)) {
            if (t.isAlive()) // precheck that t is startable
              throw new IllegalThreadStateException();
            workers.add(w);
            int s = workers.size();
            if (s > largestPoolSize)
              largestPoolSize = s;
            workerAdded = true;
          }
        } finally {
          mainLock.unlock();
        }
        if (workerAdded) {
          // 如果添加成功,则启动该线程,执行Worker的run方法,Worker的run方法执行外部的runWorker(Worker)
          t.start();
          workerStarted = true;
        }
      }
    } finally {
      if (! workerStarted)
        addWorkerFailed(w);
    }
    return workerStarted;
  }
로그인 후 복사
Worker 클래스는 동기 대기 기능을 얻기 위해 AbstractQueuedSynchronizer를 상속합니다.

private final class Worker
    extends AbstractQueuedSynchronizer
    implements Runnable
  {
    /**
     * This class will never be serialized, but we provide a
     * serialVersionUID to suppress a javac warning.
     */
    private static final long serialVersionUID = 6138294804551838833L;
    /** Thread this worker is running in. Null if factory fails. */
    final Thread thread;
    /** Initial task to run. Possibly null. */
    Runnable firstTask;
    /** Per-thread task counter */
    volatile long completedTasks;
    /**
     * Creates with given first task and thread from ThreadFactory.
     * @param firstTask the first task (null if none)
     */
    Worker(Runnable firstTask) {
      setState(-1); // inhibit interrupts until runWorker
      this.firstTask = firstTask;
      this.thread = getThreadFactory().newThread(this);
    }
    /** Delegates main run loop to outer runWorker */
    public void run() {
      runWorker(this);
    }
    // Lock methods
    //
    // The value 0 represents the unlocked state.
    // The value 1 represents the locked state.
    protected boolean isHeldExclusively() {
      return getState() != 0;
    }
    protected boolean tryAcquire(int unused) {
      if (compareAndSetState(0, 1)) {
        setExclusiveOwnerThread(Thread.currentThread());
        return true;
      }
      return false;
    }
    protected boolean tryRelease(int unused) {
      setExclusiveOwnerThread(null);
      setState(0);
      return true;
    }
    public void lock()    { acquire(1); }
    public boolean tryLock() { return tryAcquire(1); }
    public void unlock()   { release(1); }
    public boolean isLocked() { return isHeldExclusively(); }
    void interruptIfStarted() {
      Thread t;
      if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) {
        try {
          t.interrupt();
        } catch (SecurityException ignore) {
        }
      }
    }
로그인 후 복사

runWorker(Worker)는 Work Queue에서 지속적으로 작업을 가져와서 실행하는 Worker의 폴링 실행 로직입니다. 작업을 실행하는 동안 작업자가 중단되는 것을 방지하려면 각 작업을 실행하기 전에 작업자를 잠가야 합니다.

final void runWorker(Worker w) {
    Thread wt = Thread.currentThread();
    Runnable task = w.firstTask;
    w.firstTask = null;
    w.unlock(); // allow interrupts
    boolean completedAbruptly = true;
    try {
      while (task != null || (task = getTask()) != null) {
        w.lock();
        // If pool is stopping, ensure thread is interrupted;
        // if not, ensure thread is not interrupted. This
        // requires a recheck in second case to deal with
        // shutdownNow race while clearing interrupt
        if ((runStateAtLeast(ctl.get(), STOP) ||
           (Thread.interrupted() &&
           runStateAtLeast(ctl.get(), STOP))) &&
          !wt.isInterrupted())
          wt.interrupt();
        try {
          beforeExecute(wt, task);
          Throwable thrown = null;
          try {
            task.run();
          } catch (RuntimeException x) {
            thrown = x; throw x;
          } catch (Error x) {
            thrown = x; throw x;
          } catch (Throwable x) {
            thrown = x; throw new Error(x);
          } finally {
            afterExecute(task, thrown);
          }
        } finally {
          task = null;
          w.completedTasks++;
          w.unlock();
        }
      }
      completedAbruptly = false;
    } finally {
      processWorkerExit(w, completedAbruptly);
    }
  }
로그인 후 복사

ThreadPoolExecutor의 submit 메소드에서 Callable은 FutureTask로 패키징된 후 Execute 메소드로 넘겨집니다.

FutureTask


FutureTask는 Runnable과 Future에서 상속합니다.

NEW, 아직 실행되지 않음,

COMPLETING,
NORMAL을 실행하면 정상 실행이 완료되고 결과를 얻습니다
EXCEPTIONAL, 실행
예외가 발생합니다
CANCELLED, 실행이 취소됩니다INTERRUPTING, 실행이 중단됩니다
INTERRUPTED, 중단되었습니다.


키 get 메소드

public V get() throws InterruptedException, ExecutionException {
    int s = state;
    if (s <= COMPLETING)
      s = awaitDone(false, 0L);
    return report(s);
  }
로그인 후 복사

가 먼저 현재 상태를 가져옵니다. 실행이 완료되지 않고 정상이면 대기 결과 프로세스로 들어갑니다. 현재 상태를 얻기 위해 계속해서

루프를 돌립니다. 결과가 없으면 CAS를 통해 대기 목록의 선두에 자신을 추가합니다. 시간 초과가 설정된 경우 LockSupport.parkNanos는 지정된 시간에 추가됩니다.

static final class WaitNode {
    volatile Thread thread;
    volatile WaitNode next;
    WaitNode() { thread = Thread.currentThread(); }
  }
private int awaitDone(boolean timed, long nanos)
    throws InterruptedException {
    final long deadline = timed ? System.nanoTime() + nanos : 0L;
    WaitNode q = null;
    boolean queued = false;
    for (;;) {
      if (Thread.interrupted()) {
        removeWaiter(q);
        throw new InterruptedException();
      }
      int s = state;
      if (s > COMPLETING) {
        if (q != null)
          q.thread = null;
        return s;
      }
      else if (s == COMPLETING) // cannot time out yet
        Thread.yield();
      else if (q == null)
        q = new WaitNode();
      else if (!queued)
        queued = UNSAFE.compareAndSwapObject(this, waitersOffset,
                           q.next = waiters, q);
      else if (timed) {
        nanos = deadline - System.nanoTime();
        if (nanos <= 0L) {
          removeWaiter(q);
          return state;
        }
        LockSupport.parkNanos(this, nanos);
      }
      else
        LockSupport.park(this);
    }
  }
로그인 후 복사
FutureTask의 실행 메소드는 작업을 실행하고 결과의 위치를 ​​설정하는 것입니다. 먼저 현재 상태가 NEW인지 확인하고 현재 스레드를 실행 스레드로 설정한 다음 Callable의 호출을 호출하여 가져옵니다. 결과를 설정하고 결과를 설정하여 FutureTask 상태를 수정합니다. 아아아아

위 내용은 Java의 ThreadPoolExecutor 원리에 대한 자세한 분석(코드 포함)의 상세 내용입니다. 자세한 내용은 PHP 중국어 웹사이트의 기타 관련 기사를 참조하세요!

본 웹사이트의 성명
본 글의 내용은 네티즌들의 자발적인 기여로 작성되었으며, 저작권은 원저작자에게 있습니다. 본 사이트는 이에 상응하는 법적 책임을 지지 않습니다. 표절이나 침해가 의심되는 콘텐츠를 발견한 경우 admin@php.cn으로 문의하세요.

핫 AI 도구

Undresser.AI Undress

Undresser.AI Undress

사실적인 누드 사진을 만들기 위한 AI 기반 앱

AI Clothes Remover

AI Clothes Remover

사진에서 옷을 제거하는 온라인 AI 도구입니다.

Undress AI Tool

Undress AI Tool

무료로 이미지를 벗다

Clothoff.io

Clothoff.io

AI 옷 제거제

AI Hentai Generator

AI Hentai Generator

AI Hentai를 무료로 생성하십시오.

인기 기사

R.E.P.O. 에너지 결정과 그들이하는 일 (노란색 크리스탈)
1 몇 달 전 By 尊渡假赌尊渡假赌尊渡假赌
R.E.P.O. 최고의 그래픽 설정
1 몇 달 전 By 尊渡假赌尊渡假赌尊渡假赌
Will R.E.P.O. 크로스 플레이가 있습니까?
1 몇 달 전 By 尊渡假赌尊渡假赌尊渡假赌

뜨거운 도구

메모장++7.3.1

메모장++7.3.1

사용하기 쉬운 무료 코드 편집기

SublimeText3 중국어 버전

SublimeText3 중국어 버전

중국어 버전, 사용하기 매우 쉽습니다.

스튜디오 13.0.1 보내기

스튜디오 13.0.1 보내기

강력한 PHP 통합 개발 환경

드림위버 CS6

드림위버 CS6

시각적 웹 개발 도구

SublimeText3 Mac 버전

SublimeText3 Mac 버전

신 수준의 코드 편집 소프트웨어(SublimeText3)

자바의 완전수 자바의 완전수 Aug 30, 2024 pm 04:28 PM

Java의 완전수 가이드. 여기서는 정의, Java에서 완전 숫자를 확인하는 방법, 코드 구현 예제에 대해 논의합니다.

자바의 웨카 자바의 웨카 Aug 30, 2024 pm 04:28 PM

Java의 Weka 가이드. 여기에서는 소개, weka java 사용 방법, 플랫폼 유형 및 장점을 예제와 함께 설명합니다.

Java의 스미스 번호 Java의 스미스 번호 Aug 30, 2024 pm 04:28 PM

Java의 Smith Number 가이드. 여기서는 정의, Java에서 스미스 번호를 확인하는 방법에 대해 논의합니다. 코드 구현의 예.

Java Spring 인터뷰 질문 Java Spring 인터뷰 질문 Aug 30, 2024 pm 04:29 PM

이 기사에서는 가장 많이 묻는 Java Spring 면접 질문과 자세한 답변을 보관했습니다. 그래야 면접에 합격할 수 있습니다.

Java 8 Stream foreach에서 나누거나 돌아 오시겠습니까? Java 8 Stream foreach에서 나누거나 돌아 오시겠습니까? Feb 07, 2025 pm 12:09 PM

Java 8은 스트림 API를 소개하여 데이터 컬렉션을 처리하는 강력하고 표현적인 방법을 제공합니다. 그러나 스트림을 사용할 때 일반적인 질문은 다음과 같은 것입니다. 기존 루프는 조기 중단 또는 반환을 허용하지만 스트림의 Foreach 메소드는이 방법을 직접 지원하지 않습니다. 이 기사는 이유를 설명하고 스트림 처리 시스템에서 조기 종료를 구현하기위한 대체 방법을 탐색합니다. 추가 읽기 : Java Stream API 개선 스트림 foreach를 이해하십시오 Foreach 메소드는 스트림의 각 요소에서 하나의 작업을 수행하는 터미널 작동입니다. 디자인 의도입니다

Java의 날짜까지의 타임스탬프 Java의 날짜까지의 타임스탬프 Aug 30, 2024 pm 04:28 PM

Java의 TimeStamp to Date 안내. 여기서는 소개와 예제와 함께 Java에서 타임스탬프를 날짜로 변환하는 방법에 대해서도 설명합니다.

캡슐의 양을 찾기위한 Java 프로그램 캡슐의 양을 찾기위한 Java 프로그램 Feb 07, 2025 am 11:37 AM

캡슐은 3 차원 기하학적 그림이며, 양쪽 끝에 실린더와 반구로 구성됩니다. 캡슐의 부피는 실린더의 부피와 양쪽 끝에 반구의 부피를 첨가하여 계산할 수 있습니다. 이 튜토리얼은 다른 방법을 사용하여 Java에서 주어진 캡슐의 부피를 계산하는 방법에 대해 논의합니다. 캡슐 볼륨 공식 캡슐 볼륨에 대한 공식은 다음과 같습니다. 캡슐 부피 = 원통형 볼륨 2 반구 볼륨 안에, R : 반구의 반경. H : 실린더의 높이 (반구 제외). 예 1 입력하다 반경 = 5 단위 높이 = 10 단위 산출 볼륨 = 1570.8 입방 단위 설명하다 공식을 사용하여 볼륨 계산 : 부피 = π × r2 × h (4

Spring Tool Suite에서 첫 번째 Spring Boot 응용 프로그램을 실행하는 방법은 무엇입니까? Spring Tool Suite에서 첫 번째 Spring Boot 응용 프로그램을 실행하는 방법은 무엇입니까? Feb 07, 2025 pm 12:11 PM

Spring Boot는 강력하고 확장 가능하며 생산 가능한 Java 응용 프로그램의 생성을 단순화하여 Java 개발에 혁명을 일으킨다. Spring Ecosystem에 내재 된 "구성에 대한 협약"접근 방식은 수동 설정, Allo를 최소화합니다.

See all articles