Detailed Source‑Code Analysis of RxJava Observable Subscription Flow
This article provides a comprehensive source‑code walkthrough of RxJava’s Observable subscription flow, detailing object creation, reverse subscription, thread scheduling, and operator handling with step‑by‑step code analysis.
In this article we dissect the complete RxJava Observable subscription process, covering three main stages: object creation, reverse subscription, and task execution. The analysis is based on the sample code that creates an Observable , applies map , doOnNext , subscribeOn , observeOn , and finally subscribes with Consumer callbacks.
1. Object Creation
The chain starts with Observable.create(new ObservableOnSubscribe () { ... }) . Inside subscribe() the emitter calls onNext(1) . The create operator is implemented as:
public static
Observable
create(ObservableOnSubscribe
source) {
ObjectHelper.requireNonNull(source, "source is null");
return RxJavaPlugins.onAssembly(new ObservableCreate
(source));
}The ObservableCreate constructor simply stores the source:
public ObservableCreate(ObservableOnSubscribe
source) {
this.source = source;
}2. Operator map
The map operator wraps the upstream source with ObservableMap :
public final
Observable
map(Function
mapper) {
ObjectHelper.requireNonNull(mapper, "mapper is null");
return RxJavaPlugins.onAssembly(new ObservableMap
(this, mapper));
}ObservableMap stores the mapping function:
public ObservableMap(ObservableSource
source, Function
function) {
super(source);
this.function = function;
}The actual mapping occurs in MapObserver.onNext() where mapper.apply(t) is called and the result is forwarded downstream.
public void onNext(T t) {
if (done) return;
if (sourceMode != NONE) { actual.onNext(null); return; }
U v;
try {
v = ObjectHelper.requireNonNull(mapper.apply(t), "The mapper function returned a null value.");
} catch (Throwable ex) {
fail(ex);
return;
}
actual.onNext(v);
}3. Operator doOnNext
doOnNext creates an ObservableDoOnEach that invokes a side‑effect consumer before passing the item downstream:
public final Observable
doOnNext(Consumer
onNext) {
return doOnEach(onNext, Functions.emptyConsumer(), Functions.EMPTY_ACTION, Functions.EMPTY_ACTION);
}Its observer implementation runs the consumer and then forwards the item:
public void onNext(T t) {
if (done) return;
try {
onNext.accept(t);
} catch (Throwable e) {
Exceptions.throwIfFatal(e);
s.dispose();
onError(e);
return;
}
actual.onNext(t);
}4. Thread Scheduling – subscribeOn and observeOn
subscribeOn schedules the subscription side on the given Scheduler :
public final Observable
subscribeOn(Scheduler scheduler) {
ObjectHelper.requireNonNull(scheduler, "scheduler is null");
return RxJavaPlugins.onAssembly(new ObservableSubscribeOn
(this, scheduler));
}Its subscribeActual creates a SubscribeOnObserver , calls s.onSubscribe(parent) , and schedules the upstream source.subscribe(parent) on the scheduler’s worker.
public void subscribeActual(final Observer
s) {
final SubscribeOnObserver
parent = new SubscribeOnObserver
(s);
s.onSubscribe(parent);
parent.setDisposable(scheduler.scheduleDirect(new SubscribeTask(parent)));
}observeOn moves downstream emissions to another scheduler:
public final Observable
observeOn(Scheduler scheduler) {
return observeOn(scheduler, false, bufferSize());
}Its observer stores a Scheduler.Worker and queues items; the run() method drains the queue and calls actual.onNext() on the target thread.
@Override
public void run() {
if (outputFused) {
drainFused();
} else {
drainNormal();
}
}5. Final Subscription
The terminal subscribe creates a LambdaObserver that forwards onNext , onError , and onComplete to the user‑provided callbacks:
public final Disposable subscribe(Consumer
onNext, Consumer
onError, Action onComplete, Consumer
onSubscribe) {
ObjectHelper.requireNonNull(onNext, "onNext is null");
ObjectHelper.requireNonNull(onError, "onError is null");
ObjectHelper.requireNonNull(onComplete, "onComplete is null");
ObjectHelper.requireNonNull(onSubscribe, "onSubscribe is null");
LambdaObserver
ls = new LambdaObserver
(onNext, onError, onComplete, onSubscribe);
subscribe(ls);
return ls;
}When the upstream finally emits the integer 1 , the chain transforms it to the string "1 is String!!!" via map , logs it in doOnNext , and delivers it to the subscriber’s onNext callback, after which onComplete is called.
6. Overall Flow Diagram
The article concludes with a flowchart (image) that visualises the reverse‑order subscription from downstream observers back to the upstream source, illustrating how each operator wraps the previous one and how thread scheduling points are inserted.
Sohu Tech Products
A knowledge-sharing platform for Sohu's technology products. As a leading Chinese internet brand with media, video, search, and gaming services and over 700 million users, Sohu continuously drives tech innovation and practice. We’ll share practical insights and tech news here.
How this landed with the community
Was this worth your time?
0 Comments
Thoughtful readers leave field notes, pushback, and hard-won operational detail here.