近两年,一直在折腾用FP与OO共存的编程语言Scala,采取以函数式编程为主的方式,结合TDD和BDD的手段,采用Domain Driven Design的方法学,去构造DDDD应用(Domain Driven Design & Distributed)。期间,尝试了大量的框架:业务领域主要适用Akka、Scalaz等框架,UI和基础设施方面主要适用Spring、Kafka等框架,测试则主要适用Spock、ScalaTest、Selenium等框架。
两年的折腾,不能说没有一点收获。在核心的领域模型设计方面,通过尝试用传统Akka的Actor包裹聚合,以自定义的Command和Event进行信息交换,用Free Monad构造表达树实现延迟计算,用Akka Persistence实现Event Store,用Kafka维护命令和事件队列,让我依稀看到了以FP的方式实现ES+CQRS架构应用的曙光。
但如此多的框架不仅让人眼花缭乱,还要学习Scala、Groovy、Java等编程语言以及Gradle等Build工具的DSL,如果再加上Cluster、Cloud和Micro Service,这无疑是一个浩大的工程。所幸,伴随Akka 2.6版本推出的Akka Typed,解决了消息难以管理、框架入侵过甚、节点部署繁复等痛点和难点。特别是在ES和CQRS方面,Akka Typed可以说是提供了一个完整的解决方案。就如同当初LINQ的出现一样,给我打开了一扇新的窗口。
以下,便是我学习Akka Typed官方文档时随看随写留下的笔记(语种Scala,版本2.6.5,在学习过程中已更新至2.6.6),以备查考。内容和路线均以我个人的关注点为主,所以只是节选且难免有失偏颇。
文中提到的RMP是指Vaughn Vernon撰写的Reactive Messaging Patterns with the Actor Model一书。它是我学习传统Actor模型时的重要参考书籍。
在Github上,Paweł Kaczor开发出的akka-ddd框架,是一个非常棒的学习案例。
- akka-ddd:一个基于Akka和EventStore实现的CQRS架构的DDD框架。
- ddd-leaven-akka-v2:使用akka-ddd框架实现的电子商务应用。
https://doc.akka.io/docs/akka/current/typed/guide/tutorial.html
这是一个在物联时代,让用户可以借助遍布家中的温度监测仪,随时掌握屋内各个角落温度情况的应用。
在IDEA的Editor/General/Auto Import
中排除对*javadsl*
的自动导入。
需要的依赖:
implementation 'org.scala-lang:scala-library:2.13.2'
implementation 'com.typesafe.akka:akka-actor-typed_2.13:2.6.5'
implementation 'ch.qos.logback:logback-classic:1.2.3'
testImplementation 'org.scalatest:scalatest_2.13:3.1.2'
testImplementation 'com.typesafe.akka:akka-actor-testkit-typed_2.13:2.6.5'
val child = context.spawn(Behaviors.supervise(child()).onFailure(SupervisorStrategy.restart), name = "child-actor")
改变默认的监督策略Stop。传统Akka采取的方式是override supervisorStrategy,用一个闭包声明Decider函数(参见RMP-52)。context.watchWith(watcheeActor, WatcheeTerminated(watcheeActor,...))
建立观察关系。( WatcheeTerminated会被context自动填充吗?貌似是的。)为防止死锁(大家都抢,结果都吃不上)、饥饿(弱肉强食,弱者老是吃不上)和活锁(大家都谦让,结果都不好意思吃),有三种确保死锁无忧的无阻塞设计,其能力由强到弱如下:
整个Actor System的体系,如同一个组织,任务总是逐级下派,命令总是下级服从上级。这与常见的分层软件设计(Layered Software Design)是不同的,后者总是想方设法把问题隐藏和解决在自己那一层,而不是交给上级去处理或与其他人协商。推荐的做法主要包括:
https://doc.akka.io/docs/akka/current/coordinated-shutdown.html
当应用的所有工作完成后,可以通知/user监督者停止运行,或者调用ActorSystem.terminate方法,从而通过运行协调关机CoordinatedShutdown来停止所有正在运行的Actor。在此过程中,你还可以执行其他一些清理和扫尾工作。
官方文档有专章讲解Actor的方方面面,本章只是介绍基本概念。
Actor的主要作用包括:向熟识的其他Actor发送消息,创建新的Actor,指定处理下一条消息的行为。它作为一个容器,包括有状态State、行为Behavior、邮箱Mailbox、监督策略Supervisor Strategy以及若干的子Actor等内容物,且该容器只能通过指定消息类型的参数化ActorRef进行引用,以确保最基本的隔离:
在Actor终止后,其持有的所有资源将被回收,剩下未处理的消息将转入Actor System的死信邮箱Dead Letter Mailbox,而后续新传来的消息也将悉数转到System的EventStream作为死信处理。
️ Akka Typed的监管已经重新设计,与传统Akka有显著区别
监管的对象是意料之外的失败(Unexpected Failure),而不是校验错误或者try-catch能处理的预期异常。所以,监管是Actor的额外装饰,并不属于Actor消息处理的组成部分。而后者,则是属于Actor业务逻辑的一部分。
当失败发生时,监管的策略包括以下三种:
要注意的是,引发失败的那条消息将不会再被处理,而且期间Actor发生的这些变化,在父Actor以外的范围都是不可知的。
Lifecycle Monitoring通常是指DeathWatch( 之前叫Dead Watch,Watch观察,Monitoring监测,译为观察更为妥帖)。这是除了父子间的监管关系外,Actor之间另一种监测关系。由于Supervision导致的Actor Restart对外是不可知的,所以要用Monitoring在一对Actor之间建立监测关系。但从目的上讲二者是有区别的,Supervision主要为应对失败情形,Monitoring主要为确保另一方知悉本方已终止运行。
使用context.watch(targetActorRef)
及unwatch来建立或撤销监测关系。当被监测Actor终止时,监测方Actor将收到一条Terminated消息(不是Signal吗?),而默认的消息处理是抛出一个DeathPactException
。
要注意的是,监测关系的建立和目标Actor终止时间无关。这就意味着在建立监测关系时,即使目标Actor已经终止,此时监测Actor仍将收到一条Terminated消息。
https://doc.akka.io/docs/akka/current/typed/fault-tolerance.html
为防止Actor相互可见和消息乱序问题,Akka严格遵守以下两条“发生之前(happens before)”守则:
Delivery翻译为“投递”更为妥帖,更好模仿邮政业务的妥投等术语。“送达”侧重结果,“发送"侧重动作本身。
借助Akka Persistence确保消息妥投。(参见RMP-164)
https://doc.akka.io/docs/akka/current/typed/reliable-delivery.html
事件溯源的本质,是执行一条Command,衍生出若干条Event,这些Event既是Command产生的副作用,也是改变对象状态的动因,及其生命周期内不可变的历史。
Akka Persistence对事件溯源提供了直接支持。
https://doc.akka.io/docs/akka/current/typed/persistence.html#event-sourcing-concepts
可以通过自定义邮箱,实现消息投递的重试。但这多数仅限于本地通讯的场景,具体原因参见 The Rules for In-JVM (Local) Message Sends
无法妥投的而不是因网络故障等原因被丢失了的消息,将被送往名为/deadLetters的Actor,因此这些消息被称为Dead Letter(参见RMP-161)。产生死信的原因主要是收件人不详或已经死亡,而死信Actor也主要用于系统调试。
由于死信不能通过网络传递,所以要搜集一个集群内的所有死信,则需要一台一台地收集每台主机本地的死信后再进行汇总。通过在系统的Event Stream对象akka.actor.DeadLetter
中注册,普通Actor将可以订阅到本地的所有死信消息。
Akka使用Typesafe Config Library管理配置信息。该库独立于Akka,也可用于其他应用的配置信息管理。
Akka的ActorSystem在启动时,所有的配置信息均会通过解析class path根目录处的application.conf/.json/.properties等文件而加载入Config对象,并通过合并所有的reference.conf形成后备配置。
️ 若正在编写的属于Akka应用程序,则Akka配置信息应写入application.conf;若是基于Akka的库,则配置信息应写入reference.conf。并且,Akka不支持从另一个库中覆写(override)当前库中的config property。
配置信息既可以从外部配置文件加载,也可用代码实现运行时解析,还可以利用ConfigFactory.load()从不同地方加载。
import akka.actor.typed.ActorSystem
import com.typesafe.config.ConfigFactory
val customConf = ConfigFactory.parseString("""
akka.log-config-on-start = on
""")
// ConfigFactory.load sandwiches customConfig between default reference
// config and default overrides, and then resolves it.
val system = ActorSystem(rootBehavior, "MySystem", ConfigFactory.load(customConf))
一个典型的多项目配置示例:
myapp1 {
akka.loglevel = "WARNING"
my.own.setting = 43
}
myapp2 {
akka.loglevel = "ERROR"
app2.setting = "appname"
}
my.own.setting = 42
my.other.setting = "hello"
相应的配置信息加载代码示例:
val config = ConfigFactory.load()
val app1 = ActorSystem(rootBehavior, "MyApp1", config.getConfig("myapp1").withFallback(config))
val app2 = ActorSystem(rootBehavior, "MyApp2", config.getConfig("myapp2").withOnlyPath("akka").withFallback(config))
Akka的默认配置列表,长达近千行……
Akka Config Checker是用于查找Akka配置冲突的有力工具。
com.typesafe.akka:akka-actor-typed:2.6.5
示例HelloWorld是由HelloWorldMain创建一个HelloWorld(即Greeter),在每次ActorSystem要求HelloWorld SayHello的时候,就创建一个SayHello消息所赋名称对应的HelloWorldBot(所以会有若干个动作相同但名称不同的Bot),然后要求Greeter去向这个Bot问好,最后以Greeter与Bot相互问候数次作为结束。
示例采用了FP风格,Actor的状态和行为均在Singleton对象里定义,采用了类似传统Akka receive()
的函数Behaviors.receive { (context, message) => ... }
,以消息类型作为约束,实现了Actor的互动与组合。在每个Bot里,利用消息的递归重入维持一个Greeting的计数值,届满则用Behaviors.stopped停止响应,否则递归重入。
Behaviors.receive {…}与receiveMessage {…}的区别,在于前者将把context带入闭包。
这是一个类似聊天室功能的示例,各Actor的职责、定义和联系如下表:
Actor
职责
Behavior类型
Command
Event
Main
创建聊天室ChatRoom和客户Gabbler,并为二者牵线搭桥
NotUsed
ChatRoom
创建并管理一组Session
RoomCommand
Session
负责播发诸如Gabbler这样的Client的发言
SessionCommand
Gabbler
响应Session
SessionEvent
示例先采用FP风格实现。比如ChatRoom在处理GetSession消息时,最后以chatRoom(ses :: sessions)返回一个新的Behavior实例结束,这里的sessions正是Actor ChatRoom维护的状态。
示例演示了如何限制消息的发件人。比如session及其工厂方法,以及PublishSessionMessage类型均为chatroom私有,外部不可访问;在session Behavior的PostMessage分支中,chatroom的ActorRef通过工厂方法传入session,且类型被限制为ActorRef[PublishSessionMessage]。这样外界只能与ChatRoom通信,然后由ChatRoom在内部将消息转交Session处理。
处理消息的参数来源于工厂方法的传入参数,还是封装在消息的字段里,这个示例也分别给出了样板。 在设计通信协议时,消息定义为Command还是Event,消息的主人是谁,处理消息需要的参数如何传入等等,都是需要考虑的问题。
为实现程序安全退出,示例在Main的Behavior里,设置了Dead Watch观察gabbler,并定义了Behaviors.receiveSignal {…},在收到gabbler处理完MessagePosted消息,因返回Behaviors.stopped而发出的Terminated信号后,以Main自身的Behaviors.stopped作为结束。
Behaviors.setup是一个Behavior的工厂方法,该Behavior的实例将在Actor启动后才创建。而Behaviors.receive虽也是Behavior的工厂方法之一,但Behavior的实例却是在Actor启动的那一刻就同时创建的。
Actor是一个需要显式启停并且自带状态的资源(子Actor与随父Actor虽不共生、但定共死),所以回想在GC出现前需要自己管理内存句柄的时代吧。
Actor System是一个高能耗的系统,所以通常一个应用或者一个JVM里只有一个Actor System。
ActorContext可用作:
self
。ActorContext本身并不是完全线程安全的,主要有以下限制:
孵化有两层含义:创建并启动。
在使用Behaviors.setup启用SpawnProtocol后,在应用中任何地方都将可以不直接引用context,改用telling或asking方式完成Actor系统的组装。其中,Ask方式的使用类似传统Akka,它将返回Future[ActorRef[XX]]。
留意示例代码里的几处泛型约束,由这些Message串起了应用的流程。
// 启用SpawnProtocol的Actor
object HelloWorldMain {
def apply(): Behavior[SpawnProtocol.Command] =
Behaviors.setup { context =>
// Start initial tasks
// context.spawn(...)
SpawnProtocol()
}
}
implicit val system: ActorSystem[SpawnProtocol.Command] =
ActorSystem(HelloWorldMain(), "hello")
val greeter: Future[ActorRef[HelloWorld.Greet]] =
system.ask(SpawnProtocol.Spawn(behavior = HelloWorld(), name = "greeter", props = Props.empty, _))
val greetedBehavior = Behaviors.receive[HelloWorld.Greeted] { (context, message) =>
context.log.info2("Greeting for {} from {}", message.whom, message.from)
Behaviors.stopped
}
val greetedReplyTo: Future[ActorRef[HelloWorld.Greeted]] =
system.ask(SpawnProtocol.Spawn(greetedBehavior, name = "", props = Props.empty, _))
for (greeterRef <- greeter; replyToRef <- greetedReplyTo) {
greeterRef ! HelloWorld.Greet("Akka", replyToRef)
}
Actor可以通过返回Behaviors.stopped作为接替Behavior来停止自身运行。
子Actor可以在处理完当前消息后,被其父Actor使用ActorContext.stop方法强行关停。
所有子Actor都将伴随其父Actor关停而关停。
当Actor停止后将会收到一个PostStop信号,可以用Behaviors.receiveSignal在该信号的处理方法里完成其他的清理扫尾工作,或者提前给Behaviors.stopped传入一个负责扫尾的闭包函数,以实现Actor优雅地关停。( 经测试,前者将先于后者执行。)
由于Terminated信号只带有被观察者的ActorRef,所以为了添加额外的信息,在注册观察关系时可以用context.watchWith(watchee, SpecifiedMessageRef)取代context.watch(watchee)。这样在Terminated信号触发时,观察者将收到预定义的这个SpecifiedMessageRef。
注册、撤销注册和Terminated事件的到来,在时序上并不一定严格遵守先注册后Terminated这样的规则,因为消息是异步的,且有邮箱的存在。
Actor之间的交互,只能通过彼此的ActorRef[Message]来进行。这些ActorRef和Message,构成了Protocol的全部,既表明了通信的双方,也表明了Actor能处理的消息、限制了能发给它的消息类型。
要运行示例代码,需要导入日志和Ask模式的支持:
import akka.actor.typed.scaladsl.LoggerOps
import akka.actor.typed.scaladsl.AskPattern._
并且在test/resources文件夹下的logback-test.xml里配置好日志:
<?xml version="1.0" encoding="UTF-8"?>
<configuration>
<appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">
<filter class="ch.qos.logback.classic.filter.ThresholdFilter">
<level>INFO</level>
</filter>
<encoder>
<pattern>[%date{ISO8601}] [%level] [%logger] [%marker] [%thread] - %msg MDC: {%mdc}%n</pattern>
</encoder>
</appender>
<appender name="CapturingAppender" class="akka.actor.testkit.typed.internal.CapturingAppender"/>
<logger name="akka.actor.testkit.typed.internal.CapturingAppenderDelegate">
<appender-ref ref="STDOUT"/>
</logger>
<root level="DEBUG">
<appender-ref ref="CapturingAppender"/>
</root>
</configuration>
使用异步的、线程安全的tell发出消息,但不保证对方收到消息,也不关心该消息是否被对方处理完毕了。
recipient ! message
发件人发出Request并附上回信地址,并以获得收件人的Response作为消息妥投并被处理的确信。
先定义Request和Response,随后sender在发出Request时把self作为replyTo的ActorRef一并传出,方便recipient收到Request后回复Response。
把收件人的Response进行简单封装,即作为发件人可处理的消息类型,从而减少发件人定义Protocol的负担。
定义收件人recipient的Response类型,再在sender里定义适配后的Response类型,然后在其Behavior.setup里用context.messageAdapter(rsp => WrappedResponse(rsp))
注册一个消息适配器,最后在适配后消息的分支里取出原始的Response(当初由收件人回复的),再处理该消息。适配器能匹配预定义的响应类型及其派生类,总以最晚注册的为有效版本,属于sender且与sender同生命周期,所以当适配器触发异常时将导致其宿主停止。
把Request-Response原本使用的tell方式改为ask,从而能限定Response发回的时间,超时则视作ask失败。
在提问人中,通过Behaviors.setup定义隐式的超时时限,以context.ask(recipientRef, request) { case Success(Response(msg)) => AdaptedResponse(msg); case Failure(_) => AdaptedResponse(...) }
使用ask方式发出Request,并备妥Response到来时的适配预案(无需再额外象Adapted Response那样注册消息适配器),最后用工厂Behaviors.receiveMessage定义适配后响应消息的处理函数。
在Actor System以外直接用ask向某个Actor提问,最终得到用Future包装好的回复。
定义隐式的ActorSystem实例和超时时限,用reply: Future[Response] = recipient.ask(ref => Request).flatMap { case response => Future.successful(response); case another => Future.failed(...) }
定义Request-Response的对应关系,再通过system.executionContext
启动执行,最后在reply的回调onComplete { case Success(Response) => ...; case Failure(_) => ...}
里取出Response区别处理。
当不关心收件人的回应时,在Request里把回信地址设置为什么也不干的ignoreRef
,使模式从Request-Response变为Fire-Forget。
发件人发出Request时,把回复地址从context.self
改为什么消息也不处理的context.system.ignoreRef
。
由于ignoreRef将忽略所有发给它的消息,所以使用时必须小心。
在Actor内部有Future类型的调用时,使用pipeToSelf获取回调结果。尽管直接用Future.onComplete也能取出结果,但会因此将Actor的内部状态暴露给外部线程(在onComplete里能直接访问Actor内部状态),所以并不安全。
在Actor内部,先定义Future调用futureResult,再使用context.pipeToSelf(futureResult) { case Success(_) => WrappedResult(...); case Failure(_) => WrappedResult(...)}
将回调结果封装入WrappedResult消息,最后在WrappedResult消息分支里再作回应。
当一份响应需要综合多个Actor的回复信息才能作出时,由一个父Actor委托多个子Actor搜集信息,待信息齐备后才由父Actor汇总发回给Request的请求人,请求人除与父Actor之间的协议外,对其间细节一概不知。这些子Actor仅活在每次会话期间,故名为“每会话”的子Actor。
由父Actor在Behaviors.setup里构造实际承担工作的一组子Actor,在Request处理过程中构造负责组织协调子Actor的管家Actor(其行为类型为Behavior[AnyRef],以保证类型最大程度地兼容)。随后在管家Actor的Behaviors.setup里向子Actor发出Request,接着在Behaviors.receiveMessage里,使用递归反复尝试从子Actor的Response里取出结果(生产条件下应该设定子Actor响应超时)。当所有结果都取出后,由管家Actor利用父Actor传入的replyTo直接向外发出Response,最后停止管家Actor。
这当中的关键点包括:一是在管家Actor里的几处,使用narrow限定Actor的类型T:<U,这也算是一种妥协,确保消息类型为子类型T而非父类型U,从而实现更严谨的约束;二是利用递归配合Option[T]取出子Actor的响应结果。
// 子Actor
case class Keys()
case class Wallet()
// 父Actor
object Home {
sealed trait Command
case class LeaveHome(who: String, replyTo: ActorRef[ReadyToLeaveHome]) extends Command
case class ReadyToLeaveHome(who: String, keys: Keys, wallet: Wallet)
def apply(): Behavior[Command] = {
Behaviors.setup[Command] { context =>
val keyCabinet: ActorRef[KeyCabinet.GetKeys] = context.spawn(KeyCabinet(), "key-cabinet")
val drawer: ActorRef[Drawer.GetWallet] = context.spawn(Drawer(), "drawer")
Behaviors.receiveMessage[Command] {
case LeaveHome(who, replyTo) =>
context.spawn(prepareToLeaveHome(who, replyTo, keyCabinet, drawer), s"leaving-$who")
Behaviors.same
}
}
}
// 管家Actor
def prepareToLeaveHome(whoIsLeaving: String, replyTo: ActorRef[ReadyToLeaveHome],
keyCabinet: ActorRef[KeyCabinet.GetKeys], drawer: ActorRef[Drawer.GetWallet]): Behavior[NotUsed] = {
Behaviors.setup[AnyRef] { context =>
var wallet: Option[Wallet] = None
var keys: Option[Keys] = None
keyCabinet ! KeyCabinet.GetKeys(whoIsLeaving, context.self.narrow[Keys])
drawer ! Drawer.GetWallet(whoIsLeaving, context.self.narrow[Wallet])
Behaviors.receiveMessage {
case w: Wallet =>
wallet = Some(w)
nextBehavior()
case k: Keys =>
keys = Some(k)
nextBehavior()
case _ =>
Behaviors.unhandled
}
def nextBehavior(): Behavior[AnyRef] = (keys, wallet) match {
case (Some(w), Some(k)) =>
// 已取得所有结果
replyTo ! ReadyToLeaveHome(whoIsLeaving, w, k)
Behaviors.stopped
case _ =>
Behaviors.same
}
}.narrow[NotUsed]
}
}
本模式非常类似每会话子Actor模式,由聚合器负责收集子Actor回应的信息,再反馈给委托人Actor。
实现与Per Session Child Actor近似,只是在具体代码上更具通用性而已。其中,context.spawnAnonymous
是起联结作用的重要步骤。它不仅负责孵化聚合器,还要提前准备向子Actor发出Request的闭包,以及将子Actor回复转换为统一的格式的映射闭包。聚合器被启动后,即开始收集子Actor的回复,收集完成时即告终止。
// 允许子Actor有不同的协议,不必向Aggregator妥协
object Hotel1 {
final case class RequestQuote(replyTo: ActorRef[Quote])
final case class Quote(hotel: String, price: BigDecimal)
}
object Hotel2 {
final case class RequestPrice(replyTo: ActorRef[Price])
final case class Price(hotel: String, price: BigDecimal)
}
object HotelCustomer {
sealed trait Command
final case class AggregatedQuotes(quotes: List[Quote]) extends Command
// 将子Actor的回复封装成统一的格式
final case class Quote(hotel: String, price: BigDecimal)
def apply(hotel1: ActorRef[Hotel1.RequestQuote], hotel2: ActorRef[Hotel2.RequestPrice]): Behavior[Command] = {
Behaviors.setup[Command] { context =>
context.spawnAnonymous(
// 这个传递给聚合器工厂的sendRequests是衔接聚合器及其委托人的关键
Aggregator[Reply, AggregatedQuotes](
sendRequests = { replyTo =>
hotel1 ! Hotel1.RequestQuote(replyTo)
hotel2 ! Hotel2.RequestPrice(replyTo)
},
expectedReplies = 2,
context.self,
aggregateReplies = replies =>
AggregatedQuotes(
replies
.map {
case Hotel1.Quote(hotel, price) => Quote(hotel, price)
case Hotel2.Price(hotel, price) => Quote(hotel, price)
}
.sortBy(_.price)
.toList),
timeout = 5.seconds))
Behaviors.receiveMessage {
case AggregatedQuotes(quotes) =>
context.log.info("Best {}", quotes.headOption.getOrElse("Quote N/A"))
Behaviors.same
}
}
}
}
object Aggregator {
// 用来兼容不同子Actor响应而定义的回复类型
type Reply = Any
sealed trait Command
private case object ReceiveTimeout extends Command
private case class WrappedReply[R](reply: R) extends Command
def apply[Reply: ClassTag, Aggregate](
sendRequests: ActorRef[Reply] => Unit,
expectedReplies: Int,
replyTo: ActorRef[Aggregate],
aggregateReplies: immutable.IndexedSeq[Reply] => Aggregate,
timeout: FiniteDuration): Behavior[Command] = {
Behaviors.setup { context =>
context.setReceiveTimeout(timeout, ReceiveTimeout)
val replyAdapter = context.messageAdapter[Reply](WrappedReply(_))
// 向子Actor发出Request并搜集整理回复信息
sendRequests(replyAdapter)
def collecting(replies: immutable.IndexedSeq[Reply]): Behavior[Command] = {
Behaviors.receiveMessage {
case WrappedReply(reply: Reply) =>
val newReplies = replies :+ reply
if (newReplies.size == expectedReplies) {
val result = aggregateReplies(newReplies)
replyTo ! result
Behaviors.stopped
} else
collecting(newReplies)
case ReceiveTimeout =>
val aggregate = aggregateReplies(replies)
replyTo ! aggregate
Behaviors.stopped
}
}
collecting(Vector.empty)
}
}
}
这是聚合器模式的一种变形。类似于集群条件下,每个Actor承担着同样的工作职责,当其中某个Actor未按期响应时,将工作从这个迟延的Actor手里交给另一个Actor负责。
这个例子不够完整,还需要进一步理解,比如为什么sendRequests需要一个Int参数,如果换作OO风格如何实现。
参考文献 Achieving Rapid Response Times in Large Online Services
使用Behaviors.withTimers设置若干个定时器,由定时器负责向子Actor发出Request。
设置2个超时,其中请求超时是单个Actor完成工作的时限,到期未完成就交出工作;另一个是最迟交付超时,是整个工作完成的时限,到期则说明无法交付委托人的工作。
利用sendRequest函数(类型为(Int, ActorRef[Reply]) => Boolean
)联结掐尾器和具体承担工作的Actor。如果sendRequest成功,说明请求已经发送给承担工作的子Actor,那么就调度一条由请求超时限定的单个Request的消息,否则就调度一条由最迟交付超时限定的消息。
object TailChopping {
sealed trait Command
private case object RequestTimeout extends Command
private case object FinalTimeout extends Command
private case class WrappedReply[R](reply: R) extends Command
def apply[Reply: ClassTag](
sendRequest: (Int, ActorRef[Reply]) => Boolean,
nextRequestAfter: FiniteDuration,
replyTo: ActorRef[Reply],
finalTimeout: FiniteDuration,
timeoutReply: Reply): Behavior[Command] = {
Behaviors.setup { context =>
Behaviors.withTimers { timers =>
val replyAdapter = context.messageAdapterReply
sendNextRequest(1)
def waiting(requestCount: Int): Behavior[Command] = {
Behaviors.receiveMessage {
case WrappedReply(reply: Reply) =>
replyTo ! reply
Behaviors.stopped // 单个任务没能按时完成,另外找人
case RequestTimeout =>
sendNextRequest(requestCount + 1)
// 整个工作交付不了,抱歉
case FinalTimeout =>
replyTo ! timeoutReply
Behaviors.stopped
}
}
def sendNextRequest(requestCount: Int): Behavior[Command] = {
if (sendRequest(requestCount, replyAdapter)) {
timers.startSingleTimer(RequestTimeout, nextRequestAfter)
} else {
timers.startSingleTimer(FinalTimeout, finalTimeout)
}
waiting(requestCount)
}
}
}
}
}
使用定时器,在指定时限到期时给自己发送一条指定的消息。
使用Behaviors.withTimers为Actor绑定TimerScheduler,该调度器将同样适用于Behaviors的setup、receive、receiveMessage等工厂方法创建的行为。
在timers.startSingleTimer定义并启动定时器,在startSingleTimer设定的超时到期时将会收到预设的消息。
object Buncher {
sealed trait Command
final case class ExcitingMessage(message: String) extends Command
final case class Batch(messages: Vector[Command])
private case object Timeout extends Command
private case object TimerKey
def apply(target: ActorRef[Batch], after: FiniteDuration, maxSize: Int): Behavior[Command] = {
Behaviors.withTimers(timers => new Buncher(timers, target, after, maxSize).idle())
}
}
class Buncher(
timers: TimerScheduler[Buncher.Command],
target: ActorRef[Buncher.Batch],
after: FiniteDuration,
maxSize: Int) {
private def idle(): Behavior[Command] = {
Behaviors.receiveMessage[Command] { message =>
timers.startSingleTimer(TimerKey, Timeout, after)
active(Vector(message))
}
}
def active(buffer: Vector[Command]): Behavior[Command] = {
Behaviors.receiveMessage[Command] {
// 收到定时器发来的Timeout消息,缓冲区buffer停止接收,将结果回复给target。
case Timeout =>
target ! Batch(buffer)
idle()
// 时限到达前,新建缓冲区并把消息存入,直到缓冲区满
case m =>
val newBuffer = buffer :+ m
if (newBuffer.size == maxSize) {
timers.cancel(TimerKey)
target ! Batch(newBuffer)
idle()
} else
active(newBuffer)
}
}
}
调度周期有两种:一种是FixedDelay:指定前后两次消息发送的时间间隔;一种是FixedRate:指定两次任务执行的时间间隔。如果实难选择,建议使用FixedDelay。( 此处Task等价于一次消息处理过程,可见对Akka里的各种术语还需进一步规范。)
区别主要在于:Delay不会补偿两次消息间隔之间因各种原因导致的延误,前后两条消息的间隔时间是固定的,而不会关心前一条消息是何时才交付处理的;而Rate会对这之间的延误进行补偿,后一条消息发出的时间会根据前一条消息交付处理的时间而确定。( 换句话说,Delay以发出时间计,Rate以开始处理的时间计。)
长远来看,Delay方式下的消息处理的频率通常会略低于指定延迟的倒数,所以更适合短频快的工作;Rate方式下的消息处理频率恰好是指定间隔的倒数,所以适合注重完整执行次数的工作。
️ 在Rate方式下,如果任务延迟超出了预设的时间间隔,则将在前一条消息之后立即发送下一条消息。比如scheduleAtFixedRate的间隔为1秒,而消息处理过程因长时间暂停垃圾回收等原因造成JVM被挂起30秒钟,则ActorSystem将快速地连续发送30条消息进行追赶,从而造成短时间内的消息爆发,所以一般情况下Delay方式更被推崇。
在集群条件下,通常采用的在Request中传递本Shard Actor之ActorRef的方法仍旧适用。但如果该Actor在发出Request后被移动或钝化(指Actor暂时地关闭自己以节约内存,需要时再重启),则回复的Response将会全部发至Dead Letters。此时,引入EntityId作为标识,取代ActorRef以解决之(参见RMP-68)。缺点是无法再使用消息适配器。
️ RMP-77:Actor的内部状态不会随Actor对象迁移,所以需要相应持久化机制来恢复Actor对象的状态。
把通常设计中的ActorRef换成EntityId,再使用TypeKey和EntityId定位Actor的引用即可。
object CounterConsumer {
sealed trait Command
final case class NewCount(count: Long) extends Command
val TypeKey: EntityTypeKey[Command] = EntityTypeKey[Command]("example-sharded-response")
}
object Counter {
trait Command
case object Increment extends Command
final case class GetValue(replyToEntityId: String) extends Command
val TypeKey: EntityTypeKey[Command] = EntityTypeKey[Command]("example-sharded-counter")
private def apply(): Behavior[Command] = Behaviors.setup { context =>
counter(ClusterSharding(context.system), 0)
}
private def counter(sharding: ClusterSharding, value: Long): Behavior[Command] = Behaviors.receiveMessage {
case Increment =>
counter(sharding, value + 1)
case GetValue(replyToEntityId) =>
val replyToEntityRef = sharding.entityRefFor(CounterConsumer.TypeKey, replyToEntityId)
replyToEntityRef ! CounterConsumer.NewCount(value)
Behaviors.same
}
}
默认情况下,当Actor在初始化或处理消息时触发了异常、失败,则该Actor将被停止(️ 传统Akka默认是重启Actor)。
要区别校验错误与失败:校验错误Validate Error意味着发给Actor的Command本身就是无效的,所以将其界定为Protocol规范的内容,由发件人严格遵守,这远甚过收件人发现收到的是无效Command后直接抛出异常。失败Failure则是由于Actor不可控的外因导致的,这通常无法成为双方Protocol的一部分,发件人对此也无能为力。
发生失败时,通常采取“就让它崩”的原则。其思路在于,与其花费心思零敲碎打地在局部进行细粒度的修复和内部状态纠正,不如就让它崩溃停止,然后利用已有的灾备方案,重建一个肯定有效的新Actor重新来过。
监管就是一个放置灾备方案的好地方。默认监视策略是在引发异常时停止Actor,如果要自定义此策略,则应在spawn子Actor时,使用Behaviors.supervise进行指定。
策略有许多可选参数,也可以象下面这样进行嵌套,以应对不同的异常类型。
Behaviors.supervise(
Behaviors.supervise(behavior)
.onFailure[IllegalStateException](SupervisorStrategy.restart))
.onFailure[IllegalArgumentException](SupervisorStrategy.stop)
️ 若Actor被重启,则传递给Behaviors.supervise的Behavior内定义的可变状态就需要在类似Behaviors.setup这样的工厂方法中进行初始化。若采用OO风格,则推荐在setup中完成初始化;若采用FP风格,由于通常不存在函数内的可变量,所以无需如此。
完整列表参见API指南:SupervisorStrategy
第二个放置灾备的地方是Behaviors.setup里。因为当父Actor重启时,其Behaviors.setup会再次执行。同时,子Actor会随父Actor重启而停止运行,以防止资源泄漏等问题发生。
注意区别以下两种方式:
这种方式下,每当父Actor重启时,就会完全重构一次子Actor,从而总是回到父Actor刚创建时候的样子。
def child(size: Long): Behavior[String] =
Behaviors.receiveMessage(msg => child(size + msg.length))
def parent: Behavior[String] = {
Behaviors
.supervise[String] {
// setup被supervise包裹,意味着每次父Actor重启,该setup必被重新执行
Behaviors.setup { ctx =>
val child1 = ctx.spawn(child(0), "child1")
val child2 = ctx.spawn(child(0), "child2")
Behaviors.receiveMessage[String] { msg =>
val parts = msg.split(" ")
child1 ! parts(0)
child2 ! parts(1)
Behaviors.same
}
}
}
.onFailure(SupervisorStrategy.restart)
}
这种方式下,子Actor不会受到父Actor的重启影响,它们既不会停止,更不会被重建。
def parent2: Behavior[String] = {
Behaviors.setup { ctx =>
// 此setup只会在父Actor创建时运行一次
val child1 = ctx.spawn(child(0), "child1")
val child2 = ctx.spawn(child(0), "child2")
Behaviors
.supervise {
// 在父Actor重启时,只有这段receiveMessage工厂会被执行
Behaviors.receiveMessage[String] { msg =>
val parts = msg.split(" ")
child1 ! parts(0)
child2 ! parts(1)
Behaviors.same
}
}
// 参数false决定了父Actor重启时不会停止子Actor
.onFailure(SupervisorStrategy.restart.withStopChildren(false))
}
}
第三个放置灾备方案的地方是在PreRestart信号处理过程里。和之前提过的PostStop信号一样,Actor因监测而重启前,会收到一个信号PreRestart信号,方便Actor自身在重启前完成清理扫尾工作。
RMP-47的对传统Akka的描述适用于Akka Typed吗?
- PreStart:在Actor启动前触发
- PostStop:在Actor停止后触发
- PreRestart:在重启Actor前触发,完成任务后会触发PostStop
- PostRestart:在Actor重启后触发,完成任务后会触发PreStart
在传统Akka里,子Actor触发的异常将被上交给父Actor,由后者决定如何处置。而在Akka Typed里,提供了更丰富的手段处理这种情况。
方法就是由父Actor观察(watch)子Actor,这样当子Actor因失败而停止时,父Actor将会收到附上原因的ChildFailed信号。特别地,ChildFailed信号派生自Terminated,所以如果业务上不需要刻意区分的话,处理Terminated信号即可。
在子Actor触发异常后,如果它的祖先Actor(不仅仅是父亲)没有处理Terminated信号,那么将会触发akka.actor.typed.DeathPactException异常。
示例里用Boss -> MiddleManagement -> Work这样的层级进行了演示。当Boss发出Fail消息后,MiddleManagement将消息转发给Work,Work收到Fail消息后抛出异常。因MiddleManagement和Boss均未对Terminated信号进行处理,因此相继停止。随后Boss按预定策略重启,并顺次重建MiddleManagement和Work,从而确保测试脚本尝试在等候200毫秒后重新发送消息Hello成功。
除了通过创建Actor获得其引用外,还可以通过接线员Receptionist获取Actor的引用。
Receptionist采用了注册会员制,注册过程仍是基于Akka Protocol。在Receptionist上注册后的会员都持有key,方便集群上的其他Actor通过key找到它。当发出Find请求后,Receptionist会回复一个Listing,其中将包括一个由若干符合条件的Actor组成的集合。(️ 同一个key可以对应多个Actor)
由Receptionist维护的注册表是动态的,其中的Actor可能因其停止运行、手动从表中注销或是节点从集群中删除而从表中消失。如果需要关注这种动态变化,可以使用Receptionist.Subscribe(keyOfActor, replyTo)订阅关注的Actor,Receptionist会在注册表变化时将Listing消息发送给replyTo。
️ 切记:上述操作均是基于异步消息的,所以操作不是即时产生结果的。可能发出注销请求了,但Actor还在注册表里。
要点:
ServiceKey[Message]("name")
创建Keycontext.system.receptionist ! Receptionist.Register(key, replyTo)
注册Actor,用Deregister注销context.system.receptionist ! Receptionist.Subscribe(key, replyTo)
订阅注册表变动事件context.system.receptionist ! Receptionist.Find(key, messageAdapter)
查找指定key对应的若干Actor在集群条件下,一个Actor注册到本地节点的接线员后,其他节点上的接线员也会通过分布式数据广播获悉,从而保证所有节点都能通过ServiceKey找到相同的Actor们。
但需要注意集群条件下与本地环境之间的差别:一是在集群条件下进行的Subscription与Find将只能得到可达Actor的集合。如果需要获得所有的已注册Actor(包括不可达的Actor),则得通过Listing.allServiceInstances获得。二是在集群内各节点之间传递的消息,都需要经过序列化。
接线员无法扩展到任意数量、也达不到异常高吞吐的接转要求,它通常最多就支持数千至上万的接转量。所以,如果应用确实需要超过Akka框架所能提供的接转服务水平的,就得自己去解决各节点Actor初始化连接的难题。
尽管Actor在任意时刻只能处理一条消息,但这不并妨碍同时有多个Actor处理同一条消息,这便是Akka的路由功能使然。
路由器本身也是一种Actor,但主要职责是转发消息而不是处理消息。与传统Akka一样,Akka Typed的路由也分为两种:池路由池与组路由。
在池路由方式下,由Router负责构建并管理所有的Routee。当这些作为子actor的Routee终止时,Router将会把它从Router中移除。当所有的Routee都移除后,Router本身停止运行。
val pool = Routers.pool(poolSize = 4)(Behaviors.supervise(Worker()).onFailure[Exception](SupervisorStrategy.restart))
定义池路由,其中监管策略应是必不可少的内容,被监管的Worker()即是Routee,poolSize则是池中最多能创建并管理的Routee数目。val router = ctx.spawn(pool, "worker-pool")
创建路由器本身。由于Router本身也是Actor,Routee是其子Actor,因此可以指定其消息分发器。( Router中以with开头的API还有不少,需要仔细参考API文档。)
// 指定Routee使用默认的Blocking IO消息分发器
val blockingPool = pool.withRouteeProps(routeeProps = DispatcherSelector.blocking())
// 指定Router使用与其父Actor一致的消息分发器
val blockingRouter = ctx.spawn(blockingPool, "blocking-pool", DispatcherSelector.sameAsParent())
// 使用轮循策略分发消息,保证每个Routee都尽量获得同样数量的任务,这是池路由默认策略
// 示例将获得a-b-a-b顺序的日志
val alternativePool = pool.withPoolSize(2).withRoundRobinRouting()
在学习Akka Typed的过程中,应引起重视和警醒的是,不能象传统Akka一样执着于定义Actor的Class或Object本身,而应该紧紧围绕Behavior来思考、认识和设计系统。
在Akka Typed的世界里,包括Behaviors各式工厂在内的许多API均是以Behavior为核心进行设计的。而Behavior又与特定类型的Message绑定,这便意味着Behavior与Protocol进行了绑定,于是消息Message及处理消息的Behavior[Message]便构成了完整的Protocol。
与池路由不同的是,组路由方式下的Routee均由外界其它Actor产生(自行创建、自行管理),Router只是负责将其编组在一起。
组路由基于ServiceKey和Receptionist,管理着属于同一个key的若干个Routee。虽然这种方式下对Routee构建和监控将更灵活和便捷,但也意味着组路由将完全依赖Receptionist维护的注册表才能工作。在Router启动之初,当注册表还是空白时,发来的消息将作为akka.actor.Dropped扔到事件流中。当注册表中注册有Routee后,若其可达,则消息将顺利送达,否则该Routee将被标记为不可达。
轮循策略 Round Robin
轮循策略将公平调度各Routee,平均分配任务,所以适合于Routee数目不会经常变化的场合,是池路由的默认策略。它有一个可选的参数preferLocalRoutees
,为true时将强制只使用本地的Routee(默认值为false)。
随机策略 Random
随机策略将随机选取Routee分配任务,适合Routee数目可能会变化的场合,是组路由的默认策略。它同样有可靠参数preferLocalRoutees
。
一致的散列策略 Consistent Hashing
散列策略将基于一张以传入消息为键的映射表选择Routee。
参考文献:Consistent Hashing
该文只展示了如何设计一个ConsistentHash[T]类,并提供add/remove/get等API函数,却没讲怎么使用它,所以需要完整示例!
如果把Routee看作CPU的核心,那自然是多多益善。但由于Router本身也是一个Actor,所以其Mailbox的承载能力反而会成为整个路由器的瓶颈,而Akka Typed并未就此提供额外方案,因此遇到需要更高吞吐量的场合则需要自己去解决。
Stash(暂存),是指Actor将当前Behavior暂时还不能处理的消息全部或部分缓存起来,等完成初始化等准备工作或是处理完上一条幂等消息后,再切换至匹配的Behavior,从缓冲区取出消息进行处理的过程。
trait DB {
def save(id: String, value: String): Future[Done]
def load(id: String): Future[String]
}
object DataAccess {
sealed trait Command
final case class Save(value: String, replyTo: ActorRef[Done]) extends Command
final case class Get(replyTo: ActorRef[String]) extends Command
private final case class InitialState(value: String) extends Command
private case object SaveSuccess extends Command
private final case class DBError(cause: Throwable) extends Command
// 使用Behaviors.withStash(capacity)设置Stash容量
// 随后切换到初始Behavior start()
def apply(id: String, db: DB): Behavior[Command] = {
Behaviors.withStash(100) { buffer =>
Behaviors.setup[Command] { context =>
new DataAccess(context, buffer, id, db).start()
}
}
}
}
// 大量使用context.pipeToSelf进行Future交互
class DataAccess(
context: ActorContext[DataAccess.Command],
buffer: StashBuffer[DataAccess.Command],
id: String,
db: DB) {
import DataAccess._
private def start(): Behavior[Command] = {
context.pipeToSelf(db.load(id)) {
case Success(value) => InitialState(value)
case Failure(cause) => DBError(cause)
}
Behaviors.receiveMessage {
case InitialState(value) =>
// 完成初始化,转至Behavior active()开始处理消息
buffer.unstashAll(active(value))
case DBError(cause) =>
throw cause
case other =>
// 正在处理幂等消息,故暂存后续消息
buffer.stash(other)
Behaviors.same
}
}
// Behaviors.receiveMessagePartial():从部分消息处理程序构造一个Behavior
// 该行为将把未定义的消息视为未处理。
private def active(state: String): Behavior[Command] = {
Behaviors.receiveMessagePartial {
case Get(replyTo) =>
replyTo ! state
Behaviors.same
// 处理幂等的Save消息
case Save(value, replyTo) =>
context.pipeToSelf(db.save(id, value)) {
case Success(_) => SaveSuccess
case Failure(cause) => DBError(cause)
}
// 转至Behavior saving(),反馈幂等消息处理结果
saving(value, replyTo)
}
}
private def saving(state: String, replyTo: ActorRef[Done]): Behavior[Command] = {
Behaviors.receiveMessage {
case SaveSuccess =>
replyTo ! Done
// 幂等消息处理结束并已反馈结果,转至Behavior active()开始处理下一条消息
buffer.unstashAll(active(state))
case DBError(cause) =>
throw cause
case other =>
buffer.stash(other)
Behaviors.same
}
}
}
注意事项
StashOverflowException
异常。所以在往缓冲区里暂存消息前,应当使用StashBuffer.isFull
提前进行检测。unstashAll()
将会停止Actor响应新的消息,直到当前暂存的所有消息被处理完毕,但这有可能因长时间占用消息处理线程而导致其他Actor陷入饥饿状态。为此,可改用方法unstash(numberOfMessages)
,确保一次只处理有限数量的暂存消息。有限状态机:当前处于状态S,发生E事件后,执行操作A,然后状态将转换为S’。
这部分内容对应传统Akka的FSM:Finite State Machine,可参考RMP及下文
参考示例:哲学家用餐问题,及其解析: Dining Hakkers
object Buncher {
// 把FSM里驱动状态改变的事件,都用Message代替了
sealed trait Event
final case class SetTarget(ref: ActorRef[Batch]) extends Event
final case class Queue(obj: Any) extends Event
case object Flush extends Event
private case object Timeout extends Event
// 状态
sealed trait Data
case object Uninitialized extends Data
final case class Todo(target: ActorRef[Batch], queue: immutable.Seq[Any]) extends Data
final case class Batch(obj: immutable.Seq[Any])
// 初始状态为Uninitialized,对应初始的Behavior为idle()
def apply(): Behavior[Event] = idle(Uninitialized)
private def idle(data: Data): Behavior[Event] =
Behaviors.receiveMessage[Event] {
message: Event => (message, data) match {
case (SetTarget(ref), Uninitialized) =>
idle(Todo(ref, Vector.empty))
case (Queue(obj), t @ Todo(_, v)) =>
active(t.copy(queue = v :+ obj))
case _ =>
Behaviors.unhandled
}
}
// 处于激活状态时,对应Behavior active()
private def active(data: Todo): Behavior[Event] =
Behaviors.withTimers[Event] { timers =>
// 设置超时条件
timers.startSingleTimer(Timeout, 1.second)
Behaviors.receiveMessagePartial {
case Flush | Timeout =>
data.target ! Batch(data.queue)
idle(data.copy(queue = Vector.empty))
case Queue(obj) =>
active(data.copy(queue = data.queue :+ obj))
}
}
}
在Akka Typed里,由于Protocol和Behavior的出现,简化了传统Akka中有限状态机FSM的实现。不同的状态下,对应不同的Behavior,响应不同的请求,成为Akka Typed的典型作法,这在此前的大量示例里已经有所展示。
CoordinatedShutdown是一个扩展,通过提前注册好的任务Task,可以在系统关闭前完成一些清理扫尾工作,防止资源泄漏等问题产生。
关闭过程中,默认的各阶段(Phase)都定义在下面这个akka.coordinated-shutdown.phases
里,各Task则后续再添加至相应的阶段中。
在application.conf配置里,可以通过定义不同的depends-on来覆盖缺省的设置。其中,before-service-unbind
、before-cluster-shutdown
和before-actor-system-terminate
是最常被覆盖的。
各Phase原则上按照被依赖者先于依赖者的顺序执行,从而构成一个有向无环图(Directed Acyclic Graph,DAG),最终所有Phase按DAG的拓扑顺序执行。
# CoordinatedShutdown is enabled by default and will run the tasks that
# are added to these phases by individual Akka modules and user logic.
#
# The phases are ordered as a DAG by defining the dependencies between the phases
# to make sure shutdown tasks are run in the right order.
#
# In general user tasks belong in the first few phases, but there may be use
# cases where you would want to hook in new phases or register tasks later in
# the DAG.
#
# Each phase is defined as a named config section with the
# following optional properties:
# - timeout=15s: Override the default-phase-timeout for this phase.
# - recover=off: If the phase fails the shutdown is aborted
# and depending phases will not be executed.
# - enabled=off: Skip all tasks registered in this phase. DO NOT use
# this to disable phases unless you are absolutely sure what the
# consequences are. Many of the built in tasks depend on other tasks
# having been executed in earlier phases and may break if those are disabled.
# depends-on=[]: Run the phase after the given phases
phases {
# The first pre-defined phase that applications can add tasks to.
# Note that more phases can be added in the application's
# configuration by overriding this phase with an additional
# depends-on.
before-service-unbind {
}
# Stop accepting new incoming connections.
# This is where you can register tasks that makes a server stop accepting new connections. Already
# established connections should be allowed to continue and complete if possible.
service-unbind {
depends-on = [before-service-unbind]
}
# Wait for requests that are in progress to be completed.
# This is where you register tasks that will wait for already established connections to complete, potentially
# also first telling them that it is time to close down.
service-requests-done {
depends-on = [service-unbind]
}
# Final shutdown of service endpoints.
# This is where you would add tasks that forcefully kill connections that are still around.
service-stop {
depends-on = [service-requests-done]
}
# Phase for custom application tasks that are to be run
# after service shutdown and before cluster shutdown.
before-cluster-shutdown {
depends-on = [service-stop]
}
# Graceful shutdown of the Cluster Sharding regions.
# This phase is not meant for users to add tasks to.
cluster-sharding-shutdown-region {
timeout = 10 s
depends-on = [before-cluster-shutdown]
}
# Emit the leave command for the node that is shutting down.
# This phase is not meant for users to add tasks to.
cluster-leave {
depends-on = [cluster-sharding-shutdown-region]
}
# Shutdown cluster singletons
# This is done as late as possible to allow the shard region shutdown triggered in
# the "cluster-sharding-shutdown-region" phase to complete before the shard coordinator is shut down.
# This phase is not meant for users to add tasks to.
cluster-exiting {
timeout = 10 s
depends-on = [cluster-leave]
}
# Wait until exiting has been completed
# This phase is not meant for users to add tasks to.
cluster-exiting-done {
depends-on = [cluster-exiting]
}
# Shutdown the cluster extension
# This phase is not meant for users to add tasks to.
cluster-shutdown {
depends-on = [cluster-exiting-done]
}
# Phase for custom application tasks that are to be run
# after cluster shutdown and before ActorSystem termination.
before-actor-system-terminate {
depends-on = [cluster-shutdown]
}
# Last phase. See terminate-actor-system and exit-jvm above.
# Don't add phases that depends on this phase because the
# dispatcher and scheduler of the ActorSystem have been shutdown.
# This phase is not meant for users to add tasks to.
actor-system-terminate {
timeout = 10 s
depends-on = [before-actor-system-terminate]
}
}
通常应在系统启动后尽早注册任务,否则添加得太晚的任务将不会被运行。
向同一个Phase添加的任务将并行执行,没有先后之分。
下一个Phase会通常会等待上一个Phase里的Task都执行完毕或超时后才会启动。可以为Phase配置recover = off
,从而在Task失败或超时后,中止整个系统的关机过程。
通常情况下,使用CoordinatedShutdown(system).addTask(CoordinatedShutdown.PhaseBeforeServiceUnbind, "someTaskName") { ... }
向Phase中添加Task,此处的名称主要用作调试或者日志。
使用CoordinatedShutdown(system).addCancellableTask(CoordinatedShutdown.PhaseBeforeServiceUnbind, "cleanup") { () => Future { ... } }
添加可取消的Task,之后可以用c.cancel()取消Task的执行。
通常情况下,不需要Actor回复Task已完成的消息,因为这会拖慢关机进程,直接让Actor终止运行即可。如果要关注该Task何时完成,可以使用CoordinatedShutdown(system).addActorTerminationTask(CoordinatedShutdown.PhaseBeforeServiceUnbind, "someTaskName", someActor, Some("stop"))
添加任务,并且给这个someActor发送一条消息,随后watch该Actor的终止便可知晓Task完成情况。
使用ActorSystem.terminate()
或val done: Future[Done] = CoordinatedShutdown(system).run(CoordinatedShutdown.UnknownReason)
可以启动协调关机过程,且多次调用也只会执行一次。
ActorSystem会在最后一个Phase里的Task全部执行完毕后关闭,但JVM不一定会停止,除非所有守护进程均已停止运行。通过配置akka.coordinated-shutdown.exit-jvm = on
,可以强制一并关闭JVM。
在集群条件下,当节点正在从集群中离开或退出时,将会自动触发协调关机。而且系统会自动添加Cluster Singleton和Cluster Sharding等正常退出群集的任务。
默认情况下,当通过杀死SIGTERM信号(Ctrl-C对SIGINT不起作用)终止JVM进程时,CoordinatedShutdown也将运行,该默认行为可以通过配置akka.coordinated-shutdown.run-by-jvm-shutdown-hook=off
禁用之。
可以使用CoordinatedShutdown(system).addJvmShutdownHook { ... }
添加JVM Hook任务,以保证其在Akka关机前得以执行。
在测试时,如果不希望启用协调关机,可以采用以下配置禁用之:
# Don't terminate ActorSystem via CoordinatedShutdown in tests
akka.coordinated-shutdown.terminate-actor-system = off
akka.coordinated-shutdown.run-by-actor-system-terminate = off
akka.coordinated-shutdown.run-by-jvm-shutdown-hook = off
akka.cluster.run-coordinated-shutdown-when-down = off
MessageDispatcher是Akka的心脏,是它驱动着整个ActorSystem的正常运转,并且为所有的Actor提供了执行上下文ExecutionContext,方便在其中执行代码、进行Future回调等等。
默认Dispatcher
每个ActorSystem都有一个默认的Dispatcher,可以在akka.actor.default-dispatcher
配置中细调,其默认的执行器Executor类型为 “fork-join-executor”,这在绝大多数情况下都能提供优越的性能,也可以在akka.actor.default-dispatcher.executor
一节中进行设置。
内部专用Dispatcher
为保护Akka各模块内部维护的Actor,有一个独立的内部专用Dispatcher。它可以在akka.actor.internal-dispatcher
配置中细调,也可以设置akka.actor.internal-dispatcher为其他Dispatcher名字(别名)来替换之。
查找指定的Dispatcher
Dispatcher均实现了ExecutionContext接口,所以象这样val executionContext = context.system.dispatchers.lookup(DispatcherSelector.fromConfig("my-dispatcher"))
就可加载不同的Dispatcher。
选择指定的Dispatcher
// 为新的Actor使用默认Dispatcher
context.spawn(yourBehavior, "DefaultDispatcher")
context.spawn(yourBehavior, "ExplicitDefaultDispatcher", DispatcherSelector.default())
// 为不支持Future的阻塞调用(比如访问一些老式的数据库),使用blocking Dispatcher
context.spawn(yourBehavior, "BlockingDispatcher", DispatcherSelector.blocking())
// 使用和父Actor一样的Dispatcher
context.spawn(yourBehavior, "ParentDispatcher", DispatcherSelector.sameAsParent())
// 从配置加载指定的Dispatcher
context.spawn(yourBehavior, "DispatcherFromConfig", DispatcherSelector.fromConfig("your-dispatcher"))
your-dispatcher {
type = Dispatcher
executor = "thread-pool-executor"
thread-pool-executor {
fixed-pool-size = 32
}
throughput = 1
}
对比
Dispatcher
PinnedDispatcher
线程池
事件驱动,一组Actor共用一个线程池。
每个Actor都拥有专属的一个线程池,池中只有一个线程。
可否被共享
没有限制
不可共享
邮箱
每个Actor拥有一个
每个Actor拥有一个
适用场景
是Akka默认的Dispatcher, 支持隔板
支持隔板
驱动
由java.util.concurrent.ExecutorService
驱动。使用fork-join-executor、thread-pool-executor或基于akka.dispatcher.ExecutorServiceConfigurator实现的完全限定类名,可指定其使用的executor。
由任意的akka.dispatch.ThreadPoolExecutorConfigurator
驱动,默认执行器为thread-pool-executor
。
一个Fork-Join执行器示例:
my-dispatcher {
# Dispatcher is the name of the event-based dispatcher
type = Dispatcher
# What kind of ExecutionService to use
executor = "fork-join-executor"
# Configuration for the fork join pool
fork-join-executor {
# Min number of threads to cap factor-based parallelism number to
parallelism-min = 2
# Parallelism (threads) ... ceil(available processors * factor)
parallelism-factor = 2.0
# Max number of threads to cap factor-based parallelism number to
parallelism-max = 10
}
# Throughput defines the maximum number of messages to be
# processed per actor before the thread jumps to the next actor.
# Set to 1 for as fair as possible.
throughput = 100
}
讲解阻塞危害的参考视频:Managing Blocking in Akka video,及其示例代码:https://github.com/raboof/akka-blocking-dispatcher
在使用默认Dispatcher的情况下,多个Actor共用一个线程池,所以当其中一些Actor因被阻塞而占用线程后,有可能导致可用线程耗尽,而使其他同组的Actor陷入线程饥饿状态。
监测工具推荐:YourKit,VisualVM,Java Mission Control,Lightbend出品的Thread Starvation Detector等等。
示例使用了两个Actor作对比,在(1 to 100)的循环里,新建的一个Actor在消息处理函数中sleep 5秒,导致同时新建的另一个Actor无法获得线程处理消息而卡住。
针对上述情况,首先可能想到的象下面这样,用Future来封装这样的长时调用,但这样的想法实际上过于简单。因为仍旧使用了由全体Actor共用的ExecutionContext作为Future的执行上下文,所以随着应用程序的负载不断增加,内存和线程都会飞快地被耗光。
object BlockingFutureActor {
def apply(): Behavior[Int] =
Behaviors.setup { context =>
implicit val executionContext: ExecutionContext = context.executionContext
Behaviors.receiveMessage { i =>
triggerFutureBlockingOperation(i)
Behaviors.same
}
}
def triggerFutureBlockingOperation(i: Int)(implicit ec: ExecutionContext): Future[Unit] = {
println(s"Calling blocking Future: $i")
Future {
Thread.sleep(5000) //block for 5 seconds
println(s"Blocking future finished $i")
}
}
}
正确的解决方案,是为所有的阻塞调用提供一个独立的Dispatcher,这种技巧被称作“隔板 bulk-heading”或者“隔离阻塞 isolating blocking”。
在application.conf里对Dispatcher进行如下配置,其中thread-pool-executor.fixed-pool-size
的数值可根据实际负载情况进行微调:
my-blocking-dispatcher {
type = Dispatcher
executor = "thread-pool-executor"
thread-pool-executor {
fixed-pool-size = 16
}
throughput = 1
}
随后,使用该配置替换掉前述代码第4行加载的默认Dispatcher
implicit val executionContext: ExecutionContext = context.system.dispatchers.lookup(DispatcherSelector.fromConfig("my-blocking-dispatcher"))
以上便是处理响应性应用程序中阻塞问题的推荐方法。对有关Akka HTTP中阻塞调用的类似讨论,请参阅 Handling blocking operations in Akka HTTP。
其他一些建议:
固定的线程池大小
blocking-io-dispatcher {
type = Dispatcher
executor = "thread-pool-executor"
thread-pool-executor {
fixed-pool-size = 32
}
throughput = 1
}
根据CPU核心数设置线程池大小
my-thread-pool-dispatcher {
# Dispatcher is the name of the event-based dispatcher
type = Dispatcher
# What kind of ExecutionService to use
executor = "thread-pool-executor"
# Configuration for the thread pool
thread-pool-executor {
# minimum number of threads to cap factor-based core number to
core-pool-size-min = 2
# No of core threads ... ceil(available processors * factor)
core-pool-size-factor = 2.0
# maximum number of threads to cap factor-based number to
core-pool-size-max = 10
}
# Throughput defines the maximum number of messages to be
# processed per actor before the thread jumps to the next actor.
# Set to 1 for as fair as possible.
throughput = 100
}
PinnedDispatcher
my-pinned-dispatcher {
executor = "thread-pool-executor"
type = PinnedDispatcher
}
由于Actor每次获得的不一定都是同一个线程,所以当确有必要时,可以设置thread-pool-executor.allow-core-timeout=off
,以确保始终使用同一线程。
设置线程关闭超时
无论是fork-join-executor还是thread-pool-executor,线程都将在无人使用时被关闭。如果想设置一个稍长点的时间,可进行如下调整。特别是当该Executor只是作为执行上下文使用(比如只进行Future调用),而没有关联Actor时更应如此,否则默认的1秒将会导致整个线程池过度频繁地被关闭。
my-dispatcher-with-timeouts {
type = Dispatcher
executor = "thread-pool-executor"
thread-pool-executor {
fixed-pool-size = 16
# Keep alive time for threads
keep-alive-time = 60s
# Allow core threads to time out
allow-core-timeout = off
}
# How long time the dispatcher will wait for new actors until it shuts down
shutdown-timeout = 60s
}
邮箱是Actor接收待处理消息的队列,默认是没有容量上限的。但当Actor的处理消息的速度低于消息送达的速度时,就有必要设置邮箱的容量上限了,这样当有更多消息到达时,将被转投至系统的DeadLetter。
如果没有特别指定,将使用默认的邮箱SingleConsumerOnlyUnboundedMailbox
。否则在context.spawn时指定,且配置可从配置文件中动态加载。
context.spawn(childBehavior, "bounded-mailbox-child", MailboxSelector.bounded(100))
val props = MailboxSelector.fromConfig("my-app.my-special-mailbox")
context.spawn(childBehavior, "from-config-mailbox-child", props)
my-app {
my-special-mailbox {
mailbox-type = "akka.dispatch.SingleConsumerOnlyUnboundedMailbox"
}
}
非阻塞类型的邮箱
邮箱
内部实现
有否上限
配置名称
SingleConsumerOnlyUnboundedMailbox(默认)
一个多生产者-单消费者队列,不能与BalancingDispatcher搭配
否
akka.dispatch.SingleConsumerOnlyUnboundedMailbox
UnboundedMailbox
一个java.util.concurrent.ConcurrentLinkedQueue
否
unbounded 或 akka.dispatch.UnboundedMailbox
NonBlockingBoundedMailbox
一个高效的多生产者-单消费者队列
是
akka.dispatch.NonBlockingBoundedMailbox
UnboundedControlAwareMailbox
akka.dispatch.ControlMessage派生的控制消息将被优先投递
两个java.util.concurrent.ConcurrentLinkedQueue
否
akka.dispatch.UnboundedControlAwareMailbox
UnboundedPriorityMailbox
不保证同优先级消息的投递顺序
一个java.util.concurrent.PriorityBlockingQueue
否
akka.dispatch.UnboundedPriorityMailbox
UnboundedStablePriorityMailbox
严格按FIFO顺序投递同优先级消息
一个使用akka.util.PriorityQueueStabilizer包装的java.util.concurrent.PriorityBlockingQueue
否
akka.dispatch.UnboundedStablePriorityMailbox
阻塞类型的邮箱:若mailbox-push-timeout-time设置为非零时将阻塞,否则不阻塞
邮箱
内部实现
有否上限
配置名称
BoundedMailbox
一个java.util.concurrent.LinkedBlockingQueue
是
bounded 或 akka.dispatch.BoundedMailbox
BoundedPriorityMailbox
不保证同优先级消息的投递顺序
一个使用akka.util.BoundedBlockingQueue包装的java.util.PriorityQueue
是
akka.dispatch.BoundedPriorityMailbox
BoundedStablePriorityMailbox
严格按FIFO顺序投递同优先级消息
一个使用akka.util.PriorityQueueStabilizer和akka.util.BoundedBlockingQueue包装的java.util.PriorityQueue
是
akka.dispatch.BoundedStablePriorityMailbox
BoundedControlAwareMailbox
akka.dispatch.ControlMessage派生的控制消息将被优先投递
两个java.util.concurrent.ConcurrentLinkedQueue,且当塞满时将阻塞
是
akka.dispatch.BoundedControlAwareMailbox
如果要自己实现邮箱,则需要从MailboxType派生。该类的构造函数有2个重要参数:一个是ActorSystem.Settings对象,一个是Config的节。后者需要在Dispatcher或者Mailbox的配置中,修改mailbox-type
为自定义MailboxType的完全限定名。
标记用trait的需求映射指的是什么?是必须的吗?
// Marker trait used for mailbox requirements mapping
trait MyUnboundedMessageQueueSemantics
object MyUnboundedMailbox {
// This is the MessageQueue implementation
class MyMessageQueue extends MessageQueue with MyUnboundedMessageQueueSemantics {
private final val queue = new ConcurrentLinkedQueue[Envelope]()
// these should be implemented; queue used as example
def enqueue(receiver: ActorRef, handle: Envelope): Unit =
queue.offer(handle)
def dequeue(): Envelope = queue.poll()
def numberOfMessages: Int = queue.size
def hasMessages: Boolean = !queue.isEmpty
def cleanUp(owner: ActorRef, deadLetters: MessageQueue): Unit = {
while (hasMessages) {
deadLetters.enqueue(owner, dequeue())
}
}
}
}
// This is the Mailbox implementation
class MyUnboundedMailbox extends MailboxType with ProducesMessageQueue[MyUnboundedMailbox.MyMessageQueue] {
import MyUnboundedMailbox._
// This constructor signature must exist, it will be called by Akka
def this(settings: ActorSystem.Settings, config: Config) = {
// put your initialization code here
this()
}
// The create method is called to create the MessageQueue
final override def create(owner: Option[ActorRef], system: Option[ActorSystem]): MessageQueue =
new MyMessageQueue()
}
com.typesafe.akka:akka-actor-testkit-typed_2.13:2.6.5
org.scalatest:scalatest_2.13:3.1.1
测试可以是在真实的ActorSystem上进行的异步测试,也可以是在BehaviorTestKit工具提供的测试专用线程上进行的同步测试。
ScalaTest提供了ActorTestKit作为真实ActorSystem的替代品,通过混入BeforeAndAfterAll
,覆写其afterAll() = testKit.shutdownTestKit()
,可实现测试后关闭ActorSystem。
通过使用一个固定的testKit实例,可以直接spawn/stop某个Actor(可以是匿名的Actor),并以这种方式创建临时的Mock Actor,用以测试某个Actor的行为是否符合预期。
同时,ScalaTest提供TestProbe用于接受Actor的回复,并附上一组probe.expectXXX对Actor的活动进行断言。
当然,更简便的方式便是继承ScalaTestWithActorTestKit并混入AnyFeatureSpecLike之类的trait,从而将注意力完全集中在测试用例本身,而不用关心ActorSystem如何关闭之类的细枝末节。
ScalaTest的配置从application-test.conf中加载,否则将会自动加载Akka库自带的reference.conf配置,而不是应用程序自定义的application.conf。同时,ScalaTest支持用ConfigFactory.load()加载自定义配置文件,或用parseString()直接解决配置字符串,若再附以withFallback()将实现一次性完成配置及其后备的加载。
ConfigFactory.parseString("""
akka.loglevel = DEBUG
akka.log-config-on-start = on
""").withFallback(ConfigFactory.load())
为测试与时间线关系密切的Actor活动,ScalaTest提供了手动的定时器ManualTime,可以象下面这样测试指定时间点的活动:
class ManualTimerExampleSpec
extends ScalaTestWithActorTestKit(ManualTime.config)
with AnyWordSpecLike
with LogCapturing {
val manualTime: ManualTime = ManualTime()
"A timer" must {
"schedule non-repeated ticks" in {
case object Tick
case object Tock val probe = TestProbe[Tock.type]()
val behavior = Behaviors.withTimers[Tick.type] { timer =>
// 10ms后才会调度消息
timer.startSingleTimer(Tick, 10.millis)
Behaviors.receiveMessage { _ =>
probe.ref ! Tock
Behaviors.same
}
}
spawn(behavior)
// 在9ms时还没有任何消息
manualTime.expectNoMessageFor(9.millis, probe)
// 再经过2ms后,收到Tock消息
manualTime.timePasses(2.millis)
probe.expectMessage(Tock)
// 在10ms之后再没有消息传来
manualTime.expectNoMessageFor(10.seconds, probe)
}
}
}
为了验证Actor是否发出了某些日志事件,ScalaTest提供了LoggingTestKit。
LoggingTestKit
.error[IllegalArgumentException]
.withMessageRegex(".*was rejected.*expecting ascii input.*")
.withCustom { event =>
event.marker match {
case Some(m) => m.getName == "validation"
case None => false
}
}
.withOccurrences(2)
.expect {
ref ! Message("hellö")
ref ! Message("hejdå")
}
为了集中有序输出日志信息,ScalaTest提供了LogCapturing,把日志和控制台输出信息整理在一起,在测试失败的时候才一次性输出,方便分析错误原因。具体示例参见交互模式一章。
ScalaTest提供BehaviorTestKit用于Actor的同步测试。
val testKit = BehaviorTestKit(Hello())
// 创建子Actor
testKit.run(Hello.CreateChild("child"))
testKit.expectEffect(Spawned(childActor, "child"))
// 创建匿名的子Actor
testKit.run(Hello.CreateAnonymousChild)
testKit.expectEffect(SpawnedAnonymous(childActor))
// 用一个InBox模拟Mailbox,方便测试收到的消息
val inbox = TestInbox[String]()
testKit.run(Hello.SayHello(inbox.ref))
inbox.expectMessage("hello")
// 测试子Actor的InBox
testKit.run(Hello.SayHelloToChild("child"))
val childInbox = testKit.childInbox[String]("child")
childInbox.expectMessage("hello")
// 测试匿名子Actor的InBox
testKit.run(Hello.SayHelloToAnonymousChild)
val child = testKit.expectEffectType[SpawnedAnonymous[String]]
val childInbox = testKit.childInbox(child.ref)
childInbox.expectMessage("hello stranger")
在以下一些情况下,不推荐使用BehaviorTestKit(未来可能会逐步改善):
除了Spawned和SpawnedAnonymous,BehaviorTestKit还支持以下一些Effect:
BehaviorTestKit也支持日志验证
val testKit = BehaviorTestKit(Hello())
val inbox = TestInbox[String]("Inboxer")
testKit.run(Hello.LogAndSayHello(inbox.ref))
testKit.logEntries() shouldBe Seq(CapturedLogEvent(Level.INFO, "Saying hello to Inboxer"))
现阶段的Akka Typed的内部,实质还是由传统Akka实现的,但未来将会有所改变。目前两类Akka有以下一些共存的方式:
在导入命名空间时使用别名,以示区别:
import akka.{ actor => classic }
️ 在监管策略方面,由于Classic默认为重启,而Typed为停止,所以Akka根据Child来决定实际策略。即如果被创建的Child是Classic,则默认采取重启策略,否则采取停止策略。
// 导入Typed的Adapter几乎必不可少
import akka.actor.typed.scaladsl.adapter._
val system = akka.actor.ActorSystem("ClassicToTypedSystem")
val typedSystem: ActorSystem[Nothing] = system.toTyped
val classicActor = system.actorOf(Classic.props())
class Classic extends classic.Actor with ActorLogging {
// context.spawn is an implicit extension method
val second: ActorRef[Typed.Command] = context.spawn(Typed(), "second")
// context.watch is an implicit extension method
context.watch(second)
// self can be used as the `replyTo` parameter here because
// there is an implicit conversion from akka.actor.ActorRef to
// akka.actor.typed.ActorRef
// An equal alternative would be `self.toTyped`
second ! Typed.Ping(self)
override def receive = {
case Typed.Pong =>
log.info(s"$self got Pong from ${sender()}")
// context.stop is an implicit extension method
context.stop(second)
case classic.Terminated(ref) =>
log.info(s"$self observed termination of $ref")
context.stop(self)
}
}
val system = classic.ActorSystem("TypedWatchingClassic")
val typed = system.spawn(Typed.behavior, "Typed")
object Typed {
final case class Ping(replyTo: akka.actor.typed.ActorRef[Pong.type])
sealed trait Command
case object Pong extends Command
val behavior: Behavior[Command] =
Behaviors.setup { context =>
// context.actorOf is an implicit extension method
val classic = context.actorOf(Classic.props(), "second")
// context.watch is an implicit extension method
context.watch(classic)
// illustrating how to pass sender, toClassic is an implicit extension method
classic.tell(Typed.Ping(context.self), context.self.toClassic)
Behaviors
.receivePartial[Command] {
case (context, Pong) =>
// it's not possible to get the sender, that must be sent in message
// context.stop is an implicit extension method
context.stop(classic)
Behaviors.same
}
.receiveSignal {
case (_, akka.actor.typed.Terminated(_)) =>
Behaviors.stopped
}
}
}
区别
函数式编程风格
面向对象风格
组成结构
Singleton Object
Companion Object + AbstractBehavior[Message]派生类
工厂apply()
在工厂方法里完成Behavior定义及其他所有工作
在Companion Object工厂方法里采取Behaviors.setup {context => new MyActor(context)}
这样的方式构造初始化的Behavior,然后把context和其他必要参数注入给类的构造函数,完成Behavior的链接
Actor扩展类
没有派生,所以只能用Behaviors.same
从AbstractBehavior[Message]派生实例,所以可以使用this等同于Behaviors.same
Behavior
在Singleton Object里给Behaviors.receive这样的工厂方法传入一个函数(闭包)进行定义
覆写派生类的onMessage函数
Context
Context与Message一起传入给receive
依赖Behaviors.setup等工厂方法传递给派生类,因此每实例对应一个context
状态
给工厂方法传入参数(通常会把包括context在内的所有参数封装成一个类似DTO的Class以适当解耦),返回带新状态的Behavior
在AbstractBehavior实例对象的内部维护所有的可变状态
推荐理由
熟悉FP的方式,毕竟Akka并没有引入范畴论等高深理论
习惯使用不变量保存状态,并使之可以传递给下一个Behavior
能实现Behavior与状态无关
用FP的风格,采取一个Behavior对应一种状态的方式实现有限状态机,相比OO风格更自然
能降低在诸如Future等线程之间共享状态的风险
熟悉OO编码的风格,喜欢方法method甚过函数function
习惯使用变量保存状态
更容易以OO风格升级已有的传统Akka代码
使用可变量的性能,相较使用不可变量更好
推荐做法:
Classic
Typed
akka-actor
akka-actor-typed
akka-cluster
akka-cluster-typed
akka-cluster-sharding
akka-cluster-sharding-typed
akka-cluster-tools
akka-cluster-typed
akka-distributed-data
akka-cluster-typed
akka-persistence
akka-persistence-typed
akka-stream
akka-stream-typed
akka-testkit
akka-actor-testkit-typed
Classic
Typed for Scala
akka.actor
akka.actor.typed.scaladsl
akka.cluster
akka.cluster.typed
akka.cluster.sharding
akka.cluster.sharding.typed.scaladsl
akka.persistence
akka.persistence.typed.scaladsl
com.typesafe.akka:akka-cluster-typed_2.13:2.6.5
import akka.actor.typed.delivery._
️ 此模块目前仍不成熟,不建议在生产环境使用。
确保消息至少投递一次或恰好投递一次,是此模块的核心任务,但Akka框架没法自主实现,因为确认收到消息并且处理之,是属于业务逻辑的职责,所以必须在应用程序的配合下才能完全实现。而且,将消息妥投到目标邮箱还只是其中一个步骤(不丢失消息),确保目标Actor在消息到达前尚未崩溃(消息能被处理)也是其中重要的一环。
一个完整的消息妥投方案,包括发送消息、检测丢包、重发消息、防止过载、幂等处理等细节,这些工作绝大部分要由消费消息的一方来承担。比如消息重发,就要由消费者发现有丢包,然后向生产者提出,限流等其他一些工作亦是如此。Akka提供了以下三种模式(留意关于消息重发的细节):
点对点模式适用于2个单一Actor之间的消息妥投。
Worker Pulling,是若干个Worker根据自己的消费进度,主动从一个WorkManager处拉取任务的模式。
有新Worker加入时
com.typesafe.akka:akka-cluster-sharding-typed_2.13:2.6.5
Sharding,是在集群进行了分片后的消息妥投模式,将由Producer与Consumer两端的ShardingController负责总协调,由ShardingController各自的小弟Controller负责点个端点的通信。
发送消息到另一个Entity
从另一个节点上的Producer发送消息(图中WorkPullingProducerController有误,应为ShardingProducerController)
com.typesafe.akka:akka-persistence-typed_2.13:2.6.5
需要Producer支持消息重发,就意味着Producer得把发出去的消息保存一段时间,直到确信该消息已被处理后才删除之,所以能暂存消息的即为耐用的Producer。Akka为此提供了一个DurableProducerQueue的具体实现EventSourcedProducerQueue。其中,每个Producer必须对应一个唯一的PersistenceId。
import akka.persistence.typed.delivery.EventSourcedProducerQueue
import akka.persistence.typed.PersistenceId
val durableQueue =
EventSourcedProducerQueue[ImageConverter.ConversionJob](PersistenceId.ofUniqueId("ImageWorkManager"))
val durableProducerController = context.spawn(
WorkPullingProducerController(
producerId = "workManager",
workerServiceKey = ImageConverter.serviceKey,
durableQueueBehavior = Some(durableQueue)),
"producerController")
除了tell模式,Producer还可以改用ask模式发出消息,此时用askNext代替requestNext,回复将被包装在MessageWithConfirmation里。
context.ask[MessageWithConfirmation[ImageConverter.ConversionJob], Done](
next.askNextTo,
askReplyTo => MessageWithConfirmation(ImageConverter.ConversionJob(resultId, from, to, image), askReplyTo)) {
case Success(done) => AskReply(resultId, originalReplyTo, timeout = false)
case Failure(_) => AskReply(resultId, originalReplyTo, timeout = true)
}
对同处一个JVM上的不同Actor,消息将直接发送给对方,而对于跨JVM的消息,则需要序列化成一串二进制字节后传出,再反序列化恢复成消息对象后接收。Akka推荐使用Jackson和Google Protocol Buffers,且使用后者用于其内部消息的序列化,但也允许使用自定义的序列化器。
序列化的相关配置都保存在akka.actor.serializers
一节,其中指向各种akka.serialization.Serializer
的实现,并使用serialization-bindings
为特定对象实例时绑定序列化器。由于对象可能同时继承了某个trait或者class,所以在判断应使用哪一个序列化器时,通常是找其最特化的那一个。若二者之间没有继承关系,则会触发警告。
akka {
actor {
serializers {
jackson-json = "akka.serialization.jackson.JacksonJsonSerializer"
jackson-cbor = "akka.serialization.jackson.JacksonCborSerializer"
proto = "akka.remote.serialization.ProtobufSerializer"
myown = "docs.serialization.MyOwnSerializer"
}
serialization-bindings {
"docs.serialization.JsonSerializable" = jackson-json
"docs.serialization.CborSerializable" = jackson-cbor
"com.google.protobuf.Message" = proto
"docs.serialization.MyOwnSerializable" = myown
}
}
}
️ 如果待序列化的消息包含在Scala对象中,则为了引用这些消息,需要使用标准Java类名称。对于包含在名为Wrapper对象中名为Message的消息,正确的引用是Wrapper $ Message
,而不是Wrapper.Message
。
完整的序列化信息包括三个部分:二进制字节串形式的有效载荷payload,序列化器的SerializerId及其适用类的清单manifest,所以它是自描述的,得以跨JVM使用。
而在启动ActorSystem时,序列化器由SerializationExtension负责初始化,因此序列化器本身不能从其构造函数访问SerializationExtension,而只能在完成初始化之后迟一点才能访问它。
import akka.actor._
import akka.actor.typed.scaladsl.Behaviors
import akka.cluster.Cluster
import akka.serialization._
val system = ActorSystem("example")
// Get the Serialization Extension
val serialization = SerializationExtension(system)
// Have something to serialize
val original = "woohoo"
// Turn it into bytes, and retrieve the serializerId and manifest, which are needed for deserialization
val bytes = serialization.serialize(original).get
val serializerId = serialization.findSerializerFor(original).identifier
val manifest = Serializers.manifestFor(serialization.findSerializerFor(original), original)
// Turn it back into an object
val back = serialization.deserialize(bytes, serializerId, manifest).get
所有的序列化器均派生自akka.serialization.Serializer。
class MyOwnSerializer extends Serializer {
// If you need logging here, introduce a constructor that takes an ExtendedActorSystem.
// class MyOwnSerializer(actorSystem: ExtendedActorSystem) extends Serializer
// Get a logger using:
// private val logger = Logging(actorSystem, this)
// This is whether "fromBinary" requires a "clazz" or not
def includeManifest: Boolean = true
// Pick a unique identifier for your Serializer,
// you've got a couple of billions to choose from,
// 0 - 40 is reserved by Akka itself
def identifier = 1234567
// "toBinary" serializes the given object to an Array of Bytes
def toBinary(obj: AnyRef): Array[Byte] = {
// Put the code that serializes the object here
//#...
Array[Byte]()
//#...
}
// "fromBinary" deserializes the given array,
// using the type hint (if any, see "includeManifest" above)
def fromBinary(bytes: Array[Byte], clazz: Option[Class[_]]): AnyRef = {
// Put your code that deserializes here
//#...
null
//#...
}
}
SerializerId必须是全局唯一的,该Id可以编码指定,也可以在配置中指定:
akka {
actor {
serialization-identifiers {
"docs.serialization.MyOwnSerializer" = 1234567
}
}
}
默认情况下,序列化器使用Class指定其适用目标,但也可以使用字符串名称指定,具体参见fromBinary的第2个参数:
class MyOwnSerializer2 extends SerializerWithStringManifest {
val CustomerManifest = "customer"
val UserManifest = "user"
val UTF_8 = StandardCharsets.UTF_8.name()
// Pick a unique identifier for your Serializer,
// you've got a couple of billions to choose from,
// 0 - 40 is reserved by Akka itself
def identifier = 1234567
// The manifest (type hint) that will be provided in the fromBinary method
// Use `""` if manifest is not needed.
def manifest(obj: AnyRef): String =
obj match {
case _: Customer => CustomerManifest
case _: User => UserManifest
}
// "toBinary" serializes the given object to an Array of Bytes
def toBinary(obj: AnyRef): Array[Byte] = {
// Put the real code that serializes the object here
obj match {
case Customer(name) => name.getBytes(UTF_8)
case User(name) => name.getBytes(UTF_8)
}
}
// "fromBinary" deserializes the given array,
// using the type hint
def fromBinary(bytes: Array[Byte], manifest: String): AnyRef = {
// Put the real code that deserializes here
manifest match {
case CustomerManifest =>
Customer(new String(bytes, UTF_8))
case UserManifest =>
User(new String(bytes, UTF_8))
}
}
}
ActorRef均可以使用Jackson进行序列化,但也可以自定义实现。
其中,要以字符串形式表示ActorRef,应借助ActorRefResolver实现。它主要有2个方法,分别对应序列化和反序列化:
def toSerializationFormat[T](ref: ActorRef[T]): String
def resolveActorRef[T](serializedActorRef: String): ActorRef[T]
class PingSerializer(system: ExtendedActorSystem) extends SerializerWithStringManifest {
private val actorRefResolver = ActorRefResolver(system.toTyped)
private val PingManifest = "a"
private val PongManifest = "b"
override def identifier = 41
override def manifest(msg: AnyRef) = msg match {
case : PingService.Ping => PingManifest
case PingService.Pong => PongManifest
case =>
throw new IllegalArgumentException(s"Can't serialize object of type ${msg.getClass} in [${getClass.getName}]")
}
override def toBinary(msg: AnyRef) = msg match {
case PingService.Ping(who) =>
actorRefResolver.toSerializationFormat(who).getBytes(StandardCharsets.UTF_8)
case PingService.Pong =>
Array.emptyByteArray
case _ =>
throw new IllegalArgumentException(s"Can't serialize object of type ${msg.getClass} in [${getClass.getName}]")
}
override def fromBinary(bytes: Array[Byte], manifest: String) = {
manifest match {
case PingManifest =>
val str = new String(bytes, StandardCharsets.UTF_8)
val ref = actorRefResolver.resolveActorRefPingService.Pong.type
PingService.Ping(ref)
case PongManifest =>
PingService.Pong
case _ =>
throw new IllegalArgumentException(s"Unknown manifest [$manifest]")
}
}
}
一个消息被反序列为消息对象,其决定因素只有3个:payload、serializerId和manifest。Akka根据Id选择Serializer,然后Serializer根据manifest匹配fromBinary,最后fromBinary使用payload解析出消息对象。在这个过程中,起关键作用的manifest并不等价于Serializer绑定的消息类型,所以一个Serializer可以应用于多个消息类型,这就给换用新的序列化器提供了机会。主要步骤包括两步:
为了在本地测试时确认消息被正常地序列化与反序列化,可以采取如下配置启用本地消息的序列化。如果要将某个消息排除出此列,则需要继承trait akka.actor.NoSerializationVerificationNeeded
,或者在配置akka.actor.no-serialization-verification-needed-class-prefix
指定类名的前缀。
akka {
actor {
# 启用本地消息序列化
serialize-messages = on
# 启用Prop序列化
serialize-creators = on
}
}
com.typesafe.akka:akka-serialization-jackson_2.12:2.6.6
Jackson支持文本形式的JSON(jackson-json)和二进制形式的CBOR字节串(jackson-cbor)。
在使用Jackson进行序列化前,需要在Akka配置里加入序列化器声明和绑定声明,此处用的JSON格式。
akka.actor {
serialization-bindings {
"com.myservice.MySerializable" = jackson-json
}
}
而所有要用Jackson序列化的消息也得扩展其trait以作标识。
// 约定的名称是CborSerializable或者JsonSerializable,此处用MySerializable是为了演示
trait MySerializable
final case class Message(name: String, nr: Int) extends MySerializable
出于安全考虑,不能将Jackson序列化器应用到诸如java.lang.Object、java.io.Serializable、java.util.Comparable等开放类型。
多态类型是指可能有多种不同实现的类型,这就导致在反序列化时将面对多种可能的子类型。所以在使用Jackson序列化前,需要用JsonTypeInfo和JsonSubTypes进行注解说明。
@JsonTypeInfo用来开启多态类型处理,它有以下几个属性:
use:定义使用哪一种类型识别码,其可选值包括:
include(可选):指定识别码是如何被包含进去的,其可选值包括:
property(可选):制定识别码的属性名称。此属性只有当use为JsonTypeInfo.Id.CLASS(若不指定property则默认为@class)、JsonTypeInfo.Id.MINIMAL_CLASS(若不指定property则默认为@c)、JsonTypeInfo.Id.NAME(若不指定property默认为@type),include为JsonTypeInfo.As.PROPERTY、JsonTypeInfo.As.EXISTING_PROPERTY、JsonTypeInfo.As.EXTERNAL_PROPERTY时才有效。
defaultImpl(可选):如果类型识别码不存在或者无效,可以使用该属性来制定反序列化时使用的默认类型。
visible(可选):是否可见。该属性定义了类型标识符的值是否会通过JSON流成为反序列化器的一部分,默认为false,即jackson会从JSON内容中处理和删除类型标识符,再传递给JsonDeserializer。
@JsonSubTypes用来列出给定类的子类,只有当子类类型无法被检测到时才会使用它,一般是配合@JsonTypeInfo在基类上使用。它的的值是一个@JsonSubTypes.Type[]数组,里面枚举了多态类型(value对应子类)和类型的标识符值(name对应@JsonTypeInfo中的property标识名称的值。此为可选值,若未指定则需由@JsonTypeName在子类上指定)。
@JsonTypeName作用于子类,用来为多态子类指定类型标识符的值。
️ 切记不能使用@JsonTypeInfo(use = Id.CLASS)
或ObjectMapper.enableDefaultTyping
,这会给多态类型带来安全隐患。
final case class Zoo(primaryAttraction: Animal) extends MySerializable
@JsonTypeInfo(use = JsonTypeInfo.Id.NAME, property = "type")
@JsonSubTypes(
Array(
new JsonSubTypes.Type(value = classOf[Lion], name = "lion"),
new JsonSubTypes.Type(value = classOf[Elephant], name = "elephant")))
sealed trait Animal
final case class Lion(name: String) extends Animal
final case class Elephant(name: String, age: Int) extends Animal
由于上述注解只能用于class,所以case class可以直接使用,但case object就需要采取变通的方法,通过在case object继承的trait上使用注解@JsonSerialize和@JsonDeserialize,再使用StdSerializer和StdDeserializer实现序列化操作即可。
import com.fasterxml.jackson.core.JsonGenerator
import com.fasterxml.jackson.core.JsonParser
import com.fasterxml.jackson.databind.DeserializationContext
import com.fasterxml.jackson.databind.SerializerProvider
import com.fasterxml.jackson.databind.annotation.JsonDeserialize
import com.fasterxml.jackson.databind.annotation.JsonSerialize
import com.fasterxml.jackson.databind.deser.std.StdDeserializer
import com.fasterxml.jackson.databind.ser.std.StdSerializer
@JsonSerialize(using = classOf[DirectionJsonSerializer])
@JsonDeserialize(using = classOf[DirectionJsonDeserializer])
sealed trait Direction
object Direction {
case object North extends Direction
case object East extends Direction
case object South extends Direction
case object West extends Direction
}
class DirectionJsonSerializer extends StdSerializer[Direction](classOf[Direction]) {
import Direction._
override def serialize(value: Direction, gen: JsonGenerator, provider: SerializerProvider): Unit = {
val strValue = value match {
case North => "N"
case East => "E"
case South => "S"
case West => "W"
}
gen.writeString(strValue)
}
}
class DirectionJsonDeserializer extends StdDeserializer[Direction](classOf[Direction]) {
import Direction._
override def deserialize(p: JsonParser, ctxt: DeserializationContext): Direction = {
p.getText match {
case "N" => North
case "E" => East
case "S" => South
case "W" => West
}
}
}
final case class Compass(currentDirection: Direction) extends MySerializable
Jackson默认会将Scala的枚举类型中的Value序列化为一个JsonObject,该JsonObject包含一个“value”字段和一个“type”字段(其值是枚举的完全限定类名FQCN)。为此,Jackson为每个字段提供了一个注解JsonScalaEnumeration,用于设定字段的类型,它将会把枚举值序列化为JsonString。
trait TestMessage
object Planet extends Enumeration {
type Planet = Value
val Mercury, Venus, Earth, Mars, Krypton = Value
}
// Uses default Jackson serialization format for Scala Enumerations
final case class Alien(name: String, planet: Planet.Planet) extends TestMessage
// Serializes planet values as a JsonString
class PlanetType extends TypeReference[Planet.type] {}
// Specifies the type of planet with @JsonScalaEnumeration
final case class Superhero(name: String, @JsonScalaEnumeration(classOf[PlanetType]) planet: Planet.Planet) extends TestMessage
参见Event Sourced一节中的Schema Evolution。
Jackson会自动忽略class中不存在的属性,所以不需要做额外工作。
如果新增的字段是可选字段,那么该字段默认值是Option.None,不需要做额外工作。如果是必备字段,那么需要继承JacksonMigration并设定其默认值。示例如下:
// Old Event
case class ItemAdded(shoppingCartId: String, productId: String, quantity: Int) extends MySerializable
// New Event: optional property discount and field note added.
// 为什么要区分property与field?
case class ItemAdded(shoppingCartId: String, productId: String, quantity: Int, discount: Option[Double], note: String)
extends MySerializable {
// alternative constructor because `note` should have default value "" when not defined in json
@JsonCreator
def this(shoppingCartId: String, productId: String, quantity: Int, discount: Option[Double], note: Option[String]) =
this(shoppingCartId, productId, quantity, discount, note.getOrElse(""))
}
// New Event: mandatory field discount added.
case class ItemAdded(shoppingCartId: String, productId: String, quantity: Int, discount: Double) extends MySerializable
import com.fasterxml.jackson.databind.JsonNode
import com.fasterxml.jackson.databind.node.DoubleNode
import com.fasterxml.jackson.databind.node.ObjectNode
import akka.serialization.jackson.JacksonMigration
class ItemAddedMigration extends JacksonMigration {
// 注明这是第几个版本,之后还可以有更新的版本
override def currentVersion: Int = 2
override def transform(fromVersion: Int, json: JsonNode): JsonNode = {
val root = json.asInstanceOf[ObjectNode]
if (fromVersion <= 1) {
root.set("discount", DoubleNode.valueOf(0.0))
}
root
}
}
ItemAddedMigration与ItemAdded的联系,需要在配置里设定,下同:
akka.serialization.jackson.migrations {
"com.myservice.event.ItemAdded" = "com.myservice.event.ItemAddedMigration"
}
// 将productId重命名为itemId
case class ItemAdded(shoppingCartId: String, itemId: String, quantity: Int) extends MySerializable
import akka.serialization.jackson.JacksonMigration
import com.fasterxml.jackson.databind.JsonNode
import com.fasterxml.jackson.databind.node.ObjectNode
class ItemAddedMigration extends JacksonMigration {
override def currentVersion: Int = 2
override def transform(fromVersion: Int, json: JsonNode): JsonNode = {
val root = json.asInstanceOf[ObjectNode]
if (fromVersion <= 1) {
root.set("itemId", root.get("productId"))
root.remove("productId")
}
root
}
}
// Old class
case class Customer(name: String, street: String, city: String, zipCode: String, country: String) extends MySerializable
// New class
case class Customer(name: String, shippingAddress: Address, billingAddress: Option[Address]) extends MySerializable
//Address class
case class Address(street: String, city: String, zipCode: String, country: String) extends MySerializable
import akka.serialization.jackson.JacksonMigration
import com.fasterxml.jackson.databind.JsonNode
import com.fasterxml.jackson.databind.node.ObjectNode
class CustomerMigration extends JacksonMigration {
override def currentVersion: Int = 2
override def transform(fromVersion: Int, json: JsonNode): JsonNode = {
val root = json.asInstanceOf[ObjectNode]
if (fromVersion <= 1) {
val shippingAddress = root.`with`("shippingAddress")
shippingAddress.set("street", root.get("street"))
shippingAddress.set("city", root.get("city"))
shippingAddress.set("zipCode", root.get("zipCode"))
shippingAddress.set("country", root.get("country"))
root.remove("street")
root.remove("city")
root.remove("zipCode")
root.remove("country")
}
root
}
}
// Old class
case class OrderAdded(shoppingCartId: String) extends MySerializable
// New class
case class OrderPlaced(shoppingCartId: String) extends MySerializable
class OrderPlacedMigration extends JacksonMigration {
override def currentVersion: Int = 2
override def transformClassName(fromVersion: Int, className: String): String = classOf[OrderPlaced].getName
override def transform(fromVersion: Int, json: JsonNode): JsonNode = json
}
当某个类不再需要序列化,而只需要反序列化时,应将其加入序列化的白名单,名单是一组类名或其前缀:
akka.serialization.jackson.whitelist-class-prefix =
["com.myservice.event.OrderAdded", "com.myservice.command"]
Akka默认启用了以下Jackson模块:
akka.serialization.jackson {
# The Jackson JSON serializer will register these modules.
jackson-modules += "akka.serialization.jackson.AkkaJacksonModule"
# AkkaTypedJacksonModule optionally included if akka-actor-typed is in classpath
jackson-modules += "akka.serialization.jackson.AkkaTypedJacksonModule"
// FIXME how does that optional loading work??
# AkkaStreamsModule optionally included if akka-streams is in classpath
jackson-modules += "akka.serialization.jackson.AkkaStreamJacksonModule"
jackson-modules += "com.fasterxml.jackson.module.paramnames.ParameterNamesModule"
jackson-modules += "com.fasterxml.jackson.datatype.jdk8.Jdk8Module"
jackson-modules += "com.fasterxml.jackson.datatype.jsr310.JavaTimeModule"
jackson-modules += "com.fasterxml.jackson.module.scala.DefaultScalaModule"
}
默认的JSON压缩策略如下:
# Compression settings for the jackson-json binding
akka.serialization.jackson.jackson-json.compression {
# Compression algorithm.
# - off : no compression (it will decompress payloads even it's off)
# - gzip : using common java gzip (it's slower than lz4 generally)
# - lz4 : using lz4-java
algorithm = gzip
# If compression is enabled with the `algorithm` setting the payload is compressed
# when it's larger than this value.
compress-larger-than = 32 KiB
}
# 共有配置
akka.serialization.jackson.jackson-json {
serialization-features {
WRITE_DATES_AS_TIMESTAMPS = off
}
}
akka.serialization.jackson.jackson-cbor {
serialization-features {
WRITE_DATES_AS_TIMESTAMPS = on
}
}
akka.actor {
serializers {
jackson-json-message = "akka.serialization.jackson.JacksonJsonSerializer"
jackson-json-event = "akka.serialization.jackson.JacksonJsonSerializer"
}
serialization-identifiers {
jackson-json-message = 9001
jackson-json-event = 9002
}
serialization-bindings {
"com.myservice.MyMessage" = jackson-json-message
"com.myservice.MyEvent" = jackson-json-event
}
}
# 为每个绑定关系单独配置
akka.serialization.jackson {
jackson-json-message {
serialization-features {
WRITE_DATES_AS_TIMESTAMPS = on
}
}
jackson-json-event {
serialization-features {
WRITE_DATES_AS_TIMESTAMPS = off
}
}
}
默认情况下,Jackson使用manifest里的完全限定类名进行序列化,但这比较耗费磁盘空间和IO资源,为此可以用type-in-manifest关闭之,使类名不再出现在manifest里,然后再使用deserialization-type指定即可,否则Jackson会在绑定关系里去查找匹配的类型。
Akka Remoting已经实现了manifest的压缩,所以这部分内容对它没有什么实际效果。
akka.actor {
serializers {
jackson-json-event = "akka.serialization.jackson.JacksonJsonSerializer"
}
serialization-identifiers {
jackson-json-event = 9001
}
serialization-bindings {
"com.myservice.MyEvent" = jackson-json-event
}
}
# 由于manifest无关的序列化通常只适用于一个类型,所以通常采取每绑定关系单独配置的方式
akka.serialization.jackson {
jackson-json-event {
type-in-manifest = off
# Since there is exactly one serialization binding declared for this
# serializer above, this is optional, but if there were none or many,
# this would be mandatory.
deserialization-type = "com.myservice.MyEvent"
}
}
WRITE_DATES_AS_TIMESTAMPS
和WRITE_DURATIONS_AS_TIMESTAMPS
默认情况下是被禁用的,这意味着日期与时间字段将按ISO-8601(rfc3339)标准的yyyy-MM-dd'T'HH:mm:ss.SSSZZ
格式,而不是数字数组进行序列化。虽然这样的互操作性更好,但速度较慢。所以如果不需要ISO格式即可与外部系统进行互操作,那么可以作如下配置,以拥有更佳的性能(反序列化不受此设置影响)。
akka.serialization.jackson.serialization-features {
WRITE_DATES_AS_TIMESTAMPS = on
WRITE_DURATIONS_AS_TIMESTAMPS = on
}
akka.serialization.jackson {
# Configuration of the ObjectMapper serialization features.
# See com.fasterxml.jackson.databind.SerializationFeature
# Enum values corresponding to the SerializationFeature and their boolean value.
serialization-features {
# Date/time in ISO-8601 (rfc3339) yyyy-MM-dd'T'HH:mm:ss.SSSZ format
# as defined by com.fasterxml.jackson.databind.util.StdDateFormat
# For interoperability it's better to use the ISO format, i.e. WRITE_DATES_AS_TIMESTAMPS=off,
# but WRITE_DATES_AS_TIMESTAMPS=on has better performance.
WRITE_DATES_AS_TIMESTAMPS = off
WRITE_DURATIONS_AS_TIMESTAMPS = off
}
# Configuration of the ObjectMapper deserialization features.
# See com.fasterxml.jackson.databind.DeserializationFeature
# Enum values corresponding to the DeserializationFeature and their boolean value.
deserialization-features {
FAIL_ON_UNKNOWN_PROPERTIES = off
}
# Configuration of the ObjectMapper mapper features.
# See com.fasterxml.jackson.databind.MapperFeature
# Enum values corresponding to the MapperFeature and their
# boolean values, for example:
#
# mapper-features {
# SORT_PROPERTIES_ALPHABETICALLY = on
# }
mapper-features {}
# Configuration of the ObjectMapper JsonParser features.
# See com.fasterxml.jackson.core.JsonParser.Feature
# Enum values corresponding to the JsonParser.Feature and their
# boolean value, for example:
#
# json-parser-features {
# ALLOW_SINGLE_QUOTES = on
# }
json-parser-features {}
# Configuration of the ObjectMapper JsonParser features.
# See com.fasterxml.jackson.core.JsonGenerator.Feature
# Enum values corresponding to the JsonGenerator.Feature and
# their boolean value, for example:
#
# json-generator-features {
# WRITE_NUMBERS_AS_STRINGS = on
# }
json-generator-features {}
# Configuration of the JsonFactory StreamReadFeature.
# See com.fasterxml.jackson.core.StreamReadFeature
# Enum values corresponding to the StreamReadFeatures and
# their boolean value, for example:
#
# stream-read-features {
# STRICT_DUPLICATE_DETECTION = on
# }
stream-read-features {}
# Configuration of the JsonFactory StreamWriteFeature.
# See com.fasterxml.jackson.core.StreamWriteFeature
# Enum values corresponding to the StreamWriteFeatures and
# their boolean value, for example:
#
# stream-write-features {
# WRITE_BIGDECIMAL_AS_PLAIN = on
# }
stream-write-features {}
# Configuration of the JsonFactory JsonReadFeature.
# See com.fasterxml.jackson.core.json.JsonReadFeature
# Enum values corresponding to the JsonReadFeatures and
# their boolean value, for example:
#
# json-read-features {
# ALLOW_SINGLE_QUOTES = on
# }
json-read-features {}
# Configuration of the JsonFactory JsonWriteFeature.
# See com.fasterxml.jackson.core.json.JsonWriteFeature
# Enum values corresponding to the JsonWriteFeatures and
# their boolean value, for example:
#
# json-write-features {
# WRITE_NUMBERS_AS_STRINGS = on
# }
json-write-features {}
# Additional classes that are allowed even if they are not defined in `serialization-bindings`.
# This is useful when a class is not used for serialization any more and therefore removed
# from `serialization-bindings`, but should still be possible to deserialize.
whitelist-class-prefix = []
# settings for compression of the payload
compression {
# Compression algorithm.
# - off : no compression
# - gzip : using common java gzip
algorithm = off
# If compression is enabled with the `algorithm` setting the payload is compressed
# when it's larger than this value.
compress-larger-than = 0 KiB
}
# Whether the type should be written to the manifest.
# If this is off, then either deserialization-type must be defined, or there must be exactly
# one serialization binding declared for this serializer, and the type in that binding will be
# used as the deserialization type. This feature will only work if that type either is a
# concrete class, or if it is a supertype that uses Jackson polymorphism (ie, the
# @JsonTypeInfo annotation) to store type information in the JSON itself. The intention behind
# disabling this is to remove extraneous type information (ie, fully qualified class names) when
# serialized objects are persisted in Akka persistence or replicated using Akka distributed
# data. Note that Akka remoting already has manifest compression optimizations that address this,
# so for types that just get sent over remoting, this offers no optimization.
type-in-manifest = on
# The type to use for deserialization.
# This is only used if type-in-manifest is disabled. If set, this type will be used to
# deserialize all messages. This is useful if the binding configuration you want to use when
# disabling type in manifest cannot be expressed as a single type. Examples of when you might
# use this include when changing serializers, so you don't want this serializer used for
# serialization and you haven't declared any bindings for it, but you still want to be able to
# deserialize messages that were serialized with this serializer, as well as situations where
# you only want some sub types of a given Jackson polymorphic type to be serialized using this
# serializer.
deserialization-type = ""
# Specific settings for jackson-json binding can be defined in this section to
# override the settings in 'akka.serialization.jackson'
jackson-json {}
# Specific settings for jackson-cbor binding can be defined in this section to
# override the settings in 'akka.serialization.jackson'
jackson-cbor {}
# Issue #28918 for compatibility with data serialized with JacksonCborSerializer in
# Akka 2.6.4 or earlier, which was plain JSON format.
jackson-cbor-264 = ${akka.serialization.jackson.jackson-cbor}
}
com.typesafe.akka:akka-persistence-typed_2.13:2.6.5
Akka Persistence为带状态的Actor提供了持久化其状态以备崩溃后恢复的支持,其本质是持久化Actor相关的事件Event,从而在恢复时利用全部事件或阶段性快照重塑(Reconstruct/Replay/Rebuild)Actor。ES在现实生活中最典型的一个例子是会计使用的复式记账法。
参考书目
MSDN上的 CQRS Journey。
该书以一个用C#编写的Conference预约售票系统为例,由浅入深地展示了实现CQRS的各个环节需要关注的重点。书中的配图和讨论非常精彩,而其中提到的Process Manager也是当下实现Saga的流行方式之一。
Randy Shoup所著 Events as First-Class Citizens。
文中的Stitch Fix是一家智能零售商,它通过整合零售、技术、仓储、数据分析等资源,使用数据分析软件和机器学习来匹配顾客的服装定制需求,为其挑选符合其个人风格、尺寸和偏好的服饰和配饰,提供了良好的消费体验。
顾客按需订购服装或申请每月、每两个月或每季度交货。每个盒子有五件货物。如果顾客喜欢配送货物,可以选择以标签价购买,全部购买享受75%的折扣;如果不喜欢,则免费退货。如果顾客没有购买任何货物,则需支付20美元的设计费。Stitch Fix的平均商品单价约65美元,公司期望在每个盒子中,用户能够保存2件商品。造型师是兼职,薪水为每小时15美元。每小时,造型师会完成4个盒子,这样能产生较高的毛利率,以覆盖巨大的开销及库存成本。
️ 通用数据保护条例(General Data Protection Regulation,GDPR)要求,必须能根据用户的要求删除其个人信息。然而,在一个以Event Sourcing为基础的应用里,要彻底删除或修改带有个人信息的所有事件是非常困难的,所以改用“数据粉碎”的技术来实现。其原理是给每个人分配一个唯一的ID,然后以该ID作为密钥,对其相关的所有个人数据进行加密。当需要彻底删除该用户的信息时,直接删除该ID,即可保证其个人数据无法被解密,从而达到保护目的。Lightbend为Akka Persistence提供了相应的工具,以帮助构建具有GDPR功能的系统。
Akka Persistence提供了event sourced actor(又称为 persistent actor)作为实现。这类Actor在收到Command时会先进行检验Validate。如果Command各项条件通过了检验,则使之作用于当前实体,并产生相应的事件Event,待这些Event被持久化后,以更新实体的状态结束;否则,实体将直接拒绝Reject该Command。( 不该是先更新状态,然后才持久化事件吗?貌似先持久化再更新会更靠谱。)
而在重塑Actor时,所有的事件将被加载,并无需再校验地直接用于更新Actor的状态,直到恢复到最新状态。
一个典型的EventSourcedBehavior包括ID、初始State,CommandHandler与EventHandler四个组成部分,如果需要传入ActorContext,则在外层用Behaviors.setup传入即可:
import akka.persistence.typed.scaladsl.EventSourcedBehavior
import akka.persistence.typed.PersistenceId
object MyPersistentBehavior {
sealed trait Command
sealed trait Event
final case class State()
def apply(): Behavior[Command] =
EventSourcedBehavior[Command, Event, State](
// 1. 该Actor的唯一Id
persistenceId = PersistenceId.ofUniqueId("abc"),
// 2. 初始状态
emptyState = State(),
// 3. Command Handler
commandHandler = (state, cmd) => throw new NotImplementedError("TODO: process the command & return an Effect"),
// 4. Event Handler
eventHandler = (state, evt) => throw new NotImplementedError("TODO: process the event return the next state"))
}
PersistenceId是Event Sourced Actor在其生命周期内唯一的身份标识(想想聚合Id)。因为Akka Cluster提供的EntityId可能为多个不同类型的Actor共享,所以一般配合EntityTypeKey一起组成唯一的PersistenceId。所以,PersistenceId.apply()用默认的分隔符|
将entityType.name与entityId两个字符串连接成所需的Id。当然,也可以使用PersistenceId.ofUniqueId生成自定义分隔符的Id。
即使在集群条件下,持同一PersistanceId的Actor在任何时候只能存在一个,否则就世界大乱了。当然,因为有Recovery,这个Actor可以被分片甚至迁移到任何一个片及其节点上。
摘选自 https://doc.akka.io/docs/akka/current/typed/cluster-sharding.html#persistence-example
sharding.init(Entity(typeKey = HelloWorld.TypeKey) { entityContext =>
HelloWorld(entityContext.entityId, PersistenceId(entityContext.entityTypeKey.name, entityContext.entityId))
})
一个CommandHandler有2个参数:当前的State、收到的Command,然后返回Effect。Effect由其工厂创建,创建动作包括:
在返回Effect的同时,还可以在该Effect后接副作用SideEffect,比如Effect.persist(…).thenRun(…)。具体包括:
任何SideEffect都最多只能执行一次。如果持久化失败,或者Actor直接重启、停止后再启动,都不会执行任何副作用。所以通常是响应RecoveryCompleted信号,在其中去执行需要被确认的副作用,这种情况下,则可能会出现同一个副作用多次执行的情况。
副作用都是按注册的顺序同步执行,但也不能避免因为发送消息等而导致操作的并发执行。副作用也可能在事件被持久化之前就被执行,这样的话,即使持久化失败导致事件未被保存,副作用也生效了。
关于翻译:Akka用“日记”——Journal指代的Event Store,并与“日志”Log相区别。虽然我更喜欢用“事件簿”这样的称谓,但一来请教了师姐说“日记”更准确,二来电影《Joker》里做心理咨询的社工在问Frank时也用的Journal这个词,于是就此作罢。
一个EventHandler有2个参数:当前State,触发的Event,然后返回新的State。
CommandHandler触发并持久化事件,EventHandler处理事件并更新状态,所以Actor的状态实际是在EventHandler里才真正被改变的!
当事件Event被持久化后,EventHandler将使用它去修改作为参数传入的当前状态State,从而产生新的State。至于State的具体实现,可以是FP风格的不可变量,也可以是OO风格的可变量,但通常都会封装在诸如Class这样的一个容器里。
不同于Command Handler的是,Event Handler不会产生副作用,所以它将直接用于Actor的重塑Recovery操作上。如果需要在Recovery之后做点什么,那么恰当的楔入点包括:CommandHandler最后创建的Effect附加的thenRun(),或者是RecoveryCompleted事件的处理函数里。
因为不同的消息将触发Actor不同的行为,所以行为也是Actor状态的一部分。所以在Recovery时除了恢复数据,还要小心恢复其相应的行为。尽管行为是函数,而函数是一等公民,所以行为理应可以象数据一样保存,但困难的地方在于怎么保存编码,因此Akka Persistence不提供Behavior的持久化。
面对这个棘手的问题,最容易想到的办法是根据State定义不同的CommandHandler,并随State变化而切换,从而使Actor成为一台有限状态机。于是,由此得到的便是由State与Command两级匹配构成的逻辑,利用继承定义State的不同实现,然后先case State、再case Command,最后根据匹配结果将消息分发至相应的处理函数(处理函数亦相对独立,以凸显不同的逻辑分支)。而在代码实现的结构上,就是在一个CommandHandler里,定义若干个协助完成消息处理的private function。这些处理函数的参数由Handler在case分支里赋与,返回类型则统一为与CommandHandler相同的Effect[Event, State]。最后,只需要将这个CommandHandler连壳带肉交给EventSourcedBehavior工厂即可。
更规范的方式是把Handler定义在State里,具体参见后续的Handler设计指南。
Request-Response是最常见的通信模式之一。为了保证Persistent Actor一定会回复,EventSourcedBehavior推出了ReplyEffect,从而保证CommandHandler一定会发回Reply。它与Effect的唯一区别是必须用工厂Effect.reply
、Effect.noReply
、Effect.thenReply
或者Effect.thenNoReply
之一创建的结果作为返回值,而不再是Effect,否则编译器会提示类型不匹配的错误。
为此,在定义Command时必须包含一个replyTo属性,同时得用EventSourcedBehavior.withEnforcedReplies(id, state, cmdHandler, evtHandler)
来创建Behavior。
常见的序列化方案和工具也适用于Akka,推荐使用 Jackson
在序列化时,必须考虑不同版本事件之间的向下兼容性,参考纲要演进 Schema Evolution( 统一个中文名真难。architecture 架构,pattern 模式,structure 结构,style 风格/样式,template 模板,boilerplate 样板,schema 纲要)
相比Recovery,我更喜欢Replay或者Reconstruct,使用“重塑实体”和“事件重播”在语义上也更生动。
Akka Persistence在Actor启动或重启时,将自动地直接使用EventHandler进行Actor的重塑。要注意的是,不要在EventHandler中执行副作用,而应该在重塑完成后,在receiveSignal里响应RecoveryCompleted信号,在响应程序里执行副作用。在RecoveryCompleted信号里带有重塑后的当前状态。而即使对于一个新的、还没有任何已记录事件的Actor,在执行Recovery之后也会触发RecoveryCompleted信号。
由于在重塑完成前,所有新消息将会被Stash,所以为防止失去响应,Akka提供了最大并发的重塑数,可以按akka.persistence.max-concurrent-recoveries = 50
的方式进行配置。
在某些情况下,事件流可能会损坏,而此时多个写入者(即多个Persistent Actor实例)准备写入具有相同序列号的不同消息,则会引发不一致的冲突。为此,Akka Persistence提供了Replay Filter,通过消息序列号和写入者的UUID来检测并解决消息之间的冲突。具体配置需要写入配置文件中的如下区段(leveldb视具体插件而不同):
理解不能:为什么会有多个Actor实例要写入有相同序列号的消息?PersistenceId不该是唯一的吗?消息序列号是什么鬼?
手机扫一扫
移动阅读更方便
你可能感兴趣的文章