Home iOS & Swift Books Combine: Asynchronous Programming with Swift

18
Custom Publishers & Handling Backpressure Written by Florent Pillet

At this point in your journey to learn Combine, you may feel like there are plenty of operators missing from the framework. This may be particularly true if you have experience with other reactive frameworks, which typically provide a rich ecosystem of operators, both built-in and third-party. Combine allows you to create your own publishers. The process can be mind-boggling at first, but rest assured, it’s entirely within your reach! This chapter will show you how.

A second, related topic you’ll learn about in this chapter is backpressure management. This will require some explanation: What is this backpressure thing? Is that some kind of back pain induced by too much leaning over your chair, scrutinizing Combine code? You’ll learn what backpressure is and how you can create publishers that handle it.

Creating your own publishers

The complexity of implementing your own publishers varies from “easy” to “pretty involved.” For each operator you implement, you’ll reach for the simplest form of implementation to fulfill your goal. In this chapter, you’ll look at three different ways of crafting your own publishers:

  • Using a simple extension method in the Publisher namespace.
  • Implementing a type in the Publishers namespace with a Subscription that produces values.
  • Same as above, but with a subscription that transforms values from an upstream publisher.

Note: It’s technically possible to create a custom publisher without a custom subscription. If you do this, you lose the ability to cope with subscriber demands, which makes your publisher illegal in the Combine ecosystem. Early cancellation can also become an issue. This is not a recommended approach, and this chapter will teach you how to write your publishers correctly.

Publishers as extension methods

Your first task is to implement a simple operator just by reusing existing operators. This is as simple as you can get.

extension Publisher {
  // 1
  func unwrap<T>() -> Publishers.CompactMap<Self, T> where Output == Optional<T> {
    // 2
    compactMap { $0 }
  }
}
func unwrap<T>()
-> Publishers.CompactMap<Self, T>
where Output == Optional<T> {

Testing your custom operator

Now you can test your new operator. Add this code below the extension:

let values: [Int?] = [1, 2, nil, 3, nil, 4]

values.publisher
  .unwrap()
  .sink {
    print("Received value: \($0)")
  }
Received value: 1
Received value: 2
Received value: 3
Received value: 4

The subscription mechanism

Subscriptions are the unsung heroes of Combine: While you see publishers everywhere, they are mostly inanimate entities. When you subscribe to a publisher, it instantiates a subscription which is responsible for receiving demands from the subscribers and producing the events (for example, values and completion).

Publishers emitting values

In Chapter 11, “Timers,” you learned about Timer.publish() but found that using Dispatch Queues for timers was somewhat uneasy. Why not develop your own timer based on Dispatch’s DispatchSourceTimer?

struct DispatchTimerConfiguration {
  // 1
  let queue: DispatchQueue?
  // 2
  let interval: DispatchTimeInterval
  // 3
  let leeway: DispatchTimeInterval
  // 4
  let times: Subscribers.Demand
}

Adding the DispatchTimer publisher

You can now start creating your DispatchTimer publisher. It’s going to be straightforward because all the work occurs inside the subscription!

extension Publishers {
  struct DispatchTimer: Publisher {
    // 5
    typealias Output = DispatchTime
    typealias Failure = Never
    
    // 6
    let configuration: DispatchTimerConfiguration
    
    init(configuration: DispatchTimerConfiguration) {
      self.configuration = configuration
    }
  }
}
// 7
func receive<S: Subscriber>(subscriber: S)
  where Failure == S.Failure,
        Output == S.Input {
  // 8
  let subscription = DispatchTimerSubscription(
    subscriber: subscriber,
    configuration: configuration
  )
  // 9
  subscriber.receive(subscription: subscription)
}

Building your subscription

The subscription’s role is to:

private final class DispatchTimerSubscription
  <S: Subscriber>: Subscription where S.Input == DispatchTime {
}

Adding required properties to your subscription

Now add these properties to the subscription class’ definition:

// 10
let configuration: DispatchTimerConfiguration
// 11
var times: Subscribers.Demand
// 12
var requested: Subscribers.Demand = .none
// 13
var source: DispatchSourceTimer? = nil
// 14
var subscriber: S?

Initializing and canceling your subscription

Now, add the initializer to your subscription definition:

init(subscriber: S,
     configuration: DispatchTimerConfiguration) {
  self.configuration = configuration
  self.subscriber = subscriber
  self.times = configuration.times
}
func cancel() {
  source = nil
  subscriber = nil
}

Letting your subscription request values

Do you remember what you learned in Chapter 2, “Publishers & Subscribers?” Once a subscriber obtains a subscription by subscribing to a publisher, it must request values from the subscription. This is where all the magic happens. To implement it, add this method to the class, above the cancel method:

// 15
func request(_ demand: Subscribers.Demand) {
  // 16
  guard times > .none else {
    // 17
    subscriber?.receive(completion: .finished)
    return
  }
}
// 18
requested += demand

// 19
if source == nil, requested > .none {
  
}

Configuring your timer

Add this code to the body of this last if conditional:

// 20
let source = DispatchSource.makeTimerSource(queue: configuration.queue)
// 21
source.schedule(deadline: .now() + configuration.interval,
                repeating: configuration.interval,
                leeway: configuration.leeway)
// 22
source.setEventHandler { [weak self] in
  // 23
  guard let self = self,
        self.requested > .none else { return }

  // 24
  self.requested -= .max(1)
  self.times -= .max(1)
  // 25
  _ = self.subscriber?.receive(.now())
  // 26
  if self.times == .none {
    self.subscriber?.receive(completion: .finished)
  }
}

Activating your timer

Now that you’ve configured your source timer, store a reference to it and activate it by adding this code after setEventHandler:

self.source = source
source.activate()
extension Publishers {
  static func timer(queue: DispatchQueue? = nil,
                    interval: DispatchTimeInterval,
                    leeway: DispatchTimeInterval = .nanoseconds(0),
                    times: Subscribers.Demand = .unlimited)
                    -> Publishers.DispatchTimer {
    return Publishers.DispatchTimer(
      configuration: .init(queue: queue,
                           interval: interval,
                           leeway: leeway,
                           times: times)
                      )
  }
}

Testing your timer

You’re now ready to test your new timer!

// 27
var logger = TimeLogger(sinceOrigin: true)
// 28
let publisher = Publishers.timer(interval: .seconds(1),
                                 times: .max(6))
// 29
let subscription = publisher.sink { time in
  print("Timer emits: \(time)", to: &logger)
}
+1.02668s: Timer emits: DispatchTime(rawValue: 183177446790083)
+2.02508s: Timer emits: DispatchTime(rawValue: 183178445856469)
+3.02603s: Timer emits: DispatchTime(rawValue: 183179446800230)
+4.02509s: Timer emits: DispatchTime(rawValue: 183180445857620)
+5.02613s: Timer emits: DispatchTime(rawValue: 183181446885030)
+6.02617s: Timer emits: DispatchTime(rawValue: 183182446908654)
DispatchQueue.main.asyncAfter(deadline: .now() + 3.5) {
  subscription.cancel()
}

Publishers transforming values

You’ve made serious progress in building your Combine skills! You can now develop your own operators, even fairly complex ones. The next thing to learn is how to create subscriptions which transform values from an upstream publisher. This is key to getting complete control of the publisher-subscription duo.

Implementing a ShareReplay operator

To implement shareReplay() you’ll need:

// 1
fileprivate final class ShareReplaySubscription<Output, Failure: Error>: Subscription {
  // 2
  let capacity: Int
  // 3
  var subscriber: AnySubscriber<Output,Failure>? = nil
  // 4
  var demand: Subscribers.Demand = .none
  // 5
  var buffer: [Output]
  // 6
  var completion: Subscribers.Completion<Failure>? = nil
}

Initializing your subscription

Next, add the initializer to the subscription definition:

init<S>(subscriber: S,
        replay: [Output],
        capacity: Int,
        completion: Subscribers.Completion<Failure>?)
        where S: Subscriber,
              Failure == S.Failure,
              Output == S.Input {
  // 7
  self.subscriber = AnySubscriber(subscriber)
  // 8
  self.buffer = replay
  self.capacity = capacity
  self.completion = completion
}

Sending completion events and outstanding values to the subscriber

You’ll need a method which relays completion events to the subscriber. Add the following to the subscription class to satisfy that need:

private func complete(with completion: Subscribers.Completion<Failure>) {
  // 9
  guard let subscriber = subscriber else { return }
  self.subscriber = nil
  // 10
  self.completion = nil
  self.buffer.removeAll()
  // 11
  subscriber.receive(completion: completion)
}
private func emitAsNeeded() {
  guard let subscriber = subscriber else { return }
  // 12
  while self.demand > .none && !buffer.isEmpty {
    // 13
    self.demand -= .max(1)
    // 14
    let nextDemand = subscriber.receive(buffer.removeFirst())
    // 15
    if nextDemand != .none {
      self.demand += nextDemand
    }
  }
  // 16
  if let completion = completion {
    complete(with: completion)
  }
}
func request(_ demand: Subscribers.Demand) {
  if demand != .none {
    self.demand += demand
  }
  emitAsNeeded()
}

Canceling your subscription

Canceling the subscription is even easier. Add this code:

func cancel() {
  complete(with: .finished)
}
func receive(_ input: Output) {
  guard subscriber != nil else { return }
  // 17
  buffer.append(input)
  if buffer.count > capacity {
    // 18
    buffer.removeFirst()
  }
  // 19
  emitAsNeeded()
}

Finishing your subscription

Now, add the following method to accept completion events and your subscription class will be complete:

func receive(completion: Subscribers.Completion<Failure>) {
  guard let subscriber = subscriber else { return }
  self.subscriber = nil
  self.buffer.removeAll()
  subscriber.receive(completion: completion)
}

Coding your publisher

Publishers are usually value types implemented as struct in the Publishers namespace. Sometimes it makes sense to implement a publisher as a class like Publishers.Multicast, which multicast() returns. For this publisher, you need a class, though this is the exception to the rule – most often, you’ll use a struct.

extension Publishers {
  // 20
  final class ShareReplay<Upstream: Publisher>: Publisher {
    // 21
    typealias Output = Upstream.Output
    typealias Failure = Upstream.Failure
  }
}

Adding the publisher’s required properties

Now, add the properties that your publisher will need to operate to the definition of ShareReplay:

// 22
private let lock = NSRecursiveLock()
// 23
private let upstream: Upstream
// 24
private let capacity: Int
// 25
private var replay = [Output]()
// 26
private var subscriptions = [ShareReplaySubscription<Output, Failure>]()
// 27
private var completion: Subscribers.Completion<Failure>? = nil

Initializing and relaying values to your publisher

First, add the necessary initializer:

init(upstream: Upstream, capacity: Int) {
  self.upstream = upstream
  self.capacity = capacity
}
private func relay(_ value: Output) {
  // 28
  lock.lock()
  defer { lock.unlock() }

  // 29
  guard completion == nil else { return }

  // 30
  replay.append(value)
  if replay.count > capacity {
    replay.removeFirst()
  }
  // 31
  subscriptions.forEach {
    _ = $0.receive(value)
  }
}

Letting your publisher know when it’s done

Second, add this method to handle completion events:

private func complete(_ completion: Subscribers.Completion<Failure>) {
  lock.lock()
  defer { lock.unlock() }
  // 32
  self.completion = completion
  // 33
  subscriptions.forEach {
    _ = $0.receive(completion: completion)
  }
}
func receive<S: Subscriber>(subscriber: S)
  where Failure == S.Failure,
        Output == S.Input {
  lock.lock()
  defer { lock.unlock() }
}

Creating your subscription

Next, add this code to the method to create the subscription and hand it over to the subscriber:

// 34
let subscription = ShareReplaySubscription(
  subscriber: subscriber,
  replay: replay,
  capacity: capacity,
  completion: completion)

// 35
subscriptions.append(subscription)
// 36
subscriber.receive(subscription: subscription)

Subscribing to the upstream publisher and handling its inputs

You are now ready to subscribe to the upstream publisher. You only need to do it once: When you receive your first subscriber.

// 37
guard subscriptions.count == 1 else { return }

let sink = AnySubscriber(
  // 38
  receiveSubscription: { subscription in
    subscription.request(.unlimited)
  },
  // 39
  receiveValue: { [weak self] (value: Output) -> Subscribers.Demand in
    self?.relay(value)
    return .none
  },
  // 40
  receiveCompletion: { [weak self] in
    self?.complete($0)
  }
)
upstream.subscribe(sink)

Adding a convenience operator

Your publisher is complete! Of course, you’ll want one more thing: A convenience operator to help chain this new publisher.

extension Publisher {
  func shareReplay(capacity: Int = .max)
    -> Publishers.ShareReplay<Self> {
    return Publishers.ShareReplay(upstream: self,
                                  capacity: capacity)
  }
}

Testing your subscription

Add this code to the end of your playground to test your new operator:

// 41
var logger = TimeLogger(sinceOrigin: true)
// 42
let subject = PassthroughSubject<Int,Never>()
// 43
let publisher = subject.shareReplay(capacity: 2)
// 44
subject.send(0)
let subscription1 = publisher.sink(
  receiveCompletion: {
    print("subscription2 completed: \($0)", to: &logger)
  },
  receiveValue: {
    print("subscription2 received \($0)", to: &logger)
  }
)

subject.send(1)
subject.send(2)
subject.send(3)
let subscription2 = publisher.sink(
  receiveCompletion: {
    print("subscription2 completed: \($0)", to: &logger)
  },
  receiveValue: {
    print("subscription2 received \($0)", to: &logger)
  }
)

subject.send(4)
subject.send(5)
subject.send(completion: .finished)
var subscription3: Cancellable? = nil

DispatchQueue.main.asyncAfter(deadline: .now() + 1) {
  print("Subscribing to shareReplay after upstream completed")
  subscription3 = publisher.sink(
    receiveCompletion: {
      print("subscription3 completed: \($0)", to: &logger)
    },
    receiveValue: {
      print("subscription3 received \($0)", to: &logger)
    }
  )
}
+0.02967s: subscription1 received 1
+0.03092s: subscription1 received 2
+0.03189s: subscription1 received 3
+0.03309s: subscription2 received 2
+0.03317s: subscription2 received 3
+0.03371s: subscription1 received 4
+0.03401s: subscription2 received 4
+0.03515s: subscription1 received 5
+0.03548s: subscription2 received 5
+0.03716s: subscription1 completed: finished
+0.03746s: subscription2 completed: finished
Subscribing to shareReplay after upstream completed
+1.12007s: subscription3 received 4
+1.12015s: subscription3 received 5
+1.12057s: subscription3 completed: finished

Verifying your subscription

Fantastic! This works exactly as you wanted. Or does it? How can you verify that the publisher is being subscribed to only once? By using the print(_:) operator, of course! You can try it by inserting it before shareReplay.

let publisher = subject.shareReplay(capacity: 2)
let publisher = subject
  .print("shareReplay")
  .shareReplay(capacity: 2)
shareReplay: receive subscription: (PassthroughSubject)
shareReplay: request unlimited
shareReplay: receive value: (1)
+0.03004s: subscription1 received 1
shareReplay: receive value: (2)
+0.03146s: subscription1 received 2
shareReplay: receive value: (3)
+0.03239s: subscription1 received 3
+0.03364s: subscription2 received 2
+0.03374s: subscription2 received 3
shareReplay: receive value: (4)
+0.03439s: subscription1 received 4
+0.03471s: subscription2 received 4
shareReplay: receive value: (5)
+0.03577s: subscription1 received 5
+0.03609s: subscription2 received 5
shareReplay: receive finished
+0.03759s: subscription1 received completion: finished
+0.03788s: subscription2 received completion: finished
Subscribing to shareReplay after upstream completed
+1.11936s: subscription3 received 4
+1.11945s: subscription3 received 5
+1.11985s: subscription3 received completion: finished

Handling backpressure

In fluid dynamics, backpressure is a resistance or force opposing the desired flow of fluid through pipes. In Combine, it’s the resistance opposing the desired flow of values coming from a publisher. But what is this resistance? Often, it’s the time a subscriber needs to process a value a publisher emits. Some examples are:

Using a pausable sink to handle backpressure

To get started, switch to the PausableSink page of the playground.

protocol Pausable {
  var paused: Bool { get }
  func resume()
}
// 1
final class PausableSubscriber<Input, Failure: Error>:
  Subscriber, Pausable, Cancellable {
  // 2
  let combineIdentifier = CombineIdentifier()
}
// 3
let receiveValue: (Input) -> Bool
// 4
let receiveCompletion: (Subscribers.Completion<Failure>) -> Void

// 5
private var subscription: Subscription? = nil
// 6
var paused = false
// 7
init(receiveValue: @escaping (Input) -> Bool,
     receiveCompletion: @escaping (Subscribers.Completion<Failure>) -> Void) {
  self.receiveValue = receiveValue
  self.receiveCompletion = receiveCompletion
}

// 8
func cancel() {
  subscription?.cancel()
  subscription = nil
}
func receive(subscription: Subscription) {
  // 9
  self.subscription = subscription
  // 10
  subscription.request(.max(1))
}

func receive(_ input: Input) -> Subscribers.Demand {
  // 11
  paused = receiveValue(input) == false
  // 12
  return paused ? .none : .max(1)
}

func receive(completion: Subscribers.Completion<Failure>) {
  // 13
  receiveCompletion(completion)
  subscription = nil
}
func resume() {
  guard paused else { return }

  paused = false
  // 14
  subscription?.request(.max(1))
}
extension Publisher {
  // 15
  func pausableSink(
    receiveCompletion: @escaping ((Subscribers.Completion<Failure>) -> Void),
    receiveValue: @escaping ((Output) -> Bool))
    -> Pausable & Cancellable {
    // 16
    let pausable = PausableSubscriber(
      receiveValue: receiveValue,
      receiveCompletion: receiveCompletion)
    self.subscribe(pausable)
    // 17
    return pausable
  }
}

Testing your new sink

You can now try your new sink! To make things simple, simulate cases where the publisher should stop sending values. Add this code:

let subscription = [1, 2, 3, 4, 5, 6]
  .publisher
  .pausableSink(receiveCompletion: { completion in
    print("Pausable subscription completed: \(completion)")
  }) { value -> Bool in
    print("Receive value: \(value)")
    if value % 2 == 1 {
      print("Pausing")
      return false
    }
    return true
}
Receive value: 1
Pausing
let timer = Timer.publish(every: 1, on: .main, in: .common)
  .autoconnect()
  .sink { _ in
    guard subscription.paused else { return }
    print("Subscription is paused, resuming")
    subscription.resume()
  }
Receive value: 1
Pausing
Subscription is paused, resuming
Receive value: 2
Receive value: 3
Pausing
Subscription is paused, resuming
Receive value: 4
Receive value: 5
Pausing
Subscription is paused, resuming
Receive value: 6
Pausable subscription completed: finished

Key points

Wow, this was a long and complex chapter! You learned a lot about publishers:

Where to go from here?

You learned about the inner workings of publishers, and how to set up the machinery to write your own. Of course, any code you write — and publishers in particular! — should be thoroughly tested. Move on to the next chapter to learn all about testing Combine code!

Have a technical question? Want to report a bug? You can ask questions and report bugs to the book authors in our official book forum here.

Have feedback to share about the online reading experience? If you have feedback about the UI, UX, highlighting, or other features of our online readers, you can send them to the design team with the form below:

© 2020 Razeware LLC

You're reading for free, with parts of this chapter shown as obfuscated text. Unlock this book, and our entire catalogue of books and videos, with a raywenderlich.com Professional subscription.

Unlock Now

To highlight or take notes, you’ll need to own this book in a subscription or purchased by itself.