Ce tutoriel est basé sur RxJava1 >1 Qu'est-ce que RxJava
Le framework a été développé par une équipe dirigée par l'architecte Microsoft Erik Meijer et a été open source en novembre 2012.
La bibliothèque Rx prend en charge .NET, JavaScript, C, etc., et prend désormais en charge presque tous les langages de programmation populaires.
La plupart des bibliothèques de langage de Rx sont gérées par l'organisation ReactiveX. Les plus populaires sont RxJava/RxJS/Rx.NET, et le site Web communautaire est reactivex.io.
RxJava est un framework populaire, et son code source est basé sur GitHub. En plus de prendre en charge RxJava, il existe également un framework de support pour les systèmes Android, RxAndroid
2. Code simplifié RxJava
new Thread() { @Override public void run() { super.run(); for (File folder : folders) { File[] files = folder.listFiles(); for (File file : files) { if (file.getName().endsWith(".png")) { final Bitmap bitmap = getBitmapFromFile(file); getActivity().runOnUiThread(new Runnable() { @Override public void run() { imageCollectorView.addImage(bitmap); } }); } } } } }.start();
Observable.from(folders) .flatMap(new Func1<File, Observable<File>>() { @Override public Observable<File> call(File file) { return Observable.from(file.listFiles()); } }) .filter(new Func1<File, Boolean>() { @Override public Boolean call(File file) { return file.getName().endsWith(".png"); } }) .map(new Func1<File, Bitmap>() { @Override public Bitmap call(File file) { return getBitmapFromFile(file); } }) .subscribeOn(Schedulers.io()) .observeOn(AndroidSchedulers.mainThread()) .subscribe(new Action1<Bitmap>() { @Override public void call(Bitmap bitmap) { imageCollectorView.addImage(bitmap); } });
Actuellement, RxJava a été mis à niveau vers la version 2.0, mais afin de mieux comprendre RxJava, nous pouvons commencer à apprendre à partir de la version 1.0. Afin de rendre notre projet Android plus capable d'utiliser RxJava, nous pouvons introduire la dépendance du script Gradle dans le projet :
Maintenant, notre projet prend déjà en charge les fonctions de RxJava. 4. Le noyau de la réactivitécompile 'io.reactivex:rxandroid:1.2.1' compile 'io.reactivex:rxjava:1.1.6'
Dans le passé, si nous voulions lire les informations, nous devions généralement lire les journaux. Par exemple, si vous êtes intéressé par un journal ou un magazine, vous devez d'abord faire 3 choses :
Fournir l'adresse de votre domicile
Trouver le journal correspondant
Allez au journal et abonnez-vous au journal pour tout le mois
Après avoir suivi le processus ci-dessus, de nouveaux documents de journal seront sera publié chaque jour, le journal enverra le magazine à votre domicile.
Abstraction du code de l'exemple ci-dessus, les étapes sont les suivantes :
Fournir des observateurs (car vous Vous êtes une personne soucieuse du contenu du magazine, vous êtes donc la personne qui observe l'événement)
Fournir la personne observée (à condition qu'un nouveau magazine sorte, les personnes qui s'en soucient doivent être informées, donc le bureau du journal est observé Objet)
Abonnement (c'est-à-dire que l'observateur et l'observé doivent être liés l'un à l'autre afin que dès que à mesure que l'objet observé change, l'objet observant l'événement sera immédiatement averti)
//1.创建被观察者 Observable<String> observable = Observable.create(new Observable.OnSubscribe<String>() { @Override public void call(Subscriber<? super String> subscriber) { //4.开始发送事件 //事件有3个类型 分别是onNext() onCompleted() onError() //onCompleted() onError() 一般都是用来通知观察者 事件发送完毕了,两者只取其一。 subscriber.onNext("Hello Android !"); subscriber.onNext("Hello Java !"); subscriber.onNext("Hello C !"); subscriber.onCompleted(); } }); //2.创建观察者 Subscriber<String> subscriber = new Subscriber<String>() { @Override public void onCompleted() { Log.i(TAG, "onCompleted "); } @Override public void onError(Throwable e) { Log.i(TAG, "onError: "+e.getLocalizedMessage()); } @Override public void onNext(String s) { Log.i(TAG, "onNext: "+s); } }; //3.订阅 observable.subscribe(subscriber);
com.m520it.rxjava I/IT520: onNext: Hello Android ! com.m520it.rxjava I/IT520: onNext: Hello Java ! com.m520it.rxjava I/IT520: onNext: Hello C ! com.m520it.rxjava I/IT520: onCompleted
Dans l'entité de méthode call() de l'observable, après avoir envoyé des événements tels que onNext/onCompleted/onError.
Ensuite l'abonné peut rappeler vers la méthode correspondante.
5. Variantes observables
onSuccess - Single émet une valeur unique à cette méthode
onError - Si la valeur requise ne peut pas être émise, Single émet un objet Throwable à cette méthode
Single n'appellera qu'une de ces deux méthodes, et elle ne sera appelée qu'une seule fois. Après avoir appelé une méthode, la relation d'abonnement prendra fin.
final Single<String> single = Single.create(new Single.OnSubscribe<String>() { @Override public void call(SingleSubscriber<? super String> singleSubscriber) { //先调用onNext() 最后调用onCompleted() //singleSubscriber.onSuccess("Hello Android !"); //只调用onError(); singleSubscriber.onError(new NullPointerException("mock Exception !")); } }); Observer<String> observer = new Observer<String>() { @Override public void onCompleted() { Log.i(TAG, "onCompleted "); } @Override public void onError(Throwable e) { Log.i(TAG, "onError: "+e.getLocalizedMessage()); } @Override public void onNext(String s) { Log.i(TAG, "onNext: "+s); } }; single.subscribe(observer);
Dans le code ci-dessus, si vous vous souciez uniquement de l'événement onNext(), mais que vous devez implémenter les événements onCompleted()&onError(), un tel code apparaîtra très volumineux. Face à cette demande, le framework RxJava a procédé à des ajustements spécifiques en matière d'abonnement. Le code est le suivant :
//创建观察者 Subscriber<String> subscriber = new Subscriber<String>() { @Override public void onCompleted() { Log.i(TAG, "onCompleted "); } @Override public void onError(Throwable e) { Log.i(TAG, "onError: "+e.getLocalizedMessage()); } @Override public void onNext(String s) { Log.i(TAG, "onNext: "+s); } }; //订阅 observable.subscribe(subscriber);
//为指定的onNext事件创建独立的接口 Action1<String> onNextAction = new Action1<String>() { @Override public void call(String s) { Log.i(TAG, "call: "+s); } }; //订阅 observable.subscribe(onNextAction);
abonnement public s'abonner (observateur final de l'observateur)
public Subscription subscribe(final Action1 onNext)
public Subscription subscribe(final Action1 onNext, Action1 onError)
public Subscription subscribe(final Action1 onNext, Action1 onError, Action0 onCompleted)
这里还有一个forEach函数有类似的功能:
public void forEach(final Action1 onNext)
public void forEach(final Action1 onNext, Action1 onError)
public void forEach(final Action1 onNext, Action1 onError, Action0 onComplete)
上面2节中既介绍了被观察者变种,又介绍了观察者变种,这里再介绍一种雌雄同体的对象(既作为被观察者使用,也可以作为观察者)。
针对不同的场景一共有四种类型的Subject。他们并不是在所有的实现中全部都存在。
一个AsyncSubject只在原始Observable完成后,发射来自原始Observable的最后一个值。它会把这最后一个值发射给任何后续的观察者。
以下贴出代码:
//创建被观察者 final AsyncSubject<String> subject = AsyncSubject.create(); //创建观察者 Subscriber<String> subscriber = new Subscriber<String>() { @Override public void onCompleted() { Log.i(TAG, "onCompleted"); } @Override public void onError(Throwable e) { Log.i(TAG, "onError"); } @Override public void onNext(String s) { Log.i(TAG, "s:" + s); } }; //订阅事件 subject.subscribe(subscriber); //被观察者发出事件 如果调用onCompleted(),onNext()则会打印最后一个事件;如果没有,onNext()则不打印任何事件。 subject.onNext("Hello Android "); subject.onNext("Hello Java "); subject.onCompleted();
输出:
s:Hello Java onCompleted
然而,如果原始的Observable因为发生了错误而终止,AsyncSubject将不会发射任何数据,只是简单的向前传递这个错误通知。
上面的观察者被观察者代码相同,现在发出一系列信号,并在最后发出异常 代码如下:
subject.onNext("Hello Android "); subject.onNext("Hello Java "); //因为发送了异常 所以onNext()无法被打印 subject.onError(null);
当观察者订阅BehaviorSubject时,他会将订阅前最后一次发送的事件和订阅后的所有发送事件都打印出来,如果订阅前无发送事件,则会默认接收构造器create(T)里面的对象和订阅后的所有事件,代码如下:
BehaviorSubject subject=BehaviorSubject.create("NROMAL"); Subscriber subscriber = new Subscriber() { @Override public void onCompleted() { Log.i(TAG, "onCompleted"); } @Override public void onError(Throwable e) { Log.i(TAG, "onError"); } @Override public void onNext(Object o) { Log.i(TAG, "onNext: " + o); } }; //subject.onNext("Hello Android !"); //subject.onNext("Hello Java !"); //subject.onNext("Hello C !"); //这里开始订阅 如果上面的3个注释没去掉,则Hello C的事件和订阅后面的事件生效 //如果上面的三个注释去掉 则打印构造器NORMAL事件生效后和订阅后面的事件生效 subject.subscribe(subscriber); subject.onNext("Hello CPP !"); subject.onNext("Hello IOS !");
PublishSubject只会把在订阅发生的时间点之后来自原始Observable的数据发射给观察者。
需要注意的是,PublishSubject可能会一创建完成就立刻开始发射数据,因此这里有一个风险:在Subject被创建后到有观察者订阅它之前这个时间段内,一个或多个数据可能会丢失。
代码如下:
PublishSubject subject= PublishSubject.create(); Action1<String> onNextAction1 = new Action1<String>(){ @Override public void call(String s) { Log.i(TAG, "onNextAction1 call: "+s); } }; Action1<String> onNextAction2 = new Action1<String>(){ @Override public void call(String s) { Log.i(TAG, "onNextAction2 call: "+s); } }; subject.onNext("Hello Android !"); subject.subscribe(onNextAction1); subject.onNext("Hello Java !"); subject.subscribe(onNextAction2); subject.onNext("Hello IOS !");
输出如下:
onNextAction1 call: Hello Java ! onNextAction1 call: Hello IOS ! onNextAction2 call: Hello IOS !
ReplaySubject会发射所有来自原始Observable的数据给观察者,无论它们是何时订阅的。
代码如下:
ReplaySubject subject= ReplaySubject.create(); Action1<String> onNextAction1 = new Action1<String>(){ @Override public void call(String s) { Log.i(TAG, "onNextAction1 call: "+s); } }; Action1<String> onNextAction2 = new Action1<String>(){ @Override public void call(String s) { Log.i(TAG, "onNextAction2 call: "+s); } }; subject.onNext("Hello Android !"); subject.subscribe(onNextAction1); subject.onNext("Hello Java !"); subject.subscribe(onNextAction2); subject.onNext("Hello IOS !");
输出如下:
onNextAction1 call: Hello Android ! onNextAction1 call: Hello Java ! onNextAction2 call: Hello Android ! onNextAction2 call: Hello Java ! onNextAction1 call: Hello IOS ! onNextAction2 call: Hello IOS !
AsyncSubject无论何时订阅 只会接收最后一次onNext()事件,如果最后出现异常,则不会打印任何onNext()
BehaviorSubject会从订阅前最后一次oNext()开始打印直至结束。如果订阅前无调用onNext(),则调用默认creat(T)传入的对象。如果异常后才调用,则不打印onNext()
PublishSubject只会打印订阅后的任何事件。
ReplaySubject无论订阅在何时都会调用发送的事件。
以上就是深入浅出RxJava_01[什么是RxJava] 的详细介绍的内容,更多相关内容请关注PHP中文网(www.php.cn)!