RxJava Observable from Iterable Example

0

Observable is the main class that the reactive pattern is built on. In this article we will build an Observable object from a list of objects and then a subscriber that subscribes to it.

As soon as the subscriber subscribes to it, the Observable starts emitting the items in the sequence. The source of items is the list of objects that we have passed on to create the Observable.

Below code converts an Iterable sequence into Observable.

 Observable.from(list);

from() operator is used to convert an Iterable, a Future, or an Array into an Observable.

Simple Observable Example

Let’s first create a simple Observable from a list of values and then subscribe to it. The subscriber simply prints the values emitted by the Observable.

ObservableExample:

package com.javarticles.rx;

import java.util.Arrays;
import java.util.List;

import rx.Observable;
import rx.Subscriber;


public class ObservableExample {
    public static void main(String[] args) {
        List<String> list = Arrays
                .asList("One", "Two", "Three", "Four", "Five");

        Observable<String> observable = Observable.from(list);

        observable.subscribe(new Subscriber<String>() {
            public void onStart() {
                System.out.println("onStart");
            }

            public void onCompleted() {
                System.out.println("Completed!");
            }

            public void onError(Throwable e) {
                System.out.println("Exception thrown: " + e);
            }

            public void onNext(String s) {
                System.out.println("Next element: " + s);
            }
        });
    }
}

Output:

onStart
Next element: One
Next element: Two
Next element: Three
Next element: Four
Next element: Five
Completed!

Observable with an RxJava Hook

In this example, we will plug in an execution hook just to get a feel of the different lifecycle points of Observable execution. The default implementation will be a pass through one which will not do anything. One can use execution hook for metrics or extra logging.

CustomRxObservableExecutionHook:

package com.javarticles.rx;

import rx.Observable;
import rx.Subscriber;
import rx.plugins.RxJavaObservableExecutionHook;

public class CustomRxObservableExecutionHook extends
        RxJavaObservableExecutionHook {
    public <T> Observable.OnSubscribe<T> onCreate(Observable.OnSubscribe<T> f) {
        System.out.println("Observable is about to be created for " + f);
        return new MyOnSubscribe<T>(f);
    }

    class MyOnSubscribe<T> implements Observable.OnSubscribe<T> {
        private Observable.OnSubscribe<T> actualOnSubscribe;

        MyOnSubscribe(Observable.OnSubscribe<T> actualOnSubscribe) {
            this.actualOnSubscribe = actualOnSubscribe;
        }

        public void call(Subscriber<? super T> subscriber) {
            System.out.println("OnSubscribe.call called for subscriber "
                    + subscriber);
            actualOnSubscribe.call(subscriber);
        }
    }

}

ObservableWithRxHookExample:

package com.javarticles.rx;

import java.util.Arrays;
import java.util.List;

import rx.Observable;
import rx.Subscriber;

public class ObservableWithRxHookExample {
    public static void main(String[] args) {
        System.setProperty(
                "rxjava.plugin.RxJavaObservableExecutionHook.implementation",
                "com.javarticles.rx.CustomRxObservableExecutionHook");
        List<String> list = Arrays
                .asList("One", "Two", "Three", "Four", "Five");

        Observable<String> observable = Observable.from(list);

        observable.subscribe(new Subscriber<String>() {
            public void onStart() {
                System.out.println("onStart");
            }

            public void onCompleted() {
                System.out.println("Completed!");
            }

            public void onError(Throwable e) {
                System.out.println("Exception thrown: " + e);
            }

            public void onNext(String s) {
                System.out.println("Next element: " + s);
            }
        });
    }
}

Output:

Observable is about to be created for rx.internal.operators.OnSubscribeFromIterable@85ede7b
onStart
OnSubscribe.call called for subscriber rx.observers.SafeSubscriber@681a9515
Next element: One
Next element: Two
Next element: Three
Next element: Four
Next element: Five
Completed!

Backpressure vis request

In our next example, we will see how to request a certain maximum number of emitted items from the Observable. This way we can control the number of items an Observable is emitting.

BackpressureViaRequestExample:

package com.javarticles.rx;

import java.util.Arrays;
import java.util.List;

import rx.Observable;
import rx.Subscriber;


public class BackpressureViaRequestExample {
    public static void main(String[] args) {
        List list = Arrays
                .asList("One", "Two", "Three", "Four", "Five");

        Observable observable = Observable.from(list);

        observable.subscribe(new Subscriber() {
            public void onStart() {
                System.out.println("onStart");
                request(2);
            }

            public void onCompleted() {
                System.out.println("Completed!");
            }

            public void onError(Throwable e) {
                System.out.println("Exception thrown: " + e);
            }

            public void onNext(String s) {
                System.out.println("Next element: " + s);
            }
        });
        
    }
}

Output:

onStart
Next element: One
Next element: Two

Collaboration between components

In the below diagram we show the different RxJava components involved and how they interact with each other as soon as an Observer subscribes to an Observable.

Collaboration

Collaboration

Sequence of flow

The above interaction between components can also be viewed in form of a sequence diagram.

Sequence of flow

Sequence of flow

As you can see from the above sequence diagram, there can be zero or more onNext calls to the subscribed observer instance, depending on the number of items emitted, optionally followed by a single success or failure resulting in either onCompleted or onError. For simplicity sake, we have only shown the success scenario in the above flow.

Download the source code

This was an example of RxJava creating Observable from Iterable.

Download the source code here: RxJavaObservableFromIterable.zip

About Author

Ram's expertise lies in test driven development and re-factoring. He is passionate about open source technologies and loves blogging on various java and open-source technologies like spring. You can reach him at rsatish.m@gmail.com

Comments are closed.