Backpressure in Combine

Learn what backpressure is and how to apply it in Combine.

06 Dec 2021 · 8 min read

When working with subscribers in Combine, we mostly use sink(receiveValue:) and assign(to:on:), which request an unlimited number of values from a publisher.
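
To make that unlimited demand visible, a minimal sketch can record what sink(receiveValue:) requests by placing handleEvents(receiveRequest:) in front of it. The names subject and requested are only illustrative.

```swift
import Combine

let subject = PassthroughSubject<Int, Never>()

// Every demand that the downstream subscriber sends upstream is recorded here.
var requested: [Subscribers.Demand] = []

let cancellable = subject
    .handleEvents(receiveRequest: { demand in requested.append(demand) })
    .sink { value in print("received \(value)") }

// sink asks for everything right away, so the only recorded demand is .unlimited.
print(requested == [.unlimited])  // true
```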

There may be cases, though, where processing a received value takes so long that new values arrive in the meantime. In those cases, we may need to control how many values arrive to avoid blocking or overflow.

This concept of limiting the elements the subscriber receives is called backpressure.

Examples of backpressure

Backpressure issues might come up wherever processing values takes time, for example when reading and writing data, communicating with a server, or rendering.

In real-time server-to-client communication, for example, when a WebSocket emits many values per second, we may want to apply backpressure so that the UI is not updated with each incoming value. We could, for example, buffer the incoming values in an array and re-render on our own schedule.
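
A minimal sketch of that idea, assuming the WebSocket values are exposed through a PassthroughSubject: BufferedFeed, drain() and socketValues are hypothetical names. The buffer here is unbounded and not thread-safe, so it decouples rendering from receiving but does not limit demand by itself.

```swift
import Combine

// Hypothetical helper: stores incoming values until the owner decides to render.
final class BufferedFeed {
    private var pending: [Int] = []
    private var cancellable: AnyCancellable?

    init(source: PassthroughSubject<Int, Never>) {
        cancellable = source.sink { [weak self] value in
            self?.pending.append(value)
        }
    }

    /// Returns everything received since the last call and empties the buffer.
    func drain() -> [Int] {
        let batch = pending
        pending.removeAll()
        return batch
    }
}

let socketValues = PassthroughSubject<Int, Never>()
let feed = BufferedFeed(source: socketValues)

socketValues.send(1)
socketValues.send(2)
socketValues.send(3)
let firstBatch = feed.drain()   // [1, 2, 3], rendered in one pass

socketValues.send(4)
let secondBatch = feed.drain()  // [4]
```

In an app, drain() would typically be called from a timer or a display refresh callback rather than inline.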

Backpressure strategies

In general, there are two ways we can apply backpressure in Combine:

  • Buffer, i.e. accumulate incoming values temporarily
  • Drop, i.e. skip some of the incoming values

Both strategies can be implemented in different ways, either by creating a custom subscriber or by using Combine’s buffering or timing operators.
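
As a rough sketch of the operator route, buffer(size:prefetch:whenFull:) can hold values for a subscriber that has not asked for them yet. PullSubscriber and its pull(_:) method are hypothetical helpers, not part of Combine. The assumption here is that .byRequest lets the buffer accept everything the subject sends and that .dropOldest evicts the oldest stored value once two are held, so the pull at the end would deliver the most recent values. The exact interplay of the two options is worth verifying in a playground.

```swift
import Combine

// Hypothetical subscriber that only receives values when pull(_:) is called.
final class PullSubscriber: Subscriber {
    typealias Input = Int
    typealias Failure = Never

    private var subscription: Subscription?
    private(set) var received: [Int] = []

    func receive(subscription: Subscription) {
        // Store the subscription without requesting anything yet.
        self.subscription = subscription
    }

    func receive(_ input: Int) -> Subscribers.Demand {
        received.append(input)
        return .none
    }

    func receive(completion: Subscribers.Completion<Never>) {
        subscription = nil
    }

    func pull(_ count: Int) {
        subscription?.request(.max(count))
    }
}

let subject = PassthroughSubject<Int, Never>()
let subscriber = PullSubscriber()

subject
    .buffer(size: 2, prefetch: .byRequest, whenFull: .dropOldest)
    .subscribe(subscriber)

// Four values arrive while the subscriber has no outstanding demand.
for value in 1...4 {
    subject.send(value)
}

// The subscriber now asks for two values from the buffer.
subscriber.pull(2)
print(subscriber.received)
```

Swapping .dropOldest for .dropNewest would turn this into the opposite drop policy.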

Creating a custom subscriber to apply backpressure

A simple custom subscriber that applies backpressure could look like this:

import Combine
import Foundation

class CountSubscriber: Subscriber {
    typealias Input = Int
    typealias Failure = Never
    var subscription: Subscription?
    // 1. Keep the subscription and request a single value.
    func receive(subscription: Subscription) {
        print("subscribed")
        self.subscription = subscription
        subscription.request(.max(1))
    }
    // 2. Ask for no more values right now, but request two more after one second.
    func receive(_ input: Int) -> Subscribers.Demand {
        print("received \(input)")
        DispatchQueue.main.asyncAfter(deadline: .now() + 1) {
            self.subscription?.request(.max(2))
        }
        return Subscribers.Demand.none
    }
    // 3. Release the subscription once the publisher completes.
    func receive(completion: Subscribers.Completion<Never>) {
        subscription = nil
    }
}

Let's go through it step by step:

  1. The receive(subscription:) method gets called once the publisher hands the subscriber its subscription. Here, we request one element from the publisher.

  2. Since we requested one element from the publisher, the receive(_ input:) method gets called as soon as it arrives. Here, we return no additional demand and instead request two more values after one second.

  3. The receive(completion:) method gets called when the publisher completes.
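
The value returned from receive(_ input:) is added to the outstanding demand, so a subscriber can also adjust demand synchronously instead of calling request(_:) later. The following sketch assumes a hypothetical LimitedSubscriber that takes values one at a time up to a fixed limit greater than zero.

```swift
import Combine

// Hypothetical subscriber that accepts values one by one, up to `limit` in total.
final class LimitedSubscriber: Subscriber {
    typealias Input = Int
    typealias Failure = Never

    private let limit: Int
    private(set) var received: [Int] = []

    init(limit: Int) {
        self.limit = limit
    }

    func receive(subscription: Subscription) {
        // Start with a demand of exactly one value.
        subscription.request(.max(1))
    }

    func receive(_ input: Int) -> Subscribers.Demand {
        received.append(input)
        // Ask for one more value until the limit is reached, then stop.
        return received.count < limit ? .max(1) : .none
    }

    func receive(completion: Subscribers.Completion<Never>) {}
}

let subject = PassthroughSubject<Int, Never>()
let subscriber = LimitedSubscriber(limit: 3)
subject.subscribe(subscriber)

for value in 1...5 {
    subject.send(value)
}

print(subscriber.received)  // [1, 2, 3]
```

Values 4 and 5 are sent while the demand is zero, so the subject drops them.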

Now let's use the subscriber.

let subject = PassthroughSubject<Int, Never>()
let subscriber = CountSubscriber()
subject.subscribe(subscriber)
subject.send(1)
subject.send(2)
DispatchQueue.main.asyncAfter(deadline: .now() + 3) {
    subject.send(3)
    subject.send(4)
    subject.send(5)
}

The output produced by the code above is:

subscribed
received 1
received 3
received 4

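This is the expected output. The subscriber requests 1 value at first, so 1 is delivered. Value 2 arrives while there is no outstanding demand, and since a PassthroughSubject does not store values, it is dropped. One second after receiving 1, the subscriber requests 2 more values, which are satisfied by 3 and 4. Value 5 is dropped as well, because the follow-up requests have not been made yet when it is sent.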
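
The demand bookkeeping behind this output can be traced with Subscribers.Demand arithmetic. This is only a model of the timeline, not how Combine is implemented: offer(_:), demand and delivered are hypothetical names, and the delays are replaced by the order of the statements.

```swift
import Combine

var demand = Subscribers.Demand.none
var delivered: [Int] = []

// Models a publisher that delivers a value only while demand is outstanding.
func offer(_ value: Int) {
    guard demand > Subscribers.Demand.none else { return }  // no demand: dropped
    demand -= 1
    delivered.append(value)
}

demand += .max(1)   // receive(subscription:) requests one value
offer(1)            // delivered, demand is back to zero
offer(2)            // dropped
demand += .max(2)   // one second later: the delayed request made after value 1
offer(3)            // delivered
offer(4)            // delivered, demand is back to zero
offer(5)            // dropped, the next delayed requests have not run yet

print(delivered)  // [1, 3, 4]
```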

Using Combine's buffering and timing operators

Combine also provides operators to apply backpressure. We can place them between the publisher and a built-in subscriber created with sink(receiveValue:) or assign(to:on:), so no custom subscriber is needed.

The debounce(for:scheduler:options:) operator

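The debounce(for:scheduler:options:) operator lets us publish elements only after a specified time interval elapses between events. Each new value restarts the interval, and only the most recent value is published once the interval passes without another one arriving.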

cancellable = subject
    .debounce(for: .seconds(0.5), scheduler: RunLoop.main)
    .sink { value in
        print("received \(value)")
    }
subject.send(1)
subject.send(2)
DispatchQueue.main.asyncAfter(deadline: .now() + 1) {
    self.subject.send(3)
    self.subject.send(4)
}

The output of the code above is:

received 2
received 4

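Since we specified a debounce of 0.5 seconds, a value is only published once no further value has arrived for 0.5 seconds. The values 1 and 3 are each followed immediately by another value, so they are dropped and we only receive the values 2 and 4.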
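
A typical use of this behavior is waiting for a pause in typing before starting a search. The sketch below assumes a hypothetical query subject fed by a search field. The run loop call at the end is only there so that a command-line script lives long enough for the debounce timer to fire. In an app, the main run loop is already running.

```swift
import Combine
import Foundation

// Hypothetical stream of search-field contents.
let query = PassthroughSubject<String, Never>()
var searches: [String] = []

let cancellable = query
    .debounce(for: .seconds(0.2), scheduler: RunLoop.main)
    .sink { text in searches.append(text) }

// Three keystrokes in quick succession.
query.send("s")
query.send("sw")
query.send("swift")

// Let the main run loop process the debounce timer.
RunLoop.main.run(until: Date().addingTimeInterval(0.6))

// Only the text that was followed by a pause should have reached the sink.
print(searches)
```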

The throttle(for:scheduler:latest:) operator

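The throttle(for:scheduler:latest:) operator returns a publisher that produces at most one element per specified interval. If it receives multiple elements during that interval, the latest parameter decides which one is sent: the most recent element if latest is true, the first element received during the interval if it is false.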

cancellable = Timer.publish(every: 1.0, on: .main, in: .default)
    .autoconnect()
    .throttle(for: 3.0, scheduler: RunLoop.main, latest: true)
    .sink { date in
        print("received \(date)")
    }

The output of the code above is as follows:

received 2021-11-10 18:36:16 +0000
received 2021-11-10 18:36:19 +0000
received 2021-11-10 18:36:22 +0000
received 2021-11-10 18:36:25 +0000

As expected, we receive a date value every 3 seconds.

The collect(_:) operator

The collect(_:) and collect(_:options:) operators return a publisher that bundles elements into arrays. An array is emitted once a given number of elements has been collected or, with collect(_:options:), once a given time interval has elapsed.

cancellable = subject
    .collect(2)
    .sink { value in
        print("received \(value)")
    }
subject.send(1)
subject.send(2)
subject.send(3)
subject.send(4)

Output:

received [1, 2]
received [3, 4]

This option is useful if the subscriber can process multiple elements at the same time.
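
One detail worth knowing is what happens to a batch that is not full yet. In the following sketch, which uses the illustrative names subject and batches, the leftover element is emitted as a shorter array when the publisher finishes.

```swift
import Combine

let subject = PassthroughSubject<Int, Never>()
var batches: [[Int]] = []

let cancellable = subject
    .collect(3)
    .sink { batch in batches.append(batch) }

for value in 1...4 {
    subject.send(value)
}
// At this point [1, 2, 3] has been emitted and 4 is still being held back.

subject.send(completion: .finished)
// Completion flushes the partial batch.

print(batches)  // [[1, 2, 3], [4]]
```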

© 2023 tanaschita.com