Dieses Tutorial basiert auf RxJava1. >1 Was ist RxJava?
Das Framework wurde von einem Team unter der Leitung des Microsoft-Architekten Erik Meijer entwickelt und wurde im November 2012 als Open Source veröffentlicht.
Die Rx-Bibliothek unterstützt .NET, JavaScript, C++ usw. und unterstützt mittlerweile fast alle gängigen Programmiersprachen.
Die meisten Sprachbibliotheken von Rx werden von der Organisation ReactiveX verwaltet. Die beliebtesten sind RxJava/RxJS/Rx.NET und die Community-Website ist reactivex.io.
RxJava ist ein beliebtes Framework, dessen Quellcode auf GitHub basiert. Zusätzlich zur Unterstützung von RxJava gibt es auch ein Support-Framework für Android-Systeme, RxAndroid
2. Vereinfachter RxJava-Code
new Thread() { @Override public void run() { super.run(); for (File folder : folders) { File[] files = folder.listFiles(); for (File file : files) { if (file.getName().endsWith(".png")) { final Bitmap bitmap = getBitmapFromFile(file); getActivity().runOnUiThread(new Runnable() { @Override public void run() { imageCollectorView.addImage(bitmap); } }); } } } } }.start();
Observable.from(folders) .flatMap(new Func1<File, Observable<File>>() { @Override public Observable<File> call(File file) { return Observable.from(file.listFiles()); } }) .filter(new Func1<File, Boolean>() { @Override public Boolean call(File file) { return file.getName().endsWith(".png"); } }) .map(new Func1<File, Bitmap>() { @Override public Bitmap call(File file) { return getBitmapFromFile(file); } }) .subscribeOn(Schedulers.io()) .observeOn(AndroidSchedulers.mainThread()) .subscribe(new Action1<Bitmap>() { @Override public void call(Bitmap bitmap) { imageCollectorView.addImage(bitmap); } });
Derzeit wurde RxJava auf Version 2.0 aktualisiert, aber um RxJava besser zu verstehen, können wir mit dem Lernen ab Version 1.0 beginnen. Damit unser Android-Projekt RxJava besser nutzen kann, können wir die Gradle-Skriptabhängigkeit in das Projekt einführen:
Jetzt unterstützt unser Projekt bereits die Funktionen von RxJava. 4. Der Kern der Reaktionsfähigkeitcompile 'io.reactivex:rxandroid:1.2.1' compile 'io.reactivex:rxjava:1.1.6'
Wenn wir früher Nachrichten lesen wollten, mussten wir normalerweise Zeitungen lesen. Wenn Sie beispielsweise an einer Zeitung oder Zeitschrift interessiert sind, müssen Sie zunächst drei Dinge tun:
Geben Sie Ihre Privatadresse an
Finden Sie die entsprechende Zeitung
Gehen Sie zur Zeitung und abonnieren Sie die Zeitung für den ganzen Monat
Nachdem Sie den oben genannten Prozess durchlaufen haben, werden neue Zeitungsmaterialien angezeigt erscheint jeden Tag, die Zeitung schickt Ihnen das Magazin nach Hause.
Codeabstraktion des obigen Beispiels, die Schritte sind wie folgt:
Stellen Sie Beobachter bereit (weil Sie sind eine Person, der der Inhalt des Magazins am Herzen liegt, also sind Sie die Person, die das Ereignis beobachtet)
Geben Sie die Person an, die beobachtet wird (solange ein neues Magazin herauskommt, Die Personen, die sich darum kümmern, müssen benachrichtigt werden, damit das Zeitungsbüro beobachtet wird. Objekt)
Abonnement (das heißt, der Beobachter und das Beobachtete müssen miteinander in Beziehung gesetzt werden, damit so bald wenn sich das beobachtete Objekt ändert, wird das Objekt, das das Ereignis beobachtet, sofort benachrichtigt)
//1.创建被观察者 Observable<String> observable = Observable.create(new Observable.OnSubscribe<String>() { @Override public void call(Subscriber<? super String> subscriber) { //4.开始发送事件 //事件有3个类型 分别是onNext() onCompleted() onError() //onCompleted() onError() 一般都是用来通知观察者 事件发送完毕了,两者只取其一。 subscriber.onNext("Hello Android !"); subscriber.onNext("Hello Java !"); subscriber.onNext("Hello C !"); subscriber.onCompleted(); } }); //2.创建观察者 Subscriber<String> subscriber = new Subscriber<String>() { @Override public void onCompleted() { Log.i(TAG, "onCompleted "); } @Override public void onError(Throwable e) { Log.i(TAG, "onError: "+e.getLocalizedMessage()); } @Override public void onNext(String s) { Log.i(TAG, "onNext: "+s); } }; //3.订阅 observable.subscribe(subscriber);
com.m520it.rxjava I/IT520: onNext: Hello Android ! com.m520it.rxjava I/IT520: onNext: Hello Java ! com.m520it.rxjava I/IT520: onNext: Hello C ! com.m520it.rxjava I/IT520: onCompleted
In der call()-Methodenentität von Observable, nach dem Senden von Ereignissen wie onNext/onCompleted/onError.
Dann kann der Teilnehmer zur entsprechenden Methode zurückrufen.
5. Observable-Varianten
onSuccess – Single gibt einen einzelnen Wert an diese Methode aus
onError – Wenn der erforderliche Wert nicht ausgegeben werden kann, gibt Single ein Throwable-Objekt an diese Methode aus
Single ruft nur eine dieser beiden Methoden auf, und zwar nur einmal. Nach dem Aufruf einer beliebigen Methode wird die Abonnementbeziehung beendet.
final Single<String> single = Single.create(new Single.OnSubscribe<String>() { @Override public void call(SingleSubscriber<? super String> singleSubscriber) { //先调用onNext() 最后调用onCompleted() //singleSubscriber.onSuccess("Hello Android !"); //只调用onError(); singleSubscriber.onError(new NullPointerException("mock Exception !")); } }); Observer<String> observer = new Observer<String>() { @Override public void onCompleted() { Log.i(TAG, "onCompleted "); } @Override public void onError(Throwable e) { Log.i(TAG, "onError: "+e.getLocalizedMessage()); } @Override public void onNext(String s) { Log.i(TAG, "onNext: "+s); } }; single.subscribe(observer);
Wenn Sie sich im obigen Code nur um das onNext()-Ereignis kümmern, aber onCompleted()- und onError()-Ereignisse implementieren müssen, erscheint dieser Code sehr aufgebläht. Angesichts dieser Nachfrage hat das RxJava-Framework spezifische Anpassungen im Abonnement vorgenommen. Der Code lautet wie folgt:
//创建观察者 Subscriber<String> subscriber = new Subscriber<String>() { @Override public void onCompleted() { Log.i(TAG, "onCompleted "); } @Override public void onError(Throwable e) { Log.i(TAG, "onError: "+e.getLocalizedMessage()); } @Override public void onNext(String s) { Log.i(TAG, "onNext: "+s); } }; //订阅 observable.subscribe(subscriber);
//为指定的onNext事件创建独立的接口 Action1<String> onNextAction = new Action1<String>() { @Override public void call(String s) { Log.i(TAG, "call: "+s); } }; //订阅 observable.subscribe(onNextAction);
public Subscription subscribe(final Observer Observer)
public Subscription subscribe(final Action1 onNext)
public Subscription subscribe(final Action1 onNext, Action1 onError)
public Subscription subscribe(final Action1 onNext, Action1 onError, Action0 onCompleted)
这里还有一个forEach函数有类似的功能:
public void forEach(final Action1 onNext)
public void forEach(final Action1 onNext, Action1 onError)
public void forEach(final Action1 onNext, Action1 onError, Action0 onComplete)
上面2节中既介绍了被观察者变种,又介绍了观察者变种,这里再介绍一种雌雄同体的对象(既作为被观察者使用,也可以作为观察者)。
针对不同的场景一共有四种类型的Subject。他们并不是在所有的实现中全部都存在。
一个AsyncSubject只在原始Observable完成后,发射来自原始Observable的最后一个值。它会把这最后一个值发射给任何后续的观察者。
以下贴出代码:
//创建被观察者 final AsyncSubject<String> subject = AsyncSubject.create(); //创建观察者 Subscriber<String> subscriber = new Subscriber<String>() { @Override public void onCompleted() { Log.i(TAG, "onCompleted"); } @Override public void onError(Throwable e) { Log.i(TAG, "onError"); } @Override public void onNext(String s) { Log.i(TAG, "s:" + s); } }; //订阅事件 subject.subscribe(subscriber); //被观察者发出事件 如果调用onCompleted(),onNext()则会打印最后一个事件;如果没有,onNext()则不打印任何事件。 subject.onNext("Hello Android "); subject.onNext("Hello Java "); subject.onCompleted();
输出:
s:Hello Java onCompleted
然而,如果原始的Observable因为发生了错误而终止,AsyncSubject将不会发射任何数据,只是简单的向前传递这个错误通知。
上面的观察者被观察者代码相同,现在发出一系列信号,并在最后发出异常 代码如下:
subject.onNext("Hello Android "); subject.onNext("Hello Java "); //因为发送了异常 所以onNext()无法被打印 subject.onError(null);
当观察者订阅BehaviorSubject时,他会将订阅前最后一次发送的事件和订阅后的所有发送事件都打印出来,如果订阅前无发送事件,则会默认接收构造器create(T)里面的对象和订阅后的所有事件,代码如下:
BehaviorSubject subject=BehaviorSubject.create("NROMAL"); Subscriber subscriber = new Subscriber() { @Override public void onCompleted() { Log.i(TAG, "onCompleted"); } @Override public void onError(Throwable e) { Log.i(TAG, "onError"); } @Override public void onNext(Object o) { Log.i(TAG, "onNext: " + o); } }; //subject.onNext("Hello Android !"); //subject.onNext("Hello Java !"); //subject.onNext("Hello C !"); //这里开始订阅 如果上面的3个注释没去掉,则Hello C的事件和订阅后面的事件生效 //如果上面的三个注释去掉 则打印构造器NORMAL事件生效后和订阅后面的事件生效 subject.subscribe(subscriber); subject.onNext("Hello CPP !"); subject.onNext("Hello IOS !");
PublishSubject只会把在订阅发生的时间点之后来自原始Observable的数据发射给观察者。
需要注意的是,PublishSubject可能会一创建完成就立刻开始发射数据,因此这里有一个风险:在Subject被创建后到有观察者订阅它之前这个时间段内,一个或多个数据可能会丢失。
代码如下:
PublishSubject subject= PublishSubject.create(); Action1<String> onNextAction1 = new Action1<String>(){ @Override public void call(String s) { Log.i(TAG, "onNextAction1 call: "+s); } }; Action1<String> onNextAction2 = new Action1<String>(){ @Override public void call(String s) { Log.i(TAG, "onNextAction2 call: "+s); } }; subject.onNext("Hello Android !"); subject.subscribe(onNextAction1); subject.onNext("Hello Java !"); subject.subscribe(onNextAction2); subject.onNext("Hello IOS !");
输出如下:
onNextAction1 call: Hello Java ! onNextAction1 call: Hello IOS ! onNextAction2 call: Hello IOS !
ReplaySubject会发射所有来自原始Observable的数据给观察者,无论它们是何时订阅的。
代码如下:
ReplaySubject subject= ReplaySubject.create(); Action1<String> onNextAction1 = new Action1<String>(){ @Override public void call(String s) { Log.i(TAG, "onNextAction1 call: "+s); } }; Action1<String> onNextAction2 = new Action1<String>(){ @Override public void call(String s) { Log.i(TAG, "onNextAction2 call: "+s); } }; subject.onNext("Hello Android !"); subject.subscribe(onNextAction1); subject.onNext("Hello Java !"); subject.subscribe(onNextAction2); subject.onNext("Hello IOS !");
输出如下:
onNextAction1 call: Hello Android ! onNextAction1 call: Hello Java ! onNextAction2 call: Hello Android ! onNextAction2 call: Hello Java ! onNextAction1 call: Hello IOS ! onNextAction2 call: Hello IOS !
AsyncSubject无论何时订阅 只会接收最后一次onNext()事件,如果最后出现异常,则不会打印任何onNext()
BehaviorSubject会从订阅前最后一次oNext()开始打印直至结束。如果订阅前无调用onNext(),则调用默认creat(T)传入的对象。如果异常后才调用,则不打印onNext()
PublishSubject只会打印订阅后的任何事件。
ReplaySubject无论订阅在何时都会调用发送的事件。
以上就是深入浅出RxJava_01[什么是RxJava] 的详细介绍的内容,更多相关内容请关注PHP中文网(www.php.cn)!