RxJava Combining Operators
In this tutorial, you’ll use RxJava combining operators to merge, filter and transform your data into succinct and reusable streams. By Prashant Barahi.
Sign up/Sign in
With a free Kodeco account you can download source code, track your progress, bookmark, personalise your learner profile and more!
Create accountAlready a member of Kodeco? Sign in
Sign up/Sign in
With a free Kodeco account you can download source code, track your progress, bookmark, personalise your learner profile and more!
Create accountAlready a member of Kodeco? Sign in
Sign up/Sign in
With a free Kodeco account you can download source code, track your progress, bookmark, personalise your learner profile and more!
Create accountAlready a member of Kodeco? Sign in
Contents
RxJava Combining Operators
25 mins
- Getting Started
- What Are RxJava Combining Operators?
- Using the amb Operator to Load the Fastest Source
- Using the zip Operator to Load All Sources At Once
- Using the merge Operator to Load Data ASAP
- Using the startWith Operator to Emit an Item Immediately
- Using the concat Operator
- Using the combineLatest Operator to Correct the UI Behavior
- Using the mergeDelayError Operator
- Assessing the Switch Operator
- Assessing the join Operator
- Assessing the groupJoin Operator
- Where to Go From Here?
Assessing the Switch Operator
switchOnNext subscribes to an Observable that emits Observables. It unsubscribes from the previously emitted source when a new Observable is emitted from the source, and it starts emitting items from the new source instead. Any emissions from the previous Observable are dropped.
Open PlaceListViewModel.kt and put the below snippet inside switchOnNext():
disposeCurrentlyRunningStreams()
// 1
val outerSource = Observable.interval(3, TimeUnit.SECONDS)
.doOnNext {
Log.i(LOG_TAG, "Emitted by OuterSource: $it")
}
// 2
val innerSource = Observable.interval(1, TimeUnit.SECONDS)
.doOnSubscribe {
Log.i(LOG_TAG, "Starting InnerSource")
}
// 3
Observable.switchOnNext(
outerSource.map { return@map innerSource }
)
.doOnNext {
Log.i(LOG_TAG, "Relayed items $it")
}
.subscribeOn(Schedulers.single())
.observeOn(Schedulers.single())
.subscribe().addTo(disposable)
Here’s what the code above does. It:
- Creates a source,
outerSource, that emits an item every three seconds - Creates another source,
innerSource, that emits an item every second - Uses
switchOnNext(), which causes every item emitted by the outer source (i.e.onNext) to create the inner source. When the outerObservableemits another item, the inner one gets discarded and a new one is created, which is then used to emit items.For every item emitted by the outerObservable(i.e. every three seconds), the inner observable manages to emit three items (one item per second). The last item isn’t relayed downstream, as the outer source has already emitted an item (similar to in the marble diagram above).
Build and run, and tap Switch On Next located inside Demo. Note that every feature inside Demo is printed in Logcat and isn’t visible in TouRx’s UI. So open up Logcat to assess the printed logs and input –> to filter out the noise. You’ll get something like this:
2020-08-21 22:02:18.964 I/-->: Emitted by OuterSource: 0 2020-08-21 22:02:18.964 I/-->: Starting InnerSource 2020-08-21 22:02:19.965 I/-->: Relayed items 0 2020-08-21 22:02:20.964 I/-->: Relayed items 1 2020-08-21 22:02:21.964 I/-->: Emitted by OuterSource: 1 2020-08-21 22:02:21.964 I/-->: Starting InnerSource 2020-08-21 22:02:22.964 I/-->: Relayed items 0 2020-08-21 22:02:23.964 I/-->: Relayed items 1 2020-08-21 22:02:24.964 I/-->: Emitted by OuterSource: 2 2020-08-21 22:02:24.964 I/-->: Starting InnerSource 2020-08-21 22:02:25.964 I/-->: Relayed items 0 2020-08-21 22:02:26.964 I/-->: Relayed items 1 2020-08-21 22:02:27.964 I/-->: Emitted by OuterSource: 3
Next, you’ll learn about the join operator.
Assessing the join Operator
The join operator combines the items emitted by two sources whenever an item emitted by one falls under the duration window. In other words, it selects which items to combine based on overlaps between the streams. The windows are implemented as Observables whose lifespans begin with each item emitted by either Observable and end when the window-defining Observable completes emiting. As long as the item’s window is open, it can combine with any item emitted by the other source.
To see the join operator in action, open PlaceListViewModel.kt, and inside demonstrateJoinBehavior(), paste the following:
disposeCurrentlyRunningStreams()
// 1
val firstObservable = Observable.interval(1000, TimeUnit.MILLISECONDS)
.map {
return@map "SOURCE-1 $it"
}
// 2
val secondObservable = Observable.interval(3000, TimeUnit.MILLISECONDS)
.map { return@map "SOURCE-2 $it" }
// 3
val firstWindow = Function<String, Observable<Long>> {
Observable.timer(0, TimeUnit.SECONDS)
}
val secondWindow = Function<String, Observable<Long>> {
Observable.timer(0, TimeUnit.SECONDS)
}
// 4
val resultSelector = BiFunction<String, String, String> { t1, t2 ->
return@BiFunction "$t1, $t2"
}
//5
firstObservable.join(secondObservable, firstWindow, secondWindow, resultSelector)
.doOnNext {
Log.i(LOG_TAG, it)
}
.subscribeOn(Schedulers.single())
.observeOn(Schedulers.single())
.subscribe().addTo(disposable)
Now, build and run. Tap Demo from top-right menu, and click Join. This will fire up demonstrateJoinBehavior() in PlaceListViewModel. Open the Logcat to assess the logs. Your logs will look similar to this:
2020-08-21 22:18:20.562 I/-->: SOURCE-1 4, SOURCE-2 1 2020-08-21 22:18:23.562 I/-->: SOURCE-1 7, SOURCE-2 2 2020-08-21 22:18:26.562 I/-->: SOURCE-1 10, SOURCE-2 3 2020-08-21 22:18:29.562 I/-->: SOURCE-1 13, SOURCE-2 4 2020-08-21 22:18:32.562 I/-->: SOURCE-1 16, SOURCE-2 5 2020-08-21 22:18:35.562 I/-->: SOURCE-1 19, SOURCE-2 6 2020-08-21 22:18:38.562 I/-->: SOURCE-1 22, SOURCE-2 7 2020-08-21 22:18:41.562 I/-->: SOURCE-1 25, SOURCE-2 8 2020-08-21 22:18:44.562 I/-->: SOURCE-1 28, SOURCE-2 9
It’s time to break down the reason for this log! The code above:
- Creates a source,
firstObservable, that emits items every second - Creates a second source,
secondObservable, that emits items every three seconds - Initializes two windows —
firstWindowandsecondWindow— which define the lifespan of the window forfirstObservableandsecondObservable, respectively - Declares a
resultSelectorthat couples the emitted items into aString - Uses
join()to perform a join operation on theObservables created in the first two steps. Since the window duration is zero-seconds wide, bothfirstWindowandsecondWindowmust emit at the same time in order for them to be coupled and relayed down. With the specified intervals of sources and the length of the window, the overlaps occur every three seconds.
Try experimenting with different window lengths to learn more about the join operator. Next, you’ll learn about a similar operator to join: groupJoin.
Assessing the groupJoin Operator
The groupJoin operator is similar to the join operator, except the argument that defines how the items should be combined — i.e. resultSelector — pairs individual items emitted from the left source with another source that holds all the values emitted within the window.
Time to see the GroupJoin operator in action! Open PlaceListViewModel, and in demonstrateGroupJoin(), place the following code:
disposeCurrentlyRunningStreams()
// 1
val leftSource = Observable.interval(1, TimeUnit.SECONDS)
.map { return@map "SOURCE-1 $it" }
val rightSource = Observable.interval(5, TimeUnit.SECONDS)
.map { return@map "SOURCE-2 $it" }
// 2
val leftWindow = Function<String, Observable<Long>> {
Observable.timer(0, TimeUnit.SECONDS)
}
val rightWindow = Function<String, Observable<Long>> {
Observable.timer(3, TimeUnit.SECONDS)
}
// 3
val resultSelector = BiFunction<String, Observable<String>, Observable<Pair<String, String>>> { t1, t2 ->
return@BiFunction t2.map {
return@map Pair(t1, it)
}
}
leftSource.groupJoin(rightSource, leftWindow, rightWindow, resultSelector)
.concatMap {
return@concatMap it
}
.subscribeOn(Schedulers.single())
.observeOn(Schedulers.single())
.doOnNext {
Log.i(LOG_TAG, it.toString())
}
.subscribe().addTo(disposable)
Finally, build and run. Click on the top-right menu and select Group Join inside Demo. Then, open the Logcat to see the printed logs:
2020-08-21 23:17:17.756 I/-->: (SOURCE-1 4, SOURCE-2 0) 2020-08-21 23:17:18.756 I/-->: (SOURCE-1 5, SOURCE-2 0) 2020-08-21 23:17:19.756 I/-->: (SOURCE-1 6, SOURCE-2 0) 2020-08-21 23:17:20.756 I/-->: (SOURCE-1 7, SOURCE-2 0) 2020-08-21 23:17:22.756 I/-->: (SOURCE-1 9, SOURCE-2 1) 2020-08-21 23:17:23.756 I/-->: (SOURCE-1 10, SOURCE-2 1) 2020-08-21 23:17:24.756 I/-->: (SOURCE-1 11, SOURCE-2 1) 2020-08-21 23:17:25.756 I/-->: (SOURCE-1 12, SOURCE-2 1) 2020-08-21 23:17:27.756 I/-->: (SOURCE-1 14, SOURCE-2 2) 2020-08-21 23:17:28.756 I/-->: (SOURCE-1 15, SOURCE-2 2) 2020-08-21 23:17:29.756 I/-->: (SOURCE-1 16, SOURCE-2 2) 2020-08-21 23:17:30.756 I/-->: (SOURCE-1 17, SOURCE-2 2) 2020-08-21 23:17:32.756 I/-->: (SOURCE-1 19, SOURCE-2 3) 2020-08-21 23:17:33.756 I/-->: (SOURCE-1 20, SOURCE-2 3) 2020-08-21 23:17:34.756 I/-->: (SOURCE-1 21, SOURCE-2 3) 2020-08-21 23:17:35.756 I/-->: (SOURCE-1 22, SOURCE-2 3)
Here’s a breakdown of what the code above is doing:
-
leftSourceemits an item every second, whereasrightSourceemits an item every five seconds. -
leftWindowandrightWindoware windows forleftSourceandrightSource, respectively. Note the difference in the lifespan of these windows. - The signature of
resultSelectoris an important distinction between theGroupJoinand the Join operators. SincerightWindowhas a lifespan of three seconds andleftSourceemits every second, the second argument in the lambda collects all the emissions in that three-second window while mapping each into aPairbefore sending it downstream.
Experiment with varying window sizes to get more familiar with the GroupJoin operator.


