在前文中我們談到,透過編碼實作Runnable接口,將獲得具有邊界性的 "任務",在指定的執行緒(或執行緒池)中運行。
重新觀察該接口,不難發現它並沒有方法返回值:
public interface Runnable { void run(); }
在JDK1.5之前,想利用任務的執行結果,需要小心的操作線程訪問臨界區資源。使用 回呼
進行解耦是非常不錯的選擇。
注意,為了減少篇幅使用了lambda,但jdk1.5之前並不支援lambda
將計算任務分離到其他線程執行,再回到主線程消費結果
我們將計算、IO等耗時任務丟到其他線程,讓主線程專注於自身業務,假想它在接受用戶輸入以及處理反饋,但我們略去這一部分
我們可以設計出類似下面的程式碼:
雖然它還有很多不合理之處值得優化,但也足以用於演示
class Demo { static final Object queueLock = new Object(); static List<Runnable> mainQueue = new ArrayList<>(); static boolean running = true; static final Runnable FINISH = () -> running = false; public static void main(String[] args) { synchronized (queueLock) { mainQueue.add(Demo::onStart); } while (running) { Runnable runnable = null; synchronized (queueLock) { if (!mainQueue.isEmpty()) runnable = mainQueue.remove(0); } if (runnable != null) { runnable.run(); } Thread.yield(); } } public static void onStart() { //... } public static void finish() { synchronized (queueLock) { mainQueue.clear(); mainQueue.add(FINISH); } } }
再模擬一個計算的線程和任務回調:
interface Callback { void onResultCalculated(int result); } class CalcThread extends Thread { private final Callback callback; private final int a; private final int b; public CalcThread(Callback callback, int a, int b) { this.callback = callback; this.a = a; this.b = b; } @Override public void run() { super.run(); try { Thread.sleep(10); } catch (InterruptedException e) { e.printStackTrace(); } final int result = a + b; System.out.println("threadId" + Thread.currentThread().getId() + ",calc result:" + result + ";" + System.currentTimeMillis()); synchronized (queueLock) { mainQueue.add(() -> callback.onResultCalculated(result)); } } }
填充一下onStart業務:
class Demo { public static void onStart() { System.out.println("threadId" + Thread.currentThread().getId() + ",onStart," + System.currentTimeMillis()); new CalcThread(result -> { System.out.println("threadId" + Thread.currentThread().getId() + ",onResultCalculated:" + result + ";" + System.currentTimeMillis()); finish(); }, 200, 300).start(); } }
在前文我們提到,如果業務只專注於任務的執行,並不過於關心執行緒本身,則可以利用Runnable:
class Demo { static class CalcRunnable implements Runnable { private final Callback callback; private final int a; private final int b; public CalcRunnable(Callback callback, int a, int b) { this.callback = callback; this.a = a; this.b = b; } @Override public void run() { try { Thread.sleep(10); } catch (InterruptedException e) { e.printStackTrace(); } final int result = a + b; System.out.println("threadId" + Thread.currentThread().getId() + ",calc result:" + result + ";" + System.currentTimeMillis()); synchronized (queueLock) { mainQueue.add(() -> callback.onResultCalculated(result)); } } } public static void onStart() { System.out.println("threadId" + Thread.currentThread().getId() + ",onStart," + System.currentTimeMillis()); new Thread(new CalcRunnable(result -> { System.out.println("threadId" + Thread.currentThread().getId() + ",onResultCalculated:" + result + ";" + System.currentTimeMillis()); finish(); }, 200, 300)).start(); } }
不難想像出:我們非常需要
較為粗糙。
為非同步而生的Future終於在JDK1.5中,迎來了新特性:Future 以及先前文章中提到的線程池, 時光荏苒,一晃將近20年了。
/** * 略 * @since 1.5 * @author Doug Lea * @param <V> The result type returned by this Future's {@code get} method */ public interface Future<V> { boolean cancel(boolean mayInterruptIfRunning); boolean isCancelled(); boolean isDone(); V get() throws InterruptedException, ExecutionException; V get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException; }
沒有必要用如此複雜的 介面來取代 Runnable。簡單思考後可以對回傳值的情況進行歸納:
public interface Callable<V> { /** * Computes a result, or throws an exception if unable to do so. * * @return computed result * @throws Exception if unable to compute a result */ V call() throws Exception; }
static final class RunnableAdapter<T> implements Callable<T> { final Runnable task; final T result; RunnableAdapter(Runnable task, T result) { this.task = task; this.result = result; } public T call() { task.run(); return result; } }
FutureTask,在 FutureTask詳解 章節中再行展開。
public class FutureTask { public FutureTask(Callable<V> callable) { if (callable == null) throw new NullPointerException(); this.callable = callable; this.state = NEW; // ensure visibility of callable } public FutureTask(Runnable runnable, V result) { this.callable = Executors.callable(runnable, result); this.state = NEW; // ensure visibility of callable } }
public class FutureTask { //新建 private static final int NEW = 0; //处理中 private static final int COMPLETING = 1; //正常 private static final int NORMAL = 2; //异常 private static final int EXCEPTIONAL = 3; //已取消 private static final int CANCELLED = 4; //中断中 private static final int INTERRUPTING = 5; //已中断 private static final int INTERRUPTED = 6; }
The run state of this task, initially NEW. The run state transitions to a terminal state only in methods set, setException, and cancel. During completion, state may take on transient values of COMPLETING (while outcome is being set) or INTERRUPTING (on cel RUPT Transitions from these intermediate to final states use cheaper ordered/lazy writes because values are unique and cannot be further modified.核心方法
public class FutureTask { public boolean isCancelled() { return state >= CANCELLED; } public boolean isDone() { return state != NEW; } }
取消:
且CAS修改state 成功,否則回傳取消失敗
則中斷在執行的執行緒並CAS修改state為INTERRUPTED
#刪除並通知所有等待的執行緒
呼叫done()
設定callable為null
public class FutureTask { public boolean cancel(boolean mayInterruptIfRunning) { if (!(state == NEW && UNSAFE.compareAndSwapInt(this, stateOffset, NEW, mayInterruptIfRunning ? INTERRUPTING : CANCELLED))) { return false; } try { // in case call to interrupt throws exception if (mayInterruptIfRunning) { try { Thread t = runner; if (t != null) t.interrupt(); } finally { // final state UNSAFE.putOrderedInt(this, stateOffset, INTERRUPTED); } } } finally { finishCompletion(); } return true; } private void finishCompletion() { // assert state > COMPLETING; for (WaitNode q; (q = waiters) != null; ) { if (UNSAFE.compareAndSwapObject(this, waitersOffset, q, null)) { for (; ; ) { Thread t = q.thread; if (t != null) { q.thread = null; LockSupport.unpark(t); } WaitNode next = q.next; if (next == null) break; q.next = null; // unlink to help gc q = next; } break; } } done(); callable = null; // to reduce footprint } }
获取结果: 先判断状态,如果未进入到 COMPLETING
(即为NEW状态),则阻塞等待状态改变,返回结果或抛出异常
public class FutureTask { public V get() throws InterruptedException, ExecutionException { int s = state; if (s <= COMPLETING) s = awaitDone(false, 0L); return report(s); } public V get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException { if (unit == null) throw new NullPointerException(); int s = state; if (s <= COMPLETING && (s = awaitDone(true, unit.toNanos(timeout))) <= COMPLETING) throw new TimeoutException(); return report(s); } private V report(int s) throws ExecutionException { Object x = outcome; if (s == NORMAL) return (V) x; if (s >= CANCELLED) throw new CancellationException(); throw new ExecutionException((Throwable) x); } }
而使用则非常简单,也非常的朴素。
我们以文中的的例子进行改造:
沿用原Runnable逻辑
移除回调,增加 CalcResult
将 CalcResult
对象作为既定返回结果,Runnable中设置其属性
class Demo { static class CalcResult { public int result; } public static void onStart() { System.out.println("threadId" + Thread.currentThread().getId() + ",onStart," + System.currentTimeMillis()); final CalcResult calcResult = new CalcResult(); Future<CalcResult> resultFuture = Executors.newSingleThreadExecutor().submit(() -> { try { Thread.sleep(10); } catch (InterruptedException e) { e.printStackTrace(); } final int result = 200 + 300; System.out.println("threadId" + Thread.currentThread().getId() + ",calc result:" + result + ";" + System.currentTimeMillis()); calcResult.result = result; }, calcResult); System.out.println("threadId" + Thread.currentThread().getId() + "反正干点什么," + System.currentTimeMillis()); if (resultFuture.isDone()) { try { final int ret = resultFuture.get().result; System.out.println("threadId" + Thread.currentThread().getId() + ",get result:" + ret + ";" + System.currentTimeMillis()); } catch (InterruptedException | ExecutionException e) { e.printStackTrace(); } } finish(); } }
如果直接使用新特性Callback,则如下:
直接返回结果,当然也可以直接返回Integer,不再包裹一层
class Demo { public static void onStart() { System.out.println("threadId" + Thread.currentThread().getId() + ",onStart," + System.currentTimeMillis()); ExecutorService executor = Executors.newSingleThreadExecutor(); Future<CalcResult> resultFuture = executor.submit(() -> { try { Thread.sleep(10); } catch (InterruptedException e) { e.printStackTrace(); } final int result = 200 + 300; System.out.println("threadId" + Thread.currentThread().getId() + ",calc result:" + result + ";" + System.currentTimeMillis()); final CalcResult calcResult = new CalcResult(); calcResult.result = result; return calcResult; }); System.out.println("threadId" + Thread.currentThread().getId() + "反正干点什么," + System.currentTimeMillis()); if (resultFuture.isDone()) { try { final int ret = resultFuture.get().result; System.out.println("threadId" + Thread.currentThread().getId() + ",get result:" + ret + ";" + System.currentTimeMillis()); } catch (InterruptedException | ExecutionException e) { e.printStackTrace(); } } executor.shutdown(); finish(); } }
相信读者诸君会有这样的疑惑:
为何使用Future比原先的回调看起来粗糙?
首先要明确一点:文中前段的回调Demo,虽然达成了既定目标,但效率并不高!!在当时计算很昂贵的背景下,并不会如此莽撞地使用!
而在JDK1.5开始,提供了大量内容支持多线程开发。考虑到篇幅,会在系列文章中逐步展开。
另外,FutureTask中的CAS与Happens-Before本篇中亦不做展开。
接下来,再做一些引申,简单看一看多线程业务模式。
常用的多线程设计模式包括:
Future模式
Master-Worker模式
Guarded Suspension模式
不变模式
生产者-消费
文中对于Future的使用方式遵循了Future模式。
业务方在使用时,已经明确了任务被分离到其他线程执行时有等待期,在此期间,可以干点别的事情,不必浪费系统资源。
在程序系统中设计两类线程,并相互协作:
Master线程(单个)
Worker线程
Master线程负责接受任务、分配任务、接收(必要时进一步组合)结果并返回;
Worker线程负责处理子任务,当子任务处理完成后,向Master线程返回结果;
作者按:此时可再次回想一下文章开头的Demo
使用缓存队列,使得 服务线程/服务进程 在未就绪、忙碌时能够延迟处理请求。
使用等待-通知机制,将消费 服务的返回结果
的方式规范化
在并行开发过程中,为确保数据的一致性和正确性,有必要对对象进行同步,而同步操作会对程序系统的性能产生相当的损耗。
因此,使用状态不可改变的对象,依靠其不变性来确保 并行操作 在 没有同步机制 的情况下,保持一致性和正确性。
对象创建后,其内部状态和数据不再发生改变
对象被共享、被多个线程访问
设计两类线程:若干个生产者线程和若干个消费者线程。
生产者线程负责提交用户请求,消费者线程负责处理用户请求。生产者和消费者之间通过共享内存缓冲区进行通信。
内存缓冲区的意义:
解决是数据在多线程间的共享问题
缓解生产者和消费者之间的性能差
这几种模式从不同角度出发解决特定问题,但亦有一定的相似之处,不再展开。
以上是怎麼使用Java多執行緒Future取得非同步任務的詳細內容。更多資訊請關注PHP中文網其他相關文章!