RxSwift 核心实现原理
一直以来,响应式编程都是业界讨论的热门话题之一。为了推广响应式编程,ReactiveX 社区几乎为每一种编程语言设计实现了一种对应的响应式编程框架。RxSwift 就是针对 Swift 所开发的响应式框架。
关于 RxSwift,网上有不少相关的学习资料,但绝大多数都是 RxSwift 的使用说明,鲜有文章介绍 RxSwift 背后的设计原理。通过阅读源码,查阅资料,正向设计,我逐步理解了 RxSwift 的设计思想。因此,趁热打铁,记录并总结一下我的理解。
下文我们首先介绍一下 RxSwift 中所涉及的基本概念。然后,从零开始设计并实现 RxSwift,从而逐步理解 RxSwift 的设计理念。
本文所实现的 RxSwift 代码已在 Github 开源——传送门。参照源码阅读本文效果更佳。
基础
RxSwift 主要蕴含了以下几种设计思想:
- 发布-订阅模式
- 流编程
- 函数式编程
下面,我们依次来进行介绍。
发布-订阅模式
发布-订阅模式 是 RxSwift 所呈现的一种最直观思想。发布-订阅模式可以分为两个角色:发布者、订阅者。
订阅者的主要职责是:
- 订阅:监听并处理某个事件。其本质就是向发布者注册一个处理某个事件的闭包。
- 取消(订阅)
发布者的主要职责是:
- 发布:分发某个事件
发布-订阅模式的基本原理是:
- 订阅者调用发布者提供的订阅方法进行订阅,从而在发布者内部注册订阅者。
- 发布者内部会维护一个订阅者列表。
- 当发布者发布事件时,会遍历订阅者列表,执行其中的处理方法(将事件作为参数传递给闭包,并执行)。
举例
在日常开发中,发布-订阅模式是一种广泛被应用的设计模式。比如:iOS 中的 NotificationCenter、Flutter 中的事件总线都是基于这种模式实现的。如下所示为 Flutter 中事件总线的一种实现方式,其中的代码逻辑基本遵循了上述所描述的发布-订阅模式的基本原理。
1 | // 订阅者订阅内容签名 |
思考:为什么 iOS 中一个监听了某个通知的类必须要在
dealloc
时要执行removeObserver:
方法?
流编程
流(stream) 是 RxSwift
另一个重要的设计思想。Observable<T>
是 Rx
框架的基础,也被称为
可观察序列。它的作用是可以异步地产生一系列数据,即一个
Observable<T>
对象会随着时间的推移不定期地发出
event(element: T)
。数据就像水流一样持续不断地在流动,顾名思义,这也被称为
流编程。
关于流编程,《计算机程序的构造与解释》一书中认为 流编程是一种调用驱动的编程思想。流编程的基本思想是:一般情况下,只是部分地构造出流的结构,并将这样的部分结构传给使用流的程序。如果使用者需要访问这个流中未构造出的那个部分,那么这个流就会自动地继续构造下去,但是只做出满足当时需要的那一部分。
如下所示是 RxSwift 中常见使用形式,observable
是数据源,不断地发出数据,如果水流一样,最终流向 subscribe
中的闭包。期间会流经 map
,filter
等操作符,经过转换或过滤。这就是流编程的思想。
1 | let observable: Observable<Int> = Observable.create { (observer) -> Disposable in |
流编程底层实现的本质则是
闭包的延迟执行和强制执行。具体来说是基于一种称为
delay
的特殊形式,对于 delay <exp>
的求值不会对 <exp>
求值,而是返回一个称为
延时对象 的对象。它可以看做是对未来的某个时间求值
<exp>
的允诺。在各类编程语言中,返回特定类型的闭包常被用于描述一个延迟对象。与
delay
配对的是 force
的过程。它以一个延时对象为参数,执行相应的求值工作,即迫使
delay
完成其所允诺的求值。在各类编程语言中,执行返回特定类型的闭包常被用于描述
force
的过程。
函数式编程
RxSwift 提供了大量无副作用的操作符,无副作用也是函数式编程的一种重要特性。RxSwift 能够实现操作符的链式调用,一个重要的前提是:提供操作符的类型和操作符的返回类型必须保持一致。这里就涉及到了函数式编程中的一些高阶概念:函子(Functor)、适用函子(Applicative)、单子(Monad)。详细内容可参见《函数式编程——Functor、Applicative、Monad》一文。
不过,RxSwift 操作符中运用最多的是 函子(Functor),什么是函子?简而言之,函子能够将普通函数应用到一个包装类型。如下图及代码所示,一个包装类型包含了一个原始值,当函子作用在其上后,使用普通函数对原始值进行转换,最终将结果值放入包装类型中返回。
1 | extension Result { |
以 RxSwift 中最常用的 map
操作符为例,如下所示。map
方法扩展自
ObservableType
,其能够将普通函数
(Element) -> Result
应用到 ObservableType
包装类型。这其实就是一种典型的 函子 应用。
1 | extension ObservableType { |
核心实现
下面我们将以正向设计的方式,结合 RxSwift 中的设计思想,手动实现一个 RxSwift 的核心部分。
基本功能(RxDemo-01)
首先,我们来定义事件,RxSwift 中有三种类型的事件,如下所示:
1 | // MARK: - Event |
其次,我们来定义订阅者,在 RxDemo-01
中,我们先忽略
取消订阅
的功能,如下所示。订阅者必须遵循订阅者协议,需要实现
监听事件 的方法
on(event: Event<Element>)
。订阅者内部维护一个处理事件的闭包
_handler
。当监听事件方法触发时,会立即执行处理事件闭包。
1 | // MARK: - Observer |
最后,我们来定义发布者,如下所示。发布者必须遵循发布者协议,需要实现
订阅操作 的方法
subscribe<O: ObserverType>(observer: O) where O.Element == Element
。发布者内部维护一个发布事件的闭包
_eventGenerator
。当订阅发生时(订阅操作方法被执行时),会立即执行发布事件的闭包。
1 | // MARK: - Observable |
接下来,我们来试用一下 RxDemo-01 所实现的 RxSwift。很显然,这种模式与原始的 RxSwift 的使用方式基本吻合。
1 | // MARK: - Test |
下图所示为 RxDemo-01 实现的 RxSwift 的内部调用关系。通过闭包实现将
observer
传递给
observable
,在发布事件时能够将事件传递给
observer
,从而形成一条看似自左向右流动的数据流。
取消订阅(RxDemo-02)
RxDemo-01 有一个明显的缺陷——无法取消订阅。我们来参考 RxSwift
的实现,它
并不是直接让订阅者支持取消订阅,而是通过一个第三方类型
Disposable
对订阅进行管理。Disposable
的核心作用是 提供一个状态位标识订阅是否已经取消。
关于由第三方类来管理订阅,而不是让订阅者自己管理的原因,我猜测有两个:一是出于职责单一的原则;二是为了支持函数式编程,抽取一个第三方类型作为返回值,从而在链式调用时保持类型一致。
Disposable
协议要求所有的 Disposable
类型都实现 dispose
方法,表示取消订阅。
1 | protocol Disposable { |
这里我们定义两个遵循 Dispoable
协议的类型:AnonymousDisposable
和
CompositeDisposable
。
AnonymousDisposable
作为一个匿名
Disposable
,在本例中作为最底层的 Disposable
并没有什么作用,只是为了实现模式统一而已。
1 | class AnonymousDisposable: Disposable { |
CompositeDisposable
作为一个可管理多个
Disposable
的容器,它内部维持一个标志位表示订阅是否被取消。CompositeDisposable
所实现的 dispose
方法真正改变了标志位,并对其所维护的所有
Disposable
执行各自的 dispose
方法,从而完成它们定义的在取消订阅时需要执行的附带操作。
1 | class CompositeDisposable: Disposable { |
很显然,这里执行 dispose
操作只是修改了状态,并没有释放订阅资源。只有当
CompositeDisposable
对象被释放后才算真正释放资源。在原版
RxSwift 中,DisposeBag
差不多就是
CompositeDisposable
。这样也是为什么我们要把订阅交给
DisposeBag
来进行管理,DisposeBag
作为某个对象的属性,会随着对象的释放,从而自动释放真正的订阅资源。
为了支持 Disposable
,我们需要在 RxDemo-01
的基础上稍作修改即可,主要是修改发布者 Observable
。
1 | // MARK: - Observable |
接下来,我们来试用一下 RxDemo-02 所实现的 RxSwift。很显然,这种模式与原始的 RxSwift 的使用方式进一步吻合。这里,我们实现了取消订阅的功能。
1 | // MARK: - Test |
RxDemo-02 的核心思想是在 RxDemo-01 的基础上对事件进行拦截和过滤,如下图所示。
具体的实现方式如下所示,通过 CompositeDisposable
管理
AnonymousDisposable
(原始 subscribe
中的闭包执行后的返回类型)。同时,在执行发布事件时,使用一个中间
Observer 接收原始事件,中间 Observer 引用外部
CompositeDisposable
的状态决定是否将事件发送给原始
Observer。
使用 CompositeDisposable
的本质就是添加了一个中间层来解决管理订阅的问题。
结构优化(RxDemo-03)
在 RxDemo-02 中,AnonymousObserver
引用了外部的
CompositeDisposable
中的订阅状态,从而决定事件的传递方向。这种代码逻辑由内而外实现,并不是很直观。
为了更加清晰地描述这个事件流动方向,RxDemo-03
通过增加一个中间层,将原始 observer、中间
observer、事件转发逻辑聚合在同一层级下,让代码具有更好的可读性。这里我们实现一个遵循
Disposable
协议的 Sink
类型。Sink
是 水槽
的意思,象征着这里我们通过它来控制事件的流动方向,暗示了这个类的作用。
1 | class Sink<O: ObserverType>: Disposable { |
在定义了 Sink
之后,我们就可以简化
Observable
中 subscribe
方法的具体实现。
1 | class Observable<Element>: ObservableType { |
SinkDisposable
引用了原始的事件发生器,并定义一个中间
Observer 转入至原始事件发生器,从而让中间 observer
接收原始事件。除此之外,SinkDisposable
还引用了原始
observer,当中间 observer
处理原始事件时,会判断订阅是否已经取消,从而决定是否将原始事件转发给原始
observer。此时,RxDemo-03 实现的 RxSwift 的内部调用关系如下所示。
操作符(RxDemo-04)
下面,我们来实现操作符,RxSwift
中包含了大量的操作符,它们基本上都是对函数式编程中
函子、单子 等进行了应用。我们以
map
为例进行介绍。
如下所示,map
方法能够将一个普通函数应用到包装类型
ObservableType
上。map
方法最终返回一个
Observable<Result>
类型(同样遵循
ObservableType
协议),因此能够完美支持链式操作。
1 | extension ObservableType { |
接下来,我们来试用一下 RxDemo-04 所实现的 RxSwift。这里,我们实现了
map
操作符的功能。
1 | // MARK: - Test |
RxDemo-04
中由于增加了操作符功能,其内部的调用关系也发生了变化。特别是,发布事件和过滤事件的逻辑,很显然,每增加一个操作符,就会增加一个
SinkDisposable
中间层,调用栈也会更深。
如果我们仔细分析 RxDemo-04,其实我们可以发现内部隐藏了如下所示的调用关系链:observable -> MapObserver -> MapObservable -> observer。实际执行时,调用关系如下所示:
- observable 调用 MapObserver.on 方法,将原始事件传递给 MapObserver;
- MapObserver 使用 map 方法将原始事件转换成 map 事件(即 map 后的数据),作为 MapObservable 发出的事件;
- MapObservable 调用 observer.on 方法,将 map 事件传给 observer。
分类细化(RxDemo-05)
在 RxDemo-04 中,为了增加 map
操作符,对
ObservableType
进行了扩展,其本质就是在原始的 Observable 和
Observer 之间插入了一个 MapObserver 和一个
MapObservable。本节,我们继续进行优化,对中间类也进行细分和定义。
在 RxDemo-04 中,Sink
的主要作用是
根据订阅是否取消决定是否拦截事件的传递。这里我们可能会想到:中间订阅者(如:MapObserver)本身是不是就应该具备
Sink
的这种功能呢?事实上,RxSwift 就是让 Sink
作为所有 Observer 的基类。
对于操作符,我们可以实现上述的模式;但是,对于不带操作符的情况,该如何处理呢?为了实现模式的统一,我们可以认为不带操作符的情况等同于带了
只返回原值的匿名操作符,即等同于
map { $0 }
。针对此情况,我们也需要定义两个类
AnonymousObserver
和 AnonymousObservable
。
下面,我们来进行优化改进。
首先,我们将原来的 Observable<Element>
改成抽象基类。
1 | // 抽象类 |
其次,我们再一定一个新的类 Producer<Element>
代替
Observable<Element>
。Producer<Element>
继承自
Observable<Element>
,作为发布者的基类,该类内部没有时间生成器
eventGenerator
的闭包,由子类选择性进行定义。
1
2
3
4
5
6
7
8
9
10class Producer<Element>: Observable<Element> {
// 实现 订阅操作 的协议,内部生成事件
override func subscribe<O: ObserverType>(observer: O) -> Disposable where O.Element == Element {
return run(observer: observer)
}
func run<O: ObserverType>(observer: O) -> Disposable where O.Element == Element {
rxAbstractMethod()
}
}
然后,我们来对 Sink
进行修改。在 RxDemo-04
中,Sink
即提供了事件生成的功能和事件转发的功能。这里,我们让 Sink
的职责更加单一,仅仅是提供事件转发的功能。修改结果如下:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28class Sink<O: ObserverType>: Disposable {
private var _disposed: Bool = false
private let _forward: O
private let _composite = CompositeDisposable()
init(forward: O) {
_forward = forward
}
func forward(event: Event<O.Element>) {
guard !_disposed else { return }
// 事件传递给原始 observer
_forward.on(event: event)
// 通过 composite 管理 error、completed 时,自动取消订阅
switch event {
case .completed, .error(_):
dispose()
default:
break
}
}
func dispose() {
_disposed = true
print("dispose execute")
_composite.dispose()
}
}
为了便于泛型类型的转换,我们给 ObservableType
协议增加一个方法,并由 Observable<Element>
予以实现。
1 | protocol ObservableType { |
1 | class Observable<Element>: ObservableType { |
接下来,我们来分别实现 Sink
的子类
AnonymousObserver
和 MapObserver
以及
Producer
的子类 AnonymousObservable
和
MapObservable
。
1 | class AnonymousObserver<O: ObserverType>: Sink<O>, ObserverType { |
1 | class MapObserver<Source, Result, O: ObserverType>: Sink<O>, ObserverType { |
现在我们再来看发布者、操作符,其实两者的本质都是一样,都是创建了一个发布者。对此,我们采用扩展的方式来提供相应的方法。如下所示:
1
2
3
4
5
6
7
8
9
10
11extension ObservableType {
static func create(_ eventGenerator: @escaping (Observer<Element>) -> Disposable) -> Observable<Element> {
return AnonymousObservable(eventGenerator: eventGenerator)
}
}
extension ObservableType {
func map<Result>(_ transform: @escaping (Element) throws -> Result) -> Observable<Result> {
return MapObservable(source: self.asObservable(), transform: transform)
}
}
最后,我们再来验证一下实现结果,如下所示。 1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47// MARK: - Test
let observable = Observable<Int>.create { (observer) -> Disposable in // observer 为 MapObserver
print("send 0")
observer.on(event: .next(0))
print("send 1")
observer.on(event: .next(1))
print("send 2")
observer.on(event: .next(2))
print("send 3")
observer.on(event: .next(3))
DispatchQueue.main.asyncAfter(deadline: .now() + 3) {
print("send completed")
observer.on(event: .completed)
}
return AnonymousDisposable {
print("dispose")
}
}
let observer = Observer<Int> { (event) in
switch event {
case .next(let value):
print("recive \(value)")
case .error(let error):
print("recive \(error)")
case .completed:
print("recive completed")
}
}
let disposable = observable.map { $0 * 2 }.map { $0 + 1 }.subscribe(observer: observer)
DispatchQueue.main.asyncAfter(deadline: .now() + 2) {
disposable.dispose()
}
// send 0
// recive 1
// send 1
// recive 3
// send 2
// recive 5
// send 3
// recive 7
// dispose execute
// send completed
从运行结果而言,RxDemo-05
基本实现了分类细化,并且达到了取消订阅的功能。此时,上述例子中实际的订阅关系如下所示。相比
RxDemo-04,多了 AnonymousObserver
和
AnonymousObservable
,但是整体的内部订阅关系链更加清晰了。
此时,我们再来对照一下 RxSwift 和 RxDemo-05 中类的定义,如下表所示。各个类的功能基本相同,整体结构也是大同小异,只是在类名上略有差异。
RxDemo-05 | Producer | Sink | AnonymousObserver | AnonymousObservable | MapObserver | MapObservable |
---|---|---|---|---|---|---|
RxSwift | Producer | Sink | AnonymousObservableSink | AnonymousObservable | Map | MapSink |
订阅管理(RxDemo-06)
如果,我们仔细对比,可以发现 RxSwift 中 Producer
的
subscribe
方法内部与 RxDemo-05
还是不太一样,前者内部还引用了一个 SinkDisposer
类。其作用是什么呢?事实上,其主要作用是管理
disposeHandler
。细心的同学可能会发现 RxDemo-05 中有个
BUG:取消订阅时没有执行 print("dispose")
闭包。
对此,我们也可以用类似的方式来解决,通过增加一个 Diposer
类来进行管理。具体代码见:RxDemo-06。
当订阅发生时(即执行 subscribe
方法时),内部会产生一个递归的控制流,如下图所示。
通过递归返回的方式构建整个订阅管理关系链,如下图所示。diposer0
是 subscribe
方法最终返回的 Disposable
对象。当我们对 disposer0
执行 dispose
方法时,内部会递归地执行 dispose
方法,最终取消订阅链中所有的订阅。
注意:这里面会有循环引用,如 MapObserver
内部又引用了
Disposer0
,RxDemo-06 以及 RxSwift 中的处理是给
Disposer
类内部添加一个状态,表示是否已经取消了订阅,从而避免循环引用导致的循环调用。这里的循环引用并不一定是坏事,它在下述的场景下起到了非常关键的作用。
当事件中出现一个 complete
或 error
事件时,由于事件会依次传递至 Observer
,最后一次传递时,即
MapObserver
进行传递时,会判断是否是 complete
或 error
事件,从而决定是否执行 dispose
方法。当 MapObserver
执行 dipose
方法时,会通过上述的循环引用,调用 Disposer0
执行
dispose
方法,从而实现整体取消订阅。
总结
本文通过逐步实现 RxSwift 核心部分中的功能,一窥其背后的设计思路。从中我们也看到了其对函数式编程的应用,以及其所呈现出来的流编程模式的底层实现原理。
后续,我们将进一步探索原版 RxSwift 中其他的一些内容。