In this article we will look into the ReactiveX count()
operator and the internals involved. We will come to know how the Observables and Observers get together to transform an Observable that emits a sequence of items into an Observable that emits a single value representing the count of items emitted by the source Observable.
Count
ObservableCountSingle
represents the Observable count. Since we cannot provide the count until the source sequence completes, ObservableCountSingle
notifies the subscribers only on successful completion or when there is an error.
If the source Observable terminates with an error, Count will pass this error notification along without emitting an item first. If the source Observable terminates successfully then it will be provide the subscribers with the count of items emitted.
Single
We first create an Observable source.
String[] persons = new String[] {"Joe", "Jane", "John", "Phil"}; Observable.fromArray(persons)
Next, we apply count()
operator on the source Observable to get the count Observable which is not exactly an Observable but a variant of it called “Single” as it returns a single element response or an error notification in case of any error.
Observable.fromArray(items).count();
Sequence flow of ReactiveX Count
In order to get the count of items, we call blockingGet
on the single observable.
String[] persons = new String[] {"Joe", "Jane", "John", "Phil"}; long count = Observable.fromArray(persons).count().blockingGet();
Internally, we deal with two observers, one is CountObserver
which maintains the count of the emitted item by subscribing to onNext()
. The second observer is BlockingMultiObserver
which is nothing but an extension of CountDownLatch
and awaits till all elements are emitted or an error signal is notified.
In the below diagram we see the sequence of methods called when blockingGet()
is called.
In the below example, we create an Observable array consisting of person names. We then get a count of it. We also filter the person names starting with “J” and then apply count on the final sequence of items.
ObservableFromArrayCountExample:
package com.javarticles.rx;
import io.reactivex.Observable;
public class ObservableFromArrayCountExample {
public static void main(String[] args) {
String[] persons = new String[] {"Joe", "Jane", "John", "Phil"};
long count = Observable.fromArray(persons)
.count().blockingGet();
System.out.println("Total number of persons: " + count);
count = Observable.fromArray(persons).filter((person)->person.startsWith("J")).count().blockingGet();
System.out.println("Total number of persons whose name starts with J: " + count);
System.out.println(Observable.fromArray(persons).count().filter((c)->c > 2).blockingGet());
System.out.println(Observable.fromArray(persons).count().filter((c)->c > 5).blockingGet());
}
}
Output:
Total number of persons: 4 Total number of persons whose name starts with J: 3 4 null
Download the source code
This was an example about RxJava Observable count operator.