Dieses Tutorial basiert auf RxJava1 >8.In den vorherigen Abschnitten wurden drei Details im Bewerbungsprozess erwähnt: Observables, Observers und Abonnements. Im nächsten Abschnitt werden weitere Wissenspunkte zum Abonnement erläutert.
Hier stellen wir ein verbindbares Observable namens ConnectableObservable vor. Ein verbindbares Observable ist genau wie ein normales Observable, mit der Ausnahme: Ein verbindbares Observable beginnt nicht mit der Ausgabe von Daten, wenn es abonniert wird, sondern erst, wenn sein connect() aufgerufen wird. Auf diese Weise können Sie warten, bis alle potenziellen Abonnenten das Observable abonniert haben, bevor Sie mit der Datenausgabe beginnen.
Dieser Operator kann ein normales Observable in ein verbindbares Observable umwandeln.
Ein verbindbares Observable (verbindbares Observable) ähnelt einem normalen Observable, beginnt jedoch nicht mit der Ausgabe von Daten, wenn es abonniert wird, sondern erst, wenn der Connect-Operator verwendet wird. Auf diese Weise können Sie ein Observable jederzeit mit der Ausgabe von Daten beginnen lassen.
//创建了一个普通的Observable对象 Observable<Integer> observable = Observable.create(new Observable.OnSubscribe<Integer>() { @Override public void call(Subscriber<? super Integer> subscriber) { subscriber.onNext(1); subscriber.onNext(2); subscriber.onNext(3); } }); //将一个被观察者转换成一个可连接的被观察者 ConnectableObservable<Integer> connectableObservable =observable.publish(); //为可连接的被观察者订阅事件,但这里并不会马上发送事件 connectableObservable.subscribe(new Action1<Integer>() { @Override public void call(Integer integer) { Log.i(TAG, "call: "+integer); } });
RefCount
//创建了一个普通的Observable对象 Observable<Integer> observable = Observable.create(new Observable.OnSubscribe<Integer>() { @Override public void call(Subscriber<? super Integer> subscriber) { subscriber.onNext(1); subscriber.onNext(2); subscriber.onNext(3); } }); //将一个被观察者转换成一个可连接的被观察者 ConnectableObservable<Integer> connectableObservable =observable.publish(); //为可连接的被观察者订阅事件,但这里并不会马上发送事件 connectableObservable.subscribe(new Action1<Integer>() { @Override public void call(Integer integer) { Log.i(TAG, "call: "+integer); } }); //当调用可连接的被观察者connect()方法后 开始发送所有的数据。 connectableObservable.connect(new Action1() { @Override public void call(Subscription subscription) { Log.i(TAG, "call: "+subscription); } });
IT520: call: OperatorPublish$PublishSubscriber@20dce78 IT520: call: 1 IT520: call: 2 IT520: call: 3
//创建了一个普通的Observable对象 Observable<Integer> observable = Observable.create(new Observable.OnSubscribe<Integer>() { @Override public void call(Subscriber<? super Integer> subscriber) { subscriber.onNext(1); subscriber.onNext(2); subscriber.onNext(3); } }); //将一个被观察者转换成一个可连接的被观察者 ConnectableObservable<Integer> connectableObservable =observable.publish(); //将一个可链接的被观察者转换成一个普通观察者 Observable<Integer> integerObservable = connectableObservable.refCount(); //为可连接的被观察者订阅事件,一订阅就马上发送数据并打印出 1 2 3... integerObservable.subscribe(new Action1<Integer>() { @Override public void call(Integer integer) { Log.i(TAG, "call: "+integer); } });
Zuerst konvertieren wir ein gewöhnliches Observable in ein ConnectableObservable durch Publish(). Wenn connect() aufgerufen wird, erhalten die oben abonnierten Beobachter connect() die Daten. Beobachter, die nach connect() abonniert wurden, können keine Daten empfangen. Wenn wir möchten, dass alle Beobachter beim Aufruf von connect() unabhängig von der Reihenfolge der Abonnements gleichzeitig Daten empfangen, müssen wir replay() verwenden.
Observable<Integer> observable = Observable.create(new Observable.OnSubscribe<Integer>() { @Override public void call(Subscriber<? super Integer> subscriber) { subscriber.onNext(1); subscriber.onNext(2); subscriber.onNext(3); } }); ConnectableObservable<Integer> connectableObservable =observable.publish(); connectableObservable.subscribe(new Action1<Integer>() { @Override public void call(Integer integer) { Log.i(TAG, "call--1--: "+integer); } }); connectableObservable.connect(); connectableObservable.subscribe(new Action1<Integer>() { @Override public void call(Integer integer) { Log.i(TAG, "call--2--: "+integer); } });
Ausgabe:
com.m520it.rxjava I/IT520: call--1--: 1 com.m520it.rxjava I/IT520: call--1--: 2 com.m520it.rxjava I/IT520: call--1--: 3
9. „Cold Observable“ und „Hot Observable“
Observable<Integer> observable = Observable.create(new Observable.OnSubscribe<Integer>() { @Override public void call(Subscriber<? super Integer> subscriber) { subscriber.onNext(1); subscriber.onNext(2); subscriber.onNext(3); } }); //这里将publish()改为replay() ConnectableObservable<Integer> connectableObservable =observable.replay(); connectableObservable.subscribe(new Action1<Integer>() { @Override public void call(Integer integer) { Log.i(TAG, "call--1--: "+integer); } }); connectableObservable.connect(); connectableObservable.subscribe(new Action1<Integer>() { @Override public void call(Integer integer) { Log.i(TAG, "call--2--: "+integer); } });
com.m520it.rxjava I/IT520: call--1--: 1 com.m520it.rxjava I/IT520: call--1--: 2 com.m520it.rxjava I/IT520: call--1--: 3 com.m520it.rxjava I/IT520: call--2--: 1 com.m520it.rxjava I/IT520: call--2--: 2 com.m520it.rxjava I/IT520: call--2--: 3
Viele Operatoren können verwendet werden, um auf onError-Benachrichtigungen zu reagieren, die von einem Observable oder aus Fehlern ausgegeben werden. Die Wiederherstellung
Die onErrorReturn-Methode gibt ein neues Observable zurück, das das Verhalten des ursprünglichen Observable widerspiegelt Letzterer ignoriert den onError-Aufruf des ersteren und gibt den Fehler nicht an den Beobachter weiter. Stattdessen wird ein spezielles Element ausgegeben und die onCompleted-Methode des Beobachters aufgerufen.
Der folgende Code sendet 1, 2, 3 und simuliert das Senden einer Ausnahme während des Sendevorgangs. Solange eine Ausnahme gesendet wird, wird onErrorReturn() aufgerufen und 44 gesendet. Der Code lautet wie folgt :
Ausgabe: onErrorResumeNextObservable.create(new Observable.OnSubscribe<Integer>() { @Override public void call(Subscriber<? super Integer> subscriber) { subscriber.onNext(1); subscriber.onNext(2); subscriber.onError(new NullPointerException("mock exception !")); subscriber.onNext(3); } }).onErrorReturn(new Func1<Throwable, Integer>() { @Override public Integer call(Throwable throwable) { return 44; } }).subscribe(new Action1<Integer>() { @Override public void call(Integer integer) { Log.i(TAG, "call: "+integer); } });
com.m520it.rxjava I/IT520: call: 1 com.m520it.rxjava I/IT520: call: 2 com.m520it.rxjava I/IT520: call: 44
Der folgende Code simuliert das Senden einer Ausnahme beim Senden. Dann wird onErrorResumeNext aufgerufen und beginnt mit der Ausgabe neuer Observable-Objekte.
Ausgabe: onExceptionResumeNextObservable.create(new Observable.OnSubscribe<Integer>() { @Override public void call(Subscriber<? super Integer> subscriber) { subscriber.onNext(1); subscriber.onNext(2); subscriber.onError(new NullPointerException("mock exception !")); subscriber.onNext(3); } }).onErrorResumeNext(new Func1<Throwable, Observable<? extends Integer>>() { @Override public Observable<? extends Integer> call(Throwable throwable) { Observable<Integer> innerObservable = Observable.create(new Observable.OnSubscribe<Integer>() { @Override public void call(Subscriber<? super Integer> subscriber) { subscriber.onNext(4); subscriber.onNext(5); } }); return innerObservable; } }).subscribe(new Action1<Integer>() { @Override public void call(Integer integer) { Log.i(TAG, "call: "+integer); } });
com.m520it.rxjava I/IT520: call: 1 com.m520it.rxjava I/IT520: call: 2 com.m520it.rxjava I/IT520: call: 3 com.m520it.rxjava I/IT520: call: 4 com.m520it.rxjava I/IT520: call: 5
Ausgabe:
Wiederholungsmechanismus wiederholen
//创建一个错误处理的Observable对象 Observable<Integer> exceptionObserver = Observable .create(new Observable.OnSubscribe<Integer>() { @Override public void call(Subscriber<? super Integer> subscriber) { subscriber.onNext(55); subscriber.onNext(66); } }); Observable .create(new Observable.OnSubscribe<Integer>() { @Override public void call(Subscriber<? super Integer> subscriber) { subscriber.onNext(1); subscriber.onNext(2); subscriber.onError(new NullPointerException("mock exception !")); subscriber.onNext(3); } }) //上面的代码发送的过程中出现了异常,该方法就会被调用 并发射exceptionObserver .onExceptionResumeNext(exceptionObserver) .subscribe(new Action1<Integer>() { @Override public void call(Integer integer) { Log.i(TAG, "call: "+integer); } });
com.m520it.rxjava I/IT520: call: 1 com.m520it.rxjava I/IT520: call: 2 com.m520it.rxjava I/IT520: call: 55 com.m520it.rxjava I/IT520: call: 66
Ähnliche Funktionen umfassen:
Observable.create(new Observable.OnSubscribe<Integer>() { @Override public void call(Subscriber<? super Integer> subscriber) { subscriber.onNext(1); subscriber.onNext(2); subscriber.onError(new NullPointerException("mock exception !")); subscriber.onNext(3); } }) .retry(3)//重复3次订阅 .subscribe(new Action1<Integer>() { @Override public void call(Integer integer) { Log.i(TAG, "call: "+integer); } });
Javadoc: retry()) Unabhängig davon, wie viele onError-Benachrichtigungen empfangen werden, wird das Abonnement fortgesetzt und das ursprüngliche Observable ausgeben.
Das Obige ist der Inhalt des Codebeispiels von RxJava_02 [Abonnementtiefe und Ausnahmebehandlung] in einfachen Worten. Weitere verwandte Inhalte finden Sie auf der chinesischen PHP-Website (www.php.cn). )!