- For more about push-based vs pull-based AsyncStreams, see AsyncSequence & AsyncStream Tutorial for iOS.
- This video uses Xcode 14’s
Task.sleep(until:clock:)
. If you use Xcode 13, replace this withTask.sleep(nanoseconds: 1_000_000_000)
.
In this episode, you’ll learn about AsyncStream
, an easier way to create a custom AsyncSequence
.
In the course materials, locate the starter playground and open it.
Custom AsyncSequence from episode 4 of Getting Started
In the preceding course, you created this simple typewriter with a custom AsyncSequence
that “types” a phrase, adding a character every second.
struct Typewriter: AsyncSequence {
typealias Element = String
let phrase: String
func makeAsyncIterator() -> TypewriterIterator {
return TypewriterIterator(phrase)
}
}
struct TypewriterIterator: AsyncIteratorProtocol {
typealias Element = String
let phrase: String
var index: String.Index
init(_ phrase: String) {
self.phrase = phrase
self.index = phrase.startIndex
}
mutating func next() async throws -> String? {
guard index < phrase.endIndex else { return nil }
try await Task.sleep(until: .now + .seconds(1),
clock: .continuous)
defer { index = phrase.index(after: index) }
return String(phrase[phrase.startIndex...index])
}
}
Each call to next()
returns a substring of the initial string that is one character longer than the last one.
When it reaches the end of the phrase, next()
returns nil
to signify the end of the sequence.
You tried out your Typewriter
sequence with this Task
:
Task {
for try await item in Typewriter(phrase: "Hello, world!") {
print(item)
}
print("AsyncSequence Done")
}
Run this Task
to remind yourself how it works.
He
Hel
Hell
Hello
Hello,
Hello,
Hello, w
Hello, wo
Hello, wor
Hello, worl
Hello, world
Hello, world!
AsyncSequence Done
I promised you I’d show you a much easier way. And now it’s time to learn about AsyncStream
.
Comment out this Task
so it doesn’t interfere with the tasks you’re about to write.
AsyncStream Typewriter
In the next section, the starter has already set up test phrase
and index
properties:
let phrase = "Hello, world!"
var index = phrase.startIndex
You’ll create a pull-based AsyncStream
first:
let stream_pull = AsyncStream<String> {
}
Instead of a type alias, you just initialize your AsyncStream
with your element type String
. This trailing closure is the unfolding
argument of the AsyncStream
. It just has to return the next element.
So scroll up to the TypewriterIterator
next()
method, copy its code, and paste this into the AsyncStream
closure:
let stream_pull = AsyncStream<String> {
🟩
guard index < phrase.endIndex else { return nil }
try await Task.sleep(until: .now + .seconds(1),
clock: .continuous)
defer { index = phrase.index(after: index) }
return String(phrase[phrase.startIndex...index])
🟥
}
Xcode complains because AsyncStream
doesn’t throw.
In TypewriterIterator
, you were able to declare that next()
throws.
You can do a similar thing here: Change AsyncStream
to AsyncThrowingStream
:
let stream_pull = 🟩AsyncThrowingStream<String, Error🟥> {
The throwing version of AsyncStream
allows its closure to throw errors. You add an Error
type parameter placeholder inside the angle brackets.
Or, you could wrap the try await
in a do-catch
.
Undo back to AsyncStream
, then add your do-catch:
do {
try await Task.sleep(until: .now + .seconds(1),
clock: .continuous)
} catch {
return nil
}
Now, to try out your AsyncStream
, create a Task
:
Task {
for try await item in stream_pull {
print(item)
}
print("Pull AsyncStream Done")
}
The Task
loops over the stream items: For each iteration, the stream’s closure gets executed. This continues until the closure returns nil
. Run this Task
.
You get the same results as before, with a lot less code. And you didn’t have to add new types to your code base.
AsyncStream: pull or push?
Option-click AsyncStream then Open in Developer Documentation. Scroll down to Topics.
There are two kinds of AsyncStream
. The one you just created has an unfolding
closure. The closure is marked as async
. Like the sequence iterator you wrote for custom sequences in the preceding course, it supplies the next
element. It creates a sequence of values, one at a time, only when the task asks for one. Think of it as pull-based or demand-driven. Use this when you have control over when elements are generated.
Click the other initializer. This AsyncStream
has a build
closure. This closure is not marked as async
, because this version is meant to interface with non-asynchronous APIs. This closure creates a sequence of values and buffers them until someone asks for them. Think of it as push-based or supply-driven.
Notice that Task.sleep
is wrapped in a Task
because it’s asynchronous while the closure is not.
In episode 4, you’ll create a push-based AsyncStream
to manage notifications, which arrive on their own schedule. In this episode, you’ll create a push-based version of the Typewriter
sequence, so you can compare the two kinds of AsyncStream
.
Close the documentation window. Just above the pull-based code, create a push-based stream:
// These two lines are already above the pull-based stream code
let phrase = "Hello, world!"
var index = phrase.startIndex
let stream_push = AsyncStream<String> { continuation in // closure receives a continuation
Task { // wrap any asynchronous code in a Task
}
}
Inside the Task
closure, add a while-loop:
while index < phrase.endIndex { // while index hasn't reached the end of phrase
}
Now, copy the do-catch
code from the pull-based AsyncStream
into this while
loop, and delete the return nil
in the catch
closure.
while index < phrase.endIndex {
do {
try await Task.sleep(until: .now + .seconds(1), clock: .continuous)
} catch {
// delete return nil
}
}
Next, in the do
closure, yield a sequence value:
continuation.yield(String(phrase[phrase.startIndex...index])) // copy pull-based return value
index = phrase.index(after: index) // copy index increment from pull-based defer
For a push-based AsyncStream, you don’t have to defer
incrementing index
because you don’t return
from this closure. You just use continuation.yield()
to create each value. If there’s no process waiting for the values, they go into the buffer.
Finally, in the catch
closure, finish the continuation and also do this after the while
loop ends:
continuation.finish()
If there’s an error or when you reach phrase.endIndex
, you use continuation.finish()
to push nil
into the buffer to indicate the end of the sequence.
The Task
to use this stream is the same as for the pull-based Task
, so scroll down to copy it, paste it and change pull
to push
:
// These two lines are already above the pull-based stream code
var phrase = "Hello, world!"
var index = phrase.startIndex
let stream_push = AsyncStream<String> { continuation in
Task {
while index < phrase.endIndex {
do {
try await Task.sleep(until: .now + .seconds(1),
clock: .continuous)
continuation.yield(String(phrase[phrase.startIndex...index]))
index = phrase.index(after: index)
} catch {
continuation.finish()
}
}
continuation.finish()
}
}
🟩Task {
for try await item in stream_push { // change pull to push
print(item)
}
print("Push AsyncStream Done") // change pull to push
}🟥
You try await
items in this push-based stream, just like for the pull-based stream.
The stream_push
closure is called once. It creates all the stream elements, buffering them until the Task
loop asks for them. Run this push-based Task
.
As you expect, the output is the same. To see how these two AsyncStream
closures differ, comment out the Task
s that print out the streams. Run the playground.
The pull-based AsyncStream
code never runs at all. It runs only when the Task
asks for the next value.
The while
loop in the push-based AsyncStream
closure runs 13 times, creating the stream of String
s. It does this even though there’s no Task
asking for the values.
In the next three episodes, you’ll use both types of AsyncStream
to add features to the Blabber app.