Cluster Client
Original post date: April 20, 2021


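An actor system that is not part of the cluster can communicate with actors somewhere in the cluster via the ClusterClient. The client can, of course, be part of another cluster. It only needs to know the location of one (or more) nodes to use as initial contact points. It establishes a connection to a ClusterReceptionist somewhere in the cluster, monitors that connection, and establishes a new one if the link goes down. When looking for a new receptionist, it uses fresh contact points retrieved from the previous establishment, or periodically refreshed contacts. These are not necessarily the initial contact points.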

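Note: ClusterClient should not be used when sending messages to actors that run within the same cluster. For actors that belong to the same cluster, Distributed Publish Subscribe in Cluster provides similar functionality in a more efficient way.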

Also note that when using the cluster client, you must change akka.actor.provider from local to remote or cluster.

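The receptionist is supposed to be started on all nodes in the cluster, or on all nodes with a specified role. It can be started with the ClusterClientReceptionist extension or as an ordinary actor.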

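You can send messages via the ClusterClient to any actor in the cluster that is registered in the DistributedPubSubMediator used by the ClusterReceptionist. The ClusterClientReceptionist provides methods for registering the actors that should be reachable from the client. Messages are wrapped in ClusterClient.Send, ClusterClient.SendToAll or ClusterClient.Publish.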

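Both the ClusterClient and the ClusterClientReceptionist emit events that can be subscribed to. The ClusterClient sends notifications when it receives a list of contact points from the ClusterClientReceptionist. One use of this list is for the client to record its contact points. A client that is restarted can then use this information to supersede any previously configured contact points.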
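The record-and-reuse idea can be sketched as follows. This is a minimal, in-memory sketch. The class `ContactPointStore` and its methods are hypothetical and not part of Akka. Paths are plain strings instead of ActorPath values, and real persistence (file, database) is left out.

```java
import java.util.Collections;
import java.util.LinkedHashSet;
import java.util.Set;

// Hypothetical helper (not an Akka API): remembers the latest contact points
// reported by the ClusterClient and prefers them over the configured ones
// when the client is restarted. State is kept in memory only.
class ContactPointStore {
    private Set<String> recorded = Collections.emptySet();

    // Call with the full, up-to-date set each time a contact point update arrives.
    void record(Set<String> latest) {
        recorded = new LinkedHashSet<>(latest);
    }

    // On restart: use the recorded contacts if any, else fall back to the configured ones.
    Set<String> contactsForRestart(Set<String> configured) {
        return recorded.isEmpty() ? configured : Collections.unmodifiableSet(recorded);
    }
}
```

In a real client, the listener actor shown in the Events section would call `record` whenever its set changes. The returned paths would then be converted with `ActorPaths.fromString` before being passed to `withInitialContacts`.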

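The ClusterClientReceptionist sends notifications when it is contacted by a ClusterClient. These notifications let the server that hosts the receptionist know which clients are connected.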

1. ClusterClient.Send

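The message is delivered to one recipient with a matching path, if any such recipient exists. If several entries match the path, the message is delivered to one random destination. The sender can specify that local affinity is preferred (the localAffinity argument of ClusterClient.Send). In that case, the message is sent to an actor in the same local actor system as the receptionist in use, if such an actor exists. Otherwise, it goes to a random matching entry.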

2. ClusterClient.SendToAll

The message is delivered to all recipients with a matching path.

3. ClusterClient.Publish

The message is delivered to all recipient actors that have been registered as subscribers to the named topic.
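The three delivery rules can be illustrated with a small, stdlib-only model. This sketch is not how Akka implements routing. `DeliveryModel`, its `register`/`subscribe` methods and the node names are hypothetical. Recipients are "node:path" strings rather than ActorRefs.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Random;

// Hypothetical model of the Send / SendToAll / Publish selection rules.
class DeliveryModel {
    private final Map<String, List<String>> byPath = new HashMap<>();  // path -> nodes hosting it
    private final Map<String, List<String>> byTopic = new HashMap<>(); // topic -> subscriber ids
    private final Random random;

    DeliveryModel(Random random) {
        this.random = random;
    }

    void register(String node, String path) {
        byPath.computeIfAbsent(path, k -> new ArrayList<>()).add(node);
    }

    void subscribe(String topic, String subscriber) {
        byTopic.computeIfAbsent(topic, k -> new ArrayList<>()).add(subscriber);
    }

    // Send: exactly one recipient. With localAffinity, prefer an entry on the
    // receptionist's own node if one exists; otherwise pick a matching entry at random.
    String send(String path, boolean localAffinity, String receptionistNode) {
        List<String> nodes = byPath.getOrDefault(path, Collections.emptyList());
        if (nodes.isEmpty()) {
            return null; // no matching entry: nothing is delivered
        }
        if (localAffinity && nodes.contains(receptionistNode)) {
            return receptionistNode + ":" + path;
        }
        return nodes.get(random.nextInt(nodes.size())) + ":" + path;
    }

    // SendToAll: every recipient registered under the path.
    List<String> sendToAll(String path) {
        List<String> out = new ArrayList<>();
        for (String node : byPath.getOrDefault(path, Collections.emptyList())) {
            out.add(node + ":" + path);
        }
        return out;
    }

    // Publish: every actor subscribed to the topic.
    List<String> publish(String topic) {
        return new ArrayList<>(byTopic.getOrDefault(topic, Collections.emptyList()));
    }
}
```

The seeded `Random` is a design choice that keeps the random pick reproducible in tests. The real receptionist's choice of destination is not reproducible in this way.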

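Response messages from the destination actor are tunneled via the receptionist to avoid inbound connections from other cluster nodes to the client. As a result, the sender() seen by the destination actor is not the client itself. The sender() of the response messages, as seen by the client, is deadLetters, since the client should normally send subsequent messages via the ClusterClient. If the client needs to communicate directly with the actor in the cluster, the original sender can be passed inside the reply messages.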

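While establishing a connection to a receptionist, the ClusterClient buffers messages and sends them once the connection is established. If the buffer is full, the ClusterClient drops old messages when new messages are sent via the client. The buffer size is configurable; a buffer size of 0 disables buffering.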
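The buffering rule (keep at most buffer-size messages, evict the oldest when full, drop immediately when the size is 0) can be modeled with a bounded deque. This sketch only mirrors the described behavior. `ClientBufferModel` is a hypothetical name. The 10000 upper bound comes from the buffer-size comment in the configuration.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

// Hypothetical model of ClusterClient buffering while no receptionist is connected.
class ClientBufferModel {
    private final int bufferSize;
    private final Deque<String> buffer = new ArrayDeque<>();
    private final List<String> delivered = new ArrayList<>();
    private final List<String> dropped = new ArrayList<>();
    private boolean connected = false;

    ClientBufferModel(int bufferSize) {
        if (bufferSize < 0 || bufferSize > 10000) {
            throw new IllegalArgumentException("buffer-size must be in 0..10000");
        }
        this.bufferSize = bufferSize;
    }

    void send(String msg) {
        if (connected) {
            delivered.add(msg);          // connection up: deliver right away
            return;
        }
        if (bufferSize == 0) {
            dropped.add(msg);            // buffering disabled: drop immediately
            return;
        }
        if (buffer.size() == bufferSize) {
            dropped.add(buffer.pollFirst()); // full: evict the oldest message
        }
        buffer.addLast(msg);
    }

    // Connection established: flush buffered messages in their original order.
    void onConnected() {
        connected = true;
        while (!buffer.isEmpty()) {
            delivered.add(buffer.pollFirst());
        }
    }

    List<String> delivered() { return delivered; }
    List<String> dropped() { return dropped; }
}
```

With a size of 2, sending m1, m2 and m3 before connecting loses m1 and later delivers m2 and m3.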

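Note that messages can always be lost because of the distributed nature of these actors. As always, additional logic should be implemented in the destination actor (acknowledgement) and in the client actor (retry) to ensure at-least-once message delivery.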
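The acknowledgement and retry logic mentioned above can be sketched without Akka. In this sketch, the client keeps every message until it is acknowledged and re-sends the pending ones. The destination de-duplicates by id, so retries are not processed twice. `RetryingSender` and `DedupingReceiver` are hypothetical names, and message ids are an assumption of this sketch.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Client side (hypothetical): remember messages until acknowledged.
class RetryingSender {
    private long nextId = 1;
    private final Map<Long, String> unacked = new LinkedHashMap<>();

    // Assign an id and keep the payload until an ack for that id arrives.
    long send(String payload) {
        long id = nextId++;
        unacked.put(id, payload);
        return id;
    }

    void onAck(long id) {
        unacked.remove(id);
    }

    // Called periodically (e.g. from a timer): messages that must be sent again.
    Map<Long, String> pendingRetries() {
        return new LinkedHashMap<>(unacked);
    }
}

// Destination side (hypothetical): process each id once, but ack every copy,
// because an earlier ack may itself have been lost.
class DedupingReceiver {
    private final Set<Long> seen = new HashSet<>();
    private final List<String> processed = new ArrayList<>();

    long receive(long id, String payload) {
        if (seen.add(id)) {
            processed.add(payload);
        }
        return id; // the acknowledgement sent back to the client
    }

    List<String> processed() { return processed; }
}
```

The `seen` set grows without bound. A production version would need some way to expire old ids, for example per-client sequence numbers.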

Example

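First, start the receptionist on the cluster nodes. It is recommended to load the extension when the actor system is started, by defining it in the akka.extensions configuration property: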

akka.extensions = ["akka.cluster.client.ClusterClientReceptionist"]

Next, register the actors that should be available to the client:

ActorRef serviceA = system.actorOf(Props.create(Service.class), "serviceA");
ClusterClientReceptionist.get(system).registerService(serviceA);

ActorRef serviceB = system.actorOf(Props.create(Service.class), "serviceB");
ClusterClientReceptionist.get(system).registerService(serviceB);

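On the client, you create the ClusterClient actor and use it as a gateway for sending messages to actors in the cluster, identified by their path (without address information):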

final ActorRef c = system.actorOf(ClusterClient.props(
    ClusterClientSettings.create(system).withInitialContacts(initialContacts())),
    "client");

c.tell(new ClusterClient.Send("/user/serviceA", "hello", true), ActorRef.noSender());

c.tell(new ClusterClient.SendToAll("/user/serviceB", "hi"), ActorRef.noSender());

The initialContacts parameter is a Set<ActorPath>, which can be created like this:

Set<ActorPath> initialContacts() {
    // Paths of the receptionists on the cluster nodes used as initial contact points
    return new HashSet<ActorPath>(Arrays.asList(
        ActorPaths.fromString("akka.tcp://OtherSys@host1:2552/system/receptionist"),
        ActorPaths.fromString("akka.tcp://OtherSys@host2:2552/system/receptionist")));
}

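You will probably want to define the address information of the initial contact points in configuration or in system properties.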
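The following sketch reads the contact points from a system property. The helper parses a comma-separated list and checks that each entry looks like a remote actor path of the form used above. The property name `cluster-client.initial-contacts` and the class `InitialContactsParser` are hypothetical, not Akka settings. In a real client, each string would then go through `ActorPaths.fromString`.

```java
import java.net.URI;
import java.util.LinkedHashSet;
import java.util.Set;

// Hypothetical helper: parse "akka.tcp://system@host:port/system/receptionist"
// entries from a comma-separated string.
class InitialContactsParser {

    static Set<String> parse(String commaSeparated) {
        Set<String> contacts = new LinkedHashSet<>();
        if (commaSeparated == null) {
            return contacts; // nothing configured
        }
        for (String raw : commaSeparated.split(",")) {
            String path = raw.trim();
            if (path.isEmpty()) {
                continue;
            }
            // URI.create throws IllegalArgumentException on syntactically invalid input
            URI uri = URI.create(path);
            boolean remote = uri.getScheme() != null && uri.getScheme().startsWith("akka")
                    && uri.getUserInfo() != null   // actor system name
                    && uri.getHost() != null
                    && uri.getPort() != -1;
            if (!remote) {
                throw new IllegalArgumentException("Not a remote actor path: " + path);
            }
            contacts.add(path);
        }
        return contacts;
    }

    // Example source: -Dcluster-client.initial-contacts=... (made-up property name)
    static Set<String> fromSystemProperty() {
        return parse(System.getProperty("cluster-client.initial-contacts"));
    }
}
```

Rejecting local paths such as /user/serviceA early gives a clearer error than a client that silently never connects.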

A more comprehensive sample is available in the Lightbend Activator tutorial Distributed workers with Akka and Java!.

ClusterClientReceptionist Extension

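In the example above, the receptionist is started and accessed with the akka.cluster.client.ClusterClientReceptionist extension. That is convenient and perfectly fine in most scenarios. However, you can also start the akka.cluster.client.ClusterReceptionist actor as an ordinary actor. You can then run several different receptionists at the same time, serving different types of clients.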

Note that the ClusterClientReceptionist uses the DistributedPubSub extension, which is described in Distributed Publish Subscribe in Cluster.

It is recommended to load the extension when the actor system is started, by defining it in the akka.extensions configuration property:

akka.extensions = ["akka.cluster.client.ClusterClientReceptionist"]

Events

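As mentioned earlier, both the ClusterClient and the ClusterClientReceptionist emit events that can be subscribed to. The following code snippet declares an actor that receives notifications about contact points (additions and removals) as they become available. The code illustrates subscribing to the events and receiving the initial state of the ClusterClient: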

static public class ClientListener extends UntypedActor {

    private final ActorRef targetClient;
    private final Set<ActorPath> contactPoints = new HashSet<>();

    public ClientListener(ActorRef targetClient) {
        this.targetClient = targetClient;
    }

    @Override
    public void preStart() {
        // Subscribe with getSelf() as sender so the events are delivered to this actor
        targetClient.tell(SubscribeContactPoints.getInstance(), getSelf());
    }

    @Override
    public void onReceive(Object message) {
        if (message instanceof ContactPoints) {
            ContactPoints msg = (ContactPoints) message;
            contactPoints.addAll(msg.getContactPoints());
            // Now do something with an up-to-date "contactPoints"
        } else if (message instanceof ContactPointAdded) {
            ContactPointAdded msg = (ContactPointAdded) message;
            contactPoints.add(msg.contactPoint());
            // Now do something with an up-to-date "contactPoints"
        } else if (message instanceof ContactPointRemoved) {
            ContactPointRemoved msg = (ContactPointRemoved) message;
            contactPoints.remove(msg.contactPoint());
            // Now do something with an up-to-date "contactPoints"
        } else {
            unhandled(message);
        }
    }
}

Similarly, an actor can behave in the same way to learn which cluster clients are connected to a ClusterClientReceptionist:

static public class ReceptionistListener extends UntypedActor {

    private final ActorRef targetReceptionist;
    private final Set<ActorRef> clusterClients = new HashSet<>();

    public ReceptionistListener(ActorRef targetReceptionist) {
        this.targetReceptionist = targetReceptionist;
    }

    @Override
    public void preStart() {
        // Subscribe with getSelf() as sender so the events are delivered to this actor
        targetReceptionist.tell(SubscribeClusterClients.getInstance(), getSelf());
    }

    @Override
    public void onReceive(Object message) {
        if (message instanceof ClusterClients) {
            ClusterClients msg = (ClusterClients) message;
            clusterClients.addAll(msg.getClusterClients());
            // Now do something with an up-to-date "clusterClients"
        } else if (message instanceof ClusterClientUp) {
            ClusterClientUp msg = (ClusterClientUp) message;
            clusterClients.add(msg.clusterClient());
            // Now do something with an up-to-date "clusterClients"
        } else if (message instanceof ClusterClientUnreachable) {
            ClusterClientUnreachable msg = (ClusterClientUnreachable) message;
            clusterClients.remove(msg.clusterClient());
            // Now do something with an up-to-date "clusterClients"
        } else {
            unhandled(message);
        }
    }
}

Dependencies

To use the Cluster Client, you must add the following dependency to your project:

sbt:

"com.typesafe.akka" %% "akka-cluster-tools" % "2.4.16"

maven:

<dependency>
  <groupId>com.typesafe.akka</groupId>
  <artifactId>akka-cluster-tools_2.11</artifactId>
  <version>2.4.16</version>
</dependency>

Configuration

The ClusterClientReceptionist extension (or ClusterReceptionistSettings) can be configured with the following properties:

# Settings for the ClusterClientReceptionist extension
akka.cluster.client.receptionist {
  # Actor name of the ClusterReceptionist actor, /system/receptionist
  name = receptionist

  # Start the receptionist on members tagged with this role.
  # All members are used if undefined or empty.
  role = ""

  # The receptionist will send this number of contact points to the client
  number-of-contacts = 3

  # The actor that tunnels response messages to the client will be stopped
  # after this time of inactivity.
  response-tunnel-receive-timeout = 30s

  # The id of the dispatcher to use for ClusterReceptionist actors.
  # If not specified default dispatcher is used.
  # If specified you need to define the settings of the actual dispatcher.
  use-dispatcher = ""

  # How often failure detection heartbeat messages should be received for
  # each ClusterClient
  heartbeat-interval = 2s

  # Number of potentially lost/delayed heartbeats that will be
  # accepted before considering it to be an anomaly.
  # The ClusterReceptionist is using the akka.remote.DeadlineFailureDetector, which
  # will trigger if there are no heartbeats within the duration
  # heartbeat-interval + acceptable-heartbeat-pause, i.e. 15 seconds with
  # the default settings.
  acceptable-heartbeat-pause = 13s

  # Failure detection checking interval for checking all ClusterClients
  failure-detection-interval = 2s
}

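The ClusterClientSettings reads the following configuration properties when it is created with an ActorSystem parameter. You can also amend the ClusterClientSettings, or create it from another config section with the same layout as below. ClusterClientSettings is a parameter of the ClusterClient.props factory method, so each client can be configured with different settings if needed.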

# Settings for the ClusterClient
akka.cluster.client {
  # Actor paths of the ClusterReceptionist actors on the servers (cluster nodes)
  # that the client will try to contact initially. It is mandatory to specify
  # at least one initial contact.
  # Comma separated full actor paths defined by a string on the form of
  # "akka.tcp://system@hostname:port/system/receptionist"
  initial-contacts = []

  # Interval at which the client retries to establish contact with one of
  # ClusterReceptionist on the servers (cluster nodes)
  establishing-get-contacts-interval = 3s

  # Interval at which the client will ask the ClusterReceptionist for
  # new contact points to be used for next reconnect.
  refresh-contacts-interval = 60s

  # How often failure detection heartbeat messages should be sent
  heartbeat-interval = 2s

  # Number of potentially lost/delayed heartbeats that will be
  # accepted before considering it to be an anomaly.
  # The ClusterClient is using the akka.remote.DeadlineFailureDetector, which
  # will trigger if there are no heartbeats within the duration
  # heartbeat-interval + acceptable-heartbeat-pause, i.e. 15 seconds with
  # the default settings.
  acceptable-heartbeat-pause = 13s

  # If connection to the receptionist is not established the client will buffer
  # this number of messages and deliver them when the connection is established.
  # When the buffer is full old messages will be dropped when new messages are sent
  # via the client. Use 0 to disable buffering, i.e. messages will be dropped
  # immediately if the location of the receptionist is unknown.
  # Maximum allowed buffer size is 10000.
  buffer-size = 1000

  # If connection to the receptionist is lost and the client has not been
  # able to acquire a new connection for this long the client will stop itself.
  # This duration makes it possible to watch the cluster client and react on a more permanent
  # loss of connection with the cluster, for example by accessing some kind of
  # service registry for an updated set of initial contacts to start a new cluster client with.
  # If this is not wanted it can be set to "off" to disable the timeout and retry
  # forever.
  reconnect-timeout = off
}

Failure handling

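When the cluster client is started, it must be provided with a list of initial contacts: cluster nodes where receptionists are running. It then repeatedly tries to contact them, at the interval configured by establishing-get-contacts-interval, until it gets in contact with one of them. While the client is running, the list of contacts is continuously updated with data from the receptionists, at the interval configured by refresh-contacts-interval. If the cluster has more receptionists than the initial contacts provided to the client, the client therefore learns about them.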
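The establishment phase can be modeled as a retry loop on a simulated clock. In this sketch, every establishing-get-contacts-interval the client asks its known contacts and stops once one answers. `EstablishModel`, the `reachableAt` predicate (standing in for "a receptionist answered") and the `maxRounds` cap are hypothetical. With the default settings, the real client keeps retrying until reconnect-timeout, if any, expires.

```java
import java.util.List;
import java.util.function.BiPredicate;

// Hypothetical model of retrying the initial contacts at a fixed interval.
class EstablishModel {

    // Returns the simulated time (ms since start) at which contact was made,
    // or -1 if no contact answered within maxRounds attempts.
    static long establish(List<String> contacts, long intervalMillis, int maxRounds,
                          BiPredicate<String, Long> reachableAt) {
        for (int round = 0; round < maxRounds; round++) {
            long now = round * intervalMillis;  // time of this attempt
            for (String contact : contacts) {
                if (reachableAt.test(contact, now)) {
                    return now;                 // first answering contact ends the loop
                }
            }
        }
        return -1;
    }
}
```

With the default 3s interval, a receptionist that becomes reachable at 7s is found on the attempt at 9s.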

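While the client is running, it uses heartbeats to detect failures in its connection to the receptionist. If more than a configurable number of heartbeats are missed, the client tries to reconnect to its known set of contacts to find a receptionist it can access.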
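The configuration comments describe the deadline rule as a failure triggered when no heartbeat arrives within heartbeat-interval + acceptable-heartbeat-pause, which gives 2s + 13s = 15s with the defaults. The sketch below models that rule only. `DeadlineModel` is a hypothetical class, not akka.remote.DeadlineFailureDetector. The exact boundary handling is a choice of this model.

```java
// Hypothetical model of the documented deadline rule (not Akka's class).
class DeadlineModel {
    private final long deadlineMillis;
    private long lastHeartbeatMillis;

    DeadlineModel(long heartbeatIntervalMillis, long acceptablePauseMillis, long startMillis) {
        // e.g. 2000 + 13000 = 15000 ms with the default settings
        this.deadlineMillis = heartbeatIntervalMillis + acceptablePauseMillis;
        this.lastHeartbeatMillis = startMillis;
    }

    void heartbeat(long nowMillis) {
        lastHeartbeatMillis = nowMillis;
    }

    // Available while the last heartbeat is no older than the deadline.
    boolean isAvailable(long nowMillis) {
        return nowMillis - lastHeartbeatMillis <= deadlineMillis;
    }
}
```

Raising acceptable-heartbeat-pause makes the client more tolerant of GC pauses and network hiccups. The trade-off is that a real failure takes longer to detect.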

When the cluster cannot be reached at all

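It is possible to make the cluster client stop entirely if it cannot find a receptionist it can talk to within a configurable interval. This is configured with reconnect-timeout, which defaults to off. This is useful when the initial contacts come from some kind of service registry, cluster node addresses are entirely dynamic, and the entire cluster might shut down or crash and be restarted on new addresses. Since the client is stopped in that case, a monitoring actor can watch it. Upon Terminated, the monitoring actor can fetch a new set of initial contacts and start a new cluster client.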
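The stop decision can be modeled as follows. Once the connection is lost, the timer starts. Reconnecting resets it. If reconnect-timeout is set (not off) and expires, the client stops, which is what a watching actor would observe as Terminated. `ReconnectTimeoutModel` is hypothetical. `Optional.empty()` stands in for the "off" setting.

```java
import java.util.Optional;

// Hypothetical model of the reconnect-timeout decision (not Akka's implementation).
class ReconnectTimeoutModel {
    private final Optional<Long> timeoutMillis; // Optional.empty() means "off"
    private Long lostSinceMillis = null;        // null while connected

    ReconnectTimeoutModel(Optional<Long> timeoutMillis) {
        this.timeoutMillis = timeoutMillis;
    }

    // Record when the connection was first lost; repeated calls keep the original time.
    void connectionLost(long nowMillis) {
        if (lostSinceMillis == null) {
            lostSinceMillis = nowMillis;
        }
    }

    // A new receptionist connection resets the timer.
    void connected() {
        lostSinceMillis = null;
    }

    // True when the client should stop itself; never true when the timeout is off.
    boolean shouldStop(long nowMillis) {
        return lostSinceMillis != null
                && timeoutMillis.isPresent()
                && nowMillis - lostSinceMillis >= timeoutMillis.get();
    }
}
```

With the default off setting, the model never stops, matching "retry forever". With a finite timeout, the watcher becomes responsible for fetching fresh contacts and starting a new client.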