学计算机的那个

不是我觉到、悟到,你给不了我,给了也拿不住;只有我觉到、悟到,才有可能做到,能做到的才是我的.

0%

Building a Custom Combine Operator for Exponential Backoff[译]

Make your Combine code reusable

在上一篇文章中,我向你展示了如何使用Combine来改善Combine管道的错误处理,并以一种对用户有意义的方式暴露SwiftUI应用中的错误。

不出所料,我们最终得到的代码看起来比开始时要复杂一些。正确处理错误比完全不处理错误(或忽略错误)占用更多的代码行。

但是我们可以做得更好!

在这篇文章中,你将了解Combine操作符:它们是什么,它们是如何工作的,以及如何将我们的代码重构成一个自定义的Combine操作符,从而使其更容易推理(reason about),同时更易于重用。

What is a Combine Operator?

Combine定义了三个主要概念来实现响应式编程的思想:

  1. Publishers
  2. Subscribers
  3. Operators

发布者随着时间的推移交付values ,订阅者在收到这些values 时采取行动。Operators位于发布者和订阅者之间,可以用来操纵(manipulate)数据流(stream of values)。

我们需要操作符有以下几个原因:

  • 发布者并不总是以订阅者所需的格式生成事件。例如,发布者可能会发出HTTP网络请求的结果,但订阅者需要自定义数据结构。在这种情况下,可以使用mapdecode等操作符将发布者的输出转换为订阅者期望的数据结构。

  • 发布者可能会生成订阅者不感兴趣的事件。例如,当输入一个搜索词时,我们可能对每一次击键都不感兴趣,而只对最后的搜索词感兴趣。在这种情况下,我们可以使用debouncethrottle之类的操作符来减少订阅者必须处理的事件数量。

操作符帮助我们获取发布者产生的输出,并将其转化为订阅者可以消费的东西。在前面的章节中,我们已经使用了一些内置操作符,例如:

  • map(以及它的朋友,tryMap)来转换元素
  • debounce只在两个事件之间暂停之后才发布元素
  • removeduplates删除重复事件
  • flatMap将元素转换为新的发布者

How can we implement a custom operator?

通常,在创建Combine管道时,我们将从一个发布者开始,然后连接一堆Combine的内置操作符来处理发布者发出的事件。任何Combine管道的末端都是接收事件的订阅者。正如你在上一篇文章中看到的,管道很快就会变得复杂。

从技术上讲,操作符只是创建其他发布者和订阅者的函数,这些发布者和订阅者处理从上游发布者接收到的事件

这意味着,我们可以通过使用返回发布者(或订阅者)的函数来扩展Publisher来创建自己的自定义操作符,该发布者(或订阅者)对从使用它的发布者接收到的事件进行操作。

让我们通过实现一个简单的操作符来看看这在实践中意味着什么,这个操作符允许我们使用Swift的dump()函数来检查merge管道中的事件。这个函数将变量的内容打印到控制台,并以嵌套树的形式显示变量的结构——类似于Xcode中的调试检查器。

现在,您可能知道Combineprint()操作符,它的工作原理非常相似。但是,它没有提供足够的细节,更重要的是,它没有将结果显示为嵌套结构。

要添加操作符,首先需要向Publisher类型添加扩展。由于我们不想操作此操作符接收到的事件,因此我们也可以使用上游发布者的类型作为结果类型,并返回AnypPublisher<Self.Output, Self.Failure>作为结果类型:

1
2
3
4
extension Publisher {
func dump() -> AnyPublisher<Self.Output, Self.Failure> {
}
}

在函数内部,我们可以使用handleEvents操作符检查该管道处理的任何事件。handleEvents有一堆可选闭包,当发布者接收到新的订阅、新的输出值、取消事件、完成订阅或订阅者请求更多元素时,会调用这些闭包。由于我们只对新的Output值感兴趣,因此可以忽略大多数闭包,只实现receiveOutput闭包。

每当我们接收到一个值时,我们将使用Swift的dump()函数将值的内容打印到控制台:

1
2
3
4
5
6
7
8
extension Publisher {
func dump() -> AnyPublisher<Self.Output, Self.Failure> {
handleEvents(receiveOutput: { value in
Swift.dump(value)
})
.eraseToAnyPublisher()
}
}

可以像使用Combine的任何内置操作符一样使用这个操作符。在下面的示例中,我们将新操作符附加到一个发出当前日期的简单发布者:

1
2
3
4
5
6
7
Just(Date())
.dump()

// prints:

2022-03-02 09:38:49 +0000
- timeIntervalSinceReferenceDate: 667906729.659255

Implementing a retry operator with a delay

既然我们已经对如何实现基本运算符有了基本的了解,那么让我们看看是否可以重构上一集的代码。以下是相关部分:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
return dataTaskPublisher
.tryCatch { error -> AnyPublisher<(data: Data, response: URLResponse), Error> in
if case APIError.serverError = error {
return Just(Void())
.delay(for: 3, scheduler: DispatchQueue.global())
.flatMap { _ in
return dataTaskPublisher
}
.print("before retry")
.retry(10)
.eraseToAnyPublisher()
}
throw error
}
.map(\.data)

让我们首先为Publisher上的重试操作符构造一个重载扩展:

1
2
3
4
5
6
7
extension Publisher {
func retry<T, E>(_ retries: Int, withDelay delay: Int)
-> Publishers.TryCatch<Self, AnyPublisher<T, E>>
where T == Self.Output, E == Self.Failure
{
}
}

这定义了两个输入参数,retrieswithDelay,我们可以使用它们来指定应该重试多少次上游发布者,以及每次重试之间应该剩下多少时间(以秒为单位)。

由于要在新操作符中使用tryCatch操作符,因此需要使用它的发布者类型Publishers。将TryCatch作为返回类型。

有了这一点,我们现在可以通过粘贴现有的实现来实现操作符的主体:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
extension Publisher {
func retry<T, E>(_ retries: Int, withDelay delay: Int)
-> Publishers.TryCatch<Self, AnyPublisher<T, E>>
where T == Self.Output, E == Self.Failure
{
return self.tryCatch { error -> AnyPublisher<T, E> in
return Just(Void())
.delay(for: .init(integerLiteral: delay), scheduler: DispatchQueue.global())
.flatMap { _ in
return self
}
.retry(retries)
.eraseToAnyPublisher()
}
}
}

您可能已经注意到我们删除了错误检查。这是因为APIError是特定于我们的应用程序的错误类型。因为我们有兴趣让这个实现也可以在其他应用程序中使用,让我们看看如何使它更灵活。

Conditionally retrying

为了使此代码在其他上下文中可重用,让我们为末尾闭包(trailing closure)添加一个参数,调用者可以使用该参数来控制操作符是否应该重试。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
func retry<T, E>(_ retries: Int, withDelay delay: Int, condition: ((E) -> Bool)? = nil) -> Publishers.TryCatch<Self, AnyPublisher<T, E>> where T == Self.Output, E == Self.Failure {
return self.tryCatch { error -> AnyPublisher<T, E> in
if condition?(error) == true {
return Just(Void())
.delay(for: .init(integerLiteral: delay), scheduler: DispatchQueue.global())
.flatMap { _ in
return self
}
.retry(retries)
.eraseToAnyPublisher()
}
else {
throw error
}
}
}

如果调用者没有提供闭包,操作符将使用参数重试和延迟重试。

有了这个,我们可以简化原来的调用:

1
2
3
4
5
6
7
8
9
10
// ...
return dataTaskPublisher
.retry(10, withDelay: 3) { error in
if case APIError.serverError = error {
return true
}
return false
}
.map(\.data)
// ...

Implementing a retry operator for exponential backoff

现在,让我们更进一步,实现一个带有指数回退(exponential back-off)的重试操作符。

指数回退通常用作网络服务等计算机系统中速率限制机制的一部分,以帮助强制公平分配对资源的访问并防止网络拥塞。(维基百科)

为了增加两个请求之间的延迟,我们引入了一个保存当前间隔的局部变量,并在每个请求之后将其加倍。为了实现这一点,我们需要将启动原始管道的内部管道包装在一个增加backoff变量的管道中:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
func retry<T, E>(_ retries: Int, 
withBackoff initialBackoff: Int,
condition: ((E) -> Bool)? = nil)
-> Publishers.TryCatch<Self, AnyPublisher<T, E>>
where T == Self.Output, E == Self.Failure
{
return self.tryCatch { error -> AnyPublisher<T, E> in
if condition?(error) ?? true {
var backOff = initialBackoff
return Just(Void())
.flatMap { _ -> AnyPublisher<T, E> in
let result = Just(Void())
.delay(for: .init(integerLiteral: backOff), scheduler: DispatchQueue.global())
.flatMap { _ in
return self
}
backOff = backOff * 2
return result.eraseToAnyPublisher()
}
.retry(retries - 1)
.eraseToAnyPublisher()
}
else {
throw error
}
}
}

为了仅对某些类型的错误使用指数回退,我们可以实现闭包来检查错误,就像以前一样。这里是一个代码片段,展示了如何使用增量后退与3秒的初始间隔为任何APIError.serverError:

1
2
3
4
5
6
7
8
9
10
return dataTaskPublisher
.retry(2, withBackoff: 3) { error in
if case APIError.serverError(_, _, _) = error {
return true
}
else {
return false
}
}
// ...

为了使用指数回退而不考虑错误,这变得更加紧凑(compact):

1
2
3
return dataTaskPublisher
.retry(2, withIncrementalBackoff: 3)
// ...

Closure

Combine是一个非常强大的框架,它允许我们为应用程序组合非常高效的数据和事件处理管道。

有时,这种能力是有代价的:在上一节中,我们构建了一个强大的错误处理管道,使我们的代码看起来比原始版本更复杂,原始版本使用Combine的内置操作符来处理错误,将它们替换为完全忽略它们的默认值。

在这一集中,您看到了如何使用自定义操作符来重构代码。

由于Combine灵活的设计,创建自定义操作符不需要编写大量代码,并且有助于使我们的代码更具可读性和可重用性。

感谢阅读🔥