Chapters

Hide chapters

Modern Concurrency in Swift

First Edition · iOS 15 · Swift 5.5 · Xcode 13

Section I: Modern Concurrency in Swift

Section 1: 11 chapters
Show chapters Hide chapters

4. Custom Asynchronous Sequences With AsyncStream
Written by Marin Todorov

Heads up... You're reading this book for free, with parts of this chapter shown beyond this point as scrambled text.

In previous chapters, you’ve learned a few different ways to integrate asynchronous code in your apps. By now, you’re hopefully comfortable calling and writing async functions and iterating over asynchronous sequences.

In this chapter, you’ll dive deeper into how to create your very own custom async sequences using AsyncStream. Using this method grants you complete control over the asynchronous sequence and makes it trivial to wrap your own existing asynchronous APIs as async sequences.

In this chapter, you’ll work through the Blabber app to explore these topics.

Getting started with the Blabber app

Blabber is a messaging app that lets you chat with friends. It has some neat features like location sharing, a countdown timer and a friendly — but somewhat unpredictable — chatbot.

Like all projects in this book, Blabber’s SwiftUI views, navigation and data model are already wired up and ready for you. Blabber has a similar foundation to the projects you’ve already worked on, like LittleJohn and SuperStorage. It’s a connected app powered by a server API. Some of that code is already included in the starter because it works the same as in earlier projects.

Open the starter version of Blabber in this chapter’s materials, under projects/starter. When you complete the app, it will feature a working login screen, where you can choose your user name, and a chat screen to socialize with friends:

At the moment, you can enter a user name, but nothing else works. Your goal is to make asynchronous calls to the server, then provide live updates in the app by reading from a long-living server request.

Before starting to work on the app, start the book server. If you haven’t already done that, navigate to the server folder 00-book-server in the book materials-repository and enter swift run. The detailed steps are covered in Chapter 1, “Why Modern Swift Concurrency?”.

Adding functionality to Blabber

In the first section of this chapter, you’ll work on finishing some missing app functionality. That will give you a solid start when you work on your own custom sequences in the following sections.

Parsing the server responses

The custom chat protocol that the book server implements sends a status as the first line, then continues with chat messages on the following lines. Each line is a JSON object, and new lines appear whenever users add chat messages. This is all part of the same long-living request/response. Here’s an example:

{"activeUsers": 4}
...
{"id": "...", "message": "Mr Anderson connected", "date": "..."}
...
{"id": "...", "user": "Mr Anderson", "message": "Knock knock...", "date": "..."}
/// and so on ...
var iterator = stream.lines.makeAsyncIterator()

guard let first = try await iterator.next() else {
  throw "No response from server"
}
guard let data = first.data(using: .utf8),
      let status = try? JSONDecoder()
        .decode(ServerStatus.self, from: data) else {
  throw "Invalid response from server"
}

Storing and using the chat information

To store this information, add the following code immediately after the decoding:

messages.append(
  Message(
    message: "\(status.activeUsers) active users"
  )
)
for try await line in stream.lines {
  if let data = line.data(using: .utf8),
    let update = try? JSONDecoder().decode(Message.self, from: data) {
    messages.append(update)
  }
}

Digging into AsyncSequence, AsyncIteratorProtocol and AsyncStream

In the previous section, you learned that an asynchronous sequence lets you access its elements via its iterator. In fact, defining the element type of the sequence and providing an iterator are the only requirements of the AsyncSequence protocol:

protocol AsyncSequence {
  ...
  func makeAsyncIterator() -> Self.AsyncIterator
}
func contains(_:) -> Bool
func allSatisfy(_:) -> Bool
func first(where:) -> Self.Element?
func min() -> Self.Element?
func max() -> Self.Element?
...
Baduk ZidolfaxeufZotpoj EADumape.uyoobqicoox demcmiob guchtjaha zekeluwekear riwelaqosuor lujt sugc babn xezd

protocol AsyncIteratorProtocol {
  ...
  func next() async throws -> Self.Element?
}

Simple async sequences

What would a simple implementation of an asynchronous sequence look like?

struct Typewriter: AsyncSequence {
  typealias Element = String
  
  let phrase: String
  
  func makeAsyncIterator() -> TypewriterIterator {
    return TypewriterIterator(phrase)
  }
}
struct TypewriterIterator: AsyncIteratorProtocol {
  typealias Element = String
  
  let phrase: String
  var index: String.Index
  
  init(_ phrase: String) {
    self.phrase = phrase
    self.index = phrase.startIndex
  }
  
  mutating func next() async throws -> String? {
    guard index < phrase.endIndex else {
      return nil
    }
    try await Task.sleep(nanoseconds: 1_000_000_000)

    let result = String(phrase[phrase.startIndex...index])
    index = phrase.index(after: index)
    return result
  }
}
for try await item in Typewriter(phrase: "Hello, world!") {
  print(item)
}
H
He
Hel
Hell
Hello
Hello,
Hello, 
Hello, w
Hello, wo
Hello, wor
Hello, worl
Hello, world
Hello, world!

Simplifying async sequences with AsyncStream

To streamline creating asynchronous sequences, Apple has added a type called AsyncStream, which aims to make creating async sequences as simple and quick as possible.

var phrase = "Hello, world!"
var index = phrase.startIndex
let stream = AsyncStream<String> {
  guard index < phrase.endIndex else { return nil }
  do {
    try await Task.sleep(nanoseconds: 1_000_000_000)
  } catch {
    return nil
  }

  let result = String(phrase[phrase.startIndex...index])
  index = phrase.index(after: index)
  return result
}

for try await item in stream {
  print(item)
}

Creating an asynchronous timer with AsyncStream

The countdown feature in the Blabber app adds an element of drama to your chats by counting down before showing your latest message. You’ll use AsyncStream and Timer to achieve this.

let counter = AsyncStream<String> { continuation in
  
}
5 ... 5 ... 4 ... 🎉 muqlovu

Building your timer logic

Now, it’s time to start building the timer’s logic. Insert this inside AsyncStream’s trailing closure:

var countdown = 3
Timer.scheduledTimer(
  withTimeInterval: 1.0,
  repeats: true
) { timer in

}
continuation.yield("\(countdown) ...")
countdown -= 1

for await countdownMessage in counter {
  try await say(countdownMessage)
}

Stopping the timer and sending the message

Move back to the Timer’s closure in scheduledTimer(...) and insert the following at the top, before the code yielding the values:

guard countdown > 0 else {
  timer.invalidate()
  continuation.yield("🎉 " + message)
  continuation.finish()
  return
}
continuation.yield(with: .success("🎉 " + message))
let counter = AsyncStream<String> { continuation in
  var countdown = 3
  Timer.scheduledTimer(
    withTimeInterval: 1.0,
    repeats: true
  ) { timer in
    guard countdown > 0 else {
      timer.invalidate()
      continuation.yield(with: .success("🎉 " + message))
      return
    }

    continuation.yield("\(countdown) ...")
    countdown -= 1
  }
}

Adding an asynchronous stream to NotificationCenter

Going back and forth between closure-based asynchronous APIs and the modern async/await-based APIs can be tedious. Luckily, you can easily wrap your existing APIs in an async sequence, so you can integrate all of your async work in a single, easy-to-use interface.

Pabofibihuet Buvrud Sequcimiduoq Yarikeruxeox OmxjmDmpuaj Cosukiwunioq Jovujilapouc

func notifications(for name: Notification.Name) -> AsyncStream<Notification> {
  AsyncStream<Notification> { continuation in

  }
}
NotificationCenter.default.addObserver(
  forName: name, 
  object: nil, 
  queue: nil
) { notification in
  continuation.yield(notification)
}
func observeAppStatus() async {

}
for await _ in await NotificationCenter.default
  .notifications(for: UIApplication.willResignActiveNotification) {
  
}

Notifying participants when a user leaves

To post a system message that the user has left the chat, add the following inside the loop you added at the end of the previous section:

try? await say("\(username) went away", isSystemMessage: true)
let notifications = Task {
  await observeAppStatus()
}
defer {
  notifications.cancel()
}

Notifying participants when a user returns

To wrap up this section, you’ll also observe didBecomeActiveNotification to let the chat participants know when a user returns to the chat.

func observeAppStatus() async {
  Task {
    for await _ in await NotificationCenter.default
      .notifications(for: UIApplication.willResignActiveNotification) {
      try? await say("\(username) went away", isSystemMessage: true)
    }
  }

  Task {
    for await _ in await NotificationCenter.default
      .notifications(for: UIApplication.didBecomeActiveNotification) {
      try? await say("\(username) came back", isSystemMessage: true)
    }
  }
}

Extending AsyncSequence

Extending existing types is not an async/await feature per se, but with AsyncStream being so simple to use, your attention might stray away from the possibilities of extending the concrete AsyncStream type or even the more generic AsyncSequence protocol.

UyzwtQuqaalve kvediq(...) nixbub(...) pit(...) kiwIecf(...)

extension AsyncSequence {
  func forEach(_ body: (Element) async throws -> Void) async throws {
  
  }
}
for try await element in self {
  try await body(element)
}
try await counter.forEach {
  try await self.say($0)
}

Challenges

Challenge: Using AsyncStream(unfolding:onCancel:)

In the AsyncStream overview section, you read about two ways to initialize an asynchronous stream. You used the former in this chapter: AsyncStream(_:bufferingPolicy:_), which sets the element type and uses a continuation to produce values.

msiuz 6 ... atjacfoyg() kteaz 0 ... ihhagsetj() ywiiw 9 ... acyehbegm() flial cazqaxu uhzoykagm() ocjegkupz() lep

Key points

  • You can use iterators and loops to implement your own processing logic when consuming an AsyncSequence.
  • AsyncSequence and its partner in crime, AsyncIteratorProtocol, let you easily create your own asynchronous sequences.
  • AsyncStream is the easiest way to create asynchronous sequences from a single Swift closure.
  • When working with a continuation: Use yield(_:) to produce a value, yield(with:) to both produce a value and finish the sequence or finish() to indicate the sequence completed.
Have a technical question? Want to report a bug? You can ask questions and report bugs to the book authors in our official book forum here.
© 2024 Kodeco Inc.

You're reading for free, with parts of this chapter shown as scrambled text. Unlock this book, and our entire catalogue of books and videos, with a Kodeco Personal Plan.

Unlock now