Modern Concurrency: Beyond the Basics

Oct 20 2022 Swift 5.5, iOS 15, Xcode 13.4

Part 1: AsyncStream & Continuations

2. AsyncStream

  • For more about push-based vs pull-based AsyncStreams, see AsyncSequence & AsyncStream Tutorial for iOS.
  • This video uses Xcode 14’s Task.sleep(until:clock:). If you use Xcode 13, replace this with Task.sleep(nanoseconds: 1_000_000_000).

In this episode, you’ll learn about AsyncStream, an easier way to create a custom AsyncSequence.

In the course materials, locate the starter playground and open it.

Custom AsyncSequence from episode 4 of Getting Started

In the preceding course, you created this simple typewriter with a custom AsyncSequence that “types” a phrase, adding a character every second.

struct Typewriter: AsyncSequence {
  typealias Element = String
  let phrase: String

  func makeAsyncIterator() -> TypewriterIterator {
    return TypewriterIterator(phrase)
  }
}

struct TypewriterIterator: AsyncIteratorProtocol {
  typealias Element = String
  let phrase: String
  var index: String.Index

  init(_ phrase: String) {
    self.phrase = phrase
    self.index = phrase.startIndex
  }

  mutating func next() async throws -> String? {
    guard index < phrase.endIndex else { return nil }
    try await Task.sleep(until: .now + .seconds(1),
                         clock: .continuous)
    defer { index = phrase.index(after: index) }
    return String(phrase[phrase.startIndex...index])
  }
}

Each call to next() returns a substring of the initial string that is one character longer than the last one.

When it reaches the end of the phrase, next() returns nil to signify the end of the sequence.

You tried out your Typewriter sequence with this Task:

Task {
  for try await item in Typewriter(phrase: "Hello, world!") {
    print(item)
  }
  print("AsyncSequence Done")
}

Run this Task to remind yourself how it works.

H
He
Hel
Hell
Hello
Hello,
Hello, 
Hello, w
Hello, wo
Hello, wor
Hello, worl
Hello, world
Hello, world!
AsyncSequence Done

I promised you I’d show you a much easier way. And now it’s time to learn about AsyncStream.

Comment out this Task so it doesn’t interfere with the tasks you’re about to write.

AsyncStream Typewriter

In the next section, the starter has already set up test phrase and index properties:

let phrase = "Hello, world!"
var index = phrase.startIndex

You’ll create a pull-based AsyncStream first:

let stream_pull = AsyncStream<String> {

}

Instead of a type alias, you just initialize your AsyncStream with your element type String. This trailing closure is the unfolding argument of the AsyncStream. It just has to return the next element.
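As a minimal sketch of the unfolding form, separate from the typewriter, here is a countdown stream. The Countdown actor and the makeCountdown(from:) and collectCountdown(_:) helpers are names invented for this illustration. Holding the mutable state in an actor is just one assumed way to keep it safe to capture, not something the course prescribes.

```swift
// Hypothetical helper: owns the mutable countdown state.
actor Countdown {
  private var value: Int

  init(from start: Int) { value = start }

  // Returns the current value and then decrements it, or nil when finished.
  func next() -> Int? {
    guard value > 0 else { return nil }
    defer { value -= 1 }
    return value
  }
}

// The unfolding closure only has to return the next element (or nil to end).
func makeCountdown(from start: Int) -> AsyncStream<Int> {
  let countdown = Countdown(from: start)
  return AsyncStream<Int>(unfolding: {
    await countdown.next()
  })
}

// Hypothetical helper: drains a stream into an array.
func collectCountdown(_ stream: AsyncStream<Int>) async -> [Int] {
  var values: [Int] = []
  for await value in stream {
    values.append(value)
  }
  return values
}
```

Calling collectCountdown(makeCountdown(from: 3)) from an async context should produce the values 3, 2 and 1 before the closure returns nil and the loop ends.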

So scroll up to the TypewriterIterator next() method, copy its code, and paste this into the AsyncStream closure:

let stream_pull = AsyncStream<String> {
  guard index < phrase.endIndex else { return nil }
  try await Task.sleep(until: .now + .seconds(1),
                       clock: .continuous)
  defer { index = phrase.index(after: index) }
  return String(phrase[phrase.startIndex...index])
}

Xcode complains because AsyncStream doesn’t throw.

In TypewriterIterator, you were able to declare that next() throws.

You can do a similar thing here: Change AsyncStream to AsyncThrowingStream:

let stream_pull = AsyncThrowingStream<String, Error> {

The throwing version of AsyncStream allows its closure to throw errors. You add an Error type parameter placeholder inside the angle brackets.
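As a rough illustration of the throwing variant, the sketch below builds an AsyncThrowingStream whose unfolding closure can throw, and consumes it with for try await inside a do-catch. TypewriterJammed, KeyFeeder, makeKeyStream(_:) and typeKeys(_:) are hypothetical names made up for this example, and the "#" key that triggers the error is an arbitrary assumption.

```swift
// Hypothetical error type used only for this illustration.
struct TypewriterJammed: Error {}

// Hypothetical helper: hands out keys one at a time and throws on "#".
actor KeyFeeder {
  private var pending: [String]

  init(_ keys: [String]) { pending = keys }

  func nextKey() throws -> String? {
    guard !pending.isEmpty else { return nil }
    let key = pending.removeFirst()
    if key == "#" { throw TypewriterJammed() }
    return key
  }
}

// The closure may throw because the stream is an AsyncThrowingStream.
func makeKeyStream(_ keys: [String]) -> AsyncThrowingStream<String, Error> {
  let feeder = KeyFeeder(keys)
  return AsyncThrowingStream<String, Error>(unfolding: {
    try await feeder.nextKey()
  })
}

// Collects keys until the stream ends or throws.
func typeKeys(_ keys: [String]) async -> (typed: [String], failed: Bool) {
  var typed: [String] = []
  do {
    for try await key in makeKeyStream(keys) {
      typed.append(key)
    }
    return (typed, false)
  } catch {
    return (typed, true)
  }
}
```

With a throwing stream, the error surfaces at the for try await loop, so the consumer decides how to handle it rather than the closure.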

Or, you could wrap the try await in a do-catch.

Undo back to AsyncStream, then add your do-catch:

do {
  try await Task.sleep(until: .now + .seconds(1),
                       clock: .continuous)
} catch {
  return nil
}

Now, to try out your AsyncStream, create a Task:

Task {
  for try await item in stream_pull {
    print(item)
  }
  print("Pull AsyncStream Done")
}

The Task loops over the stream items: For each iteration, the stream’s closure gets executed. This continues until the closure returns nil. Run this Task.

You get the same results as before, with a lot less code. And you didn’t have to add new types to your code base.
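For reference, here is one way the assembled pull-based stream could look outside a playground. This is a sketch under assumptions, not the course's exact code: TypewriterState, makeTypewriterStream(phrase:delayNanoseconds:) and typeAndCancelImmediately() are invented names, the state lives in an actor instead of playground globals, and it uses Task.sleep(nanoseconds:) so the delay can be a parameter. Unlike the playground version, it sleeps before checking for the end of the phrase. The usage function shows what the catch branch is for: when the consuming task is cancelled, Task.sleep throws and the closure ends the stream by returning nil.

```swift
// Hypothetical helper: owns the phrase and the current index.
actor TypewriterState {
  private let phrase: String
  private var index: String.Index

  init(phrase: String) {
    self.phrase = phrase
    self.index = phrase.startIndex
  }

  // Returns the phrase up to and including the current index, then advances.
  func nextPrefix() -> String? {
    guard index < phrase.endIndex else { return nil }
    defer { index = phrase.index(after: index) }
    return String(phrase[phrase.startIndex...index])
  }
}

func makeTypewriterStream(
  phrase: String,
  delayNanoseconds: UInt64 = 1_000_000_000
) -> AsyncStream<String> {
  let state = TypewriterState(phrase: phrase)
  return AsyncStream<String>(unfolding: {
    do {
      try await Task.sleep(nanoseconds: delayNanoseconds)
    } catch {
      return nil  // sleep threw (for example, on cancellation): end the stream
    }
    return await state.nextPrefix()
  })
}

// Starts typing with a long delay, then cancels before the first character.
func typeAndCancelImmediately() async -> [String] {
  let task = Task { () -> [String] in
    var typed: [String] = []
    let stream = makeTypewriterStream(phrase: "Hello, world!",
                                      delayNanoseconds: 10_000_000_000)
    for await item in stream {
      typed.append(item)
    }
    return typed
  }
  task.cancel()
  return await task.value
}
```

Because the stream itself does not throw, the consumer uses plain for await here, and cancellation simply shows up as an early end of the sequence.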

AsyncStream: pull or push?

Option-click AsyncStream then Open in Developer Documentation. Scroll down to Topics.

There are two kinds of AsyncStream. The one you just created has an unfolding closure. The closure is marked as async. Like the sequence iterator you wrote for custom sequences in the preceding course, it supplies the next element. It creates a sequence of values, one at a time, only when the task asks for one. Think of it as pull-based or demand-driven. Use this when you have control over when elements are generated.

Click the other initializer. This AsyncStream has a build closure. This closure is not marked as async, because this version is meant to interface with non-asynchronous APIs. This closure creates a sequence of values and buffers them until someone asks for them. Think of it as push-based or supply-driven.

In the documentation's sample code, notice that Task.sleep is wrapped in a Task because it's asynchronous while the build closure is not.

In episode 4, you’ll create a push-based AsyncStream to manage notifications, which arrive on their own schedule. In this episode, you’ll create a push-based version of the Typewriter sequence, so you can compare the two kinds of AsyncStream.
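To make "interface with non-asynchronous APIs" a little more concrete, here is a small sketch that wraps a callback-based object in a push-based AsyncStream. KeyLogger, its onKey and onFinished callbacks, keyStream(from:) and replayDemo() are all hypothetical names invented for this example. They stand in for any API that delivers values through callbacks on its own schedule.

```swift
// Hypothetical callback-based API: it knows nothing about async/await.
final class KeyLogger {
  var onKey: ((String) -> Void)?
  var onFinished: (() -> Void)?

  // Pushes every key to the callback, then signals completion.
  func replay(_ keys: [String]) {
    for key in keys {
      onKey?(key)
    }
    onFinished?()
  }
}

// The build closure is synchronous: it only wires callbacks to the continuation.
func keyStream(from logger: KeyLogger) -> AsyncStream<String> {
  AsyncStream<String> { continuation in
    logger.onKey = { key in
      continuation.yield(key)
    }
    logger.onFinished = {
      continuation.finish()
    }
  }
}

// Values are pushed before anyone iterates, so they wait in the buffer.
func replayDemo() async -> [String] {
  let logger = KeyLogger()
  let stream = keyStream(from: logger)
  logger.replay(["H", "i", "!"])

  var received: [String] = []
  for await key in stream {
    received.append(key)
  }
  return received
}
```

The producer runs to completion before the loop starts, and the loop still receives every key in order, which is the supply-driven behavior described above.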

Close the documentation window. Just above the pull-based code, create a push-based stream:

// These two lines are already above the pull-based stream code
let phrase = "Hello, world!"   
var index = phrase.startIndex

let stream_push = AsyncStream<String> { continuation in  // closure receives a continuation
  Task {  // wrap any asynchronous code in a Task
  }
}

Inside the Task closure, add a while-loop:

while index < phrase.endIndex {  // while index hasn't reached the end of phrase

}

Now, copy the do-catch code from the pull-based AsyncStream into this while loop, and delete the return nil in the catch block.

while index < phrase.endIndex {
  do {
    try await Task.sleep(until: .now + .seconds(1), clock: .continuous)
  } catch {
    // delete return nil
  }
}

Next, in the do block, after the call to Task.sleep, yield a sequence value:

continuation.yield(String(phrase[phrase.startIndex...index])) // copy pull-based return value
index = phrase.index(after: index)  // copy index increment from pull-based defer

For a push-based AsyncStream, you don’t have to defer incrementing index because you don’t return a value from this closure. You just use continuation.yield() to emit each value. If no task is waiting for the values, they go into the buffer.

Finally, finish the continuation in the catch block, and do the same after the while loop ends:

continuation.finish()

If there’s an error or when you reach phrase.endIndex, you use continuation.finish() to push nil into the buffer to indicate the end of the sequence.
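The small sketch below illustrates how yield and finish interact with the buffer. The function name finishDemo() is made up for this example. It yields two values synchronously, finishes, and then tries to yield once more. The return value of yield is checked to see whether the stream reports that it has already terminated.

```swift
func finishDemo() async -> (values: [String], lateYieldTerminated: Bool) {
  var lateYieldTerminated = false

  let stream = AsyncStream<String> { continuation in
    // No one is iterating yet, so these two values go into the buffer.
    continuation.yield("H")
    continuation.yield("He")

    // Mark the end of the sequence.
    continuation.finish()

    // Yielding after finish() has no effect; the result says so.
    if case .terminated = continuation.yield("Hel") {
      lateYieldTerminated = true
    }
  }

  // The buffered values are delivered first, then the loop ends.
  var values: [String] = []
  for await value in stream {
    values.append(value)
  }
  return (values, lateYieldTerminated)
}
```

Values buffered before finish() are still delivered to the consumer. Only values yielded afterward are discarded.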

The Task to use this stream is the same as for the pull-based Task, so scroll down to copy it, paste it and change pull to push:

// These two lines are already above the pull-based stream code
let phrase = "Hello, world!"
var index = phrase.startIndex

let stream_push = AsyncStream<String> { continuation in
  Task {
    while index < phrase.endIndex {
      do {
        try await Task.sleep(until: .now + .seconds(1),
                             clock: .continuous)
        continuation.yield(String(phrase[phrase.startIndex...index]))
        index = phrase.index(after: index)
      } catch {
        continuation.finish()
        return  // stop looping once sleep throws, for example on cancellation
      }
    }
    continuation.finish()
  }
}

Task {
  for try await item in stream_push {  // change pull to push
    print(item)
  }
  print("Push AsyncStream Done")  // change pull to push
}

You try await items in this push-based stream, just like for the pull-based stream.

The stream_push closure is called once. It creates all the stream elements, buffering them until the Task loop asks for them. Run this push-based Task.

As you expect, the output is the same. To see how these two AsyncStream closures differ, comment out the Tasks that print out the streams. Run the playground.

With the printing Tasks commented out, the pull-based AsyncStream closure never runs at all. It runs only when a Task asks for the next value.

The while loop in the push-based AsyncStream closure runs 13 times, creating the stream of Strings. It does this even though there’s no Task asking for the values.
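The difference can also be checked in code. The sketch below is a simplified illustration: CallCounter and compareLaziness() are invented names, and the push-based closure yields synchronously instead of spawning a Task as the playground does. It counts how often the unfolding closure has run before and after the first request, and records whether the build closure ran without any consumer.

```swift
// Hypothetical helper: counts calls to the unfolding closure.
actor CallCounter {
  private(set) var count = 0
  func increment() { count += 1 }
}

func compareLaziness() async -> (pullCallsBefore: Int, pullCallsAfter: Int, pushRan: Bool) {
  // Pull-based: the closure runs once per requested element.
  let pullCalls = CallCounter()
  let pull = AsyncStream<Int>(unfolding: {
    await pullCalls.increment()
    return 1
  })

  // Push-based: the build closure runs as soon as the stream is created.
  var pushRan = false
  let push = AsyncStream<Int> { continuation in
    pushRan = true
    continuation.yield(1)
    continuation.finish()
  }
  _ = push  // never iterated in this sketch

  // Nothing has asked the pull-based stream for a value yet.
  let before = await pullCalls.count

  // Ask for exactly one element.
  var iterator = pull.makeAsyncIterator()
  _ = await iterator.next()
  let after = await pullCalls.count

  return (before, after, pushRan)
}
```

The pull-based closure has run zero times until the iterator asks for a value, while the push-based closure has already run even though nothing reads from its stream.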

In the next three episodes, you’ll use both types of AsyncStream to add features to the Blabber app.