이 튜토리얼은 RxJava1을 기반으로 합니다. >8. 연결 가능한 Observable이전 섹션에서는 Observable, Observer 및 Subscription의 세 가지 세부 사항을 언급했습니다. 다음 섹션에서는 구독에 대한 다른 지식 포인트를 계속해서 이해합니다.
여기에서는 ConnectableObservable이라는 연결 가능한 Observable을 소개합니다. 연결 가능한 Observable은 다음을 제외하고 일반 Observable과 같습니다. 연결 가능한 Observable은 구독할 때 데이터 방출을 시작하지 않고 connect()가 호출될 때만 데이터를 방출하기 시작합니다. 이런 방식으로 데이터 방출을 시작하기 전에 모든 잠재적 구독자가 Observable을 구독할 때까지 기다릴 수 있습니다.
이 연산자는 일반 Observable을 연결 가능한 Observable로 변환할 수 있습니다.
연결 가능한 Observable(연결 가능한 Observable)은 일반 Observable과 유사하지만 구독 시 데이터 방출을 시작하지 않지만 Connect 연산자를 사용할 때까지 시작되지 않습니다. 이런 방식으로 Observable이 언제든지 데이터를 방출하기 시작하도록 할 수 있습니다.
//创建了一个普通的Observable对象 Observable<Integer> observable = Observable.create(new Observable.OnSubscribe<Integer>() { @Override public void call(Subscriber<? super Integer> subscriber) { subscriber.onNext(1); subscriber.onNext(2); subscriber.onNext(3); } }); //将一个被观察者转换成一个可连接的被观察者 ConnectableObservable<Integer> connectableObservable =observable.publish(); //为可连接的被观察者订阅事件,但这里并不会马上发送事件 connectableObservable.subscribe(new Action1<Integer>() { @Override public void call(Integer integer) { Log.i(TAG, "call: "+integer); } });
일반적인 Observable 객체 구독과 달리 위 코드에서는 Action1 객체의 call() 메서드가 직접 호출되지 않습니다.
Connect
연결 가능한 Observable(연결 가능한 Observable)은 일반 Observable과 유사하지만 구독할 때 데이터를 방출하기 시작하지 않지만 Connect 연산자가 사용될 때까지는 데이터를 방출하지 않습니다. . 이 방법을 사용하면 데이터 방출을 시작하기 전에 모든 관찰자가 Observable을 구독할 때까지 기다릴 수 있습니다.
//创建了一个普通的Observable对象 Observable<Integer> observable = Observable.create(new Observable.OnSubscribe<Integer>() { @Override public void call(Subscriber<? super Integer> subscriber) { subscriber.onNext(1); subscriber.onNext(2); subscriber.onNext(3); } }); //将一个被观察者转换成一个可连接的被观察者 ConnectableObservable<Integer> connectableObservable =observable.publish(); //为可连接的被观察者订阅事件,但这里并不会马上发送事件 connectableObservable.subscribe(new Action1<Integer>() { @Override public void call(Integer integer) { Log.i(TAG, "call: "+integer); } }); //当调用可连接的被观察者connect()方法后 开始发送所有的数据。 connectableObservable.connect(new Action1() { @Override public void call(Subscription subscription) { Log.i(TAG, "call: "+subscription); } });
출력:
IT520: call: OperatorPublish$PublishSubscriber@20dce78 IT520: call: 1 IT520: call: 2 IT520: call: 3
RefCount 연산자는 연결 가능한 Observable을 일반 Observable로 변환할 수 있습니다.
//创建了一个普通的Observable对象 Observable<Integer> observable = Observable.create(new Observable.OnSubscribe<Integer>() { @Override public void call(Subscriber<? super Integer> subscriber) { subscriber.onNext(1); subscriber.onNext(2); subscriber.onNext(3); } }); //将一个被观察者转换成一个可连接的被观察者 ConnectableObservable<Integer> connectableObservable =observable.publish(); //将一个可链接的被观察者转换成一个普通观察者 Observable<Integer> integerObservable = connectableObservable.refCount(); //为可连接的被观察者订阅事件,一订阅就马上发送数据并打印出 1 2 3... integerObservable.subscribe(new Action1<Integer>() { @Override public void call(Integer integer) { Log.i(TAG, "call: "+integer); } });
Replay
먼저 예를 살펴보겠습니다.
Observable<Integer> observable = Observable.create(new Observable.OnSubscribe<Integer>() { @Override public void call(Subscriber<? super Integer> subscriber) { subscriber.onNext(1); subscriber.onNext(2); subscriber.onNext(3); } }); ConnectableObservable<Integer> connectableObservable =observable.publish(); connectableObservable.subscribe(new Action1<Integer>() { @Override public void call(Integer integer) { Log.i(TAG, "call--1--: "+integer); } }); connectableObservable.connect(); connectableObservable.subscribe(new Action1<Integer>() { @Override public void call(Integer integer) { Log.i(TAG, "call--2--: "+integer); } });
com.m520it.rxjava I/IT520: call--1--: 1 com.m520it.rxjava I/IT520: call--1--: 2 com.m520it.rxjava I/IT520: call--1--: 3
먼저 게시()를 통해 일반 Observable을 ConnectableObservable로 변환합니다. connect()가 호출되면 connect() 위에 구독된 관찰자는 데이터를 수신하게 됩니다. connect() 이후에 구독된 관찰자는 데이터를 수신할 수 없습니다. connect()를 호출할 때 구독 순서에 관계없이 모든 관찰자가 동시에 데이터를 수신하도록 하려면 replay()를 사용해야 합니다.
Observable<Integer> observable = Observable.create(new Observable.OnSubscribe<Integer>() { @Override public void call(Subscriber<? super Integer> subscriber) { subscriber.onNext(1); subscriber.onNext(2); subscriber.onNext(3); } }); //这里将publish()改为replay() ConnectableObservable<Integer> connectableObservable =observable.replay(); connectableObservable.subscribe(new Action1<Integer>() { @Override public void call(Integer integer) { Log.i(TAG, "call--1--: "+integer); } }); connectableObservable.connect(); connectableObservable.subscribe(new Action1<Integer>() { @Override public void call(Integer integer) { Log.i(TAG, "call--2--: "+integer); } });
출력:
com.m520it.rxjava I/IT520: call--1--: 1 com.m520it.rxjava I/IT520: call--1--: 2 com.m520it.rxjava I/IT520: call--1--: 3 com.m520it.rxjava I/IT520: call--2--: 1 com.m520it.rxjava I/IT520: call--2--: 2 com.m520it.rxjava I/IT520: call--2--: 3
9. "Cold Observable" & "Hot Observable"
앞서 구독할 때(관찰자가 데이터를 보내는 경우) 일부 관찰자는 데이터를 직접 전송하고 일부는 데이터를 수신하기 전에 일정 시간 동안 기다립니다.
Observable에서 발생하는 onError 알림이나 오류로 인해 응답하는 데 많은 연산자를 사용할 수 있습니다.
onErrorReturn 메소드는 원래 Observable의 동작을 미러링하는 새로운 Observable을 반환합니다. 후자는 전자의 onError 호출을 무시하고 관찰자에게 오류를 전달하지 않습니다. 대신 특수 항목을 내보내고 관찰자의 onCompleted 메서드를 호출합니다.
다음 코드는 1, 2, 3을 보내고 전송 과정에서 예외 전송을 시뮬레이션합니다. 예외가 전송되면 onErrorReturn()이 호출되고 44가 전송됩니다. : Observable.create(new Observable.OnSubscribe<Integer>() {
@Override
public void call(Subscriber<? super Integer> subscriber) {
subscriber.onNext(1);
subscriber.onNext(2);
subscriber.onError(new NullPointerException("mock exception !"));
subscriber.onNext(3);
}
}).onErrorReturn(new Func1<Throwable, Integer>() {
@Override
public Integer call(Throwable throwable) {
return 44;
}
}).subscribe(new Action1<Integer>() {
@Override
public void call(Integer integer) {
Log.i(TAG, "call: "+integer);
}
});
com.m520it.rxjava I/IT520: call: 1 com.m520it.rxjava I/IT520: call: 2 com.m520it.rxjava I/IT520: call: 44
오류가 발생하면 Observable이 두 번째 Observable의 데이터 시퀀스를 내보내도록 합니다.
다음 코드는 전송 시 예외 전송을 시뮬레이션합니다. 그런 다음 onErrorResumeNext가 호출되고 새로운 Observable 객체를 방출하기 시작합니다. Observable.create(new Observable.OnSubscribe<Integer>() {
@Override
public void call(Subscriber<? super Integer> subscriber) {
subscriber.onNext(1);
subscriber.onNext(2);
subscriber.onError(new NullPointerException("mock exception !"));
subscriber.onNext(3);
}
}).onErrorResumeNext(new Func1<Throwable, Observable<? extends Integer>>() {
@Override
public Observable<? extends Integer> call(Throwable throwable) {
Observable<Integer> innerObservable =
Observable.create(new Observable.OnSubscribe<Integer>() {
@Override
public void call(Subscriber<? super Integer> subscriber) {
subscriber.onNext(4);
subscriber.onNext(5);
}
});
return innerObservable;
}
}).subscribe(new Action1<Integer>() {
@Override
public void call(Integer integer) {
Log.i(TAG, "call: "+integer);
}
});
com.m520it.rxjava I/IT520: call: 1 com.m520it.rxjava I/IT520: call: 2 com.m520it.rxjava I/IT520: call: 3 com.m520it.rxjava I/IT520: call: 4 com.m520it.rxjava I/IT520: call: 5
오류가 발생할 때 Observable이 계속해서 후속 데이터 항목을 내보내도록 합니다.
//创建一个错误处理的Observable对象 Observable<Integer> exceptionObserver = Observable .create(new Observable.OnSubscribe<Integer>() { @Override public void call(Subscriber<? super Integer> subscriber) { subscriber.onNext(55); subscriber.onNext(66); } }); Observable .create(new Observable.OnSubscribe<Integer>() { @Override public void call(Subscriber<? super Integer> subscriber) { subscriber.onNext(1); subscriber.onNext(2); subscriber.onError(new NullPointerException("mock exception !")); subscriber.onNext(3); } }) //上面的代码发送的过程中出现了异常,该方法就会被调用 并发射exceptionObserver .onExceptionResumeNext(exceptionObserver) .subscribe(new Action1<Integer>() { @Override public void call(Integer integer) { Log.i(TAG, "call: "+integer); } });
출력: com.m520it.rxjava I/IT520: call: 1
com.m520it.rxjava I/IT520: call: 2
com.m520it.rxjava I/IT520: call: 55
com.m520it.rxjava I/IT520: call: 66
원래 Observable에 오류가 발생하면 다시 구독하고 정상적으로 종료될 것으로 예상하세요.
RxJava의 구현은 retry와 retryWhen입니다.
Observable.create(new Observable.OnSubscribe<Integer>() { @Override public void call(Subscriber<? super Integer> subscriber) { subscriber.onNext(1); subscriber.onNext(2); subscriber.onError(new NullPointerException("mock exception !")); subscriber.onNext(3); } }) .retry(3)//重复3次订阅 .subscribe(new Action1<Integer>() { @Override public void call(Integer integer) { Log.i(TAG, "call: "+integer); } });
Javadoc: retry()) onError 알림을 몇 번이나 수신하더라도 계속 구독됩니다. Primitive Observable을 방출합니다.
위 내용은 RxJava_02 [구독 깊이 및 예외 처리] 코드 예제의 내용을 간단히 정리한 내용이며, 더 많은 관련 내용은 PHP 중국어 홈페이지(www.php.cn)를 참고하시기 바랍니다. )!