使用 Akka Actor 和 Java 8 构建反应式应用

阅读数:10428 2017 年 12 月 14 日 17:07

本文要点

  • Actor 模型为编写并发和分布式的系统提供了高层次的抽象,为开发人员屏蔽了显式锁定和线程管理的工作;
  • Actor 模型为反应式系统提供了核心功能,这些功能在反应式宣言中定义为响应性、弹性、扩展性以及消息驱动;
  • Akka 是一个基于 Actor 的框架,借助 Java 8 的 Lambda 支持它非常易于实现;
  • 通过使用 Actor 模型,开发人员在设计和实现系统时,能够更加关注于核心功能,忽略其他业务不相关的冗余内容;
  • 基于 Actor 的系统非常适合快速演化的微服务架构。

尽管“反应式(reactive)”这个术语已经存在很长时间了,但是只有到最近它才被行业实际应用到系统设计之中,并得到了主流的采纳。在 2014 年 Gartner 就写到,过去非常流行的三层架构已经日薄西山。随着企业在推进现代化方面的努力,这一点已经越发明晰了,企业必须要重新思考他们十多年来构建应用的方式。

微服务席卷了软件行业,它所带来的冲击波正在从根本上动摇传统开发流程。我们看到软件设计范式发生了变化,项目管理的方法论也随之发生了演化。我们正在向新的应用设计和实现方式转变,它以前所未有的势头在 IT 系统中实现。即便微服务这个术语不是全新的概念,我们的行业也正在意识到它不仅仅是解耦 RESTful 端点和拆分单体应用,它真正的价值在于更好的资源利用效率以及面对不可预知工作负载时更强的扩展性。反应式宣言(Reactive Manifesto)的原则很快变成了微服务架构的圣经,因为它们本质上就是分布式的反应式应用。

如今应用中的 Actor 模型

为了保持用户的兴趣,应用必须要保持很高的响应性,同时,为了满足受众不断变化的需求和预期,应用必须要快速演化。用于构建应用的技术在不断地快速演进;科学在不断发展,持续涌现的新需求不能依赖于昨天的工具和方法论。Actor 模型正在不断发展起来,它是一种构建应用的高效工具,能够充分发挥多核、内存以及集群环境所带来的强大处理能力。

Actor 提供了一种简单却强大的模型,通过该模型设计和实现的应用可以分布式的,并且能够跨系统中所有的资源共享工作任务,这些资源可以从线程和核心级别一直到服务器集群和数据中心级别。它提供了一个高效的框架来构建应用,所构建出的应用具有较高的并发性,并且能够提升资源的利用率。另外很重要的一点,Actor 模型还提供了定义良好的方式来优雅地处理错误和故障,确保应用的可靠性级别,它能够隔离问题,防止级联故障和长时间的宕机。

在过去,构建高并发的系统通常涉及到大量的装配和非常技术化的编程,它们都是非常难以掌握的。这些技术方面的挑战会抢占我们对系统核心业务功能的注意力,因为很大一部分的工作都集中在业务细节上,这需要花费很多的时间和精力用于搭建处理管道和功能装配。如果我们使用 Actor 来构建系统的话,就能在一个较高的抽象层级完成这些任务,因为处理管道和功能装配已经内置在了 Actor 模型之中。这不仅能够将我们从繁琐的传统系统实现的细节中解放出来,还能让我们更加关注于系统的核心功能和创新。

使用 Java 8 和 Akka 实现 Actor 模型

Akka 是一个在 JVM 上构建高并发、分布式、有弹性的消息驱动应用的工具集。Akka “actor”只是 Akka 工具集中一部分,它能够让我们在编写并发代码时,不用去思考低层级的线程和锁。Akka 中其他的工具还包括 Akka Streams 和 Akka http。尽管 Akka 是使用 Scala 编写的,但是它也有 Java API ,如下的样例运行在 2.4.9 版本以上(目前 Akka 的最新版本为 2.5.7,但核心 API 与本文基本相同——译者注)。

在 Akka 中,Actor 是基本的工作单元。Actor 是状态和行为的一个容器,它可以创建和监管子 Actor。Actor 之间通过异步的消息实现相互的通信。这个模型保护了 Actor 的内部状态,使其能够实现线程安全,该模型还实现了事件驱动的行为,从而不会阻塞其他的 Actor。作为开始,我们所需要知道的只是 akka 的 Maven 依赖。

复制代码
<dependency>
<groupId>com.typesafe.akka</groupId>
<artifactId>akka-actor_2.11</artifactId>
<version>2.4.9</version>
</dependency>

改变 Actor 的状态

就像通过移动设备收发短信一样,我们需要使用消息来调用 Actor。与短信类似,Actor 之间的消息也必须是不可变的。在使用 Actor 的时候,最重要的就是定义它所能接受的消息。(这种消息通常被称为协议,因为它定义了 Actor 之间的交互点。)Actor 接收消息,然后以各种方式对其作出反应,它们可以发送其他的消息、修改自己的状态或行为、创建其他的 Actor。

Actor 的初始行为是通过实现receive()方法来定义的,在实现这个方法时,可以在默认的constructor. receive()中借助ReceiveBuilder匹配传入的消息并执行相关的行为。每条信息的行为通过一个 Lambda 表达式来定义。在下面的样例中,ReceiveBuilder使用了对接口方法“onMessage”的引用。onMessage方法增加了一个计数器(内部状态)并通过AbstractLoggingActor.log方法记录了一条 info 级别的日志信息。

复制代码
static class Counter extends AbstractLoggingActor {
static class Message { }
private int counter = 0;
{
receive(ReceiveBuilder
.match(Message.class, this::onMessage)
.build()
);
}
private void onMessage(Message message) {
counter++;
log().info("Increased counter " + counter);
}
}

Actor 就绪之后,还需要启动它。这需要通过ActorSystem实现,它控制着 Actor 的生命周期。但是,我们首先需要提供一些关于如何启动这个 Actor 所需的额外信息。akka.actor.Props 是一个配置对象,能够将上下文范围内的配置暴露给框架的各个地方。它用来创建我们的 Actor,这个对象是不可变的,因此线程安全,完全可以共享。

return Props.create(Counter.class);

Props对象描述了 Actor 的构造器参数。将其封装到一个工厂函数中并放到 Actor 的构造器附近通常是一种好的实践。ActorSystem本身是在 main 方法中创建的。

复制代码
public static void main(String[] args) {
ActorSystem system = ActorSystem.create("sample1");
ActorRef counter = system.actorOf(Counter.props(), "counter");
}

ActorSystem (“sample1”)和它所包含的 Actor(“counter”)都可以给定名称,这样便于在 Actor 层级结构中进行导航,这个话题稍后会进行讨论。现在,ActorRef可以发送一条消息给 Actor,如样例所示:

counter.tell(new Counter.Message(), ActorRef.noSender());

在这里,使用两个参数定义了要发送的 Message 以及消息的发送者。(顾名思义,noSender表明在本例中,并没有使用发送者。)如果运行上述样例的话,我们就能得到预期的输出:

[01/10/2017 10:15:15.400] [sample1-akka.actor.default-dispatcher-4] [akka://sample1/user/counter] Increased counter 1

这是一个非常简单的样例,但是它提供了我们所需的线程安全性。发送给 Actor 的消息来源于不同的线程,这些消息屏蔽了并发的问题,因为 Actor 框架会进行消息的序列化处理。读者可以在线查看完整的样例。

修改 Actor 的行为

读者可能已经注意到,我们的简单样例修改了 Actor 的状态,但是它并没有改变 Actor 行为,也没有发送消息给其他 Actor。我们接下来考虑一个防盗报警系统,它可以通过密码来启用或禁用,它的传感器会探测活动。如果有人试图通过不正确的密码禁用告警的话,它就会发出声音。Actor 能够响应三种消息,分别是通过密码(以负载的形式提供该值)进行禁用和启用的消息以及盗窃活动的消息。这三种消息都包含在了下面的协议中:

复制代码
static class Alarm extends AbstractLoggingActor {
// contract
static class Activity {}
static class Disable {
private final String password;
public Disable(String password) {
this.password = password;
}
}
static class Enable {
private final String password;
public Enable(String password) {
this.password = password;
}
}
// ...
}

Actor 有一个针对密码的预置属性,它也会传入到构造器中:

复制代码
private final String password;
public Alarm(String password) {
this.password = password;
// ...
}

前面提到的 akka.actor.Props 配置对象也需要知道 password 属性,这样的话,才能在 Actor 系统启动的时候将其传递给实际的构造器。

复制代码
public static Props props(String password) {
return Props.create(Alarm.class, password);
}

针对每种可能的消息,Alarm 还需要对应的行为。这些行为是AbstractActorreceive方法的实现。receive 方法应该定义一系列的match语句(每个都是PartialFunction<Object, BoxedUnit>类型),它定义了 Actor 能够处理的消息,另外还包含消息如何进行处理的实现。

复制代码
private final PartialFunction<Object, BoxedUnit> enabled;
private final PartialFunction<Object, BoxedUnit> disabled;

如果这个签名看上去令人望而生畏的话,那么我们的代码可以通过前面所使用的ReceiveBuilder将细节隐藏起来。

复制代码
public Alarm(String password) {
this.password = password;
enabled = ReceiveBuilder
.match(Activity.class, this::onActivity)
.match(Disable.class, this::onDisable)
.build();
disabled = ReceiveBuilder
.match(Enable.class, this::onEnable)
.build();
receive(disabled);
}
}

需要注意最后对receive的调用,将默认行为设置为“disabled”。这三个行为是使用已有的三个方法(onActivity、onDisable、onEnable)来实现的。这些方法中最简单的是onActivity。如果接收到 activity 的话,报警会在控制台记录一条日志。在这里需要注意 activity 没有消息负载,所以我们将其命名为 ignored。  

复制代码
private void onActivity(Activity ignored) {
log().warning("oeoeoeoeoe, alarm alarm!!!");
}

如果 Actor 接收到一条enable消息的话,新的状态将会记录下来并且状态将会变更为enabled。如果密码不匹配的话,会记录一条简短的警告日志。消息负载现在包含了密码,所以我们可以通过访问它来校验密码。

复制代码
private void onEnable(Enable enable) {
if (password.equals(enable.password)) {
log().info("Alarm enable");
getContext().become(enabled);
} else {
log().info("Someone failed to enable the alarm");
}
}

当收到一条 disable 消息时,Actor 需要检查密码,记录一条关于状态变化的简短消息然后将状态修改为 disabled 或者在密码不匹配的情况下记录一条警告信息。

复制代码
private void onDisable(Disable disable) {
if (password.equals(disable.password)) {
log().info("Alarm disabled");
getContext().become(disabled);
} else {
log().warning("Someone who didn't know the password tried to disable it");
}
}

这样就完成了 Actor 的逻辑,我们接下来可以启动 Actor 系统并向其发送一些消息。注意,我们的正确密码“cats”是作为一个属性传递给 Actor 系统的。

复制代码
ActorSystem system = ActorSystem.create();
final ActorRef alarm = system.actorOf(Alarm.props("cat"), "alarm");

消息:

复制代码
alarm.tell(new Alarm.Activity(), ActorRef.noSender());
alarm.tell(new Alarm.Enable("dogs"), ActorRef.noSender());
alarm.tell(new Alarm.Enable("cat"), ActorRef.noSender());
alarm.tell(new Alarm.Activity(), ActorRef.noSender());
alarm.tell(new Alarm.Disable("dogs"), ActorRef.noSender());
alarm.tell(new Alarm.Disable("cat"), ActorRef.noSender());
alarm.tell(new Alarm.Activity(), ActorRef.noSender());

产生的输出如下所示:

复制代码
[01/10/2017 10:15:15.400] [default-akka.actor.default-dispatcher-4] [akka://default/user/alarm] Someone failed to enable the alarm
[01/10/2017 10:15:15.401] [default-akka.actor.default-dispatcher-4] [akka://default/user/alarm] Alarm enable
[WARN] [01/10/2017 10:15:15.403] [default-akka.actor.default-dispatcher-4] [akka://default/user/alarm] oeoeoeoeoe, alarm alarm!!!
[WARN] [01/10/2017 10:15:15.404] [default-akka.actor.default-dispatcher-4] [akka://default/user/alarm] Someone who didn't know the password tried to disable it
[01/10/2017 10:15:15.404] [default-akka.actor.default-dispatcher-4] [akka://default/user/alarm] Alarm disabled

你可以在 GitHub 上找到完整的可运行样例。到目前为止,我们只使用了一个Actor 来处理消息。不过就像在业务组织中一样,Actor 也能形成自然的层级结构。

Actor 的层级结构

Actor 可能会创建其他的 Actor。当一个 Actor 创建另外一个 Actor 时,创建者也被称为监管者(supervisor),而被创建的 Actor也被称为工作者(worker)。我们可能基于很多原因需要创建工作者 Actor,最常见的原因是工作的委托。监管者创建一个或多个工作者 Actor,然后将工作委托给它们。

监管者同时会成为工作者的看守人。就像父母会时刻关注孩子的行为那样,监管者也会照顾它的工作者 Actor。如果 Actor 遇到问题的话,它会将自己挂起(也就是说在恢复之前,它不会处理正常的消息),并且会通知其监管者自己发生了故障。

到目前为止,我们创建了多个 Actor 并为其分配了名字。Actor 的名字用来在层级结构中识别 Actor。与 Actor 交互的一般都是用户所创建 Actor 的父 Actor,也就是带有"/user"路径的 guardian。使用原始system.actorOf()创建的 Actor 是该 guardian 的直接子 Actor,如果它终止的话,系统中所有正常的 Actor 也都会关闭。在上面的 alarm 样例中,我们创建的是/user/alarm路径的用户 Actor。因为 Actor 是按照严格的层级方式来创建的,所以 Actor 会存在一个由 Actor 名称组成的唯一序列,这个序列会从 Actor 系统的根逐级往下,按照父子关系形成。这个序列类似于文件系统中的封闭文件夹,因此采用了“路径(path)”这个名称来代指它,当然 Actor 层级结构与文件系统的层级结构还有一些基础的差异。

在 Actor 内部,我们可以调用getContext().actorOf(props, “alarm-child”)创建名为“alarm-child”的新 Actor,它会作为 alarm Actor 的子 Actor。子 Actor 的生命周期是绑定在父 Actor 之上的,这意味着如果我们停止“alarm” Actor 的话,也会停掉其子 Actor:

使用Akka Actor和Java 8构建反应式应用

这种层级结构对于基于 Actor 系统的故障处理也有着直接影响。Actor 系统的典型特点就是将任务进行分解和委托,直到它被拆分得足够小,能够从一个地方进行处理。通过这种方式,不仅任务本身能够非常清晰地进行结构化,所形成的 Actor 也能在如下方面变得非常明确:

  • 应该处理哪些消息
  • 应该怎样正确地应对接收到的消息
  • 应该如何处理故障。

如果某个 Actor 无法处理特定情景的话,它会发送对应的故障消息给它的监管者,请求帮助。面对故障,监管者有四种不同的可选方案:

  • 恢复(Resume)子 Actor,保持其已有的内部状态,但是忽略掉导致故障的消息;
  • 重启(Restart)子 Actor,通过启动新的实例,清理其已有的状态;
  • 永久停止(Stop)子 Actor,将子 Actor 未来所有的消息发送至 Dead-Letter Office
  • 将故障传递至更高的层级 (Escalate),这需要让监管者本身也发生故障。

接下来,我们将上面学到的所有内容通过一个样例来具体讲解一下:NonTrustWorthyChild接收 Command 消息,每当收到该消息时,会增加一个内部的计数器。如果消息数能够被 4 整除的话,会抛出一个 RuntimeException,这个异常会向上传递给 Supervisor。这里并没有什么新东西,Command 消息本身并没有负载。

复制代码
public class NonTrustWorthyChild extends AbstractLoggingActor {
public static class Command {}
private long messages = 0L;
{
receive(ReceiveBuilder
.match(Command.class, this::onCommand)
.build()
);
}
private void onCommand(Command c) {
messages++;
if (messages % 4 == 0) {
throw new RuntimeException("Oh no, I got four commands, can't handle more");
} else {
log().info("Got a command " + messages);
}
}
public static Props props() {
return Props.create(NonTrustWorthyChild.class);
}
}

Supervisor 在它的构造器中启动NonTrustWorthyChild,并将它所接收到的 command 消息直接转发给子 Actor。

复制代码
public class Supervisor extends AbstractLoggingActor {
{
final ActorRef child = getContext().actorOf(NonTrustWorthyChild.props(), "child");
receive(ReceiveBuilder
.matchAny(command -> child.forward(command, getContext()))
.build()
);
}
//…
}

Supervisor实际启动之后,所形成的层级结构将会是“/user/supervisor/child”。在我们完成该任务之前,需要预先定义所谓的监管策略(supervision strategy)。Akka 提供了两种类型的监管策略:OneForOneStrategyAllForOneStrategy。它们之间的差异在于前者会将指令应用于发生故障的子 Actor,而后者则会将指令同时应用于子 Actor 的兄弟节点。正常情况下,我们应该使用OneForOneStrategy,如果没有明确声明的话,它也是默认方案。监管策略需要通过覆盖SupervisorStrategy方法来定义。

复制代码
@Override
public SupervisorStrategy supervisorStrategy() {
return new OneForOneStrategy(
10,
Duration.create(10, TimeUnit.SECONDS),
DeciderBuilder
.match(RuntimeException.class, ex -> stop())
.build()
);
}

第一个参数定义了maxNrOfRetries,它指定了子 Actor 在停止之前允许尝试重启的次数。(如果设置为负数值,则代表没有限制)。withinTimeRange参数定义了maxNrOfRetries的持续时间窗口。按照上面的定义,该策略会在 10 秒钟之内尝试 10 次。DeciderBuilder的工作方式与ReceiveBuilder完全类似,它定义了要匹配的异常以及如何应对。在本例中,如果在 10 秒钟内尝试了 10 次的话,Supervisor会停止掉NonTrustWorthyChild,所有剩余的消息将会发送至 dead letter box。

Actor 系统是通过Supervisor Actor 来启动的。

复制代码
ActorSystem system = ActorSystem.create();
final ActorRef supervisor = system.actorOf(Supervisor.props(), "supervisor");

当系统启动之后,我们发送 10 条 command 信息到 Supervisor。需要注意,“Command”消息是定义在 NonTrustWorthyChild 中的。

复制代码
for (int i = 0; i < 10; i++) {
supervisor.tell(new NonTrustWorthyChild.Command(), ActorRef.noSender());
}

输出的内容显示,在四条消息之后,异常传递到了Supervisor中,剩下的消息发送到了deadLetters收件箱中。如果SupervisorStrategy被定义为restart()而不是stop()的话,那么将会启动一个新的NonTrustWorthyChild Actor 实例。

复制代码
[01/10/2017 12:33:47.540] [default-akka.actor.default-dispatcher-3] [akka://default/user/supervisor/child] Got a command 1
[01/10/2017 12:33:47.540] [default-akka.actor.default-dispatcher-3] [akka://default/user/supervisor/child] Got a command 2
[01/10/2017 12:33:47.540] [default-akka.actor.default-dispatcher-3] [akka://default/user/supervisor/child] Got a command 3
[01/10/2017 12:33:47.548] [default-akka.actor.default-dispatcher-4] [akka://default/user/supervisor] Oh no, I got four commands, I can't handle any more
java.lang.RuntimeException: Oh no, I got four commands, I can't handle any more
...
[01/10/2017 12:33:47.556] [default-akka.actor.default-dispatcher-3] [akka://default/user/supervisor/child] Message [com.lightbend.akkasample.sample3.NonTrustWorthyChild$Command] from Actor[akka://default/deadLetters] to Actor[akka://default/user/supervisor/child#-1445437649] was not delivered. [1] dead letters encountered.

这个日志可以关闭或者进行调整,这需要修改 “akka.log-dead-letters”“akka.log-dead-letters-during-shutdown”的配置。

读者可以在线查看完整的样例,并尝试调整 SupervisorStrategy

总结

借助 Akka 和 Java 8,我们能够创建分布式和基于微服务的系统,这在几年前还是一种梦想。现在,所有行业的企业都迫切希望系统的演化速度能够跟上业务的速度和用户的需求。如今,我们能够弹性的扩展系统,使其支持大量的用户和庞大的数据。我们创建的系统有望具备一定级别的弹性,使停机时间不再是按照小时来计算,而是按照秒来计算。基于 Actor 的系统能够让我们创建快速演进的微服务架构,它可以进行扩展并且能够不停机运行。

Actor 模型提供了反应式系统的核心功能,也就是反应式宣言所定义的响应性、弹性、扩展性以及消息驱动。

Actor 系统可以进行水平扩展,从一个节点扩展到具有众多节点的集群,这样的话就为我们提供了灵活性以应对大规模的负载。除此之外,还可能实现有弹性的扩展,也就是扩展系统的处理能力,不管是手动的还是自动的,都能充分支持系统活动所出现的高峰和低谷状态。

借助 Actor 和 Actor 系统,故障探测和恢复就成为一种架构上的特性,而不是事后才考虑增补上去的功能。我们可以使用内置的 Actor 监管策略来处理下属工作者 Actor 遇到的问题,能够一直向上追溯到 Actor 系统层级,集群节点会积极监控集群的状态,在这种环境中处理故障已经植入到了 Actor 和 Actor 系统的 DNA 之中了。这其实始于 Actor 之间异步交换消息的最基础层级:如果你给我发送一条消息的话,你必须要考虑可能的输出,如果得到了预期的答复该怎么办,没有收到预期答复又该怎么办?这种处理策略会一直延伸到集群中节点的加入和离开。

在设计系统时,按照 Actor 的方式思考在很多方面都会更加直观。Actor 的交互方式对我们来说会更加自然,因为简单来讲,它的交互方式与人类之间的交互方式更加接近。这样的话,我们在设计和实现系统时,能够更加关注核心功能,忽略其他业务不相关的冗余内容。

关于作者

使用Akka Actor和Java 8构建反应式应用Markus Eisele 是一位 Java Champion、前 Java EE 专家组成员、JavaLand 的创始人,在世界范围内的 Java 会议上是享有盛誉的讲师,在企业级 Java 领域颇为知名。他在 Lightbend 担任开发人员,读者可以在 Twitter 上 @myfear 联系到他。

查看英文原文: Building Reactive Applications with Akka Actors and Java 8

评论

发布