이 기사에서는 RxJava의 기본 사용법을 자세히 설명합니다. RxJava는 사용법이 매우 간단한 마법의 프레임워크이지만 내부 구현이 약간 복잡하고 코드 로직이 약간 복잡합니다. RxJava 소스 코드 분석에 대한 온라인 기사, 소스 코드 게시물은 거의 없고 불완전합니다. 전체 소스 코드 분석은 참고용으로 아래에 나열되어 있습니다.
1. RxJava의 기본 사용법
Observable.create(new Observable.OnSubscribe<Object>() { @Override public void call(Subscriber<? super Object> subscriber) { } }).subscribe(new Observer<Object>() { @Override public void onCompleted() { } @Override public void onError(Throwable e) { } @Override public void onNext(Object o) { } });
2 먼저 .subscribe(new Observer
public final Subscription subscribe(final Observer<? super T> observer) { if (observer instanceof Subscriber) { return subscribe((Subscriber<? super T>)observer); } if (observer == null) { throw new NullPointerException("observer is null"); } return subscribe(new ObserverSubscriber<T>(observer)); }
이것은 관찰자 매개변수(ObserverableSubscriber)에 전달된 간단한 캡슐화일 뿐입니다. 구독 방법 읽어보기
public final Subscription subscribe(Subscriber<? super T> subscriber) { return Observable.subscribe(subscriber, this); }
static <T> Subscription subscribe(Subscriber<? super T> subscriber, Observable<T> observable) { // validate and proceed if (subscriber == null) { throw new IllegalArgumentException("subscriber can not be null"); } if (observable.onSubscribe == null) { throw new IllegalStateException("onSubscribe function can not be null."); /* * the subscribe function can also be overridden but generally that's not the appropriate approach * so I won't mention that in the exception */ } // new Subscriber so onStart it subscriber.onStart(); /* * See https://github.com/ReactiveX/RxJava/issues/216 for discussion on "Guideline 6.4: Protect calls * to user code from within an Observer" */ // if not already wrapped if (!(subscriber instanceof SafeSubscriber)) { // assign to `observer` so we return the protected version subscriber = new SafeSubscriber<T>(subscriber); } // The code below is exactly the same an unsafeSubscribe but not used because it would // add a significant depth to already huge call stacks. try { // allow the hook to intercept and/or decorate RxJavaHooks.onObservableStart(observable, observable.onSubscribe).call(subscriber); return RxJavaHooks.onObservableReturn(subscriber); } catch (Throwable e) { // special handling for certain Throwable/Error/Exception types Exceptions.throwIfFatal(e); // in case the subscriber can't listen to exceptions anymore if (subscriber.isUnsubscribed()) { RxJavaHooks.onError(RxJavaHooks.onObservableError(e)); } else { // if an unhandled error occurs executing the onSubscribe we will propagate it try { subscriber.onError(RxJavaHooks.onObservableError(e)); } catch (Throwable e2) { Exceptions.throwIfFatal(e2); // if this happens it means the onError itself failed (perhaps an invalid function implementation) // so we are unable to propagate the error correctly and will just throw RuntimeException r = new OnErrorFailedException("Error occurred attempting to subscribe [" + e.getMessage() + "] and then again while trying to pass to onError.", e2); // TODO could the hook be the cause of the error in the on error handling. RxJavaHooks.onObservableError(r); // TODO why aren't we throwing the hook's return value. throw r; // NOPMD } } return Subscriptions.unsubscribed(); } }
여기만 주의하시면 됩니다
RxJavaHooks.onObservableStart(observable, observable.onSubscribe).call(subscriber);
public static <T> Observable.OnSubscribe<T> onObservableStart(Observable<T> instance, Observable.OnSubscribe<T> onSubscribe) { Func2<Observable, Observable.OnSubscribe, Observable.OnSubscribe> f = onObservableStart; if (f != null) { return f.call(instance, onSubscribe); } return onSubscribe; }
RxJavaHooks.onObservableStart(observable, observable.onSubscribe) 这个方法返回的是它的第二个参数,也就是Observable它自己的onSubscribe 对象, 所以在subscribe 方法里面调用了 onSubscribe.call(subscriber)方法
여기의 구독자는 전달된 매개변수입니다
protected Observable(OnSubscribe<T> f) { this.onSubscribe = f; }
public static <T> Observable<T> create(OnSubscribe<T> f) { return new Observable<T>(RxJavaHooks.onCreate(f)); }
onSubscribe 객체가 create에서 전달된 매개변수임을 알 수 있습니다. 그러면 전체 프로세스가 매우 명확해집니다.
subscribe 메소드만 호출됩니다. 전체 프로세스가 실행됩니다. subscribe ===> onSubscribe.call(observer) 메소드를 호출하고 관찰자도 전달합니다.
관련 권장 사항:
RxJava Operator (8) Aggregate_PHP 튜토리얼 JavaScript Basic 강화 동영상 튜토리얼위 내용은 RxJava 기본 사용 사례 공유 소스 코드 분석 [코드 포함]의 상세 내용입니다. 자세한 내용은 PHP 중국어 웹사이트의 기타 관련 기사를 참조하세요!