RxJava basic usage case sharing source code analysis [with code]
This article details the basic usage of RxJava. RxJava is a magical framework with very simple usage, but the internal implementation is a bit complicated and the code logic is a bit convoluted. Online articles about RxJava source code analysis, source code posts are few and incomplete. The complete source code analysis is listed below for reference.
1. Basic usage of RxJava
Observable.create(new Observable.OnSubscribe<Object>() { @Override public void call(Subscriber<? super Object> subscriber) { } }).subscribe(new Observer<Object>() { @Override public void onCompleted() { } @Override public void onError(Throwable e) { } @Override public void onNext(Object o) { } });
2. First look at the .subscribe(new Observer
public final Subscription subscribe(final Observer<? super T> observer) { if (observer instanceof Subscriber) { return subscribe((Subscriber<? super T>)observer); } if (observer == null) { throw new NullPointerException("observer is null"); } return subscribe(new ObserverSubscriber<T>(observer)); }
This is just the passed in observer parameter A simple encapsulation (ObserverableSubscriber)
Continue to look at the subscribe method
public final Subscription subscribe(Subscriber<? super T> subscriber) { return Observable.subscribe(subscriber, this); }
static <T> Subscription subscribe(Subscriber<? super T> subscriber, Observable<T> observable) { // validate and proceed if (subscriber == null) { throw new IllegalArgumentException("subscriber can not be null"); } if (observable.onSubscribe == null) { throw new IllegalStateException("onSubscribe function can not be null."); /* * the subscribe function can also be overridden but generally that's not the appropriate approach * so I won't mention that in the exception */ } // new Subscriber so onStart it subscriber.onStart(); /* * See https://github.com/ReactiveX/RxJava/issues/216 for discussion on "Guideline 6.4: Protect calls * to user code from within an Observer" */ // if not already wrapped if (!(subscriber instanceof SafeSubscriber)) { // assign to `observer` so we return the protected version subscriber = new SafeSubscriber<T>(subscriber); } // The code below is exactly the same an unsafeSubscribe but not used because it would // add a significant depth to already huge call stacks. try { // allow the hook to intercept and/or decorate RxJavaHooks.onObservableStart(observable, observable.onSubscribe).call(subscriber); return RxJavaHooks.onObservableReturn(subscriber); } catch (Throwable e) { // special handling for certain Throwable/Error/Exception types Exceptions.throwIfFatal(e); // in case the subscriber can't listen to exceptions anymore if (subscriber.isUnsubscribed()) { RxJavaHooks.onError(RxJavaHooks.onObservableError(e)); } else { // if an unhandled error occurs executing the onSubscribe we will propagate it try { subscriber.onError(RxJavaHooks.onObservableError(e)); } catch (Throwable e2) { Exceptions.throwIfFatal(e2); // if this happens it means the onError itself failed (perhaps an invalid function implementation) // so we are unable to propagate the error correctly and will just throw RuntimeException r = new OnErrorFailedException("Error occurred attempting to subscribe [" + e.getMessage() + "] and then again while trying to pass to onError.", e2); // TODO could the hook be the cause of the error in the on error handling. RxJavaHooks.onObservableError(r); // TODO why aren't we throwing the hook's return value. throw r; // NOPMD } } return Subscriptions.unsubscribed(); } }
You only need to pay attention here
RxJavaHooks.onObservableStart(observable, observable.onSubscribe).call(subscriber);
public static <T> Observable.OnSubscribe<T> onObservableStart(Observable<T> instance, Observable.OnSubscribe<T> onSubscribe) { Func2<Observable, Observable.OnSubscribe, Observable.OnSubscribe> f = onObservableStart; if (f != null) { return f.call(instance, onSubscribe); } return onSubscribe; }
RxJavaHooks.onObservableStart(observable, observable.onSubscribe) 这个方法返回的是它的第二个参数,也就是Observable它自己的onSubscribe 对象, 所以在subscribe 方法里面调用了 onSubscribe.call(subscriber)方法
The subscriber here is the parameter passed in
protected Observable(OnSubscribe<T> f) { this.onSubscribe = f; }
public static <T> Observable<T> create(OnSubscribe<T> f) { return new Observable<T>(RxJavaHooks.onCreate(f)); }
It can be seen that the onSubscribe object is the parameter passed in by create, then the whole process is very clear
The entire process will only be executed when the subscribe method is called: subscribe ===>Call onSubscribe.call(observer ) method also passes the observer in
Related recommendations:
RxJava Operator (8) Aggregate_PHP Tutorial
Video:JavaScript Basic strengthening video tutorial
The above is the detailed content of RxJava basic usage case sharing source code analysis [with code]. For more information, please follow other related articles on the PHP Chinese website!

Hot AI Tools

Undresser.AI Undress
AI-powered app for creating realistic nude photos

AI Clothes Remover
Online AI tool for removing clothes from photos.

Undress AI Tool
Undress images for free

Clothoff.io
AI clothes remover

AI Hentai Generator
Generate AI Hentai for free.

Hot Article

Hot Tools

Notepad++7.3.1
Easy-to-use and free code editor

SublimeText3 Chinese version
Chinese version, very easy to use

Zend Studio 13.0.1
Powerful PHP integrated development environment

Dreamweaver CS6
Visual web development tools

SublimeText3 Mac version
God-level code editing software (SublimeText3)

Hot Topics

This article analyzes the top four JavaScript frameworks (React, Angular, Vue, Svelte) in 2025, comparing their performance, scalability, and future prospects. While all remain dominant due to strong communities and ecosystems, their relative popul

This article addresses the CVE-2022-1471 vulnerability in SnakeYAML, a critical flaw allowing remote code execution. It details how upgrading Spring Boot applications to SnakeYAML 1.33 or later mitigates this risk, emphasizing that dependency updat

Java's classloading involves loading, linking, and initializing classes using a hierarchical system with Bootstrap, Extension, and Application classloaders. The parent delegation model ensures core classes are loaded first, affecting custom class loa

The article discusses implementing multi-level caching in Java using Caffeine and Guava Cache to enhance application performance. It covers setup, integration, and performance benefits, along with configuration and eviction policy management best pra

Node.js 20 significantly enhances performance via V8 engine improvements, notably faster garbage collection and I/O. New features include better WebAssembly support and refined debugging tools, boosting developer productivity and application speed.

Iceberg, an open table format for large analytical datasets, improves data lake performance and scalability. It addresses limitations of Parquet/ORC through internal metadata management, enabling efficient schema evolution, time travel, concurrent w

This article explores integrating functional programming into Java using lambda expressions, Streams API, method references, and Optional. It highlights benefits like improved code readability and maintainability through conciseness and immutability

This article explores methods for sharing data between Cucumber steps, comparing scenario context, global variables, argument passing, and data structures. It emphasizes best practices for maintainability, including concise context use, descriptive
