java9 Reactive Stream 반응형 프로그래밍 API의 새로운 기능을 사용하는 방법
1. Java9 반응형 스트림 API
Java 9는 반응형 스트림 프로그래밍을 정의하는 인터페이스 세트를 제공합니다. 이러한 모든 인터페이스는 java.util.concurrent.Flow
클래스에 정적 내부 인터페이스로 정의되어 있습니다. java.util.concurrent.Flow
类里面。
下面是Java 响应式编程中的一些重要角色和概念,先简单理解一下
发布者(Publisher)是潜在的无限数量的有序数据元素的生产者。 它根据收到的需求(subscription)向当前订阅者发布一定数量的数据元素。
订阅者(Subscriber)从发布者那里订阅并接收数据元素。与发布者建立订阅关系后,发布者向订阅者发送订阅令牌(subscription),订阅者可以根据自己的处理能力请求发布者发布数据元素的数量。
订阅令牌(subscription)表示订阅者与发布者之间建立的订阅关系。 当建立订阅关系后,发布者将其传递给订阅者。 订阅者使用订阅令牌与发布者进行交互,例如请求数据元素的数量或取消订阅。
二、Java响应式编程四大接口
2.1.Subscriber Interface(订阅者订阅接口)
public static interface Subscriber<T> { public void onSubscribe(Subscription subscription); public void onNext(T item); public void onError(Throwable throwable); public void onComplete(); }
onSubscribe
:在发布者接受订阅者的订阅动作之后,发布任何的订阅消息之前被调用。新创建的Subscription
订阅令牌对象通过此方法传递给订阅者。
onNext
:下一个待处理的数据项的处理函数
onError
:在发布者或订阅遇到不可恢复的错误时调用
onComplete
:当没有订阅者调用(包括onNext()方法)发生时调用。
2.2.Subscription Interface (订阅令牌接口)
订阅令牌对象通过Subscriber.onSubscribe()
方法传递
public static interface Subscription { public void request(long n); public void cancel();}
request(long n)
是无阻塞背压概念背后的关键方法。订阅者使用它来请求n个以上的消费项目。这样,订阅者控制了它当前能够接收多少个数据。cancel()
由订阅者主动来取消其订阅,取消后将不会在接收到任何数据消息。
2.3.Publisher Interface(发布者接口)
@FunctionalInterface public static interface Publisher<T> { public void subscribe(Subscriber<? super T> subscriber); }
调用该方法,建立订阅者Subscriber与发布者Publisher之间的消息订阅关系。
2.4.Processor Interface(处理器接口)
处理者Processor 可以同时充当订阅者和发布者,起到转换发布者——订阅者管道中的元素的作用。用于将发布者T类型的数据元素,接收并转换为类型R的数据并发布。
public static interface Processor<T,R> extends Subscriber<T>, Publisher<R> { }
二、实战案例
现在我们要去实现上面的四个接口来完成响应式编程
Subscription Interface
订阅令牌接口通常不需要我们自己编程去实现,我们只需要在知道request()方法和cancle()方法含义即可。
Publisher Interface
发布者接口,Java 9 已经默认为我们提供了实现SubmissionPublisher,该实现类除了实现Publisher接口的方法外,提供了一个方法叫做submit()来完成消息数据的发送。
Subscriber Interface
订阅者接口,通常需要我们自己去实现。因为在数据订阅接收之后,不同的业务有不同的处理逻辑。
Processor
实际上是 Publisher Interface和Subscriber Interface的集合体,有需要数据类型转换及数据处理的需求才去实现这个接口
下面的例子实现的式字符串的数据消息订阅处理
实现订阅者Subscriber Interface
import java.util.concurrent.Flow; public class MySubscriber implements Flow.Subscriber<String> { private Flow.Subscription subscription; //订阅令牌 @Override public void onSubscribe(Flow.Subscription subscription) { System.out.println("订阅关系建立onSubscribe: " + subscription); this.subscription = subscription; subscription.request(2); } @Override public void onNext(String item) { System.out.println("item: " + item); // 一个消息处理完成之后,可以继续调用subscription.request(n);向发布者要求数据发送 //subscription.request(n); } @Override public void onError(Throwable throwable) { System.out.println("onError: " + throwable); } @Override public void onComplete() { System.out.println("onComplete"); } }
SubmissionPublisher消息发布者
import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Flow; import java.util.concurrent.SubmissionPublisher; public class SubmissionPublisherExample { public static void main(String[] args) throws InterruptedException { ExecutorService executor = Executors.newFixedThreadPool(1); SubmissionPublisher<String> sb = new SubmissionPublisher<>(executor, Flow.defaultBufferSize()); sb.subscribe(new MySubscriber()); //建立订阅关系,可以有多个订阅者 sb.submit("数据 1"); //发送消息1 sb.submit("数据 2"); //发送消息2 sb.submit("数据 3"); //发送消息3 executor.shutdown(); } }
控制台打印输出结果
订阅关系建立
onSubscribe: java.util.concurrent.SubmissionPublisher$BufferedSubscription@27e81a39
item: 数据 1
item: 数据 2
请注意:即使发布者submit了3条数据,MySubscriber也仅收到了2条数据进行了处理。是因为我们在MySubscriber#onSubscribe()
方法中使用了subscription.request(2);

2.1.구독자 인터페이스
rrreee🎜onSubscribe
: 게시자가 구독자의 구독 작업을 수락한 후 구독 메시지를 게시하기 전에 호출됩니다. . 새로 생성된 구독
구독 토큰 개체는 이 메서드를 통해 구독자에게 전달됩니다. 🎜🎜onNext
: 처리할 다음 데이터 항목의 처리 함수 🎜🎜onError
: 게시자 또는 구독에서 복구할 수 없는 오류가 발생하면 호출됩니다🎜🎜 onComplete: 구독자 호출(onNext() 메서드 포함)이 발생하지 않을 때 호출됩니다. 🎜<h4 id="구독-인터페이스-구독-토큰-인터페이스">2.2.구독 인터페이스(구독 토큰 인터페이스)</h4>🎜구독 토큰 객체는 <code>Subscriber.onSubscribe()
메소드🎜rrreee🎜request(long n)를 통해 전달됩니다. 는 Non-Blocking BackPressure 개념의 핵심 방법입니다. 구독자는 n개 이상의 소비 아이템을 요청하는 데 사용됩니다. 이러한 방식으로 구독자는 현재 수신할 수 있는 데이터의 양을 제어합니다. <code>cancel()
구독자는 구독을 적극적으로 취소합니다. 취소 후에는 데이터 메시지가 수신되지 않습니다. 🎜2.3.게시자 인터페이스
rrreee🎜이 메서드를 호출하여 구독자와 게시자 간의 메시지 구독 관계를 설정합니다. 🎜2.4.프로세서 인터페이스
🎜프로세서는 구독자와 게시자의 역할을 모두 수행할 수 있으며 게시자-구독자 파이프라인에서 요소를 변환하는 역할을 합니다. 게시자로부터 T 유형의 데이터 요소를 수신하여 R 유형의 데이터로 변환하고 게시하는 데 사용됩니다. 🎜rrreee🎜2. 실제 사례🎜🎜이제 리액티브 프로그래밍을 완료하려면 위의 네 가지 인터페이스를 구현해야 합니다.🎜🎜구독 인터페이스
구독 토큰 인터페이스는 일반적으로 우리가 직접 프로그래밍할 필요가 없습니다. request() 메소드와 cancel() 메소드의 의미만 알면 됩니다. 🎜🎜Publisher 인터페이스
Publisher 인터페이스인 Java 9에서는 기본적으로 SubmissionPublisher 구현을 제공합니다. 게시자 인터페이스를 구현하는 방법 외에도 이 구현 클래스는 submit()이라는 메서드를 제공합니다. 메시지 데이터를 보냅니다. 🎜🎜구독자 인터페이스
구독자 인터페이스는 일반적으로 직접 구현해야 합니다. 데이터 구독이 수신된 후 기업마다 처리 논리가 다르기 때문입니다. 🎜🎜Processor
는 실제로 게시자 인터페이스와 구독자 인터페이스의 모음입니다. 이 인터페이스에는 데이터 유형 변환 및 데이터 처리가 필요합니다. 🎜🎜다음 예에서는 문자열 데이터 메시지 구독 처리를 구현합니다.🎜구독자 인터페이스 구현
rrreeeSubmissionPublisher 메시지 게시자
rrreee🎜콘솔 인쇄 출력 결과🎜🎜구독 관계 설정🎜참고: 게시자가 3개의 데이터를 제출해도 MySubscriber에서는 2개의 데이터만 처리되었습니다.
onSubscribe: java.util.concurrent.SubmissionPublisher$BufferedSubscription@27e81a39
item: 데이터 1
item: 데이터 2🎜
MySubscriber#onSubscribe()
메서드에서 subscription.request(2);
를 사용했기 때문입니다. 이것이 '배압'의 반응형 프로그래밍 효과입니다. 내가 처리할 수 있는 데이터의 양만큼 메시지 게시자에게 얼마나 많은 데이터를 제공해야 하는지 알려줍니다. 🎜위 내용은 java9 Reactive Stream 반응형 프로그래밍 API의 새로운 기능을 사용하는 방법의 상세 내용입니다. 자세한 내용은 PHP 중국어 웹사이트의 기타 관련 기사를 참조하세요!

핫 AI 도구

Undresser.AI Undress
사실적인 누드 사진을 만들기 위한 AI 기반 앱

AI Clothes Remover
사진에서 옷을 제거하는 온라인 AI 도구입니다.

Undress AI Tool
무료로 이미지를 벗다

Clothoff.io
AI 옷 제거제

Video Face Swap
완전히 무료인 AI 얼굴 교환 도구를 사용하여 모든 비디오의 얼굴을 쉽게 바꾸세요!

인기 기사

뜨거운 도구

메모장++7.3.1
사용하기 쉬운 무료 코드 편집기

SublimeText3 중국어 버전
중국어 버전, 사용하기 매우 쉽습니다.

스튜디오 13.0.1 보내기
강력한 PHP 통합 개발 환경

드림위버 CS6
시각적 웹 개발 도구

SublimeText3 Mac 버전
신 수준의 코드 편집 소프트웨어(SublimeText3)

뜨거운 주제











Java의 Weka 가이드. 여기에서는 소개, weka java 사용 방법, 플랫폼 유형 및 장점을 예제와 함께 설명합니다.

Java의 Smith Number 가이드. 여기서는 정의, Java에서 스미스 번호를 확인하는 방법에 대해 논의합니다. 코드 구현의 예.

이 기사에서는 가장 많이 묻는 Java Spring 면접 질문과 자세한 답변을 보관했습니다. 그래야 면접에 합격할 수 있습니다.

Java 8은 스트림 API를 소개하여 데이터 컬렉션을 처리하는 강력하고 표현적인 방법을 제공합니다. 그러나 스트림을 사용할 때 일반적인 질문은 다음과 같은 것입니다. 기존 루프는 조기 중단 또는 반환을 허용하지만 스트림의 Foreach 메소드는이 방법을 직접 지원하지 않습니다. 이 기사는 이유를 설명하고 스트림 처리 시스템에서 조기 종료를 구현하기위한 대체 방법을 탐색합니다. 추가 읽기 : Java Stream API 개선 스트림 foreach를 이해하십시오 Foreach 메소드는 스트림의 각 요소에서 하나의 작업을 수행하는 터미널 작동입니다. 디자인 의도입니다

Java의 TimeStamp to Date 안내. 여기서는 소개와 예제와 함께 Java에서 타임스탬프를 날짜로 변환하는 방법에 대해서도 설명합니다.

캡슐은 3 차원 기하학적 그림이며, 양쪽 끝에 실린더와 반구로 구성됩니다. 캡슐의 부피는 실린더의 부피와 양쪽 끝에 반구의 부피를 첨가하여 계산할 수 있습니다. 이 튜토리얼은 다른 방법을 사용하여 Java에서 주어진 캡슐의 부피를 계산하는 방법에 대해 논의합니다. 캡슐 볼륨 공식 캡슐 볼륨에 대한 공식은 다음과 같습니다. 캡슐 부피 = 원통형 볼륨 2 반구 볼륨 안에, R : 반구의 반경. H : 실린더의 높이 (반구 제외). 예 1 입력하다 반경 = 5 단위 높이 = 10 단위 산출 볼륨 = 1570.8 입방 단위 설명하다 공식을 사용하여 볼륨 계산 : 부피 = π × r2 × h (4

Java는 초보자와 숙련된 개발자 모두가 배울 수 있는 인기 있는 프로그래밍 언어입니다. 이 튜토리얼은 기본 개념부터 시작하여 고급 주제를 통해 진행됩니다. Java Development Kit를 설치한 후 간단한 "Hello, World!" 프로그램을 작성하여 프로그래밍을 연습할 수 있습니다. 코드를 이해한 후 명령 프롬프트를 사용하여 프로그램을 컴파일하고 실행하면 "Hello, World!"가 콘솔에 출력됩니다. Java를 배우면 프로그래밍 여정이 시작되고, 숙달이 깊어짐에 따라 더 복잡한 애플리케이션을 만들 수 있습니다.
