Tutoriel recommandé : "Tutoriel vidéo Java"
Qu'est-ce que la programmation réactive Java ?
la programmation réactive Java est
la programmation réactive
comme la première dans le sens de la programmation réactive En tant que première étape, Microsoft a créé la bibliothèque Rx (Reactive Extensions) dans l'écosystème .NET. RxJava est son implémentation sur la JVM.
La programmation réactive est un paradigme de programmation asynchrone qui apparaît souvent dans les langages orientés objet comme une extension du modèle Observer.
Il se concentre sur le flux de données et la propagation des changements. Cela signifie que les flux de données statiques (tels que les tableaux) ou dynamiques (tels que les émetteurs d'événements) peuvent facilement être représentés à l'aide de langages de programmation.
Reactive Streaming
Au fil du temps, une standardisation a émergé spécifiquement pour Java. Il s'agit d'une spécification qui définit certaines interfaces et règles d'interaction pour les bibliothèques réactives sur la plateforme JVM. Il s'agit de Reactive Streams, et ses interfaces ont été intégrées à Java 9, dans la classe parent java.util.concurrent.Flow. Les flux réactifs sont similaires aux itérateurs, mais les itérateurs sont basés sur le « pull », tandis que les flux réactifs sont basés sur le « push ». L'utilisation d'itérateurs est en fait une programmation impérative, car c'est au développeur de décider quand appeler next() pour obtenir l'élément suivant. En streaming réactif, l’équivalent de ce qui précède est un éditeur-abonné. Mais lorsque de nouveaux éléments sont disponibles, ils sont poussés vers les abonnés par l'éditeur. Ce « push » est la clé de la réactivité.
De plus, les opérations sur les éléments poussés sont également effectuées de manière déclarative. Le programmeur n'a qu'à exprimer ce qu'il doit faire, et n'a pas à se soucier de comment le faire.
L'éditeur utilise la méthode onNext pour envoyer de nouveaux éléments aux abonnés, la méthode onError pour notifier une erreur et la méthode onComplete pour informer qu'elle est terminée.
On peut voir que la gestion des erreurs et l'achèvement (fin) sont également gérés de manière positive. L'erreur et la fin peuvent mettre fin à la séquence.
Cette méthode est très flexible. Ce mode prend en charge les cas de 0 (aucun) élément / 1 élément / n (plusieurs) éléments (y compris les séquences infinies, si l'horloge tourne).
Reactor fait ses débuts
Reactor est la bibliothèque réactive de quatrième génération et une implémentation du paradigme de programmation réactive, utilisée pour s'appuyer sur la plate-forme JVM basée sur la spécification de flux réactif Non -blocage des applications asynchrones.
Il implémente grandement la spécification des flux réactifs sur la JVM (http://www.reactive-streams.org/).
C'est la pierre angulaire d'une programmation réactive totalement non bloquante avec une gestion efficace de la demande (sous forme de gestion de la « contre-pression »).
Il intègre directement l'API fonctionnelle Java, notamment CompletableFuture, Stream et Duration.
Il prend en charge l'utilisation du projet Reactor-Netty pour obtenir une communication inter-processus non bloquante, convient à l'architecture de microservices et prend en charge HTTP (y compris Websockets), TCP et UDP.
Remarque : Reactor nécessite Java 8+
Cela dit, devrions-nous d'abord réfléchir à la raison pour laquelle nous avons besoin d'une bibliothèque réactive aussi asynchrone ?
Le blocage est un gaspillage
Les applications modernes peuvent atteindre un très grand nombre d'utilisateurs simultanés. Même si les capacités du matériel moderne sont continuellement améliorées, les performances des logiciels modernes le sont. toujours un sujet de préoccupation clé
Il existe généralement deux façons d'améliorer les performances d'un programme :
1 La parallélisation, en utilisant plus de threads et plus de ressources matérielles
2 , Améliorez l'efficacité et recherchez une plus grande efficacité dans le cadre de l'utilisation actuelle des ressources
Habituellement, les développeurs Java utilisent du code de blocage pour écrire des programmes. Cela fonctionne bien jusqu'à ce que vous rencontriez un goulot d'étranglement en termes de performances.
À ce stade, des threads supplémentaires seront introduits pour exécuter un code de blocage similaire. Mais cette approche de mise à l’échelle peut être controversée et entraîner des problèmes de concurrence en termes d’utilisation des ressources.
Pire encore, le blocage gaspille des ressources. Si vous regardez attentivement, dès qu'un programme implique un certain retard (en particulier des E/S, comme une requête de base de données ou un appel réseau), les ressources sont gaspillées car le thread est désormais inactif, en attente de données.
La parallélisation n’est donc pas une solution miracle. Il est nécessaire de permettre au matériel d’exercer toute sa puissance, mais l’impact et les causes du gaspillage des ressources sont également très complexes.
L'asynchronicité à la rescousse
La deuxième voie évoquée plus haut est de rechercher une plus grande efficacité, ce qui peut être une solution au problème du gaspillage des ressources.
En écrivant du code asynchrone et non bloquant, vous pouvez basculer l'exécution vers d'autres tâches actives, en utilisant les mêmes ressources sous-jacentes, et revenir au processus en cours plus tard.
Mais comment générer du code asynchrone vers la JVM ? Java propose deux modèles de programmation asynchrone :
1. Les méthodes asynchrones n'ont pas de valeur de retour, mais apporteront un rappel lorsque le résultat sera disponible.
2. Futures, la méthode asynchrone renvoie immédiatement un Future
Les deux technologies sont-elles suffisamment performantes ? Cela n’est pas vrai dans toutes les situations et les deux méthodes ont leurs limites.
Les rappels sont difficiles à mettre en place et peuvent rapidement conduire à un code difficile à lire et à maintenir (le fameux « enfer des rappels »).
Les futures sont légèrement meilleurs que les rappels, mais ne font toujours pas un bon travail de composition. Combiner plusieurs objets Futures ensemble est possible mais pas facile.
Future
Il existe également d'autres problèmes. Il est facile de provoquer un autre blocage en raison de l'appel de la méthode get().
De plus, il ne prend pas en charge les calculs paresseux, ne prend pas en charge plusieurs valeurs et ne dispose pas de gestion avancée des erreurs.
De la programmation impérative à la programmation réactive
Le but des librairies réactives comme Reactor est de résoudre les défauts de l'approche asynchrone "traditionnelle" sur la JVM, tout en faisant également attention à Quelques aspects supplémentaires :
Composabilité et lisibilité.
Les données sous forme de flux, manipulées par de riches opérateurs, rien ne se passe jusqu'à ce que vous vous abonniez, post-presse, le consommateur informe le producteur que le taux d'émission est trop rapide, un niveau élevé plutôt qu'une abstraction numérique élevée.
Composabilité et lisibilité
La composabilité est en fait la capacité d'orchestrer plusieurs tâches asynchrones afin que les résultats de la tâche précédente puissent être utilisés comme entrée des tâches suivantes, ou exécuter plusieurs tâches de manière fork-join (fork-merge), ou réutiliser ces tâches asynchrones à un niveau supérieur.
La capacité à orchestrer des tâches est étroitement liée à la lisibilité et à la maintenabilité du code. À mesure que le nombre et la complexité des traitements asynchrones augmentent, il devient plus difficile de composer et de lire du code.
Comme nous pouvons le voir, bien que le modèle de rappel soit simple, lorsque les rappels sont imbriqués dans les rappels et atteignent plusieurs niveaux, cela deviendra un enfer de rappel.
Reactor fournit de riches options de combinaison pour minimiser le niveau d'imbrication, de sorte que la structure organisationnelle du code puisse refléter le type d'abstraction en cours de traitement et soit généralement conservée au même niveau.
Analogie de la chaîne d'assemblage
Vous pouvez penser à une application réactive traitant les données comme si elles passaient par une chaîne d'assemblage (production). Reactor est à la fois un tapis roulant et un poste de travail.
Les matières premières sont obtenues en continu à partir d'une source (éditeur original), pour finir par un produit fini poussé vers les consommateurs (abonnés).
Les matières premières peuvent subir de nombreuses transformations différentes, constituer d'autres étapes intermédiaires ou faire partie d'une chaîne d'assemblage plus vaste.
S'il y a un bug ou un blocage quelque part, le poste incriminé peut envoyer une notification en amont pour limiter le flux (débit) de matières premières.
Opérateurs
Dans Reactor, les opérateurs sont des postes de travail dans l'analogie de la chaîne d'assemblage. Chaque opérateur ajoute un comportement à un éditeur, en encapsulant l'éditeur précédent dans une nouvelle instance. C’est ainsi que toute la chaîne est liée.
Ainsi, les données sortent d'abord du premier éditeur, puis remontent la chaîne en aval, et sont transformées par chaque maillon. Enfin, un abonné met fin au processus.
La spécification du flux réactif ne stipule pas clairement les opérateurs, mais Reactor fournit un riche ensemble d'opérateurs, qui impliquent de nombreux aspects, de la simple conversion et filtrage à l'orchestration complexe et à la gestion des erreurs.
Tant que vous ne vous abonnez pas, rien ne se passe
Lorsque vous écrivez une chaîne d'éditeurs, par défaut, les données ne commenceront pas à entrer dans la chaîne. Au lieu de cela, vous créez simplement une description abstraite du traitement asynchrone.
En s'abonnant à ce comportement (action), les éditeurs et les abonnés sont connectés, puis les données sont déclenchées pour circuler dans la chaîne.
Ceci est implémenté en interne, se propageant en amont via le signal de demande de l'abonné, jusqu'en amont jusqu'à l'éditeur d'origine.
Fonctionnalités principales de Reactor
Reactor introduit des types réactifs composables, implémente l'interface éditeur, mais fournit également des opérateurs riches, à savoir Flux et Mono.
Flux
, fluide, représentant 0 à N éléments.
Mono
, unique, représentant 0 ou 1 élément.
La différence entre eux est principalement sémantique, représentant une base approximative pour le traitement asynchrone.
Si une requête http ne produit qu'une seule réponse, il est évidemment plus significatif de l'exprimer sous la forme Mono
peut modifier la cardinalité maximale du traitement et passera également aux types associés. Par exemple, bien que l'opérateur de comptage existe sur Flux
Flux
Un Flux
Tout comme dans la spécification de streaming réactif, ces trois types de signaux sont convertis en appels aux méthodes onNext, onComplete et onError d'un abonné en aval.
Ces trois méthodes peuvent également être comprises comme des événements/rappels, et elles sont toutes facultatives.
S'il n'y a pas de onNext mais qu'il y a onComplete, cela représente une séquence finie vide. Il n'y a ni onNext ni onComplete, indiquant une séquence infinie vide (sans utilité pratique et pouvant être utilisée pour des tests).
La séquence infinie n'a pas besoin d'être vide. Par exemple, Flux.interval(Duration) génère un Flux
Mono
Un Mono
Les opérateurs qu'il fournit ne sont qu'un sous-ensemble de ceux fournis par Flux. De même, certains opérateurs (comme la combinaison de Mono avec Publisher) peuvent le faire basculer vers un Flux.
Par exemple, Mono#concatWith(Publisher) renvoie un Flux, mais Mono#then(Mono) renvoie un autre Mono.
Mono peut être utilisé pour représenter un traitement asynchrone sans valeur de retour (similaire à Runnable), représenté par Mono
Créez un Flux ou un Mono, et abonnez-vous.
Le moyen le plus simple est d'utiliser leurs méthodes d'usine respectives :
Flux<String> seq1 = Flux.just("foo", "bar", "foobar"); List<String> iterable = Arrays.asList("foo", "bar", "foobar"); Flux<String> seq2 = Flux.fromIterable(iterable); Flux<Integer> numbersFromFiveToSeven = Flux.range(5, 3); Mono<String> noData = Mono.empty(); Mono<String> data = Mono.just("foo");
En ce qui concerne les abonnements, vous pouvez utiliser Java 8 Il existe de nombreuses variantes d'expressions lambda, de méthodes d'abonnement, avec différents rappels.
Ce qui suit est la signature de la méthode :
//订阅并触发序列 subscribe(); //可以对每一个产生的值进行处理 subscribe(Consumer<? super T> consumer); //还可以响应一个错误 subscribe(Consumer<? super T> consumer, Consumer<? super Throwable> errorConsumer); //还可以在成功结束后执行一些代码 subscribe(Consumer<? super T> consumer, Consumer<? super Throwable> errorConsumer, Runnable completeConsumer); //还可以对Subscription执行一些操作 subscribe(Consumer<? super T> consumer, Consumer<? super Throwable> errorConsumer, Runnable completeConsumer, Consumer<? super Subscription> subscriptionConsumer);
Se désabonner à l'aide de jetable
Ces méthodes d'abonnement basées sur lambda renvoient toutes un type jetable, en appelant son dispose() pour annuler cet abonnement.
Pour Flux et Mono, l'annulation est un signal indiquant que la source doit cesser de produire des éléments. Cependant, l'effet immédiat n'est pas garanti et certaines sources peuvent produire des éléments si rapidement qu'ils sont terminés avant la réception d'un signal d'annulation.
Articles connexes recommandés : "Tutoriel de développement Java"
Ce qui précède est le contenu détaillé de. pour plus d'informations, suivez d'autres articles connexes sur le site Web de PHP en chinois!