
Introduction to RxJava: Concepts, Operators, and Backpressure

This article introduces RxJava as a reactive functional programming framework based on the observer pattern, explains its core concepts, stream types, a wide range of operators, backpressure strategies, and demonstrates its practical use for data migration between MongoDB and MySQL.

New Oriental Technology

Preface

Improving development efficiency, simplifying logic, and standardizing development processes are top priorities for technical teams. RxJava helps address these issues by providing a reactive programming model.

Using RxJava enables:

Simplified logic and decoupled module dependencies

A rich set of operators covering most scenarios

Built‑in parallel asynchronous operations for higher performance

Elegant error handling

The goal of this article is to give a basic understanding of RxJava and show how it can boost development efficiency while illustrating the power of reactive functional programming.

What Is RxJava?

RxJava is commonly described as a reactive functional programming framework implemented via the Observer pattern.

Observer Pattern

The observer pattern is the core principle of RxJava: when an object (the subject) changes state, every object that depends on it (its observers) is notified.
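The notification mechanism described above can be sketched in plain Java, without RxJava. This is only an illustration: `TemperatureSensor`, `subscribe`, and `setTemperature` are hypothetical names that do not come from the article or from any library.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical subject: keeps a list of observers and notifies them on change.
final class TemperatureSensor {
    private final List<Consumer<Integer>> observers = new ArrayList<>();

    // Register an observer that is called on every change.
    void subscribe(Consumer<Integer> observer) {
        observers.add(observer);
    }

    // A state change pushes the new value to all registered observers.
    void setTemperature(int value) {
        for (Consumer<Integer> observer : observers) {
            observer.accept(value);
        }
    }
}

final class ObserverSketch {
    static List<String> run() {
        List<String> log = new ArrayList<>();
        TemperatureSensor sensor = new TemperatureSensor();
        sensor.subscribe(t -> log.add("display: " + t));
        sensor.subscribe(t -> log.add("alarm: " + (t > 30 ? "on" : "off")));
        sensor.setTemperature(25);
        sensor.setTemperature(35);
        return log;
    }

    public static void main(String[] args) {
        run().forEach(System.out::println);
    }
}
```

Both observers react to each change without the sensor knowing what they do with the value, which is the decoupling RxJava builds on.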

Reactive Functional Programming

Reactive functional programming consists of two concepts: reactive and functional. To illustrate, we compare them with traditional imperative programming.

Reactive vs. Imperative

Reactive: event‑driven processing based on a data stream

Imperative: a sequence of commands telling the computer what to do

Functional vs. Imperative

Functional focuses on data mapping relationships

Imperative focuses on step‑by‑step problem solving
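As a rough illustration of the functional versus imperative contrast, the same task (squaring the even numbers of a list) can be written both ways. The sketch uses `java.util.stream` from the JDK rather than RxJava, and the class and method names are made up for this example.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

final class StyleComparison {
    // Imperative: spell out each step (loop, test, mutate a result list).
    static List<Integer> squaresOfEvensImperative(List<Integer> input) {
        List<Integer> result = new ArrayList<>();
        for (Integer n : input) {
            if (n % 2 == 0) {
                result.add(n * n);
            }
        }
        return result;
    }

    // Functional: describe how input maps to output as a chain of transformations.
    static List<Integer> squaresOfEvensFunctional(List<Integer> input) {
        return input.stream()
                .filter(n -> n % 2 == 0)
                .map(n -> n * n)
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<Integer> input = Arrays.asList(1, 2, 3, 4, 5, 6);
        System.out.println(squaresOfEvensImperative(input)); // [4, 16, 36]
        System.out.println(squaresOfEvensFunctional(input)); // [4, 16, 36]
    }
}
```

RxJava operators such as filter and map follow the same declarative shape, applied to events that arrive over time instead of to a collection that already exists.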

RxJava Detailed Usage

RxJava provides operators to manipulate data streams and uses a backpressure strategy to prevent memory overflow when the consumer is slower than the producer.

The following sections detail streams, operators, and backpressure.

RxJava Streams

Stream Types

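Hot streams: emit events whether or not anyone is subscribed, so late subscribers miss earlier items (e.g., a ConnectableObservable after connect() is called).

Cold streams: start emitting only when a subscriber subscribes, and each subscriber receives the full sequence (e.g., streams created with create or from).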
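The difference between the two stream types can be sketched in plain Java. This is a simplified model, not RxJava code: `coldSubscribe` and `HotSource` are hypothetical names, and everything runs on a single thread.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

final class HotColdSketch {
    // Cold: every subscription triggers its own complete run of the producer.
    static void coldSubscribe(Consumer<Integer> subscriber) {
        for (int i = 1; i <= 3; i++) {
            subscriber.accept(i);
        }
    }

    // Hot: items are emitted independently of subscriptions and reach only
    // the subscribers registered at the moment of emission.
    static final class HotSource {
        private final List<Consumer<Integer>> subscribers = new ArrayList<>();

        void subscribe(Consumer<Integer> subscriber) {
            subscribers.add(subscriber);
        }

        void emit(int item) {
            subscribers.forEach(s -> s.accept(item));
        }
    }

    static List<String> run() {
        List<String> log = new ArrayList<>();
        coldSubscribe(i -> log.add("cold A: " + i));
        coldSubscribe(i -> log.add("cold B: " + i)); // B also receives 1, 2, 3

        HotSource hot = new HotSource();
        hot.emit(1);                                  // nobody is subscribed: lost
        hot.subscribe(i -> log.add("hot A: " + i));
        hot.emit(2);
        hot.subscribe(i -> log.add("hot B: " + i));   // late: misses 1 and 2
        hot.emit(3);
        return log;
    }

    public static void main(String[] args) {
        run().forEach(System.out::println);
    }
}
```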

Creating Streams

Streams can be created manually with Observable.create or conveniently with operators such as from, just, and range:

Observable<Integer> createTest = Observable.create(new Observable.OnSubscribe<Integer>() {
    @Override
    public void call(Subscriber<? super Integer> subscriber) {
        Utils.log("before");
        subscriber.onNext(1);
        subscriber.onNext(2);
        subscriber.onNext(3);
        subscriber.onCompleted(); // signal the end of the stream
        Utils.log("after");
    }
});
// from: emits each element of a collection
List<Integer> numbers = Arrays.asList(1, 2, 3);
Observable.from(numbers);
// just: emits the given values
Observable.just(1, 2, 3, 4, 5, 6);
// range(start, count): emits 5, 6, 7
Observable.range(5, 3);

Subscribing to Streams

Subscription is performed by calling subscribe on the observable, establishing a relationship with an observer.

// Option 1: pass a named Action1 as the onNext handler
Action1<Integer> observer = x -> System.out.println(x);
Observable<Integer> observable = Observable.range(5, 3);
observable.subscribe(observer);
// Option 2: pass the lambda inline
Observable.range(5, 3).subscribe(x -> System.out.println(x));

RxJava Operators

Operators are the strength of RxJava, covering most use‑cases. They are grouped into basic, advanced, and multi‑stream operators.

Basic Operators

map : transforms data (A → B)

Observable<Integer> testObs = Observable.just(1, 2, 3, 4, 5);
Observable<String> testObsMap = testObs.map(s -> s + "test");
testObs.subscribe(Utils::log);
testObsMap.subscribe(Utils::log);

filter : filters data

Observable<Integer> testObs = Observable.just(1, 2, 3, 4, 5);
Observable<Integer> testObsFilter = testObs.filter(s -> s.equals(1));
testObsFilter.subscribe(Utils::log);

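flatMap : maps each item to an Observable and merges the results into one stream, which flattens nested data; inner streams can run concurrently, so output order is not guaranteed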

List<Order> user1Orders = new ArrayList<Order>(){{
  add(Order.builder().id("user1-order1").build());
  add(Order.builder().id("user1-order2").build());
  add(Order.builder().id("user1-order3").build());
}};
List<Order> user2Orders = new ArrayList<Order>(){{
  add(Order.builder().id("user2-order1").build());
  add(Order.builder().id("user2-order2").build());
  add(Order.builder().id("user2-order3").build());
}};
User user1 = User.builder().name("user1").orders(user1Orders).build();
User user2 = User.builder().name("user2").orders(user2Orders).build();
Observable<User> testObs = Observable.just(user1, user2);
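// Variant 1: map to the list, then flatten it with Observable.from
Observable<Order> orderObs = testObs.map(User::getOrders).flatMap(Observable::from);
// Variant 2: getOrders returns a List, not an Observable, so use flatMapIterable
Observable<Order> orderObs2 = testObs.flatMapIterable(User::getOrders);
orderObs.subscribe(o -> Utils.log("order stream 1 subscriber: " + o.getId()));
orderObs2.subscribe(o -> Utils.log("order stream 2 subscriber: " + o.getId()));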

concatMap : like flatMap, but subscribes to one inner stream at a time, so source order is preserved and inner streams do not run concurrently

delay : delays emission of items

// shift every emission 5 seconds later
Observable<Integer> testObs = Observable.just(1, 2, 3).delay(5000, TimeUnit.MILLISECONDS);
// delay each item by a per-item duration: item i is delayed by i seconds
Observable<Integer> testObs2 = Observable.just(1, 2, 3).delay(i -> Observable.timer(i, TimeUnit.SECONDS));

doOnNext : side‑effect for each emitted item (e.g., logging) without altering data

Observable<Integer> testObs = Observable.just(1, 2, 3, 4, 5)
    .doOnNext(s -> Utils.log("first doOnNext: " + s))
    .filter(s -> s % 2 == 0) // keep even numbers
    .doOnNext(s -> Utils.log("second doOnNext: " + s))
    .map(s -> s + 10)
    .doOnNext(s -> Utils.log("third doOnNext: " + s));
testObs.subscribe(Utils::log);

doOnError : side‑effect for errors

Advanced Operators

scan : accumulates a value and emits each intermediate result

// range(2, 10) emits 2..11; scan emits the seed and then every running product
Observable<BigInteger> factorials = Observable.range(2, 10)
    .scan(BigInteger.ONE, (big, current) -> big.multiply(BigInteger.valueOf(current)));
factorials.subscribe(Utils::log);

reduce : aggregates all items into a single final result

// reduce emits only the final product (11!)
Observable<BigInteger> factorials = Observable.range(2, 10)
    .reduce(BigInteger.ONE, (big, current) -> big.multiply(BigInteger.valueOf(current)));
factorials.subscribe(Utils::log);
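To make the difference between scan and reduce concrete, here is a plain-Java sketch of what each would yield for the inputs 2, 3, 4 with a seed of 1 and multiplication as the accumulator. The helper methods are hypothetical stand-ins for the operators, not RxJava APIs.

```java
import java.math.BigInteger;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

final class ScanReduceSketch {
    // scan with a seed: yields the seed, then every intermediate product.
    static List<BigInteger> scan(List<Integer> items) {
        List<BigInteger> out = new ArrayList<>();
        BigInteger acc = BigInteger.ONE;
        out.add(acc);
        for (int item : items) {
            acc = acc.multiply(BigInteger.valueOf(item));
            out.add(acc);
        }
        return out;
    }

    // reduce with a seed: yields only the final product.
    static BigInteger reduce(List<Integer> items) {
        BigInteger acc = BigInteger.ONE;
        for (int item : items) {
            acc = acc.multiply(BigInteger.valueOf(item));
        }
        return acc;
    }

    public static void main(String[] args) {
        List<Integer> items = Arrays.asList(2, 3, 4);
        System.out.println(scan(items));   // [1, 2, 6, 24]
        System.out.println(reduce(items)); // 24
    }
}
```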

collect : collects items into a mutable container (e.g., a List)

// collect into a List
Observable<List<Integer>> all = Observable.range(2, 10)
    .collect(ArrayList::new, List::add);
all.subscribe(Utils::log);

distinct / distinctUntilChanged : removes duplicate events

Observable<Integer> obs = Observable.just(1, 2, 3, 1, 2);
Observable<Integer> obs2 = obs.distinct(x -> x);
obs2.subscribe(Utils::log);

groupBy : groups items by a key

Order order1 = Order.builder().id("1").build();
Order order2 = Order.builder().id("1").build();
Order order3 = Order.builder().id("2").build();
Observable<Order> obs = Observable.just(order1, order2, order3);
obs.groupBy(order -> {
    if (order.getId().equals("1")) return 1;
    else if (order.getId().equals("2")) return 2;
    else return 3;
}).subscribe(grouped -> grouped.subscribe(o ->
    Utils.log("key is " + grouped.getKey() + " value is " + o)));

publish, replay, refCount, share : convert cold streams to hot streams and manage subscription lifecycles

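// publish(): returns a ConnectableObservable that starts emitting only on connect()
ConnectableObservable<Integer> published = Observable.just(1,2,3,4,5).publish();
published.subscribe(Utils::log); // subscribe first, otherwise the items are missed
published.connect();
// refCount(): connects on the first subscriber and disconnects when the last one leaves
Observable<Integer> refCounted = Observable.just(1,2,3,4,5).publish().refCount();
// share() is shorthand for publish().refCount()
Observable<Integer> shared = Observable.just(1,2,3,4,5).share();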

Multi‑Stream Operators

merge : merges multiple observables, allowing interleaved emissions

Observable<Integer> testObs1 = Observable.just(1,3,5);
Observable<Integer> testObs2 = Observable.just(2,4,8,9,6);
Observable<Integer> mergeObs = Observable.merge(testObs1, testObs2);
mergeObs.subscribe(Utils::log);

concat : concatenates observables sequentially without interleaving

zip : combines items from multiple observables based on a combinator function

Observable<Integer> testObs1 = Observable.just(1,2,3);
Observable<Integer> testObs2 = Observable.just(4,5,6);
Observable<Integer> testObsZip1 = Observable.zip(testObs1, testObs2, Integer::sum);
testObsZip1.subscribe(Utils::log);

combineLatest : emits when any source emits, using the latest values from all sources

Observable.combineLatest(
    Observable.interval(17, TimeUnit.MILLISECONDS).map(x -> "S" + x),
    Observable.interval(10, TimeUnit.MILLISECONDS).map(x -> "F" + x),
    (x, y) -> x + ":" + y)
    .forEach(System.out::println);

withLatestFrom (named testLatestFrom in the article): combines a main stream with the latest value of a secondary stream

Observable<String> slow = Observable.interval(17, TimeUnit.MILLISECONDS).map(x -> "S" + x);
Observable<String> fast = Observable.interval(10, TimeUnit.MILLISECONDS).map(x -> "F" + x);
slow.withLatestFrom(fast, (s, f) -> s + ":" + f).forEach(System.out::println);

amb : subscribes to multiple sources and mirrors the first one that emits, cancelling the others

Observable<String> slow = Observable.interval(17, TimeUnit.MILLISECONDS).map(x -> "S" + x)
    .doOnSubscribe(() -> Utils.log("subscribe to S"));
Observable<String> fast = Observable.interval(10, TimeUnit.MILLISECONDS).map(x -> "F" + x)
    .delay(300, TimeUnit.MILLISECONDS)
    .doOnSubscribe(() -> Utils.log("subscribe to F"))
    .doOnUnsubscribe(() -> Utils.log("unsubscribe to F"));
slow.ambWith(fast).subscribe(Utils::log);

RxJava Backpressure

When the upstream emits faster than the downstream can consume, memory overflow may occur. RxJava offers backpressure mechanisms to mitigate this.

Backpressure Concepts

Upstream speed > downstream speed leads to accumulation and possible OOM.

In RxJava 1.x, backpressure is handled via buffering, windowing, and a "pull‑push" model.

Buffering : buffer collects items into Lists that are held in memory until each list is complete; window emits each batch as a nested Observable, so items are passed on as they arrive.

// buffer into lists of size 3
Observable.range(1, 7).buffer(3).subscribe(Utils::log);
// window into observables of size 3
Observable<Observable<Integer>> testObj = Observable.range(1, 7).window(3);
testObj.subscribe(x -> x.subscribe(y -> {
    System.out.println("------------------");
    System.out.println(y);
}));

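Backpressure Strategies : onBackpressureDrop discards items that arrive while the downstream has no outstanding demand; onBackpressureBuffer queues them, optionally with a capacity limit and an overflow callback, and fails the stream once that capacity is exceeded.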

private Observable<Integer> myRange(int from, int count) {
    return Observable.unsafeCreate(subscriber -> {
        int i = from;
        while (i < from + count) {
            if (!subscriber.isUnsubscribed()) {
                subscriber.onNext(i++);
            } else {
                return;
            }
        }
        subscriber.onCompleted();
    });
}
myRange(1, 100000000)
    .map(Dish::new)
    .onBackpressureBuffer(100, () -> log.warn("buffer full"))
    .observeOn(Schedulers.io())
    .subscribe(x -> {
        log.info("Washing: {}", x);
        try { Thread.sleep(50); } catch (InterruptedException e) { e.printStackTrace(); }
    });
myRange(1, 100000000)
    .map(Dish::new)
    .onBackpressureDrop(dish -> log.info("Throw away {}", dish))
    .observeOn(Schedulers.io())
    .subscribe(x -> {
        log.info("Washing: {}", x);
        try { Thread.sleep(50); } catch (InterruptedException e) { e.printStackTrace(); }
    });
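The drop strategy can be modelled in a few lines of plain Java. The sketch below is a simplified, single-threaded approximation of the idea (forward an item only while the consumer has outstanding demand, otherwise hand it to a drop callback). `DropGate` is a hypothetical class, not part of RxJava.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical gate between a fast producer and a slow consumer.
final class DropGate<T> {
    private final Consumer<T> downstream;
    private final Consumer<T> onDrop;
    private long requested; // how many more items the consumer can take

    DropGate(Consumer<T> downstream, Consumer<T> onDrop) {
        this.downstream = downstream;
        this.onDrop = onDrop;
    }

    // The consumer signals that it can handle n more items.
    void request(long n) {
        requested += n;
    }

    // The producer pushes an item; it is forwarded only if demand is outstanding.
    void onNext(T item) {
        if (requested > 0) {
            requested--;
            downstream.accept(item);
        } else {
            onDrop.accept(item);
        }
    }
}

final class DropSketch {
    static String run() {
        List<Integer> delivered = new ArrayList<>();
        List<Integer> dropped = new ArrayList<>();
        DropGate<Integer> gate = new DropGate<>(delivered::add, dropped::add);
        gate.request(2);                 // consumer can take two items
        for (int i = 1; i <= 5; i++) {
            gate.onNext(i);              // 1 and 2 pass, 3 to 5 are dropped
        }
        gate.request(1);                 // consumer is ready again
        gate.onNext(6);
        return "delivered=" + delivered + " dropped=" + dropped;
    }

    public static void main(String[] args) {
        System.out.println(run()); // delivered=[1, 2, 6] dropped=[3, 4, 5]
    }
}
```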

Pull‑Push Model : using SyncOnSubscribe.createStateful to request the next batch only after the previous one is processed.

// create a stateful pull‑push observable
Observable.OnSubscribe<Long> onSubscribe = SyncOnSubscribe.createStateful(
    () -> 0L,
    (cur, observer) -> {
        observer.onNext(cur);
        return cur + 1;
    }
);
Observable<Long> naturals = Observable.unsafeCreate(onSubscribe);
naturals.observeOn(Schedulers.io()).subscribe(x -> {
    log.info("Thread-{}-Washing: {}", Thread.currentThread().getName(), x);
    try { Thread.sleep(50); } catch (InterruptedException e) { e.printStackTrace(); }
});
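The same request-driven idea can be sketched with the JDK's `java.util.concurrent.Flow` interfaces (Java 9 and later), which are separate from RxJava 1.x. The publisher below emits only as many items as the subscriber has requested. `RangePublisher` is a hypothetical, single-threaded illustration and leaves out much of what a production publisher needs.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Flow;

final class PullPushSketch {
    // Emits from, from+1, ... (count items), never more than requested.
    static final class RangePublisher implements Flow.Publisher<Integer> {
        private final int from;
        private final int count;

        RangePublisher(int from, int count) {
            this.from = from;
            this.count = count;
        }

        @Override
        public void subscribe(Flow.Subscriber<? super Integer> subscriber) {
            subscriber.onSubscribe(new Flow.Subscription() {
                private int next = from;
                private long demand;
                private boolean emitting;
                private boolean done;

                @Override
                public void request(long n) {
                    if (done || n <= 0) return;
                    demand += n;
                    if (emitting) return; // called from inside onNext: the outer loop continues
                    emitting = true;
                    while (demand > 0 && !done && next < from + count) {
                        demand--;
                        subscriber.onNext(next++);
                    }
                    emitting = false;
                    if (!done && next == from + count) {
                        done = true;
                        subscriber.onComplete();
                    }
                }

                @Override
                public void cancel() {
                    done = true;
                }
            });
        }
    }

    static List<String> run() {
        List<String> log = new ArrayList<>();
        new RangePublisher(1, 3).subscribe(new Flow.Subscriber<Integer>() {
            private Flow.Subscription subscription;

            @Override
            public void onSubscribe(Flow.Subscription s) {
                subscription = s;
                s.request(1); // pull the first item
            }

            @Override
            public void onNext(Integer item) {
                log.add("washing " + item);
                subscription.request(1); // pull the next one only after finishing
            }

            @Override
            public void onError(Throwable t) {
                log.add("error: " + t);
            }

            @Override
            public void onComplete() {
                log.add("done");
            }
        });
        return log;
    }

    public static void main(String[] args) {
        run().forEach(System.out::println);
    }
}
```

Because the subscriber asks for one item at a time, the producer can never run ahead of it, and no buffer is needed.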

RxJava Application Example

The article demonstrates a simple scenario: migrating data from MongoDB to MySQL. The data in MongoDB is nested and needs to be flattened, with some fields requiring mapping.

Analysis:

Three layers: extraction, transformation, and loading.

Use a pull‑push model to respect consumer speed.

Apply flatMap to flatten nested structures.

Apply map for field mapping.

Use doOnNext and doOnError for logging.
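The three layers listed above can be outlined in plain Java. This is a sketch under stated assumptions, not the article's implementation: it uses `java.util.stream` instead of RxJava, the `UserDoc` and `OrderRow` types and the level mapping are hypothetical, and the extraction and loading steps are placeholders that involve no real MongoDB or MySQL access.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

final class MigrationSketch {
    // Hypothetical nested source document, standing in for a MongoDB record.
    static final class UserDoc {
        final String name;
        final int level;             // numeric code that needs mapping
        final List<String> orderIds; // nested list that needs flattening

        UserDoc(String name, int level, List<String> orderIds) {
            this.name = name;
            this.level = level;
            this.orderIds = orderIds;
        }
    }

    // Hypothetical flat target row, standing in for a MySQL record.
    static final class OrderRow {
        final String userName;
        final String userLevel;
        final String orderId;

        OrderRow(String userName, String userLevel, String orderId) {
            this.userName = userName;
            this.userLevel = userLevel;
            this.orderId = orderId;
        }

        @Override
        public String toString() {
            return userName + "," + userLevel + "," + orderId;
        }
    }

    // Extraction layer: placeholder for a paged read from the source store.
    static List<UserDoc> extract() {
        return Arrays.asList(
                new UserDoc("user1", 1, Arrays.asList("o1", "o2")),
                new UserDoc("user2", 2, Arrays.asList("o3")));
    }

    // Field mapping: numeric level code to a readable label (made-up rule).
    static String mapLevel(int level) {
        return level == 2 ? "vip" : "normal";
    }

    // Transformation layer: flatten each document into one row per order.
    static List<OrderRow> transform(List<UserDoc> docs) {
        return docs.stream()
                .flatMap(doc -> doc.orderIds.stream()
                        .map(id -> new OrderRow(doc.name, mapLevel(doc.level), id)))
                .collect(Collectors.toList());
    }

    // Loading layer: placeholder for a batched insert; returns what it "wrote".
    static List<String> load(List<OrderRow> rows) {
        List<String> written = new ArrayList<>();
        for (OrderRow row : rows) {
            written.add(row.toString());
        }
        return written;
    }

    public static void main(String[] args) {
        load(transform(extract())).forEach(System.out::println);
    }
}
```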

By leveraging RxJava, the workflow becomes clear, modular, and loosely coupled compared to traditional loop‑based pagination approaches.

References

Book: "RxJava Reactive Programming"

Official site: https://reactivex.io/

Other Notes

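The concepts and most operators described here are based on RxJava 1.x. The concepts carry over to RxJava 2.x and later, although some APIs differ; for example, RxJava 2 moves backpressure support into a separate Flowable type.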

The next article, "RxJava 2.x – Sharing", will detail operators and backpressure in RxJava 2.
