Maison > Java > javaDidacticiel > le corps du texte

Comment utiliser la nouvelle fonctionnalité de l'API de programmation réactive Java9 Reactive Stream

王林
Libérer: 2023-05-10 13:46:06
avant
1516 Les gens l'ont consulté

1. API Java9 Reactive Stream

Java 9 fournit un ensemble d'interfaces qui définissent la programmation de flux réactif. Toutes ces interfaces sont définies dans la classe java.util.concurrent.Flow en tant qu'interfaces internes statiques. java.util.concurrent.Flow类里面。

Comment utiliser la nouvelle fonctionnalité de lAPI de programmation réactive Java9 Reactive Stream

下面是Java 响应式编程中的一些重要角色和概念,先简单理解一下

发布者(Publisher)是潜在的无限数量的有序数据元素的生产者。 它根据收到的需求(subscription)向当前订阅者发布一定数量的数据元素。

订阅者(Subscriber)从发布者那里订阅并接收数据元素。与发布者建立订阅关系后,发布者向订阅者发送订阅令牌(subscription),订阅者可以根据自己的处理能力请求发布者发布数据元素的数量。

订阅令牌(subscription)表示订阅者与发布者之间建立的订阅关系。 当建立订阅关系后,发布者将其传递给订阅者。 订阅者使用订阅令牌与发布者进行交互,例如请求数据元素的数量或取消订阅。

二、Java响应式编程四大接口

2.1.Subscriber Interface(订阅者订阅接口)

public static interface Subscriber<T> {
    public void onSubscribe(Subscription subscription);
    public void onNext(T item);
    public void onError(Throwable throwable);
    public void onComplete();
}
Copier après la connexion

onSubscribe:在发布者接受订阅者的订阅动作之后,发布任何的订阅消息之前被调用。新创建的Subscription订阅令牌对象通过此方法传递给订阅者。

onNext:下一个待处理的数据项的处理函数

onError:在发布者或订阅遇到不可恢复的错误时调用

onComplete:当没有订阅者调用(包括onNext()方法)发生时调用。

2.2.Subscription Interface (订阅令牌接口)

订阅令牌对象通过Subscriber.onSubscribe()方法传递

public static interface Subscription {    public void request(long n);    public void cancel();}
Copier après la connexion

request(long n)是无阻塞背压概念背后的关键方法。订阅者使用它来请求n个以上的消费项目。这样,订阅者控制了它当前能够接收多少个数据。cancel()由订阅者主动来取消其订阅,取消后将不会在接收到任何数据消息。

2.3.Publisher Interface(发布者接口)

@FunctionalInterface
public static interface Publisher<T> {
    public void subscribe(Subscriber<? super T> subscriber);
}
Copier après la connexion

调用该方法,建立订阅者Subscriber与发布者Publisher之间的消息订阅关系。

2.4.Processor Interface(处理器接口)

处理者Processor 可以同时充当订阅者和发布者,起到转换发布者——订阅者管道中的元素的作用。用于将发布者T类型的数据元素,接收并转换为类型R的数据并发布。

public static interface Processor<T,R> extends Subscriber<T>, Publisher<R> {
}
Copier après la connexion

二、实战案例

现在我们要去实现上面的四个接口来完成响应式编程

Subscription Interface订阅令牌接口通常不需要我们自己编程去实现,我们只需要在知道request()方法和cancle()方法含义即可。

Publisher Interface发布者接口,Java 9 已经默认为我们提供了实现SubmissionPublisher,该实现类除了实现Publisher接口的方法外,提供了一个方法叫做submit()来完成消息数据的发送。

Subscriber Interface订阅者接口,通常需要我们自己去实现。因为在数据订阅接收之后,不同的业务有不同的处理逻辑。

Processor实际上是 Publisher Interface和Subscriber Interface的集合体,有需要数据类型转换及数据处理的需求才去实现这个接口

下面的例子实现的式字符串的数据消息订阅处理

实现订阅者Subscriber Interface

import java.util.concurrent.Flow;
public class MySubscriber implements Flow.Subscriber<String> {
  private Flow.Subscription subscription;  //订阅令牌
  @Override
  public void onSubscribe(Flow.Subscription subscription) {
      System.out.println("订阅关系建立onSubscribe: " + subscription);
      this.subscription = subscription;
      subscription.request(2);
  }
  @Override
  public void onNext(String item) {
      System.out.println("item: " + item);
      // 一个消息处理完成之后,可以继续调用subscription.request(n);向发布者要求数据发送
      //subscription.request(n);
  }
  @Override
  public void onError(Throwable throwable) {
      System.out.println("onError: " + throwable);
  }
  @Override
  public void onComplete() {
      System.out.println("onComplete");
  }
}
Copier après la connexion

SubmissionPublisher消息发布者

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;
public class SubmissionPublisherExample {
  public static void main(String[] args) throws InterruptedException {
      ExecutorService executor = Executors.newFixedThreadPool(1);
      SubmissionPublisher<String> sb = new SubmissionPublisher<>(executor, Flow.defaultBufferSize());
      sb.subscribe(new MySubscriber());   //建立订阅关系,可以有多个订阅者
      sb.submit("数据 1");  //发送消息1
      sb.submit("数据 2"); //发送消息2
      sb.submit("数据 3"); //发送消息3
      executor.shutdown();
  }
}
Copier après la connexion

控制台打印输出结果

订阅关系建立
onSubscribe: java.util.concurrent.SubmissionPublisher$BufferedSubscription@27e81a39
item: 数据 1
item: 数据 2

请注意:即使发布者submit了3条数据,MySubscriber也仅收到了2条数据进行了处理。是因为我们在MySubscriber#onSubscribe()方法中使用了subscription.request(2);

Comment utiliser la nouvelle fonctionnalité de l'API de programmation réactive Java9 Reactive Stream 🎜🎜 Voici quelques rôles et concepts importants dans la programmation réactive Java. Comprenons-le brièvement d'abord. L'éditeur est le producteur d'un nombre potentiellement illimité d'éléments de données ordonnés. Il publie un certain nombre de données aux abonnés actuels en fonction de la demande reçue (abonnement). 🎜🎜L'abonné s'abonne et reçoit des éléments de données de l'éditeur. Après avoir établi une relation d'abonnement avec l'éditeur, l'éditeur envoie un jeton d'abonnement (abonnement) à l'abonné, et l'abonné peut demander le nombre d'éléments de données publiés par l'éditeur en fonction de ses propres capacités de traitement. 🎜🎜Le jeton d'abonnement (abonnement) représente la relation d'abonnement établie entre l'abonné et l'éditeur. Lorsqu'une relation d'abonnement est établie, l'éditeur la transmet à l'abonné. Les abonnés utilisent le jeton d'abonnement pour interagir avec l'éditeur, par exemple en demandant le nombre d'éléments de données ou en se désabonnant. 🎜🎜2. Les quatre interfaces principales de la programmation réactive Java🎜

2.1.Interface d'abonné

rrreee🎜onSubscribe : après que l'éditeur a accepté l'action d'abonnement de l'abonné, appelé avant de publier des messages d'abonnement . L'objet jeton d'abonnement Subscription nouvellement créé est transmis à l'abonné via cette méthode. 🎜🎜onNext : La fonction de traitement de la prochaine donnée à traiter 🎜🎜onError : Appelée lorsque l'éditeur ou l'abonnement rencontre une erreur irrécupérable🎜🎜 onComplete : appelé lorsqu'aucun appel d'abonné (y compris la méthode onNext()) ne se produit. 🎜<h4>2.2.Interface d'abonnement (interface de jeton d'abonnement)</h4>🎜L'objet jeton d'abonnement est transmis via la méthode <code>Subscriber.onSubscribe()🎜rrreee🎜request(long n) est la méthode clé derrière le concept de contre-pression non bloquante. Les abonnés l'utilisent pour demander plus de n éléments de consommation. De cette manière, l'abonné contrôle la quantité de données qu'il peut actuellement recevoir. cancel()L'abonné annule activement son abonnement. Après annulation, il ne recevra aucun message de données. 🎜

2.3.Interface de l'éditeur

rrreee🎜Appelez cette méthode pour établir la relation d'abonnement aux messages entre l'abonné et l'éditeur. 🎜

2.4.Interface du processeur

🎜Le processeur peut agir à la fois en tant qu'abonné et en tant qu'éditeur, et joue le rôle de conversion des éléments dans le pipeline éditeur-abonné. Utilisé pour recevoir et convertir des éléments de données de type T de l'éditeur en données de type R et les publier. 🎜rrreee🎜2. Cas pratique🎜🎜Nous devons maintenant implémenter les quatre interfaces ci-dessus pour terminer la programmation réactive🎜🎜Interface d'abonnement L'interface du jeton d'abonnement ne nous oblige généralement pas à la programmer nous-mêmes, nous juste Vous il suffit de connaître la signification de la méthode request() et de la méthode Cancel(). 🎜🎜Publisher InterfaceInterface Publisher, Java 9 nous a fourni l'implémentation de SubmissionPublisher par défaut En plus de la méthode d'implémentation de l'interface Publisher, cette classe d'implémentation fournit une méthode appelée submit() pour compléter. les données du message envoyées. 🎜🎜Interface d'abonné L'interface d'abonné doit généralement être implémentée par nous-mêmes. Parce qu'une fois l'abonnement aux données reçu, différentes entreprises ont une logique de traitement différente. 🎜🎜Processeur est en fait une collection d'interfaces d'éditeur et d'interface d'abonné. Cette interface nécessite une conversion de type de données et un traitement de données. 🎜🎜L'exemple suivant implémente un traitement d'abonnement de message de données de chaîne🎜

Implémenter l'interface d'abonné.

rrreee

Éditeur de messages SubmissionPublisher

rrreee🎜Résultats de sortie d'impression de la console🎜
🎜Établissement de la relation d'abonnement
onSubscribe : java util.concurrent.SubmissionPublisher$BufferedSubscription@27e81a39
item : Données 1
item : Données 2🎜
🎜Veuillez noter : même si l'éditeur a soumis 3 éléments de données, MySubscriber n'en a reçu que 2. Les données ont été traitées. C'est parce que nous avons utilisé subscription.request(2); dans la méthode MySubscriber#onSubscribe(). Il s'agit de l'effet de programmation réactive de la « contre-pression ». Autant de données que j'ai la capacité de traiter, j'informerai l'éditeur du message de la quantité de données à fournir. 🎜

Ce qui précède est le contenu détaillé de. pour plus d'informations, suivez d'autres articles connexes sur le site Web de PHP en chinois!

Étiquettes associées:
source:yisu.com
Déclaration de ce site Web
Le contenu de cet article est volontairement contribué par les internautes et les droits d'auteur appartiennent à l'auteur original. Ce site n'assume aucune responsabilité légale correspondante. Si vous trouvez un contenu suspecté de plagiat ou de contrefaçon, veuillez contacter admin@php.cn
Tutoriels populaires
Plus>
Derniers téléchargements
Plus>
effets Web
Code source du site Web
Matériel du site Web
Modèle frontal