Heim > Java > javaLernprogramm > Java-Entwicklung: So verwenden Sie Akka Streams für die Stream-Verarbeitung und Datenübertragung

Java-Entwicklung: So verwenden Sie Akka Streams für die Stream-Verarbeitung und Datenübertragung

WBOY
Freigeben: 2023-09-22 08:30:26
Original
1001 Leute haben es durchsucht

Java开发:如何使用Akka Streams进行流处理和数据传输

Java-Entwicklung: So verwenden Sie Akka Streams für die Stream-Verarbeitung und Datenübertragung

Einführung:
Mit der rasanten Entwicklung von Big Data und Echtzeit-Datenverarbeitung steigt die Nachfrage nach Stream-Verarbeitung und Datenübertragung weiter. In der Java-Entwicklung ist Akka Streams eine leistungsstarke Bibliothek, die die Implementierung der Stream-Verarbeitung und Datenübertragung vereinfacht. In diesem Artikel werden die grundlegenden Konzepte und die Verwendung von Akka Streams vorgestellt und detaillierte Codebeispiele bereitgestellt.

1. Übersicht über Akka Streams:
1.1 Was ist Akka Streams:
Akka Streams ist Teil des Akka-Frameworks und bietet ein asynchrones, zusammensetzbares und überwachbares Stream-Verarbeitungsmodell. Es verwendet einen Gegendruckmechanismus, um inkonsistente Geschwindigkeiten von Datenströmen zu bewältigen. Akka Streams ist hoch skalierbar und flexibel und kann problemlos große Datenströme verarbeiten.

1.2 Grundkonzepte:

  • Quelle: die Quelle des Datenflusses, die eine Datei, eine Datenbank, eine Netzwerkverbindung usw. sein kann. Eine Quelle kann null oder mehr Datenelemente ausgeben.
  • Flow: Komponenten, die Datenflüsse betreiben und transformieren, z. B. Filterung, Zuordnung, Aggregation usw. Flow kann ein oder mehrere Datenelemente empfangen und ein oder mehrere Datenelemente ausgeben.
  • Senke: Der Endpunkt des Datenflusses, der eine Datei, eine Datenbank, eine Netzwerkverbindung usw. sein kann. Der Endpunkt empfängt die von Flow verarbeiteten Daten und verarbeitet sie.

2. Verwendung von Akka Streams:
2.1 Einführung von Abhängigkeiten:
Zuerst müssen wir die Abhängigkeiten von Akka Streams in das Java-Projekt einführen. Fügen Sie die folgenden Abhängigkeiten in der pom.xml-Datei hinzu:

<dependency>
    <groupId>com.typesafe.akka</groupId>
    <artifactId>akka-stream_2.12</artifactId>
    <version>2.6.17</version>
</dependency>
Nach dem Login kopieren

2.2 Implementieren Sie eine einfache Stream-Verarbeitung:
Im Folgenden zeigen wir anhand eines einfachen Beispiels, wie Sie Akka Streams für die Stream-Verarbeitung verwenden.

Erstellen Sie zunächst eine Datenquelle mit Ganzzahlen:

Source<Integer, NotUsed> source = Source.range(1, 10);
Nach dem Login kopieren

Erstellen Sie dann einen Fluss, der die Quelldaten mit 2 multipliziert:

Flow<Integer, Integer, NotUsed> flow = Flow.of(Integer.class).map(i -> i * 2);
Nach dem Login kopieren

Als nächstes erstellen Sie eine Senke, um die streamverarbeiteten Daten zu empfangen:

Sink<Integer, CompletionStage<Done>> sink = Sink.foreach(System.out::println);
Nach dem Login kopieren

Platzieren Sie die Quelle. Flow und Sink werden kombiniert, um eine vollständige Stream-Verarbeitung zu erstellen:

RunnableGraph<NotUsed> runnableGraph = source.via(flow).to(sink);
Nach dem Login kopieren

Schließlich führen Sie die Stream-Verarbeitung aus:

CompletionStage<NotUsed> completionStage = runnableGraph.run(materializer);
Nach dem Login kopieren

Im obigen Code verwenden wir verschiedene von Akka Streams bereitgestellte Komponenten, um eine einfache Stream-Verarbeitung zu implementieren, einschließlich Datenquellen, Flow und Waschbecken. Durch die Verbindung dieser Komponenten können wir einen vollständigen Stream-Verarbeitungsprozess definieren und ausführen.

2.3 Datenübertragung implementieren:
Neben der Stream-Verarbeitung können Akka Streams auch zur Datenübertragung verwendet werden. Im Folgenden nehmen wir die TCP-Übertragung als Beispiel, um zu demonstrieren, wie Akka Streams für die Datenübertragung verwendet wird.

Erstellen Sie zunächst eine serverseitige Stream-Verarbeitung:

final Flow<ByteString, ByteString, NotUsed> serverFlow = Flow.of(ByteString.class)
    .via(Tcp().delimiter(ByteString.fromString("
"), 256, true))
    .map(ByteString::utf8String)
    .map(s -> s + " processed")
    .map(ByteString::fromString);
Nach dem Login kopieren

Dann starten Sie den Server:

final Source<Tcp.IncomingConnection, CompletionStage<Tcp.ServerBinding>> serverSource =
    Tcp().bind("localhost", 8888);

final Flow<Tcp.IncomingConnection, Tcp.IncomingConnection, NotUsed> handler = Flow.<Tcp.IncomingConnection>create()
    .mapAsync(1, connection -> {
        connection.handleWith(serverFlow, materializer);
        return CompletableFuture.completedFuture(connection);
    });

final CompletionStage<Tcp.ServerBinding> binding =
    serverSource.via(handler).to(Sink.ignore()).run(materializer);
Nach dem Login kopieren

Als nächstes erstellen Sie eine clientseitige Stream-Verarbeitung:

final Sink<ByteString, CompletionStage<Done>> clientSink = Sink.ignore();

final Flow<String, ByteString, CompletionStage<OutgoingConnection>> connectionFlow =
    Tcp().outgoingConnection("localhost", 8888);

final Flow<ByteString, ByteString, CompletionStage<Done>> clientFlow = Flow.of(ByteString.class)
    .via(Tcp().delimiter(ByteString.fromString("
"), 256, true))
    .map(ByteString::utf8String)
    .map(s -> s + " processed")
    .map(ByteString::fromString);

final Flow<String, ByteString, CompletionStage<Tcp.OutgoingConnection>> flow =
    Flow.fromSinkAndSourceMat(clientSink, clientFlow, Keep.right());

CompletableFuture<Tcp.OutgoingConnection> connection =
    Source.single("data").viaMat(connectionFlow, Keep.right()).toMat(flow, Keep.left()).run(materializer);
Nach dem Login kopieren

Mit dem obigen Code erstellen wir eine serverseitige Stream-Verarbeitung und eine Client-Stream-Verarbeitung am Ende und Datenübertragung über TCP. Bei der serverseitigen Stream-Verarbeitung verarbeiten wir den empfangenen String und senden ihn an den Client. Bei der clientseitigen Stream-Verarbeitung verarbeiten wir den empfangenen String und senden ihn an den Server.

Zusammenfassung:
Dieser Artikel stellt die grundlegenden Konzepte und die Verwendung von Akka Streams vor und bietet detaillierte Codebeispiele. Durch Akka Streams können wir Stream-Verarbeitung und Datenübertragung einfach implementieren und so die Effizienz und Leistung der Datenverarbeitung verbessern. Ich hoffe, dieser Artikel hilft Ihnen dabei, Akka Streams für die Stream-Verarbeitung und Datenübertragung in der Java-Entwicklung zu verwenden.

Das obige ist der detaillierte Inhalt vonJava-Entwicklung: So verwenden Sie Akka Streams für die Stream-Verarbeitung und Datenübertragung. Für weitere Informationen folgen Sie bitte anderen verwandten Artikeln auf der PHP chinesischen Website!

Verwandte Etiketten:
Quelle:php.cn
Erklärung dieser Website
Der Inhalt dieses Artikels wird freiwillig von Internetnutzern beigesteuert und das Urheberrecht liegt beim ursprünglichen Autor. Diese Website übernimmt keine entsprechende rechtliche Verantwortung. Wenn Sie Inhalte finden, bei denen der Verdacht eines Plagiats oder einer Rechtsverletzung besteht, wenden Sie sich bitte an admin@php.cn
Beliebte Tutorials
Mehr>
Neueste Downloads
Mehr>
Web-Effekte
Quellcode der Website
Website-Materialien
Frontend-Vorlage