RxJava Observable Count Operator


In this article we look at the ReactiveX count() operator and its internals. We will see how Observables and Observers work together to transform an Observable that emits a sequence of items into one that emits a single value: the number of items emitted by the source Observable.

Count

ObservableCountSingle is the class that represents the count of an Observable. Since the count cannot be known until the source sequence completes, ObservableCountSingle notifies its subscriber only when the source completes successfully or terminates with an error.
If the source Observable terminates with an error, count passes the error notification along without emitting an item first. If the source Observable completes successfully, it provides the subscriber with the number of items emitted.

Single

We first create an Observable source.

String[] persons = new String[] {"Joe", "Jane", "John", "Phil"};
Observable.fromArray(persons);

Next, we apply the count() operator to the source Observable. The result is not exactly an Observable but a variant of it called “Single”, because it delivers either a single value or an error notification.

Observable.fromArray(persons).count();

Sequence flow of ReactiveX Count

To get the count of items, we call blockingGet() on the Single.

String[] persons = new String[] {"Joe", "Jane", "John", "Phil"};
long count = Observable.fromArray(persons).count().blockingGet();

Internally, two observers are involved. The first is CountObserver, which subscribes to the source and increments a counter each time onNext() is called. The second is BlockingMultiObserver, which is an extension of CountDownLatch and waits until either all elements have been emitted and the source completes, or an error is signalled.
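
The interplay of these two observers can be modelled without the library. The following is a minimal sketch in plain Java, not RxJava's actual source: the names CountFlowSketch, SingleSink, CountingObserver and BlockingSink are hypothetical, and wrapping the error in an IllegalStateException is an assumption made only to keep the example short.

```java
import java.util.concurrent.CountDownLatch;

/**
 * Simplified, library-free model of a counting observer feeding a
 * latch-based blocking observer. All names are hypothetical.
 */
public class CountFlowSketch {

    /** Stand-in for the two terminal callbacks a Single-style observer receives. */
    interface SingleSink {
        void onSuccess(long value);
        void onError(Throwable error);
    }

    /** Counts onNext calls and reports the total only on completion. */
    static final class CountingObserver {
        private final SingleSink downstream;
        private long count;

        CountingObserver(SingleSink downstream) {
            this.downstream = downstream;
        }

        void onNext(Object item) { count++; }

        // An error is passed along as-is; no count is emitted first.
        void onError(Throwable error) { downstream.onError(error); }

        void onComplete() { downstream.onSuccess(count); }
    }

    /** Latch-based sink: blockingGet() waits until a terminal signal arrives. */
    static final class BlockingSink extends CountDownLatch implements SingleSink {
        private long value;
        private Throwable error;

        BlockingSink() { super(1); }

        @Override
        public void onSuccess(long value) { this.value = value; countDown(); }

        @Override
        public void onError(Throwable error) { this.error = error; countDown(); }

        long blockingGet() {
            try {
                await(); // released by onSuccess or onError
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException(e);
            }
            if (error != null) {
                throw new IllegalStateException(error); // assumption of this sketch
            }
            return value;
        }
    }

    /** Emits every array element, then completes, as a stand-in for the source. */
    public static long count(Object[] items) {
        BlockingSink sink = new BlockingSink();
        CountingObserver observer = new CountingObserver(sink);
        for (Object item : items) {
            observer.onNext(item);
        }
        observer.onComplete();
        return sink.blockingGet();
    }

    public static void main(String[] args) {
        String[] persons = {"Joe", "Jane", "John", "Phil"};
        System.out.println(count(persons)); // prints 4
    }
}
```

The sketch emits everything on the calling thread, so the latch is already released by the time blockingGet() is reached. The latch matters when the source emits on another thread, where the caller really has to wait for the terminal signal.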

In the below diagram we see the sequence of methods called when blockingGet() is called.


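In the example below, we create an Observable from an array of person names and get a count of its items. We then keep only the person names starting with “J” and apply count to the filtered sequence. Finally, we apply filter to the result of count() itself: when the count does not satisfy the predicate, the result is empty and blockingGet() returns null.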


ObservableFromArrayCountExample:

package com.javarticles.rx;

import io.reactivex.Observable;

public class ObservableFromArrayCountExample {
    public static void main(String[] args) {
        String[] persons = new String[] {"Joe", "Jane", "John", "Phil"};
        // Count every item emitted by the source.
        long count = Observable.fromArray(persons).count().blockingGet();
        System.out.println("Total number of persons: " + count);
        // Count only the names that start with "J".
        count = Observable.fromArray(persons)
                .filter(person -> person.startsWith("J"))
                .count()
                .blockingGet();
        System.out.println("Total number of persons whose name starts with J: " + count);
        // Filtering the count itself: blockingGet() returns null when the predicate fails.
        System.out.println(Observable.fromArray(persons).count().filter(c -> c > 2).blockingGet());
        System.out.println(Observable.fromArray(persons).count().filter(c -> c > 5).blockingGet());
    }
}

Output:

Total number of persons: 4
Total number of persons whose name starts with J: 3
4
null

Download the source code

This was an example of the RxJava Observable count operator.

You can download the source code here: RxJavaObservableFromArrayCount.zip