Rumah > Java > javaTutorial > java ThreadPoolExecutor analisis contoh dasar penolakan kolam benang

java ThreadPoolExecutor analisis contoh dasar penolakan kolam benang

WBOY
Lepaskan: 2023-05-13 23:16:04
ke hadapan
1172 orang telah melayarinya

1.场景

线程池使用DiscardOldestPolicy拒绝策略,阻塞队列使用ArrayBlockingQueue,发现在某些情形下对于得到的Future,调用get()方法当前线程会一直阻塞。

为了便于理解,将实际情景抽象为下面的代码:

ThreadPoolExecutor threadPoolExecutor  = new ThreadPoolExecutor(
        1,
        1,
        1,
        TimeUnit.SECONDS,
        new ArrayBlockingQueue<>(1),
        Executors.defaultThreadFactory(),
        new ThreadPoolExecutor.DiscardOldestPolicy());//新建线程池时核心线程数及最大线程数都设置为1,阻塞队列使用ArrayBlockingQueue,拒绝策略为DiscardOldestPolicy
public void doBusiness(){
    Task task1 = new Task();
    Task task2 = new Task();
    Task task3 = new Task();
    Future<Boolean> future1 = threadPoolExecutor.submit(task1);//当前工作线程为0,会新建一个worker作为工作线程,并执行task1
    Future<Boolean> future2 = threadPoolExecutor.submit(task2);//当前核心线程数已满,会将任务放入阻塞队列
    Future<Boolean> future3 = threadPoolExecutor.submit(task3);
    /*当前核心线程已满并且阻塞队列已满,execute()时会调用ThreadPoolExecutord的addWorker(command,false),由
    于目前task1还没执行完,则工作线程数量为1,已经达到了最大线程数,则addWorker(command,false)返回false,
    触发对应的拒绝策略,会从阻塞队列中移除task2对应的任务(阻塞队列中并不是直接放的task2,而是以task2为入
    参构造的一个FutureTask,参见AbstarctExecutorService的submit(Callable<T> task)方法*/
    try{
        boolean result = future2.get();
        System.out.println(result);
    } catch (ExecutionException e) {
        e.printStackTrace();
    } catch (InterruptedException e) {
        e.printStackTrace();
    }
}
@Test
public void test_doBusiness(){
    doBusiness();//入口
}
private class Task implements Callable<Boolean>{
    @Override
    public Boolean call() throws Exception {
        try {
            Thread.sleep(1000);//模拟业务执行
            return true;
        }catch(Exception e){
            e.printStackTrace();
        }
        return true;
    }
}
Salin selepas log masuk

2. 原因分析

通过上面代码我们明白了阻塞队列会将task2对应的任务移除,那么为何移除之后调用get()方法线程会一直阻塞呢?

其实Future future2= threadPoolExecutor.submit(task2)实际会调用AbstractExecutorService的submit(Callable task)方法,并且最终返回的future2实际是一个FutureTask类型。

public <T> Future<T> submit(Callable<T> task) {
    if (task == null) throw new NullPointerException();
    RunnableFuture<T> ftask = newTaskFor(task);
    execute(ftask);
    return ftask;
}
Salin selepas log masuk
protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) {
    return new FutureTask<T>(callable);
}
Salin selepas log masuk

因此,我们直接看FutureTask的get()方法

public V get() throws InterruptedException, ExecutionException {
    int s = state;
    if (s &lt;= COMPLETING)
        s = awaitDone(false, 0L);
    return report(s);
}
Salin selepas log masuk

由于future2已经从阻塞队列中移除,并且从始至终都没有工作线程执行它,即FutureTask的状态一直都为NEW状态,其会进入awaitDone(false,0L)中,接下列我们追踪该方法。

private int awaitDone(boolean timed, long nanos)
    throws InterruptedException {
    final long deadline = timed ? System.nanoTime() + nanos : 0L;
    WaitNode q = null;
    boolean queued = false;
    for (;;) {
        if (Thread.interrupted()) {
            removeWaiter(q);
            throw new InterruptedException();
        }
        int s = state;
        if (s > COMPLETING) {
            if (q != null)
                q.thread = null;
            return s;
        }
        else if (s == COMPLETING) // cannot time out yet
            Thread.yield();
        else if (q == null)//第一次进for循环时q==null,进入到该分支
            q = new WaitNode();
        else if (!queued)//第二次进for循环时queue为false,则使用CAS将q置为waiters的头结点
            queued = UNSAFE.compareAndSwapObject(this, waitersOffset,
                                                 q.next = waiters, q);
        else if (timed) {
            nanos = deadline - System.nanoTime();
            if (nanos <= 0L) {
                removeWaiter(q);
                return state;
            }
            LockSupport.parkNanos(this, nanos);
        }
        else//将q置为头结点后,最终会进入这里调用park()方法,阻塞当前线程
            LockSupport.park(this);
    }
Salin selepas log masuk

从上面的代码可以看出调用future2.get()后会一直阻塞在park()方法处,这便是本次问题出现的原因,

3.总结

本次问题出现主要是同时满足了以下几点:

1)使用了有界的阻塞队列ArrayBlockingQueue

2)工作线程达到了线程池配置的最大线程数

3)拒绝策略使用了DiscardOldestPolicy(使用DiscardPolicy也会出现这个问题)

4.思考

我们日常使用线程池提交任务后,如果在任务执行完成之前调用future的get()方法,当前线程会进入阻塞状态,当任务执行完成后,才会将当前线程唤醒,如何从代码上分析该流程?

首先当任务提交到线程池,如果任务当前在阻塞队列中,则FutureTask的状态依然像上面的情况一样,是处于New状态,调用get()方法依然会到达LockSupport.park(this)处,将当前线程阻塞。什么时候才会将当前线程唤醒了?

那就是当存在工作线程Worker目前分配的任务执行完成后,其会去调用Worker类的getTask()方法从阻塞队列中拿到该任务,并执行该任务的run()方法,下面是FutureTask的run()方法

public void run() {
    if (state != NEW ||
        !UNSAFE.compareAndSwapObject(this, runnerOffset,
                                     null, Thread.currentThread()))
        return;
    try {
        Callable<V> c = callable;
        if (c != null && state == NEW) {
            V result;
            boolean ran;
            try {
                result = c.call();
                ran = true;
            } catch (Throwable ex) {
                result = null;
                ran = false;
                setException(ex);
            }
            if (ran)
                set(result);//如果任务执行成功,则调用set(V result)方法
        }
    } finally {
        // runner must be non-null until state is settled to
        // prevent concurrent calls to run()
        runner = null;
        // state must be re-read after nulling runner to prevent
        // leaked interrupts
        int s = state;
        if (s >= INTERRUPTING)
            handlePossibleCancellationInterrupt(s);
    }
}
Salin selepas log masuk

其会在执行成功后,调用set(V result)方法

protected void set(V v) {
    if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {
        outcome = v;
        UNSAFE.putOrderedInt(this, stateOffset, NORMAL); // final state
        finishCompletion();//
    }
}
Salin selepas log masuk

然后将FutureTask状态置为NORMAL(FutureTask的状态要和ThreadPoolExecutor的状态区分开),接着调用finishCompletion()方法

private void finishCompletion() {
    // assert state > COMPLETING;
    for (WaitNode q; (q = waiters) != null;) {
        if (UNSAFE.compareAndSwapObject(this, waitersOffset, q, null)) {
            for (;;) {
                Thread t = q.thread;//q在await()方法中设置的,其值为调用get()方法的线程
                if (t != null) {
                    q.thread = null;
                    LockSupport.unpark(t);//唤醒该线程
                }
                WaitNode next = q.next;
                if (next == null)
                    break;
                q.next = null; // unlink to help gc
                q = next;
            }
            break;
        }
    }
    done();//熟悉的钩子方法
    callable = null;        // to reduce footprint
}
Salin selepas log masuk

在finishCompletion中唤起因get()而阻塞的线程。

Atas ialah kandungan terperinci java ThreadPoolExecutor analisis contoh dasar penolakan kolam benang. Untuk maklumat lanjut, sila ikut artikel berkaitan lain di laman web China PHP!

Label berkaitan:
sumber:yisu.com
Kenyataan Laman Web ini
Kandungan artikel ini disumbangkan secara sukarela oleh netizen, dan hak cipta adalah milik pengarang asal. Laman web ini tidak memikul tanggungjawab undang-undang yang sepadan. Jika anda menemui sebarang kandungan yang disyaki plagiarisme atau pelanggaran, sila hubungi admin@php.cn
Tutorial Popular
Lagi>
Muat turun terkini
Lagi>
kesan web
Kod sumber laman web
Bahan laman web
Templat hujung hadapan