RxJava Observable Defer Example

In this article we look at Observable.defer, which postpones the creation of the actual Observable until an Observer subscribes to it.
Imagine the items for the observable come from a blocking queue. Whenever an observer subscribes, the observable should emit the items available in the queue at that moment. With defer, a new Observable is created for each subscription from the items currently in the queue. Below is the marble diagram.

A rough implementation of ObservableDefer:

class ObservableDefer {
    ...
    void subscribe(Observer observer) {
        // Build a fresh Observable for this subscriber, then pass the observer on to it
        Observable newObservable = callable.call();
        newObservable.subscribe(observer);
    }
    ...
}
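
The rough implementation above can be tried out without RxJava. What follows is only a minimal sketch using the JDK alone: SimpleObservable, fromList, defer and collect are hypothetical names invented for this illustration, not RxJava classes or methods, and the real library does considerably more (disposal, error and completion signals). It assumes an observer is just a Consumer that receives each item.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Callable;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.function.Consumer;

// Plain-JDK illustration of the defer idea. All types here are hypothetical
// stand-ins for this sketch, not part of RxJava.
class DeferSketch {

    // Hypothetical minimal "observable": pushes its items to the observer on subscribe.
    interface SimpleObservable<T> {
        void subscribe(Consumer<T> observer);
    }

    // Emits the elements of a fixed list to every subscriber.
    static <T> SimpleObservable<T> fromList(List<T> items) {
        return observer -> items.forEach(observer);
    }

    // Defers construction: the factory is called once per subscribe call,
    // and the observer is handed to the freshly built observable.
    static <T> SimpleObservable<T> defer(Callable<SimpleObservable<T>> factory) {
        return observer -> {
            SimpleObservable<T> fresh;
            try {
                fresh = factory.call();
            } catch (Exception e) {
                throw new IllegalStateException("factory failed", e);
            }
            fresh.subscribe(observer);
        };
    }

    // Subscribes to the source and collects everything it emits.
    static <T> List<T> collect(SimpleObservable<T> source) {
        List<T> received = new ArrayList<>();
        source.subscribe(received::add);
        return received;
    }

    public static void main(String[] args) {
        BlockingQueue<String> queue = new LinkedBlockingQueue<>();
        SimpleObservable<String> deferred = defer(() -> {
            List<String> drained = new ArrayList<>();
            queue.drainTo(drained);
            return fromList(drained);
        });

        queue.offer("One");
        queue.offer("Two");
        System.out.println(collect(deferred)); // prints [One, Two]

        queue.offer("Three");
        System.out.println(collect(deferred)); // prints [Three]
    }
}
```

Nothing is read from the queue when defer is called; the factory only runs inside subscribe, which is why each subscriber sees the queue contents as of its own subscription.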


In the example below, the source of the observable is a blocking queue to which items are added over time. Whenever an observer subscribes, we want a new Observable to be built from the items available in the queue at that moment.

deferObservable = Observable.defer(new Callable<Observable<String>>() {

            @Override
            public Observable<String> call() throws Exception {
                // Runs on every subscription: move whatever is queued right now into a list
                List<String> list = new ArrayList<>();
                queue.drainTo(list);
                return Observable.fromIterable(list);
            }
        });

Here is the complete example. We first add three items (“One”, “Two” and “Three”) to the queue and then get the count of items emitted by the observable.
Each call to getCount() subscribes through blockingGet(), which invokes the Callable again. Building the new Observable drains the queue, so the queue is empty afterwards. The next time we ask for the count, it is based on a fresh stream of the items added since, here “Four” and “Five”.

ObservableDeferExample:

package com.javarticles.rx;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Callable;
import java.util.concurrent.LinkedBlockingQueue;

import io.reactivex.Observable;
import io.reactivex.Single;

public class ObservableDeferExample {
    private Observable<String> deferObservable;
    private Single<Long> observableCount;
    private BlockingQueue<String> queue = new LinkedBlockingQueue<>();

    public ObservableDeferExample() {
        createDeferObservable();
    }

    public static void main(String[] args) {
        ObservableDeferExample ode = new ObservableDeferExample();
        ode.offer("One", "Two", "Three");
        System.out.println(ode.getCount());
        ode.offer("Four", "Five");
        System.out.println(ode.getCount());
    }

    private long getCount() {
        return observableCount.blockingGet();
    }

    private void offer(String...data) {
        for (String e : data) {
            queue.offer(e);
        }
    }

    private void createDeferObservable() {
        System.out.println("Create defer observable");
        deferObservable = Observable.defer(new Callable<Observable<String>>() {

            @Override
            public Observable<String> call() throws Exception {
                List<String> list = new ArrayList<>();
                queue.drainTo(list);
                return Observable.fromIterable(list);
            }});
        observableCount = deferObservable.count();
    }
}

As you can see, the first call to getCount() returns three and the second call returns two, because each call builds a new Observable from the items in the queue at that time.

Output:

Create defer observable
3
2
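
For contrast, it may help to see what happens when the queue is drained eagerly, that is, when the source is built, rather than on each request. The sketch below uses only the JDK and a Supplier as a stand-in for the deferred Callable; the class name EagerVersusDeferred and the drain helper are hypothetical and exist only for this illustration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.function.Supplier;

// Hypothetical illustration: eager snapshot versus deferred drain of a queue.
class EagerVersusDeferred {

    // Moves everything currently in the queue into a new list.
    static List<String> drain(BlockingQueue<String> queue) {
        List<String> items = new ArrayList<>();
        queue.drainTo(items);
        return items;
    }

    public static void main(String[] args) {
        // Eager: the queue is drained once, before any item has been offered.
        BlockingQueue<String> eagerQueue = new LinkedBlockingQueue<>();
        List<String> eagerSnapshot = drain(eagerQueue);
        eagerQueue.offer("One");
        System.out.println(eagerSnapshot.size()); // prints 0, the later item is missed

        // Deferred: the drain runs only when get() is called, like a subscription.
        BlockingQueue<String> deferredQueue = new LinkedBlockingQueue<>();
        Supplier<List<String>> deferred = () -> drain(deferredQueue);
        deferredQueue.offer("One");
        deferredQueue.offer("Two");
        deferredQueue.offer("Three");
        System.out.println(deferred.get().size()); // prints 3
        deferredQueue.offer("Four");
        deferredQueue.offer("Five");
        System.out.println(deferred.get().size()); // prints 2
    }
}
```

The deferred half reproduces the 3 and 2 seen in the output above, while the eager half shows why building the source up front would miss items added later.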

Download the source code

This was an example of RxJava's Observable.defer, where the creation of the actual Observable is deferred until an observer subscribes.

You can download the source code here: RxJavaObservableDefer.zip