Reactive Streams on Kotlin: SharedFlow and StateFlow

In this tutorial, you’ll learn about reactive streams in Kotlin and build an app using two types of streams: SharedFlow and StateFlow. By Ricardo Costeira.

5 (20) · 3 Reviews

Download materials
Save for later
Share
You are currently viewing page 2 of 4 of this article. Click here to view the first page.

Replay and Buffering

MutableSharedFlow() accepts three parameters:

public fun <T> MutableSharedFlow(
  replay: Int = 0, // 1
  extraBufferCapacity: Int = 0, // 2
  onBufferOverflow: BufferOverflow = BufferOverflow.SUSPEND // 3
): MutableSharedFlow<T>

Here’s what they’re used for:

  1. replay: The number of values replayed to new subscribers. It can’t be negative and it defaults to zero.
  2. extraBufferCapacity: The number of values buffered. It can’t be negative and it defaults to zero. The sum of this value plus replay comprises the total buffer of the shared flow.
  3. onBufferOverflow: Action to take when buffer overflow is reached. It can have three values: BufferOverflow.SUSPEND, BufferOverflow.DROP_OLDEST or BufferOverflow.DROP_LATEST. It defaults to BufferOverflow.SUSPEND.

Default Behavior

This can get quite tricky to understand, so here’s a short animation of a possible interaction with a shared flow built with the default values. Assume the shared flow uses emit(value: T).

SharedFlow with default constructor parameters

Going step by step:

  1. This shared flow has three events and two subscribers. The first event is emitted when there are no subscribers yet, so it gets lost forever.
  2. By the time the shared flow emits the second event, it already has one subscriber, which gets said event.
  3. Before reaching the third event, another subscriber appears, but the first one gets suspended and remains like that until reaching the event. This means emit() won’t be able to deliver the third event to that subscriber. When this happens, the shared flow has two options: It either buffers the event and emits it to the suspended subscriber when it resumes or it reaches buffer overflow if there’s not enough buffer left for the event.
  4. In this case, there’s a total buffer of zero — replay + extraBufferCapacity. In other words, buffer overflow. Because onBufferOverflow is set with BufferOverflow.SUSPEND, the flow will suspend until it can deliver the event to all subscribers.
  5. When the subscriber resumes, so does the stream, delivering the event to all subscribers and carrying on its work.
Note: The shared flow specification forbids you from using anything other than onBufferOverflow = BufferOverflow.SUSPEND when the total buffer value amounts to zero. Because tryEmit(value: T) doesn’t suspend, it won’t work if you use it with the default replay and extraBufferCapacity values. In other words, the only way to emit events with tryEmit(value: T) is by having, at least, a total buffer of one.

With Replay

OK, that wasn’t so bad. What happens if there’s a buffer, though? Here’s an example with replay = 1:

SharedFlow with replay = 1

Breaking it down:

  1. When the shared flow reaches the first event without any active subscribers, it doesn’t suspend anymore. With replay = 1, there’s now a total buffer size of one. As such, the flow buffers the first event and keeps going.
  2. When it reaches the second event, there’s no more room in the buffer, so it suspends.
  3. The flow remains suspended until the subscriber resumes. As soon as it does, it gets the buffered first event, along with the latest second event. The shared flow resumes, and the first event disappears forever because the second one now takes its place in the replay cache.
  4. Before reaching the third event, a new subscriber appears. Due to replay, it also gets a copy of the latest event.
  5. When the flow finally reaches the third event, both subscribers get a copy of it.
  6. The shared flow buffers this third event while discarding the previous one. Later, when a third subscriber shows up, it also gets a copy of the third event.

With extraBufferCapacity and onBufferOverflow

The process is similar with extraBufferCapacity, but without the replay-like behavior. This third example shows a shared flow with both extraBufferCapacity = 1 and onBufferOverflow = BufferOverflow.DROP_OLDEST:

SharedFlow with extraBufferCapacity = 1 and onBufferOverflow = DROP_LATEST

In this example:

  1. The behavior is the same at first: With a suspended subscriber and a total buffer size of one, the shared flow buffers the first event.
  2. The different behavior starts on the second event emission. With onBufferOverflow = BufferOverflow.DROP_OLDEST, the shared flow drops the first event, buffers the second one and carries on. Also, notice how the second subscriber does not get a copy of the buffered event: Remember, this shared flow has extraBufferCapacity = 1, but replay = 0.
  3. The flow eventually reaches the third event, which the active subscriber receives. The flow then buffers this event, dropping the previous one.
  4. Shortly after, the suspended subscriber resumes, triggering the shared flow to emit the buffered event to it and cleaning up the buffer.

Subscribing to Event Emissions

OK, good job getting this far! You now know how to create a shared flow and customize its behavior. There’s only one thing left to do, which is to subscribe to a shared flow.

In the code, go to the coinhistory package inside presentation and open CoinHistoryFragment.kt. At the top of the class, declare and initialize the shared ViewModel:

private val sharedViewModel: CoinsSharedViewModel by activityViewModels { CoinsSharedViewModelFactory }

You want the shared flow to emit no matter which screen you’re in, so you can’t bind this ViewModel to this specific Fragment. Instead, you want it bound to the Activity so it survives when you go from one Fragment to another. That’s why the code uses the by activityViewModels delegate. As for CoinsSharedViewModelFactory, don’t worry about it: Every ViewModel factory in the app is already prepared to properly inject any dependencies.

Collecting the SharedFlow

Now that you have the shared ViewModel, you can use it. Locate subscribeToSharedViewEffects(). Subscribe to the shared flow here by adding the following code:

viewLifecycleOwner.lifecycleScope.launchWhenStarted { // 1
  sharedViewModel.sharedViewEffects.collect { // 2
    when (it) {
      // 3
      is SharedViewEffects.PriceVariation -> notifyOfPriceVariation(it.variation)
    }
  }
}

This code has a few important details:

  1. The coroutine is scoped to the View instead of the Fragment. This ensures the coroutine is alive only while the View is alive, even if the Fragment outlives it. The code creates the coroutine with launchWhenStarted, instead of the most common launch. This way, the coroutine launches only when the lifecycle is at least in the STARTED state, suspends when it’s at least in the STOPPED state and gets canceled when the scope is destroyed. Using launch here can lead to potential crashes, as the coroutine will keep processing events even in the background.
  2. As you can see, subscribing to a shared flow is the same as subscribing to a regular flow. The code calls collect() on the SharedFlow to subscribe to new events.
  3. The subscriber reacts to the shared flow event.

Keep in mind at all times that even using launchWhenStarted, the shared flow will keep emitting events without subscribers. As such, you always need to consider the wasted resources. In this case, the event emission code is fairly harmless. But things can get heavy, especially if you turn cold flows into hot ones using something like shareIn.

Note: Turning cold flows into hot ones is out of scope for this tutorial — truth be told, it deserves a tutorial on its own. If you’re interested, check out the last section of the tutorial for references about the topic.