Mastering Reactor: From Mono & Flux Basics to Advanced Async Patterns
This article explains the fundamentals of Reactor's reactive programming model—including Mono and Flux types, map and flatMap operators, asynchronous execution, scheduler choices, and error handling—while providing practical code examples to help developers efficiently use Spring WebFlux.
Reactor Reactive Programming Overview
Reactor is a fully non‑blocking reactive programming foundation for the JVM that efficiently manages back‑pressure and integrates directly with Java 8 functional APIs such as CompletableFuture, Stream, and Duration. It provides composable asynchronous sequence APIs, Flux for N elements and Mono for 0 or 1 element, implementing the Reactive Streams specification.
Background
Our backend projects mainly use Spring WebFlux, whose core reactive library is Reactor. Because Reactor has a learning curve, newcomers often encounter bugs related to map, flatMap, asynchronous processing, and concurrency. This guide focuses on those confusing points to help developers get up to speed with Spring WebFlux.
Mono
Mono<T> is a special Publisher<T> that emits at most one item and then terminates with either onComplete (success) or onError. Certain operators convert a Mono to a Flux: for example, Mono.concatWith(Publisher) returns a Flux, while Mono.then(Mono) returns another Mono.
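The type changes described above can be checked with a minimal sketch. It assumes reactor-core is on the classpath; the class name MonoBasics and its method names are illustrative, not part of Reactor.

```java
import java.util.List;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

final class MonoBasics {
    // concatWith appends another publisher, so the result type widens to Flux.
    static List<String> concatExample() {
        Flux<String> both = Mono.just("a").concatWith(Mono.just("b"));
        return both.collectList().block();
    }

    // then(Mono) ignores the first Mono's value, waits for it to complete,
    // and then plays the second Mono.
    static String thenExample() {
        Mono<String> second = Mono.just("ignored").then(Mono.just("next"));
        return second.block();
    }

    // An empty Mono completes without emitting, so block() returns null.
    static Object emptyExample() {
        return Mono.empty().block();
    }
}
```

block() is used here only to keep the sketch short; in a WebFlux handler the Mono would normally be returned rather than blocked on.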
Flux
Flux<T> is a standard Publisher<T> that emits 0 to N elements and then terminates with either onComplete or onError. Each signal corresponds to a method call on the downstream subscriber (onNext, onComplete, onError). All of these signals are optional: an empty Flux emits only onComplete, and a sequence that also omits onComplete is an infinite empty sequence (Flux.never()). Infinite sequences need not be empty: Flux.interval(Duration) emits an unbounded series of ticks.
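A small sketch of the two kinds of infinite sequence, assuming reactor-core is on the classpath. FluxBasics and its method names are illustrative; the timeouts are arbitrary values chosen to keep the example quick.

```java
import java.time.Duration;
import java.util.List;
import reactor.core.publisher.Flux;

final class FluxBasics {
    // Flux.interval never completes on its own; take(3) bounds it to 0, 1, 2.
    static List<Long> firstThreeTicks() {
        return Flux.interval(Duration.ofMillis(10))
                .take(3)
                .collectList()
                .block(Duration.ofSeconds(5));
    }

    // Flux.never() emits no signal at all. The timeout fallback makes that
    // observable: neither onNext nor onComplete arrives within 100 ms.
    static String neverEmits() {
        return Flux.<String>never()
                .timeout(Duration.ofMillis(100), Flux.just("timed out"))
                .blockLast(Duration.ofSeconds(5));
    }
}
```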
map, flatMap and flatMapSequential
Method signatures
<code><span>// Map signature</span>
<V> Flux<V> map(Function<? super T, ? extends V> mapper)</code>
<code><span>// FlatMap signature</span>
<R> Flux<R> flatMap(Function<? super T, ? extends Publisher<? extends R>> mapper)</code>
map applies a synchronous function to each element, producing a one‑to‑one transformation.
flatMap transforms each element into a Publisher and merges the inner publishers, resulting in a one‑to‑many transformation; because the inner publishers are subscribed eagerly and merged as they emit, the order of emitted items is not guaranteed.
flatMapSequential behaves like flatMap but emits the results in the order of the source elements.
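The ordering difference is easiest to see when the inner publishers finish at different times. The sketch below assumes reactor-core is on the classpath; slowEcho is a hypothetical stand-in for an asynchronous call whose latency grows with its input.

```java
import java.time.Duration;
import java.util.List;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

final class MappingOrder {
    // Hypothetical async lookup: larger inputs take longer to answer.
    static Mono<Integer> slowEcho(int n) {
        return Mono.just(n).delayElement(Duration.ofMillis(n * 100L));
    }

    // One-to-one, synchronous: source order is trivially kept.
    static List<Integer> mapped() {
        return Flux.just(3, 1, 2).map(n -> n * 10).collectList().block();
    }

    // Inner publishers run concurrently; results arrive as they finish.
    static List<Integer> flatMapped() {
        return Flux.just(3, 1, 2).flatMap(MappingOrder::slowEcho).collectList().block();
    }

    // Same concurrency, but results are emitted in source order.
    static List<Integer> flatMappedSequential() {
        return Flux.just(3, 1, 2).flatMapSequential(MappingOrder::slowEcho).collectList().block();
    }
}
```

With these delays, flatMapped() yields 1, 2, 3 (completion order) while flatMappedSequential() yields 3, 1, 2 (source order).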
Example 1
<code>void demo() {
// 1. Produce an infinite stream of Long values every 100 ms
final Flux<Long> flux = Flux.interval(Duration.of(100, ChronoUnit.MILLIS))
.map(log()) // runs on the thread that emits the tick (Flux.interval uses Schedulers.parallel() by default)
.flatMap(logOfFlux()); // runs on that same thread; no thread switch happens here
flux.subscribe();
}
Function<Long, Long> log() {
return aLong -> {
log.info("num is {}", aLong);
return aLong;
};
}
Function<Long, Flux<Long>> logOfFlux() {
return aLong -> {
log.info("num is {}", aLong);
return Flux.just(aLong);
};
}</code>
In this example the flatMap operation is synchronous because Flux.just() emits the element immediately. Later sections show how to make flatMap asynchronous.
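For contrast, one common way to make the inner publisher asynchronous is to wrap the work in Mono.fromCallable and subscribe it on a scheduler. This is a sketch under assumptions: reactor-core is on the classpath, and blockingLookup is a hypothetical blocking call that reports the thread it ran on.

```java
import java.util.List;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Schedulers;

final class AsyncFlatMap {
    // Hypothetical blocking call; returns "<id>@<thread name>".
    static String blockingLookup(long id) {
        try {
            Thread.sleep(50);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return id + "@" + Thread.currentThread().getName();
    }

    static List<String> run() {
        return Flux.just(1L, 2L, 3L)
                // Each inner Mono is subscribed on the bounded elastic pool,
                // so the blocking call does not run on the emitting thread.
                .flatMap(id -> Mono.fromCallable(() -> blockingLookup(id))
                        .subscribeOn(Schedulers.boundedElastic()))
                .collectList()
                .block();
    }
}
```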
Empty Mono Handling
<code>void monoOfEmpty() {
Mono.empty()
.map(m -> func())
.subscribe(message -> {
responseObserver.onNext(message);
responseObserver.onCompleted();
});
}</code>
Because Mono.empty() completes without emitting an element, onNext is never signaled, so the consumer passed to subscribe never runs and responseObserver.onCompleted() is never called. The correct approach is:
<code>void monoOfEmpty() {
Mono.empty()
.map(m -> func())
.doOnSuccess(v -> responseObserver.onCompleted())
.subscribe(responseObserver::onNext);
}</code>
Asynchronous Execution and Multithreading
Reactor operators do not switch threads on their own; unless a scheduler is specified, most operators run on the thread on which the previous operator delivered its signal. The two main ways to change the execution context are publishOn and subscribeOn.
publishOn
<code>void demo() {
final Flux<Long> flux = Flux.interval(Duration.of(100, ChronoUnit.MILLIS))
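.map(log()) // runs on the thread that emits the tick (Flux.interval uses Schedulers.parallel() by default)
.publishOn(Schedulers.parallel()) // subsequent operators run on parallel pool
.map(log()) // runs on parallel thread
.publishOn(Schedulers.boundedElastic()) // switch to the bounded elastic pool (elastic() is deprecated)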
.flatMap(logOfMono());
flux.subscribe();
}</code>
publishOn affects all downstream operators until another publishOn appears.
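Recording the thread name at each stage makes the hand-off visible. The sketch assumes reactor-core is on the classpath and relies on Reactor's default thread naming (parallel-N, single-N); PublishOnDemo is an illustrative name.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import reactor.core.publisher.Flux;
import reactor.core.scheduler.Schedulers;

final class PublishOnDemo {
    // Returns the thread names observed before and after each publishOn.
    static List<String> threads() {
        List<String> seen = new CopyOnWriteArrayList<>();
        Flux.just(1)
                .doOnNext(n -> seen.add(Thread.currentThread().getName())) // subscribing thread
                .publishOn(Schedulers.parallel())
                .doOnNext(n -> seen.add(Thread.currentThread().getName())) // a parallel worker
                .publishOn(Schedulers.single())
                .doOnNext(n -> seen.add(Thread.currentThread().getName())) // the single worker
                .blockLast();
        return seen;
    }
}
```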
subscribeOn
<code>void demo() {
final Flux<Long> flux = Flux.interval(Duration.of(100, ChronoUnit.MILLIS))
.map(log())
.publishOn(Schedulers.parallel())
.map(log())
.subscribeOn(Schedulers.boundedElastic()) // the subscription to the source happens on this scheduler; Flux.interval still ticks on its own timer thread
.flatMap(logOfMono());
flux.subscribe();
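}</code>
subscribeOn determines the thread on which the subscription, and therefore the source, starts, regardless of where it appears in the chain; a later publishOn overrides it for the operators downstream of that publishOn.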
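A sketch of how the two operators interact, assuming reactor-core is on the classpath and Reactor's default thread names. SubscribeOnDemo is illustrative; Flux.defer is used only so the source can record the thread it is created on.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import reactor.core.publisher.Flux;
import reactor.core.scheduler.Schedulers;

final class SubscribeOnDemo {
    static List<String> threads() {
        List<String> seen = new CopyOnWriteArrayList<>();
        Flux.defer(() -> {
                    // Runs at subscription time, on the subscribeOn scheduler.
                    seen.add("source:" + Thread.currentThread().getName());
                    return Flux.just(1, 2);
                })
                .map(n -> n + 1)
                .subscribeOn(Schedulers.single())   // mid-chain, yet it governs the source
                .publishOn(Schedulers.parallel())   // takes over for everything below
                .doOnNext(n -> seen.add("downstream:" + Thread.currentThread().getName()))
                .blockLast();
        return seen;
    }
}
```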
Parallel Execution
To process elements concurrently on multiple threads, convert a Flux to a ParallelFlux with parallel() and specify a scheduler with runOn. After parallel processing, sequential() merges the rails back into a single Flux; it does not restore the original element order.
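A self-contained sketch of this pattern, assuming reactor-core is on the classpath; ParallelDemo and the choice of four rails are illustrative.

```java
import java.util.List;
import reactor.core.publisher.Flux;
import reactor.core.scheduler.Schedulers;

final class ParallelDemo {
    static List<Integer> squares() {
        return Flux.range(1, 8)
                .parallel(4)                    // split the sequence into 4 rails
                .runOn(Schedulers.parallel())   // give each rail a worker thread
                .map(n -> n * n)
                .sequential()                   // merge rails in arrival order, not source order
                .collectList()
                .block();
    }
}
```

Because the rails finish independently, the list can come back in any order; sort it, or use flatMapSequential instead, when order matters.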
<code>void demo() {
final Flux<Long> flux = Flux.fromIterable(Lists.newArrayList(3L, 1L, 2L))
.parallel()
.runOn(Schedulers.boundedElastic())
.flatMap(logOfMono())
.sequential();
flux.subscribe();
}</code>
Custom Thread Pools
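<code>final Executor executor = new ThreadPoolExecutor(...);
Mono.create(sink -> executor.execute(() -> {
try {
sink.success(blockFunction()); // run the blocking call on the custom pool
} catch (Throwable t) {
sink.error(t); // propagate failures instead of losing them on the pool thread
}
}));</code>
Creating a Mono or Flux with a custom executor gives fine‑grained control over the execution context and spares callers from having to choose a scheduler themselves.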
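An alternative to Mono.create is to wrap the pool in a Reactor Scheduler with Schedulers.fromExecutorService and use it with subscribeOn. The sketch assumes reactor-core is on the classpath; the pool size, the thread name custom-pool, and blockingCall (standing in for blockFunction()) are illustrative choices.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Scheduler;
import reactor.core.scheduler.Schedulers;

final class CustomPoolDemo {
    // Hypothetical blocking call; returns the name of the thread it ran on.
    static String blockingCall() {
        return Thread.currentThread().getName();
    }

    static String run() {
        ExecutorService pool = Executors.newFixedThreadPool(2, r -> {
            Thread t = new Thread(r, "custom-pool");
            t.setDaemon(true);
            return t;
        });
        Scheduler scheduler = Schedulers.fromExecutorService(pool);
        try {
            return Mono.fromCallable(CustomPoolDemo::blockingCall)
                    .subscribeOn(scheduler)   // the callable runs on the custom pool
                    .block();
        } finally {
            scheduler.dispose();              // release the scheduler
            pool.shutdown();                  // and make sure the pool is stopped
        }
    }
}
```

In a real service the scheduler would typically be created once and shared rather than built per call.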
Scheduler Options in Reactor
Current thread – Schedulers.immediate()
Single reusable thread – Schedulers.single() (or Schedulers.newSingle() for a dedicated thread)
Unbounded elastic pool – Schedulers.elastic() (creates threads as needed with no upper limit, so it can exhaust resources; deprecated since Reactor 3.4 in favor of boundedElastic())
Bounded elastic pool – Schedulers.boundedElastic() (limits thread count, suitable for blocking I/O)
Parallel pool – Schedulers.parallel() (one worker thread per CPU core)
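To see which thread each scheduler hands work to, a small helper can run a callable on a given scheduler and return the thread name. This is a sketch assuming reactor-core is on the classpath and Reactor's default thread naming; SchedulerNames and threadOn are illustrative names.

```java
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Scheduler;

final class SchedulerNames {
    // Runs a trivial task on the given scheduler and reports its thread name.
    static String threadOn(Scheduler scheduler) {
        return Mono.fromCallable(() -> Thread.currentThread().getName())
                .subscribeOn(scheduler)
                .block();
    }
}
```

Calling threadOn(Schedulers.immediate()) returns the caller's own thread name, while the single, parallel, and bounded elastic schedulers return names that start with their respective pool prefix.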
Error Handling
In reactive streams, an error terminates the sequence and propagates downstream until a subscriber handles it. Reactor provides operators that correspond to traditional try/catch patterns:
onErrorReturn – return a static fallback value.
onErrorResume – switch to another Publisher based on the exception.
doOnError – execute side‑effects such as logging.
doFinally – run cleanup logic regardless of termination type.
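The four operators can be exercised together in a runnable sketch. It assumes reactor-core is on the classpath; failing is a hypothetical step that always errors, standing in for this::function, and the fallback strings are arbitrary.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

final class ErrorHandlingDemo {
    // Hypothetical failing step.
    static Mono<String> failing(int n) {
        return Mono.error(new IllegalStateException("boom " + n));
    }

    // Like catch + return a constant.
    static String withStaticFallback() {
        return Flux.just(10).flatMap(ErrorHandlingDemo::failing)
                .onErrorReturn("Error")
                .blockLast();
    }

    // Like catch + compute a fallback from the exception.
    static String withResume() {
        return Flux.just(10).flatMap(ErrorHandlingDemo::failing)
                .onErrorResume(IllegalStateException.class,
                        e -> Mono.just("recovered: " + e.getMessage()))
                .blockLast();
    }

    // doOnError observes the error; doFinally runs on any termination.
    static List<String> sideEffects() {
        List<String> events = new CopyOnWriteArrayList<>();
        Flux.just(10).flatMap(ErrorHandlingDemo::failing)
                .doOnError(e -> events.add("doOnError"))
                .doFinally(signal -> events.add("doFinally:" + signal.name()))
                .onErrorReturn("Error")
                .blockLast();
        return events;
    }
}
```

Note that doOnError and doFinally do not handle the error; without the trailing onErrorReturn, blockLast() would rethrow it.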
<code>Flux.just(10)
.flatMap(this::function)
.onErrorReturn("Error");
Flux.just(10)
.flatMap(this::function)
.onErrorResume(e -> handleErr(e));
Flux.just(10)
.flatMap(this::function)
.doOnError(e -> log.error(e.getMessage(), e));
</code>
Choosing the Right Operator
Depending on the use case, select operators that preserve order (flatMapSequential), provide parallelism (parallel() + runOn), or handle back‑pressure (onBackpressureDrop, onBackpressureBuffer, etc.).
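As an illustration of the back-pressure case, the sketch below pairs onBackpressureDrop with a subscriber that only ever requests 10 elements. It assumes reactor-core is on the classpath; BackpressureDemo and the numbers 100 and 10 are illustrative.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import org.reactivestreams.Subscription;
import reactor.core.publisher.BaseSubscriber;
import reactor.core.publisher.Flux;

final class BackpressureDemo {
    final List<Integer> received = new CopyOnWriteArrayList<>();
    final List<Integer> dropped = new CopyOnWriteArrayList<>();

    void run() {
        Flux.range(1, 100)
                // Requests everything from upstream and discards what the
                // downstream has not asked for, reporting each dropped element.
                .onBackpressureDrop(dropped::add)
                .subscribe(new BaseSubscriber<Integer>() {
                    @Override
                    protected void hookOnSubscribe(Subscription subscription) {
                        request(10); // demand 10 elements and never ask for more
                    }

                    @Override
                    protected void hookOnNext(Integer value) {
                        received.add(value);
                    }
                });
    }
}
```

After run(), received holds the first 10 elements and dropped holds the remaining 90; swapping in onBackpressureBuffer would queue the surplus instead of discarding it.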
GrowingIO Tech Team
The official technical account of GrowingIO, showcasing our tech innovations, experience summaries, and cutting‑edge black‑tech.