In earlier chapters, you learned how to create, filter and transform Observable sequences. RxJava filtering and transformation operators behave much like Kotlin’s standard collection operators. You got a glimpse into the true power of RxJava with flatMap, the workhorse operator that lets you perform a lot of tasks with very little code.
This chapter will show you several different ways to assemble sequences, and how to combine the data within each sequence. Some operators you’ll work with are very similar to Kotlin collection functions. They help combine elements from asynchronous sequences, just as you do with Kotlin lists.
Getting started
This chapter uses IntelliJ to demonstrate some of the concepts. It also uses the exampleOf method you’ve become so familiar with. Open the starter project and run the Main.kt file. It’s empty, so you won’t see any output other than a “process finished” message in the run tab.
RxJava is all about working with and mastering asynchronous sequences. But you’ll often need to make order out of chaos! There is a lot you can accomplish by combining Observables.
Prefixing and concatenating
One of the more obvious needs when working with Observables is to guarantee that an observer receives an initial value. There are situations where you’ll need the “current state” first. Good use cases for this are “current location” and “network connectivity status.” These are some Observables you’ll want to prefix with the current state.
Using startWith
The diagram below should make it clear what this operator does:
Add the following code to the main() function:
exampleOf("startWith") {
  val subscriptions = CompositeDisposable()
  // 1
  val missingNumbers = Observable.just(3, 4, 5)
  // 2
  val completeSet =
    missingNumbers.startWithIterable(listOf(1, 2))
  completeSet
    .subscribe { number ->
      println(number)
    }
    .addTo(subscriptions)
}
The startWithIterable() and startWithItem() operators prefix an Observable sequence with the given initial value. This value must be of the same type as the Observable elements. For startWithItem(), this is a single item, while startWithIterable() takes a list of initial items that the stream emits immediately.
Here’s what’s going on in the code above:
1. Create an Observable of numbers.
2. Create an Observable starting with the missing values 1 and 2, then continue with the original sequence of numbers.
Don’t get fooled by the position of the startWithIterable() operator! Although you chain it to the missingNumbers stream, the Observable it creates emits the initial values, followed by the values from the original missingNumbers Observable.
Run the code and look at the run area of the project to confirm this:
--- Example of: startWith ---
1
2
3
4
5
This is a handy tool you’ll use in many situations. It fits in well with the deterministic nature of RxJava and guarantees observers they’ll get an initial value right away, and any updates later.
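The “current state first” use case mentioned earlier can be sketched with startWithItem(). This is a minimal sketch that assumes RxJava 3 is on the classpath (package io.reactivex.rxjava3); the connectivityWithCurrentState helper and the status strings are hypothetical names chosen for illustration.

```kotlin
import io.reactivex.rxjava3.core.Observable

// Hypothetical helper: prefixes a stream of connectivity updates
// with the state that is known at subscription time.
fun connectivityWithCurrentState(
    current: String,
    updates: Observable<String>
): Observable<String> = updates.startWithItem(current)

fun main() {
    // Assumed sample data standing in for real connectivity events.
    val updates = Observable.just("offline", "online")
    connectivityWithCurrentState("online", updates)
        .subscribe { status -> println(status) }
}
```

An observer of the returned Observable receives the current state before any later update, whatever the timing of the updates themselves.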
Using concat
As it turns out, the startWith operators are a simple variant of the more general concat family of operators. Your initial value is a stream of one or more elements, to which RxJava appends the sequence that startWith chains to. The Observable.concat static function chains two sequences.
Take a look:
Add this code to the main() function:
exampleOf("concat") {
  val subscriptions = CompositeDisposable()
  // 1
  val first = Observable.just(1, 2, 3)
  val second = Observable.just(4, 5, 6)
  // 2
  Observable.concat(first, second)
    .subscribe { number ->
      println(number)
    }
    .addTo(subscriptions)
}
Written this way, the concatenation order is more obvious to the untrained reader than when using one of the startWith operators. Run the example to see elements from the first stream: 1 2 3, followed by elements of the second stream: 4 5 6.
The Observable.concat static function has overloads that take two to four Observables or an Iterable of them; Observable.concatArray takes a variable number of Observables (i.e. an array). It subscribes to the first Observable of the collection, relays its elements until it completes, then moves to the next one. The process repeats until it has used all the Observables in the collection. If at any point an inner Observable emits an error, the concatenated Observable in turn emits the error and terminates.
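The error behavior described above can be sketched as follows. This is an illustrative sketch assuming RxJava 3; the concatUntilError function and the "boom" exception are hypothetical and only serve to record which events reach the observer.

```kotlin
import io.reactivex.rxjava3.core.Observable

// Hypothetical helper: records every event the observer receives.
fun concatUntilError(): List<String> {
    val events = mutableListOf<String>()
    val first = Observable.just(1, 2)
    val failing = Observable.error<Int>(IllegalStateException("boom"))
    val third = Observable.just(3)

    // concatArray subscribes to each source in turn.
    // The error from `failing` ends the sequence, so `third` is never subscribed to.
    Observable.concatArray(first, failing, third)
        .subscribe(
            { value -> events.add("next $value") },
            { error -> events.add("error ${error.message}") },
            { events.add("complete") }
        )
    return events
}

fun main() {
    concatUntilError().forEach { println(it) }
}
```

The recorded events contain the elements of the first source and the error, but neither the element 3 nor a completion event.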
Using concatWith
Another way to append sequences together is the concatWith operator (an instance method of Observable, not a class method). Add this code to the function:
exampleOf("concatWith") {
  val subscriptions = CompositeDisposable()
  val germanCities =
    Observable.just("Berlin", "Münich", "Frankfurt")
  val spanishCities =
    Observable.just("Madrid", "Barcelona", "Valencia")
  germanCities
    .concatWith(spanishCities)
    .subscribe { city ->
      println(city)
    }
    .addTo(subscriptions)
}
This variant applies to an existing Observable. It waits for the source Observable to complete, then subscribes to the parameter Observable. Aside from instantiation, it works just like Observable.concat(). Run the code and check the output; you’ll see a list of German cities followed by a list of Spanish cities.
If you try to concatenate sequences of different types, brace yourself for compiler errors. The Kotlin compiler knows when one sequence is an Observable<String> and the other an Observable<Int>, so it will not allow you to mix them up.
Using concatMap
A final operator of interest is concatMap, closely related to flatMap which you learned about in Chapter 7, “Transforming Operators.” The lambda you pass to flatMap returns an Observable sequence which is subscribed to, and the emitted Observables are all merged. concatMap guarantees that each sequence produced by the lambda will run to completion before the next is subscribed to. concatMap is therefore a handy way to guarantee sequential order.
Try it in the project:
exampleOf("concatMap") {
  val subscriptions = CompositeDisposable()
  // 1
  val countries = Observable.just("Germany", "Spain")
  // 2
  val observable = countries
    .concatMap {
      when (it) {
        "Germany" ->
          Observable.just("Berlin", "Münich", "Frankfurt")
        "Spain" ->
          Observable.just("Madrid", "Barcelona", "Valencia")
        else -> Observable.empty<String>()
      }
    }
  // 3
  observable
    .subscribe { city ->
      println(city)
    }
    .addTo(subscriptions)
}
This example:
1. Creates an Observable of two country names.
2. Uses concatMap to produce another Observable depending on what country name it receives.
3. Outputs the full sequence of values for a given country before starting to consider the next one.
Run the program. You should see this output:
--- Example of: concatMap ---
Berlin
Münich
Frankfurt
Madrid
Barcelona
Valencia
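Merging
RxJava offers several ways to merge sequences. A merge() Observable subscribes to each of the sequences it receives and relays their elements as soon as they arrive, so there is no predefined order. It completes once all of its inner sequences have completed.
The order in which the inner sequences complete is irrelevant.
If any of the sequences emits an error, the merge() Observable immediately relays the error, then terminates.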
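The arrival-order behavior of merge() can be sketched with two subjects. This is a minimal sketch assuming RxJava 3; the mergeInArrivalOrder function and the "left"/"right" labels are hypothetical.

```kotlin
import io.reactivex.rxjava3.core.Observable
import io.reactivex.rxjava3.subjects.PublishSubject

// Hypothetical helper: collects what a merged Observable relays.
fun mergeInArrivalOrder(): List<String> {
    val received = mutableListOf<String>()
    val left = PublishSubject.create<String>()
    val right = PublishSubject.create<String>()

    // merge subscribes to both subjects up front.
    val disposable = Observable.merge(left, right)
        .subscribe { value -> received.add(value) }

    // Interleave pushes on the two subjects.
    left.onNext("left 1")
    right.onNext("right 1")
    right.onNext("right 2")
    left.onNext("left 2")

    disposable.dispose()
    return received
}

fun main() {
    mergeInArrivalOrder().forEach { println(it) }
}
```

Unlike concat, neither source has to complete before elements of the other one are relayed.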
Using mergeWith
Just like for concat and concatWith, there’s also a mergeWith instance method you can use instead of the static Observable.merge method. Add the following example:
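exampleOf("mergeWith") {
  val subscriptions = CompositeDisposable()
  val germanCities = PublishSubject.create<String>()
  val spanishCities = PublishSubject.create<String>()
  germanCities.mergeWith(spanishCities)
    .subscribe {
      println(it)
    }
    .addTo(subscriptions)
  // Pushes inferred from the output listed below
  germanCities.onNext("Frankfurt")
  germanCities.onNext("Berlin")
  spanishCities.onNext("Madrid")
  germanCities.onNext("Münich")
  spanishCities.onNext("Barcelona")
  spanishCities.onNext("Valencia")
}
Just like before, you’re pushing values through the two different subjects in an interleaved order.
Run the program and you should see the following output: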
--- Example of: mergeWith ---
Frankfurt
Berlin
Madrid
Münich
Barcelona
Valencia
The values are received in the same order they are emitted by the merged subjects.
Combining elements
Using combineLatest
An essential operator in RxJava is the combineLatest operator. It combines values from several sequences:
Every time one of the inner (combined) sequences emits a value, it calls a lambda you provide. You receive the last value from each of the inner sequences. This has many concrete applications, such as observing several text fields at once and combining their values, watching the status of multiple sources, and so on.
Does this sound complicated? It’s actually quite simple! You’ll break it down by working through an example.
First, create two subjects to push values to. Add this example to your main() function:
exampleOf("combineLatest") {
  val subscriptions = CompositeDisposable()
  val left = PublishSubject.create<String>()
  val right = PublishSubject.create<String>()
}
Next, create an Observable that combines the latest value from both sources. Don’t worry; you’ll understand how the code works once you’ve finished adding everything together:
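One way this combining step could look is sketched below. The sketch assumes RxJava 3 and uses the plain Observable.combineLatest with an explicit BiFunction rather than any RxKotlin helper; the combineLatestSentences function is a hypothetical wrapper, and it already includes the pushes so that it runs on its own.

```kotlin
import io.reactivex.rxjava3.core.Observable
import io.reactivex.rxjava3.functions.BiFunction
import io.reactivex.rxjava3.subjects.PublishSubject

// Hypothetical helper: collects the sentences produced by combineLatest.
fun combineLatestSentences(): List<String> {
    val sentences = mutableListOf<String>()
    val left = PublishSubject.create<String>()
    val right = PublishSubject.create<String>()

    // The combiner receives the latest value of each source.
    val disposable = Observable
        .combineLatest(left, right,
            BiFunction<String, String, String> { l, r -> "$l $r" })
        .subscribe { sentence -> sentences.add(sentence) }

    left.onNext("Hello")          // nothing yet: right has no value
    right.onNext("World")         // first combined element
    left.onNext("It's nice to")
    right.onNext("be here!")
    left.onNext("Actually, it's super great to")

    disposable.dispose()
    return sentences
}

fun main() {
    combineLatestSentences().forEach { println(it) }
}
```

The explicit BiFunction is a design choice to keep the lambda's parameter types unambiguous; a helper from RxKotlin would work equally well if that library is available.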
Now add the following code to start pushing values to the Observables:
left.onNext("Hello")
right.onNext("World")
left.onNext("It’s nice to")
right.onNext("be here!")
left.onNext("Actually, it’s super great to")
Run the complete example from above. You’ll see four sentences show up in the output of the program:
--- Example of: combineLatest ---
Hello World
It’s nice to World
It’s nice to be here!
Actually, it’s super great to be here!
A few notable points about this example:
You combine Observables using a lambda receiving the latest value of each sequence as arguments. In this example, the combination is the concatenated string of both left and right values. It could be anything else that you need, as the type of the elements emitted by the combined Observable is the return type of the lambda.
In practice, this means you can combine sequences of heterogeneous types. combineLatest is one of the few core operators that permit combining Observables of different types; zip and withLatestFrom are others.
Nothing happens until each of the combined Observables emits one value. After that, each time one of the combined Observables emits a new value, the lambda receives the latest value of each of the Observables and produces its element.
Note: Remember that combineLatest waits for all its Observables to emit one element before starting to call your lambda. It’s a frequent source of confusion! It’s also a good opportunity to use the startWith operator to provide an initial value for the sequences, which could take time to update. Like the map operator covered in Chapter 7, “Transforming Operators”, combineLatest creates an Observable whose type is the lambda return type. You can use this to switch to a new type alongside a chain of operators!
A common pattern is to combine values into a Pair and then pass them down the chain. For example, you’ll often want to combine values and then call filter on them like so:
val observable = Observables
  .combineLatest(left, right) {
    leftString: String, rightString: String ->
    leftString to rightString
  }
  .filter { !it.first.isEmpty() }
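Using zip
Another combination operator is zip, which pairs up elements from several sequences by position. Consider an example that zips a sequence of weather conditions with a sequence of city names; its output looks like this:
--- Example of: zip ---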
It’s sunny in Lisbon
It’s cloudy in Copenhagen
It’s cloudy in London
It’s sunny in Madrid
Here’s what zip did for you:
1. Subscribed to the Observables you provided.
2. Waited for each to emit a new value.
3. Called your lambda with both new values.
Did you notice how Vienna didn’t show up in the output? Why is that?
The explanation lies in the way zip operators work. They wait until each of the inner Observables emits a new value. If one of them completes, zip completes as well. It doesn’t wait until all of the inner Observables are done! This is called indexed sequencing, which is a way to walk through sequences in lockstep.
Note: Kotlin also has a zip collection operator. It creates a new collection of pairs with items from both collections.
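The lockstep behavior described above can be reproduced with a short sketch. It assumes RxJava 3 and Kotlin 1.5 or later (for lowercase()); the Weather enum, the zipWeatherWithCities function and the sample data are hypothetical, chosen so that the result matches the output listed earlier in this section.

```kotlin
import io.reactivex.rxjava3.core.Observable
import io.reactivex.rxjava3.functions.BiFunction

// Hypothetical type for the weather conditions.
enum class Weather { SUNNY, CLOUDY }

// Hypothetical helper: zips four weather values with five city names.
fun zipWeatherWithCities(): List<String> {
    val weather = Observable.just(
        Weather.SUNNY, Weather.CLOUDY, Weather.CLOUDY, Weather.SUNNY)
    val cities = Observable.just(
        "Lisbon", "Copenhagen", "London", "Madrid", "Vienna")

    // zip pairs elements by index and completes as soon as the
    // shorter source completes, so "Vienna" never gets a partner.
    return Observable
        .zip(weather, cities,
            BiFunction<Weather, String, String> { w, city ->
                "It's ${w.name.lowercase()} in $city"
            })
        .toList()
        .blockingGet()
}

fun main() {
    zipWeatherWithCities().forEach { println(it) }
}
```

Because the weather sequence has only four elements, the fifth city is dropped, which is the indexed sequencing the section describes.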
Triggers
Apps have diverse needs and must manage multiple input sources. You’ll often need to accept input from several Observables at once. Some will simply trigger actions in your code, while others will provide data. RxJava has you covered with powerful operators that will make your life easier. Well, your coding life at least!
Using withLatestFrom
You’ll first look at withLatestFrom. Often overlooked by beginners, it’s a useful companion tool when dealing with user interfaces, among other things.
Add this code to the main() function. You may need to import withLatestFrom using io.reactivex.rxkotlin.withLatestFrom:
exampleOf("withLatestFrom") {
  val subscriptions = CompositeDisposable()
  // 1
  val button = PublishSubject.create<Unit>()
  val editText = PublishSubject.create<String>()
  // 2
  button.withLatestFrom(editText) { _: Unit, value: String ->
    value
  }.subscribe {
    println(it)
  }.addTo(subscriptions)
  // 3
  editText.onNext("Par")
  editText.onNext("Pari")
  editText.onNext("Paris")
  button.onNext(Unit)
  button.onNext(Unit)
}
Simple and straightforward! withLatestFrom is useful in all situations where you want the current (latest) value emitted from an Observable, but only when a particular trigger occurs.
Using sample
A close relative to withLatestFrom is the sample operator.
It does nearly the same thing with just one variation: each time the trigger Observable emits a value, sample emits the latest value from the “other” Observable, but only if it arrived since the last “tick”. If no new data arrived, sample won’t emit anything.
Try it in the project. Replicate the previous example of withLatestFrom, using sample instead:
exampleOf("sample") {
  val subscriptions = CompositeDisposable()
  val button = PublishSubject.create<Unit>()
  val editText = PublishSubject.create<String>()
  editText.sample(button)
    .subscribe {
      println(it)
    }.addTo(subscriptions)
  editText.onNext("Par")
  editText.onNext("Pari")
  editText.onNext("Paris")
  button.onNext(Unit)
  button.onNext(Unit)
}
Note: Don’t forget that withLatestFrom takes the data observable as a parameter, while sample takes the trigger observable as a parameter. This can easily be a source of mistakes, so be careful!
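The difference between the two operators is easiest to see side by side. The sketch below assumes RxJava 3 and uses the plain withLatestFrom overload with an explicit BiFunction; compareTriggers is a hypothetical helper that feeds the same events to both operators.

```kotlin
import io.reactivex.rxjava3.functions.BiFunction
import io.reactivex.rxjava3.subjects.PublishSubject

// Hypothetical helper: returns (withLatestFrom results, sample results).
fun compareTriggers(): Pair<List<String>, List<String>> {
    val fromWithLatest = mutableListOf<String>()
    val fromSample = mutableListOf<String>()
    val button = PublishSubject.create<Unit>()
    val editText = PublishSubject.create<String>()

    // Trigger is the receiver, data is the parameter.
    button.withLatestFrom(editText,
        BiFunction<Unit, String, String> { _, text -> text })
        .subscribe { text -> fromWithLatest.add(text) }

    // Data is the receiver, trigger is the parameter.
    editText.sample(button)
        .subscribe { text -> fromSample.add(text) }

    editText.onNext("Par")
    editText.onNext("Pari")
    editText.onNext("Paris")
    button.onNext(Unit) // both operators emit "Paris"
    button.onNext(Unit) // only withLatestFrom emits again

    return fromWithLatest to fromSample
}

fun main() {
    val (withLatest, sampled) = compareTriggers()
    println("withLatestFrom: $withLatest")
    println("sample: $sampled")
}
```

The second button press produces a value only with withLatestFrom, because sample has seen no new text since the previous tick.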
Waiting for triggers is a great help when doing UI work. In some cases your “trigger” may come in the form of a sequence of observables (I know, it’s Inception once again). Or maybe you want to wait on a pair of observables and only keep one. No matter: RxJava has operators for this!
Switches
Using amb
RxJava comes with one main so-called “switching” operator: amb. It allows you to produce an Observable sequence by switching between the events of the combined source sequences. This allows you to decide which sequence’s events the subscriber will receive at runtime.
Think of “amb” as in “ambiguous”.
Add this code to the project:
exampleOf("amb") {
  val subscriptions = CompositeDisposable()
  val left = PublishSubject.create<String>()
  val right = PublishSubject.create<String>()
  // 1
  left.ambWith(right)
    .subscribe {
      println(it)
    }
    .addTo(subscriptions)
  // 2
  left.onNext("Lisbon")
  right.onNext("Copenhagen")
  left.onNext("London")
  left.onNext("Madrid")
  right.onNext("Vienna")
}
The ambWith operator combines the left and right Observables. It waits for any of them to emit an element, then unsubscribes from the other one. After that, it only relays elements from the first active Observable. It really does draw its name from the term ambiguous: at first, you don’t know which sequence you’re interested in, and want to decide only when one fires.
This operator is often overlooked. It has a few select practical applications, like connecting to redundant servers and sticking with the one that responds first.
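The “unsubscribes from the other one” part can be checked directly on a subject. This is a small sketch assuming RxJava 3; ambSticksWithFirst is a hypothetical helper that returns the relayed values together with whether the losing subject still has observers.

```kotlin
import io.reactivex.rxjava3.subjects.PublishSubject

// Hypothetical helper: returns (relayed values, right.hasObservers() after the first emission).
fun ambSticksWithFirst(): Pair<List<String>, Boolean> {
    val received = mutableListOf<String>()
    val left = PublishSubject.create<String>()
    val right = PublishSubject.create<String>()

    val disposable = left.ambWith(right)
        .subscribe { city -> received.add(city) }

    left.onNext("Lisbon") // left wins the race
    // amb has now disposed its subscription to the losing source.
    val rightStillObserved = right.hasObservers()

    right.onNext("Copenhagen") // ignored
    left.onNext("London")

    disposable.dispose()
    return received to rightStillObserved
}

fun main() {
    val (cities, rightObserved) = ambSticksWithFirst()
    println("relayed: $cities, right still observed: $rightObserved")
}
```

Only the values from the winning subject are relayed, and the other subject has no observers left after the first emission.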
Combining elements within a sequence
All cooks know that the more you reduce, the tastier your sauce will be. Although not aimed at chefs, RxJava has the tools to reduce your sauce to its most flavorful components!
Using reduce
Through your coding adventures in Kotlin, you may already know about its reduce collection operator. If you don’t, here’s a great opportunity to learn about it, as this knowledge applies to pure Kotlin collections as well.
To get started, add this code to the project:
exampleOf("reduce") {
  val subscriptions = CompositeDisposable()
  val source = Observable.just(1, 3, 5, 7, 9)
  source
    .reduce(0) { a, b -> a + b }
    .subscribeBy(onSuccess = {
      println(it)
    })
    .addTo(subscriptions)
}
This is much like what you’d do with Kotlin collections, but instead with Observable sequences. The code above uses a lambda to add the items together. Run the code and see what happens in the output:
--- Example of: reduce ---
25
The reduce operator “accumulates” a summary value. It starts with the initial value you provide (in this example, you start with 0). Each time the source Observable emits an item, reduce calls your lambda to produce a new summary by combining the current value with the newly emitted value. When the source Observable completes, reduce emits the summary value, then completes.
Note: reduce produces its summary (accumulated) value only when the source Observable completes. Applying this operator to sequences that never complete won’t emit anything. This is a frequent source of confusion and hidden problems.
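The point of the note can be made visible with a subject that is completed by hand. This is a minimal sketch assuming RxJava 3; reduceWaitsForCompletion and its log messages are hypothetical.

```kotlin
import io.reactivex.rxjava3.subjects.PublishSubject

// Hypothetical helper: logs what reduce delivers before and after completion.
fun reduceWaitsForCompletion(): List<String> {
    val log = mutableListOf<String>()
    val source = PublishSubject.create<Int>()

    // reduce with a seed returns a Single that succeeds on completion.
    source.reduce(0) { a, b -> a + b }
        .subscribe { total -> log.add("total $total") }

    source.onNext(1)
    source.onNext(3)
    // Nothing has been delivered yet: the source is still open.
    log.add("before onComplete: ${log.size} results")

    source.onComplete() // only now does reduce emit its summary
    return log
}

fun main() {
    reduceWaitsForCompletion().forEach { println(it) }
}
```

If onComplete() were never called, the log would contain no total at all.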
Using scan
A close relative to reduce is the scan operator. Can you spot the difference in the diagram below, comparing to the last one above?
Add some code to the project to experiment:
exampleOf("scan") {
  val subscriptions = CompositeDisposable()
  val source = Observable.just(1, 3, 5, 7, 9)
  source
    .scan(0) { a, b -> a + b }
    .subscribe {
      println(it)
    }
    .addTo(subscriptions)
}
Now run it and look at the output:
--- Example of: scan ---
0
1
4
9
16
25
RxJava’s scan first emits the initial value you provide (0 here); after that, you get one output value per input value. As you may have guessed, this value is the running total accumulated by the lambda. Each time the source Observable emits an element, scan invokes your lambda. It passes the running value along with the new element, and the lambda returns the new accumulated value. Like reduce, the resulting Observable type is the lambda return type.
The range of use cases for scan is quite large; you can use it to compute running totals, statistics, states and so on. Encapsulating state information within a scan Observable is a good idea; you won’t need to use local variables, and it goes away when the source Observable completes.
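The role of the initial value can be sketched by comparing the seeded and unseeded forms of scan. This sketch assumes RxJava 3; runningTotals is a hypothetical helper that returns both result lists.

```kotlin
import io.reactivex.rxjava3.core.Observable

// Hypothetical helper: returns (seeded totals, unseeded totals).
fun runningTotals(): Pair<List<Int>, List<Int>> {
    val source = Observable.just(1, 3, 5, 7, 9)

    // With a seed, scan emits the seed first, then each running total.
    val seeded = source
        .scan(0) { a, b -> a + b }
        .toList()
        .blockingGet()

    // Without a seed, the first source element acts as the starting value.
    val unseeded = source
        .scan { a, b -> a + b }
        .toList()
        .blockingGet()

    return seeded to unseeded
}

fun main() {
    val (seeded, unseeded) = runningTotals()
    println("seeded: $seeded")
    println("unseeded: $unseeded")
}
```

The unseeded form is the one to pick when the extra leading element is unwanted and the accumulated type is the same as the element type.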
Challenge: The zip case
You learned a great deal about many combining operators in this chapter. But there is so much more to learn (and more fun to be had) about sequence combination!
You’ve learned about the zip family of operators that let you go through sequences in lockstep; it’s time to start using them.
Take the code from the scan example above and improve it so as to display both the current value and the running total at the same time.
There are several ways to do this, and not necessarily with zip. Bonus points if you can find more than one method.
The solutions to this challenge, found in the project files for this chapter, show two possible implementations. Can you find them both?
Key points
You can prepend or append Observable sequences to one another using operators like startWith, concatWith, and concatMap.
The merge family of operators lets you merge sequences together so that items are received in the order that they are emitted.
The combineLatest operator lets you combine heterogeneous observables into a type that gets emitted each time one of the inner observables emits.
The zip operators emit only when each of the inner Observables has emitted a new value, which is called indexed sequencing; the overall Observable completes when any of the inner Observables completes.
In combined sequences, if an inner sequence emits an error, then generally the overall Observable emits the error and the sequence terminates.
Triggering operators like withLatestFrom and sample let you limit the emitting of elements to only when certain triggering events occur.
The amb or “ambiguous” operator lets you switch between multiple Observables by sticking to the first one that is active.
The reduce and scan operators let you combine the elements in a sequence based on an input lambda; reduce only emits the final value when it receives the complete event, whereas scan emits intermediate accumulated values.
Where to go from here?
Having been introduced to combining operators, in the next chapter you’ll see them in action in an Android app. The app project will retrieve data from a NASA API that you will combine in various ways. Despite being Earth-based data, it’s sure to be out of this world!