A Simple RPC in Scala Built on Akka Remote Actors
Date: April 20, 2021

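Communication in Spark 1.3 is built on Akka: actors interact only by exchanging messages, and every operation is asynchronous.
This article follows the core communication principle of Spark 1.3 to implement a simple Akka-based RPC framework.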
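The code in this article needs the Akka actor and remoting modules on the classpath. The article does not say how the project was built, so the following is only a sketch of an sbt build under stated assumptions: the project name is hypothetical, and the Scala and Akka versions are guesses chosen to match the Akka 2.3 line that Spark 1.3 was built against.

```scala
// build.sbt (hypothetical; the name and all version numbers are assumptions,
// not taken from the article)
name := "akka-simple-rpc"

scalaVersion := "2.10.4"

libraryDependencies ++= Seq(
  "com.typesafe.akka" %% "akka-actor"  % "2.3.4", // Actor, ActorSystem, Props
  "com.typesafe.akka" %% "akka-remote" % "2.3.4"  // RemoteActorRefProvider, netty.tcp transport
)
```

Any Akka 2.3.x release should behave the same for this example; later major versions rename or remove some of the calls used below, such as shutdown().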
Server:
import akka.actor.{Actor, ActorSystem, Props}
import com.typesafe.config.ConfigFactory

// Message types, told apart by pattern matching
case class AkkaMessage(message: Any)
case class Response(response: Any)

class Server extends Actor {
  override def receive: Receive = {
    case msg: AkkaMessage =>
      println("Server received message: " + msg.message)
      // Reply to whoever sent the message
      sender() ! Response("response_" + msg.message)
    case _ => println("Unsupported message type on server")
  }
}

object Server {
  // Create the remote actor system (ServerSystem) and register the server actor
  def main(args: Array[String]): Unit = {
    val serverSystem = ActorSystem("mxb", ConfigFactory.parseString("""
      akka {
        actor {
          provider = "akka.remote.RemoteActorRefProvider"
        }
        remote {
          enabled-transports = ["akka.remote.netty.tcp"]
          netty.tcp {
            hostname = "127.0.0.1"
            port = 2555
          }
        }
      }
      """))
    serverSystem.actorOf(Props[Server], "server")
  }
}
Client:

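import akka.actor.{Actor, ActorRef, ActorSelection, ActorSystem, Props}
import akka.pattern.ask
import akka.util.Timeout
import com.typesafe.config.ConfigFactory

import scala.concurrent.Await
import scala.concurrent.duration._

class Client extends Actor {
  // Remote actor
  var remoteActor: ActorSelection = null
  // Original requester; a single field is only safe while requests are handled one at a time
  var localActor: ActorRef = null

  @throws(classOf[Exception])
  override def preStart(): Unit = {
    remoteActor = context.actorSelection("akka.tcp://mxb@127.0.0.1:2555/user/server")
    println("Remote server address: " + remoteActor)
  }

  override def receive: Receive = {
    // On an AkkaMessage, remember the requester and pass the message to the remote actor
    case msg: AkkaMessage =>
      println("Client sending message: " + msg)
      this.localActor = sender()
      remoteActor ! msg
    // On a Response from the remote actor, hand it back to the requester
    case res: Response =>
      localActor ! res
    case _ => println("Unsupported message type on client")
  }
}

object Client {
  def main(args: Array[String]): Unit = {
    val clientSystem = ActorSystem("ClientSystem", ConfigFactory.parseString("""
      akka {
        actor {
          provider = "akka.remote.RemoteActorRefProvider"
        }
      }
      """))

    val client = clientSystem.actorOf(Props[Client])
    val msgs = Array(AkkaMessage("message1"), AkkaMessage("message2"), AkkaMessage("message3"), AkkaMessage("message4"))

    implicit val timeout: Timeout = Timeout(3.seconds)

    // Send each message with ask (?) and block until its Response arrives
    msgs.foreach { x =>
      val future = client ? x
      val result = Await.result(future, timeout.duration).asInstanceOf[Response]
      println("Response received: " + result)
    }

    // Fire-and-forget alternative:
    // msgs.foreach { x =>
    //   client ! x
    // }

    // Akka 2.3 API; deprecated since Akka 2.4 in favor of terminate()
    clientSystem.shutdown()
  }
}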
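The Client above keeps the requester in a single localActor field. That is fine for the sequential ask-then-wait loop shown, but if several asks were in flight at once, each new request would overwrite the field and replies could be routed to the wrong requester. A minimal sketch of one way around this, assuming the same Akka 2.3-era API and the AkkaMessage and Response classes defined with the Server: the client forwards each message so that the server replies directly to the original asker. ForwardingClient and ForwardingClientApp are hypothetical names, not part of the original code.

```scala
import akka.actor.{Actor, ActorSelection, ActorSystem, Props}
import akka.pattern.ask
import akka.util.Timeout
import com.typesafe.config.ConfigFactory

import scala.concurrent.{Await, Future}
import scala.concurrent.duration._

// Hypothetical variant of Client; reuses AkkaMessage and Response from the Server code.
class ForwardingClient extends Actor {
  private var remoteActor: ActorSelection = _

  override def preStart(): Unit = {
    remoteActor = context.actorSelection("akka.tcp://mxb@127.0.0.1:2555/user/server")
  }

  override def receive: Receive = {
    // forward keeps the original sender, so the server's Response goes
    // straight back to the asker and no per-request state is stored here.
    case msg: AkkaMessage => remoteActor.forward(msg)
    case _                => println("Unsupported message type on client")
  }
}

object ForwardingClientApp {
  def main(args: Array[String]): Unit = {
    val system = ActorSystem("ClientSystem", ConfigFactory.parseString("""
      akka {
        actor {
          provider = "akka.remote.RemoteActorRefProvider"
        }
      }
      """))
    import system.dispatcher // ExecutionContext for the Future combinators
    implicit val timeout: Timeout = Timeout(3.seconds)

    val client = system.actorOf(Props[ForwardingClient], "forwardingClient")
    val msgs = Seq("message1", "message2", "message3", "message4").map(m => AkkaMessage(m))

    // Issue all requests up front, then wait once for the combined result.
    val replies: Future[Seq[Response]] =
      Future.sequence(msgs.map(m => (client ? m).mapTo[Response]))

    try Await.result(replies, timeout.duration).foreach(r => println("Response received: " + r))
    finally system.shutdown() // Akka 2.3 API, as in the article
  }
}
```

Run it against the same Server as before. The trade-off is that the client actor no longer sees the responses, so any client-side logging or post-processing of replies would have to move to the caller.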

Output:
server console:
[INFO] [03/13/2017 15:38:03.154] [main] [Remoting] Starting remoting
[INFO] [03/13/2017 15:38:03.653] [main] [Remoting] Remoting started; listening on addresses :[akka.tcp://mxb@127.0.0.1:2555]
[INFO] [03/13/2017 15:38:03.653] [main] [Remoting] Remoting now listens on addresses: [akka.tcp://mxb@127.0.0.1:2555]
Server received message: message1
Server received message: message2
Server received message: message3
Server received message: message4
[ERROR] [03/13/2017 15:38:49.113] [mxb-akka.remote.default-remote-dispatcher-6] [akka.tcp://mxb@127.0.0.1:2555/system/endpointManager/reliableEndpointWriter-akka.tcp%3A%2F%2FClientSystem%4010.60.98.79%3A2552-0/endpointWriter] AssociationError [akka.tcp://mxb@127.0.0.1:2555] <- [akka.tcp://ClientSystem@10.60.98.79:2552]: Error [Shut down address: akka.tcp://ClientSystem@10.60.98.79:2552] [
akka.remote.ShutDownAssociation: Shut down address: akka.tcp://ClientSystem@10.60.98.79:2552
Caused by: akka.remote.transport.Transport$InvalidAssociationException: The remote system terminated the association because it is shutting down.
]
(The AssociationError above is logged when the client calls shutdown() and closes its connection. As the message itself says, the remote system ended the association because it was shutting down, so it does not indicate a failed request.)

client console:
[INFO] [03/13/2017 15:38:48.409] [main] [Remoting] Starting remoting
[INFO] [03/13/2017 15:38:48.596] [main] [Remoting] Remoting started; listening on addresses :[akka.tcp://ClientSystem@10.60.98.79:2552]
[INFO] [03/13/2017 15:38:48.596] [main] [Remoting] Remoting now listens on addresses: [akka.tcp://ClientSystem@10.60.98.79:2552]
Remote server address: ActorSelection[Anchor(akka.tcp://mxb@127.0.0.1:2555/), Path(/user/server)]
Client sending message: AkkaMessage(message1)
Response received: Response(response_message1)
Client sending message: AkkaMessage(message2)
Response received: Response(response_message2)
Client sending message: AkkaMessage(message3)
Response received: Response(response_message3)
Client sending message: AkkaMessage(message4)
Response received: Response(response_message4)
[INFO] [03/13/2017 15:38:49.035] [ClientSystem-akka.remote.default-remote-dispatcher-6] [akka.tcp://ClientSystem@10.60.98.79:2552/system/remoting-terminator] Shutting down remote daemon.
[INFO] [03/13/2017 15:38:49.050] [ClientSystem-akka.remote.default-remote-dispatcher-6] [akka.tcp://ClientSystem@10.60.98.79:2552/system/remoting-terminator] Remote daemon shut down; proceeding with flushing remote transports.
[INFO] [03/13/2017 15:38:49.128] [ForkJoinPool-3-worker-15] [Remoting] Remoting shut down
[INFO] [03/13/2017 15:38:49.128] [ClientSystem-akka.remote.default-remote-dispatcher-7] [akka.tcp://ClientSystem@10.60.98.79:2552/system/remoting-terminator] Remoting shut down.

Process finished with exit code 0