Java 9 は、リアクティブ ストリーム プログラミングを定義する一連のインターフェイスを提供します。これらすべてのインターフェースは、java.util.concurrent.Flow
クラスで静的内部インターフェースとして定義されます。
Java リアクティブ プログラミングにおける重要な役割と概念を以下に示します。まず簡単に理解しましょう。
パブリッシャー (発行者) は、潜在的に無制限の数のプロデューサーです。順序付けされたデータ要素。受信した要求 (サブスクリプション) に基づいて、特定の数のデータ要素を現在のサブスクライバーに公開します。
サブスクライバーは、パブリッシャーからデータ要素をサブスクライブして受信します。パブリッシャーとのサブスクリプション関係を確立した後、パブリッシャーはサブスクリプション トークン (サブスクリプション) をサブスクライバーに送信し、サブスクライバーは自身の処理能力に基づいてパブリッシャーによってパブリッシュされたデータ要素の数を要求できます。
サブスクリプション トークン (サブスクリプション) は、サブスクライバーとパブリッシャーの間に確立されたサブスクリプション関係を表します。サブスクリプション関係が確立されると、パブリッシャーはそれをサブスクライバーに渡します。サブスクライバーはサブスクリプション トークンを使用して、データ要素の数の要求やサブスクライブの解除など、パブリッシャーと対話します。
public static interface Subscriber<T> { public void onSubscribe(Subscription subscription); public void onNext(T item); public void onError(Throwable throwable); public void onComplete(); }
onSubscribe
: サブスクライバーのサブスクリプション後にパブリッシャーが Called を受け入れるときアクションを実行し、サブスクリプション メッセージを公開する前に実行します。新しく作成された Subscription
サブスクリプション トークン オブジェクトは、このメソッドを通じてサブスクライバーに渡されます。
onNext
: 次に処理されるデータ項目の処理関数
onError
: パブリッシャーまたはサブスクリプションで回復不能なエラーが発生したとき Call
onComplete
: サブスクライバ呼び出し (onNext() メソッドを含む) が発生しないときに呼び出されます。
サブスクリプション トークン オブジェクトは、Subscriber.onSubscribe()
メソッド
public static interface Subscription { public void request(long n); public void cancel();}
## を通じて渡されます。 # request(long n) は、ノンブロッキング バックプレッシャーの概念の背後にある重要なメソッドです。加入者はこれを使用して、n 個を超える消費アイテムをリクエストします。このようにして、加入者は現在受信できるデータ量を制御します。
cancel()サブスクライバは自らサブスクリプションをキャンセルします。キャンセル後はデータ メッセージを受信しなくなります。
@FunctionalInterface public static interface Publisher<T> { public void subscribe(Subscriber<? super T> subscriber); }
public static interface Processor<T,R> extends Subscriber<T>, Publisher<R> { }
サブスクリプション インターフェイスサブスクリプション トークン インターフェイス通常はこれを実行します。これを実装するために自分自身をプログラムする必要はありません。 request() メソッドと cancel() メソッドの意味を知るだけで済みます。
Publisher インターフェイスPublisher インターフェイスである Java 9 では、デフォルトで SubmissionPublisher の実装が提供されています。Publisher インターフェイスを実装するメソッドに加えて、この実装クラスは submit( ) メッセージデータの送信が完了します。
サブスクライバ インターフェイスサブスクライバ インターフェイスは通常、自分で実装する必要があります。データサブスクリプションを受け取った後の処理ロジックは企業ごとに異なるためです。
Processor は、実際にはパブリッシャー インターフェイスとサブスクライバー インターフェイスのコレクションです。このインターフェイスは、データ型の変換とデータ処理が必要な場合にのみ実装する必要があります。
import java.util.concurrent.Flow; public class MySubscriber implements Flow.Subscriber<String> { private Flow.Subscription subscription; //订阅令牌 @Override public void onSubscribe(Flow.Subscription subscription) { System.out.println("订阅关系建立onSubscribe: " + subscription); this.subscription = subscription; subscription.request(2); } @Override public void onNext(String item) { System.out.println("item: " + item); // 一个消息处理完成之后,可以继续调用subscription.request(n);向发布者要求数据发送 //subscription.request(n); } @Override public void onError(Throwable throwable) { System.out.println("onError: " + throwable); } @Override public void onComplete() { System.out.println("onComplete"); } }
import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Flow; import java.util.concurrent.SubmissionPublisher; public class SubmissionPublisherExample { public static void main(String[] args) throws InterruptedException { ExecutorService executor = Executors.newFixedThreadPool(1); SubmissionPublisher<String> sb = new SubmissionPublisher<>(executor, Flow.defaultBufferSize()); sb.subscribe(new MySubscriber()); //建立订阅关系,可以有多个订阅者 sb.submit("数据 1"); //发送消息1 sb.submit("数据 2"); //发送消息2 sb.submit("数据 3"); //发送消息3 executor.shutdown(); } }
onSubscribe: java.util.concurrent.SubmissionPublisher$BufferedSubscription@27e81a39MySubscriber#onSubscribe()item: データ 1
注意: パブリッシャーが 3 つの部分を送信した後であったとしてものデータのうち、MySubscriber は処理のために 2 つのデータのみを受信しました。これは、
item: データ 2
メソッドで subscription.request(2);
を使用したためです。これは「バック プレッシャー」のリアクティブ プログラミング効果です。処理できるデータ量に応じて、メッセージ発行者にどのくらいのデータを提供するかを通知します。
以上がjava9 Reactive Stream 応答性プログラミング API の新機能の使用方法の詳細内容です。詳細については、PHP 中国語 Web サイトの他の関連記事を参照してください。