Ce tutoriel est basé sur RxJava1 >8. Connectable ObservablesLes sections précédentes mentionnaient trois détails dans le processus de candidature : Observables, Observateurs et Abonnements. La section suivante continue de comprendre d'autres points de connaissances sur l'abonnement.
Nous introduisons ici un observable connectable appelé ConnectableObservable. Un Observable connectable est comme un Observable normal, à l'exception de ceci : un Observable connectable ne commence pas à émettre de données lorsqu'il est abonné, uniquement lorsque son connect() est appelé. De cette façon, vous pouvez attendre que tous les abonnés potentiels se soient abonnés à l'Observable avant de commencer à émettre des données.
Cet opérateur peut convertir un Observable normal en un Observable connectable.
Un Observable connectable (connectable Observable) est similaire à un Observable normal, mais il ne commence pas à émettre de données lorsqu'il est abonné, mais ne démarre pas tant que l'opérateur Connect n'est pas utilisé. De cette façon, vous pouvez demander à un Observable de commencer à émettre des données à tout moment.
//创建了一个普通的Observable对象 Observable<Integer> observable = Observable.create(new Observable.OnSubscribe<Integer>() { @Override public void call(Subscriber<? super Integer> subscriber) { subscriber.onNext(1); subscriber.onNext(2); subscriber.onNext(3); } }); //将一个被观察者转换成一个可连接的被观察者 ConnectableObservable<Integer> connectableObservable =observable.publish(); //为可连接的被观察者订阅事件,但这里并不会马上发送事件 connectableObservable.subscribe(new Action1<Integer>() { @Override public void call(Integer integer) { Log.i(TAG, "call: "+integer); } });
RefCount
//创建了一个普通的Observable对象 Observable<Integer> observable = Observable.create(new Observable.OnSubscribe<Integer>() { @Override public void call(Subscriber<? super Integer> subscriber) { subscriber.onNext(1); subscriber.onNext(2); subscriber.onNext(3); } }); //将一个被观察者转换成一个可连接的被观察者 ConnectableObservable<Integer> connectableObservable =observable.publish(); //为可连接的被观察者订阅事件,但这里并不会马上发送事件 connectableObservable.subscribe(new Action1<Integer>() { @Override public void call(Integer integer) { Log.i(TAG, "call: "+integer); } }); //当调用可连接的被观察者connect()方法后 开始发送所有的数据。 connectableObservable.connect(new Action1() { @Override public void call(Subscription subscription) { Log.i(TAG, "call: "+subscription); } });
IT520: call: OperatorPublish$PublishSubscriber@20dce78 IT520: call: 1 IT520: call: 2 IT520: call: 3
//创建了一个普通的Observable对象 Observable<Integer> observable = Observable.create(new Observable.OnSubscribe<Integer>() { @Override public void call(Subscriber<? super Integer> subscriber) { subscriber.onNext(1); subscriber.onNext(2); subscriber.onNext(3); } }); //将一个被观察者转换成一个可连接的被观察者 ConnectableObservable<Integer> connectableObservable =observable.publish(); //将一个可链接的被观察者转换成一个普通观察者 Observable<Integer> integerObservable = connectableObservable.refCount(); //为可连接的被观察者订阅事件,一订阅就马上发送数据并打印出 1 2 3... integerObservable.subscribe(new Action1<Integer>() { @Override public void call(Integer integer) { Log.i(TAG, "call: "+integer); } });
Nous convertissons d'abord un Observable ordinaire en un ConnectableObservable via publi(). Lorsque connect() est appelé, les observateurs abonnés ci-dessus connect() recevront les données. Les observateurs abonnés après connect() ne peuvent pas recevoir de données. Si nous voulons que tous les observateurs reçoivent des données en même temps lors de l'appel de connect(), quel que soit l'ordre des abonnements, nous devons utiliser replay().
Observable<Integer> observable = Observable.create(new Observable.OnSubscribe<Integer>() { @Override public void call(Subscriber<? super Integer> subscriber) { subscriber.onNext(1); subscriber.onNext(2); subscriber.onNext(3); } }); ConnectableObservable<Integer> connectableObservable =observable.publish(); connectableObservable.subscribe(new Action1<Integer>() { @Override public void call(Integer integer) { Log.i(TAG, "call--1--: "+integer); } }); connectableObservable.connect(); connectableObservable.subscribe(new Action1<Integer>() { @Override public void call(Integer integer) { Log.i(TAG, "call--2--: "+integer); } });
Sortie :
com.m520it.rxjava I/IT520: call--1--: 1 com.m520it.rxjava I/IT520: call--1--: 2 com.m520it.rxjava I/IT520: call--1--: 3
9. "Froid Observable" & "Hot Observable"
Observable<Integer> observable = Observable.create(new Observable.OnSubscribe<Integer>() { @Override public void call(Subscriber<? super Integer> subscriber) { subscriber.onNext(1); subscriber.onNext(2); subscriber.onNext(3); } }); //这里将publish()改为replay() ConnectableObservable<Integer> connectableObservable =observable.replay(); connectableObservable.subscribe(new Action1<Integer>() { @Override public void call(Integer integer) { Log.i(TAG, "call--1--: "+integer); } }); connectableObservable.connect(); connectableObservable.subscribe(new Action1<Integer>() { @Override public void call(Integer integer) { Log.i(TAG, "call--2--: "+integer); } });
com.m520it.rxjava I/IT520: call--1--: 1 com.m520it.rxjava I/IT520: call--1--: 2 com.m520it.rxjava I/IT520: call--1--: 3 com.m520it.rxjava I/IT520: call--2--: 1 com.m520it.rxjava I/IT520: call--2--: 2 com.m520it.rxjava I/IT520: call--2--: 3
De nombreux opérateurs peuvent être utilisés pour répondre aux notifications onError émises par un Observable ou aux erreurs La récupération
La méthode onErrorReturn renvoie un nouvel observable qui reflète le comportement de l'observable d'origine. ce dernier ignorera l'appel onError du premier et ne transmettra pas l'erreur à l'observateur, il émettra plutôt un élément spécial et appellera la méthode onCompleted de l'observateur.
Le code suivant envoie 1, 2, 3 et simule l'envoi d'une exception pendant le processus d'envoi. Tant qu'une exception est envoyée, onErrorReturn() sera appelé et 44 sera envoyé. :
Sortie : onErrorResumeNextObservable.create(new Observable.OnSubscribe<Integer>() { @Override public void call(Subscriber<? super Integer> subscriber) { subscriber.onNext(1); subscriber.onNext(2); subscriber.onError(new NullPointerException("mock exception !")); subscriber.onNext(3); } }).onErrorReturn(new Func1<Throwable, Integer>() { @Override public Integer call(Throwable throwable) { return 44; } }).subscribe(new Action1<Integer>() { @Override public void call(Integer integer) { Log.i(TAG, "call: "+integer); } });
com.m520it.rxjava I/IT520: call: 1 com.m520it.rxjava I/IT520: call: 2 com.m520it.rxjava I/IT520: call: 44
Le code suivant simule l'envoi d'une exception lors de l'envoi. Ensuite, onErrorResumeNext sera appelé et commencera à émettre de nouveaux objets observables.
Sortie : onExceptionResumeNextObservable.create(new Observable.OnSubscribe<Integer>() { @Override public void call(Subscriber<? super Integer> subscriber) { subscriber.onNext(1); subscriber.onNext(2); subscriber.onError(new NullPointerException("mock exception !")); subscriber.onNext(3); } }).onErrorResumeNext(new Func1<Throwable, Observable<? extends Integer>>() { @Override public Observable<? extends Integer> call(Throwable throwable) { Observable<Integer> innerObservable = Observable.create(new Observable.OnSubscribe<Integer>() { @Override public void call(Subscriber<? super Integer> subscriber) { subscriber.onNext(4); subscriber.onNext(5); } }); return innerObservable; } }).subscribe(new Action1<Integer>() { @Override public void call(Integer integer) { Log.i(TAG, "call: "+integer); } });
com.m520it.rxjava I/IT520: call: 1 com.m520it.rxjava I/IT520: call: 2 com.m520it.rxjava I/IT520: call: 3 com.m520it.rxjava I/IT520: call: 4 com.m520it.rxjava I/IT520: call: 5
Sortie :
Mécanisme de nouvelle tentative
//创建一个错误处理的Observable对象 Observable<Integer> exceptionObserver = Observable .create(new Observable.OnSubscribe<Integer>() { @Override public void call(Subscriber<? super Integer> subscriber) { subscriber.onNext(55); subscriber.onNext(66); } }); Observable .create(new Observable.OnSubscribe<Integer>() { @Override public void call(Subscriber<? super Integer> subscriber) { subscriber.onNext(1); subscriber.onNext(2); subscriber.onError(new NullPointerException("mock exception !")); subscriber.onNext(3); } }) //上面的代码发送的过程中出现了异常,该方法就会被调用 并发射exceptionObserver .onExceptionResumeNext(exceptionObserver) .subscribe(new Action1<Integer>() { @Override public void call(Integer integer) { Log.i(TAG, "call: "+integer); } });
com.m520it.rxjava I/IT520: call: 1 com.m520it.rxjava I/IT520: call: 2 com.m520it.rxjava I/IT520: call: 55 com.m520it.rxjava I/IT520: call: 66
Les fonctions similaires incluent :
Observable.create(new Observable.OnSubscribe<Integer>() { @Override public void call(Subscriber<? super Integer> subscriber) { subscriber.onNext(1); subscriber.onNext(2); subscriber.onError(new NullPointerException("mock exception !")); subscriber.onNext(3); } }) .retry(3)//重复3次订阅 .subscribe(new Action1<Integer>() { @Override public void call(Integer integer) { Log.i(TAG, "call: "+integer); } });
Javadoc : retry()) Quel que soit le nombre de notifications onError reçues, l'abonnement continuera et émettent l'Observable original.
Ce qui précède est le contenu de l'exemple de code de RxJava_02 [profondeur d'abonnement et gestion des exceptions] en termes simples. Pour plus de contenu connexe, veuillez faire attention au site Web PHP chinois (www.php.cn). )!