Akka Typed 官方文档之随手记
阅读原文时间:2023年07月12日阅读:7

近两年,一直在折腾用FP与OO共存的编程语言Scala,采取以函数式编程为主的方式,结合TDD和BDD的手段,采用Domain Driven Design的方法学,去构造DDDD应用(Domain Driven Design & Distributed)。期间,尝试了大量的框架:业务领域主要适用Akka、Scalaz等框架,UI和基础设施方面主要适用Spring、Kafka等框架,测试则主要适用Spock、ScalaTest、Selenium等框架。

两年的折腾,不能说没有一点收获。在核心的领域模型设计方面,通过尝试用传统Akka的Actor包裹聚合,以自定义的Command和Event进行信息交换,用Free Monad构造表达树实现延迟计算,用Akka Persistence实现Event Store,用Kafka维护命令和事件队列,让我依稀看到了以FP的方式实现ES+CQRS架构应用的曙光。

但如此多的框架不仅让人眼花缭乱,还要学习Scala、Groovy、Java等编程语言以及Gradle等Build工具的DSL,如果再加上Cluster、Cloud和Micro Service,这无疑是一个浩大的工程。所幸,伴随Akka 2.6版本推出的Akka Typed,解决了消息难以管理、框架入侵过甚、节点部署繁复等痛点和难点。特别是在ES和CQRS方面,Akka Typed可以说是提供了一个完整的解决方案。就如同当初LINQ的出现一样,给我打开了一扇新的窗口。

以下,便是我学习Akka Typed官方文档时随看随写留下的笔记(语种Scala,版本2.6.5,在学习过程中已更新至2.6.6),以备查考。内容和路线均以我个人的关注点为主,所以只是节选且难免有失偏颇。

文中提到的RMP是指Vaughn Vernon撰写的Reactive Messaging Patterns with the Actor Model一书。它是我学习传统Actor模型时的重要参考书籍。

在Github上,Paweł Kaczor开发出的akka-ddd框架,是一个非常棒的学习案例。

  • akka-ddd:一个基于Akka和EventStore实现的CQRS架构的DDD框架。
  • ddd-leaven-akka-v2:使用akka-ddd框架实现的电子商务应用。


Actor模型的优点

  • 事件驱动的:Actor相互之间只能用异步的消息进行联系,不会产生直接的耦合。
  • 强壮的隔离性: Actor不象普通的对象那样提供可供调用的API,只会暴露它所支持的消息(通信协议),从而避免了状态的共享。
  • 位置的透明性: ActorSystem使用工厂创建Actor并返回其引用,所以位置无关紧要,Actor也可以启动、停止、移动或者重启,甚至从故障中恢复。
  • 轻量性: 每个Actor通常只需要数百字节的开销,所以一个应用程序完全可能拥有上百万个并发Actor实例。

Akka库与模块一览

  • Actor Library:Akka Typed的核心
  • Remoting:使Actor得以分布部署
  • Cluster及其Sharding、Singleton:集群支持
  • Persistence:使Actor得以将其事件持久化,是实现ES+CQRS的重要组成
  • Distributed Data:在Actor之间共享数据
  • Stream:使Actor支持流处理

示例 Internet of Things (IoT)

https://doc.akka.io/docs/akka/current/typed/guide/tutorial.html

这是一个在物联时代,让用户可以借助遍布家中的温度监测仪,随时掌握屋内各个角落温度情况的应用。

所需的准备工作

  • 在IDEA的Editor/General/Auto Import中排除对*javadsl*的自动导入。

  • 需要的依赖:

    implementation 'org.scala-lang:scala-library:2.13.2'
    implementation 'com.typesafe.akka:akka-actor-typed_2.13:2.6.5'
    implementation 'ch.qos.logback:logback-classic:1.2.3'
    
    testImplementation 'org.scalatest:scalatest_2.13:3.1.2'
    testImplementation 'com.typesafe.akka:akka-actor-testkit-typed_2.13:2.6.5'

Note

  • 打印ActorRef将获得Actor的URL,从中可获悉actor族谱。
  • Actor的生命周期始终保持与其父Actor一致,Actor自身停止时推荐返回Behaviors.stopped,父可用context.stop(childRef)停止叶子Actor。
  • Actor在其生命周期中可触发PostStop之类的信号(参见RMP-47)。
  • 在Actor内部处理消息的是onMessage,处理信号的是onSignal。
  • 使用val child = context.spawn(Behaviors.supervise(child()).onFailure(SupervisorStrategy.restart), name = "child-actor")改变默认的监督策略Stop。传统Akka采取的方式是override supervisorStrategy,用一个闭包声明Decider函数(参见RMP-52)。
  • “协议即API”——在Actor的世界里,协议Protocol取代了传统OOP的接口Interface。利用这些由Command和Event组成的协议,各方的Actor们最终将借助from-replyTo所指向的ActorRef[Command/Event]完成对话。
  • 传统Akka围绕若干Actor的实例构筑整个系统,而Akka Typed则围绕Behavior的实例构筑系统,这是观念上的巨大差别。
  • Command用现在时,可以理解为“我能做什么”,是Actor对外公开的API、是它能提供的服务;而Event则用过去式,表明“我关心什么”,是触发Actor后续动作的事件。
  • 传递消息涉及网络、主机、Actor邮箱、Actor消息处理函数等多个环节,所以非常脆弱。主要的模式有三种(参见RMP-164:确保送达机制):
    • 最多一次:消息在发出去就不用管,也不用保存消息传送的状态。所以消息可能会丢失。这是Actor默认采用的方式,简单而高效。
    • 最少一次:发送后还要保存消息传送状态甚至进行重试,以确保收件人收到消息。所以消息不会丢失,但不能避免重复。
    • 正好一次:除了发件人,还要在收件人保存消息传送状态,以确保收件人不会接到重复的消息。所以消息既不会丢失,也不会重复。
  • Actor保证直连双方的消息会严格按序传送,但不保证不丢失消息。
  • 合理决定Actor的粒度,是Akka设计的核心难点:通常情况下都推荐使用较大的粒度,以降低细粒度引入的复杂度。仅在以下一些情况下方才增加粒度:
    • 需要更多的Actor提供更高的并发性。
    • 场景本身需要复杂的对话。
    • 为减少不同状态之间的耦合度,需要将多个状态分别交由更小的参与者进行独立维护。
    • 为隔离失败,减少不同参与者之间相互的干扰与牵扯,确保失败情况造成最小的负面影响。
  • 使用Dead Watch实现有关Actor停止时的互动。Dead Watch关系不仅限于父子之间,只要知道对方的ActorRef即可。当被观察的Actor停止时,观察者Actor将会收到一个Terminated(actorRefOfWatchee)信号。由于该信号无法附加其他信息,所以推荐做法是将其再包装成一条消息WatcheeTerminated,并在创建被观察者时就用context.watchWith(watcheeActor, WatcheeTerminated(watcheeActor,...))建立观察关系。( WatcheeTerminated会被context自动填充吗?貌似是的。)
  • 遵循CQRS的原则,在Actor里也推荐读写分离,将Query放入单独的Actor,避免对业务Actor的干扰。在示例中,由业务Actor负责创建Query Actor。
  • Query通常都要设置超时,于是引出Actor内建的调度机制,在工厂的Behaviors.setup中使用Behaviors.withTimers定义timers,然后在Actor类里用timers.startSingleTime来调度一条经过给定延时后才发出的消息。
  • 对于跨Actor的消息,通常需要使用context.messageAdapter()来提供一个消息转译器。而转译器最简单的方案就是把消息(通常是响应)包裹在本Actor的某个消息里。

三种无阻塞设计理念

为防止死锁(大家都抢,结果都吃不上)、饥饿(弱肉强食,弱者老是吃不上)和活锁(大家都谦让,结果都不好意思吃),有三种确保死锁无忧的无阻塞设计,其能力由强到弱如下:

  • Wait-Freedom:需要确保每个方法调用都能在有限的步数内完成,能保证不死锁、不饥饿。
  • Lock-Freedom:需要确保某些关键方法调用能在有限的步数内完成,即可保证不死锁,但不能避免饥饿。
  • Obstruction-Freedom:需要确保某些关键方法在特定的时段或条件下能在有限的步数内完成,即可避免死锁。所有的Lock-Freedom都是Obstruction-Freedom的,反之却不尽然。乐观并发控制(Optimistic Concurrency Control)就是典型的Obstruction-Freedom,因为在特定的时点,当只有一名参与者在尝试时,其共享操作即可完成。

Actor System

关于Actor体系设计的最佳实践

整个Actor System的体系,如同一个组织,任务总是逐级下派,命令总是下级服从上级。这与常见的分层软件设计(Layered Software Design)是不同的,后者总是想方设法把问题隐藏和解决在自己那一层,而不是交给上级去处理或与其他人协商。推荐的做法主要包括:

  • 如果一个Actor携带的数据非常重要,那么为了防止自身崩溃,导致数据损失,就应该把危险的任务交给子Actor负责,确保每个Request都由一个独立的子Actor进行处理,并负责好子Actor失败时的善后工作。(这被称作Erlang的“错误内核模式 Error Kernel Pattern”)
  • 如果一个Actor依赖另一个Actor来完成自己的工作,那么就要建立Watch关系,确保接受其委托的代理Actor始终处于有效状态。
  • 如果一个Actor承担了太多不同的职责,那么就把这些职责分派给不同的子Actor去负责。

关于Actor设计的最佳实践

  • Actor应当是位很好的同事,它总是能独立完成自己份内的工作,而且尽可能不打扰别人、不独占资源。即便需要访问某些外部资源,除非是逼不得已,它也不会处于阻塞状态。
  • 不要在Actor之间传递可变对象,应尽可能使用不可变的消息。
  • Actor被设计成包含了行为与状态的容器,所以不要习惯性地使用闭包等语法糖在消息里夹带行为,这将因分享可变状态而产生各种不可控的意外情况。
  • 应用中最顶层的Actor是整个错误内核模式的最核心,它应当只负责启动各个子系统,而不承担其他的业务职责。否则,它会因监督责任过重,影响失败和故障的处理。

协调关机

https://doc.akka.io/docs/akka/current/coordinated-shutdown.html

当应用的所有工作完成后,可以通知/user监督者停止运行,或者调用ActorSystem.terminate方法,从而通过运行协调关机CoordinatedShutdown来停止所有正在运行的Actor。在此过程中,你还可以执行其他一些清理和扫尾工作。

Actor基础

官方文档有专章讲解Actor的方方面面,本章只是介绍基本概念。

Actor的主要作用包括:向熟识的其他Actor发送消息,创建新的Actor,指定处理下一条消息的行为。它作为一个容器,包括有状态State、行为Behavior、邮箱Mailbox、监督策略Supervisor Strategy以及若干的子Actor等内容物,且该容器只能通过指定消息类型的参数化ActorRef进行引用,以确保最基本的隔离:

  • State可以是一台复杂的状态机,也可以只是一个简单的计数值,本质上是由Actor内部维护的一个状态。它将在Actor重启时回复到Actor刚创建时候的样子,或者也可以采用Event Sourcing的方式在重启后恢复到故障发生前的样子。
  • Behavior总是和当前Actor要处理的消息相对应,并且在Actor创建之初总会有一个初始化的行为。而在Actor的生命周期内,Actor的Behavior将可能随Actor的状态变化而变化,由上一个Behavior切换至下一个Behavior。
    • 由于消息总是发送给ActorRef的,而这背后实际对应的是能响应该消息的Behavior,所以这种对应关系必须在Actor创建之时就声明,且Behavior自身也和ActorRef一样是参数化的,这同时也决定了彼此切换的两个Behavior必须是类型相容的,否则便无法与其ActorRef保持一致。( 这便是为什么同一个Actor的Message要从同一个trait派生,以表明它就只处理这一类的消息。)
    • 在回应Command的回复消息里,通常都会包括指向应回复Actor的replyTo引用,所以能以这种方式把第三者引入当前的会话当中。
  • Mailbox按照消息的发送时间将收到的消息排好队,再交给Actor处理。默认的Mailbox是FIFO队列。从Mailbox中出队的消息,总是交由当前的Behavior进行处理。如果Behavior无法处理,就只能作失败处理。
  • Child Actors总是由父Actor监管,在spawn或stop后从context的列表中加入或退出,且这一类异步操作不会造成父Actor的阻塞。
  • Supervisor Strategy用于定义异常发生时的应对策略。默认情况下Akka Typed在触发异常时采取停止Actor的策略,而传统的Akka则采取的重启策略。

在Actor终止后,其持有的所有资源将被回收,剩下未处理的消息将转入Actor System的死信邮箱Dead Letter Mailbox,而后续新传来的消息也将悉数转到System的EventStream作为死信处理。

监管与监测

️ Akka Typed的监管已经重新设计,与传统Akka有显著区别

监管 Supervision

监管的对象是意料之外的失败(Unexpected Failure),而不是校验错误或者try-catch能处理的预期异常。所以,监管是Actor的额外装饰,并不属于Actor消息处理的组成部分。而后者,则是属于Actor业务逻辑的一部分。

当失败发生时,监管的策略包括以下三种:

  • Resume:恢复Actor及其内部状态。
  • Restart:清理Actor内部状态并恢复到Actor刚创建时候的样子。实际上,这是由父Actor使用一个新的Behavior实例替换掉当前失败Child Actor的行为,并用新的Actor接管失败Actor的邮箱,从而实现重启。
  • Stop:永久地停止Actor。

要注意的是,引发失败的那条消息将不会再被处理,而且期间Actor发生的这些变化,在父Actor以外的范围都是不可知的。

生命周期监测 Lifecycle Monitoring

Lifecycle Monitoring通常是指DeathWatch( 之前叫Dead Watch,Watch观察,Monitoring监测,译为观察更为妥帖)。这是除了父子间的监管关系外,Actor之间另一种监测关系。由于Supervision导致的Actor Restart对外是不可知的,所以要用Monitoring在一对Actor之间建立监测关系。但从目的上讲二者是有区别的,Supervision主要为应对失败情形,Monitoring主要为确保另一方知悉本方已终止运行。

使用context.watch(targetActorRef)及unwatch来建立或撤销监测关系。当被监测Actor终止时,监测方Actor将收到一条Terminated消息(不是Signal吗?),而默认的消息处理是抛出一个DeathPactException

要注意的是,监测关系的建立和目标Actor终止时间无关。这就意味着在建立监测关系时,即使目标Actor已经终止,此时监测Actor仍将收到一条Terminated消息。

在消息处理过程中触发异常时的结果

  • 对消息而言:该消息将被丢失,不再退回到邮箱。所以必须自己捕获异常,并建立相应的重试机制,并兼顾到非阻塞的要求。
  • 对邮箱而言:没有任何影响,后续的消息即使Actor被重启也将全部保留。
  • 对Actor而言:如果Actor将异常抛出,则其将被父Actor挂起(Suspend),并根据父Actor的监管策略决定将被恢复、重启还是终止。

容错能力设计

https://doc.akka.io/docs/akka/current/typed/fault-tolerance.html

Actor引用、路径和地址

一些基本的Actor Reference

  • ActorContext.self:指向自己的引用
  • PromiseActorRef:由Ask方式为回调而创建的ActorRef
  • DeadLetterActorRef:默认的死信服务提供的ActorRef
  • EmptyLocalActorRef:当被查找的Actor不存在时Akka使用的ActorRef。它虽等价于DeadLetterActorRef,但因其保留有path,因此该引用仍可被传送,用以与位于相同路径的Actor引用进行比较,以确定后者是否为Actor死亡前获得的。( 有点类似Null Object模式。)

Actor引用与路径之间的区别

  • Reference与Actor同生共死,随着Actor生命结束而失效。所以即便是处于同一Path的新旧2个Actor,也不会有同一个Reference,这也意味着发给旧ActorRef的消息永远不会自动转发发新的ActorRef。
  • Path只是一个代表族谱关系的名字,不存在生存周期,所以永不会失效。
获取Reference的2个主要渠道
  • 直接创建Actor。
  • 通过接线员Receptionist从已注册的Actor里查找。

Actor与Java内存模型

为防止Actor相互可见和消息乱序问题,Akka严格遵守以下两条“发生之前(happens before)”守则:

  • The actor send rule:发件人发送消息将始终先于收件人收到消息。
  • The actor subsequent processing rule:任何一个Actor,在任一时刻,有且只能处理一条消息。处理完成当前消息后,才接着处理下一条消息。

可靠的消息投递

Delivery翻译为“投递”更为妥帖,更好模仿邮政业务的妥投等术语。“送达”侧重结果,“发送"侧重动作本身。

Akka消息投递遵循的两条原则

  • 一条消息最多被投递一次。从业务角度讲,相比命令发成功没有,我们实际更关心对方的回复,有回复即印证对方收到命令了,否则重发命令进行催促即可。
  • 在一对发件人-收件人之间,消息的发送与接收顺序始终保持一致(仅限于用户自定义消息,不包括父子间的系统消息)

Akka消息传递采用的ACK-RETRY协议内容

  • 区分不同的消息及其确认消息的标识机制
  • 在超时前仍未收到预期的确认消息时的重试机制
  • 收件人甄别重复消息并决定丢弃它的检测机制。实现它的第一种方式,是直接采用Akka的妥投模块,改变消息投递模式为最少投递一次。第二种方式,是从业务逻辑的角度,确保消息处理的设计是幂等的。

保证妥投模块

借助Akka Persistence确保消息妥投。(参见RMP-164)

https://doc.akka.io/docs/akka/current/typed/reliable-delivery.html

事件溯源

事件溯源的本质,是执行一条Command,衍生出若干条Event,这些Event既是Command产生的副作用,也是改变对象状态的动因,及其生命周期内不可变的历史。

Akka Persistence对事件溯源提供了直接支持。

https://doc.akka.io/docs/akka/current/typed/persistence.html#event-sourcing-concepts

带确认回执的邮箱

可以通过自定义邮箱,实现消息投递的重试。但这多数仅限于本地通讯的场景,具体原因参见 The Rules for In-JVM (Local) Message Sends

死信

无法妥投的而不是因网络故障等原因被丢失了的消息,将被送往名为/deadLetters的Actor,因此这些消息被称为Dead Letter(参见RMP-161)。产生死信的原因主要是收件人不详或已经死亡,而死信Actor也主要用于系统调试。

由于死信不能通过网络传递,所以要搜集一个集群内的所有死信,则需要一台一台地收集每台主机本地的死信后再进行汇总。通过在系统的Event Stream对象akka.actor.DeadLetter中注册,普通Actor将可以订阅到本地的所有死信消息。

配置

Akka使用Typesafe Config Library管理配置信息。该库独立于Akka,也可用于其他应用的配置信息管理。

Akka的ActorSystem在启动时,所有的配置信息均会通过解析class path根目录处的application.conf/.json/.properties等文件而加载入Config对象,并通过合并所有的reference.conf形成后备配置。

️ 若正在编写的属于Akka应用程序,则Akka配置信息应写入application.conf;若是基于Akka的库,则配置信息应写入reference.conf。并且,Akka不支持从另一个库中覆写(override)当前库中的config property。

配置信息既可以从外部配置文件加载,也可用代码实现运行时解析,还可以利用ConfigFactory.load()从不同地方加载。

import akka.actor.typed.ActorSystem
import com.typesafe.config.ConfigFactory

val customConf = ConfigFactory.parseString("""
  akka.log-config-on-start = on
""")
// ConfigFactory.load sandwiches customConfig between default reference
// config and default overrides, and then resolves it.
val system = ActorSystem(rootBehavior, "MySystem", ConfigFactory.load(customConf))

一个典型的多项目配置示例:

myapp1 {
  akka.loglevel = "WARNING"
  my.own.setting = 43
}
myapp2 {
  akka.loglevel = "ERROR"
  app2.setting = "appname"
}
my.own.setting = 42
my.other.setting = "hello"

相应的配置信息加载代码示例:

val config = ConfigFactory.load()
val app1 = ActorSystem(rootBehavior, "MyApp1", config.getConfig("myapp1").withFallback(config))
val app2 = ActorSystem(rootBehavior, "MyApp2", config.getConfig("myapp2").withOnlyPath("akka").withFallback(config))

Akka的默认配置列表,长达近千行……

Akka Config Checker是用于查找Akka配置冲突的有力工具。


com.typesafe.akka:akka-actor-typed:2.6.5

Actor概貌

Hello World

示例HelloWorld是由HelloWorldMain创建一个HelloWorld(即Greeter),在每次ActorSystem要求HelloWorld SayHello的时候,就创建一个SayHello消息所赋名称对应的HelloWorldBot(所以会有若干个动作相同但名称不同的Bot),然后要求Greeter去向这个Bot问好,最后以Greeter与Bot相互问候数次作为结束。

示例采用了FP风格,Actor的状态和行为均在Singleton对象里定义,采用了类似传统Akka receive()的函数Behaviors.receive { (context, message) => ... },以消息类型作为约束,实现了Actor的互动与组合。在每个Bot里,利用消息的递归重入维持一个Greeting的计数值,届满则用Behaviors.stopped停止响应,否则递归重入。

Behaviors.receive {…}与receiveMessage {…}的区别,在于前者将把context带入闭包。

ChatRoom

这是一个类似聊天室功能的示例,各Actor的职责、定义和联系如下表:

Actor

职责

Behavior类型

Command

Event

Main

创建聊天室ChatRoom和客户Gabbler,并为二者牵线搭桥

NotUsed

ChatRoom

创建并管理一组Session

RoomCommand

  • GetSession(screenName: String, replyTo: ActorRef[SessionEvent])
  • PublishSessionMessage(screenName: String, message: String)

Session

负责播发诸如Gabbler这样的Client的发言

SessionCommand

  • PostMessage(message: String)
  • NotifyClient(msgEvent: MessagePosted)

Gabbler

响应Session

SessionEvent

  • SessionGranted(session: ActorRef[PostMessage])- SessionDenied(reason: String)
  • MessagePosted(screenName: String, message: String)

示例先采用FP风格实现。比如ChatRoom在处理GetSession消息时,最后以chatRoom(ses :: sessions)返回一个新的Behavior实例结束,这里的sessions正是Actor ChatRoom维护的状态。

示例演示了如何限制消息的发件人。比如session及其工厂方法,以及PublishSessionMessage类型均为chatroom私有,外部不可访问;在session Behavior的PostMessage分支中,chatroom的ActorRef通过工厂方法传入session,且类型被限制为ActorRef[PublishSessionMessage]。这样外界只能与ChatRoom通信,然后由ChatRoom在内部将消息转交Session处理。

处理消息的参数来源于工厂方法的传入参数,还是封装在消息的字段里,这个示例也分别给出了样板。 在设计通信协议时,消息定义为Command还是Event,消息的主人是谁,处理消息需要的参数如何传入等等,都是需要考虑的问题。

为实现程序安全退出,示例在Main的Behavior里,设置了Dead Watch观察gabbler,并定义了Behaviors.receiveSignal {…},在收到gabbler处理完MessagePosted消息,因返回Behaviors.stopped而发出的Terminated信号后,以Main自身的Behaviors.stopped作为结束。

Behaviors.setup是一个Behavior的工厂方法,该Behavior的实例将在Actor启动后才创建。而Behaviors.receive虽也是Behavior的工厂方法之一,但Behavior的实例却是在Actor启动的那一刻就同时创建的。

Actor的生命周期

Actor是一个需要显式启停并且自带状态的资源(子Actor与随父Actor虽不共生、但定共死),所以回想在GC出现前需要自己管理内存句柄的时代吧。

Actor System是一个高能耗的系统,所以通常一个应用或者一个JVM里只有一个Actor System。

创建Actor

ActorContext

ActorContext可用作:

  • 孵化(Spawn)子Actor和监管关系。
  • 观察(Watch)其他Actor,并在被观察Actor停止运行时收到Terminated事件(信号)。
  • 记录日志(Logging)。
  • 创建消息适配器Message Adapter。
  • 以Request-Response方式与其他Actor进行交互。
  • 访问Actor自身引用self

ActorContext本身并不是完全线程安全的,主要有以下限制:

  • 不能从Future回调函数的线程访问。
  • 不能在多个Actor实例之间进行共享。
  • 只能在普通的消息处理线程里使用。
孵化子Actor

孵化有两层含义:创建并启动。

孵化协议SpawnProtocol

在使用Behaviors.setup启用SpawnProtocol后,在应用中任何地方都将可以不直接引用context,改用telling或asking方式完成Actor系统的组装。其中,Ask方式的使用类似传统Akka,它将返回Future[ActorRef[XX]]。

留意示例代码里的几处泛型约束,由这些Message串起了应用的流程。

// 启用SpawnProtocol的Actor
object HelloWorldMain {
  def apply(): Behavior[SpawnProtocol.Command] =
    Behaviors.setup { context =>
      // Start initial tasks
      // context.spawn(...)

      SpawnProtocol()
    }
}

implicit val system: ActorSystem[SpawnProtocol.Command] =
  ActorSystem(HelloWorldMain(), "hello")

val greeter: Future[ActorRef[HelloWorld.Greet]] =
  system.ask(SpawnProtocol.Spawn(behavior = HelloWorld(), name = "greeter", props = Props.empty, _))

val greetedBehavior = Behaviors.receive[HelloWorld.Greeted] { (context, message) =>
  context.log.info2("Greeting for {} from {}", message.whom, message.from)
  Behaviors.stopped
}

val greetedReplyTo: Future[ActorRef[HelloWorld.Greeted]] =
  system.ask(SpawnProtocol.Spawn(greetedBehavior, name = "", props = Props.empty, _))

for (greeterRef <- greeter; replyToRef <- greetedReplyTo) {
  greeterRef ! HelloWorld.Greet("Akka", replyToRef)
}

停止Actor

Actor可以通过返回Behaviors.stopped作为接替Behavior来停止自身运行。

子Actor可以在处理完当前消息后,被其父Actor使用ActorContext.stop方法强行关停。

所有子Actor都将伴随其父Actor关停而关停。

当Actor停止后将会收到一个PostStop信号,可以用Behaviors.receiveSignal在该信号的处理方法里完成其他的清理扫尾工作,或者提前给Behaviors.stopped传入一个负责扫尾的闭包函数,以实现Actor优雅地关停。( 经测试,前者将先于后者执行。)

观察Actor

由于Terminated信号只带有被观察者的ActorRef,所以为了添加额外的信息,在注册观察关系时可以用context.watchWith(watchee, SpecifiedMessageRef)取代context.watch(watchee)。这样在Terminated信号触发时,观察者将收到预定义的这个SpecifiedMessageRef。

注册、撤销注册和Terminated事件的到来,在时序上并不一定严格遵守先注册后Terminated这样的规则,因为消息是异步的,且有邮箱的存在。

交互模式

Actor之间的交互,只能通过彼此的ActorRef[Message]来进行。这些ActorRef和Message,构成了Protocol的全部,既表明了通信的双方,也表明了Actor能处理的消息、限制了能发给它的消息类型。

要运行示例代码,需要导入日志和Ask模式的支持:

import akka.actor.typed.scaladsl.LoggerOps
import akka.actor.typed.scaladsl.AskPattern._

并且在test/resources文件夹下的logback-test.xml里配置好日志:

<?xml version="1.0" encoding="UTF-8"?>
<configuration>

  <appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">
    <filter class="ch.qos.logback.classic.filter.ThresholdFilter">
      <level>INFO</level>
    </filter>
    <encoder>
      <pattern>[%date{ISO8601}] [%level] [%logger] [%marker] [%thread] - %msg MDC: {%mdc}%n</pattern>
    </encoder>
  </appender>

  <appender name="CapturingAppender" class="akka.actor.testkit.typed.internal.CapturingAppender"/>

  <logger name="akka.actor.testkit.typed.internal.CapturingAppenderDelegate">
    <appender-ref ref="STDOUT"/>
  </logger>

  <root level="DEBUG">
    <appender-ref ref="CapturingAppender"/>
  </root>
</configuration>

Fire-Forget

使用异步的、线程安全的tell发出消息,但不保证对方收到消息,也不关心该消息是否被对方处理完毕了。

实现要点

recipient ! message

适用场景
  • 当消息是否被处理无关紧要时;
  • 当对于消息未妥投或未处理的情形不需要处置预案时;
  • 当为了提高吞吐量而需要最小化消息数量时(通常为发送一条响应需要创建两倍数量的消息)。
缺点
  • 如果来信数量超过处理能力,会把邮箱撑破;
  • 发件人不会知晓消息是否丢失了。

Request & Response

发件人发出Request并附上回信地址,并以获得收件人的Response作为消息妥投并被处理的确信。

实现要点

先定义Request和Response,随后sender在发出Request时把self作为replyTo的ActorRef一并传出,方便recipient收到Request后回复Response。

适用场景
  • 当需要订阅对方的Response时。
缺点
  • Actor之间通常不会为彼此通信而专门定义一个Response消息(参见Adapted Response);
  • 如果未收到Response,很难确定究竟是因为Request未妥投还是未被对方处理(参见ask方式);
  • 如果没有Request与Response之间一一对应的甄别机制或上下文,必然毫无用处(参见ask方式,或者每会话子Actor模式)。

Adapted Response

把收件人的Response进行简单封装,即作为发件人可处理的消息类型,从而减少发件人定义Protocol的负担。

实现要点

定义收件人recipient的Response类型,再在sender里定义适配后的Response类型,然后在其Behavior.setup里用context.messageAdapter(rsp => WrappedResponse(rsp))注册一个消息适配器,最后在适配后消息的分支里取出原始的Response(当初由收件人回复的),再处理该消息。适配器能匹配预定义的响应类型及其派生类,总以最晚注册的为有效版本,属于sender且与sender同生命周期,所以当适配器触发异常时将导致其宿主停止。

适用场景
  • 当需要在2种不同协议间进行转译时;
  • 当需要订阅一个Actor返回的多种Response时。
缺点
  • 如果未收到Response,很难确定究竟是因为Request未妥投还是未被对方处理(参见ask方式);
  • 每种Response类型在任何时候只能有一个有效的适配器,所以若干个Actor只能共用一个适配器。
  • 如果没有Request与Response之间一一对应的甄别机制或上下文,必然毫无用处。

在Actor之间使用ask方式实现Request-Response

把Request-Response原本使用的tell方式改为ask,从而能限定Response发回的时间,超时则视作ask失败。

实现要点

在提问人中,通过Behaviors.setup定义隐式的超时时限,以context.ask(recipientRef, request) { case Success(Response(msg)) => AdaptedResponse(msg); case Failure(_) => AdaptedResponse(...) }使用ask方式发出Request,并备妥Response到来时的适配预案(无需再额外象Adapted Response那样注册消息适配器),最后用工厂Behaviors.receiveMessage定义适配后响应消息的处理函数。

适用场景
  • 当提问人需要查询单次的Response时;
  • 当提问人需要根据上一次Request的回复情况来决定下一步怎么做时;
  • 当提问人在指定时限内未收到Response,需要在这种情况下决定重发Request时;
  • 当提问人需要主动跟踪Request被处理的情况,而不是一味追问答复人时(参见RMP-93 Back Pressure回压模式,类似有最大容量限制的阻塞队列,超出的请求将被直接拒绝);
  • 当Protocol在设计时遗漏了必要的上下文信息,但又需要将信息临时添附到会话中时。(这是指提问人在使用context.ask发出Request前,在ask调用语句前放置的相关信息。 这安全吗,如果这些信息被其他代码修改了怎么办?真有必要的话,为什么不放进Request消息的结构里?)
缺点
  • ask一次只能得到一条Response消息;
  • 提问人给自己提问设置了时限,答复人却未必知晓。所以当ask超时那一刻,答复人可能还在处理Request甚至才刚收到正要处理;
  • 很难决策超时设置多长为妥,不当的时限设置可能导致过多的误报。

从Actor系统外部使用ask方式实现Request-Response

在Actor System以外直接用ask向某个Actor提问,最终得到用Future包装好的回复。

实现要点

定义隐式的ActorSystem实例和超时时限,用reply: Future[Response] = recipient.ask(ref => Request).flatMap { case response => Future.successful(response); case another => Future.failed(...) }定义Request-Response的对应关系,再通过system.executionContext启动执行,最后在reply的回调onComplete { case Success(Response) => ...; case Failure(_) => ...}里取出Response区别处理。

适用场景
  • 当需要从Actor系统之外向某个Actor提问时。
缺点
  • 处在不同线程内的Future回调将可能导致各种意外;
  • ask一次只能得到一条Response消息;
  • 提问人给自己提问设置了时限,答复人却未必知晓。

忽略回复

当不关心收件人的回应时,在Request里把回信地址设置为什么也不干的ignoreRef,使模式从Request-Response变为Fire-Forget。

实现要点

发件人发出Request时,把回复地址从context.self改为什么消息也不处理的context.system.ignoreRef

适用场景
  • 当协议里本设定有回复类型,但发件人偶尔不关心Response时。
缺点

由于ignoreRef将忽略所有发给它的消息,所以使用时必须小心。

  • 如果使用不当,将会中断两个Actor的已有联系;
  • 当有外部ask请求发来时,ignoreRef将必定导致超时。
  • Watch ignoreRef将变得没有意义。

自提Future结果

在Actor内部有Future类型的调用时,使用pipeToSelf获取回调结果。尽管直接用Future.onComplete也能取出结果,但会因此将Actor的内部状态暴露给外部线程(在onComplete里能直接访问Actor内部状态),所以并不安全。

实现要点

在Actor内部,先定义Future调用futureResult,再使用context.pipeToSelf(futureResult) { case Success(_) => WrappedResult(...); case Failure(_) => WrappedResult(...)}将回调结果封装入WrappedResult消息,最后在WrappedResult消息分支里再作回应。

适用场景
  • 当需要从Actor里使用Future访问诸如数据库之类的外部资源时;
  • 当Actor依赖Future返回结果才能完成消息处理时;
  • 当需要在Future返回结果时仍保持调用前上下文时。
缺点
  • 引入了额外的消息包装。

每会话子Actor

当一份响应需要综合多个Actor的回复信息才能作出时,由一个父Actor委托多个子Actor搜集信息,待信息齐备后才由父Actor汇总发回给Request的请求人,请求人除与父Actor之间的协议外,对其间细节一概不知。这些子Actor仅活在每次会话期间,故名为“每会话”的子Actor。

实现要点

由父Actor在Behaviors.setup里构造实际承担工作的一组子Actor,在Request处理过程中构造负责组织协调子Actor的管家Actor(其行为类型为Behavior[AnyRef],以保证类型最大程度地兼容)。随后在管家Actor的Behaviors.setup里向子Actor发出Request,接着在Behaviors.receiveMessage里,使用递归反复尝试从子Actor的Response里取出结果(生产条件下应该设定子Actor响应超时)。当所有结果都取出后,由管家Actor利用父Actor传入的replyTo直接向外发出Response,最后停止管家Actor。

这当中的关键点包括:一是在管家Actor里的几处,使用narrow限定Actor的类型T:<U,这也算是一种妥协,确保消息类型为子类型T而非父类型U,从而实现更严谨的约束;二是利用递归配合Option[T]取出子Actor的响应结果。

// 子Actor
case class Keys()
case class Wallet()

// 父Actor
object Home {
  sealed trait Command
  case class LeaveHome(who: String, replyTo: ActorRef[ReadyToLeaveHome]) extends Command
  case class ReadyToLeaveHome(who: String, keys: Keys, wallet: Wallet)

  def apply(): Behavior[Command] = {
    Behaviors.setup[Command] { context =>
      val keyCabinet: ActorRef[KeyCabinet.GetKeys] = context.spawn(KeyCabinet(), "key-cabinet")
      val drawer: ActorRef[Drawer.GetWallet] = context.spawn(Drawer(), "drawer")

      Behaviors.receiveMessage[Command] {
        case LeaveHome(who, replyTo) =>
          context.spawn(prepareToLeaveHome(who, replyTo, keyCabinet, drawer), s"leaving-$who")
          Behaviors.same
      }
    }
  }

  // 管家Actor
  def prepareToLeaveHome(whoIsLeaving: String, replyTo: ActorRef[ReadyToLeaveHome],
                         keyCabinet: ActorRef[KeyCabinet.GetKeys], drawer: ActorRef[Drawer.GetWallet]): Behavior[NotUsed] = {
    Behaviors.setup[AnyRef] { context =>
      var wallet: Option[Wallet] = None
      var keys: Option[Keys] = None

      keyCabinet ! KeyCabinet.GetKeys(whoIsLeaving, context.self.narrow[Keys])
      drawer ! Drawer.GetWallet(whoIsLeaving, context.self.narrow[Wallet])

      Behaviors.receiveMessage {
        case w: Wallet =>
          wallet = Some(w)
          nextBehavior()
        case k: Keys =>
          keys = Some(k)
          nextBehavior()
        case _ =>
          Behaviors.unhandled
      }

      def nextBehavior(): Behavior[AnyRef] = (keys, wallet) match {
        case (Some(w), Some(k)) =>
          // 已取得所有结果
          replyTo ! ReadyToLeaveHome(whoIsLeaving, w, k)
          Behaviors.stopped
        case _ =>
          Behaviors.same
      }
    }.narrow[NotUsed]
  }
}
适用场景
  • 当需要的结果来自于数个Actor的响应汇总时;
  • 为保证至少送达一次而设计重试功能时(委托子Actor反复重试,直到获取结果)。
缺点
  • 由于子Actor是随管家Actor的停止而停止的,因此要切实防止资源泄漏;
  • 增加了实现的复杂度。

一般意义上的响应聚合器

本模式非常类似每会话子Actor模式,由聚合器负责收集子Actor回应的信息,再反馈给委托人Actor。

实现要点

实现与Per Session Child Actor近似,只是在具体代码上更具通用性而已。其中,context.spawnAnonymous是起联结作用的重要步骤。它不仅负责孵化聚合器,还要提前准备向子Actor发出Request的闭包,以及将子Actor回复转换为统一的格式的映射闭包。聚合器被启动后,即开始收集子Actor的回复,收集完成时即告终止。

// 允许子Actor有不同的协议,不必向Aggregator妥协
object Hotel1 {
  final case class RequestQuote(replyTo: ActorRef[Quote])
  final case class Quote(hotel: String, price: BigDecimal)
}

object Hotel2 {
  final case class RequestPrice(replyTo: ActorRef[Price])
  final case class Price(hotel: String, price: BigDecimal)
}

object HotelCustomer {
  sealed trait Command
  final case class AggregatedQuotes(quotes: List[Quote]) extends Command

  // 将子Actor的回复封装成统一的格式
  final case class Quote(hotel: String, price: BigDecimal)

  def apply(hotel1: ActorRef[Hotel1.RequestQuote], hotel2: ActorRef[Hotel2.RequestPrice]): Behavior[Command] = {
    Behaviors.setup[Command] { context =>
      context.spawnAnonymous(
        // 这个传递给聚合器工厂的sendRequests是衔接聚合器及其委托人的关键
        Aggregator[Reply, AggregatedQuotes](
          sendRequests = { replyTo =>
            hotel1 ! Hotel1.RequestQuote(replyTo)
            hotel2 ! Hotel2.RequestPrice(replyTo)
          },
          expectedReplies = 2,
          context.self,
          aggregateReplies = replies =>
            AggregatedQuotes(
              replies
                .map {
                  case Hotel1.Quote(hotel, price) => Quote(hotel, price)
                  case Hotel2.Price(hotel, price) => Quote(hotel, price)
                }
                .sortBy(_.price)
                .toList),
          timeout = 5.seconds))

      Behaviors.receiveMessage {
        case AggregatedQuotes(quotes) =>
          context.log.info("Best {}", quotes.headOption.getOrElse("Quote N/A"))
          Behaviors.same
      }
    }
  }
}

object Aggregator {
  // 用来兼容不同子Actor响应而定义的回复类型
  type Reply = Any

  sealed trait Command
  private case object ReceiveTimeout extends Command
  private case class WrappedReply[R](reply: R) extends Command

  def apply[Reply: ClassTag, Aggregate](
                                         sendRequests: ActorRef[Reply] => Unit,
                                         expectedReplies: Int,
                                         replyTo: ActorRef[Aggregate],
                                         aggregateReplies: immutable.IndexedSeq[Reply] => Aggregate,
                                         timeout: FiniteDuration): Behavior[Command] = {
    Behaviors.setup { context =>
      context.setReceiveTimeout(timeout, ReceiveTimeout)
      val replyAdapter = context.messageAdapter[Reply](WrappedReply(_))

      // 向子Actor发出Request并搜集整理回复信息
      sendRequests(replyAdapter)

      def collecting(replies: immutable.IndexedSeq[Reply]): Behavior[Command] = {
        Behaviors.receiveMessage {
          case WrappedReply(reply: Reply) =>
            val newReplies = replies :+ reply
            if (newReplies.size == expectedReplies) {
              val result = aggregateReplies(newReplies)
              replyTo ! result
              Behaviors.stopped
            } else
            collecting(newReplies)

          case ReceiveTimeout =>
            val aggregate = aggregateReplies(replies)
            replyTo ! aggregate
            Behaviors.stopped
        }
      }

    collecting(Vector.empty)
    }
  }
}
适用场景
  • 当需要以相同的方式,从分布多处的多个Actor获取信息,并以统一方式回复时;
  • 当需要聚合多个回复结果时;
  • 为保证至少送达一次而设计重试功能时。
缺点
  • 越是通用的消息类型,在运行时越缺少约束;
  • 子Actor可能造成资源泄漏;
  • 增加了实现复杂度。

延迟掐尾器 (Latency tail chopping)

这是聚合器模式的一种变形。类似于集群条件下,每个Actor承担着同样的工作职责,当其中某个Actor未按期响应时,将工作从这个迟延的Actor手里交给另一个Actor负责。

实现要点

这个例子不够完整,还需要进一步理解,比如为什么sendRequests需要一个Int参数,如果换作OO风格如何实现。

参考文献 Achieving Rapid Response Times in Large Online Services

  • 使用Behaviors.withTimers设置若干个定时器,由定时器负责向子Actor发出Request。

  • 设置2个超时,其中请求超时是单个Actor完成工作的时限,到期未完成就交出工作;另一个是最迟交付超时,是整个工作完成的时限,到期则说明无法交付委托人的工作。

  • 利用sendRequest函数(类型为(Int, ActorRef[Reply]) => Boolean)联结掐尾器和具体承担工作的Actor。如果sendRequest成功,说明请求已经发送给承担工作的子Actor,那么就调度一条由请求超时限定的单个Request的消息,否则就调度一条由最迟交付超时限定的消息。

    object TailChopping {
    sealed trait Command
    private case object RequestTimeout extends Command
    private case object FinalTimeout extends Command
    private case class WrappedReply[R](reply: R) extends Command

    def apply[Reply: ClassTag](
    sendRequest: (Int, ActorRef[Reply]) => Boolean,
    nextRequestAfter: FiniteDuration,
    replyTo: ActorRef[Reply],
    finalTimeout: FiniteDuration,
    timeoutReply: Reply): Behavior[Command] = {
    Behaviors.setup { context =>
    Behaviors.withTimers { timers =>
    val replyAdapter = context.messageAdapterReply
    sendNextRequest(1)

        def waiting(requestCount: Int): Behavior[Command] = {
          Behaviors.receiveMessage {
            case WrappedReply(reply: Reply) =>
              replyTo ! reply
              Behaviors.stopped
        // 单个任务没能按时完成,另外找人
        case RequestTimeout =&gt;
          sendNextRequest(requestCount + 1)
    
        // 整个工作交付不了,抱歉
        case FinalTimeout =&gt;
          replyTo ! timeoutReply
          Behaviors.stopped
      }
    }
    
    def sendNextRequest(requestCount: Int): Behavior[Command] = {
      if (sendRequest(requestCount, replyAdapter)) {
        timers.startSingleTimer(RequestTimeout, nextRequestAfter)
      } else {
        timers.startSingleTimer(FinalTimeout, finalTimeout)
      }
      waiting(requestCount)
    }
    } }

    }
    }

适用场景
  • 当需要快速响应而必须降低不必要的延迟时;
  • 当工作总是一味重复的内容时。
缺点
  • 因为引入了更多的消息并且要重复多次同样的工作,所以增加了整个系统的负担;
  • 工作的内容必须是幂等和可重复的,否则无法转交;
  • 越是通用的消息类型,在运行时越缺少约束;
  • 子Actor可能造成资源泄漏。

调度消息给自己

使用定时器,在指定时限到期时给自己发送一条指定的消息。

实现要点
  • 使用Behaviors.withTimers为Actor绑定TimerScheduler,该调度器将同样适用于Behaviors的setup、receive、receiveMessage等工厂方法创建的行为。

  • 在timers.startSingleTimer定义并启动定时器,在startSingleTimer设定的超时到期时将会收到预设的消息。

    object Buncher {
    sealed trait Command
    final case class ExcitingMessage(message: String) extends Command
    final case class Batch(messages: Vector[Command])
    private case object Timeout extends Command
    private case object TimerKey

    def apply(target: ActorRef[Batch], after: FiniteDuration, maxSize: Int): Behavior[Command] = {
    Behaviors.withTimers(timers => new Buncher(timers, target, after, maxSize).idle())
    }
    }

    class Buncher(
    timers: TimerScheduler[Buncher.Command],
    target: ActorRef[Buncher.Batch],
    after: FiniteDuration,
    maxSize: Int) {

    private def idle(): Behavior[Command] = {
    Behaviors.receiveMessage[Command] { message =>
    timers.startSingleTimer(TimerKey, Timeout, after)
    active(Vector(message))
    }
    }

    def active(buffer: Vector[Command]): Behavior[Command] = {
    Behaviors.receiveMessage[Command] {
    // 收到定时器发来的Timeout消息,缓冲区buffer停止接收,将结果回复给target。
    case Timeout =>
    target ! Batch(buffer)
    idle()

      // 时限到达前,新建缓冲区并把消息存入,直到缓冲区满
      case m =>
        val newBuffer = buffer :+ m
        if (newBuffer.size == maxSize) {
          timers.cancel(TimerKey)
          target ! Batch(newBuffer)
          idle()
        } else
          active(newBuffer)
    }

    }
    }

注意事项
  • 每个定时器都有一个Key,如果启动了具有相同Key的新定时器,则前一个定时器将被取消cancel,并且保证即便旧定时器的到期消息已经放入Mailbox,也不会再触发( 定时器的Key可以自定义吗?旧定时器的到期消息是被框架主动过滤掉的吗?)。
  • 定时器有周期性(PeriodicTimer)和一次性(SingleTimer)两种,它们的参数形式都一样:定时器键TimerKey、调度消息Message和时长Duration。区别在于最后一个参数对应周期时长或是超时时长。( 根据JAPI文档,PeriodicTimer已经作废,取而代之的是指定发送频率的startTimerAtFixedRate或者指定两次消息发送间隔时长的startTimerWithFixedDelay,区别参见下文调度周期的说明。​)
  • TimerScheduler本身是可变的,因为它要执行和管理诸如注册计划任务等副作用。( 所以不是线程安全的?)
  • TimerScheduler与其所属的Actor同生命周期。
  • Behaviors.withTimers也可以在Behaviors.supervise内部使用。当Actor重启时,它将自动取消旧的定时器,并确保新定时器不会收到旧定时器的预设到期消息。
关于调度周期的特别说明

调度周期有两种:一种是FixedDelay:指定前后两次消息发送的时间间隔;一种是FixedRate:指定两次任务执行的时间间隔。如果实难选择,建议使用FixedDelay。( 此处Task等价于一次消息处理过程,可见对Akka里的各种术语还需进一步规范。)

区别主要在于:Delay不会补偿两次消息间隔之间因各种原因导致的延误,前后两条消息的间隔时间是固定的,而不会关心前一条消息是何时才交付处理的;而Rate会对这之间的延误进行补偿,后一条消息发出的时间会根据前一条消息交付处理的时间而确定。( 换句话说,Delay以发出时间计,Rate以开始处理的时间计。)

长远来看,Delay方式下的消息处理的频率通常会略低于指定延迟的倒数,所以更适合短频快的工作;Rate方式下的消息处理频率恰好是指定间隔的倒数,所以适合注重完整执行次数的工作。

️ 在Rate方式下,如果任务延迟超出了预设的时间间隔,则将在前一条消息之后立即发送下一条消息。比如scheduleAtFixedRate的间隔为1秒,而消息处理过程因长时间暂停垃圾回收等原因造成JVM被挂起30秒钟,则ActorSystem将快速地连续发送30条消息进行追赶,从而造成短时间内的消息爆发,所以一般情况下Delay方式更被推崇。

响应集群条件下分片后的Actor

在集群条件下,通常采用的在Request中传递本Shard Actor之ActorRef的方法仍旧适用。但如果该Actor在发出Request后被移动或钝化(指Actor暂时地关闭自己以节约内存,需要时再重启),则回复的Response将会全部发至Dead Letters。此时,引入EntityId作为标识,取代ActorRef以解决之(参见RMP-68)。缺点是无法再使用消息适配器。

️ RMP-77:Actor的内部状态不会随Actor对象迁移,所以需要相应持久化机制来恢复Actor对象的状态。

实现要点

把通常设计中的ActorRef换成EntityId,再使用TypeKey和EntityId定位Actor的引用即可。

object CounterConsumer {
  sealed trait Command
  final case class NewCount(count: Long) extends Command
  val TypeKey: EntityTypeKey[Command] = EntityTypeKey[Command]("example-sharded-response")
}

object Counter {
  trait Command
  case object Increment extends Command
  final case class GetValue(replyToEntityId: String) extends Command
  val TypeKey: EntityTypeKey[Command] = EntityTypeKey[Command]("example-sharded-counter")

  private def apply(): Behavior[Command] = Behaviors.setup { context =>
      counter(ClusterSharding(context.system), 0)
  }

  private def counter(sharding: ClusterSharding, value: Long): Behavior[Command] = Behaviors.receiveMessage {
      case Increment =>
        counter(sharding, value + 1)
      case GetValue(replyToEntityId) =>
        val replyToEntityRef = sharding.entityRefFor(CounterConsumer.TypeKey, replyToEntityId)
        replyToEntityRef ! CounterConsumer.NewCount(value)
        Behaviors.same
  }
}

容错能力

默认情况下,当Actor在初始化或处理消息时触发了异常、失败,则该Actor将被停止(️ 传统Akka默认是重启Actor)。

要区别校验错误与失败:校验错误Validate Error意味着发给Actor的Command本身就是无效的,所以将其界定为Protocol规范的内容,由发件人严格遵守,这远甚过收件人发现收到的是无效Command后直接抛出异常。失败Failure则是由于Actor不可控的外因导致的,这通常无法成为双方Protocol的一部分,发件人对此也无能为力。

发生失败时,通常采取“就让它崩”的原则。其思路在于,与其花费心思零敲碎打地在局部进行细粒度的修复和内部状态纠正,不如就让它崩溃停止,然后利用已有的灾备方案,重建一个肯定有效的新Actor重新来过。

监管 Supervise

监管就是一个放置灾备方案的好地方。默认监视策略是在引发异常时停止Actor,如果要自定义此策略,则应在spawn子Actor时,使用Behaviors.supervise进行指定。

策略有许多可选参数,也可以象下面这样进行嵌套,以应对不同的异常类型。

Behaviors.supervise(
  Behaviors.supervise(behavior)
    .onFailure[IllegalStateException](SupervisorStrategy.restart))
  .onFailure[IllegalArgumentException](SupervisorStrategy.stop)

️ 若Actor被重启,则传递给Behaviors.supervise的Behavior内定义的可变状态就需要在类似Behaviors.setup这样的工厂方法中进行初始化。若采用OO风格,则推荐在setup中完成初始化;若采用FP风格,由于通常不存在函数内的可变量,所以无需如此。

完整列表参见API指南:SupervisorStrategy

子Actor在父Actor重启时停止

第二个放置灾备的地方是Behaviors.setup里。因为当父Actor重启时,其Behaviors.setup会再次执行。同时,子Actor会随父Actor重启而停止运行,以防止资源泄漏等问题发生。

注意区别以下两种方式:

方式一:由supervise包裹setup

这种方式下,每当父Actor重启时,就会完全重构一次子Actor,从而总是回到父Actor刚创建时候的样子。

def child(size: Long): Behavior[String] =
  Behaviors.receiveMessage(msg => child(size + msg.length))

def parent: Behavior[String] = {
  Behaviors
    .supervise[String] {
      // setup被supervise包裹,意味着每次父Actor重启,该setup必被重新执行
      Behaviors.setup { ctx =>
        val child1 = ctx.spawn(child(0), "child1")
        val child2 = ctx.spawn(child(0), "child2")

        Behaviors.receiveMessage[String] { msg =>
          val parts = msg.split(" ")
          child1 ! parts(0)
          child2 ! parts(1)
          Behaviors.same
        }
      }
    }
    .onFailure(SupervisorStrategy.restart)
}
方式二:由setup包裹supervise

这种方式下,子Actor不会受到父Actor的重启影响,它们既不会停止,更不会被重建。

def parent2: Behavior[String] = {
  Behaviors.setup { ctx =>
    // 此setup只会在父Actor创建时运行一次
    val child1 = ctx.spawn(child(0), "child1")
    val child2 = ctx.spawn(child(0), "child2")

    Behaviors
      .supervise {
        // 在父Actor重启时,只有这段receiveMessage工厂会被执行
        Behaviors.receiveMessage[String] { msg =>
          val parts = msg.split(" ")
          child1 ! parts(0)
          child2 ! parts(1)
          Behaviors.same
        }
      }
      // 参数false决定了父Actor重启时不会停止子Actor
      .onFailure(SupervisorStrategy.restart.withStopChildren(false))
  }
}

PreRestart信号

第三个放置灾备方案的地方是在PreRestart信号处理过程里。和之前提过的PostStop信号一样,Actor因监测而重启前,会收到一个信号PreRestart信号,方便Actor自身在重启前完成清理扫尾工作。

RMP-47的对传统Akka的描述适用于Akka Typed吗?

  • PreStart:在Actor启动前触发
  • PostStop:在Actor停止后触发
  • PreRestart:在重启Actor前触发,完成任务后会触发PostStop
  • PostRestart:在Actor重启后触发,完成任务后会触发PreStart

把异常当水泡一样顺着遗传树往上传递

在传统Akka里,子Actor触发的异常将被上交给父Actor,由后者决定如何处置。而在Akka Typed里,提供了更丰富的手段处理这种情况。

方法就是由父Actor观察(watch)子Actor,这样当子Actor因失败而停止时,父Actor将会收到附上原因的ChildFailed信号。特别地,ChildFailed信号派生自Terminated,所以如果业务上不需要刻意区分的话,处理Terminated信号即可。

在子Actor触发异常后,如果它的祖先Actor(不仅仅是父亲)没有处理Terminated信号,那么将会触发akka.actor.typed.DeathPactException异常。

示例里用Boss -> MiddleManagement -> Work这样的层级进行了演示。当Boss发出Fail消息后,MiddleManagement将消息转发给Work,Work收到Fail消息后抛出异常。因MiddleManagement和Boss均未对Terminated信号进行处理,因此相继停止。随后Boss按预定策略重启,并顺次重建MiddleManagement和Work,从而确保测试脚本尝试在等候200毫秒后重新发送消息Hello成功。

发现Actor

除了通过创建Actor获得其引用外,还可以通过接线员Receptionist获取Actor的引用。

Receptionist采用了注册会员制,注册过程仍是基于Akka Protocol。在Receptionist上注册后的会员都持有key,方便集群上的其他Actor通过key找到它。当发出Find请求后,Receptionist会回复一个Listing,其中将包括一个由若干符合条件的Actor组成的集合。(️ 同一个key可以对应多个Actor)

由Receptionist维护的注册表是动态的,其中的Actor可能因其停止运行、手动从表中注销或是节点从集群中删除而从表中消失。如果需要关注这种动态变化,可以使用Receptionist.Subscribe(keyOfActor, replyTo)订阅关注的Actor,Receptionist会在注册表变化时将Listing消息发送给replyTo。

️ 切记:上述操作均是基于异步消息的,所以操作不是即时产生结果的。可能发出注销请求了,但Actor还在注册表里。

要点:

  • ServiceKey[Message]("name")创建Key
  • context.system.receptionist ! Receptionist.Register(key, replyTo)注册Actor,用Deregister注销
  • context.system.receptionist ! Receptionist.Subscribe(key, replyTo)订阅注册表变动事件
  • context.system.receptionist ! Receptionist.Find(key, messageAdapter)查找指定key对应的若干Actor

集群的接线员

在集群条件下,一个Actor注册到本地节点的接线员后,其他节点上的接线员也会通过分布式数据广播获悉,从而保证所有节点都能通过ServiceKey找到相同的Actor们。

但需要注意集群条件下与本地环境之间的差别:一是在集群条件下进行的Subscription与Find将只能得到可达Actor的集合。如果需要获得所有的已注册Actor(包括不可达的Actor),则得通过Listing.allServiceInstances获得。二是在集群内各节点之间传递的消息,都需要经过序列化。

接线员的可扩展性

接线员无法扩展到任意数量、也达不到异常高吞吐的接转要求,它通常最多就支持数千至上万的接转量。所以,如果应用确实需要超过Akka框架所能提供的接转服务水平的,就得自己去解决各节点Actor初始化连接的难题。

路由 Route

尽管Actor在任意时刻只能处理一条消息,但这不并妨碍同时有多个Actor处理同一条消息,这便是Akka的路由功能使然。

路由器本身也是一种Actor,但主要职责是转发消息而不是处理消息。与传统Akka一样,Akka Typed的路由也分为两种:池路由池与组路由。

池路由

在池路由方式下,由Router负责构建并管理所有的Routee。当这些作为子actor的Routee终止时,Router将会把它从Router中移除。当所有的Routee都移除后,Router本身停止运行。

示例要点
  • 使用val pool = Routers.pool(poolSize = 4)(Behaviors.supervise(Worker()).onFailure[Exception](SupervisorStrategy.restart))定义池路由,其中监管策略应是必不可少的内容,被监管的Worker()即是Routee,poolSize则是池中最多能创建并管理的Routee数目。
  • 接着用val router = ctx.spawn(pool, "worker-pool")创建路由器本身。
  • 之后便可以向路由器router发送消息了。
  • 最终,消息将被路由给所有的routee(此处将有4个Worker的实例负责处理消息)。
  • Behaviors.monitor(monitor, behaviorOfMonitee):将被监测的Monitee收到新消息的同时,将该消息抄送给监测者Monitor

由于Router本身也是Actor,Routee是其子Actor,因此可以指定其消息分发器。( Router中以with开头的API还有不少,需要仔细参考API文档。)

// 指定Routee使用默认的Blocking IO消息分发器
val blockingPool = pool.withRouteeProps(routeeProps = DispatcherSelector.blocking())
// 指定Router使用与其父Actor一致的消息分发器
val blockingRouter = ctx.spawn(blockingPool, "blocking-pool", DispatcherSelector.sameAsParent())
// 使用轮循策略分发消息,保证每个Routee都尽量获得同样数量的任务,这是池路由默认策略
// 示例将获得a-b-a-b顺序的日志
val alternativePool = pool.withPoolSize(2).withRoundRobinRouting()

在学习Akka Typed的过程中,应引起重视和警醒的是,不能象传统Akka一样执着于定义Actor的Class或Object本身,而应该紧紧围绕Behavior来思考、认识和设计系统。

在Akka Typed的世界里,包括Behaviors各式工厂在内的许多API均是以Behavior为核心进行设计的。而Behavior又与特定类型的Message绑定,这便意味着Behavior与Protocol进行了绑定,于是消息Message及处理消息的Behavior[Message]便构成了完整的Protocol。

组路由

与池路由不同的是,组路由方式下的Routee均由外界其它Actor产生(自行创建、自行管理),Router只是负责将其编组在一起。

组路由基于ServiceKey和Receptionist,管理着属于同一个key的若干个Routee。虽然这种方式下对Routee构建和监控将更灵活和便捷,但也意味着组路由将完全依赖Receptionist维护的注册表才能工作。在Router启动之初,当注册表还是空白时,发来的消息将作为akka.actor.Dropped扔到事件流中。当注册表中注册有Routee后,若其可达,则消息将顺利送达,否则该Routee将被标记为不可达。

路由策略

  • 轮循策略 Round Robin

    轮循策略将公平调度各Routee,平均分配任务,所以适合于Routee数目不会经常变化的场合,是池路由的默认策略。它有一个可选的参数preferLocalRoutees,为true时将强制只使用本地的Routee(默认值为false)。

  • 随机策略 Random

    随机策略将随机选取Routee分配任务,适合Routee数目可能会变化的场合,是组路由的默认策略。它同样有可靠参数preferLocalRoutees

  • 一致的散列策略 Consistent Hashing

    散列策略将基于一张以传入消息为键的映射表选择Routee。

    参考文献:Consistent Hashing

    该文只展示了如何设计一个ConsistentHash[T]类,并提供add/remove/get等API函数,却没讲怎么使用它,所以需要完整示例!

关于性能

如果把Routee看作CPU的核心,那自然是多多益善。但由于Router本身也是一个Actor,所以其Mailbox的承载能力反而会成为整个路由器的瓶颈,而Akka Typed并未就此提供额外方案,因此遇到需要更高吞吐量的场合则需要自己去解决。

暂存 Stash

Stash(暂存),是指Actor将当前Behavior暂时还不能处理的消息全部或部分缓存起来,等完成初始化等准备工作或是处理完上一条幂等消息后,再切换至匹配的Behavior,从缓冲区取出消息进行处理的过程。

示例要点

trait DB {
  def save(id: String, value: String): Future[Done]
  def load(id: String): Future[String]
}

object DataAccess {
  sealed trait Command
  final case class Save(value: String, replyTo: ActorRef[Done]) extends Command
  final case class Get(replyTo: ActorRef[String]) extends Command
  private final case class InitialState(value: String) extends Command
  private case object SaveSuccess extends Command
  private final case class DBError(cause: Throwable) extends Command

  // 使用Behaviors.withStash(capacity)设置Stash容量
  // 随后切换到初始Behavior start()
  def apply(id: String, db: DB): Behavior[Command] = {
    Behaviors.withStash(100) { buffer =>
      Behaviors.setup[Command] { context =>
        new DataAccess(context, buffer, id, db).start()
      }
    }
  }
}

// 大量使用context.pipeToSelf进行Future交互
class DataAccess(
    context: ActorContext[DataAccess.Command],
    buffer: StashBuffer[DataAccess.Command],
    id: String,
    db: DB) {
  import DataAccess._

  private def start(): Behavior[Command] = {
    context.pipeToSelf(db.load(id)) {
      case Success(value) => InitialState(value)
      case Failure(cause) => DBError(cause)
    }

    Behaviors.receiveMessage {
      case InitialState(value) =>
        // 完成初始化,转至Behavior active()开始处理消息
        buffer.unstashAll(active(value))
      case DBError(cause) =>
        throw cause
      case other =>
        // 正在处理幂等消息,故暂存后续消息
        buffer.stash(other)
        Behaviors.same
    }
  }

  // Behaviors.receiveMessagePartial():从部分消息处理程序构造一个Behavior
  // 该行为将把未定义的消息视为未处理。
  private def active(state: String): Behavior[Command] = {
    Behaviors.receiveMessagePartial {
      case Get(replyTo) =>
        replyTo ! state
        Behaviors.same

      // 处理幂等的Save消息
      case Save(value, replyTo) =>
        context.pipeToSelf(db.save(id, value)) {
          case Success(_)     => SaveSuccess
          case Failure(cause) => DBError(cause)
        }
        // 转至Behavior saving(),反馈幂等消息处理结果
        saving(value, replyTo)
    }
  }

  private def saving(state: String, replyTo: ActorRef[Done]): Behavior[Command] = {
    Behaviors.receiveMessage {
      case SaveSuccess =>
        replyTo ! Done
        // 幂等消息处理结束并已反馈结果,转至Behavior active()开始处理下一条消息
        buffer.unstashAll(active(state))
      case DBError(cause) =>
        throw cause
      case other =>
        buffer.stash(other)
        Behaviors.same
    }
  }
}

注意事项

  • Stash所使用的缓冲区由Akka提供,其大小一定要在Behavior对象创建前进行设定,否则过多的消息被暂存将导致内存溢出,触发StashOverflowException异常。所以在往缓冲区里暂存消息前,应当使用StashBuffer.isFull提前进行检测。
  • unstashAll()将会停止Actor响应新的消息,直到当前暂存的所有消息被处理完毕,但这有可能因长时间占用消息处理线程而导致其他Actor陷入饥饿状态。为此,可改用方法unstash(numberOfMessages),确保一次只处理有限数量的暂存消息。

Behavior是一台有限状态机

有限状态机:当前处于状态S,发生E事件后,执行操作A,然后状态将转换为S’。

这部分内容对应传统Akka的FSM:Finite State Machine,可参考RMP及下文

参考示例:哲学家用餐问题,及其解析: Dining Hakkers

object Buncher {
  // 把FSM里驱动状态改变的事件,都用Message代替了
  sealed trait Event
  final case class SetTarget(ref: ActorRef[Batch]) extends Event
  final case class Queue(obj: Any) extends Event
  case object Flush extends Event
  private case object Timeout extends Event

  // 状态
  sealed trait Data
  case object Uninitialized extends Data
  final case class Todo(target: ActorRef[Batch], queue: immutable.Seq[Any]) extends Data
  final case class Batch(obj: immutable.Seq[Any])

  // 初始状态为Uninitialized,对应初始的Behavior为idle()
  def apply(): Behavior[Event] = idle(Uninitialized)

  private def idle(data: Data): Behavior[Event] =
    Behaviors.receiveMessage[Event] {
      message: Event => (message, data) match {
        case (SetTarget(ref), Uninitialized) =>
          idle(Todo(ref, Vector.empty))
        case (Queue(obj), t @ Todo(_, v)) =>
          active(t.copy(queue = v :+ obj))
        case _ =>
          Behaviors.unhandled
      }
  }

  // 处于激活状态时,对应Behavior active()
  private def active(data: Todo): Behavior[Event] =
    Behaviors.withTimers[Event] { timers =>
      // 设置超时条件
      timers.startSingleTimer(Timeout, 1.second)
      Behaviors.receiveMessagePartial {
        case Flush | Timeout =>
          data.target ! Batch(data.queue)
          idle(data.copy(queue = Vector.empty))
        case Queue(obj) =>
          active(data.copy(queue = data.queue :+ obj))
      }
  }
}

在Akka Typed里,由于Protocol和Behavior的出现,简化了传统Akka中有限状态机FSM的实现。不同的状态下,对应不同的Behavior,响应不同的请求,成为Akka Typed的典型作法,这在此前的大量示例里已经有所展示。

协调关机 Coordinated Shutdown

CoordinatedShutdown是一个扩展,通过提前注册好的任务Task,可以在系统关闭前完成一些清理扫尾工作,防止资源泄漏等问题产生。

关闭过程中,默认的各阶段(Phase)都定义在下面这个akka.coordinated-shutdown.phases里,各Task则后续再添加至相应的阶段中。

在application.conf配置里,可以通过定义不同的depends-on来覆盖缺省的设置。其中,before-service-unbindbefore-cluster-shutdownbefore-actor-system-terminate是最常被覆盖的。

各Phase原则上按照被依赖者先于依赖者的顺序执行,从而构成一个有向无环图(Directed Acyclic Graph,DAG),最终所有Phase按DAG的拓扑顺序执行。

# CoordinatedShutdown is enabled by default and will run the tasks that
# are added to these phases by individual Akka modules and user logic.
#
# The phases are ordered as a DAG by defining the dependencies between the phases
# to make sure shutdown tasks are run in the right order.
#
# In general user tasks belong in the first few phases, but there may be use
# cases where you would want to hook in new phases or register tasks later in
# the DAG.
#
# Each phase is defined as a named config section with the
# following optional properties:
# - timeout=15s: Override the default-phase-timeout for this phase.
# - recover=off: If the phase fails the shutdown is aborted
#                and depending phases will not be executed.
# - enabled=off: Skip all tasks registered in this phase. DO NOT use
#                this to disable phases unless you are absolutely sure what the
#                consequences are. Many of the built in tasks depend on other tasks
#                having been executed in earlier phases and may break if those are disabled.
# depends-on=[]: Run the phase after the given phases
phases {

  # The first pre-defined phase that applications can add tasks to.
  # Note that more phases can be added in the application's
  # configuration by overriding this phase with an additional
  # depends-on.
  before-service-unbind {
  }

  # Stop accepting new incoming connections.
  # This is where you can register tasks that makes a server stop accepting new connections. Already
  # established connections should be allowed to continue and complete if possible.
  service-unbind {
    depends-on = [before-service-unbind]
  }

  # Wait for requests that are in progress to be completed.
  # This is where you register tasks that will wait for already established connections to complete, potentially
  # also first telling them that it is time to close down.
  service-requests-done {
    depends-on = [service-unbind]
  }

  # Final shutdown of service endpoints.
  # This is where you would add tasks that forcefully kill connections that are still around.
  service-stop {
    depends-on = [service-requests-done]
  }

  # Phase for custom application tasks that are to be run
  # after service shutdown and before cluster shutdown.
  before-cluster-shutdown {
    depends-on = [service-stop]
  }

  # Graceful shutdown of the Cluster Sharding regions.
  # This phase is not meant for users to add tasks to.
  cluster-sharding-shutdown-region {
    timeout = 10 s
    depends-on = [before-cluster-shutdown]
  }

  # Emit the leave command for the node that is shutting down.
  # This phase is not meant for users to add tasks to.
  cluster-leave {
    depends-on = [cluster-sharding-shutdown-region]
  }

  # Shutdown cluster singletons
  # This is done as late as possible to allow the shard region shutdown triggered in
  # the "cluster-sharding-shutdown-region" phase to complete before the shard coordinator is shut down.
  # This phase is not meant for users to add tasks to.
  cluster-exiting {
    timeout = 10 s
    depends-on = [cluster-leave]
  }

  # Wait until exiting has been completed
  # This phase is not meant for users to add tasks to.
  cluster-exiting-done {
    depends-on = [cluster-exiting]
  }

  # Shutdown the cluster extension
  # This phase is not meant for users to add tasks to.
  cluster-shutdown {
    depends-on = [cluster-exiting-done]
  }

  # Phase for custom application tasks that are to be run
  # after cluster shutdown and before ActorSystem termination.
  before-actor-system-terminate {
    depends-on = [cluster-shutdown]
  }

  # Last phase. See terminate-actor-system and exit-jvm above.
  # Don't add phases that depends on this phase because the
  # dispatcher and scheduler of the ActorSystem have been shutdown.
  # This phase is not meant for users to add tasks to.
  actor-system-terminate {
    timeout = 10 s
    depends-on = [before-actor-system-terminate]
  }
}
  • 通常应在系统启动后尽早注册任务,否则添加得太晚的任务将不会被运行。

  • 向同一个Phase添加的任务将并行执行,没有先后之分。

  • 下一个Phase会通常会等待上一个Phase里的Task都执行完毕或超时后才会启动。可以为Phase配置recover = off,从而在Task失败或超时后,中止整个系统的关机过程。

  • 通常情况下,使用CoordinatedShutdown(system).addTask(CoordinatedShutdown.PhaseBeforeServiceUnbind, "someTaskName") { ... }向Phase中添加Task,此处的名称主要用作调试或者日志。

  • 使用CoordinatedShutdown(system).addCancellableTask(CoordinatedShutdown.PhaseBeforeServiceUnbind, "cleanup") { () => Future { ... } }添加可取消的Task,之后可以用c.cancel()取消Task的执行。

  • 通常情况下,不需要Actor回复Task已完成的消息,因为这会拖慢关机进程,直接让Actor终止运行即可。如果要关注该Task何时完成,可以使用CoordinatedShutdown(system).addActorTerminationTask(CoordinatedShutdown.PhaseBeforeServiceUnbind, "someTaskName", someActor, Some("stop"))添加任务,并且给这个someActor发送一条消息,随后watch该Actor的终止便可知晓Task完成情况。

  • 使用ActorSystem.terminate()val done: Future[Done] = CoordinatedShutdown(system).run(CoordinatedShutdown.UnknownReason)可以启动协调关机过程,且多次调用也只会执行一次。

  • ActorSystem会在最后一个Phase里的Task全部执行完毕后关闭,但JVM不一定会停止,除非所有守护进程均已停止运行。通过配置akka.coordinated-shutdown.exit-jvm = on,可以强制一并关闭JVM。

  • 在集群条件下,当节点正在从集群中离开或退出时,将会自动触发协调关机。而且系统会自动添加Cluster Singleton和Cluster Sharding等正常退出群集的任务。

  • 默认情况下,当通过杀死SIGTERM信号(Ctrl-C对SIGINT不起作用)终止JVM进程时,CoordinatedShutdown也将运行,该默认行为可以通过配置akka.coordinated-shutdown.run-by-jvm-shutdown-hook=off禁用之。

  • 可以使用CoordinatedShutdown(system).addJvmShutdownHook { ... }添加JVM Hook任务,以保证其在Akka关机前得以执行。

  • 在测试时,如果不希望启用协调关机,可以采用以下配置禁用之:

    # Don't terminate ActorSystem via CoordinatedShutdown in tests
    akka.coordinated-shutdown.terminate-actor-system = off
    akka.coordinated-shutdown.run-by-actor-system-terminate = off
    akka.coordinated-shutdown.run-by-jvm-shutdown-hook = off
    akka.cluster.run-coordinated-shutdown-when-down = off

消息分发器 Dispatchers

MessageDispatcher是Akka的心脏,是它驱动着整个ActorSystem的正常运转,并且为所有的Actor提供了执行上下文ExecutionContext,方便在其中执行代码、进行Future回调等等。

  • 默认Dispatcher

    每个ActorSystem都有一个默认的Dispatcher,可以在akka.actor.default-dispatcher配置中细调,其默认的执行器Executor类型为 “fork-join-executor”,这在绝大多数情况下都能提供优越的性能,也可以在akka.actor.default-dispatcher.executor一节中进行设置。

  • 内部专用Dispatcher

    为保护Akka各模块内部维护的Actor,有一个独立的内部专用Dispatcher。它可以在akka.actor.internal-dispatcher配置中细调,也可以设置akka.actor.internal-dispatcher为其他Dispatcher名字(别名)来替换之。

  • 查找指定的Dispatcher

    Dispatcher均实现了ExecutionContext接口,所以象这样val executionContext = context.system.dispatchers.lookup(DispatcherSelector.fromConfig("my-dispatcher"))就可加载不同的Dispatcher。

  • 选择指定的Dispatcher

    // 为新的Actor使用默认Dispatcher
    context.spawn(yourBehavior, "DefaultDispatcher")
    context.spawn(yourBehavior, "ExplicitDefaultDispatcher", DispatcherSelector.default())
    
    // 为不支持Future的阻塞调用(比如访问一些老式的数据库),使用blocking Dispatcher
    context.spawn(yourBehavior, "BlockingDispatcher", DispatcherSelector.blocking())
    
    // 使用和父Actor一样的Dispatcher
    context.spawn(yourBehavior, "ParentDispatcher", DispatcherSelector.sameAsParent())
    
    // 从配置加载指定的Dispatcher
    context.spawn(yourBehavior, "DispatcherFromConfig", DispatcherSelector.fromConfig("your-dispatcher"))
    
    
    your-dispatcher {
      type = Dispatcher
      executor = "thread-pool-executor"
      thread-pool-executor {
        fixed-pool-size = 32
      }
      throughput = 1
    }

Dispatcher的两种类型

对比

Dispatcher

PinnedDispatcher

线程池

事件驱动,一组Actor共用一个线程池。

每个Actor都拥有专属的一个线程池,池中只有一个线程。

可否被共享

没有限制

不可共享

邮箱

每个Actor拥有一个

每个Actor拥有一个

适用场景

是Akka默认的Dispatcher, 支持隔板

支持隔板

驱动

java.util.concurrent.ExecutorService驱动。使用fork-join-executor、thread-pool-executor或基于akka.dispatcher.ExecutorServiceConfigurator实现的完全限定类名,可指定其使用的executor。

由任意的akka.dispatch.ThreadPoolExecutorConfigurator驱动,默认执行器为thread-pool-executor

一个Fork-Join执行器示例:

my-dispatcher {
  # Dispatcher is the name of the event-based dispatcher
  type = Dispatcher
  # What kind of ExecutionService to use
  executor = "fork-join-executor"
  # Configuration for the fork join pool
  fork-join-executor {
    # Min number of threads to cap factor-based parallelism number to
    parallelism-min = 2
    # Parallelism (threads) ... ceil(available processors * factor)
    parallelism-factor = 2.0
    # Max number of threads to cap factor-based parallelism number to
    parallelism-max = 10
  }
  # Throughput defines the maximum number of messages to be
  # processed per actor before the thread jumps to the next actor.
  # Set to 1 for as fair as possible.
  throughput = 100
}

自定义Dispatcher以尽可能避免阻塞

讲解阻塞危害的参考视频:Managing Blocking in Akka video,及其示例代码:https://github.com/raboof/akka-blocking-dispatcher

在使用默认Dispatcher的情况下,多个Actor共用一个线程池,所以当其中一些Actor因被阻塞而占用线程后,有可能导致可用线程耗尽,而使其他同组的Actor陷入线程饥饿状态。

监测工具推荐:YourKit,VisualVM,Java Mission Control,Lightbend出品的Thread Starvation Detector等等。

示例使用了两个Actor作对比,在(1 to 100)的循环里,新建的一个Actor在消息处理函数中sleep 5秒,导致同时新建的另一个Actor无法获得线程处理消息而卡住。

针对上述情况,首先可能想到的象下面这样,用Future来封装这样的长时调用,但这样的想法实际上过于简单。因为仍旧使用了由全体Actor共用的ExecutionContext作为Future的执行上下文,所以随着应用程序的负载不断增加,内存和线程都会飞快地被耗光。

object BlockingFutureActor {
  def apply(): Behavior[Int] =
    Behaviors.setup { context =>
      implicit val executionContext: ExecutionContext = context.executionContext

      Behaviors.receiveMessage { i =>
        triggerFutureBlockingOperation(i)
        Behaviors.same
      }
    }

  def triggerFutureBlockingOperation(i: Int)(implicit ec: ExecutionContext): Future[Unit] = {
    println(s"Calling blocking Future: $i")
    Future {
      Thread.sleep(5000) //block for 5 seconds
      println(s"Blocking future finished $i")
    }
  }
}

正确的解决方案,是为所有的阻塞调用提供一个独立的Dispatcher,这种技巧被称作“隔板 bulk-heading”或者“隔离阻塞 isolating blocking”。

在application.conf里对Dispatcher进行如下配置,其中thread-pool-executor.fixed-pool-size的数值可根据实际负载情况进行微调:

my-blocking-dispatcher {
  type = Dispatcher
  executor = "thread-pool-executor"
  thread-pool-executor {
    fixed-pool-size = 16
  }
  throughput = 1
}

随后,使用该配置替换掉前述代码第4行加载的默认Dispatcher

implicit val executionContext: ExecutionContext = context.system.dispatchers.lookup(DispatcherSelector.fromConfig("my-blocking-dispatcher"))

以上便是处理响应性应用程序中阻塞问题的推荐方法。对有关Akka HTTP中阻塞调用的类似讨论,请参阅 Handling blocking operations in Akka HTTP

其他一些建议:

  • 在Future中进行阻塞调用,但务必要确保任意时刻此类调用的数量上限,否则大量的此类任务将耗尽您的内存或线程。
  • 在Future中进行阻塞调用,为线程池提供一个线程数上限,该上限要匹配运行应用程序的硬件平台条件。
  • 专门使用一个线程来管理一组阻塞资源,例如用一个NIO选择器来管理多个通道,并在阻塞资源触发特定事件时作为Actor消息进行分发调度。
  • 使用路由器来管理进行阻塞调用的Actor,并确保相应配置足够大小的线程池。这种方案特别适用于访问传统数据库这样的单线程资源,使每个Actor对应一个数据库连接,由一个路由器进行集中管理。至于Actor的数量,则由数据库部署平台的硬件条件来决定。
  • 使用Akka的任务Task在application.conf中配置线程池,它,再通过ActorSystem进行实例化。

其他一些常见的Dispatcher配置

  • 固定的线程池大小

    blocking-io-dispatcher {
      type = Dispatcher
      executor = "thread-pool-executor"
      thread-pool-executor {
        fixed-pool-size = 32
      }
      throughput = 1
    }
  • 根据CPU核心数设置线程池大小

    my-thread-pool-dispatcher {
      # Dispatcher is the name of the event-based dispatcher
      type = Dispatcher
      # What kind of ExecutionService to use
      executor = "thread-pool-executor"
      # Configuration for the thread pool
      thread-pool-executor {
        # minimum number of threads to cap factor-based core number to
        core-pool-size-min = 2
        # No of core threads ... ceil(available processors * factor)
        core-pool-size-factor = 2.0
        # maximum number of threads to cap factor-based number to
        core-pool-size-max = 10
      }
      # Throughput defines the maximum number of messages to be
      # processed per actor before the thread jumps to the next actor.
      # Set to 1 for as fair as possible.
      throughput = 100
    }
  • PinnedDispatcher

    my-pinned-dispatcher {
      executor = "thread-pool-executor"
      type = PinnedDispatcher
    }

    由于Actor每次获得的不一定都是同一个线程,所以当确有必要时,可以设置thread-pool-executor.allow-core-timeout=off,以确保始终使用同一线程。

  • 设置线程关闭超时

    无论是fork-join-executor还是thread-pool-executor,线程都将在无人使用时被关闭。如果想设置一个稍长点的时间,可进行如下调整。特别是当该Executor只是作为执行上下文使用(比如只进行Future调用),而没有关联Actor时更应如此,否则默认的1秒将会导致整个线程池过度频繁地被关闭。

    my-dispatcher-with-timeouts {
      type = Dispatcher
      executor = "thread-pool-executor"
      thread-pool-executor {
        fixed-pool-size = 16
        # Keep alive time for threads
        keep-alive-time = 60s
        # Allow core threads to time out
        allow-core-timeout = off
      }
      # How long time the dispatcher will wait for new actors until it shuts down
      shutdown-timeout = 60s
    }

邮箱 Mailbox

邮箱是Actor接收待处理消息的队列,默认是没有容量上限的。但当Actor的处理消息的速度低于消息送达的速度时,就有必要设置邮箱的容量上限了,这样当有更多消息到达时,将被转投至系统的DeadLetter。

选择特定的邮箱

如果没有特别指定,将使用默认的邮箱SingleConsumerOnlyUnboundedMailbox。否则在context.spawn时指定,且配置可从配置文件中动态加载。

context.spawn(childBehavior, "bounded-mailbox-child", MailboxSelector.bounded(100))

val props = MailboxSelector.fromConfig("my-app.my-special-mailbox")
context.spawn(childBehavior, "from-config-mailbox-child", props)


my-app {
  my-special-mailbox {
    mailbox-type = "akka.dispatch.SingleConsumerOnlyUnboundedMailbox"
  }
}

Akka提供的邮箱

  • 非阻塞类型的邮箱

    邮箱

    内部实现

    有否上限

    配置名称

    SingleConsumerOnlyUnboundedMailbox(默认)

    一个多生产者-单消费者队列,不能与BalancingDispatcher搭配

    akka.dispatch.SingleConsumerOnlyUnboundedMailbox

    UnboundedMailbox

    一个java.util.concurrent.ConcurrentLinkedQueue

    unbounded 或 akka.dispatch.UnboundedMailbox

    NonBlockingBoundedMailbox

    一个高效的多生产者-单消费者队列

    akka.dispatch.NonBlockingBoundedMailbox

    UnboundedControlAwareMailbox
    akka.dispatch.ControlMessage派生的控制消息将被优先投递

    两个java.util.concurrent.ConcurrentLinkedQueue

    akka.dispatch.UnboundedControlAwareMailbox

    UnboundedPriorityMailbox
    不保证同优先级消息的投递顺序

    一个java.util.concurrent.PriorityBlockingQueue

    akka.dispatch.UnboundedPriorityMailbox

    UnboundedStablePriorityMailbox
    严格按FIFO顺序投递同优先级消息

    一个使用akka.util.PriorityQueueStabilizer包装的java.util.concurrent.PriorityBlockingQueue

    akka.dispatch.UnboundedStablePriorityMailbox

  • 阻塞类型的邮箱:若mailbox-push-timeout-time设置为非零时将阻塞,否则不阻塞

    邮箱

    内部实现

    有否上限

    配置名称

    BoundedMailbox

    一个java.util.concurrent.LinkedBlockingQueue

    bounded 或 akka.dispatch.BoundedMailbox

    BoundedPriorityMailbox
    不保证同优先级消息的投递顺序

    一个使用akka.util.BoundedBlockingQueue包装的java.util.PriorityQueue

    akka.dispatch.BoundedPriorityMailbox

    BoundedStablePriorityMailbox
    严格按FIFO顺序投递同优先级消息

    一个使用akka.util.PriorityQueueStabilizer和akka.util.BoundedBlockingQueue包装的java.util.PriorityQueue

    akka.dispatch.BoundedStablePriorityMailbox

    BoundedControlAwareMailbox
    akka.dispatch.ControlMessage派生的控制消息将被优先投递

    两个java.util.concurrent.ConcurrentLinkedQueue,且当塞满时将阻塞

    akka.dispatch.BoundedControlAwareMailbox

自定义邮箱

如果要自己实现邮箱,则需要从MailboxType派生。该类的构造函数有2个重要参数:一个是ActorSystem.Settings对象,一个是Config的节。后者需要在Dispatcher或者Mailbox的配置中,修改mailbox-type为自定义MailboxType的完全限定名。

标记用trait的需求映射指的是什么?是必须的吗?

// Marker trait used for mailbox requirements mapping
trait MyUnboundedMessageQueueSemantics

object MyUnboundedMailbox {
  // This is the MessageQueue implementation
  class MyMessageQueue extends MessageQueue with MyUnboundedMessageQueueSemantics {

    private final val queue = new ConcurrentLinkedQueue[Envelope]()

    // these should be implemented; queue used as example
    def enqueue(receiver: ActorRef, handle: Envelope): Unit =
      queue.offer(handle)
    def dequeue(): Envelope = queue.poll()
    def numberOfMessages: Int = queue.size
    def hasMessages: Boolean = !queue.isEmpty
    def cleanUp(owner: ActorRef, deadLetters: MessageQueue): Unit = {
      while (hasMessages) {
        deadLetters.enqueue(owner, dequeue())
      }
    }
  }
}

// This is the Mailbox implementation
class MyUnboundedMailbox extends MailboxType with ProducesMessageQueue[MyUnboundedMailbox.MyMessageQueue] {

  import MyUnboundedMailbox._

  // This constructor signature must exist, it will be called by Akka
  def this(settings: ActorSystem.Settings, config: Config) = {
    // put your initialization code here
    this()
  }

  // The create method is called to create the MessageQueue
  final override def create(owner: Option[ActorRef], system: Option[ActorSystem]): MessageQueue =
    new MyMessageQueue()
}

测试

com.typesafe.akka:akka-actor-testkit-typed_2.13:2.6.5

org.scalatest:scalatest_2.13:3.1.1

测试可以是在真实的ActorSystem上进行的异步测试,也可以是在BehaviorTestKit工具提供的测试专用线程上进行的同步测试。

异步测试

  • ScalaTest提供了ActorTestKit作为真实ActorSystem的替代品,通过混入BeforeAndAfterAll,覆写其afterAll() = testKit.shutdownTestKit(),可实现测试后关闭ActorSystem。

    通过使用一个固定的testKit实例,可以直接spawn/stop某个Actor(可以是匿名的Actor),并以这种方式创建临时的Mock Actor,用以测试某个Actor的行为是否符合预期。

  • 同时,ScalaTest提供TestProbe用于接受Actor的回复,并附上一组probe.expectXXX对Actor的活动进行断言。

  • 当然,更简便的方式便是继承ScalaTestWithActorTestKit并混入AnyFeatureSpecLike之类的trait,从而将注意力完全集中在测试用例本身,而不用关心ActorSystem如何关闭之类的细枝末节。

  • ScalaTest的配置从application-test.conf中加载,否则将会自动加载Akka库自带的reference.conf配置,而不是应用程序自定义的application.conf。同时,ScalaTest支持用ConfigFactory.load()加载自定义配置文件,或用parseString()直接解决配置字符串,若再附以withFallback()将实现一次性完成配置及其后备的加载。

    ConfigFactory.parseString("""
      akka.loglevel = DEBUG
      akka.log-config-on-start = on
      """).withFallback(ConfigFactory.load())
  • 为测试与时间线关系密切的Actor活动,ScalaTest提供了手动的定时器ManualTime,可以象下面这样测试指定时间点的活动:

    class ManualTimerExampleSpec
        extends ScalaTestWithActorTestKit(ManualTime.config)
        with AnyWordSpecLike
        with LogCapturing {
    
      val manualTime: ManualTime = ManualTime()
    
      "A timer" must {
        "schedule non-repeated ticks" in {
          case object Tick
          case object Tock
      val probe = TestProbe[Tock.type]()
      val behavior = Behaviors.withTimers[Tick.type] { timer =&gt;
        // 10ms后才会调度消息
        timer.startSingleTimer(Tick, 10.millis)
        Behaviors.receiveMessage { _ =&gt;
          probe.ref ! Tock
          Behaviors.same
        }
      }
    
      spawn(behavior)
    
      // 在9ms时还没有任何消息
      manualTime.expectNoMessageFor(9.millis, probe)
    
      // 再经过2ms后,收到Tock消息
      manualTime.timePasses(2.millis)
      probe.expectMessage(Tock)
    
      // 在10ms之后再没有消息传来
      manualTime.expectNoMessageFor(10.seconds, probe)
    }
    } }
  • 为了验证Actor是否发出了某些日志事件,ScalaTest提供了LoggingTestKit。

    LoggingTestKit
      .error[IllegalArgumentException]
      .withMessageRegex(".*was rejected.*expecting ascii input.*")
      .withCustom { event =>
        event.marker match {
          case Some(m) => m.getName == "validation"
          case None    => false
        }
      }
      .withOccurrences(2)
      .expect {
        ref ! Message("hellö")
        ref ! Message("hejdå")
      }
  • 为了集中有序输出日志信息,ScalaTest提供了LogCapturing,把日志和控制台输出信息整理在一起,在测试失败的时候才一次性输出,方便分析错误原因。具体示例参见交互模式一章。

同步测试

  • ScalaTest提供BehaviorTestKit用于Actor的同步测试。

    val testKit = BehaviorTestKit(Hello())
    // 创建子Actor
    testKit.run(Hello.CreateChild("child"))
    testKit.expectEffect(Spawned(childActor, "child"))
    // 创建匿名的子Actor
    testKit.run(Hello.CreateAnonymousChild)
    testKit.expectEffect(SpawnedAnonymous(childActor))
    
    // 用一个InBox模拟Mailbox,方便测试收到的消息
    val inbox = TestInbox[String]()
    testKit.run(Hello.SayHello(inbox.ref))
    inbox.expectMessage("hello")
    // 测试子Actor的InBox
    testKit.run(Hello.SayHelloToChild("child"))
    val childInbox = testKit.childInbox[String]("child")
    childInbox.expectMessage("hello")
    // 测试匿名子Actor的InBox
    testKit.run(Hello.SayHelloToAnonymousChild)
    val child = testKit.expectEffectType[SpawnedAnonymous[String]]
    val childInbox = testKit.childInbox(child.ref)
    childInbox.expectMessage("hello stranger")
  • 在以下一些情况下,不推荐使用BehaviorTestKit(未来可能会逐步改善):

    • 涉及Future及类似的带异步回调的场景
    • 涉及定时器或消息定时调度的场景
    • 涉及EventSourcedBehavior的场景
    • 涉及必须实测的Stubbed Actor的场景
    • 黑盒测试
  • 除了Spawned和SpawnedAnonymous,BehaviorTestKit还支持以下一些Effect:

    • SpawnedAdapter
    • Stopped
    • Watched
    • WatchedWith
    • Unwatched
    • Scheduled
  • BehaviorTestKit也支持日志验证

    val testKit = BehaviorTestKit(Hello())
    val inbox = TestInbox[String]("Inboxer")
    testKit.run(Hello.LogAndSayHello(inbox.ref))
    testKit.logEntries() shouldBe Seq(CapturedLogEvent(Level.INFO, "Saying hello to Inboxer"))

Akka之Classic与Typed共存

现阶段的Akka Typed的内部,实质还是由传统Akka实现的,但未来将会有所改变。目前两类Akka有以下一些共存的方式:

  • Classic ActorSystem可以创建Typed Actor
  • Typed Actor与Classic Actor可以互发消息
  • Typed Actor与Classic Actor可以相互建立监管或观察关系
  • Classic Actor可以转换为Typed Actor

在导入命名空间时使用别名,以示区别:import akka.{ actor => classic }

️ 在监管策略方面,由于Classic默认为重启,而Typed为停止,所以Akka根据Child来决定实际策略。即如果被创建的Child是Classic,则默认采取重启策略,否则采取停止策略。

从Classic到Typed

// 导入Typed的Adapter几乎必不可少
import akka.actor.typed.scaladsl.adapter._

val system = akka.actor.ActorSystem("ClassicToTypedSystem")
val typedSystem: ActorSystem[Nothing] = system.toTyped

val classicActor = system.actorOf(Classic.props())
class Classic extends classic.Actor with ActorLogging {
  // context.spawn is an implicit extension method
  val second: ActorRef[Typed.Command] = context.spawn(Typed(), "second")

  // context.watch is an implicit extension method
  context.watch(second)

  // self can be used as the `replyTo` parameter here because
  // there is an implicit conversion from akka.actor.ActorRef to
  // akka.actor.typed.ActorRef
  // An equal alternative would be `self.toTyped`
  second ! Typed.Ping(self)

  override def receive = {
    case Typed.Pong =>
      log.info(s"$self got Pong from ${sender()}")
      // context.stop is an implicit extension method
      context.stop(second)
    case classic.Terminated(ref) =>
      log.info(s"$self observed termination of $ref")
      context.stop(self)
  }
}

从Typed到Classic

val system = classic.ActorSystem("TypedWatchingClassic")
val typed = system.spawn(Typed.behavior, "Typed")

object Typed {
  final case class Ping(replyTo: akka.actor.typed.ActorRef[Pong.type])
  sealed trait Command
  case object Pong extends Command

  val behavior: Behavior[Command] =
    Behaviors.setup { context =>
      // context.actorOf is an implicit extension method
      val classic = context.actorOf(Classic.props(), "second")

      // context.watch is an implicit extension method
      context.watch(classic)

      // illustrating how to pass sender, toClassic is an implicit extension method
      classic.tell(Typed.Ping(context.self), context.self.toClassic)

      Behaviors
        .receivePartial[Command] {
          case (context, Pong) =>
            // it's not possible to get the sender, that must be sent in message
            // context.stop is an implicit extension method
            context.stop(classic)
            Behaviors.same
        }
        .receiveSignal {
          case (_, akka.actor.typed.Terminated(_)) =>
            Behaviors.stopped
        }
    }
}

函数式与面向对象风格指南

区别

函数式编程风格

面向对象风格

组成结构

Singleton Object

Companion Object + AbstractBehavior[Message]派生类

工厂apply()

在工厂方法里完成Behavior定义及其他所有工作

在Companion Object工厂方法里采取Behaviors.setup {context => new MyActor(context)}这样的方式构造初始化的Behavior,然后把context和其他必要参数注入给类的构造函数,完成Behavior的链接

Actor扩展类

没有派生,所以只能用Behaviors.same

从AbstractBehavior[Message]派生实例,所以可以使用this等同于Behaviors.same

Behavior

在Singleton Object里给Behaviors.receive这样的工厂方法传入一个函数(闭包)进行定义

覆写派生类的onMessage函数

Context

Context与Message一起传入给receive

依赖Behaviors.setup等工厂方法传递给派生类,因此每实例对应一个context

状态

给工厂方法传入参数(通常会把包括context在内的所有参数封装成一个类似DTO的Class以适当解耦),返回带新状态的Behavior

在AbstractBehavior实例对象的内部维护所有的可变状态

推荐理由

  • 熟悉FP的方式,毕竟Akka并没有引入范畴论等高深理论

  • 习惯使用不变量保存状态,并使之可以传递给下一个Behavior

  • 能实现Behavior与状态无关

  • 用FP的风格,采取一个Behavior对应一种状态的方式实现有限状态机,相比OO风格更自然

  • 能降低在诸如Future等线程之间共享状态的风险

  • 熟悉OO编码的风格,喜欢方法method甚过函数function

  • 习惯使用变量保存状态

  • 更容易以OO风格升级已有的传统Akka代码

  • 使用可变量的性能,相较使用不可变量更好

推荐做法:

  • 不要把消息定义为顶层Class,而应与Behavior一起定义在Companion Object里,这样在使用时带着对象名作为前缀才不会引起歧义。
  • 如果某Protocol由几个Actor共享,那么建议是在一个单独的Object里定义完整的Protocol。
  • 诸如定时器调度消息或者再包装后的消息,通常作为Actor的私有消息用private修饰,但它们同样要从trait Command派生。
  • 另一种定义私有消息的方法,是所有消息均派生自trait Message,然后插入一个派生自Message的中间trait PrivateMessage,之后再从PrivateMessage派生所有的私有消息,使用这样的层次结构区分公有和私有消息。
  • 通常顶层的Message要定义为sealed,以避免case匹配时编译器提示匹配项不完整的错误。
  • 使用AskPattern从ActorSystem外部与Actor直接进行Request-Response方式的交互时,建议使用AskPattern.ask()而不是?的中缀语法,这样可以最大程度保证类型安全(在Actor之间的属于ActorContext.ask())。
  • Behaviors.setup可以嵌套,用以加载不同类型的资源。习惯上也把setup放在最外层,不过要注意supervise对setup的影响。

从传统Akka过渡

项目依赖的变化

Classic

Typed

akka-actor

akka-actor-typed

akka-cluster

akka-cluster-typed

akka-cluster-sharding

akka-cluster-sharding-typed

akka-cluster-tools

akka-cluster-typed

akka-distributed-data

akka-cluster-typed

akka-persistence

akka-persistence-typed

akka-stream

akka-stream-typed

akka-testkit

akka-actor-testkit-typed

import package的变化

Classic

Typed for Scala

akka.actor

akka.actor.typed.scaladsl

akka.cluster

akka.cluster.typed

akka.cluster.sharding

akka.cluster.sharding.typed.scaladsl

akka.persistence

akka.persistence.typed.scaladsl


com.typesafe.akka:akka-cluster-typed_2.13:2.6.5

Member状态图

消息妥投 Reliable Delivery

import akka.actor.typed.delivery._

️ 此模块目前仍不成熟,不建议在生产环境使用。

确保消息至少投递一次或恰好投递一次,是此模块的核心任务,但Akka框架没法自主实现,因为确认收到消息并且处理之,是属于业务逻辑的职责,所以必须在应用程序的配合下才能完全实现。而且,将消息妥投到目标邮箱还只是其中一个步骤(不丢失消息),确保目标Actor在消息到达前尚未崩溃(消息能被处理)也是其中重要的一环。

一个完整的消息妥投方案,包括发送消息、检测丢包、重发消息、防止过载、幂等处理等细节,这些工作绝大部分要由消费消息的一方来承担。比如消息重发,就要由消费者发现有丢包,然后向生产者提出,限流等其他一些工作亦是如此。Akka提供了以下三种模式(留意关于消息重发的细节):

点对点模式 Point to Point

点对点模式适用于2个单一Actor之间的消息妥投。

  • P:我准备舀了。
  • C:我坐好了。
  • P:舀好了,张嘴!
  • C:我吃完了,再来一口!

  • 运行时将检查并确保Producer与ProducerController都必须是本地Actor,以保证高效率,Consumer一侧亦如此。
  • 由应用程序负责使用ProducerController.RegisterConsumer或ConsumerController.RegisterToProducerController消息,建立并维护两个Controller之间的连接畅通。
  • 在前一条消息被处理完并Confirmed之前,ConsumerController不会把下一条消息Delivery发给Consumer。
  • 在两个Controller之间的消息数量将由一个ConsumerController负责的流控制窗口(flow control window)进行管理。
  • 无论是ProducerController亦或ConsumerController崩溃,所有未被Confirmed的消息都会被重新投递(即使事实上Consumer已经处理过的消息),以确保至少投递一次,否则消息将严格按Producer发出的顺序投递给Consumer。

拉取模式 Worker Pulling

Worker Pulling,是若干个Worker根据自己的消费进度,主动从一个WorkManager处拉取任务的模式。

  • P:我这有一堆活需要找人干。
  • M:没问题,我找人来做。
  • W(C):我来应聘。
  • M:你被录用了!
  • W1:给我点活干。
  • W2:也给我点活干。
  • M:这是今天的活,你们自己分吧!
  • W1:我抢到了3份!
  • W2:我抢到了4份!”

有新Worker加入时

  • 由Receptionist负责登记所有的Worker,由WorkPullingProducerController负责从Receptionist的Listing里指定执行任务的Worker。
  • 在WorkPullingProducerController与Worker之间建立联系后,仍由ProducerController与ConsumerController负责具体的一对一投递。

分片模式 Sharding

com.typesafe.akka:akka-cluster-sharding-typed_2.13:2.6.5

Sharding,是在集群进行了分片后的消息妥投模式,将由Producer与Consumer两端的ShardingController负责总协调,由ShardingController各自的小弟Controller负责点个端点的通信。

  • P:喂喂,SPC,我有一批特定款式的鞋需要找工厂代工。
  • SPC:好的,我在全世界找代工厂。
  • SCC1:作为一家中国的鞋类加工连锁企业,我OK。
  • SCC2:我是加工衬衣的,Sorry。
  • SPC:SCC1就你了,我的小弟PC稍后会直接和你联系。
  • PC:SCC1,订单发给你了。
  • SCC1:PC,我的小弟CC负责这批订单,你们2个实际干活的直接联系吧。
  • CC:OK,我交给流水线C专门生产这款鞋。
  • C:我这条线生产完了,货交给你了CC。
  • CC:PC,我按订单交付地址把货直接发给你了。
  • PC:SPC,货备妥了。
  • SPC:P老板,货备妥了你在哪?
  • P:送过来吧。

发送消息到另一个Entity

从另一个节点上的Producer发送消息(图中WorkPullingProducerController有误,应为ShardingProducerController)

  • 发送与接收方的任一端,均由本体(Producer或Consumer),Controller和ShardingController三个部件构成。其中,ShardingProducerController与ShardingConsumerController搭配,负责为ProducerController与ConsumerController牵线搭桥,但2个ShardingController之间不需要相互注册,而是通过EntityId找到对方。
  • 建立联系通道后,消息从ShardingProducerController发出,经ProducerController发往ShardingConsumerController,由ShardingConsumerController找到相应的ConsumerController,将消发给最终的Consumer。Consumer在处理完消息后,直接回复给ConsumerController,再经其发还给ProducerController,最终由ShardingProducerController回复Producer。
  • 消息RequestNext.entitiesWithDemand属性将指向Consumer端若干同EntityId的Actor,所以这可以是一对多的关系。

耐久的Producer

com.typesafe.akka:akka-persistence-typed_2.13:2.6.5

需要Producer支持消息重发,就意味着Producer得把发出去的消息保存一段时间,直到确信该消息已被处理后才删除之,所以能暂存消息的即为耐用的Producer。Akka为此提供了一个DurableProducerQueue的具体实现EventSourcedProducerQueue。其中,每个Producer必须对应一个唯一的PersistenceId。

import akka.persistence.typed.delivery.EventSourcedProducerQueue
import akka.persistence.typed.PersistenceId

val durableQueue =
  EventSourcedProducerQueue[ImageConverter.ConversionJob](PersistenceId.ofUniqueId("ImageWorkManager"))
val durableProducerController = context.spawn(
  WorkPullingProducerController(
    producerId = "workManager",
    workerServiceKey = ImageConverter.serviceKey,
    durableQueueBehavior = Some(durableQueue)),
  "producerController")

改用Ask模式

除了tell模式,Producer还可以改用ask模式发出消息,此时用askNext代替requestNext,回复将被包装在MessageWithConfirmation里。

context.ask[MessageWithConfirmation[ImageConverter.ConversionJob], Done](
        next.askNextTo,
        askReplyTo => MessageWithConfirmation(ImageConverter.ConversionJob(resultId, from, to, image), askReplyTo)) {
          case Success(done) => AskReply(resultId, originalReplyTo, timeout = false)
          case Failure(_)    => AskReply(resultId, originalReplyTo, timeout = true)
      }

序列化 Serialization

对同处一个JVM上的不同Actor,消息将直接发送给对方,而对于跨JVM的消息,则需要序列化成一串二进制字节后传出,再反序列化恢复成消息对象后接收。Akka推荐使用Jackson和Google Protocol Buffers,且使用后者用于其内部消息的序列化,但也允许使用自定义的序列化器。

使用序列化器

以配置方式使用序列化器

序列化的相关配置都保存在akka.actor.serializers一节,其中指向各种akka.serialization.Serializer的实现,并使用serialization-bindings为特定对象实例时绑定序列化器。由于对象可能同时继承了某个trait或者class,所以在判断应使用哪一个序列化器时,通常是找其最特化的那一个。若二者之间没有继承关系,则会触发警告。

akka {
  actor {
    serializers {
      jackson-json = "akka.serialization.jackson.JacksonJsonSerializer"
      jackson-cbor = "akka.serialization.jackson.JacksonCborSerializer"
      proto = "akka.remote.serialization.ProtobufSerializer"
      myown = "docs.serialization.MyOwnSerializer"
    }

    serialization-bindings {
      "docs.serialization.JsonSerializable" = jackson-json
      "docs.serialization.CborSerializable" = jackson-cbor
      "com.google.protobuf.Message" = proto
      "docs.serialization.MyOwnSerializable" = myown
    }
  }
}

️ 如果待序列化的消息包含在Scala对象中,则为了引用这些消息,需要使用标准Java类名称。对于包含在名为Wrapper对象中名为Message的消息,正确的引用是Wrapper $ Message,而不是Wrapper.Message

以编程方式使用序列化器

完整的序列化信息包括三个部分:二进制字节串形式的有效载荷payload,序列化器的SerializerId及其适用类的清单manifest,所以它是自描述的,得以跨JVM使用。

而在启动ActorSystem时,序列化器由SerializationExtension负责初始化,因此序列化器本身不能从其构造函数访问SerializationExtension,而只能在完成初始化之后迟一点才能访问它。

import akka.actor._
import akka.actor.typed.scaladsl.Behaviors
import akka.cluster.Cluster
import akka.serialization._

val system = ActorSystem("example")

// Get the Serialization Extension
val serialization = SerializationExtension(system)

// Have something to serialize
val original = "woohoo"

// Turn it into bytes, and retrieve the serializerId and manifest, which are needed for deserialization
val bytes = serialization.serialize(original).get
val serializerId = serialization.findSerializerFor(original).identifier
val manifest = Serializers.manifestFor(serialization.findSerializerFor(original), original)

// Turn it back into an object
val back = serialization.deserialize(bytes, serializerId, manifest).get

自定义序列化器

创建序列化器

所有的序列化器均派生自akka.serialization.Serializer。

class MyOwnSerializer extends Serializer {
  // If you need logging here, introduce a constructor that takes an ExtendedActorSystem.
  // class MyOwnSerializer(actorSystem: ExtendedActorSystem) extends Serializer
  // Get a logger using:
  // private val logger = Logging(actorSystem, this)

  // This is whether "fromBinary" requires a "clazz" or not
  def includeManifest: Boolean = true

  // Pick a unique identifier for your Serializer,
  // you've got a couple of billions to choose from,
  // 0 - 40 is reserved by Akka itself
  def identifier = 1234567

  // "toBinary" serializes the given object to an Array of Bytes
  def toBinary(obj: AnyRef): Array[Byte] = {
    // Put the code that serializes the object here
    //#...
    Array[Byte]()
    //#...
  }

  // "fromBinary" deserializes the given array,
  // using the type hint (if any, see "includeManifest" above)
  def fromBinary(bytes: Array[Byte], clazz: Option[Class[_]]): AnyRef = {
    // Put your code that deserializes here
    //#...
    null
    //#...
  }
}

SerializerId必须是全局唯一的,该Id可以编码指定,也可以在配置中指定:

akka {
  actor {
    serialization-identifiers {
      "docs.serialization.MyOwnSerializer" = 1234567
    }
  }
}
使用StringManifest指定适用类

默认情况下,序列化器使用Class指定其适用目标,但也可以使用字符串名称指定,具体参见fromBinary的第2个参数:

class MyOwnSerializer2 extends SerializerWithStringManifest {
  val CustomerManifest = "customer"
  val UserManifest = "user"
  val UTF_8 = StandardCharsets.UTF_8.name()

  // Pick a unique identifier for your Serializer,
  // you've got a couple of billions to choose from,
  // 0 - 40 is reserved by Akka itself
  def identifier = 1234567

  // The manifest (type hint) that will be provided in the fromBinary method
  // Use `""` if manifest is not needed.
  def manifest(obj: AnyRef): String =
    obj match {
      case _: Customer => CustomerManifest
      case _: User     => UserManifest
    }

  // "toBinary" serializes the given object to an Array of Bytes
  def toBinary(obj: AnyRef): Array[Byte] = {
    // Put the real code that serializes the object here
    obj match {
      case Customer(name) => name.getBytes(UTF_8)
      case User(name)     => name.getBytes(UTF_8)
    }
  }

  // "fromBinary" deserializes the given array,
  // using the type hint
  def fromBinary(bytes: Array[Byte], manifest: String): AnyRef = {
    // Put the real code that deserializes here
    manifest match {
      case CustomerManifest =>
        Customer(new String(bytes, UTF_8))
      case UserManifest =>
        User(new String(bytes, UTF_8))
    }
  }
}
序列化ActorRef

ActorRef均可以使用Jackson进行序列化,但也可以自定义实现。

其中,要以字符串形式表示ActorRef,应借助ActorRefResolver实现。它主要有2个方法,分别对应序列化和反序列化:

  • def toSerializationFormat[T](ref: ActorRef[T]): String

  • def resolveActorRef[T](serializedActorRef: String): ActorRef[T]

    class PingSerializer(system: ExtendedActorSystem) extends SerializerWithStringManifest {
    private val actorRefResolver = ActorRefResolver(system.toTyped)

    private val PingManifest = "a"
    private val PongManifest = "b"

    override def identifier = 41

    override def manifest(msg: AnyRef) = msg match {
    case : PingService.Ping => PingManifest case PingService.Pong => PongManifest case =>
    throw new IllegalArgumentException(s"Can't serialize object of type ${msg.getClass} in [${getClass.getName}]")
    }

    override def toBinary(msg: AnyRef) = msg match {
    case PingService.Ping(who) =>
    actorRefResolver.toSerializationFormat(who).getBytes(StandardCharsets.UTF_8)
    case PingService.Pong =>
    Array.emptyByteArray
    case _ =>
    throw new IllegalArgumentException(s"Can't serialize object of type ${msg.getClass} in [${getClass.getName}]")
    }

    override def fromBinary(bytes: Array[Byte], manifest: String) = {
    manifest match {
    case PingManifest =>
    val str = new String(bytes, StandardCharsets.UTF_8)
    val ref = actorRefResolver.resolveActorRefPingService.Pong.type
    PingService.Ping(ref)
    case PongManifest =>
    PingService.Pong
    case _ =>
    throw new IllegalArgumentException(s"Unknown manifest [$manifest]")
    }
    }
    }

滚动升级 Rolling Updates

一个消息被反序列为消息对象,其决定因素只有3个:payload、serializerId和manifest。Akka根据Id选择Serializer,然后Serializer根据manifest匹配fromBinary,最后fromBinary使用payload解析出消息对象。在这个过程中,起关键作用的manifest并不等价于Serializer绑定的消息类型,所以一个Serializer可以应用于多个消息类型,这就给换用新的序列化器提供了机会。主要步骤包括两步:

  • 第一步:暂时只向akka.actor.serializers配置节中添加Serializer的定义,而不添加到akka.actor.serialization-bindings配置节中,然后执行一次滚动升级。这相当于注册Serializer,为切换到新的Serializer作准备。
  • 第二步:向akka.actor.serialization-bindings配置节中添加新的Serializer,然后再执行一次滚动升级。此时,旧的节点将继续使用旧的Serializer序列化消息,而新节点将切换使用新的Serializer进行序列化,并且它也可以反序列化旧的序列化格式。
  • 第三步(可选):完全删除旧的Serializer,因为新的Serializer已经能同时承担新旧两种版本的序列化格式。

校验

为了在本地测试时确认消息被正常地序列化与反序列化,可以采取如下配置启用本地消息的序列化。如果要将某个消息排除出此列,则需要继承trait akka.actor.NoSerializationVerificationNeeded,或者在配置akka.actor.no-serialization-verification-needed-class-prefix指定类名的前缀。

akka {
  actor {
    # 启用本地消息序列化
    serialize-messages = on

    # 启用Prop序列化
    serialize-creators = on
  }
}

使用Jackson进行序列化

com.typesafe.akka:akka-serialization-jackson_2.12:2.6.6

Jackson支持文本形式的JSON(jackson-json)和二进制形式的CBOR字节串(jackson-cbor)。

使用前准备

在使用Jackson进行序列化前,需要在Akka配置里加入序列化器声明和绑定声明,此处用的JSON格式。

akka.actor {
  serialization-bindings {
    "com.myservice.MySerializable" = jackson-json
  }
}

而所有要用Jackson序列化的消息也得扩展其trait以作标识。

// 约定的名称是CborSerializable或者JsonSerializable,此处用MySerializable是为了演示
trait MySerializable

final case class Message(name: String, nr: Int) extends MySerializable

安全要求

出于安全考虑,不能将Jackson序列化器应用到诸如java.lang.Object、java.io.Serializable、java.util.Comparable等开放类型。

注解 Annotations

适用于普通的多态类型 Polymorphic types

多态类型是指可能有多种不同实现的类型,这就导致在反序列化时将面对多种可能的子类型。所以在使用Jackson序列化前,需要用JsonTypeInfo和JsonSubTypes进行注解说明。

  • @JsonTypeInfo用来开启多态类型处理,它有以下几个属性:

    • use:定义使用哪一种类型识别码,其可选值包括:

      • JsonTypeInfo.Id.CLASS:使用完全限定类名做识别
      • JsonTypeInfo.Id.MINIMAL_CLASS:若基类和子类在同一包类,使用类名(忽略包名)作为识别码
      • JsonTypeInfo.Id.NAME:一个合乎逻辑的指定名称
      • JsonTypeInfo.Id.CUSTOM:自定义识别码,与@JsonTypeIdResolver相对应
      • JsonTypeInfo.Id.NONE:不使用识别码
    • include(可选):指定识别码是如何被包含进去的,其可选值包括:

      • JsonTypeInfo.As.PROPERTY:作为数据的兄弟属性
      • JsonTypeInfo.As.EXISTING_PROPERTY:作为POJO中已经存在的属性
      • JsonTypeInfo.As.EXTERNAL_PROPERTY:作为扩展属性
      • JsonTypeInfo.As.WRAPPER_OBJECT:作为一个包装的对象
      • JsonTypeInfo.As.WRAPPER_ARRAY:作为一个包装的数组
    • property(可选):制定识别码的属性名称。此属性只有当use为JsonTypeInfo.Id.CLASS(若不指定property则默认为@class)、JsonTypeInfo.Id.MINIMAL_CLASS(若不指定property则默认为@c)、JsonTypeInfo.Id.NAME(若不指定property默认为@type),include为JsonTypeInfo.As.PROPERTY、JsonTypeInfo.As.EXISTING_PROPERTY、JsonTypeInfo.As.EXTERNAL_PROPERTY时才有效。

    • defaultImpl(可选):如果类型识别码不存在或者无效,可以使用该属性来制定反序列化时使用的默认类型。

    • visible(可选):是否可见。该属性定义了类型标识符的值是否会通过JSON流成为反序列化器的一部分,默认为false,即jackson会从JSON内容中处理和删除类型标识符,再传递给JsonDeserializer。

  • @JsonSubTypes用来列出给定类的子类,只有当子类类型无法被检测到时才会使用它,一般是配合@JsonTypeInfo在基类上使用。它的的值是一个@JsonSubTypes.Type[]数组,里面枚举了多态类型(value对应子类)和类型的标识符值(name对应@JsonTypeInfo中的property标识名称的值。此为可选值,若未指定则需由@JsonTypeName在子类上指定)。

  • @JsonTypeName作用于子类,用来为多态子类指定类型标识符的值。

️ 切记不能使用@JsonTypeInfo(use = Id.CLASS)ObjectMapper.enableDefaultTyping,这会给多态类型带来安全隐患。

final case class Zoo(primaryAttraction: Animal) extends MySerializable

@JsonTypeInfo(use = JsonTypeInfo.Id.NAME, property = "type")
@JsonSubTypes(
  Array(
    new JsonSubTypes.Type(value = classOf[Lion], name = "lion"),
    new JsonSubTypes.Type(value = classOf[Elephant], name = "elephant")))
sealed trait Animal

final case class Lion(name: String) extends Animal

final case class Elephant(name: String, age: Int) extends Animal
适用于trait和case object创建的ADT

由于上述注解只能用于class,所以case class可以直接使用,但case object就需要采取变通的方法,通过在case object继承的trait上使用注解@JsonSerialize和@JsonDeserialize,再使用StdSerializer和StdDeserializer实现序列化操作即可。

import com.fasterxml.jackson.core.JsonGenerator
import com.fasterxml.jackson.core.JsonParser
import com.fasterxml.jackson.databind.DeserializationContext
import com.fasterxml.jackson.databind.SerializerProvider
import com.fasterxml.jackson.databind.annotation.JsonDeserialize
import com.fasterxml.jackson.databind.annotation.JsonSerialize
import com.fasterxml.jackson.databind.deser.std.StdDeserializer
import com.fasterxml.jackson.databind.ser.std.StdSerializer

@JsonSerialize(using = classOf[DirectionJsonSerializer])
@JsonDeserialize(using = classOf[DirectionJsonDeserializer])
sealed trait Direction

object Direction {
  case object North extends Direction
  case object East extends Direction
  case object South extends Direction
  case object West extends Direction
}

class DirectionJsonSerializer extends StdSerializer[Direction](classOf[Direction]) {
  import Direction._

  override def serialize(value: Direction, gen: JsonGenerator, provider: SerializerProvider): Unit = {
    val strValue = value match {
      case North => "N"
      case East  => "E"
      case South => "S"
      case West  => "W"
    }
    gen.writeString(strValue)
  }
}

class DirectionJsonDeserializer extends StdDeserializer[Direction](classOf[Direction]) {
  import Direction._

  override def deserialize(p: JsonParser, ctxt: DeserializationContext): Direction = {
    p.getText match {
      case "N" => North
      case "E" => East
      case "S" => South
      case "W" => West
    }
  }
}

final case class Compass(currentDirection: Direction) extends MySerializable
适用于枚举 Enumerations

Jackson默认会将Scala的枚举类型中的Value序列化为一个JsonObject,该JsonObject包含一个“value”字段和一个“type”字段(其值是枚举的完全限定类名FQCN)。为此,Jackson为每个字段提供了一个注解JsonScalaEnumeration,用于设定字段的类型,它将会把枚举值序列化为JsonString。

trait TestMessage

object Planet extends Enumeration {
  type Planet = Value
  val Mercury, Venus, Earth, Mars, Krypton = Value
}

// Uses default Jackson serialization format for Scala Enumerations
final case class Alien(name: String, planet: Planet.Planet) extends TestMessage

// Serializes planet values as a JsonString
class PlanetType extends TypeReference[Planet.type] {}

// Specifies the type of planet with @JsonScalaEnumeration
final case class Superhero(name: String, @JsonScalaEnumeration(classOf[PlanetType]) planet: Planet.Planet) extends TestMessage

纲要演进 Schema Evolution

参见Event Sourced一节中的Schema Evolution

删除字段

Jackson会自动忽略class中不存在的属性,所以不需要做额外工作。

添加字段

如果新增的字段是可选字段,那么该字段默认值是Option.None,不需要做额外工作。如果是必备字段,那么需要继承JacksonMigration并设定其默认值。示例如下:

// Old Event
case class ItemAdded(shoppingCartId: String, productId: String, quantity: Int) extends MySerializable

// New Event: optional property discount and field note added.
// 为什么要区分property与field?
case class ItemAdded(shoppingCartId: String, productId: String, quantity: Int, discount: Option[Double], note: String)
    extends MySerializable {
  // alternative constructor because `note` should have default value "" when not defined in json
  @JsonCreator
  def this(shoppingCartId: String, productId: String, quantity: Int, discount: Option[Double], note: Option[String]) =
    this(shoppingCartId, productId, quantity, discount, note.getOrElse(""))
}

// New Event: mandatory field discount added.
case class ItemAdded(shoppingCartId: String, productId: String, quantity: Int, discount: Double) extends MySerializable

import com.fasterxml.jackson.databind.JsonNode
import com.fasterxml.jackson.databind.node.DoubleNode
import com.fasterxml.jackson.databind.node.ObjectNode
import akka.serialization.jackson.JacksonMigration

class ItemAddedMigration extends JacksonMigration {
  // 注明这是第几个版本,之后还可以有更新的版本
  override def currentVersion: Int = 2

  override def transform(fromVersion: Int, json: JsonNode): JsonNode = {
    val root = json.asInstanceOf[ObjectNode]
    if (fromVersion <= 1) {
      root.set("discount", DoubleNode.valueOf(0.0))
    }
    root
  }
}

ItemAddedMigration与ItemAdded的联系,需要在配置里设定,下同:

akka.serialization.jackson.migrations {
  "com.myservice.event.ItemAdded" = "com.myservice.event.ItemAddedMigration"
}
重命名字段
// 将productId重命名为itemId
case class ItemAdded(shoppingCartId: String, itemId: String, quantity: Int) extends MySerializable

import akka.serialization.jackson.JacksonMigration
import com.fasterxml.jackson.databind.JsonNode
import com.fasterxml.jackson.databind.node.ObjectNode

class ItemAddedMigration extends JacksonMigration {
  override def currentVersion: Int = 2

  override def transform(fromVersion: Int, json: JsonNode): JsonNode = {
    val root = json.asInstanceOf[ObjectNode]
    if (fromVersion <= 1) {
      root.set("itemId", root.get("productId"))
      root.remove("productId")
    }
    root
  }
}
重定义类结构
// Old class
case class Customer(name: String, street: String, city: String, zipCode: String, country: String) extends MySerializable

// New class
case class Customer(name: String, shippingAddress: Address, billingAddress: Option[Address]) extends MySerializable

//Address class
case class Address(street: String, city: String, zipCode: String, country: String) extends MySerializable

import akka.serialization.jackson.JacksonMigration
import com.fasterxml.jackson.databind.JsonNode
import com.fasterxml.jackson.databind.node.ObjectNode

class CustomerMigration extends JacksonMigration {
  override def currentVersion: Int = 2

  override def transform(fromVersion: Int, json: JsonNode): JsonNode = {
    val root = json.asInstanceOf[ObjectNode]
    if (fromVersion <= 1) {
      val shippingAddress = root.`with`("shippingAddress")
      shippingAddress.set("street", root.get("street"))
      shippingAddress.set("city", root.get("city"))
      shippingAddress.set("zipCode", root.get("zipCode"))
      shippingAddress.set("country", root.get("country"))
      root.remove("street")
      root.remove("city")
      root.remove("zipCode")
      root.remove("country")
    }
    root
  }
}
重命名类
// Old class
case class OrderAdded(shoppingCartId: String) extends MySerializable

// New class
case class OrderPlaced(shoppingCartId: String) extends MySerializable

class OrderPlacedMigration extends JacksonMigration {
  override def currentVersion: Int = 2

  override def transformClassName(fromVersion: Int, className: String): String = classOf[OrderPlaced].getName

  override def transform(fromVersion: Int, json: JsonNode): JsonNode = json
}
删除特定的序列化绑定

当某个类不再需要序列化,而只需要反序列化时,应将其加入序列化的白名单,名单是一组类名或其前缀:

akka.serialization.jackson.whitelist-class-prefix =
  ["com.myservice.event.OrderAdded", "com.myservice.command"]

Jackson模块

Akka默认启用了以下Jackson模块:

akka.serialization.jackson {
  # The Jackson JSON serializer will register these modules.
  jackson-modules += "akka.serialization.jackson.AkkaJacksonModule"
  # AkkaTypedJacksonModule optionally included if akka-actor-typed is in classpath
  jackson-modules += "akka.serialization.jackson.AkkaTypedJacksonModule"
  // FIXME how does that optional loading work??
  # AkkaStreamsModule optionally included if akka-streams is in classpath
  jackson-modules += "akka.serialization.jackson.AkkaStreamJacksonModule"
  jackson-modules += "com.fasterxml.jackson.module.paramnames.ParameterNamesModule"
  jackson-modules += "com.fasterxml.jackson.datatype.jdk8.Jdk8Module"
  jackson-modules += "com.fasterxml.jackson.datatype.jsr310.JavaTimeModule"
  jackson-modules += "com.fasterxml.jackson.module.scala.DefaultScalaModule"
}
JSON压缩

默认的JSON压缩策略如下:

# Compression settings for the jackson-json binding
akka.serialization.jackson.jackson-json.compression {
  # Compression algorithm.
  # - off  : no compression (it will decompress payloads even it's off)
  # - gzip : using common java gzip (it's slower than lz4 generally)
  # - lz4 : using lz4-java
  algorithm = gzip

  # If compression is enabled with the `algorithm` setting the payload is compressed
  # when it's larger than this value.
  compress-larger-than = 32 KiB
}
每个绑定关系单独配置
# 共有配置
akka.serialization.jackson.jackson-json {
  serialization-features {
    WRITE_DATES_AS_TIMESTAMPS = off
  }
}
akka.serialization.jackson.jackson-cbor {
  serialization-features {
    WRITE_DATES_AS_TIMESTAMPS = on
  }
}

akka.actor {
  serializers {
    jackson-json-message = "akka.serialization.jackson.JacksonJsonSerializer"
    jackson-json-event   = "akka.serialization.jackson.JacksonJsonSerializer"
  }
  serialization-identifiers {
    jackson-json-message = 9001
    jackson-json-event = 9002
  }
  serialization-bindings {
    "com.myservice.MyMessage" = jackson-json-message
    "com.myservice.MyEvent" = jackson-json-event
  }
}

# 为每个绑定关系单独配置
akka.serialization.jackson {
  jackson-json-message {
    serialization-features {
      WRITE_DATES_AS_TIMESTAMPS = on
    }
  }
  jackson-json-event {
    serialization-features {
      WRITE_DATES_AS_TIMESTAMPS = off
    }
  }
}
使用Manifest无关的序列化

默认情况下,Jackson使用manifest里的完全限定类名进行序列化,但这比较耗费磁盘空间和IO资源,为此可以用type-in-manifest关闭之,使类名不再出现在manifest里,然后再使用deserialization-type指定即可,否则Jackson会在绑定关系里去查找匹配的类型。

Akka Remoting已经实现了manifest的压缩,所以这部分内容对它没有什么实际效果。

akka.actor {
  serializers {
    jackson-json-event = "akka.serialization.jackson.JacksonJsonSerializer"
  }
  serialization-identifiers {
    jackson-json-event = 9001
  }
  serialization-bindings {
    "com.myservice.MyEvent" = jackson-json-event
  }
}
# 由于manifest无关的序列化通常只适用于一个类型,所以通常采取每绑定关系单独配置的方式
akka.serialization.jackson {
  jackson-json-event {
    type-in-manifest = off
    # Since there is exactly one serialization binding declared for this
    # serializer above, this is optional, but if there were none or many,
    # this would be mandatory.
    deserialization-type = "com.myservice.MyEvent"
  }
}
日期与时间格式

WRITE_DATES_AS_TIMESTAMPSWRITE_DURATIONS_AS_TIMESTAMPS默认情况下是被禁用的,这意味着日期与时间字段将按ISO-8601(rfc3339)标准的yyyy-MM-dd'T'HH:mm:ss.SSSZZ格式,而不是数字数组进行序列化。虽然这样的互操作性更好,但速度较慢。所以如果不需要ISO格式即可与外部系统进行互操作,那么可以作如下配置,以拥有更佳的性能(反序列化不受此设置影响)。

akka.serialization.jackson.serialization-features {
  WRITE_DATES_AS_TIMESTAMPS = on
  WRITE_DURATIONS_AS_TIMESTAMPS = on
}

其他可用配置

akka.serialization.jackson {
  # Configuration of the ObjectMapper serialization features.
  # See com.fasterxml.jackson.databind.SerializationFeature
  # Enum values corresponding to the SerializationFeature and their boolean value.
  serialization-features {
    # Date/time in ISO-8601 (rfc3339) yyyy-MM-dd'T'HH:mm:ss.SSSZ format
    # as defined by com.fasterxml.jackson.databind.util.StdDateFormat
    # For interoperability it's better to use the ISO format, i.e. WRITE_DATES_AS_TIMESTAMPS=off,
    # but WRITE_DATES_AS_TIMESTAMPS=on has better performance.
    WRITE_DATES_AS_TIMESTAMPS = off
    WRITE_DURATIONS_AS_TIMESTAMPS = off
  }

  # Configuration of the ObjectMapper deserialization features.
  # See com.fasterxml.jackson.databind.DeserializationFeature
  # Enum values corresponding to the DeserializationFeature and their boolean value.
  deserialization-features {
    FAIL_ON_UNKNOWN_PROPERTIES = off
  }

  # Configuration of the ObjectMapper mapper features.
  # See com.fasterxml.jackson.databind.MapperFeature
  # Enum values corresponding to the MapperFeature and their
  # boolean values, for example:
  #
  # mapper-features {
  #   SORT_PROPERTIES_ALPHABETICALLY = on
  # }
  mapper-features {}

  # Configuration of the ObjectMapper JsonParser features.
  # See com.fasterxml.jackson.core.JsonParser.Feature
  # Enum values corresponding to the JsonParser.Feature and their
  # boolean value, for example:
  #
  # json-parser-features {
  #   ALLOW_SINGLE_QUOTES = on
  # }
  json-parser-features {}

  # Configuration of the ObjectMapper JsonParser features.
  # See com.fasterxml.jackson.core.JsonGenerator.Feature
  # Enum values corresponding to the JsonGenerator.Feature and
  # their boolean value, for example:
  #
  # json-generator-features {
  #   WRITE_NUMBERS_AS_STRINGS = on
  # }
  json-generator-features {}

  # Configuration of the JsonFactory StreamReadFeature.
  # See com.fasterxml.jackson.core.StreamReadFeature
  # Enum values corresponding to the StreamReadFeatures and
  # their boolean value, for example:
  #
  # stream-read-features {
  #   STRICT_DUPLICATE_DETECTION = on
  # }
  stream-read-features {}

  # Configuration of the JsonFactory StreamWriteFeature.
  # See com.fasterxml.jackson.core.StreamWriteFeature
  # Enum values corresponding to the StreamWriteFeatures and
  # their boolean value, for example:
  #
  # stream-write-features {
  #   WRITE_BIGDECIMAL_AS_PLAIN = on
  # }
  stream-write-features {}

  # Configuration of the JsonFactory JsonReadFeature.
  # See com.fasterxml.jackson.core.json.JsonReadFeature
  # Enum values corresponding to the JsonReadFeatures and
  # their boolean value, for example:
  #
  # json-read-features {
  #   ALLOW_SINGLE_QUOTES = on
  # }
  json-read-features {}

  # Configuration of the JsonFactory JsonWriteFeature.
  # See com.fasterxml.jackson.core.json.JsonWriteFeature
  # Enum values corresponding to the JsonWriteFeatures and
  # their boolean value, for example:
  #
  # json-write-features {
  #   WRITE_NUMBERS_AS_STRINGS = on
  # }
  json-write-features {}

  # Additional classes that are allowed even if they are not defined in `serialization-bindings`.
  # This is useful when a class is not used for serialization any more and therefore removed
  # from `serialization-bindings`, but should still be possible to deserialize.
  whitelist-class-prefix = []

  # settings for compression of the payload
  compression {
    # Compression algorithm.
    # - off  : no compression
    # - gzip : using common java gzip
    algorithm = off

    # If compression is enabled with the `algorithm` setting the payload is compressed
    # when it's larger than this value.
    compress-larger-than = 0 KiB
  }

  # Whether the type should be written to the manifest.
  # If this is off, then either deserialization-type must be defined, or there must be exactly
  # one serialization binding declared for this serializer, and the type in that binding will be
  # used as the deserialization type. This feature will only work if that type either is a
  # concrete class, or if it is a supertype that uses Jackson polymorphism (ie, the
  # @JsonTypeInfo annotation) to store type information in the JSON itself. The intention behind
  # disabling this is to remove extraneous type information (ie, fully qualified class names) when
  # serialized objects are persisted in Akka persistence or replicated using Akka distributed
  # data. Note that Akka remoting already has manifest compression optimizations that address this,
  # so for types that just get sent over remoting, this offers no optimization.
  type-in-manifest = on

  # The type to use for deserialization.
  # This is only used if type-in-manifest is disabled. If set, this type will be used to
  # deserialize all messages. This is useful if the binding configuration you want to use when
  # disabling type in manifest cannot be expressed as a single type. Examples of when you might
  # use this include when changing serializers, so you don't want this serializer used for
  # serialization and you haven't declared any bindings for it, but you still want to be able to
  # deserialize messages that were serialized with this serializer, as well as situations where
  # you only want some sub types of a given Jackson polymorphic type to be serialized using this
  # serializer.
  deserialization-type = ""

  # Specific settings for jackson-json binding can be defined in this section to
  # override the settings in 'akka.serialization.jackson'
  jackson-json {}

  # Specific settings for jackson-cbor binding can be defined in this section to
  # override the settings in 'akka.serialization.jackson'
  jackson-cbor {}

  # Issue #28918 for compatibility with data serialized with JacksonCborSerializer in
  # Akka 2.6.4 or earlier, which was plain JSON format.
  jackson-cbor-264 = ${akka.serialization.jackson.jackson-cbor}
}

Event Sourcing

com.typesafe.akka:akka-persistence-typed_2.13:2.6.5

Akka Persistence为带状态的Actor提供了持久化其状态以备崩溃后恢复的支持,其本质是持久化Actor相关的事件Event,从而在恢复时利用全部事件或阶段性快照重塑(Reconstruct/Replay/Rebuild)Actor。ES在现实生活中最典型的一个例子是会计使用的复式记账法

参考书目

  • MSDN上的 CQRS Journey

    该书以一个用C#编写的Conference预约售票系统为例,由浅入深地展示了实现CQRS的各个环节需要关注的重点。书中的配图和讨论非常精彩,而其中提到的Process Manager也是当下实现Saga的流行方式之一。

  • Randy Shoup所著 Events as First-Class Citizens

    文中的Stitch Fix是一家智能零售商,它通过整合零售、技术、仓储、数据分析等资源,使用数据分析软件和机器学习来匹配顾客的服装定制需求,为其挑选符合其个人风格、尺寸和偏好的服饰和配饰,提供了良好的消费体验。

    顾客按需订购服装或申请每月、每两个月或每季度交货。每个盒子有五件货物。如果顾客喜欢配送货物,可以选择以标签价购买,全部购买享受75%的折扣;如果不喜欢,则免费退货。如果顾客没有购买任何货物,则需支付20美元的设计费。Stitch Fix的平均商品单价约65美元,公司期望在每个盒子中,用户能够保存2件商品。造型师是兼职,薪水为每小时15美元。每小时,造型师会完成4个盒子,这样能产生较高的毛利率,以覆盖巨大的开销及库存成本。

️ 通用数据保护条例(General Data Protection Regulation,GDPR)要求,必须能根据用户的要求删除其个人信息。然而,在一个以Event Sourcing为基础的应用里,要彻底删除或修改带有个人信息的所有事件是非常困难的,所以改用“数据粉碎”的技术来实现。其原理是给每个人分配一个唯一的ID,然后以该ID作为密钥,对其相关的所有个人数据进行加密。当需要彻底删除该用户的信息时,直接删除该ID,即可保证其个人数据无法被解密,从而达到保护目的。Lightbend为Akka Persistence提供了相应的工具,以帮助构建具有GDPR功能的系统。

Akka Persistence提供了event sourced actor(又称为 persistent actor)作为实现。这类Actor在收到Command时会先进行检验Validate。如果Command各项条件通过了检验,则使之作用于当前实体,并产生相应的事件Event,待这些Event被持久化后,以更新实体的状态结束;否则,实体将直接拒绝Reject该Command。( 不该是先更新状态,然后才持久化事件吗?貌似先持久化再更新会更靠谱。)

而在重塑Actor时,所有的事件将被加载,并无需再校验地直接用于更新Actor的状态,直到恢复到最新状态。

一个典型的EventSourcedBehavior包括ID、初始State,CommandHandler与EventHandler四个组成部分,如果需要传入ActorContext,则在外层用Behaviors.setup传入即可:

import akka.persistence.typed.scaladsl.EventSourcedBehavior
import akka.persistence.typed.PersistenceId

object MyPersistentBehavior {
  sealed trait Command
  sealed trait Event
  final case class State()

  def apply(): Behavior[Command] =
    EventSourcedBehavior[Command, Event, State](
      // 1. 该Actor的唯一Id
      persistenceId = PersistenceId.ofUniqueId("abc"),
      // 2. 初始状态
      emptyState = State(),
      // 3. Command Handler
      commandHandler = (state, cmd) => throw new NotImplementedError("TODO: process the command & return an Effect"),
      // 4. Event Handler
      eventHandler = (state, evt) => throw new NotImplementedError("TODO: process the event return the next state"))
}

PersistenceId

PersistenceId是Event Sourced Actor在其生命周期内唯一的身份标识(想想聚合Id)。因为Akka Cluster提供的EntityId可能为多个不同类型的Actor共享,所以一般配合EntityTypeKey一起组成唯一的PersistenceId。所以,PersistenceId.apply()用默认的分隔符|将entityType.name与entityId两个字符串连接成所需的Id。当然,也可以使用PersistenceId.ofUniqueId生成自定义分隔符的Id。

即使在集群条件下,持同一PersistanceId的Actor在任何时候只能存在一个,否则就世界大乱了。当然,因为有Recovery,这个Actor可以被分片甚至迁移到任何一个片及其节点上。

摘选自 https://doc.akka.io/docs/akka/current/typed/cluster-sharding.html#persistence-example

sharding.init(Entity(typeKey = HelloWorld.TypeKey) { entityContext =>
    HelloWorld(entityContext.entityId, PersistenceId(entityContext.entityTypeKey.name, entityContext.entityId))
  })

Command Handler

一个CommandHandler有2个参数:当前的State、收到的Command,然后返回Effect。Effect由其工厂创建,创建动作包括:

  • persist:原子性地保存处理完Command后产生的若干Event,若保存其中一个Event时失败则所有Event都将失败。但是在底层的事件存储不支持一次写入多个事件的情况下,CommandHandler为拒绝一次性持久化多个事件,可以抛出EventRejectedException(通常带有UnsupportedOperationException),从而由父Actor进行监管处理。
  • none:什么也不做,比如一个只包括读操作的Query Command。
  • unhandled:表明该命令不适用于当前状态。
  • stop:停止该Actor。
  • stash:暂存当前命令。
  • unstashAll:处理所有被Effect.stash暂存起来的命令。
  • reply:向发来命令的Actor发送一条回复。

在返回Effect的同时,还可以在该Effect后接副作用SideEffect,比如Effect.persist(…).thenRun(…)。具体包括:

  • thenRun:运行某个副作用函数。
  • thenStop:停止该Actor。
  • thenUnstashAll:处理所有被Effect.stash暂存起来的命令。
  • thenReply:向发来命令的Actor发送一条回复。

任何SideEffect都最多只能执行一次。如果持久化失败,或者Actor直接重启、停止后再启动,都不会执行任何副作用。所以通常是响应RecoveryCompleted信号,在其中去执行需要被确认的副作用,这种情况下,则可能会出现同一个副作用多次执行的情况。

副作用都是按注册的顺序同步执行,但也不能避免因为发送消息等而导致操作的并发执行。副作用也可能在事件被持久化之前就被执行,这样的话,即使持久化失败导致事件未被保存,副作用也生效了。

关于翻译:Akka用“日记”——Journal指代的Event Store,并与“日志”Log相区别。虽然我更喜欢用“事件簿”这样的称谓,但一来请教了师姐说“日记”更准确,二来电影《Joker》里做心理咨询的社工在问Frank时也用的Journal这个词,于是就此作罢。

Event Handler

一个EventHandler有2个参数:当前State,触发的Event,然后返回新的State。

CommandHandler触发并持久化事件,EventHandler处理事件并更新状态,所以Actor的状态实际是在EventHandler里才真正被改变的!

当事件Event被持久化后,EventHandler将使用它去修改作为参数传入的当前状态State,从而产生新的State。至于State的具体实现,可以是FP风格的不可变量,也可以是OO风格的可变量,但通常都会封装在诸如Class这样的一个容器里。

不同于Command Handler的是,Event Handler不会产生副作用,所以它将直接用于Actor的重塑Recovery操作上。如果需要在Recovery之后做点什么,那么恰当的楔入点包括:CommandHandler最后创建的Effect附加的thenRun(),或者是RecoveryCompleted事件的处理函数里。

改变Actor的行为

因为不同的消息将触发Actor不同的行为,所以行为也是Actor状态的一部分。所以在Recovery时除了恢复数据,还要小心恢复其相应的行为。尽管行为是函数,而函数是一等公民,所以行为理应可以象数据一样保存,但困难的地方在于怎么保存编码,因此Akka Persistence不提供Behavior的持久化。

面对这个棘手的问题,最容易想到的办法是根据State定义不同的CommandHandler,并随State变化而切换,从而使Actor成为一台有限状态机。于是,由此得到的便是由State与Command两级匹配构成的逻辑,利用继承定义State的不同实现,然后先case State、再case Command,最后根据匹配结果将消息分发至相应的处理函数(处理函数亦相对独立,以凸显不同的逻辑分支)。而在代码实现的结构上,就是在一个CommandHandler里,定义若干个协助完成消息处理的private function。这些处理函数的参数由Handler在case分支里赋与,返回类型则统一为与CommandHandler相同的Effect[Event, State]。最后,只需要将这个CommandHandler连壳带肉交给EventSourcedBehavior工厂即可。

更规范的方式是把Handler定义在State里,具体参见后续的Handler设计指南

强制回复

Request-Response是最常见的通信模式之一。为了保证Persistent Actor一定会回复,EventSourcedBehavior推出了ReplyEffect,从而保证CommandHandler一定会发回Reply。它与Effect的唯一区别是必须用工厂Effect.replyEffect.noReplyEffect.thenReply或者Effect.thenNoReply之一创建的结果作为返回值,而不再是Effect,否则编译器会提示类型不匹配的错误。

为此,在定义Command时必须包含一个replyTo属性,同时得用EventSourcedBehavior.withEnforcedReplies(id, state, cmdHandler, evtHandler)来创建Behavior。

序列化

常见的序列化方案和工具也适用于Akka,推荐使用 Jackson

在序列化时,必须考虑不同版本事件之间的向下兼容性,参考纲要演进 Schema Evolution( 统一个中文名真难。architecture 架构,pattern 模式,structure 结构,style 风格/样式,template 模板,boilerplate 样板,schema 纲要)

重塑

相比Recovery,我更喜欢Replay或者Reconstruct,使用“重塑实体”和“事件重播”在语义上也更生动。

Akka Persistence在Actor启动或重启时,将自动地直接使用EventHandler进行Actor的重塑。要注意的是,不要在EventHandler中执行副作用,而应该在重塑完成后,在receiveSignal里响应RecoveryCompleted信号,在响应程序里执行副作用。在RecoveryCompleted信号里带有重塑后的当前状态。而即使对于一个新的、还没有任何已记录事件的Actor,在执行Recovery之后也会触发RecoveryCompleted信号。

由于在重塑完成前,所有新消息将会被Stash,所以为防止失去响应,Akka提供了最大并发的重塑数,可以按akka.persistence.max-concurrent-recoveries = 50的方式进行配置。

重塑过滤 Replay Filter

在某些情况下,事件流可能会损坏,而此时多个写入者(即多个Persistent Actor实例)准备写入具有相同序列号的不同消息,则会引发不一致的冲突。为此,Akka Persistence提供了Replay Filter,通过消息序列号和写入者的UUID来检测并解决消息之间的冲突。具体配置需要写入配置文件中的如下区段(leveldb视具体插件而不同):

理解不能:为什么会有多个Actor实例要写入有相同序列号的消息?PersistenceId不该是唯一的吗?消息序列号是什么鬼?