Observable Interval Example

Observable.interval() returns an observable that emits sequential numbers on a specified scheduler. In this article, we will look into the default version of interval(), which runs on the computation scheduler.

We will get a glimpse of the internals of both the interval observable and the computation scheduler.

Let’s first look into the computation scheduler.

The computation scheduler creates a pool of workers, where each worker is backed by a single-threaded ScheduledExecutorService instance. You can control the number of computation scheduler threads with the rx2.computation-threads setting, but the number is capped at the number of available processors.
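To make the worker-pool idea concrete, here is a minimal JDK-only sketch. It is not RxJava's source: the class name WorkerPoolSketch, its methods, and the thread-name prefix are hypothetical, and the round-robin hand-out of workers is an assumption made for illustration.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical illustration only; this is not RxJava's computation scheduler.
final class WorkerPoolSketch {
    private final ScheduledExecutorService[] workers;
    private final AtomicLong next = new AtomicLong();

    WorkerPoolSketch(int requestedThreads) {
        int size = cap(Runtime.getRuntime().availableProcessors(), requestedThreads);
        workers = new ScheduledExecutorService[size];
        for (int i = 0; i < size; i++) {
            final String name = "SketchComputationThreadPool-" + (i + 1);
            // Each worker is one single-threaded scheduled executor.
            workers[i] = Executors.newSingleThreadScheduledExecutor(r -> {
                Thread t = new Thread(r, name);
                t.setDaemon(true);
                return t;
            });
        }
    }

    // A non-positive or too-large request falls back to the processor count.
    static int cap(int cpuCount, int requested) {
        return (requested <= 0 || requested > cpuCount) ? cpuCount : requested;
    }

    int size() {
        return workers.length;
    }

    // Assumption: workers are handed out in round-robin order.
    ScheduledExecutorService nextWorker() {
        return workers[(int) (next.getAndIncrement() % workers.length)];
    }

    void shutdown() {
        for (ScheduledExecutorService worker : workers) {
            worker.shutdownNow();
        }
    }
}
```

With a pool like this, two consecutive subscriptions would typically land on two different worker threads, which is consistent with the -1 and -2 thread names in the output later in this article.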

This kind of scheduler is best suited for event loops, processing callbacks, and other computational work; it is not recommended for blocking or IO-bound work.

Now let’s get to the interval observable. The code below creates an interval observable that starts emitting after an initial delay of five seconds and then emits the next number in the sequence every two seconds.

Observable.interval(5, 2, TimeUnit.SECONDS);
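The timing can be worked out with simple arithmetic: value n is due at roughly initialDelay + n * period. The sketch below is only an illustration of that arithmetic; the class EmissionTiming and its methods are hypothetical and not part of RxJava, and real emissions are subject to scheduling jitter.

```java
// Hypothetical helper that models the nominal emission schedule of interval().
final class EmissionTiming {
    // Nominal time (in the same unit as the arguments) at which value n is emitted.
    static long emissionTime(long initialDelay, long period, long n) {
        return initialDelay + n * period;
    }

    // Number of values nominally emitted within the first `window` time units
    // (an emission exactly at the end of the window is counted).
    static long emissionsWithin(long initialDelay, long period, long window) {
        if (window < initialDelay) {
            return 0;
        }
        return (window - initialDelay) / period + 1;
    }
}
```

For interval(5, 2, TimeUnit.SECONDS), value 0 is due at 5 seconds and value 7 at 19 seconds, so a 20-second window sees eight values, 0 through 7.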

On subscription, the interval observable schedules a periodic task on a scheduler worker and hands the observer a disposable object. This disposable is nothing but a wrapper around the actual observer that also holds the currently scheduled task, which can be canceled at any time. The onSubscribe(disposable) callback receives this object, and through it one can cancel the scheduled task.

In the example below, we create a couple of interval observables whose subscribers print the thread name and the sequence number.
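The idea of a disposable that wraps the observer and the scheduled task can be sketched with plain JDK classes. This is an assumption-laden illustration, not RxJava's implementation: IntervalSketch, start(), dispose(), and isDisposed() are hypothetical names, and a LongConsumer stands in for the observer.

```java
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.function.LongConsumer;

// Hypothetical sketch: wraps the downstream consumer and the scheduled task.
final class IntervalSketch implements Runnable {
    private final LongConsumer downstream;
    private volatile ScheduledFuture<?> task;
    private volatile boolean disposed;
    private long count; // only touched by the single worker thread

    private IntervalSketch(LongConsumer downstream) {
        this.downstream = downstream;
    }

    // Schedules the periodic emission on the given worker and returns the handle.
    static IntervalSketch start(ScheduledExecutorService worker, long initialDelay,
                                long period, TimeUnit unit, LongConsumer downstream) {
        IntervalSketch sketch = new IntervalSketch(downstream);
        sketch.task = worker.scheduleAtFixedRate(sketch, initialDelay, period, unit);
        return sketch;
    }

    @Override
    public void run() {
        // Emit the next number in the sequence unless already disposed.
        if (!disposed) {
            downstream.accept(count++);
        }
    }

    // Cancels the scheduled task so that no further values are emitted.
    void dispose() {
        disposed = true;
        ScheduledFuture<?> current = task;
        if (current != null) {
            current.cancel(false);
        }
    }

    boolean isDisposed() {
        return disposed;
    }
}
```

A caller would keep the returned handle and call dispose() once it no longer wants values, much as an observer would use the disposable it receives in onSubscribe().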

ObservableIntervalExample:

package com.javarticles.rx;

import java.util.concurrent.TimeUnit;

import io.reactivex.Observable;

public class ObservableIntervalExample {
    public static void main(String[] args) {
        Observable<Long> observable1 = Observable.interval(5, 2, TimeUnit.SECONDS);
        observable1.subscribe((l)->{
            System.out.println(Thread.currentThread().getName() + ":" + l);
        });
        
        Observable<Long> observable2 = Observable.interval(5, 2, TimeUnit.SECONDS);
        observable2.subscribe((l)->{
            System.out.println(Thread.currentThread().getName() + ":" + l);
        });
        try {
            Thread.sleep(20000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}

Output:

RxComputationThreadPool-1:0
RxComputationThreadPool-2:0
RxComputationThreadPool-1:1
RxComputationThreadPool-2:1
RxComputationThreadPool-1:2
RxComputationThreadPool-2:2
RxComputationThreadPool-1:3
RxComputationThreadPool-2:3
RxComputationThreadPool-1:4
RxComputationThreadPool-2:4
RxComputationThreadPool-1:5
RxComputationThreadPool-2:5
RxComputationThreadPool-2:6
RxComputationThreadPool-1:6
RxComputationThreadPool-1:7
RxComputationThreadPool-2:7

Download the source code

This was an example of the interval observable.

You can download the source code here: RxJavaObservableInterval.zip
