Observable is the main class that the reactive pattern is built on. In this article we will build an Observable object from a list of objects and then a subscriber that subscribes to it.
As soon as the subscriber subscribes to it, the Observable starts emitting the items in the sequence. The source of items is the list of objects that we have passed on to create the Observable.
Below code converts an Iterable sequence into Observable.
Observable.from(list);
from()
operator is used to convert an Iterable, a Future, or an Array into an Observable.
Simple Observable Example
Let’s first create a simple Observable from a list of values and then subscribe to it. The subscriber simply prints the values emitted by the Observable.
ObservableExample:
package com.javarticles.rx; import java.util.Arrays; import java.util.List; import rx.Observable; import rx.Subscriber; public class ObservableExample { public static void main(String[] args) { List<String> list = Arrays .asList("One", "Two", "Three", "Four", "Five"); Observable<String> observable = Observable.from(list); observable.subscribe(new Subscriber<String>() { public void onStart() { System.out.println("onStart"); } public void onCompleted() { System.out.println("Completed!"); } public void onError(Throwable e) { System.out.println("Exception thrown: " + e); } public void onNext(String s) { System.out.println("Next element: " + s); } }); } }
Output:
onStart Next element: One Next element: Two Next element: Three Next element: Four Next element: Five Completed!
Observable with an RxJava Hook
In this example, we will plug in an execution hook just to get a feel of the different lifecycle points of Observable
execution. The default implementation will be a pass through one which will not do anything. One can use execution hook for metrics or extra logging.
CustomRxObservableExecutionHook:
package com.javarticles.rx; import rx.Observable; import rx.Subscriber; import rx.plugins.RxJavaObservableExecutionHook; public class CustomRxObservableExecutionHook extends RxJavaObservableExecutionHook { public <T> Observable.OnSubscribe<T> onCreate(Observable.OnSubscribe<T> f) { System.out.println("Observable is about to be created for " + f); return new MyOnSubscribe<T>(f); } class MyOnSubscribe<T> implements Observable.OnSubscribe<T> { private Observable.OnSubscribe<T> actualOnSubscribe; MyOnSubscribe(Observable.OnSubscribe<T> actualOnSubscribe) { this.actualOnSubscribe = actualOnSubscribe; } public void call(Subscriber<? super T> subscriber) { System.out.println("OnSubscribe.call called for subscriber " + subscriber); actualOnSubscribe.call(subscriber); } } }
ObservableWithRxHookExample:
package com.javarticles.rx; import java.util.Arrays; import java.util.List; import rx.Observable; import rx.Subscriber; public class ObservableWithRxHookExample { public static void main(String[] args) { System.setProperty( "rxjava.plugin.RxJavaObservableExecutionHook.implementation", "com.javarticles.rx.CustomRxObservableExecutionHook"); List<String> list = Arrays .asList("One", "Two", "Three", "Four", "Five"); Observable<String> observable = Observable.from(list); observable.subscribe(new Subscriber<String>() { public void onStart() { System.out.println("onStart"); } public void onCompleted() { System.out.println("Completed!"); } public void onError(Throwable e) { System.out.println("Exception thrown: " + e); } public void onNext(String s) { System.out.println("Next element: " + s); } }); } }
Output:
Observable is about to be created for [email protected] onStart OnSubscribe.call called for subscriber [email protected] Next element: One Next element: Two Next element: Three Next element: Four Next element: Five Completed!
Backpressure vis request
In our next example, we will see how to request a certain maximum number of emitted items from the Observable. This way we can control the number of items an Observable is emitting.
BackpressureViaRequestExample:
package com.javarticles.rx; import java.util.Arrays; import java.util.List; import rx.Observable; import rx.Subscriber; public class BackpressureViaRequestExample { public static void main(String[] args) { List list = Arrays .asList("One", "Two", "Three", "Four", "Five"); Observable observable = Observable.from(list); observable.subscribe(new Subscriber() { public void onStart() { System.out.println("onStart"); request(2); } public void onCompleted() { System.out.println("Completed!"); } public void onError(Throwable e) { System.out.println("Exception thrown: " + e); } public void onNext(String s) { System.out.println("Next element: " + s); } }); } }
Output:
onStart Next element: One Next element: Two
Collaboration between components
In the below diagram we show the different RxJava components involved and how they interact with each other as soon as an Observer subscribes to an Observable.
Sequence of flow
The above interaction between components can also be viewed in form of a sequence diagram.
As you can see from the above sequence diagram, there can be zero or more onNext calls to the subscribed observer instance, depending on the number of items emitted, optionally followed by a single success or failure resulting in either onCompleted or onError. For simplicity sake, we have only shown the success scenario in the above flow.
Download the source code
This was an example of RxJava creating Observable from Iterable.