本章分析 Nacos 集群中服务器节点上线、下线的原理。
Nacos 集群相关的主要代码类:
// Nacos 集群的成员节点
com.alibaba.nacos.naming.cluster.servers.Server
// 用于全局刷新和操作服务器列表的管理器
com.alibaba.nacos.naming.cluster.ServerListManager
// 检测和控制本地服务器的工作状态
com.alibaba.nacos.naming.cluster.ServerStatusManager
// 向其他服务器报告本地服务器状态
com.alibaba.nacos.naming.misc.ServerStatusSynchronizer
启动定时任务
/**
The manager to globally refresh and operate server list.
*/
@Component("serverListManager")
public class ServerListManager {
//如果服务器节点有变化需要通知的对象
private List
//存储集群中所有服务器节点
private List
//存储集群中健康的服务器节点
private List
//根据site=key集群节点集合
private Map
//存储各个节点最后一次心跳时间戳
private Map
//服务启动时候启动两个定时任务
@PostConstruct
public void init() {
//下面代码相当于:executorService.scheduleAtFixedRate(new ServerListUpdater(), 0, NACOS_SERVER_LIST_REFRESH_INTERVAL=5000, TimeUnit.MILLISECONDS);
GlobalExecutor.registerServerListUpdater(new ServerListUpdater());
//下面代码相当于:SERVER_STATUS_EXECUTOR.schedule(new ServerStatusReporter(), 2000, TimeUnit.MILLISECONDS);
GlobalExecutor.registerServerStatusReporter(new ServerStatusReporter(), 2000);
}
}
/**
 * Detects and controls the working status of the local server.
 */
@Service
public class ServerStatusManager {

    /** Working status of the local server; starts as STARTING and is overwritten by refreshServerStatus(). */
    private ServerStatus serverStatus = ServerStatus.STARTING;

    /**
     * Registers the periodic status updater once the bean is constructed.
     * Equivalent to {@code executorService.scheduleAtFixedRate(updater, 0,
     * SERVER_STATUS_UPDATE_PERIOD = 5000, TimeUnit.MILLISECONDS)}.
     */
    @PostConstruct
    public void init() {
        ServerStatusUpdater updater = new ServerStatusUpdater();
        GlobalExecutor.registerServerStatusUpdater(updater);
    }
}
ServerListManager.ServerListUpdater定时刷新本机集群节点列表的变化
/**
* ServerListManager的内部类,定时刷新本机集群节点列表的变化
*/
public class ServerListUpdater implements Runnable {@Override
public void run() {
try {
/**
* refreshServerList方法:
* 1. 如果是STANDALONE_MODE返回当前节点实例
* 2. 从cluster.conf配置文件中读取节点列表 (readClusterConf())返回
* 3. 如果为空则从System.getenv("naming_self_service_cluster_ips") 读取节点列表返回
*/
List<Server> refreshedServers = refreshServerList();
List<Server> oldServers = servers;
if (CollectionUtils.isEmpty(refreshedServers)) {
Loggers.RAFT.warn("refresh server list failed, ignore it.");
return;
}
boolean changed = false;
//新增加集群节点
List<Server> newServers = (List<Server>) CollectionUtils.subtract(refreshedServers, oldServers);
if (CollectionUtils.isNotEmpty(newServers)) {
servers.addAll(newServers);
changed = true;
}
//移除的集群节点
List<Server> deadServers = (List<Server>) CollectionUtils.subtract(oldServers, refreshedServers);
if (CollectionUtils.isNotEmpty(deadServers)) {
servers.removeAll(deadServers);
changed = true;
}
//如果服务器节点有变化,通知其他类
if (changed) {
notifyListeners();
}
} catch (Exception e) {
Loggers.RAFT.info("error while updating server list.", e);
}
}</code></pre></li>
ServerListManager.ServerStatusReporter 定时向集群中的其他节点(以及本机)发送心跳
报文格式:SITE#IP:port#currentTimeMillis#weight\r\n(每条记录以 \r\n 结尾)
/**
 * Inner task of ServerListManager that periodically sends this node's heartbeat to the cluster.
 * <p>
 * Heartbeat format: {@code SITE#IP:port#currentTimeMillis#weight\r\n}. The heartbeat is first applied
 * locally, then sent to every other node. The task reschedules itself in {@code finally}, so a failed run
 * does not stop later heartbeats.
 */
private class ServerStatusReporter implements Runnable {

    @Override
    public void run() {
        try {
            // Update alive flags and the healthy list from the recorded heartbeat timestamps.
            checkDistroHeartbeat();

            // Weight is half the available processors, but never less than 1.
            int weight = Math.max(1, Runtime.getRuntime().availableProcessors() / 2);
            long now = System.currentTimeMillis();
            String heartbeat = LOCALHOST_SITE + "#" + NetUtils.localServer() + "#" + now + "#" + weight + "\r\n";

            // Record our own heartbeat first.
            onReceiveServerStatus(heartbeat);

            // Then broadcast it to every node except this one.
            for (Server peer : getServers()) {
                if (!peer.getKey().equals(NetUtils.localServer())) {
                    Message message = new Message();
                    message.setData(heartbeat);
                    synchronizer.send(peer.getKey(), message);
                }
            }
        } catch (Exception e) {
            Loggers.SRV_LOG.error("[SERVER-STATUS] Exception while sending server status", e);
        } finally {
            GlobalExecutor.registerServerStatusReporter(this, switchDomain.getServerStatusSynchronizationPeriodMillis());
        }
    }
}
}
/**
- Report local server status to other server
- @author nacos
*/
public class ServerStatusSynchronizer implements Synchronizer {
@Override
public void send(final String serverIP, Message msg) {
final Map params = new HashMap(2);
params.put("serverStatus", msg.getData());
//上报地址
String url = "http://serverIP:8848 + "/nacos/v1/ns/operator/server/status";
HttpClient.asyncHttpGet(url, null, params, new AsyncCompletionHandler() {
@Override
public Integer onCompleted(Response response) throws Exception {
if (response.getStatusCode() != HttpURLConnection.HTTP_OK) {
return 1;
}
return 0;
}
});
}
}
本地节点收到其他节点发送的心跳后的处理逻辑(主要代码):
/**
 * HTTP endpoint that receives a heartbeat from another cluster node and hands it to the server list manager.
 *
 * @param serverStatus heartbeat payload, e.g. {@code SITE#IP:port#currentTimeMillis#weight\r\n}
 * @return the literal acknowledgement "ok"
 */
@RequestMapping("/server/status")
public String serverStatus(@RequestParam String serverStatus) {
    final String acknowledgement = "ok";
    serverListManager.onReceiveServerStatus(serverStatus);
    return acknowledgement;
}
/**
 * Records a heartbeat payload received from a cluster node (or sent by this node to itself).
 * <p>
 * The payload holds one or more records. Each record has the form
 * {@code site#ip:port#lastReportTimeMillis[#weight]} and is terminated by {@code \r\n}. For each record,
 * this method:
 * <ul>
 *   <li>updates the node's alive flag;</li>
 *   <li>stores the receive time in {@code distroBeats};</li>
 *   <li>replaces (or appends) the node's entry in {@code distroConfig} under its site.</li>
 * </ul>
 * Blank lines are ignored.
 *
 * @param config heartbeat payload
 * @throws IllegalArgumentException if a record has fewer than three fields or an address without a port
 * @throws NumberFormatException    if a timestamp, port or weight is not a number
 */
public synchronized void onReceiveServerStatus(String config) {
    // Split into records before splitting on '#': otherwise the trailing "\r\n" stays glued to the
    // weight field, and Integer.parseInt(weight) fails on every heartbeat.
    for (String line : config.split("\\r?\\n")) {
        String record = line.trim();
        if (!record.isEmpty()) {
            applyServerStatus(record);
        }
    }
}

/**
 * Parses one {@code site#ip:port#lastReportTimeMillis[#weight]} record and merges it into
 * {@code distroBeats} and {@code distroConfig}.
 */
private void applyServerStatus(String record) {
    String[] params = record.split("#");
    if (params.length < 3) {
        throw new IllegalArgumentException("malformed server status: " + record);
    }
    String[] address = params[1].split(UtilsAndCommons.IP_PORT_SPLITER);
    if (address.length < 2) {
        throw new IllegalArgumentException("server address has no port: " + record);
    }
    long lastRefTime = Long.parseLong(params[2]);

    Server server = new Server();
    server.setSite(params[0]);
    server.setIp(address[0]);
    server.setServePort(Integer.parseInt(address[1]));
    server.setLastRefTime(lastRefTime);
    server.setLastRefTimeStr(new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date(lastRefTime)));
    server.setWeight(params.length == 4 ? Integer.parseInt(params[3]) : 1);

    // A node is treated as expired when two consecutive reports arrive more than
    // distroServerExpiredMillis apart. On its first report, the alive flag is left untouched.
    String key = server.getKey();
    Long lastBeat = distroBeats.get(key);
    long now = System.currentTimeMillis();
    if (lastBeat != null) {
        server.setAlive(now - lastBeat < switchDomain.getDistroServerExpiredMillis());
    }
    distroBeats.put(key, now);

    // Replace the entry with the same key and site, or append the node if it is not listed yet.
    // A site seen for the first time has no list yet, so handle the null case explicitly.
    // A new list is stored; the previous list object is left unmodified.
    String site = server.getSite();
    String serverId = key + "_" + site;
    List<Server> existing = distroConfig.get(site);
    List<Server> merged = new ArrayList<>();
    boolean found = false;
    if (existing != null) {
        for (Server s : existing) {
            if (serverId.equals(s.getKey() + "_" + s.getSite())) {
                merged.add(server);
                found = true;
            } else {
                merged.add(s);
            }
        }
    }
    if (!found) {
        merged.add(server);
    }
    distroConfig.put(site, merged);
}
根据心跳时间戳判断各节点是否存活(alive)
/**
 * Refreshes every known server's alive flag from the heartbeat timestamps in {@code distroBeats} and
 * rebuilds the healthy-server list. Listeners are notified when that list changes.
 * <p>
 * Alive flags:
 * <ul>
 *   <li>A server whose last heartbeat is at least {@code distroServerExpiredMillis} old is marked not alive.</li>
 *   <li>A server with no recorded heartbeat keeps its current flag.</li>
 * </ul>
 * A server counts as healthy only if it is alive and its effective weight (weight + adWeight) is
 * positive. The resulting list is sorted before it is compared.
 */
private void checkDistroHeartbeat() {
    long now = System.currentTimeMillis();
    List<Server> newHealthyList = new ArrayList<>(servers.size());
    for (Server server : servers) {
        Long lastBeat = distroBeats.get(server.getKey());
        if (lastBeat != null) {
            server.setAlive(now - lastBeat < switchDomain.getDistroServerExpiredMillis());
        }

        Integer adWeight = switchDomain.getAdWeight(server.getKey());
        server.setAdWeight(adWeight == null ? 0 : adWeight);

        // Earlier versions looped weight + adWeight times, but the body was idempotent, and the
        // list it built (allLocalSiteSrvs) was never used. Only the "weight is positive" condition mattered.
        boolean positiveWeight = server.getWeight() + server.getAdWeight() > 0;
        if (positiveWeight && server.isAlive() && !newHealthyList.contains(server)) {
            newHealthyList.add(server);
        }
    }

    Collections.sort(newHealthyList);
    if (!CollectionUtils.isEqualCollection(healthyServers, newHealthyList)) {
        healthyServers = newHealthyList;
        notifyListeners();
    }
}
ServerStatusManager.ServerStatusUpdater 检测和控制本地服务器的工作状态
/**
 * Periodic task registered by ServerStatusManager#init that re-evaluates the local server status.
 */
public class ServerStatusUpdater implements Runnable {

    /** Each tick simply delegates to {@code refreshServerStatus()}. */
    @Override
    public void run() {
        refreshServerStatus();
    }
}
/**
 * Recomputes the local server's working status.
 * <p>
 * If the switch domain carries an overridden status that names a {@link ServerStatus} constant, that
 * status is used. Leading and trailing whitespace in the override is ignored. Otherwise the status is
 * {@code UP} when the consistency service reports itself available, and {@code DOWN} when it does not.
 * <p>
 * An override that names no {@link ServerStatus} constant is ignored rather than passed to
 * {@code ServerStatus.valueOf}. That call would throw {@link IllegalArgumentException}. Because this
 * method runs from a task scheduled with {@code scheduleAtFixedRate}, the exception would suppress every
 * later run and leave the status frozen.
 */
private void refreshServerStatus() {
    String overridden = switchDomain.getOverriddenServerStatus();
    if (StringUtils.isNotBlank(overridden)) {
        ServerStatus forced = findServerStatus(overridden.trim());
        if (forced != null) {
            serverStatus = forced;
            return;
        }
        // NOTE(review): an unrecognised override is currently ignored without a log line; consider logging it.
    }
    serverStatus = consistencyService.isAvailable() ? ServerStatus.UP : ServerStatus.DOWN;
}

/** Returns the {@link ServerStatus} constant whose name equals {@code name}, or {@code null} if none does. */
private ServerStatus findServerStatus(String name) {
    for (ServerStatus candidate : ServerStatus.values()) {
        if (candidate.name().equals(name)) {
            return candidate;
        }
    }
    return null;
}
手机扫一扫
移动阅读更方便
你可能感兴趣的文章