深入理解DiscoveryClient
阅读原文时间:2023年07月10日阅读:1

Spring Cloud Commons 提供的抽象

最早的时候服务发现注册都是通过DiscoveryClient来实现的,随着版本变迁把DiscoveryClient服务注册抽离出来变成了ServiceRegistry抽象,专门负责服务注册,DiscoveryClient专门负责服务发现。还提供了负载均衡的发现LoadBalancerClient抽象。

DiscoveryClient通过@EnableDiscoveryClient的方式进行启用。

自动向Eureka服务端注册

ServiceRegistry使用的是EurekaServiceRegistry来做注册,注册信息放在EurekaRegistration中。

源码:

public interface ServiceRegistry {

/\*\*注册  
    \*/

void register(R registration);

/\*\*  
 \* 取消注册  
 \*  
 \*/  
void deregister(R registration);

/\*\*  
 \* 关闭服务  
 \*/  
void close();

/\*\*  
 \*设置状态  
 \*/  
void setStatus(R registration, String status);

/\*\*  
 \* 获取状态  
 \*/  
<T> T getStatus(R registration);

}

EurekaServiceRegistry的实现

import java.util.HashMap;

import com.netflix.appinfo.InstanceInfo;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;

import org.springframework.cloud.client.serviceregistry.ServiceRegistry;

import static com.netflix.appinfo.InstanceInfo.InstanceStatus.UNKNOWN;

/**
* @author Spencer Gibb
*/
public class EurekaServiceRegistry implements ServiceRegistry {

private static final Log log = LogFactory.getLog(EurekaServiceRegistry.class);  

  
    // 获取到EurekaRegistration 信息

@Override public void register(EurekaRegistration reg) {
     // 初始化信息
maybeInitializeClient(reg);

    if (log.isInfoEnabled()) {  
        log.info("Registering application "  
                + reg.getApplicationInfoManager().getInfo().getAppName()  
                + " with eureka with status "  
                + reg.getInstanceConfig().getInitialStatus());  
    }  

      // 设置实例状态
reg.getApplicationInfoManager()
.setInstanceStatus(reg.getInstanceConfig().getInitialStatus());
      // 如果有healthCheckHandler 设置 healthCheck
reg.getHealthCheckHandler().ifAvailable(healthCheckHandler -> reg
.getEurekaClient().registerHealthCheck(healthCheckHandler));
}

private void maybeInitializeClient(EurekaRegistration reg) {  
    // force initialization of possibly scoped proxies  
    reg.getApplicationInfoManager().getInfo();  
    reg.getEurekaClient().getApplications();  
}

@Override  
public void deregister(EurekaRegistration reg) {  
    if (reg.getApplicationInfoManager().getInfo() != null) {

        if (log.isInfoEnabled()) {  
            log.info("Unregistering application "  
                    + reg.getApplicationInfoManager().getInfo().getAppName()  
                    + " with eureka with status DOWN");  
        }  

        //将状态设置为DOWN
reg.getApplicationInfoManager()
.setInstanceStatus(InstanceInfo.InstanceStatus.DOWN);

        // shutdown of eureka client should happen with EurekaRegistration.close()  
        // auto registration will create a bean which will be properly disposed  
        // manual registrations will need to call close()  
    }  
}

@Override  
public void setStatus(EurekaRegistration registration, String status) {  
    InstanceInfo info = registration.getApplicationInfoManager().getInfo();

    // TODO: howto deal with delete properly?  
    if ("CANCEL\_OVERRIDE".equalsIgnoreCase(status)) {  
        registration.getEurekaClient().cancelOverrideStatus(info);  
        return;  
    }

    // TODO: howto deal with status types across discovery systems?  
    InstanceInfo.InstanceStatus newStatus = InstanceInfo.InstanceStatus  
            .toEnum(status);  
    registration.getEurekaClient().setStatus(newStatus, info);  
}

@Override  
public Object getStatus(EurekaRegistration registration) {  
    String appname = registration.getApplicationInfoManager().getInfo().getAppName();  
    String instanceId = registration.getApplicationInfoManager().getInfo().getId();  
    InstanceInfo info = registration.getEurekaClient().getInstanceInfo(appname,  
            instanceId);

    HashMap<String, Object> status = new HashMap<>();  
    if (info != null) {  
        status.put("status", info.getStatus().toString());  
        status.put("overriddenStatus", info.getOverriddenStatus().toString());  
    }  
    else {  
        status.put("status", UNKNOWN.toString());  
    }

    return status;  
}

public void close() {  
}

}

DefaultLifecycleProcessor中start的调用

/**
 * Starts the lifecycle beans by calling {@code startBeans(false)} and then marks
 * this processor as running.
 */
public void start() {
startBeans(false);
this.running = true;
}

/**
 * Groups the lifecycle beans by phase and starts the groups in ascending phase order.
 *
 * @param autoStartupOnly when {@code true}, only {@link SmartLifecycle} beans whose
 * {@code isAutoStartup()} returns {@code true} are collected; when {@code false},
 * every lifecycle bean is collected
 */
private void startBeans(boolean autoStartupOnly) {
    // Look up all lifecycle beans, keyed by bean name.
    Map<String, Lifecycle> lifecycleBeans = getLifecycleBeans();
    Map<Integer, LifecycleGroup> phases = new HashMap<>();
    lifecycleBeans.forEach((beanName, bean) -> {
        if (!autoStartupOnly || (bean instanceof SmartLifecycle && ((SmartLifecycle) bean).isAutoStartup())) {
            int phase = getPhase(bean);
            LifecycleGroup group = phases.get(phase);
            if (group == null) {
                group = new LifecycleGroup(phase, this.timeoutPerShutdownPhase, lifecycleBeans, autoStartupOnly);
                phases.put(phase, group);
            }
            group.add(beanName, bean);
        }
    });
    if (!phases.isEmpty()) {
        List<Integer> keys = new ArrayList<>(phases.keySet());
        Collections.sort(keys);
        for (Integer key : keys) {
            // Start every collected phase group, lowest phase first.
            phases.get(key).start();
        }
    }
}

/**
 * Sorts the members of this group and starts each one via {@code doStart};
 * does nothing when the group has no members.
 */
public void start() {
    if (!this.members.isEmpty()) {
        if (logger.isDebugEnabled()) {
            logger.debug("Starting beans in phase " + this.phase);
        }
        // Order the members before starting them one by one.
        Collections.sort(this.members);
        for (LifecycleGroupMember groupMember : this.members) {
            doStart(this.lifecycleBeans, groupMember.name, this.autoStartupOnly);
        }
    }
}

/**
 * Starts the named bean after first starting, recursively, the beans it depends on.
 * The bean is removed from the given map before anything else happens, so a bean
 * that is no longer in the map is skipped.
 *
 * @param lifecycleBeans the lifecycle beans still to be processed, keyed by bean name
 * @param beanName the name of the bean to start
 * @param autoStartupOnly when {@code true}, a {@link SmartLifecycle} bean is only
 * started if its {@code isAutoStartup()} returns {@code true}
 * @throws ApplicationContextException if the bean's {@code start()} throws
 */
private void doStart(Map<String, ? extends Lifecycle> lifecycleBeans, String beanName, boolean autoStartupOnly) {
    Lifecycle bean = lifecycleBeans.remove(beanName);
    if (bean != null && bean != this) {
        // Dependencies go first.
        String[] dependenciesForBean = getBeanFactory().getDependenciesForBean(beanName);
        for (String dependency : dependenciesForBean) {
            doStart(lifecycleBeans, dependency, autoStartupOnly);
        }
        if (!bean.isRunning() &&
                (!autoStartupOnly || !(bean instanceof SmartLifecycle) || ((SmartLifecycle) bean).isAutoStartup())) {
            if (logger.isTraceEnabled()) {
                logger.trace("Starting bean '" + beanName + "' of type [" + bean.getClass().getName() + "]");
            }
            try {
                bean.start();
            }
            catch (Throwable ex) {
                throw new ApplicationContextException("Failed to start bean '" + beanName + "'", ex);
            }
            if (logger.isDebugEnabled()) {
                logger.debug("Successfully started bean '" + beanName + "'");
            }
        }
    }
}

DiscoveryClient

public interface DiscoveryClient extends Ordered {

/\*\*  
 \* Default order of the discovery client.  
 \*/  
int DEFAULT\_ORDER = 0;

/\*\*  
 \* A human-readable description of the implementation, used in HealthIndicator. 提供一些描述信息  
 \* @return The description.  
 \*/  
String description();

/\*\*  
 \* Gets all ServiceInstances associated with a particular serviceId.  
 \* @param serviceId The serviceId to query.  
 \* @return A List of ServiceInstance. 根据serviceid 找到ServiceInstance  
 \*/  
List<ServiceInstance> getInstances(String serviceId);

/\*\*  
 \* @return All known service IDs. 获取所有的服务  
 \*/  
List<String> getServices();

/\*\*  
 \* Default implementation for getting order of discovery clients.  
 \* @return order  
 \*/  
@Override  
default int getOrder() {  
    return DEFAULT\_ORDER;  
}

}

Eureka的实现

import java.net.URI;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;

import com.netflix.appinfo.EurekaInstanceConfig;
import com.netflix.appinfo.InstanceInfo;
import com.netflix.discovery.EurekaClient;
import com.netflix.discovery.EurekaClientConfig;
import com.netflix.discovery.shared.Application;
import com.netflix.discovery.shared.Applications;

import org.springframework.cloud.client.DefaultServiceInstance;
import org.springframework.cloud.client.ServiceInstance;
import org.springframework.cloud.client.discovery.DiscoveryClient;
import org.springframework.core.Ordered;
import org.springframework.util.Assert;

import static com.netflix.appinfo.InstanceInfo.PortType.SECURE;

/**
* A {@link DiscoveryClient} implementation for Eureka.
*
* @author Spencer Gibb
* @author Tim Ysewyn
*/
public class EurekaDiscoveryClient implements DiscoveryClient {

/\*\*  
 \* Client description {@link String}.  
 \*/  
public static final String DESCRIPTION = "Spring Cloud Eureka Discovery Client";

private final EurekaClient eurekaClient;

private final EurekaClientConfig clientConfig;

@Deprecated  
public EurekaDiscoveryClient(EurekaInstanceConfig config, EurekaClient eurekaClient) {  
    this(eurekaClient, eurekaClient.getEurekaClientConfig());  
}

public EurekaDiscoveryClient(EurekaClient eurekaClient,  
        EurekaClientConfig clientConfig) {  
    this.clientConfig = clientConfig;  
    this.eurekaClient = eurekaClient;  
}

@Override  
public String description() {  
    return DESCRIPTION;  
}  

    // 通过serviceid 获取到 instanceInfo信息
@Override
public List getInstances(String serviceId) {
List infos = this.eurekaClient.getInstancesByVipAddress(serviceId,
false);
List instances = new ArrayList<>();
for (InstanceInfo info : infos) {
instances.add(new EurekaServiceInstance(info));
}
return instances;
}

@Override  
public List<String> getServices() {  
    Applications applications = this.eurekaClient.getApplications();  
    if (applications == null) {  
        return Collections.emptyList();  
    }  
    List<Application> registered = applications.getRegisteredApplications();  
    List<String> names = new ArrayList<>();  
    for (Application app : registered) {  
        if (app.getInstances().isEmpty()) {  
            continue;  
        }  
        names.add(app.getName().toLowerCase());

    }  
    return names;  
}

@Override  
public int getOrder() {  
    return clientConfig instanceof Ordered ? ((Ordered) clientConfig).getOrder()  
            : DiscoveryClient.DEFAULT\_ORDER;  
}

/\*\*  
 \* An Eureka-specific {@link ServiceInstance} implementation.  
 \*/  
public static class EurekaServiceInstance implements ServiceInstance {

    private InstanceInfo instance;

    public EurekaServiceInstance(InstanceInfo instance) {  
        Assert.notNull(instance, "Service instance required");  
        this.instance = instance;  
    }

    public InstanceInfo getInstanceInfo() {  
        return instance;  
    }

    @Override  
    public String getInstanceId() {  
        return this.instance.getId();  
    }

    @Override  
    public String getServiceId() {  
        return this.instance.getAppName();  
    }

    @Override  
    public String getHost() {  
        return this.instance.getHostName();  
    }

    @Override  
    public int getPort() {  
        if (isSecure()) {  
            return this.instance.getSecurePort();  
        }  
        return this.instance.getPort();  
    }

    @Override  
    public boolean isSecure() {  
        // assume if secure is enabled, that is the default  
        return this.instance.isPortEnabled(SECURE);  
    }

    @Override  
    public URI getUri() {  
        return DefaultServiceInstance.getUri(this);  
    }

    @Override  
    public Map<String, String> getMetadata() {  
        return this.instance.getMetadata();  
    }

}

}

总结:

自动配置注册:

EurekaAutoServiceRegistration::start(在DefaultLifecycleProcessor中调用):

1. 先检查port;
2. 如果没有启动过并且非安全的端口大于0,则进行注册(通过调用org.springframework.cloud.client.serviceregistry.ServiceRegistry接口的实例).
3. 注册事件;
4. 设置运行状态
注册:
org.springframework.cloud.client.serviceregistry.ServiceRegistry->注册和取消注册
->
EurekaServiceRegistry::register从参数获取EurekaRegistration,
并初始化EurekaRegistration,通过ApplicationInfoManager设置实例的状态,
如果有healthCheck则注册healthCheck.
自动配置取消注册:

1. 调用org.springframework.cloud.client.serviceregistry.ServiceRegistry接口的实例的deregister方法.
2. 设置状态

取消注册(EurekaServiceRegistry::deregister):

将状态设置成DOWN.

ServiceInstance