Chapters

Hide chapters

Dart Apprentice: Beyond the Basics

Dart Apprentice: Beyond the Basics

Section 1: 15 chapters
Show chapters Hide chapters

13. Streams
Written by Jonathan Sande

Heads up... You're reading this book for free, with parts of this chapter shown beyond this point as scrambled text.

A future represents a single value that will arrive in the future. On the other hand, a stream represents multiple values that will arrive in the future. Think of a stream as a list of futures.

You can imagine a stream meandering through the woods as the autumn leaves fall onto the water’s surface. Each time a leaf floats by, it’s like the value that a Dart stream provides.

value value value value value value value
Stream of values

Streaming music online rather than downloading the song before playing it is another good comparison. When you stream music, you get many little chunks of data, but when you download the whole file, you get a single value, which is the entire file — a little like what a future returns. The http.get command you used in the last section was implemented as a stream internally. However, Dart just waited until the stream finished and then returned all the data at once as a completed future.

Streams, which are of type Stream, are used extensively in Dart and Dart-based frameworks. Here are some examples:

  • Reading a large file stored locally where new data from the file comes in chunks.
  • Downloading a file from a remote server.
  • Listening for requests coming into a server.
  • Representing user events such as button clicks.
  • Relaying changes in app state to the UI.

Although it’s possible to build streams from scratch, you usually don’t need to do that. You only need to use the streams that Dart or a Dart package provides. The first part of this chapter will teach you how to do that. The chapter will finish by teaching you how to make your own streams.

Using a Stream

Reading and writing files are important skills to learn in Dart. This will also be a good opportunity to practice using a stream.

The dart:io library contains a File class, which allows you to read data from a file. First, you’ll read data the easy way using the readAsString method, which returns the file’s contents as a future. Then, you’ll do it again by reading the data as a stream of bytes.

Adding an Assets File

You need a text file to work with, so you’ll add that to your project now.

Lorem ipsum dolor sit amet, consectetur adipiscing elit, sed do eiusmod tempor incididunt ut labore et dolore magna aliqua. Ut enim ad minim veniam, quis nostrud exercitation ullamco laboris nisi ut aliquip ex ea commodo consequat. Duis aute irure dolor in reprehenderit in voluptate velit esse cillum dolore eu fugiat nulla pariatur. Excepteur sint occaecat cupidatat non proident, sunt in culpa qui officia deserunt mollit anim id est laborum.

Reading as a String

Now that you’ve created the text file, replace your Dart code with the following:

import 'dart:io';

Future<void> main() async {
  final file = File('assets/text.txt');
  final contents = await file.readAsString();
  print(contents);
}

Increasing the File Size

If the file is large, you can read it as a stream. This allows you to start processing the data more quickly because you don’t have to wait to finish reading the entire file as you did in the last example.

Reading From a Stream

Replace the contents in the body of the main function with the following code:

final file = File('assets/text_long.txt');
final stream = file.openRead();
stream.listen(
  (data) {
    print(data.length);
  },
);
65536
65536
65536
65536
65536
65536
52783

Using an Asynchronous For-Loop

Just as you can use callbacks or async-await to get the value of a future, you also have two ways to get the values of a stream. In the example above, you used the listen callback. Here is the same example using an asynchronous for loop:

Future<void> main() async {
  final file = File('assets/text_long.txt');
  final stream = file.openRead();
  await for (var data in stream) {
    print(data.length);
  }
}

Error Handling

Like futures, stream events can also include an error rather than a value.

askol itsiz nigea wereu rubau xowue qonoa
Ckqiuk af ririif okw avmugs

Using a Callback

One way to handle errors is to use the onError callback like so:

final file = File('assets/text_long.txt');
final stream = file.openRead();
stream.listen(
  (data) {
    print(data.length);
  },
  onError: (Object error) {
    print(error);
  },
  onDone: () {
    print('All finished');
  },
);

Using Try-Catch

The other way to handle errors on a stream is with a try-catch block in combination with async-await. Here is what that looks like:

try {
  final file = File('assets/text_long.txt');
  final stream = file.openRead();
  await for (var data in stream) {
    print(data.length);
  }
} on Exception catch (error) {
    print(error);
} finally {
  print('All finished');
}
FileSystemException: Cannot open file, path = 'assets/pink_elephants.txt' (OS Error: No such file or directory, errno = 2)
All finished

Cancelling a Stream

As mentioned above, you may use the cancelOnError parameter to tell the stream that you want to stop listening in the event of an error. But even if there isn’t an error, you should always cancel your subscription to a stream if you no longer need it. This allows Dart to clean up the memory the stream was using. Failing to do so can cause a memory leak.

import 'dart:async';
import 'dart:io';

void main() {
  final file = File('assets/text_long.txt');
  final stream = file.openRead();
  StreamSubscription<List<int>>? subscription;
  subscription = stream.listen(
    (data) {
      print(data.length);
      subscription?.cancel();
    },
    cancelOnError: true,
    onDone: () {
      print('All finished');
    },
  );
}

Transforming a Stream

Being able to transform a stream as the data is coming in is very powerful. In the examples above, you never did anything with the data except print the length of the bytes list. Those bytes represent text, though, so you’re going to transform the data from numbers to text.

Viewing the Bytes

Replace the contents of main with the following code:

final file = File('assets/text.txt');
final stream = file.openRead();
stream.listen(
  (data) {
    print(data);
  },
);
[76, 111, 114, 101, ... ]
22 54 87 44 85 34 55 32 14 84 91 11 13 49 96 70 ! '' # $ % & ( ) * + , - . / ' 83 72 26 49 85 26 58 16 38 78 87 30 04 13 60 98 3 3 2 2 7 2 2 4 8 0 : ; < = > ? 03 91 98 40 78 18 45 39 94 59 45 94 53 99 22 81 @ A V J N I X P H O Z S P X L O 24 97 20 71 32 86 36 72 10 11 52 12 39 85 14 32 Y Z S L W A Q V B S K [ \ ] ^ _ 70 69 79 29 255 286 709 679 341 086 674 250 196 551 181 048 ` a m p j o k g h i f j k q x e 952 719 261 169 140 313 688 993 144 934 631 955 446 293 644 611 g k r q m o p g l z g { | } ~ FOS
Opemujo vcarebyamr ef fhu fafqe 89-707

Decoding the Bytes

Next, you’ll take the UTF-8 bytes and convert them to a string.

import 'dart:convert';
import 'dart:io';

Future<void> main() async {
  final file = File('assets/text.txt');
  final byteStream = file.openRead();
  final stringStream = byteStream.transform(utf8.decoder);
  await for (var data in stringStream) {
    print(data);
  }
}

Exercise

The following code produces a stream that outputs an integer every second and stops after the tenth time.

Stream<int>.periodic(
  Duration(seconds: 1),
  (value) => value,
).take(10);

Creating Streams From Scratch

You’ve learned how to use streams. As you advance in your skills, you might want to also create packages with streams for other developers to use.

Using Stream Constructors

The Stream class has several constructors you can use to create streams. You saw an example in the exercise above with Stream.periodic, which added data at periodic intervals. Here are a few more named constructors:

final first = Future(() => 'Row');
final second = Future(() => 'row');
final third = Future(() => 'row');
final fourth = Future.delayed(
  Duration(milliseconds: 300),
  () => 'your boat',
);
final stream = Stream<String>.fromFutures([
  first,
  second,
  third,
  fourth,
]);

stream.listen((data) {
  print(data);
});
Row
row
row
your boat

Using Asynchronous Generators

The Stream constructors are good when they match the data you have, but if you want more flexibility, consider using an asynchronous generator.

Reviewing Synchronous Generators

You learned about synchronous generators in Chapter 15, “Iterables”, of Dart Apprentice: Fundamentals. But to review, a synchronous generator returns its values as an iterable. These values are available on demand. You can get them as soon as you need them. That’s why they’re called synchronous.

Iterable<int> hundredSquares() sync* {
  for (int i = 1; i <= 100; i++) {
    yield i * i;
  }
}

Implementing an Asynchronous Generator

When creating an asynchronous generator, use the async* keyword, which you can read as “async star”.

Stream<String> consciousness() async* {
  final data = ['con', 'scious', 'ness'];
  for (final part in data) {
    await Future<void>.delayed(Duration(milliseconds: 500));
    yield part;
  }
}

Listening to the Stream

Replace the contents of main with the following:

final stream = consciousness();

stream.listen((data) {
  print(data);
});
con
scious
ness

Using Stream Controllers

The final way you’ll create a stream is with the low-level StreamController. You could go even more low-level than that, but a stream controller is fine for most practical purposes.

Understanding Sinks and Streams

The way to add data or errors to a stream is with what’s called a sink. You can think of this like your kitchen sink with water flowing out of it into a pipe. The water pipe is like a stream. Throwing a grape into the sink is like adding a data value event to the stream. The grape gets washed through the sink’s drain and enters the water stream flowing through the pipe. Alternatively, you could throw a cherry in the sink, and it will have the same fate as the grape. Putting in a cherry is like adding an error event. You can also close the sink. Think of that like putting a plug in the hole. No more data or errors can enter the stream.

Jhneib lumua fimai tohoo wekoi ebcuc Sodk burio

Writing the Code

Replace your main function with the following code:

import 'dart:async';

Future<void> main() async {
  // 1
  final controller = StreamController<String>();
  final stream = controller.stream;
  final sink = controller.sink;
  // 2
  stream.listen(
    (value) => print(value),
    onError: (Object error) => print(error),
    onDone: () => print('Sink closed'),
  );
  // 3
  sink.add('grape');
  sink.add('grape');
  sink.add('grape');
  sink.addError(Exception('cherry'));
  sink.add('grape');
  sink.close();
}

Testing It Out

Run your code, and you’ll see the following lines in the console:

grape
grape
grape
Exception: cherry
grape
Sink closed

Challenges

Before going on to the next chapter, here are some challenges to test your knowledge of streams. It’s best if you try to solve them yourself, but if you get stuck, solutions are available in the challenge folder of this chapter.

Challenge 1: Data Stream

The following code uses the http package to stream content from the given URL:

final url = Uri.parse('https://kodeco.com');
final client = http.Client();
final request = http.Request('GET', url);
final response = await client.send(request);
final stream = response.stream;

Challenge 2: Heads or Tails?

Create a coin flipping service that provides a stream of 10 random coin flips, each separated by 500 milliseconds. You use the service like so:

final coinFlipper = CoinFlippingService();

coinFlipper.onFlip.listen((coin) {
  print(coin);
});

coinFlipper.start();

Key Points

  • A stream, which is of type Stream, is like a series of futures.
  • Using a stream enables you to handle data events as they happen rather than waiting for them all to finish.
  • You can handle stream errors with callbacks or try-catch blocks.
  • You can create streams with Stream constructors, asynchronous generators or stream controllers.
  • A sink is an object for adding values and errors to a stream.

Where to Go From Here?

Streams are powerful, and you can do much more with them. For example, if your app has a “Download Song” button, you don’t want to overload the server when some happy kid presses the button as fast as they can a million times. You can consolidate that stream of button-press events into a single server request. This is called debouncing. It doesn’t come built into Dart, but packages like RxDart support debouncing and many other stream functions.

Have a technical question? Want to report a bug? You can ask questions and report bugs to the book authors in our official book forum here.
© 2023 Kodeco Inc.

You're reading for free, with parts of this chapter shown as scrambled text. Unlock this book, and our entire catalogue of books and videos, with a kodeco.com Professional subscription.

Unlock now