Rumah > Java > javaTutorial > Pembangunan Java: Cara menggunakan Akka Streams untuk pemprosesan strim dan pemindahan data

Pembangunan Java: Cara menggunakan Akka Streams untuk pemprosesan strim dan pemindahan data

WBOY
Lepaskan: 2023-09-22 08:30:26
asal
972 orang telah melayarinya

Java开发:如何使用Akka Streams进行流处理和数据传输

Pembangunan Java: Cara menggunakan Akka Streams untuk pemprosesan strim dan pemindahan data

Pengenalan:
Dengan data besar dan sebenar- data masa Dengan perkembangan pesat pemprosesan, permintaan untuk pemprosesan aliran dan penghantaran data semakin meningkat. Dalam pembangunan Java, Akka Streams ialah perpustakaan berkuasa yang memudahkan pelaksanaan pemprosesan aliran dan pemindahan data. Artikel ini akan memperkenalkan konsep asas dan penggunaan Akka Streams, dan memberikan contoh kod terperinci.

1. Gambaran keseluruhan Aliran Akka:
1.1 Apakah Aliran Akka:
Aliran Akka ialah sebahagian daripada rangka kerja Akka dan menyediakan model Proses tak segerak, boleh digubah dan boleh dipantau. Ia menggunakan mekanisme tekanan belakang untuk mengendalikan kelajuan aliran data yang tidak konsisten. Akka Streams sangat berskala dan fleksibel serta boleh mengendalikan aliran data berskala besar dengan mudah.

1.2 Konsep asas:

  • Sumber: Sumber aliran data, yang boleh menjadi fail, pangkalan data, sambungan rangkaian, dsb. Sumber boleh mengeluarkan sifar atau lebih elemen data.
  • Aliran: Komponen yang mengendalikan dan mengubah aliran data, seperti penapisan, pemetaan, pengagregatan, dsb. Aliran boleh menerima satu atau lebih elemen data dan mengeluarkan satu atau lebih elemen data.
  • Sink: Titik akhir aliran data, yang boleh menjadi fail, pangkalan data, sambungan rangkaian, dsb. Titik akhir menerima data yang diproses oleh Flow dan memprosesnya.

2. Penggunaan Aliran Akka:
2.1 Memperkenalkan kebergantungan:
Pertama, kita perlu memperkenalkan kebergantungan Aliran Akka ke dalam projek Java. Tambahkan kebergantungan berikut dalam fail pom.xml:

<dependency>
    <groupId>com.typesafe.akka</groupId>
    <artifactId>akka-stream_2.12</artifactId>
    <version>2.6.17</version>
</dependency>
Salin selepas log masuk

2.2 Laksanakan pemprosesan strim mudah:
Di bawah kami menggunakan contoh mudah untuk menunjukkan cara menggunakan Strim Akka untuk pemprosesan strim.

Pertama, cipta sumber data yang mengandungi integer:

Source<Integer, NotUsed> source = Source.range(1, 10);
Salin selepas log masuk

Kemudian, buat Aliran yang mendarabkan data sumber dengan 2:

Flow<Integer, Integer, NotUsed> flow = Flow.of(Integer.class).map(i -> i * 2);
Salin selepas log masuk
#🎜Teruskan Seterusnya🎜 , cipta Sink untuk menerima data yang diproses strim:

Sink<Integer, CompletionStage<Done>> sink = Sink.foreach(System.out::println);
Salin selepas log masuk

Gabungkan Sumber, Aliran dan Sink untuk membina pemprosesan strim lengkap:

RunnableGraph<NotUsed> runnableGraph = source.via(flow).to(sink);
Salin selepas log masuk

Akhir sekali, jalankan pemprosesan Strim: #🎜 🎜#
CompletionStage<NotUsed> completionStage = runnableGraph.run(materializer);
Salin selepas log masuk

Dalam kod di atas, kami menggunakan komponen berbeza yang disediakan oleh Akka Streams untuk melaksanakan pemprosesan strim mudah, termasuk sumber data, Aliran dan Sink. Dengan menyambungkan komponen ini, kami boleh menentukan dan menjalankan proses pemprosesan strim yang lengkap.

2.3 Laksanakan penghantaran data:

Selain pemprosesan strim, Akka Streams juga boleh digunakan untuk penghantaran data. Di bawah ini kami mengambil penghantaran TCP sebagai contoh untuk menunjukkan cara menggunakan Akka Streams untuk penghantaran data.


Pertama, buat pemprosesan strim sebelah pelayan:

final Flow<ByteString, ByteString, NotUsed> serverFlow = Flow.of(ByteString.class)
    .via(Tcp().delimiter(ByteString.fromString("
"), 256, true))
    .map(ByteString::utf8String)
    .map(s -> s + " processed")
    .map(ByteString::fromString);
Salin selepas log masuk

Kemudian, mulakan pelayan:

final Source<Tcp.IncomingConnection, CompletionStage<Tcp.ServerBinding>> serverSource =
    Tcp().bind("localhost", 8888);

final Flow<Tcp.IncomingConnection, Tcp.IncomingConnection, NotUsed> handler = Flow.<Tcp.IncomingConnection>create()
    .mapAsync(1, connection -> {
        connection.handleWith(serverFlow, materializer);
        return CompletableFuture.completedFuture(connection);
    });

final CompletionStage<Tcp.ServerBinding> binding =
    serverSource.via(handler).to(Sink.ignore()).run(materializer);
Salin selepas log masuk

Seterusnya, buat strim sebelah pelanggan pemprosesan:

final Sink<ByteString, CompletionStage<Done>> clientSink = Sink.ignore();

final Flow<String, ByteString, CompletionStage<OutgoingConnection>> connectionFlow =
    Tcp().outgoingConnection("localhost", 8888);

final Flow<ByteString, ByteString, CompletionStage<Done>> clientFlow = Flow.of(ByteString.class)
    .via(Tcp().delimiter(ByteString.fromString("
"), 256, true))
    .map(ByteString::utf8String)
    .map(s -> s + " processed")
    .map(ByteString::fromString);

final Flow<String, ByteString, CompletionStage<Tcp.OutgoingConnection>> flow =
    Flow.fromSinkAndSourceMat(clientSink, clientFlow, Keep.right());

CompletableFuture<Tcp.OutgoingConnection> connection =
    Source.single("data").viaMat(connectionFlow, Keep.right()).toMat(flow, Keep.left()).run(materializer);
Salin selepas log masuk

Melalui kod di atas, kami mencipta pemprosesan strim sebelah pelayan dan pemprosesan strim sebelah klien, dan menghantar data melalui TCP. Dalam pemprosesan strim sebelah pelayan, kami memproses rentetan yang diterima dan menghantarnya kepada pelanggan. Dalam pemprosesan strim sebelah pelanggan, kami memproses rentetan yang diterima dan menghantarnya ke pelayan.

Ringkasan:

Artikel ini memperkenalkan konsep asas dan penggunaan Akka Streams, dan menyediakan contoh kod terperinci. Melalui Akka Streams, kami boleh melaksanakan pemprosesan strim dan penghantaran data dengan mudah, meningkatkan kecekapan dan prestasi pemprosesan data. Saya harap artikel ini akan membantu anda menggunakan Akka Streams untuk pemprosesan strim dan penghantaran data dalam pembangunan Java.

Atas ialah kandungan terperinci Pembangunan Java: Cara menggunakan Akka Streams untuk pemprosesan strim dan pemindahan data. Untuk maklumat lanjut, sila ikut artikel berkaitan lain di laman web China PHP!

Label berkaitan:
sumber:php.cn
Kenyataan Laman Web ini
Kandungan artikel ini disumbangkan secara sukarela oleh netizen, dan hak cipta adalah milik pengarang asal. Laman web ini tidak memikul tanggungjawab undang-undang yang sepadan. Jika anda menemui sebarang kandungan yang disyaki plagiarisme atau pelanggaran, sila hubungi admin@php.cn
Tutorial Popular
Lagi>
Muat turun terkini
Lagi>
kesan web
Kod sumber laman web
Bahan laman web
Templat hujung hadapan