
RxDart Tutorial for Flutter: Getting Started

Learn how to develop Flutter apps using the Reactive Programming paradigm with RxDart.

Version

  • Dart 2.17, Flutter 3.0, VS Code

Developing highly responsive, fault-tolerant, event-driven asynchronous applications that are scalable requires a different way of thinking from the traditional synchronous programming architecture.

This is why the reactive programming principle exists. It uses streams to build these types of reactive applications.

In this tutorial, you’ll use RxDart, a Dart implementation of the reactive programming principle, to develop ‘Gravity Pop’, a very simple game based on a very popular block-falling video game. In this game, blocks fall and disappear if a row is filled. Along the way, you’ll learn about:

  • RxDart streams.
  • Extension functions.
  • Subjects.
  • The concept of Backpressure.
Note: This tutorial assumes a basic knowledge of Dart and Flutter. You can learn these topics from our Getting Started With Flutter and Your First Flutter App: an App From Scratch tutorials.

Getting Started

Click the Download Materials button at the top or bottom of this tutorial to download the starter project. The starter project has a few models to represent the various states/events you’ll be manipulating using RxDart. These are:

  • GameState: An enum to store the game state: whether the game has started, is playing or is over.
  • GameData: A model that represents the current state of the game and all the collected tetriminos.
  • Input: A model representing change events that affect the current tetrimino in play. This includes xOffset, yOffset and angle.
  • Piece: An enum that represents the type of block from the available seven.
  • Tetrimino: Represents the current block animating on the screen.
Note: A group of falling blocks is also known as a tetrimino.

Along with the models, here are some other things to note about the starter project:

  • The project contains utility functions for common use cases.
  • There are classes for the board, the player and a layer that only handles user interactions.
  • We paint each tetrimino using the custom painter class.

Now, get packages by running flutter pub get in your terminal/PowerShell. Then build and run to see the starter app:

Starter screen for app

Tap the Play button to enter play mode. You’ll see the screen below:
Gravity pop game start screen

You’ll notice that it only contains a white screen; there are no blocks, and the buttons don’t work. You’ll change that and make this into a simple game using Reactive streams in RxDart.

At the end, you will get to see a game like this:
The final game of Gravity Pop

Are you ready? Time to get stacking.

The Reactive Programming Paradigm

An event is a general term used to represent the press of a button or any of the many sensors in your device recording their data. Various components process events to produce something meaningful in the end, to make updates in the UI, for instance.

But this process is not a one-time deal. As long as events are produced, the process repeats itself. A “stream” in programming is this flow of any raw input event to a useful result:

A red dot moving between two big black circles

Above is a basic illustration of a stream showing the relation between a source and a sink.

All streams have two required components: a source (for example, an event) and a sink or receiver (a logical or UI component that can consume that stream). Reactive programming uses such streams as a backbone to build applications.

Dart offers built-in support for streams through the Stream class in dart:async, which Flutter apps use directly. This class handles creating streams from various sources such as I/O updates, sensor data capture, UI event capture and so much more.

A sink can be any component that has the power to consume stream events. Moving forward, you’ll use StreamBuilder to consume streams and update UI.
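
To make the source and sink idea concrete, here is a minimal sketch that uses only dart:async. The buttonPresses function and its event values are hypothetical and are not part of the starter project.

```dart
import 'dart:async';

/// Hypothetical source: emits three button-press labels, then completes.
Stream<String> buttonPresses() =>
    Stream.fromIterable(['left', 'right', 'rotate']);

Future<void> main() async {
  // The loop acts as the sink: it consumes every event the source emits.
  await for (final press in buttonPresses()) {
    print('Received: $press');
  }
}
```

In the game, StreamBuilder plays the role of the loop and rebuilds the UI on each event instead of printing it.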

For your first task, open lib/player.dart and replace //TODO: add a stream builder with the following code snippet:

  //1
child: StreamBuilder<Tetrimino>(
  //2
  //TODO: replace with staticPlayerStream
  stream: _engine.blankPlayerStream,
  //3
  builder: ((context, snapshot) {
    if (snapshot.hasData) {
      if (snapshot.data?.current == Piece.Empty)
        return const SizedBox.shrink();

      return ClipRect(
        child: CustomPaint(
          painter:
              _getNextPiece(snapshot.data!, _engine.extent.toDouble()),
          child: SizedBox(
            width: _engine.effectiveWidth.toDouble(),
            height: _engine.effectiveHeight.toDouble(),
          ),
        ),
      );
    }
    return const SizedBox.shrink();
  }),
),

Here’s what’s happening in the preceding code snippet:

  1. You used Flutter’s StreamBuilder as a sink to accept events from a stream, _engine.blankPlayerStream.
  2. _engine.blankPlayerStream is an empty stream at the moment. You’ll expand on this as you get further into the tutorial.
  3. The snapshot variable references stream events and processes them. In this case, you paint each event, a Tetrimino, using a CustomPaint widget.

Build and run the app and you will see the same white board. But now the building blocks for streams are in place with the source (stream instances) and the sink (the StreamBuilder widget).

Current state of the app with the stream schematic to the side

At this point, it may seem trivial, but as you go on, you will implement more features of the reactive programming paradigm using RxDart.

Creating RxDart Streams

The Dart Stream API fulfills the basic requirements for following the reactive programming style. RxDart builds on it with additional features that make streams more useful in real-world applications.

As an example of this, consider the Stream.fromIterable() constructor that gridStateStream in lib/engine.dart uses. As simple as it may be, you can improve it further by specifying only the start and end of a range and turning each number in between into an event.

The RangeStream stream from RxDart fulfills this exact use case. Open lib/player.dart and replace TODO: replace with staticPlayerStream and the line beneath it with the following code:

//TODO: replace with animatingPlayerStream
stream: _engine.staticPlayerStream(),

Next, open lib/engine.dart and take a look at the implementation for staticPlayerStream:

return RangeStream(0, effectiveHeight ~/ extent - 1).map((value) =>
    Tetrimino(
        current: Piece.I, origin: Point(0, value * extent.toDouble())));

RangeStream emits numbers within its input range as events. Each event is “mapped” into a tetrimino object in the above code snippet.
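
As a standalone sketch of the same pattern, the snippet below maps each number from a RangeStream to a point. The rowOrigins function and its rows and extent parameters are illustrative stand-ins for the engine's own values, not code from the starter project.

```dart
import 'dart:math';

import 'package:rxdart/rxdart.dart';

/// Maps each row index in [0, rows - 1] to the origin of that row.
/// `rows` and `extent` are hypothetical stand-ins for the engine's values.
Stream<Point<double>> rowOrigins(int rows, double extent) =>
    RangeStream(0, rows - 1).map((row) => Point(0.0, row * extent));

Future<void> main() async {
  // RangeStream includes both ends, so four rows yield four events.
  final origins = await rowOrigins(4, 20).toList();
  print(origins);
}
```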

Build and run the app. Because RangeStream has already finished emitting all its events, you’ll see the block at rest at the bottom of the board, the position represented by the final event RangeStream emitted.
Current state of the app with stream schematic updated with new source

Along with RangeStream, RxDart also offers various other types of “pre-built” streams. You’ll see more of them as you continue with this tutorial.

Using RxDart Extensions

The map function at the end of the code above is an example of a stream operator. map itself ships with the Dart Stream API, while RxDart adds many more operators as extension functions. Extension functions extend the operations you can apply on streams. You can use them when you need to manipulate an event before it reaches the sink.

For example, what if you only need the odd events from a stream that emits integer values? Or what if you need to create a gap between each event emitted in a stream?

Operators handle these cases. In the previous code, map takes each integer and transforms it into a tetrimino. These functions can transform the emitted events into any shape or form and can even create other streams from the events.
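
The two questions above can be sketched as follows, assuming a plain integer stream. The spacedOdds name is hypothetical; where comes from the Dart Stream API and interval comes from RxDart.

```dart
import 'package:rxdart/rxdart.dart';

/// Keeps only the odd integers and spaces them out by [gap].
Stream<int> spacedOdds(Stream<int> source, Duration gap) =>
    source.where((value) => value.isOdd).interval(gap);

Future<void> main() async {
  final odds = await spacedOdds(
    RangeStream(1, 6),
    const Duration(milliseconds: 100),
  ).toList();
  print(odds); // [1, 3, 5]
}
```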

Note: A higher-order stream is a special stream that emits other streams.

Let’s make the previous example a bit more exciting using interval. The interval function emits events of the stream after the specified time interval. Open lib/player.dart and replace //TODO: replace with animatingPlayerStream and the code statement below it with the following:

//TODO: replace with animatingPlayerWithCompletionStream
stream: _engine.animatingPlayerStream(),

Check animatingPlayerStream in lib/engine.dart to see how it uses the interval extension:

return RangeStream(0, effectiveHeight ~/ extent - 1)
    .interval(const Duration(milliseconds: 500))
    .map((value) => Tetrimino(
        current: Piece.I, origin: Point(0, value * extent.toDouble())));

The only difference between this RangeStream and the previous stream is the addition of the interval extension that emits each event from its input stream in predefined intervals, 500 milliseconds in this case.

Build and run the app now. You will now see the block fall in intervals of 500 ms.

A stream schematic showing the effect of RxDart extensions

As you can see, you’re not limited to just one extension function between the source and a sink. You can chain multiple extension functions to manipulate the data between the source and the sink. Chaining various functions one after the other is an important part of reactive programming, used to achieve high degrees of data manipulation.

Stream Lifecycle Extensions

The interval function and map are just the beginning when it comes to extension functions in RxDart. Other useful examples include:

  • delay: Emits the stream events after a specified delay.
  • startWith/endWith: These extension functions allow you to add an event at the beginning or the end of a stream.
  • doOn_: These are extensions that specify various callbacks at different points of the stream lifecycle. For example, doOnData triggers on new events while doOnDone triggers on stream completion.
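
Here is a small sketch of these lifecycle extensions working together. The withLifecycle helper and the marker values -1 and 99 are arbitrary choices for illustration.

```dart
import 'package:rxdart/rxdart.dart';

/// Adds a leading and a trailing marker to [source] and records each
/// lifecycle callback in [log]. The marker values are arbitrary.
Stream<int> withLifecycle(Stream<int> source, List<String> log) => source
    .startWith(-1)
    .endWith(99)
    .doOnData((value) => log.add('data: $value'))
    .doOnDone(() => log.add('done'));

Future<void> main() async {
  final log = <String>[];
  final values =
      await withLifecycle(Stream.fromIterable([1, 2]), log).toList();
  print(values); // [-1, 1, 2, 99]
  print(log);
}
```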

Speaking of doOnDone, you need to set the tetrimino into the grid once it can’t progress any further. In lib/player.dart, replace TODO: replace with animatingPlayerWithCompletionStream and the next statement with:

//TODO: replace with infiniteAnimatingPlayerWithCompletionStream
stream: _engine.animatingPlayerWithCompletionStream(),

A quick look at animatingPlayerWithCompletionStream in lib/engine.dart shows how doOnDone defines the tetrimino’s behavior when it is blocked:

return animatingPlayerStream()
    .takeWhile((_transformedPiece) => _canTetriminoMove(_transformedPiece))
    .doOnDone(() {
  _onTetriminoSet(Tetrimino(
      current: Piece.I,
      yOffset: (effectiveHeight ~/ extent - 1).toDouble(),
      origin: const Point(0, 0)));
});

Here’s a breakdown of the code above:

  1. takeWhile, like map, is a method from the Dart Stream API. It forces the stream to complete when it encounters the first event that does not satisfy its predicate. When the stream completes, the doOnDone callback is triggered.
  2. _onTetriminoSet is a utility function you use to check if the stacked pieces have reached the top of the board. If not, you use _gameController to set the last tetrimino to the board.
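
The same takeWhile and doOnDone combination can be tried in isolation. In this sketch, blockedRow is a hypothetical stand-in for the collision check in _canTetriminoMove, and onLanded stands in for _onTetriminoSet.

```dart
import 'package:rxdart/rxdart.dart';

/// Emits row indices until one reaches [blockedRow], then completes and
/// reports the last free row through [onLanded].
Stream<int> fallUntilBlocked(
  int rows,
  int blockedRow,
  void Function(int lastFreeRow) onLanded,
) {
  var lastFreeRow = -1; // -1 means no row was free at all.
  return RangeStream(0, rows - 1)
      .takeWhile((row) => row < blockedRow)
      .doOnData((row) => lastFreeRow = row)
      .doOnDone(() => onLanded(lastFreeRow));
}

Future<void> main() async {
  final rows = await fallUntilBlocked(
    10,
    3,
    (row) => print('Landed on row $row'),
  ).toList();
  print(rows); // [0, 1, 2]
}
```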

Build and run the app. You will see that once the tetrimino reaches the bottom of the board, the internal game state is updated to indicate that the portion of the board this tetrimino rests on is now ‘blocked’.
Gif showing a tetrimino being blocked by the board edge

You now have a portion of a basic game with a block animating from the top of the board to the end. Let’s see how you can add more features, like moving the tetriminos with user input and tetrimino creation, once the previous tetrimino reaches the end of the board.

Combining Streams

The RangeStream you’ve been using up to this point is only meant to move the tetrimino, and the game now has to generate another random one to continue. It also has to handle user input so you can control the trajectory of the tetrimino while it travels down the board.

Generating New Tetriminos

The game needs to generate a new tetrimino when the current one is blocked by the board’s bounds or another tetrimino. This new one will logically be the same as the previous one: a RangeStream with its events mapped to new positions on the board.

This time, you need to create a new stream to listen to events representing new tetriminos and return a RangeStream that helps to animate it.

So, how can you create a new stream from the events of another? RxDart provides various options for this use case.

One of the options you’ll use is switchMap, which converts events of a stream to new streams using a mapper function. On each new tetrimino event of _playerController.stream, you can convert it to a RangeStream like the one you saw in action earlier.

In lib/player.dart, replace TODO: replace with infiniteAnimatingPlayerWithCompletionStream and the statement after it with the following code snippet:

//TODO: replace with infiniteAnimatingPlayerWithCompletionStreamWithInput
stream: _engine.infiniteAnimatingPlayerWithCompletionStream(),

Next, you’ll take a look at how this stream operates. Open lib/engine.dart and take a look at the implementation of infiniteAnimatingPlayerWithCompletionStream.

//1
return _playerController.stream.switchMap((value) {
  //2
  var _current = value;
  return RangeStream(0, effectiveHeight ~/ extent - 1)
      .interval(const Duration(milliseconds: 500))
      .map(
        (value) => Tetrimino(
          angle: 0.0,
          current: _current.current,
          origin: const Point(0, 0),
          yOffset: value.toDouble(),
          xOffset: 0,
        ),
      )
      .takeWhile(
        (_transformedPiece) => _canTetriminoMove(_transformedPiece),
      )
      //3
      .doOnData((_validTransformedPiece) {
    _current = _validTransformedPiece;
  }).doOnDone(() {
    _onTetriminoSet(_current);
  });
});

Here’s a quick breakdown of the preceding snippet:

  1. switchMap is an extension function used in this case to convert the events of its input stream into new individual streams.
  2. _current stores a reference to the current event so that the doOnDone extension can use it for game logic. In particular, it is used to check whether the stacked tetriminos have reached the top of the board.
  3. Similar to doOnDone, doOnData is another extension function. Its callback runs each time the stream emits an event.

From this point on, you’ll now always see a randomized tetrimino. Hurray!

The main source of the generated tetrimino is the switchMap extension function. This function takes an event and maps it into a new stream. In this case, you create a new RangeStream every time the previous one reaches the end of its journey.
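
The following sketch isolates that behavior, assuming piece names are plain strings. The fallingPieces function is hypothetical, and the controller stands in for _playerController.

```dart
import 'dart:async';

import 'package:rxdart/rxdart.dart';

/// Starts a fresh run of [steps] rows for every piece name. switchMap
/// cancels the previous inner stream when a new piece arrives.
Stream<String> fallingPieces(Stream<String> pieces, int steps) =>
    pieces.switchMap(
      (piece) => RangeStream(0, steps - 1).map((row) => '$piece@$row'),
    );

Future<void> main() async {
  final controller = StreamController<String>();
  final subscription = fallingPieces(controller.stream, 3).listen(print);

  controller.add('I');
  await Future<void>.delayed(const Duration(milliseconds: 50));
  controller.add('T');
  await Future<void>.delayed(const Duration(milliseconds: 50));

  await subscription.cancel();
  await controller.close();
}
```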

Build and run the app to see new tetriminos generated once the previous ones have been blocked:

generating new tetriminos using the second stream controller

Now that the game generates more tetriminos after each round, you’ll focus on user input.

Taking User Input

How do you factor in user input that makes games interesting? The answer is using StreamController from the Dart Stream API. StreamController allows you to create a stream you can add events to and listen to for updates.

It also exposes various pieces of information about the stream itself, such as its state and whether it has listeners. More on them later. For now, let’s handle the input events using CombineLatestStream.combine2. In lib/player.dart, replace //TODO: replace with infiniteAnimatingPlayerWithCompletionStreamWithInput and the next line of code with the following:

stream: _engine.infiniteAnimatingPlayerWithCompletionStreamWithInput(),

Look at the implementation for infiniteAnimatingPlayerWithCompletionStreamWithInput in lib/engine.dart:

return _playerController.stream.switchMap((value) {
  var _current = value;
  return CombineLatestStream.combine2<UserInput, int, Tetrimino>(
    _inputController.stream
        .startWith(UserInput(angle: 0, xOffset: 0, yOffset: 0))
        //TODO: add the debounceTime operator
        .map((input) => _offsetUserInput(input, _current)),
    RangeStream(0, effectiveHeight ~/ extent - 1)
        .interval(const Duration(milliseconds: 500)),
    (userInput, yOffset) =>
        _transformedTetrimino(_current, userInput, yOffset),
  )
      .takeWhile(
          (_transformedPiece) => _canTetriminoMove(_transformedPiece))
      .doOnData((_validTransformedPiece) {
    _current = _validTransformedPiece;
  }).doOnDone(() {
    _onTetriminoSet(_current);
  });
});

As you can see, CombineLatestStream.combine2 takes two streams as input and creates a third stream using the latest events from both. Here, it combines the latest user input from _inputController with the latest row emitted by the RangeStream, then applies both to the current tetrimino supplied by _playerController. That makes CombineLatestStream a good fit for this scenario.
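
A reduced version of this combination might look like the sketch below. The position function is hypothetical, with integer offsets standing in for UserInput and row indices standing in for the falling animation.

```dart
import 'dart:async';

import 'package:rxdart/rxdart.dart';

/// Pairs the latest horizontal offset with the latest row index.
Stream<String> position(Stream<int> xOffsets, Stream<int> rows) =>
    CombineLatestStream.combine2<int, int, String>(
      xOffsets,
      rows,
      (x, row) => 'x=$x, row=$row',
    );

Future<void> main() async {
  final input = StreamController<int>();
  final rows = RangeStream(0, 3).interval(const Duration(milliseconds: 100));

  // startWith supplies an initial offset so rows show up before any tap.
  final subscription =
      position(input.stream.startWith(0), rows).listen(print);

  // Simulate a "move right" tap while the piece is falling.
  await Future<void>.delayed(const Duration(milliseconds: 250));
  input.add(1);

  await Future<void>.delayed(const Duration(milliseconds: 300));
  await subscription.cancel();
  await input.close();
}
```

Without startWith, the combined stream would stay silent until the first tap, because combineLatest waits for every input stream to emit at least once.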

Build and run the app. With the two streams now combined, you can control the falling tetriminos using the left/right buttons and rotate a piece using the rotate button.

An input controller introduced into the game to stream input events

Subjects

Up until this point, you’ve been dealing with streams and the improvements RxDart offers to them. Let’s shift the focus to StreamControllers. The StreamController in Dart manages streams, exposing various metadata information and callbacks associated with streams.

By default, they only allow single-subscription streams. To understand single-subscription streams take a look at lib/engine.dart. You use the following code snippet to initialize the game, player and input stream controllers:

final StreamController<Tetrimino> _playerController = StreamController();
final StreamController<UserInput> _inputController = StreamController.broadcast();
final StreamController<GameData> _gameController =
    StreamController.broadcast();

There are two different observations you can make from the code above:

  • _playerController manages a single-subscription stream.
  • _inputController and _gameController handle broadcast streams, meaning they support multiple listeners.

If you initialized _gameController with StreamController() instead of StreamController.broadcast(), a second listener would trigger a runtime exception with the following stack trace:
Stack trace explaining the multiple-listener error in streams
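
You can reproduce the difference outside the game with dart:async alone. The secondListenFails helper below is hypothetical and simply reports whether a second listen call throws.

```dart
import 'dart:async';

/// Returns true if a second listen() call on [stream] throws a StateError.
bool secondListenFails(Stream<int> stream) {
  final first = stream.listen((_) {});
  try {
    final second = stream.listen((_) {});
    second.cancel();
    return false;
  } on StateError {
    // Single-subscription streams reject a second listener.
    return true;
  } finally {
    first.cancel();
  }
}

void main() {
  print(secondListenFails(StreamController<int>().stream)); // true
  print(secondListenFails(StreamController<int>.broadcast().stream)); // false
}
```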

Subjects in RxDart are the equivalent of StreamController in the native Dart API, albeit with more capabilities. They handle broadcast streams by default. Let’s have a look at them in a bit more detail.

BehaviorSubject

BehaviorSubject is one of the many types of Subjects offered by RxDart. It offers all the functionality of a StreamController but manages broadcast streams by default. So, to solve the problem you encountered earlier using RxDart, you’ll simply replace the StreamController constructor call for _gameController with a BehaviorSubject. Replace //TODO: replace with BehaviorSubject and the assignment below it in lib/engine.dart with the following:

//TODO: replace with seeded BehaviorSubject
final StreamController<GameData> _gameController = BehaviorSubject();

Seeded BehaviorSubjects

Besides handling broadcast streams by default, BehaviorSubjects also support seed values. Seed values are the first events transmitted when you initialize BehaviorSubjects. In the case of the game, you can use them to represent a start screen which is shown when none of the blocks have been set. Now, you’ll leverage the power of RxDart and create a seeded BehaviorSubject.

While still in lib/engine.dart, replace //TODO: replace with seeded BehaviorSubject and the code beneath it with the following statement:

final StreamController<GameData> _gameController =
      BehaviorSubject.seeded(GameData(state: GameState.Start, pieces: []));

Build and run the app. Although this is the same start screen as the one from the start of the tutorial, it was achieved using RxDart.

Starter screen for app
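
To see seeding on its own, consider this sketch, which uses strings in place of GameData. The firstStateSeen helper is hypothetical.

```dart
import 'package:rxdart/rxdart.dart';

/// Returns the first event a brand-new listener receives from [states].
Future<String> firstStateSeen(BehaviorSubject<String> states) =>
    states.stream.first;

Future<void> main() async {
  final states = BehaviorSubject<String>.seeded('start');
  // The seed is delivered even though nothing was added explicitly.
  print(await firstStateSeen(states)); // start

  states.add('playing');
  // A late listener receives the most recent value first.
  print(await firstStateSeen(states)); // playing

  await states.close();
}
```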

Awesome!

Introducing Backpressure

The reactive programming style is useful for developers because the UI can subscribe to only the streams it needs. This promotes “Separation of Concerns,” a crucial idea for developing apps.

Of course, reactive programming has issues as well. For instance, think about what happens when the source is much faster than the sink. That is, the sink finishes processing the events at a much slower rate than the source is creating events.

A common example of this issue is in implementing a suggestive search field. As the user types the query, the app needs to present suggestions. If you use a REST API to back the suggestion functionality that takes the query as a parameter, there’ll be a certain amount of delay until you receive a response.

If this delay proves to be greater than the time it takes for the keyword to change, the response returned may be useless for certain search queries. In other words, you’re making unnecessary network calls.

In terms of streams, the keyword changes are the source and the responses from the API are the sink. The source in this case is a lot faster than the sink. Reactive programming uses the term Backpressure to denote this phenomenon, and RxDart offers various operators to deal with it. Three notable examples of these operators are:

  • debounce: Emits items from the input stream only when a “window” has completed.
  • debounceTime: Similar to debounce but the “window” in this case is just a timer.
  • exhaustMap: Useful for converting events of an input stream into another stream. Ignores all subsequent events of the input stream until the result stream has completed.
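
As an illustration of exhaustMap, here is a sketch of the search scenario with a simulated lookup. The slowLookup and suggestions functions are hypothetical, and the 200 ms delay stands in for a network round trip.

```dart
import 'dart:async';

import 'package:rxdart/rxdart.dart';

/// Simulated slow lookup; the delay stands in for a network round trip.
Stream<String> slowLookup(String query) =>
    Stream.value('results for "$query"')
        .delay(const Duration(milliseconds: 200));

/// While a lookup is in flight, newer queries are ignored.
Stream<String> suggestions(Stream<String> queries) =>
    queries.exhaustMap(slowLookup);

Future<void> main() async {
  // All three queries arrive almost at once, so only the first one
  // starts a lookup; the other two are dropped.
  final results =
      await suggestions(Stream.fromIterable(['f', 'fl', 'flu'])).toList();
  print(results);
}
```

For a real search field, dropping the newest query is often not what you want, so debounceTime or switchMap may be a better fit there. exhaustMap suits actions that should not restart while in progress.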

To see how you can use these operators to deal with backpressure, open lib/engine.dart and replace //TODO: add the debounceTime operator with the following:

.debounceTime(const Duration(seconds: 1))

Build and run the app. With debounceTime in place, the input stream ignores rapid, repeated taps and forwards an input only after a full second passes without a newer one:

Handling backpressure
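
The effect is easier to observe in a small standalone sketch. The settledTaps function and the string tap values are hypothetical, and the quiet period is shortened to 100 ms to keep the run brief.

```dart
import 'dart:async';

import 'package:rxdart/rxdart.dart';

/// Emits an input only after [quiet] has passed without a newer one.
Stream<String> settledTaps(Stream<String> taps, Duration quiet) =>
    taps.debounceTime(quiet);

Future<void> main() async {
  final taps = StreamController<String>();
  final seen = <String>[];
  final subscription = settledTaps(
    taps.stream,
    const Duration(milliseconds: 100),
  ).listen(seen.add);

  // Three rapid taps collapse into the last one.
  taps
    ..add('left')
    ..add('left')
    ..add('right');
  await Future<void>.delayed(const Duration(milliseconds: 200));
  print(seen); // [right]

  await subscription.cancel();
  await taps.close();
}
```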

Where to Go From Here?

Download the final project by clicking the Download Materials button at the top or bottom of this tutorial. In this tutorial, you learned the basics of Dart streams and how RxDart augments them using the following concepts:

  • Rx Streams: Builds upon Dart streams to offer more features and flexibility in working with streams.
  • Subjects: Works like StreamControllers, but with additional powers.
  • Extension Functions: Extends the Dart streams API with expressive, chainable functions to make developing reactive apps easier.
  • Backpressure: Arises when there’s inconsistency between the source and sink. You learned how to ensure there’s consistent data synchronization between stream sources and sinks.

To dive deeper, check out the following resources:

  • Visit the Dart docs on Asynchronous programming to learn about its nuances.
  • Check out the RxDart package and all the extension functions and subjects it provides.
  • To strengthen your understanding of Reactive Programming, you can check out Rx Marbles. It has interactive diagrams that help you understand how different streams operate. As a bonus, almost every stream mentioned in the RxDart docs has a corresponding marble diagram.

If you have any questions, comments or suggestions, feel free to join the discussion below!
