Empfohlenes Tutorial: „Java-Video-Tutorial“
Was ist reaktive Java-Programmierung?
Java Reaktive Programmierung ist
Reaktive Programmierung
als erster in Richtung reaktiver Programmierung Im ersten Schritt erstellte Microsoft die Rx-Bibliothek (Reactive Extensions) im .NET-Ökosystem. RxJava ist seine Implementierung auf der JVM.
Reaktive Programmierung ist ein asynchrones Programmierparadigma, das in objektorientierten Sprachen häufig als Erweiterung des Observer-Musters erscheint.
Es konzentriert sich auf den Datenfluss und die Verbreitung von Änderungen. Dies bedeutet, dass statische (z. B. Arrays) oder dynamische (z. B. Ereignisemitter) Datenflüsse problemlos mithilfe von Programmiersprachen dargestellt werden können.
Reaktives Streaming
Im Laufe der Zeit entstand eine Standardisierung speziell für Java. Dabei handelt es sich um eine Spezifikation, die einige Schnittstellen und Interaktionsregeln für reaktive Bibliotheken auf der JVM-Plattform definiert. Es handelt sich um Reactive Streams, und seine Schnittstellen wurden in Java 9 in die übergeordnete Klasse java.util.concurrent.Flow integriert. Reaktive Streams ähneln Iteratoren, aber Iteratoren basieren auf „Pull“, während reaktive Streams auf „Push“ basieren. Die Verwendung von Iteratoren ist eigentlich eine zwingende Programmierung, da es Sache des Entwicklers ist, zu entscheiden, wann next() aufgerufen wird, um das nächste Element abzurufen. Beim reaktiven Streaming ist das Äquivalent des oben Gesagten ein Herausgeber-Abonnent. Wenn jedoch neue Elemente verfügbar sind, werden sie vom Herausgeber an die Abonnenten weitergegeben. Dieser „Push“ ist der Schlüssel zur Reaktionsfähigkeit.
Darüber hinaus werden die Operationen an den gepushten Elementen auch deklarativ ausgeführt. Der Programmierer muss nur angeben, was zu tun ist, und muss sich nicht darum kümmern, wie er es tun soll.
Der Herausgeber verwendet die onNext-Methode, um neue Elemente an Abonnenten zu übertragen, die onError-Methode, um einen Fehler zu benachrichtigen, und die onComplete-Methode, um zu informieren, dass der Fehler beendet wurde.
Es ist ersichtlich, dass auch die Fehlerbehandlung und der Abschluss (Ende) gut gehandhabt werden. Sowohl Fehler als auch Ende können die Sequenz beenden.
Diese Methode ist sehr flexibel. Dieser Modus unterstützt die Fälle von 0 (keine) Elemente / 1 Element / n (viele) Elemente (einschließlich unendlicher Sequenzen, wenn die Uhr tickt).
Reactor-Debüts
Reactor ist die reaktive Bibliothek der vierten Generation und eine Implementierung des reaktiven Programmierparadigmas, die zum Aufbau auf der JVM-Plattform basierend auf der reaktiven Flussspezifikation Non verwendet wird -Blockierung asynchroner Anwendungen.
Es implementiert die Spezifikation reaktiver Streams auf der JVM (http://www.reactive-streams.org/) weitgehend.
Es ist ein Eckpfeiler einer vollständig nicht blockierenden reaktiven Programmierung mit effizientem Nachfragemanagement (in Form der Verwaltung von „Gegendruck“).
Es integriert direkt die Java-Funktions-API, insbesondere CompletableFuture, Stream und Duration.
Es unterstützt die Verwendung des Reactor-Netty-Projekts, um eine nicht blockierende prozessübergreifende Kommunikation zu erreichen, ist für die Microservice-Architektur geeignet und unterstützt HTTP (einschließlich Websockets), TCP und UDP.
Hinweis: Reactor erfordert Java 8+
Nachdem wir so viel gesagt haben, sollten wir zuerst darüber nachdenken, warum wir eine solche asynchrone reaktive Bibliothek benötigen?
Blockieren ist Verschwendung
Moderne Anwendungen können eine sehr große Anzahl gleichzeitiger Benutzer erreichen. Auch wenn die Fähigkeiten moderner Hardware kontinuierlich verbessert werden, gilt dies auch für die Leistung moderner Software immer noch ein wichtiger Punkt, der Anlass zur Sorge gibt
Es gibt im Allgemeinen zwei Möglichkeiten, die Leistung eines Programms zu verbessern:
1. Parallelisierung, Verwendung von mehr Threads und mehr Hardwareressourcen
2 , Verbessern Sie die Effizienz und streben Sie eine höhere Effizienz bei der aktuellen Ressourcennutzung an
Normalerweise verwenden Java-Entwickler Blockierungscode, um Programme zu schreiben. Dies funktioniert in der Praxis gut, bis ein Leistungsengpass auftritt.
Zu diesem Zeitpunkt werden zusätzliche Threads eingeführt, um ähnlichen Blockierungscode auszuführen. Dieser Skalierungsansatz kann jedoch umstritten sein und zu Parallelitätsproblemen bei der Ressourcennutzung führen.
Noch schlimmer: Blockieren verschwendet Ressourcen. Wenn Sie genau hinschauen, werden Ressourcen verschwendet, sobald ein Programm eine gewisse Verzögerung aufweist (insbesondere E/A, wie eine Datenbankanforderung oder ein Netzwerkaufruf), da der Thread jetzt im Leerlauf ist und auf Daten wartet.
Parallelisierung ist also kein Allheilmittel. Es ist notwendig, dass die Hardware ihre volle Leistung entfalten kann, aber auch die Auswirkungen und Ursachen der Ressourcenverschwendung sind sehr kompliziert.
Asynchronität zur Rettung
Der zweite zuvor erwähnte Weg besteht darin, nach höherer Effizienz zu streben, was eine Lösung für das Problem der Ressourcenverschwendung sein kann.
Durch das Schreiben von asynchronem, nicht blockierendem Code können Sie die Ausführung auf andere aktive Aufgaben verlagern, die gleichen zugrunde liegenden Ressourcen nutzen und später zum aktuellen Prozess zurückkehren.
Aber wie generiert man asynchronen Code für die JVM? Java bietet zwei asynchrone Programmiermodelle:
1. Asynchrone Methoden haben keinen Rückgabewert, aber der Rückruf wird aufgerufen, wenn das Ergebnis verfügbar ist.
2. Futures, die asynchrone Methode gibt sofort einen Future
Sind beide Technologien gut genug? Es ist nicht für jede Situation perfekt und beide Methoden haben Einschränkungen.
Callbacks sind schwer zusammenzustellen und können schnell zu Code führen, der schwer zu lesen und zu warten ist (die bekannte „Callback-Hölle“).
Futures sind etwas besser als Callbacks, leisten aber immer noch keine gute Kompositionsarbeit. Das Kombinieren mehrerer Futures-Objekte ist möglich, aber nicht einfach.
Future
Es gibt auch andere Probleme, die durch den Aufruf der get()-Methode verursacht werden können.
Darüber hinaus unterstützt es keine verzögerten Berechnungen, keine Unterstützung für mehrere Werte und keine erweiterte Fehlerbehandlung.
Von der imperativen zur reaktiven Programmierung
Das Ziel reaktiver Bibliotheken wie Reactor besteht darin, die Mängel des „traditionellen“ asynchronen Ansatzes auf der JVM zu beheben und gleichzeitig darauf zu achten zu einigen zusätzlichen Aspekten:
Zusammensetzbarkeit und Lesbarkeit.
Daten als Stream, manipuliert von reichen Betreibern, nichts passiert, bis Sie ein Abonnement abschließen, den Druck weiterverarbeiten und der Verbraucher den Produzenten darüber informiert, dass die Emissionsrate zu hoch ist, hohes Niveau statt hoher numerischer Abstraktion.
Zusammensetzbarkeit und Lesbarkeit
Zusammensetzbarkeit ist eigentlich die Fähigkeit, mehrere asynchrone Aufgaben so zu orchestrieren, dass die Ergebnisse der vorherigen Aufgabe als Eingabe für nachfolgende Aufgaben verwendet werden können, oder Führen Sie mehrere Aufgaben im Fork-Join-Verfahren (Fork-Merge) aus oder verwenden Sie diese asynchronen Aufgaben auf einer höheren Ebene wieder.
Die Fähigkeit, Aufgaben zu orchestrieren, ist eng mit der Lesbarkeit und Wartbarkeit des Codes verbunden. Da die Anzahl und Komplexität der asynchronen Verarbeitung zunimmt, wird es immer schwieriger, Code zu verfassen und zu lesen.
Wie wir sehen können, ist das Callback-Modell zwar einfach, aber wenn Callbacks in Callbacks verschachtelt sind und mehrere Ebenen erreichen, wird es zur Callback-Hölle.
Reactor bietet umfangreiche Kombinationsmöglichkeiten, um die Verschachtelungsebene zu minimieren, sodass die Organisationsstruktur des Codes widerspiegeln kann, welche Art von Abstraktion verarbeitet wird, und normalerweise auf derselben Ebene gehalten wird.
Analogie zum Fließband
Sie können sich eine reaktive Anwendung vorstellen, die Daten so verarbeitet, als ob sie ein Fließband (Produktionsband) durchlaufen würden. Der Reaktor ist sowohl ein Förderband als auch eine Arbeitsstation.
Rohstoffe werden kontinuierlich von einer Quelle (Originalverleger) bezogen und schließlich wird ein fertiges Produkt an Verbraucher (Abonnenten) weitergegeben.
Rohstoffe können viele verschiedene Umwandlungen durchlaufen, andere Zwischenschritte sein oder Teil einer größeren Montagelinie sein.
Wenn irgendwo ein Fehler oder eine Blockade vorliegt, kann die betroffene Arbeitsstation eine Benachrichtigung vorgelagert senden, um den Fluss (die Rate) der Rohstoffe zu begrenzen.
Bediener
In Reactor sind Bediener Arbeitsstationen in der Analogie zum Fließband. Jeder Operator fügt einem Herausgeber ein Verhalten hinzu, indem er den vorherigen Herausgeber in eine neue Instanz einschließt. So ist die gesamte Kette verknüpft.
Die Daten kommen also zunächst vom ersten Herausgeber, bewegen sich dann entlang der Kette nach unten und werden von jedem Glied transformiert. Schließlich beendet ein Abonnent den Vorgang.
Die reaktive Stream-Spezifikation legt die Operatoren nicht klar fest, aber Reactor bietet einen umfangreichen Satz an Operatoren, die viele Aspekte umfassen, von einfacher Konvertierung und Filterung bis hin zu komplexer Orchestrierung und Fehlerbehandlung.
Solange Sie sich nicht anmelden, passiert nichts
Wenn Sie eine Verlagskette schreiben, werden standardmäßig keine Daten in die Kette eingegeben. Stattdessen erstellen Sie lediglich eine abstrakte Beschreibung der asynchronen Verarbeitung.
Durch das Abonnieren dieses Verhaltens (Aktion) werden Herausgeber und Abonnenten verbunden, und dann wird der Datenfluss in der Kette ausgelöst.
Dies wird intern implementiert und verbreitet sich stromaufwärts über das Anforderungssignal vom Abonnenten bis hin zum ursprünglichen Herausgeber.
Kernfunktionen von Reactor
Reactor führt zusammensetzbare reaktive Typen ein und implementiert die Publisher-Schnittstelle, bietet aber auch umfangreiche Operatoren, nämlich Flux und Mono.
Flux
, fließend, repräsentiert 0 bis N Elemente.
Mono
, einzeln, repräsentiert 0 oder 1 Element.
Der Unterschied zwischen ihnen ist hauptsächlich semantischer Natur und stellt eine grobe Grundlage für die asynchrone Verarbeitung dar.
Wenn eine HTTP-Anfrage nur eine Antwort erzeugt, ist es offensichtlich sinnvoller, sie als Mono
-Operator kann die maximale Kardinalität der Verarbeitung ändern und wechselt auch zu verwandten Typen. Obwohl der Zähloperator beispielsweise auf Flux
Flux
Ein Flux
Genau wie in der reaktiven Streaming-Spezifikation werden diese drei Arten von Signalen in Aufrufe an die Methoden onNext, onComplete und onError eines Downstream-Abonnenten umgewandelt.
Diese drei Methoden können auch als Ereignisse/Rückrufe verstanden werden und sind alle optional.
Wenn es kein onNext, aber onComplete gibt, stellt es eine leere endliche Sequenz dar. Es gibt weder onNext noch onComplete, was auf eine leere unendliche Sequenz hinweist (ohne praktischen Nutzen und kann zum Testen verwendet werden).
Die unendliche Sequenz muss nicht leer sein. Beispielsweise generiert Flux.interval(Duration) einen Flux
Mono
Ein Mono
Die von ihm bereitgestellten Operatoren sind nur eine Teilmenge der von Flux bereitgestellten Operatoren. Ebenso können einige Operatoren (z. B. die Kombination von Mono mit Publisher) zu einem Flux wechseln.
Zum Beispiel gibt Mono#concatWith(Publisher) einen Flux zurück, aber Mono#then(Mono) gibt ein anderes Mono zurück.
Mono kann verwendet werden, um eine asynchrone Verarbeitung ohne Rückgabewert darzustellen (ähnlich wie Runnable), dargestellt durch Mono
Erstellen Sie ein Flux oder Mono und abonnieren Sie sie
Am einfachsten ist es, die jeweiligen Factory-Methoden zu verwenden:
Flux<String> seq1 = Flux.just("foo", "bar", "foobar"); List<String> iterable = Arrays.asList("foo", "bar", "foobar"); Flux<String> seq2 = Flux.fromIterable(iterable); Flux<Integer> numbersFromFiveToSeven = Flux.range(5, 3); Mono<String> noData = Mono.empty(); Mono<String> data = Mono.just("foo");
Wenn es um Abonnements geht, können Sie Java verwenden 8er Es gibt viele verschiedene Variationen von Lambda-Ausdrücken und Abonnementmethoden mit unterschiedlichen Rückrufen.
Das Folgende ist die Methodensignatur:
//订阅并触发序列 subscribe(); //可以对每一个产生的值进行处理 subscribe(Consumer<? super T> consumer); //还可以响应一个错误 subscribe(Consumer<? super T> consumer, Consumer<? super Throwable> errorConsumer); //还可以在成功结束后执行一些代码 subscribe(Consumer<? super T> consumer, Consumer<? super Throwable> errorConsumer, Runnable completeConsumer); //还可以对Subscription执行一些操作 subscribe(Consumer<? super T> consumer, Consumer<? super Throwable> errorConsumer, Runnable completeConsumer, Consumer<? super Subscription> subscriptionConsumer);
Abbestellen mit „Disposable“
Diese Lambda-basierten Abonnementmethoden geben alle einen Disposible-Typ zurück, indem sie dessen aufrufen dispose(), um dieses Abonnement zu kündigen.
Bei Flux und Mono ist die Löschung ein Signal dafür, dass die Quelle die Produktion von Elementen einstellen sollte. Eine sofortige Wirkung kann jedoch nicht garantiert werden, und einige Quellen produzieren Elemente möglicherweise so schnell, dass sie fertig sind, bevor ein Abbruchsignal empfangen wird.
Empfohlene verwandte Artikel: „Java-Entwicklungs-Tutorial“
Das obige ist der detaillierte Inhalt vonWas ist reaktive Java-Programmierung?. Für weitere Informationen folgen Sie bitte anderen verwandten Artikeln auf der PHP chinesischen Website!