浅谈 Actor 模型

自分布式计算出现以来,业界已经开始广泛研究基于消息传递编程模型的解决方案。关于消息传递,Wikipedia 描述其广泛定义主要包括:远程过程调用(Remote Procedure Calls, RPC)消息传递接口(Message Passing Interface, MPI)。但是,如今我们所谈到的消息传递,通常是指 actor 模型(Actor Model)。作为一种通用的消息传递编程模型,其起源于 20 世纪 70 年代,如今被广泛用于构建大规模可伸缩分布式系统。

作为入门,本文我们来简单聊一聊 actor 模型。

Actor 模型

一个 actor 定义为一个计算单元。所谓麻雀虽小,五脏俱全,每个 Actor 包含了存储、通信、计算等能力。在分布式系统中,通常包含了非常多的服务器集群,每一台服务器又包含了大量 actor 实例,它们共同构成了强大的并行计算能力。

Actor 的核心思想是 独立维护隔离状态,并基于消息传递实现异步通信。围绕其进行实现,actor 通常包含以下特征:

  • 每个 actor 持有一个邮箱(mailbox),本质上是一个队列,用于存储消息。
  • 每个 actor 可以发送消息至任何 actor。
  • 每个 actor 可以通过处理消息来更新内部状态,对于外部而言,actor 的状态是隔离的状态(isolated state)。

为了便于通信,actor 模型使用 异步 消息传递。消息传递不使用任何中间实体,如:通道(channel)。由于 actor 模型的消息是异步传递的,中间可能会经过很长时间,甚至丢失,因此无法保证消息到达目标 actor 时的顺序。每个 actor 都完全独立于任何其他实例,actor 之间的交互完全基于异步消息,因此能够在很大程度上避免共享内存的存在问题。

任务调度

Actor 模型根据任务调度的方式可以分为两种,分别是:

  • 基于线程(thread-based)的 actor 模型
  • 事件驱动(event-driven)的 actor 模型

基于线程的 actor 模型

基于线程的 actor 模型,其本质是为每一个 actor 分配一个独立的“线程”。这里的“线程”并不是严格意义的操作系统线程,而是广泛意义的执行过程,它可以是线程、协程或虚拟机线程。

在基于线程的 actor 模型中,每个 actor 独占一个线程,如果当前 actor 的邮箱为空,actor 会阻塞当前线程,等待接收新的消息。在实现中,一般使用 receive 原语。

这种 actor 模型实现起来比较简单,但是缺点也非常明显,由于线程数量受到系统的限制,因此 actor 的数量也会受到限制。现阶段,只有少部分 actor 模型采用基于线程的实现方式,如:Erlang、Scala Actor、Cloud Haskell。

Erlang Actor

Erlang 是第一种实现基于线程的 actor 模型的编程语言。Erlang 提供三种基本操作以实现 actor 模型,分别是:

  • spawn:创建一个进程(process)。在 Erlang 中,进程属于虚拟机,而非操作系统。
  • send:发送消息至一个线程。
  • receive:接收消息。

如下所示,为一个基于 Erlang actor 的使用示例。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
% area_server.erl
-module(area_server).
-export([start/0, area/2, loop/0]).

start() -> spawn(area_server, loop, []).

area(Pid, What) ->
rpc(Pid, What).

rpc(Pid, Request) ->
Pid ! {self(), Request},
receive
{Pid, Response} ->
Response
end.

loop() ->
receive
{From, {rectangle, Width, Ht}} ->
From ! {self(), Width * Ht},
loop();
{From, {circle, R}} ->
From ! {self(), 3.14159 * R * R},
loop();
{From, Other} ->
From ! {self(), {error, Other}},
loop()
end.
我们可以 Shell 环境下通过 erl 解释执行。当进入 Erlang REPL 环境后,我们可以执行如下代码进行测试。
1
2
3
4
5
6
7
8
9
10
11
1> c(area_server).
{ok,area_server}

2> Pid = area_server:start().
<0.94.0>

3> area_server:area(Pid, {rectangle, 10 * 8}).
{error,{rectangle,80}}

4> area_server:area(Pid, {circle, 5}).
78.53975

Scala Actor

Scala Actor 同样也实现了基于线程的 actor 模型,它将 Erlang 风格的轻量级消息传递并发性待到了 JVM,并将其集成到了重量级的线程/进程并发模型中。如下所示,为 Scala Actor 的使用示例,其实现语法也与 Erlang 非常相似。不过,从 Scala 2.11 开始,scala actors 不再作为标准库,示例中的代码我们需要进行一番改造才能运行。但是,从实现上来看,Scala Actor 和 Erlang Actor 非常相似,均采用 receive 原语接收消息,阻塞线程。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
// pingpong.scala
import scala.actors.Actor
import scala.actors.Actor._

case object Ping
case object Pong
case object Stop

class Ping(count: int, pong: Actor) extends Actor {
def act() {
var pingsLeft = count - 1
pong ! Ping
while (true) {
receive {
case Pong =>
if (pingsLeft % 1000 == 0)
Console.println("Ping: pong")
if (pingsLeft > 0) {
pong ! Ping
pingsLeft -= 1
} else {
Console.println("Ping: stop")
pong ! Stop
exit()
}
}
}
}
}

class Pong extends Actor {
def act() {
var pongCount = 0
while (true) {
receive {
case Ping =>
if (pongCount % 1000 == 0)
Console.println("Pong: ping "+pongCount)
sender ! Pong
pongCount = pongCount + 1
case Stop =>
Console.println("Pong: stop")
exit()
}
}
}
}

object pingpong extends Application {
val pong = new Pong
val ping = new Ping(100000, pong)
ping.start
pong.start
}

事件驱动的 actor 模型

在事件驱动的 actor 模型,actor 并不直接与线程耦合,只有在事件触发(即接收消息)时,才为 actor 的任务分配线程并执行。这种方式使用续体闭包(Continuation Closure)来封装 actor 及其状态。当事件处理完毕,即退出线程。通过这种方式,我们可以使用很少的线程来执行大量 actor 产生的任务。在实现中,一般使用 react 原语。

事件驱动的 actor 模型在消息触发时,会自动创建并分配线程。在这种过程中,一般的优化是将 actor 执行建立在底层的线程池之上,这些线程可以是线程、协程或虚拟机线程。从概念上讲,这种实现与 run loop、event loop 机制非常相似。

现阶段,大部分 actor 模型采用事件驱动的调度方式。

Dart Isolate

Dart Isolate 本质上是一种事件驱动的 actor 模型,一个 Isolate 对应一个 Actor。一个 IsolateGroup 管理多个 Isolate,基于此可以实现结构化并发。Dart VM 底层实现了一个线程池,管理操作系统线程。当接收到一个消息时,会自动创建一个线程来执行对应的处理方法。

如下所示,是一个 Dart Isolate 的使用示例。ReceivePortSendPort 本质上就是 Isolate 的地址,只不过从语义上进行区分,定义了接收者和发送者。spawn 方法创建一个新的 Isolate,续体闭包 _readAndParseJson 即新创建的 actor。执行完毕之后,通过 SendPort 将结果返回给主 Isolate。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
void main() async {
final jsonData = await _parseInBackground();
print('Number of JSON keys: ${jsonData.length}');
}

Future<Map<String, dynamic>> _parseInBackground() async {
final p = ReceivePort();
await Isolate.spawn(_readAndParseJson, p.sendPort);
return await p.first as Map<String, dynamic>;
}

Future<void> _readAndParseJson(SendPort p) async {
final fileData = await File(filename).readAsString();
final jsonData = jsonDecode(fileData);
Isolate.exit(p, jsonData);
}

Groovy Actor

Groovy 的 Gpars Actor 也是一种事件驱动的 actor 模型,并发的 actor 共享一个线程池,底层使用 fork/join 进行线程调度,其使用了 react 原语。

如下所示,为一个 Gpars Actor 的使用示例。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
import groovyx.gpars.actor.Actor
import groovyx.gpars.actor.Actors
final def doubler = Actors.reactor {
2 * it
}.start()

Actor actor = Actors.actor {
(1..10).each {doubler << it}
int i = 0
loop {
i += 1
if (i > 10) stop()
else {
react {message ->
println "Double of $i = $message"
}
}
}
}.start()

actor.join()
doubler.stop()
doubler.join()

总结

Actor 模型是分布式/并发编程中常用的一种解决方案,其基本的设计结构非常简单,其核心思想是 “独立维护隔离状态,并基于消息传递实现异步通信”。

根据 actor 的底层调度方式,其又可以分为:基于线程的 actor 和事件驱动的 actor。两者在底层的线程使用方式上有所区别。目前,绝大多数编程语言采用的事件驱动的 actor 模型,其在资源分配方面更加合理,执行效率也更高;缺点在于底层实现复杂度高。

后续有机会我们来探索一下 actor 的实现源码,加强一下对于 actor 实现的整体认知。

参考

  1. Message passing
  2. Actor model
  3. Message Passing and the Actor Model
  4. Dart 中的并发
  5. 《七周七并发模型》
  6. Actor-based Concurrency
  7. The Actor Model Towards Better Concurrency
  8. Locks, Actors, And Stm In Pictures
  9. 并发模型与事件循环
  10. Erlang's actor model
  11. ActorLite: 一个轻量级Actor模型实现
  12. 《Erlang 程序设计》
  13. The Scala Actors API
  14. Scala Actors: A Short Tutorial
  15. Classic Actors
  16. Dart asynchronous programming: Isolates and event loops
  17. Introduction to Dart VM