Nacos Cluster

This chapter analyzes how the servers in a Nacos cluster detect that other Nacos servers have come online or gone offline.

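  • Every 5 seconds, the scheduled task ServerListManager.ServerListUpdater runs to pick up nodes that have been added to or removed from the cluster.
  • Every 2 seconds, the scheduled task ServerListManager.ServerStatusReporter sends heartbeat data to every node in the cluster.
  • Every 5 seconds, the scheduled task ServerStatusManager.ServerStatusUpdater checks and updates the working status of the local server.
  • When the current node receives heartbeat data from another node, it updates the Map that stores each node's last heartbeat timestamp.
  • Each run of ServerStatusReporter calls checkDistroHeartbeat to refresh the set of healthy server nodes.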

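Main classes involved in Nacos clustering:

// A member node of the Nacos cluster
com.alibaba.nacos.naming.cluster.servers.Server;

// The manager that globally refreshes and operates on the server list
com.alibaba.nacos.naming.cluster.ServerListManager;

// Checks and controls the working status of the local server
com.alibaba.nacos.naming.cluster.ServerStatusManager

// Reports the local server status to the other servers
com.alibaba.nacos.naming.misc.ServerStatusSynchronizer

  1. Starting the scheduled tasks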

    /**
     * The manager to globally refresh and operate server list.
     */
    @Component("serverListManager")
    public class ServerListManager {
        // Listeners to notify when the set of server nodes changes
        private List<ServerChangeListener> listeners = new ArrayList<>();
        // All server nodes in the cluster
        private List<Server> servers = new ArrayList<>();
        // Healthy server nodes in the cluster
        private List<Server> healthyServers = new ArrayList<>();
        // Cluster nodes grouped by site (key = site)
        private Map<String, List<Server>> distroConfig = new ConcurrentHashMap<>();
        // Timestamp of the last heartbeat received from each node
        private Map<String, Long> distroBeats = new ConcurrentHashMap<>(16);

        // Two scheduled tasks are started when the service starts
        @PostConstruct
        public void init() {
            // Equivalent to: executorService.scheduleAtFixedRate(new ServerListUpdater(), 0,
            //     NACOS_SERVER_LIST_REFRESH_INTERVAL /* 5000 */, TimeUnit.MILLISECONDS);
            GlobalExecutor.registerServerListUpdater(new ServerListUpdater());

            // Equivalent to: SERVER_STATUS_EXECUTOR.schedule(new ServerStatusReporter(), 2000, TimeUnit.MILLISECONDS);
            GlobalExecutor.registerServerStatusReporter(new ServerStatusReporter(), 2000);
        }
    }

    @Service
    public class ServerStatusManager {
        private ServerStatus serverStatus = ServerStatus.STARTING;

        @PostConstruct
        public void init() {
            // Equivalent to: executorService.scheduleAtFixedRate(runnable, 0,
            //     SERVER_STATUS_UPDATE_PERIOD /* 5000 */, TimeUnit.MILLISECONDS);
            GlobalExecutor.registerServerStatusUpdater(new ServerStatusUpdater());
        }
    }
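
The registrations above use two scheduling styles: the updaters run at a fixed rate, while ServerStatusReporter is scheduled once and registers itself again in its finally block (see step 3), so its period can be re-read on every run. The second style can be sketched with a plain ScheduledExecutorService. This is only an illustration; SchedulingSketch, SelfRescheduling and runUntil are hypothetical names, not Nacos classes.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.LongSupplier;

class SchedulingSketch {

    /** Runs the task once, then schedules itself again with a freshly read delay. */
    static final class SelfRescheduling implements Runnable {
        private final ScheduledExecutorService executor;
        private final Runnable task;
        private final LongSupplier delayMillis;

        SelfRescheduling(ScheduledExecutorService executor, Runnable task, LongSupplier delayMillis) {
            this.executor = executor;
            this.task = task;
            this.delayMillis = delayMillis;
        }

        @Override
        public void run() {
            try {
                task.run();
            } catch (RuntimeException e) {
                // A failed run must not prevent the next one.
            } finally {
                if (!executor.isShutdown()) {
                    executor.schedule(this, delayMillis.getAsLong(), TimeUnit.MILLISECONDS);
                }
            }
        }
    }

    /** Lets a self-rescheduling task run until it has executed targetRuns times (5 s cap). */
    static int runUntil(int targetRuns) {
        ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor();
        CountDownLatch latch = new CountDownLatch(targetRuns);
        AtomicInteger runs = new AtomicInteger();
        Runnable task = () -> {
            runs.incrementAndGet();
            latch.countDown();
        };
        executor.schedule(new SelfRescheduling(executor, task, () -> 10L), 0, TimeUnit.MILLISECONDS);
        try {
            latch.await(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            executor.shutdownNow();
        }
        return runs.get();
    }

    public static void main(String[] args) {
        System.out.println("runs >= 3: " + (runUntil(3) >= 3));
    }
}
```

Because the delay comes from a supplier, a configuration change takes effect on the next run without cancelling and re-registering the task.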

  2. ServerListManager.ServerListUpdater periodically refreshes the local view of the cluster node list

    /**
     * Inner class of ServerListManager that periodically refreshes the local view of the cluster node list
     */
    public class ServerListUpdater implements Runnable {
        @Override
        public void run() {
            try {
                /*
                 * refreshServerList():
                 * 1. In STANDALONE_MODE, returns only the current node instance
                 * 2. Otherwise reads the node list from the cluster.conf file (readClusterConf()) and returns it
                 * 3. If that list is empty, reads the node list from System.getenv("naming_self_service_cluster_ips")
                 */
                List<Server> refreshedServers = refreshServerList();
                List<Server> oldServers = servers;

                if (CollectionUtils.isEmpty(refreshedServers)) {
                    Loggers.RAFT.warn("refresh server list failed, ignore it.");
                    return;
                }

                boolean changed = false;
                // Nodes newly added to the cluster
                List<Server> newServers = (List<Server>) CollectionUtils.subtract(refreshedServers, oldServers);
                if (CollectionUtils.isNotEmpty(newServers)) {
                    servers.addAll(newServers);
                    changed = true;
                }
                // Nodes removed from the cluster
                List<Server> deadServers = (List<Server>) CollectionUtils.subtract(oldServers, refreshedServers);
                if (CollectionUtils.isNotEmpty(deadServers)) {
                    servers.removeAll(deadServers);
                    changed = true;
                }
                // Notify the listeners if the node list changed
                if (changed) {
                    notifyListeners();
                }

            } catch (Exception e) {
                Loggers.RAFT.info("error while updating server list.", e);
            }
        }
    }
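
The core of the updater is two list differences: refreshed minus current gives the new nodes, and current minus refreshed gives the dead ones. A minimal sketch of that idea follows. It uses plain strings as node keys and List.removeAll instead of CollectionUtils.subtract (the two agree when keys are unique); ServerListDiffSketch and the addresses are made up for illustration.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

class ServerListDiffSketch {

    /** Elements of a that do not appear in b, in their original order. */
    static List<String> subtract(List<String> a, List<String> b) {
        List<String> result = new ArrayList<>(a);
        result.removeAll(b);
        return result;
    }

    public static void main(String[] args) {
        List<String> current = new ArrayList<>(Arrays.asList("10.0.0.1:8848", "10.0.0.2:8848"));
        List<String> refreshed = Arrays.asList("10.0.0.2:8848", "10.0.0.3:8848");

        // Compute both differences before mutating the current list.
        List<String> added = subtract(refreshed, current);
        List<String> removed = subtract(current, refreshed);
        current.addAll(added);
        current.removeAll(removed);

        // prints: added=[10.0.0.3:8848] removed=[10.0.0.1:8848] current=[10.0.0.2:8848, 10.0.0.3:8848]
        System.out.println("added=" + added + " removed=" + removed + " current=" + current);
    }
}
```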
  3. ServerListManager.ServerStatusReporter periodically sends heartbeats to the other nodes in the cluster

    Message format: SITE#IP:port#currentTimeMillis#weight\r\n

    /**
     * Inner class of ServerListManager that periodically sends heartbeats to the other nodes in the cluster
     */
    private class ServerStatusReporter implements Runnable {
        @Override
        public void run() {
            try {
                // Refresh the set of healthy servers first (see step 5)
                checkDistroHeartbeat();

                int weight = Runtime.getRuntime().availableProcessors() / 2;
                if (weight <= 0) {
                    weight = 1;
                }
                long curTime = System.currentTimeMillis();
                String status = LOCALHOST_SITE + "#" + NetUtils.localServer() + "#" + curTime + "#" + weight + "\r\n";

                // send status to itself
                onReceiveServerStatus(status);

                // Send the heartbeat to every node in the cluster except the local one
                List<Server> allServers = getServers();
                for (com.alibaba.nacos.naming.cluster.servers.Server server : allServers) {
                    if (server.getKey().equals(NetUtils.localServer())) {
                        continue;
                    }
                    Message msg = new Message();
                    msg.setData(status);
                    synchronizer.send(server.getKey(), msg);
                }
            } catch (Exception e) {
                Loggers.SRV_LOG.error("[SERVER-STATUS] Exception while sending server status", e);
            } finally {
                // Schedule the next run; the period is re-read from switchDomain each time
                GlobalExecutor.registerServerStatusReporter(this, switchDomain.getServerStatusSynchronizationPeriodMillis());
            }
        }
    }
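
To make the message format concrete, here is a small sketch that builds one heartbeat line the way the reporter does: weight is half the processor count with a floor of 1, and the four fields are joined with "#" and terminated by "\r\n". HeartbeatMessageSketch, weightFor, build and the site name "siteA" are illustrative assumptions, not Nacos identifiers.

```java
class HeartbeatMessageSketch {

    /** Half the processor count, but never less than 1. */
    static int weightFor(int processors) {
        int weight = processors / 2;
        return weight <= 0 ? 1 : weight;
    }

    /** Joins the fields as site#ip:port#timestamp#weight followed by CRLF. */
    static String build(String site, String ipPort, long timestampMillis, int weight) {
        return site + "#" + ipPort + "#" + timestampMillis + "#" + weight + "\r\n";
    }

    public static void main(String[] args) {
        int weight = weightFor(Runtime.getRuntime().availableProcessors());
        String status = build("siteA", "10.0.0.1:8848", System.currentTimeMillis(), weight);
        System.out.print(status);
    }
}
```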

    /**
     * Report local server status to other server
     *
     * @author nacos
     */
    public class ServerStatusSynchronizer implements Synchronizer {
        @Override
        public void send(final String serverIP, Message msg) {
            final Map<String, String> params = new HashMap<>(2);
            params.put("serverStatus", msg.getData());
            // Reporting address (simplified here: assumes serverIP carries no port and the port is 8848)
            String url = "http://" + serverIP + ":8848" + "/nacos/v1/ns/operator/server/status";
            HttpClient.asyncHttpGet(url, null, params, new AsyncCompletionHandler() {
                @Override
                public Integer onCompleted(Response response) throws Exception {
                    if (response.getStatusCode() != HttpURLConnection.HTTP_OK) {
                        return 1;
                    }
                    return 0;
                }
            });
        }
    }
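
Since the heartbeat travels as a GET parameter, the "#", ":" and "\r\n" characters in it have to be percent-encoded somewhere along the way. The sketch below shows what the final request URL could look like when the encoding is done by hand with java.net.URLEncoder (the Charset overload needs Java 10 or later). It is an assumption-laden illustration: StatusUrlSketch and statusUrl are hypothetical, and whether the HTTP client used above encodes parameters itself is not shown in this article.

```java
import java.net.URLEncoder;
import java.nio.charset.StandardCharsets;

class StatusUrlSketch {

    /** Builds the status-report URL with the heartbeat as an encoded query parameter. */
    static String statusUrl(String ipPort, String status) {
        String encoded = URLEncoder.encode(status, StandardCharsets.UTF_8);
        return "http://" + ipPort + "/nacos/v1/ns/operator/server/status?serverStatus=" + encoded;
    }

    public static void main(String[] args) {
        // prints: http://10.0.0.2:8848/nacos/v1/ns/operator/server/status?serverStatus=siteA%2310.0.0.1%3A8848%231000%234%0D%0A
        System.out.println(statusUrl("10.0.0.2:8848", "siteA#10.0.0.1:8848#1000#4\r\n"));
    }
}
```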
  4. How the local node handles heartbeats received from other nodes (main code)

    @RequestMapping("/server/status")
    public String serverStatus(@RequestParam String serverStatus) {
        serverListManager.onReceiveServerStatus(serverStatus);
        return "ok";
    }

    public synchronized void onReceiveServerStatus(String config) {
        List<Server> tmpServerList = new ArrayList<>();
        // Format: site#ip:port#lastReportTime#weight; trim() drops the trailing "\r\n"
        String[] params = config.trim().split("#");
        Server server = new Server();
        server.setSite(params[0]);
        server.setIp(params[1].split(UtilsAndCommons.IP_PORT_SPLITER)[0]);
        server.setServePort(Integer.parseInt(params[1].split(UtilsAndCommons.IP_PORT_SPLITER)[1]));
        server.setLastRefTime(Long.parseLong(params[2]));

        // If the interval between two reports from a server exceeds distroServerExpiredMillis,
        // the server is considered expired.
        Long lastBeat = distroBeats.get(server.getKey());
        long now = System.currentTimeMillis();
        if (null != lastBeat) {
            server.setAlive(now - lastBeat < switchDomain.getDistroServerExpiredMillis());
        }
        // Record the time of the latest heartbeat from the reporting node
        distroBeats.put(server.getKey(), now);
        Date date = new Date(Long.parseLong(params[2]));
        server.setLastRefTimeStr(new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(date));
        server.setWeight(params.length == 4 ? Integer.parseInt(params[3]) : 1);

        // If the reporting node is already in its site's list, replace the old entry with the new data
        List<Server> list = distroConfig.get(server.getSite());
        if (list == null) {
            // First report for this site: start from an empty list
            list = new ArrayList<>();
        }
        for (Server s : list) {
            String serverId = s.getKey() + "_" + s.getSite();
            String newServerId = server.getKey() + "_" + server.getSite();

            if (serverId.equals(newServerId)) {
                tmpServerList.add(server);
                continue;
            }
            tmpServerList.add(s);
        }
        // If the node was not in the list, append it
        if (!tmpServerList.contains(server)) {
            tmpServerList.add(server);
        }
        distroConfig.put(server.getSite(), tmpServerList);
    }
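
The receiving side boils down to two steps: parse the "#"-separated message, then compare the arrival time with the previous arrival time for the same node. The following sketch isolates those two steps with the clock passed in as a parameter so the behaviour is easy to follow. HeartbeatReceiverSketch and Beat are hypothetical names, and returning false for the very first heartbeat is an assumption of this sketch (the code above simply leaves the alive flag untouched in that case).

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

class HeartbeatReceiverSketch {

    /** Parsed form of "site#ip:port#timestamp#weight". */
    static final class Beat {
        final String site;
        final String ip;
        final int port;
        final long reportedAt;
        final int weight;

        Beat(String site, String ip, int port, long reportedAt, int weight) {
            this.site = site;
            this.ip = ip;
            this.port = port;
            this.reportedAt = reportedAt;
            this.weight = weight;
        }

        String key() {
            return ip + ":" + port;
        }
    }

    private final Map<String, Long> lastBeats = new ConcurrentHashMap<>();
    private final long expiredMillis;

    HeartbeatReceiverSketch(long expiredMillis) {
        this.expiredMillis = expiredMillis;
    }

    static Beat parse(String message) {
        String[] parts = message.trim().split("#"); // trim() drops the trailing CRLF
        if (parts.length < 3) {
            throw new IllegalArgumentException("malformed heartbeat: " + message);
        }
        String[] ipPort = parts[1].split(":");
        int weight = parts.length == 4 ? Integer.parseInt(parts[3]) : 1;
        return new Beat(parts[0], ipPort[0], Integer.parseInt(ipPort[1]), Long.parseLong(parts[2]), weight);
    }

    /** Stores the arrival time and reports whether the gap since the previous beat is within the limit. */
    synchronized boolean onReceive(String message, long now) {
        Beat beat = parse(message);
        Long previous = lastBeats.put(beat.key(), now);
        return previous != null && now - previous < expiredMillis;
    }

    public static void main(String[] args) {
        HeartbeatReceiverSketch receiver = new HeartbeatReceiverSketch(10_000);
        String msg = "siteA#10.0.0.1:8848#1000#4\r\n";
        System.out.println(receiver.onReceive(msg, 1_000));  // false: first beat, nothing to compare with
        System.out.println(receiver.onReceive(msg, 3_000));  // true: 2 s since the previous beat
        System.out.println(receiver.onReceive(msg, 20_000)); // false: 17 s since the previous beat
    }
}
```

Note that liveness is judged by the local arrival time, not by the timestamp inside the message, so clock differences between nodes do not affect the result.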
  5. Checking heartbeat timestamps to decide whether each node is alive

    private void checkDistroHeartbeat() {
        List<Server> newHealthyList = new ArrayList<>(servers.size());
        long now = System.currentTimeMillis();
        // Mark each node alive or expired based on its last heartbeat; nodes never heard from are skipped
        for (Server s : servers) {
            Long lastBeat = distroBeats.get(s.getKey());
            if (null == lastBeat) {
                continue;
            }
            s.setAlive(now - lastBeat < switchDomain.getDistroServerExpiredMillis());
        }

        //local site servers
        List<String> allLocalSiteSrvs = new ArrayList<>();
        for (Server server : servers) {
            server.setAdWeight(switchDomain.getAdWeight(server.getKey()) == null ? 0 : switchDomain.getAdWeight(server.getKey()));
            for (int i = 0; i < server.getWeight() + server.getAdWeight(); i++) {
                if (!allLocalSiteSrvs.contains(server.getKey())) {
                    allLocalSiteSrvs.add(server.getKey());
                }
                if (server.isAlive() && !newHealthyList.contains(server)) {
                    newHealthyList.add(server);
                }
            }
        }
        Collections.sort(newHealthyList);
        // Replace the healthy list and notify the listeners only when its content changed
        if (!CollectionUtils.isEqualCollection(healthyServers, newHealthyList)) {
            healthyServers = newHealthyList;
            notifyListeners();
        }
    }
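
Stripped of the weight handling, the check above keeps every node whose last heartbeat is recent enough and sorts the result so that two runs can be compared. A reduced sketch under those assumptions is shown below. It works on node keys instead of Server objects and treats a node without any recorded heartbeat as unhealthy, which simplifies the original (there the previous alive flag is kept); HealthyListSketch is a made-up name.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

class HealthyListSketch {

    /** Keys of the servers whose last heartbeat is newer than expiredMillis, sorted. */
    static List<String> healthy(List<String> servers, Map<String, Long> lastBeats,
                                long now, long expiredMillis) {
        List<String> result = new ArrayList<>();
        for (String key : servers) {
            Long last = lastBeats.get(key);
            boolean alive = last != null && now - last < expiredMillis;
            if (alive && !result.contains(key)) {
                result.add(key);
            }
        }
        Collections.sort(result);
        return result;
    }

    public static void main(String[] args) {
        Map<String, Long> beats = new HashMap<>();
        beats.put("10.0.0.2:8848", 9_000L);
        beats.put("10.0.0.1:8848", 9_500L);
        beats.put("10.0.0.3:8848", 1_000L); // too old at now = 10_000
        List<String> servers = Arrays.asList("10.0.0.2:8848", "10.0.0.1:8848", "10.0.0.3:8848");

        // prints: [10.0.0.1:8848, 10.0.0.2:8848]
        System.out.println(healthy(servers, beats, 10_000L, 5_000L));
    }
}
```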
  6. ServerStatusManager.ServerStatusUpdater checks and controls the working status of the local server

    public class ServerStatusUpdater implements Runnable {

        @Override
        public void run() {
            refreshServerStatus();
        }

    }

    private void refreshServerStatus() {
        // A manually overridden status takes precedence over the detected one
        if (StringUtils.isNotBlank(switchDomain.getOverriddenServerStatus())) {
            serverStatus = ServerStatus.valueOf(switchDomain.getOverriddenServerStatus());
            return;
        }

        // Otherwise the status follows the availability of the consistency service
        if (consistencyService.isAvailable()) {
            serverStatus = ServerStatus.UP;
        } else {
            serverStatus = ServerStatus.DOWN;
        }
    }
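
The decision in refreshServerStatus can be read as a pure function of two inputs: the overridden status string and the availability of the consistency service. The sketch below expresses it that way. The local ServerStatus enum only lists the three values that appear in this article, and ServerStatusSketch and resolve are hypothetical names.

```java
class ServerStatusSketch {

    // Local stand-in for the status enum; only the values mentioned in this article.
    enum ServerStatus { STARTING, UP, DOWN }

    /** An explicit override wins; otherwise the status follows consistency availability. */
    static ServerStatus resolve(String overridden, boolean consistencyAvailable) {
        if (overridden != null && !overridden.trim().isEmpty()) {
            // valueOf throws IllegalArgumentException for names that are not enum constants
            return ServerStatus.valueOf(overridden);
        }
        return consistencyAvailable ? ServerStatus.UP : ServerStatus.DOWN;
    }

    public static void main(String[] args) {
        System.out.println(resolve(null, true));   // UP
        System.out.println(resolve("", false));    // DOWN
        System.out.println(resolve("DOWN", true)); // DOWN: the override wins
    }
}
```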