ホームページ > Java > &#&チュートリアル > Java マルチスレッド Future を使用して非同期タスクを取得する方法

Java マルチスレッド Future を使用して非同期タスクを取得する方法

PHPz
リリース: 2023-05-01 23:04:13
転載
1451 人が閲覧しました

Runnable の制限事項

前の記事で述べたように、Runnable インターフェイスをコーディングによって実装すると、指定したスレッド (またはスレッド プール) で実行される境界「タスク」が取得されます。

インターフェースを再度観察すると、メソッドの戻り値がないことを見つけるのは難しくありません。

public interface Runnable {
    void run();
}
ログイン後にコピー

JDK1.5 より前では、タスクを実行するには、重要なセクションのリソースにアクセスするためにスレッドを慎重に操作する必要があります。 Callback を使用して分離することは非常に良い選択です。

練習用の小さなデモ -- 以前の記事の知識を復習してください

ラムダは長さを減らすために使用されますが、ラムダは jdk1.5 より前ではサポートされていないことに注意してください

計算タスクを他のスレッドに分離して実行し、メイン スレッドに戻って結果を消費します

計算や IO などの時間のかかるタスクを他のスレッドに投げて、メイン スレッドがそのタスクに集中できるようにします。ユーザー入力を受け入れてフィードバックを処理することを想定していますが、この部分は省略しましょう。

次のようなコードを設計できます。

ただし、まだ理不尽な点が多くありますが、最適化、デモンストレーションには十分です

class Demo {
    static final Object queueLock = new Object();
    static List<Runnable> mainQueue = new ArrayList<>();
    static boolean running = true;
    static final Runnable FINISH = () -> running = false;
    public static void main(String[] args) {
        synchronized (queueLock) {
            mainQueue.add(Demo::onStart);
        }
        while (running) {
            Runnable runnable = null;
            synchronized (queueLock) {
                if (!mainQueue.isEmpty())
                    runnable = mainQueue.remove(0);
            }
            if (runnable != null) {
                runnable.run();
            }
            Thread.yield();
        }
    }
    public static void onStart() {
        //...
    }
    public static void finish() {
        synchronized (queueLock) {
            mainQueue.clear();
            mainQueue.add(FINISH);
        }
    }
}
ログイン後にコピー

コンピューティング スレッドとタスク コールバックをシミュレートします:

interface Callback {
    void onResultCalculated(int result);
}
class CalcThread extends Thread {
    private final Callback callback;
    private final int a;
    private final int b;
    public CalcThread(Callback callback, int a, int b) {
        this.callback = callback;
        this.a = a;
        this.b = b;
    }
    @Override
    public void run() {
        super.run();
        try {
            Thread.sleep(10);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        final int result = a + b;
        System.out.println("threadId" + Thread.currentThread().getId() + ",calc result:" + result + ";" + System.currentTimeMillis());
        synchronized (queueLock) {
            mainQueue.add(() -> callback.onResultCalculated(result));
        }
    }
}
ログイン後にコピー

onStart ビジネスを入力します:

class Demo {
    public static void onStart() {
        System.out.println("threadId" + Thread.currentThread().getId() + ",onStart," + System.currentTimeMillis());
        new CalcThread(result -> {
            System.out.println("threadId" + Thread.currentThread().getId() + ",onResultCalculated:" + result + ";" + System.currentTimeMillis());
            finish();
        }, 200, 300).start();
    }
}
ログイン後にコピー

Review: Runnable を使用するように最適化します

上で述べたように、ビジネスがタスクの実行だけに集中し、スレッド自体にはあまり関心がない場合は、Runnable を使用できます。

class Demo {
    static class CalcRunnable implements Runnable {
        private final Callback callback;
        private final int a;
        private final int b;
        public CalcRunnable(Callback callback, int a, int b) {
            this.callback = callback;
            this.a = a;
            this.b = b;
        }
        @Override
        public void run() {
            try {
                Thread.sleep(10);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            final int result = a + b;
            System.out.println("threadId" + Thread.currentThread().getId() + ",calc result:" + result + ";" + System.currentTimeMillis());
            synchronized (queueLock) {
                mainQueue.add(() -> callback.onResultCalculated(result));
            }
        }
    }
    public static void onStart() {
        System.out.println("threadId" + Thread.currentThread().getId() + ",onStart," + System.currentTimeMillis());
        new Thread(new CalcRunnable(result -> {
            System.out.println("threadId" + Thread.currentThread().getId() + ",onResultCalculated:" + result + ";" + System.currentTimeMillis());
            finish();
        }, 200, 300)).start();
    }
}
ログイン後にコピー

これは難しくありません。想像してみてください: 特定のスレッド、特定の種類のスレッドがタスクを簡単に受信できるようにするには、

  • が非常に必要です。このシリーズの記事でスレッド プールを振り返ると、スレッド プールが誕生しました。

  • Synchronize よりも軽量なメカニズムを備えています

  • より便利なデータ構造を備えています

この時点で、次のことがわかります。JDK1.5 より前では、JDK の機能が不十分だったため、Java プログラムはスレッドrougher を使用していました。

Future Born for asynchronous

ついに JDK1.5 に新機能が追加されました: Future と前の記事で言及したスレッド プール、時が経つのは早いもので、ほぼ 20 年が経過しました。 。

/**
 * 略
 * @since 1.5
 * @author Doug Lea
 * @param <V> The result type returned by this Future&#39;s {@code get} method
 */
public interface Future<V> {
    boolean cancel(boolean mayInterruptIfRunning);
    boolean isCancelled();
    boolean isDone();
    V get() throws InterruptedException, ExecutionException;
    V get(long timeout, TimeUnit unit)
            throws InterruptedException, ExecutionException, TimeoutException;
}
ログイン後にコピー

API コメントは削除されていますが、詳しく説明しなくても各 API の意味は理解できます。

明らかに、戻り値を増やすために、Runnable をそのような複雑な インターフェイスに置き換える必要はありません。簡単に考えた後、戻り値を次のように要約できます。

  • #Runnable でのビジネスの結果 (計算、リソースの読み取りなど) を返します。

  • Runnable の実行後に結果を返すだけです

ビジネス層の観点から見ると、次のインターフェイスのみが必要です。これにより、戻り値が増加し、より使いやすくなります。例外:

著者注: 基礎となる実装は脇に置いて、ビジネス側のコーディングのニーズのみに注目してください

public interface Callable<V> {
    /**
     * Computes a result, or throws an exception if unable to do so.
     *
     * @return computed result
     * @throws Exception if unable to compute a result
     */
    V call() throws Exception;
}
ログイン後にコピー

明らかに、JDK は下位互換性を提供する必要があります:

  • Runnable は破棄できませんし、破棄すべきでもありません

  • ユーザーにコードを完全に再構築するよう要求することはできません

、アダプターも提供されています ユーザーが新機能を利用するために簡単な部分リファクタリングを実行できるようにします

static final class RunnableAdapter<T> implements Callable<T> {
    final Runnable task;
    final T result;
    RunnableAdapter(Runnable task, T result) {
        this.task = task;
        this.result = result;
    }
    public T call() {
        task.run();
        return result;
    }
}
ログイン後にコピー

そしてFutureはその名の通り、「未来」における結果や状態を表し、それを処理するために生まれました非同期でより便利に。

には

FutureTask が組み込まれています。これについては、FutureTask の詳細な説明の章で展開します。

クラス図

JDK1.8に基づいて、合理化されたクラス図の構造を見てみましょう:

Java マルチスレッド Future を使用して非同期タスクを取得する方法##FutureTaskの詳細な説明

Constructor

public class FutureTask {
    public FutureTask(Callable<V> callable) {
        if (callable == null)
            throw new NullPointerException();
        this.callable = callable;
        this.state = NEW;       // ensure visibility of callable
    }
    public FutureTask(Runnable runnable, V result) {
        this.callable = Executors.callable(runnable, result);
        this.state = NEW;       // ensure visibility of callable
    }
}
ログイン後にコピー

Lifecycle

public class FutureTask {
    //新建
    private static final int NEW = 0;
    //处理中
    private static final int COMPLETING = 1;
    //正常
    private static final int NORMAL = 2;
    //异常
    private static final int EXCEPTIONAL = 3;
    //已取消
    private static final int CANCELLED = 4;
    //中断中
    private static final int INTERRUPTING = 5;
    //已中断
    private static final int INTERRUPTED = 6;
}
ログイン後にコピー

可能なライフサイクル変換は次のとおりです:

##NEW -> COMPLETING -> NORMAL
  • 新規 -> 完了 -> 例外的
  • 新規 -> キャンセル済み
  • 新規 -> ; INTERRUPTING -> INTERRUPTED
  • #JDK での元の説明は次のとおりです:

このタスクの実行状態、最初は NEW。 set、setException、および cancel メソッドでのみ終了状態に遷移します。完了中、状態は COMPLETING (結果が設定されている間) または INTERRUPTING (cancel(true) を満たすためにランナーを中断している間のみ) の一時的な値をとることがあります。 ). これらの中間状態から最終状態への遷移では、値が一意であり、それ以上変更できないため、より安価な順序付け/遅延書き込みが使用されます。

コア メソッド

このセクションは次から始まります。ソースコードを読むための 3 つのブロック

ステータス判定
  • キャンセル
  • #結果を取得
  • ステータス判定 API の実装は非常に簡単です
  • public class FutureTask {
        public boolean isCancelled() {
            return state >= CANCELLED;
        }
        public boolean isDone() {
            return state != NEW;
        }
    }
    ログイン後にコピー
キャンセル:

現在のステータスは
    NEW
  • と CAS は状態を正常に変更します。それ以外の場合はキャンセル失敗を返します。

    #If

    mayInterruptIfRunning
  • は実行中のスレッドを中断し、CAS は状態を INTERRUPTED
  • # に変更します。

    ##finishCompletion を呼び出します

    待機中のすべてのスレッドを削除して通知します
  • ##Done()を呼び出します
  • #callable を null に設定します

public class FutureTask {
    public boolean cancel(boolean mayInterruptIfRunning) {
        if (!(state == NEW &&
                UNSAFE.compareAndSwapInt(this, stateOffset, NEW,
                        mayInterruptIfRunning ? INTERRUPTING : CANCELLED))) {
            return false;
        }
        try {    // in case call to interrupt throws exception
            if (mayInterruptIfRunning) {
                try {
                    Thread t = runner;
                    if (t != null)
                        t.interrupt();
                } finally { // final state
                    UNSAFE.putOrderedInt(this, stateOffset, INTERRUPTED);
                }
            }
        } finally {
            finishCompletion();
        }
        return true;
    }
    private void finishCompletion() {
        // assert state > COMPLETING;
        for (WaitNode q; (q = waiters) != null; ) {
            if (UNSAFE.compareAndSwapObject(this, waitersOffset, q, null)) {
                for (; ; ) {
                    Thread t = q.thread;
                    if (t != null) {
                        q.thread = null;
                        LockSupport.unpark(t);
                    }
                    WaitNode next = q.next;
                    if (next == null)
                        break;
                    q.next = null; // unlink to help gc
                    q = next;
                }
                break;
            }
        }
        done();
        callable = null;        // to reduce footprint
    }
}
ログイン後にコピー

获取结果: 先判断状态,如果未进入到 COMPLETING(即为NEW状态),则阻塞等待状态改变,返回结果或抛出异常

public class FutureTask {
    public V get() throws InterruptedException, ExecutionException {
        int s = state;
        if (s <= COMPLETING)
            s = awaitDone(false, 0L);
        return report(s);
    }
    public V get(long timeout, TimeUnit unit)
            throws InterruptedException, ExecutionException, TimeoutException {
        if (unit == null)
            throw new NullPointerException();
        int s = state;
        if (s <= COMPLETING &&
                (s = awaitDone(true, unit.toNanos(timeout))) <= COMPLETING)
            throw new TimeoutException();
        return report(s);
    }
    private V report(int s) throws ExecutionException {
        Object x = outcome;
        if (s == NORMAL)
            return (V) x;
        if (s >= CANCELLED)
            throw new CancellationException();
        throw new ExecutionException((Throwable) x);
    }
}
ログイン後にコピー

如何使用

而使用则非常简单,也非常的朴素。

我们以文中的的例子进行改造:

  • 沿用原Runnable逻辑

  • 移除回调,增加 CalcResult

  • CalcResult 对象作为既定返回结果,Runnable中设置其属性

class Demo {
   static class CalcResult {
      public int result;
   }
   public static void onStart() {
      System.out.println("threadId" + Thread.currentThread().getId() + ",onStart," + System.currentTimeMillis());
      final CalcResult calcResult = new CalcResult();
      Future<CalcResult> resultFuture = Executors.newSingleThreadExecutor().submit(() -> {
         try {
            Thread.sleep(10);
         } catch (InterruptedException e) {
            e.printStackTrace();
         }
         final int result = 200 + 300;
         System.out.println("threadId" + Thread.currentThread().getId() + ",calc result:" + result + ";" + System.currentTimeMillis());
         calcResult.result = result;
      }, calcResult);
      System.out.println("threadId" + Thread.currentThread().getId() + "反正干点什么," + System.currentTimeMillis());
      if (resultFuture.isDone()) {
         try {
            final int ret = resultFuture.get().result;
            System.out.println("threadId" + Thread.currentThread().getId() + ",get result:" + ret + ";" + System.currentTimeMillis());
         } catch (InterruptedException | ExecutionException e) {
            e.printStackTrace();
         }
      }
      finish();
   }
}
ログイン後にコピー

如果直接使用新特性Callback,则如下:

直接返回结果,当然也可以直接返回Integer,不再包裹一层

class Demo {
   public static void onStart() {
      System.out.println("threadId" + Thread.currentThread().getId() + ",onStart," + System.currentTimeMillis());
      ExecutorService executor = Executors.newSingleThreadExecutor();
      Future<CalcResult> resultFuture = executor.submit(() -> {
         try {
            Thread.sleep(10);
         } catch (InterruptedException e) {
            e.printStackTrace();
         }
         final int result = 200 + 300;
         System.out.println("threadId" + Thread.currentThread().getId() + ",calc result:" + result + ";" + System.currentTimeMillis());
         final CalcResult calcResult = new CalcResult();
         calcResult.result = result;
         return calcResult;
      });
      System.out.println("threadId" + Thread.currentThread().getId() + "反正干点什么," + System.currentTimeMillis());
      if (resultFuture.isDone()) {
         try {
            final int ret = resultFuture.get().result;
            System.out.println("threadId" + Thread.currentThread().getId() + ",get result:" + ret + ";" + System.currentTimeMillis());
         } catch (InterruptedException | ExecutionException e) {
            e.printStackTrace();
         }
      }
      executor.shutdown();
      finish();
   }
}
ログイン後にコピー

相信读者诸君会有这样的疑惑:

为何使用Future比原先的回调看起来粗糙?

首先要明确一点:文中前段的回调Demo,虽然达成了既定目标,但效率并不高!!在当时计算很昂贵的背景下,并不会如此莽撞地使用!

而在JDK1.5开始,提供了大量内容支持多线程开发。考虑到篇幅,会在系列文章中逐步展开。

另外,FutureTask中的CAS与Happens-Before本篇中亦不做展开。

接下来,再做一些引申,简单看一看多线程业务模式。

引申,多线程业务模式

常用的多线程设计模式包括:

  • Future模式

  • Master-Worker模式

  • Guarded Suspension模式

  • 不变模式

  • 生产者-消费

Future模式

文中对于Future的使用方式遵循了Future模式。

业务方在使用时,已经明确了任务被分离到其他线程执行时有等待期,在此期间,可以干点别的事情,不必浪费系统资源。

Master-Worker模式

在程序系统中设计两类线程,并相互协作:

  • Master线程(单个)

  • Worker线程

Master线程负责接受任务、分配任务、接收(必要时进一步组合)结果并返回;

Worker线程负责处理子任务,当子任务处理完成后,向Master线程返回结果;

作者按:此时可再次回想一下文章开头的Demo

Guarded Suspension模式

  • 使用缓存队列,使得 服务线程/服务进程 在未就绪、忙碌时能够延迟处理请求。

  • 使用等待-通知机制,将消费 服务的返回结果 的方式规范化

不变模式

在并行开发过程中,为确保数据的一致性和正确性,有必要对对象进行同步,而同步操作会对程序系统的性能产生相当的损耗。

因此,使用状态不可改变的对象,依靠其不变性来确保 并行操作没有同步机制 的情况下,保持一致性和正确性。

  • 对象创建后,其内部状态和数据不再发生改变

  • 对象被共享、被多个线程访问

生产者-消费

设计两类线程:若干个生产者线程和若干个消费者线程。

生产者线程负责提交用户请求,消费者线程负责处理用户请求。生产者和消费者之间通过共享内存缓冲区进行通信。

内存缓冲区的意义:

  • 解决是数据在多线程间的共享问题

  • 缓解生产者和消费者之间的性能差

这几种模式从不同角度出发解决特定问题,但亦有一定的相似之处,不再展开。

以上がJava マルチスレッド Future を使用して非同期タスクを取得する方法の詳細内容です。詳細については、PHP 中国語 Web サイトの他の関連記事を参照してください。

関連ラベル:
ソース:yisu.com
このウェブサイトの声明
この記事の内容はネチズンが自主的に寄稿したものであり、著作権は原著者に帰属します。このサイトは、それに相当する法的責任を負いません。盗作または侵害の疑いのあるコンテンツを見つけた場合は、admin@php.cn までご連絡ください。
人気のチュートリアル
詳細>
最新のダウンロード
詳細>
ウェブエフェクト
公式サイト
サイト素材
フロントエンドテンプレート