RxSwift 核心实现原理

一直以来,响应式编程都是业界讨论的热门话题之一。为了推广响应式编程,ReactiveX 社区几乎为每一种编程语言设计实现了一种对应的响应式编程框架。RxSwift 就是针对 Swift 所开发的响应式框架。

关于 RxSwift,网上有不少相关的学习资料,但绝大多数都是 RxSwift 的使用说明,鲜有文章介绍 RxSwift 背后的设计原理。通过阅读源码,查阅资料,正向设计,我逐步理解了 RxSwift 的设计思想。因此,趁热打铁,记录并总结一下我的理解。

下文我们首先介绍一下 RxSwift 中所涉及的基本概念。然后,从零开始设计并实现 RxSwift,从而逐步理解 RxSwift 的设计理念。

本文所实现的 RxSwift 代码已在 Github 开源——传送门。参照源码阅读本文效果更佳。

基础

RxSwift 主要蕴含了以下几种设计思想:

  • 发布-订阅模式
  • 流编程
  • 函数式编程

下面,我们依次来进行介绍。

发布-订阅模式

发布-订阅模式 是 RxSwift 所呈现的一种最直观思想。发布-订阅模式可以分为两个角色:发布者订阅者

订阅者的主要职责是:

  • 订阅:监听并处理某个事件。其本质就是向发布者注册一个处理某个事件的闭包。
  • 取消(订阅)

发布者的主要职责是:

  • 发布:分发某个事件

发布-订阅模式的基本原理是:

  • 订阅者调用发布者提供的订阅方法进行订阅,从而在发布者内部注册订阅者。
  • 发布者内部会维护一个订阅者列表。
  • 当发布者发布事件时,会遍历订阅者列表,执行其中的处理方法(将事件作为参数传递给闭包,并执行)。

举例

在日常开发中,发布-订阅模式是一种广泛被应用的设计模式。比如:iOS 中的 NotificationCenter、Flutter 中的事件总线都是基于这种模式实现的。如下所示为 Flutter 中事件总线的一种实现方式,其中的代码逻辑基本遵循了上述所描述的发布-订阅模式的基本原理。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
// 订阅者订阅内容签名
typedef void EventCallback(arg);

class EventBus {
// 私有构造函数
EventBus._internal();

// 保存单例
static EventBus _singleton = new EventBus._internal();

// 工厂构造函数
factory EventBus()=> _singleton;

// 保存事件订阅者队列,key:事件名(id),value: 对应事件的订阅者队列
var _emap = new Map<Object, List<EventCallback>>();

// 添加订阅者,即订阅
void on(eventName, EventCallback f) {
if (eventName == null || f == null) return;
_emap[eventName] ??= new List<EventCallback>();
_emap[eventName].add(f);
}

// 移除订阅者,即取消
void off(eventName, [EventCallback f]) {
var list = _emap[eventName];
if (eventName == null || list == null) return;
if (f == null) {
_emap[eventName] = null;
} else {
list.remove(f);
}
}

// 触发事件,事件触发后该事件所有订阅者会被调用,即发布
void emit(eventName, [arg]) {
var list = _emap[eventName];
if (list == null) return;
int len = list.length - 1;
// 反向遍历,防止订阅者在回调中移除自身带来的下标错位
for (var i = len; i > -1; --i) {
// 执行注册的闭包
list[i](arg);
}
}
}

// 定义一个 top-level(全局)变量,页面引入该文件后可以直接使用 bus
var bus = new EventBus();

思考:为什么 iOS 中一个监听了某个通知的类必须要在 dealloc 时要执行 removeObserver: 方法?

流编程

流(stream) 是 RxSwift 另一个重要的设计思想。Observable<T> 是 Rx 框架的基础,也被称为 可观察序列。它的作用是可以异步地产生一系列数据,即一个 Observable<T> 对象会随着时间的推移不定期地发出 event(element: T)。数据就像水流一样持续不断地在流动,顾名思义,这也被称为 流编程

关于流编程,《计算机程序的构造与解释》一书中认为 流编程是一种调用驱动的编程思想。流编程的基本思想是:一般情况下,只是部分地构造出流的结构,并将这样的部分结构传给使用流的程序。如果使用者需要访问这个流中未构造出的那个部分,那么这个流就会自动地继续构造下去,但是只做出满足当时需要的那一部分

如下所示是 RxSwift 中常见使用形式,observable 是数据源,不断地发出数据,如果水流一样,最终流向 subscribe 中的闭包。期间会流经 mapfilter 等操作符,经过转换或过滤。这就是流编程的思想。

1
2
3
4
5
6
7
8
9
10
11
12
let observable: Observable<Int> = Observable.create { (observer) -> Disposable in
observer.onNext(0)
observer.onNext(1)
observer.onNext(2)
observer.onNext(3)
observer.onCompleted()
return Disposables.create()
}

observable.map { $0 + 2 }.filter { $0 > 3 }.subscribe { (event) in
...
}

流编程底层实现的本质则是 闭包的延迟执行和强制执行。具体来说是基于一种称为 delay 的特殊形式,对于 delay <exp> 的求值不会对 <exp> 求值,而是返回一个称为 延时对象 的对象。它可以看做是对未来的某个时间求值 <exp> 的允诺。在各类编程语言中,返回特定类型的闭包常被用于描述一个延迟对象。与 delay 配对的是 force 的过程。它以一个延时对象为参数,执行相应的求值工作,即迫使 delay 完成其所允诺的求值。在各类编程语言中,执行返回特定类型的闭包常被用于描述 force 的过程。

函数式编程

RxSwift 提供了大量无副作用的操作符,无副作用也是函数式编程的一种重要特性。RxSwift 能够实现操作符的链式调用,一个重要的前提是:提供操作符的类型和操作符的返回类型必须保持一致。这里就涉及到了函数式编程中的一些高阶概念:函子(Functor)适用函子(Applicative)单子(Monad)。详细内容可参见《函数式编程——Functor、Applicative、Monad》一文。

不过,RxSwift 操作符中运用最多的是 函子(Functor),什么是函子?简而言之,函子能够将普通函数应用到一个包装类型。如下图及代码所示,一个包装类型包含了一个原始值,当函子作用在其上后,使用普通函数对原始值进行转换,最终将结果值放入包装类型中返回。

1
2
3
4
5
6
7
8
9
extension Result {
// 满足 Functor 的条件:map 方法能够将 普通函数 应用到包装类
func map<U>(_ f: (T) -> U) -> Result<U> {
switch self {
case .success(let x): return .success(f(x))
case .failure: return .failure
}
}
}

以 RxSwift 中最常用的 map 操作符为例,如下所示。map 方法扩展自 ObservableType,其能够将普通函数 (Element) -> Result 应用到 ObservableType 包装类型。这其实就是一种典型的 函子 应用。

1
2
3
4
5
6
extension ObservableType {
public func map<Result>(_ transform: @escaping (Element) throws -> Result)
-> Observable<Result> {
return Map(source: self.asObservable(), transform: transform)
}
}

核心实现

下面我们将以正向设计的方式,结合 RxSwift 中的设计思想,手动实现一个 RxSwift 的核心部分。

基本功能(RxDemo-01)

首先,我们来定义事件,RxSwift 中有三种类型的事件,如下所示:

1
2
3
4
5
6
7
// MARK: - Event

enum Event<Element> {
case next(Element)
case error(Error)
case completed
}

其次,我们来定义订阅者,在 RxDemo-01 中,我们先忽略 取消订阅 的功能,如下所示。订阅者必须遵循订阅者协议,需要实现 监听事件 的方法 on(event: Event<Element>)。订阅者内部维护一个处理事件的闭包 _handler。当监听事件方法触发时,会立即执行处理事件闭包。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
// MARK: - Observer

protocol ObserverType {
associatedtype Element

// 监听事件
func on(event: Event<Element>)
}

class Observer<Element>: ObserverType {

// 处理事件的闭包
private let _handler: (Event<Element>) -> Void

init(handler: @escaping (Event<Element>) -> Void) {
_handler = handler
}

// 实现 监听事件 的协议,内部处理事件
func on(event: Event<Element>) {
// 处理事件
_handler(event)
}
}

最后,我们来定义发布者,如下所示。发布者必须遵循发布者协议,需要实现 订阅操作 的方法 subscribe<O: ObserverType>(observer: O) where O.Element == Element。发布者内部维护一个发布事件的闭包 _eventGenerator。当订阅发生时(订阅操作方法被执行时),会立即执行发布事件的闭包。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
// MARK: - Observable

protocol ObservableType {
associatedtype Element

// 订阅操作
func subscribe<O: ObserverType>(observer: O) where O.Element == Element
}

class Observable<Element>: ObservableType {
// 定义 发布事件 的闭包
private let _eventGenerator: (Observer<Element>) -> Void

init(eventGenerator: @escaping (Observer<Element>) -> Void) {
_eventGenerator = eventGenerator
}

// 实现 订阅操作 的协议,内部发布事件
func subscribe<O: ObserverType>(observer: O) where O.Element == Element {
// 生成事件
_eventGenerator(observer as! Observer<Element>)
}
}

接下来,我们来试用一下 RxDemo-01 所实现的 RxSwift。很显然,这种模式与原始的 RxSwift 的使用方式基本吻合。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
// MARK: - Test

let observable = Observable<Int> { (observer) in
print("send 0")
observer.on(event: .next(0))
print("send 1")
observer.on(event: .next(1))
print("send 2")
observer.on(event: .next(2))
print("send 3")
observer.on(event: .next(3))
print("send completed")
observer.on(event: .completed)
}

let observer = Observer<Int> { (event) in
switch event {
case .next(let value):
print("recive \(value)")
case .error(let error):
print("recive \(error)")
case .completed:
print("recive completed")
}
}

observable.subscribe(observer: observer)
// send 0
// recive 0
// send 1
// recive 1
// send 2
// recive 2
// send 3
// recive 3
// send completed
// recive completed

下图所示为 RxDemo-01 实现的 RxSwift 的内部调用关系。通过闭包实现将 observer 传递给 observable,在发布事件时能够将事件传递给 observer,从而形成一条看似自左向右流动的数据流。

取消订阅(RxDemo-02)

RxDemo-01 有一个明显的缺陷——无法取消订阅。我们来参考 RxSwift 的实现,它 并不是直接让订阅者支持取消订阅,而是通过一个第三方类型 Disposable 对订阅进行管理Disposable 的核心作用是 提供一个状态位标识订阅是否已经取消

关于由第三方类来管理订阅,而不是让订阅者自己管理的原因,我猜测有两个:一是出于职责单一的原则;二是为了支持函数式编程,抽取一个第三方类型作为返回值,从而在链式调用时保持类型一致。

Disposable 协议要求所有的 Disposable 类型都实现 dispose 方法,表示取消订阅。

1
2
3
4
protocol Disposable {
// 取消订阅
func dispose()
}

这里我们定义两个遵循 Dispoable 协议的类型:AnonymousDisposableCompositeDisposable

AnonymousDisposable 作为一个匿名 Disposable,在本例中作为最底层的 Disposable 并没有什么作用,只是为了实现模式统一而已。

1
2
3
4
5
6
7
8
9
10
11
12
class AnonymousDisposable: Disposable {
// AnonymousDisposable 封装了 取消订阅 的闭包
private let _disposeHandler: () -> Void

init(_ disposeClosure: @escaping () -> Void) {
_disposeHandler = disposeClosure
}

func dispose() {
_disposeHandler()
}
}

CompositeDisposable 作为一个可管理多个 Disposable 的容器,它内部维持一个标志位表示订阅是否被取消。CompositeDisposable 所实现的 dispose 方法真正改变了标志位,并对其所维护的所有 Disposable 执行各自的 dispose 方法,从而完成它们定义的在取消订阅时需要执行的附带操作。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
class CompositeDisposable: Disposable {
// 可用于管理一组 Disposable 的 CompositeDisposable

// 判断是否已销毁(取消订阅)的标志位
private(set) var isDisposed: Bool = false
// 管理一组 Disposable
private var disposables: [Disposable] = []

init() {}

func add(disposable: Disposable) {
if isDisposed {
disposable.dispose()
return
}
disposables.append(disposable)
}

func dispose() {
guard !isDisposed else { return }
// 销毁所有 disposable,并设置标志位
disposables.forEach {
$0.dispose()
}
isDisposed = true
}
}

很显然,这里执行 dispose 操作只是修改了状态,并没有释放订阅资源。只有当 CompositeDisposable 对象被释放后才算真正释放资源。在原版 RxSwift 中,DisposeBag 差不多就是 CompositeDisposable。这样也是为什么我们要把订阅交给 DisposeBag 来进行管理,DisposeBag 作为某个对象的属性,会随着对象的释放,从而自动释放真正的订阅资源。

为了支持 Disposable,我们需要在 RxDemo-01 的基础上稍作修改即可,主要是修改发布者 Observable

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
// MARK: - Observable

protocol ObservableType {
associatedtype Element

// 订阅操作
func subscribe<O: ObserverType>(observer: O) -> Disposable where O.Element == Element
}

class Observable<Element>: ObservableType {
// 定义 发布事件 的闭包
private let _eventGenerator: (Observer<Element>) -> Disposable

init(_ eventGenerator: @escaping (Observer<Element>) -> Disposable) {
_eventGenerator = eventGenerator
}

// 实现 订阅操作 的协议,内部生成事件
func subscribe<O: ObserverType>(observer: O) -> Disposable where O.Element == Element {
let composite = CompositeDisposable()
// 通过一个中间 Observer 对原始 Observer 进行封装,用于过滤事件的传递。
let disposable = _eventGenerator(Observer { (event) in
guard !composite.isDisposed else { return }
// 事件传递给原始 observer
observer.on(event: event)
// 通过 composite 管理 error、completed 时,自动取消订阅
switch event {
case .error(_), .completed:
composite.dispose()
default:
break
}
})
// 将 _eventGenerator 返回的 AnonymousDisposable 加入至 CompositeDisposable 中进行管理
composite.add(disposable: disposable)
return composite
}
}

接下来,我们来试用一下 RxDemo-02 所实现的 RxSwift。很显然,这种模式与原始的 RxSwift 的使用方式进一步吻合。这里,我们实现了取消订阅的功能。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
// MARK: - Test

let observable = Observable<Int> { (observer) -> Disposable in
print("send 0")
observer.on(event: .next(0)) // observer.on(event: .next(0).map({ $0 * 2 }))
print("send 1")
observer.on(event: .next(1))
print("send 2")
observer.on(event: .next(2))
print("send 3")
observer.on(event: .next(3))
DispatchQueue.main.asyncAfter(deadline: .now() + 3) {
print("send completed")
observer.on(event: .completed)
}
return AnonymousDisposable {
print("dispose")
}
}

let observer = Observer<Int> { (event) in
switch event {
case .next(let value):
print("recive \(value)")
case .error(let error):
print("recive \(error)")
case .completed:
print("recive completed")
}
}

let disposable = observable.subscribe(observer: observer)

DispatchQueue.main.asyncAfter(deadline: .now() + 2) {
disposable.dispose()
}

// send 0
// recive 0
// send 1
// recive 1
// send 2
// recive 2
// send 3
// recive 3
// dispose
// send completed

RxDemo-02 的核心思想是在 RxDemo-01 的基础上对事件进行拦截和过滤,如下图所示。

具体的实现方式如下所示,通过 CompositeDisposable 管理 AnonymousDisposable(原始 subscribe 中的闭包执行后的返回类型)。同时,在执行发布事件时,使用一个中间 Observer 接收原始事件,中间 Observer 引用外部 CompositeDisposable 的状态决定是否将事件发送给原始 Observer。

使用 CompositeDisposable 的本质就是添加了一个中间层来解决管理订阅的问题。

结构优化(RxDemo-03)

在 RxDemo-02 中,AnonymousObserver 引用了外部的 CompositeDisposable 中的订阅状态,从而决定事件的传递方向。这种代码逻辑由内而外实现,并不是很直观。

为了更加清晰地描述这个事件流动方向,RxDemo-03 通过增加一个中间层,将原始 observer、中间 observer、事件转发逻辑聚合在同一层级下,让代码具有更好的可读性。这里我们实现一个遵循 Disposable 协议的 Sink 类型。Sink水槽 的意思,象征着这里我们通过它来控制事件的流动方向,暗示了这个类的作用。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
class Sink<O: ObserverType>: Disposable {
private var _disposed: Bool = false
private let _forward: O
private let _eventGenerator: (Observer<O.Element>) -> Disposable
private let _composite = CompositeDisposable()

init(forward: O, eventGenerator: @escaping (Observer<O.Element>) -> Disposable) {
_forward = forward
_eventGenerator = eventGenerator
}

func run() {
// 通过一个中间 Observer 接收原始事件
// 根据 CompositionDisposable 的状态决定是否传递给原始 Observer
let observer = Observer<O.Element>(forward)
// 执行事件生成器
// 将返回值 Disposable 加入到 CompositeDisposable 中进行管理
_composite.add(disposable: _eventGenerator(observer))
}

private func forward(event: Event<O.Element>) {
guard !_disposed else { return }
// 事件传递给原始 observer
_forward.on(event: event)
// 通过 composite 管理 error、completed 时,自动取消订阅
switch event {
case .completed, .error(_):
dispose()
default:
break
}
}

func dispose() {
_disposed = true
_composite.dispose()
}
}

在定义了 Sink 之后,我们就可以简化 Observablesubscribe 方法的具体实现。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
class Observable<Element>: ObservableType {
// 定义 发布事件 的闭包
private let _eventGenerator: (Observer<Element>) -> Disposable

init(_ eventGenerator: @escaping (Observer<Element>) -> Disposable) {
_eventGenerator = eventGenerator
}

// 实现 订阅操作 的协议,内部生成事件
func subscribe<O: ObserverType>(observer: O) -> Disposable where O.Element == Element {
let sink = Sink(forward: observer, eventGenerator: _eventGenerator)
sink.run()
return sink
}
}

SinkDisposable 引用了原始的事件发生器,并定义一个中间 Observer 转入至原始事件发生器,从而让中间 observer 接收原始事件。除此之外,SinkDisposable 还引用了原始 observer,当中间 observer 处理原始事件时,会判断订阅是否已经取消,从而决定是否将原始事件转发给原始 observer。此时,RxDemo-03 实现的 RxSwift 的内部调用关系如下所示。

操作符(RxDemo-04)

下面,我们来实现操作符,RxSwift 中包含了大量的操作符,它们基本上都是对函数式编程中 函子单子 等进行了应用。我们以 map 为例进行介绍。

如下所示,map 方法能够将一个普通函数应用到包装类型 ObservableType 上。map 方法最终返回一个 Observable<Result> 类型(同样遵循 ObservableType 协议),因此能够完美支持链式操作。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
extension ObservableType {
func map<Result>(_ transform: @escaping (Element) throws -> Result) -> Observable<Result> {
return Observable<Result> { (observer) in // observer 为原始 observer
// 此闭包可看成是一个 eventGenerator
// 向原始 observable 中传入一个中间 map observer,即由中间 map observer 替换原始 observer 监听原始事件
// 中间 map observer 对原始事件进行转换后,转发给原始 observer
return self.subscribe(observer: Observer { (event) in
switch event {
case .next(let element):
do {
try observer.on(event: .next(transform(element)))
} catch {
observer.on(event: .error(error))
}
case .error(let error):
observer.on(event: .error(error))
case .completed:
observer.on(event: .completed)
}
})
}
}
}

接下来,我们来试用一下 RxDemo-04 所实现的 RxSwift。这里,我们实现了 map 操作符的功能。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
// MARK: - Test

let observable = Observable<Int> { (observer) -> Disposable in
print("send 0")
observer.on(event: .next(0)) // observer.on(event: .next(0).map({ $0 * 2 }))
print("send 1")
observer.on(event: .next(1))
print("send 2")
observer.on(event: .next(2))
print("send 3")
observer.on(event: .next(3))
DispatchQueue.main.asyncAfter(deadline: .now() + 3) {
print("send completed")
observer.on(event: .completed)
}
return AnonymousDisposable {
print("dispose")
}
}

let observer = Observer<Int> { (event) in
switch event {
case .next(let value):
print("recive \(value)")
case .error(let error):
print("recive \(error)")
case .completed:
print("recive completed")
}
}

let disposable = observable.map { $0 * 2 }.subscribe(observer: observer)

DispatchQueue.main.asyncAfter(deadline: .now() + 2) {
disposable.dispose()
}

// send 0
// recive 0
// send 1
// recive 2
// send 2
// recive 4
// send 3
// recive 6
// dispose
// send completed

RxDemo-04 中由于增加了操作符功能,其内部的调用关系也发生了变化。特别是,发布事件和过滤事件的逻辑,很显然,每增加一个操作符,就会增加一个 SinkDisposable 中间层,调用栈也会更深。

如果我们仔细分析 RxDemo-04,其实我们可以发现内部隐藏了如下所示的调用关系链:observable -> MapObserver -> MapObservable -> observer。实际执行时,调用关系如下所示:

  1. observable 调用 MapObserver.on 方法,将原始事件传递给 MapObserver;
  2. MapObserver 使用 map 方法将原始事件转换成 map 事件(即 map 后的数据),作为 MapObservable 发出的事件;
  3. MapObservable 调用 observer.on 方法,将 map 事件传给 observer。

分类细化(RxDemo-05)

在 RxDemo-04 中,为了增加 map 操作符,对 ObservableType 进行了扩展,其本质就是在原始的 Observable 和 Observer 之间插入了一个 MapObserver 和一个 MapObservable。本节,我们继续进行优化,对中间类也进行细分和定义。

在 RxDemo-04 中,Sink 的主要作用是 根据订阅是否取消决定是否拦截事件的传递。这里我们可能会想到:中间订阅者(如:MapObserver)本身是不是就应该具备 Sink 的这种功能呢?事实上,RxSwift 就是让 Sink 作为所有 Observer 的基类。

对于操作符,我们可以实现上述的模式;但是,对于不带操作符的情况,该如何处理呢?为了实现模式的统一,我们可以认为不带操作符的情况等同于带了 只返回原值的匿名操作符,即等同于 map { $0 }。针对此情况,我们也需要定义两个类 AnonymousObserverAnonymousObservable

下面,我们来进行优化改进。

首先,我们将原来的 Observable<Element> 改成抽象基类。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
// 抽象类
class Observable<Element>: ObservableType {
// // 定义 发布事件 的闭包,有子类来定义
// private let _eventGenerator: (Observer<Element>) -> Disposable
//
// init(_ eventGenerator: @escaping (Observer<Element>) -> Disposable) {
// _eventGenerator = eventGenerator
// }

// 实现 订阅操作 的协议,内部生成事件
func subscribe<O: ObserverType>(observer: O) -> Disposable where O.Element == Element {
rxAbstractMethod()
}
}

func rxAbstractMethod(file: StaticString = #file, line: UInt = #line) -> Swift.Never {
fatalError("Abstract Method", file: file, line: line)
}

其次,我们再一定一个新的类 Producer<Element> 代替 Observable<Element>Producer<Element> 继承自 Observable<Element>,作为发布者的基类,该类内部没有时间生成器 eventGenerator 的闭包,由子类选择性进行定义。

1
2
3
4
5
6
7
8
9
10
class Producer<Element>: Observable<Element> {
// 实现 订阅操作 的协议,内部生成事件
override func subscribe<O: ObserverType>(observer: O) -> Disposable where O.Element == Element {
return run(observer: observer)
}

func run<O: ObserverType>(observer: O) -> Disposable where O.Element == Element {
rxAbstractMethod()
}
}

然后,我们来对 Sink 进行修改。在 RxDemo-04 中,Sink 即提供了事件生成的功能和事件转发的功能。这里,我们让 Sink 的职责更加单一,仅仅是提供事件转发的功能。修改结果如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
class Sink<O: ObserverType>: Disposable {
private var _disposed: Bool = false
private let _forward: O
private let _composite = CompositeDisposable()

init(forward: O) {
_forward = forward
}

func forward(event: Event<O.Element>) {
guard !_disposed else { return }
// 事件传递给原始 observer
_forward.on(event: event)
// 通过 composite 管理 error、completed 时,自动取消订阅
switch event {
case .completed, .error(_):
dispose()
default:
break
}
}

func dispose() {
_disposed = true
print("dispose execute")
_composite.dispose()
}
}

为了便于泛型类型的转换,我们给 ObservableType 协议增加一个方法,并由 Observable<Element> 予以实现。

1
2
3
4
5
protocol ObservableType {
// ...

func asObservable() -> Observable<Element>
}
1
2
3
4
5
6
7
class Observable<Element>: ObservableType {
// ...

func asObservable() -> Observable<Element> {
return self
}
}

接下来,我们来分别实现 Sink 的子类 AnonymousObserverMapObserver 以及 Producer 的子类 AnonymousObservableMapObservable

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
class AnonymousObserver<O: ObserverType>: Sink<O>, ObserverType {
typealias Element = O.Element

override init(forward: O) {
super.init(forward: forward) // forward 为原始订阅者
}

func on(event: Event<Element>) {
// 对原始事件进行转发
switch event {
case .next(let element):
self.forward(event: .next(element))
case .error(let error):
self.forward(event: .error(error))
self.dispose()
case .completed:
self.forward(event: .completed)
self.dispose()
}
}

func run(parent: AnonymousObservable<Element>) -> Disposable {
// 执行事件生成器
parent._eventGenerator(Observer(self))
}
}

class AnonymousObservable<Element>: Producer<Element> {
// 持有事件生成器闭包
let _eventGenerator: (Observer<Element>) -> Disposable

init(eventGenerator: @escaping (Observer<Element>) -> Disposable) {
self._eventGenerator = eventGenerator
}

override func run<O>(observer: O) -> Disposable where Element == O.Element, O : ObserverType {
// 订阅发生时,生成一个中间订阅者 AnonymousObserver 来订阅原始事件,并将事件转发给原始订阅者
let sink = AnonymousObserver(forward: observer)
sink.run(parent: self)
return sink
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
class MapObserver<Source, Result, O: ObserverType>: Sink<O>, ObserverType {
typealias Element = Source
typealias Result = O.Element
typealias Transform = (Source) throws -> Result
private let _transform: Transform

init(forward: O, transform: @escaping Transform) { // forward 为原始订阅者
self._transform = transform
super.init(forward: forward)
}

func on(event: Event<Element>) {
// 对原始事件进行 map 转换,对结果进行转发
switch event {
case .next(let element):
do {
let mappedElement = try _transform(element)
self.forward(event: .next(mappedElement as! O.Element))
} catch {
self.forward(event: .error(error))
}
case .error(let error):
self.forward(event: .error(error))
self.dispose()
case .completed:
self.forward(event: .completed)
self.dispose()
}
}
}

class MapObservable<Source, Result>: Producer<Result> {
typealias Transform = (Source) throws -> Result
private let _transform: Transform
private let _source: Observable<Source>

init(source: Observable<Source>, transform: @escaping Transform) {
self._source = source
self._transform = transform
}

override func run<O: ObserverType>(observer: O) -> Disposable where Result == O.Element {
// 订阅发生时,生成一个中间订阅者 MapObserver 来订阅上游事件
let sink = MapObserver(forward: observer, transform: self._transform)
self._source.subscribe(observer: sink)
return sink
}
}

现在我们再来看发布者、操作符,其实两者的本质都是一样,都是创建了一个发布者。对此,我们采用扩展的方式来提供相应的方法。如下所示:

1
2
3
4
5
6
7
8
9
10
11
extension ObservableType {
static func create(_ eventGenerator: @escaping (Observer<Element>) -> Disposable) -> Observable<Element> {
return AnonymousObservable(eventGenerator: eventGenerator)
}
}

extension ObservableType {
func map<Result>(_ transform: @escaping (Element) throws -> Result) -> Observable<Result> {
return MapObservable(source: self.asObservable(), transform: transform)
}
}

最后,我们再来验证一下实现结果,如下所示。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
// MARK: - Test

let observable = Observable<Int>.create { (observer) -> Disposable in // observer 为 MapObserver
print("send 0")
observer.on(event: .next(0))
print("send 1")
observer.on(event: .next(1))
print("send 2")
observer.on(event: .next(2))
print("send 3")
observer.on(event: .next(3))
DispatchQueue.main.asyncAfter(deadline: .now() + 3) {
print("send completed")
observer.on(event: .completed)
}
return AnonymousDisposable {
print("dispose")
}
}

let observer = Observer<Int> { (event) in
switch event {
case .next(let value):
print("recive \(value)")
case .error(let error):
print("recive \(error)")
case .completed:
print("recive completed")
}
}

let disposable = observable.map { $0 * 2 }.map { $0 + 1 }.subscribe(observer: observer)

DispatchQueue.main.asyncAfter(deadline: .now() + 2) {
disposable.dispose()
}

// send 0
// recive 1
// send 1
// recive 3
// send 2
// recive 5
// send 3
// recive 7
// dispose execute
// send completed

从运行结果而言,RxDemo-05 基本实现了分类细化,并且达到了取消订阅的功能。此时,上述例子中实际的订阅关系如下所示。相比 RxDemo-04,多了 AnonymousObserverAnonymousObservable,但是整体的内部订阅关系链更加清晰了。

此时,我们再来对照一下 RxSwift 和 RxDemo-05 中类的定义,如下表所示。各个类的功能基本相同,整体结构也是大同小异,只是在类名上略有差异。

RxDemo-05 Producer Sink AnonymousObserver AnonymousObservable MapObserver MapObservable
RxSwift Producer Sink AnonymousObservableSink AnonymousObservable Map MapSink

订阅管理(RxDemo-06)

如果,我们仔细对比,可以发现 RxSwift 中 Producersubscribe 方法内部与 RxDemo-05 还是不太一样,前者内部还引用了一个 SinkDisposer 类。其作用是什么呢?事实上,其主要作用是管理 disposeHandler。细心的同学可能会发现 RxDemo-05 中有个 BUG:取消订阅时没有执行 print("dispose") 闭包。

对此,我们也可以用类似的方式来解决,通过增加一个 Diposer 类来进行管理。具体代码见:RxDemo-06。

当订阅发生时(即执行 subscribe 方法时),内部会产生一个递归的控制流,如下图所示。

通过递归返回的方式构建整个订阅管理关系链,如下图所示。diposer0subscribe 方法最终返回的 Disposable 对象。当我们对 disposer0 执行 dispose 方法时,内部会递归地执行 dispose 方法,最终取消订阅链中所有的订阅。

注意:这里面会有循环引用,如 MapObserver 内部又引用了 Disposer0,RxDemo-06 以及 RxSwift 中的处理是给 Disposer 类内部添加一个状态,表示是否已经取消了订阅,从而避免循环引用导致的循环调用。这里的循环引用并不一定是坏事,它在下述的场景下起到了非常关键的作用。

当事件中出现一个 completeerror 事件时,由于事件会依次传递至 Observer,最后一次传递时,即 MapObserver 进行传递时,会判断是否是 completeerror 事件,从而决定是否执行 dispose 方法。当 MapObserver 执行 dipose 方法时,会通过上述的循环引用,调用 Disposer0 执行 dispose 方法,从而实现整体取消订阅。

总结

本文通过逐步实现 RxSwift 核心部分中的功能,一窥其背后的设计思路。从中我们也看到了其对函数式编程的应用,以及其所呈现出来的流编程模式的底层实现原理。

后续,我们将进一步探索原版 RxSwift 中其他的一些内容。

参考

  1. RxSwift repo
  2. ReactiveX
  3. 函数式编程——Functor、Applicative、Monad
  4. 《计算机程序的构造与解释》
  5. Modern RxSwift Architectures
  6. Learn Rx by implementing Observable