nacos服务注册源码解析
阅读原文时间:2021年05月25日阅读:1

1.客户端使用

compile 'com.alibaba.cloud:spring-cloud-starter-alibaba-nacos-discovery:2.2.3.RELEASE'
compile 'com.alibaba.cloud:spring-cloud-starter-alibaba-nacos-config:2.2.3.RELEASE'

bootstrap.ym中

spring:
application:
name: product
cloud:
nacos:
discovery:
server-addr: 127.0.0.1:8848
service: ${spring.application.name}
namespace: 038b8be8-54da-44a5-9664-def33bc8cd19
group: DEFAULT_GROUP
config:
server-addr: 127.0.0.1:8848
namespace: 038b8be8-54da-44a5-9664-def33bc8cd19
group: DEFAULT_GROUP
prefix: ${spring.application.name}

@EnableDiscoveryClient
@SpringBootApplication
public class ProductApplication {
public static void main(String[] args) {
SpringApplication.run(ProductApplication.class, args);
}
}

2.服务端代码

git clone https://github.com/alibaba/nacos.git

将源码导入到idea,然后打开console项目中的application.properties配置文件

将db开头的配置放开

找到项目distribution\conf下的SQL文件nacos-mysql.sql,导入数据库

在C:\Users\yue\nacos目录下新建conf文件夹,文件夹下新建一个配置文件cluster.conf

启动Nacos时设置VM参数

-Dnscos.standalone=true -Dnacos.home=C:\Users\yue\nacos -Dserver.port=9000

启动成功后访问地址http://127.0.0.1:9000/nacos/index.html,账号密码默认nacos

3. Spring服务发现的统一规范

Spring将这套规范定义在Spring Cloud Common中

  • circuitbreaker包下定义了断路器的规范
  • discovery包下面定义了服务发现的规范
  • loadbalancer包下面定义了负载均衡的规范
  • serviceregistry包下面定义了服务注册的规范

serviceregistry包定义了三个核心接口,用来实现服务注册。

AutoServiceRegistration:用于服务自动注册

Registration:用于存储服务信息

ServiceRegistry:用于注册/移除服务

4.Nacos客户端服务注册实现

Nacos服务注册模块按照Spring Cloud的规范实现这三个接口

这个类实现了服务自动注册到Nacos注册中心的功能,它继承AbstractAutoServiceRegistration类。

public abstract class AbstractAutoServiceRegistration
implements AutoServiceRegistration, ApplicationContextAware,
ApplicationListener {

}

AbstractAutoServiceRegistration实现AutoServiceRegistration接口来达到服务自动注册,实现ApplicationListener接口来实现事件回调;当容器启动,应用上下文被刷新且程序准备就绪之后会触发WebServerInitializedEvent事件,调用onApplicationEvent()方法。

@Override
@SuppressWarnings("deprecation")
public void onApplicationEvent(WebServerInitializedEvent event) {
bind(event);
}

@Deprecated
public void bind(WebServerInitializedEvent event) {
ApplicationContext context = event.getApplicationContext();
if (context instanceof ConfigurableWebServerApplicationContext) {
if ("management".equals(((ConfigurableWebServerApplicationContext) context)
.getServerNamespace())) {
return;
}
}
this.port.compareAndSet(0, event.getWebServer().getPort());
this.start();
}

public void start() {
if (!isEnabled()) {
if (logger.isDebugEnabled()) {
logger.debug("Discovery Lifecycle disabled. Not starting");
}
return;
}
if (!this.running.get()) {
this.context.publishEvent(
new InstancePreRegisteredEvent(this, getRegistration()));
register(); if (shouldRegisterManagement()) {
registerManagement();
}
this.context.publishEvent(
new InstanceRegisteredEvent<>(this, getConfiguration()));
this.running.compareAndSet(false, true);
}
}

protected void register() {
//实际上最终是调用serviceRegistry.register方法
this.serviceRegistry.register(getRegistration());
}

@Override
public void register(Registration registration) {
if (StringUtils.isEmpty(registration.getServiceId())) {
log.warn("No service to register for nacos client…");
return;
}
NamingService namingService = namingService();
String serviceId = registration.getServiceId();
String group = nacosDiscoveryProperties.getGroup();

Instance instance = getNacosInstanceFromRegistration(registration);  
try {  
    // 最终由namingService实现服务注册  

namingService.registerInstance(serviceId, group, instance);
log.info("nacos registry, {} {} {}:{} register finished", group, serviceId,
instance.getIp(), instance.getPort());
}
catch (Exception e) {
log.error("nacos registry, {} register failed…{},", serviceId,
registration.toString(), e);
rethrowRuntimeException(e);
}
}

@Override
public void registerInstance(String serviceName, String groupName, Instance instance) throws NacosException {
String groupedServiceName = NamingUtils.getGroupedName(serviceName, groupName);
if (instance.isEphemeral()) {
BeatInfo beatInfo = beatReactor.buildBeatInfo(groupedServiceName, instance);
     //添加定时心跳任务
beatReactor.addBeatInfo(groupedServiceName, beatInfo);
}
serverProxy.registerService(groupedServiceName, groupName, instance);
}

public void registerService(String serviceName, String groupName, Instance instance) throws NacosException {
NAMING_LOGGER.info("[REGISTER-SERVICE] {} registering service {} with instance: {}", namespaceId, serviceName,
instance);
final Map params = new HashMap(16);
params.put(CommonParams.NAMESPACE_ID, namespaceId);
params.put(CommonParams.SERVICE_NAME, serviceName);
params.put(CommonParams.GROUP_NAME, groupName);
params.put(CommonParams.CLUSTER_NAME, instance.getClusterName());
params.put("ip", instance.getIp());
params.put("port", String.valueOf(instance.getPort()));
params.put("weight", String.valueOf(instance.getWeight()));
params.put("enable", String.valueOf(instance.isEnabled()));
params.put("healthy", String.valueOf(instance.isHealthy()));
params.put("ephemeral", String.valueOf(instance.isEphemeral()));
params.put("metadata", JacksonUtils.toJson(instance.getMetadata()));
//最终使用http请求注册服务
reqApi(UtilAndComs.nacosUrlInstance, params, HttpMethod.POST);
}

public void addBeatInfo(String serviceName, BeatInfo beatInfo) {
NAMING_LOGGER.info("[BEAT] adding beat: {} to beat map.", beatInfo);
String key = buildKey(serviceName, beatInfo.getIp(), beatInfo.getPort());
BeatInfo existBeat = null;
if ((existBeat = dom2Beat.remove(key)) != null) {
existBeat.setStopped(true);
}
dom2Beat.put(key, beatInfo);
//添加心跳任务
executorService.schedule(new BeatTask(beatInfo), beatInfo.getPeriod(), TimeUnit.MILLISECONDS);
MetricsMonitor.getDom2BeatSizeMonitor().set(dom2Beat.size());
}

public JsonNode sendBeat(BeatInfo beatInfo, boolean lightBeatEnabled) throws NacosException {
if (NAMING_LOGGER.isDebugEnabled()) {
NAMING_LOGGER.debug("[BEAT] {} sending beat to server: {}", namespaceId, beatInfo.toString());
}
Map params = new HashMap(8);
Map bodyMap = new HashMap(2);
if (!lightBeatEnabled) {
bodyMap.put("beat", JacksonUtils.toJson(beatInfo));
}
params.put(CommonParams.NAMESPACE_ID, namespaceId);
params.put(CommonParams.SERVICE_NAME, beatInfo.getServiceName());
params.put(CommonParams.CLUSTER_NAME, beatInfo.getCluster());
params.put("ip", beatInfo.getIp());
params.put("port", String.valueOf(beatInfo.getPort()));
//调用http来监控心跳
String result = reqApi(UtilAndComs.nacosUrlBase + "/instance/beat", params, bodyMap, HttpMethod.PUT);
return JacksonUtils.toObj(result);
}

当容器启动完成,触发事件,开始执行register服务,发起服务注册请求的同时添加一个心跳检测任务。

5.服务注册配置类加载

Spring Boot中有一种非常解耦的扩展机制:Spring Factories。这种扩展机制实际上是仿照Java中的SPI扩展机制来实现的

在Spring中也有一种类似与Java SPI的加载机制。它在META-INF/spring.factories文件中配置接口的实现类名称,然后在程序中读取这些配置文件并实例化。

这种自定义的SPI机制是Spring Boot Starter实现的基础。

SpringBoot启动的时候会读取所有jar包下面的META-INF/spring.factories文件; 并且将文件中的 接口/抽象类 对应的实现类都对应起来,并在需要的时候可以实例化对应的实现类

这个配置类实例化了NacosServiceRegistryNacosRegistrationNacosAutoServiceRegistration这三个bean。

@Configuration(proxyBeanMethods = false)
@EnableConfigurationProperties
@ConditionalOnNacosDiscoveryEnabled
@ConditionalOnProperty(value = "spring.cloud.service-registry.auto-registration.enabled",
matchIfMissing = true)
@AutoConfigureAfter({ AutoServiceRegistrationConfiguration.class,
AutoServiceRegistrationAutoConfiguration.class,
NacosDiscoveryAutoConfiguration.class })
public class NacosServiceRegistryAutoConfiguration {

@Bean  
public **NacosServiceRegistry** nacosServiceRegistry(  
        NacosDiscoveryProperties nacosDiscoveryProperties) {  
    return new NacosServiceRegistry(nacosDiscoveryProperties);  
}

@Bean  
@ConditionalOnBean(AutoServiceRegistrationProperties.class)  
public **NacosRegistration** nacosRegistration(  
        ObjectProvider<List<NacosRegistrationCustomizer>> registrationCustomizers,  
        NacosDiscoveryProperties nacosDiscoveryProperties,  
        ApplicationContext context) {  
    return new NacosRegistration(registrationCustomizers.getIfAvailable(),  
            nacosDiscoveryProperties, context);  
}

@Bean  
@ConditionalOnBean(AutoServiceRegistrationProperties.class)  
public **NacosAutoServiceRegistration** nacosAutoServiceRegistration(  
        NacosServiceRegistry registry,  
        AutoServiceRegistrationProperties autoServiceRegistrationProperties,  
        NacosRegistration registration) {  
    return new NacosAutoServiceRegistration(registry,  
            autoServiceRegistrationProperties, registration);  
}  

}

1. @EnableConfigurationProperties 可以让用了@ConfigurationProperties注解但是没有使用@Component等注解实例化为Bean的类生效,因为没有@Component注解,Spring容器不会实例化这个类,通过@EnableConfigurationProperties注解让NacosdiscoveryProperties被实例化并注入到容器中。

2. @ConditionalOnNacosDiscoveryEnabled 是当spring.cloud.nacos.discovery.enabled=true时当前配置类才会生效。

3. @ConditionalOnProperty(value = "spring.cloud.service-registry.auto-registration.enabled",matchIfMissing = true)是当spring.cloud.service-registry.auto-registration.enabled=true时当前配置类才会生效。

4. @AutoConfigureAfter(***) 是当括号类的配置类加载生效后再加载当前配置类。

这个配置类实例化NamingService。

@Configuration(proxyBeanMethods = false)
@ConditionalOnClass(Endpoint.class)
@ConditionalOnNacosDiscoveryEnabled
public class NacosDiscoveryEndpointAutoConfiguration {

@Bean  
...

@Bean  
@ConditionalOnEnabledHealthIndicator("nacos-discovery")  
public HealthIndicator nacosDiscoveryHealthIndicator(  
        NacosServiceManager nacosServiceManager,  
        NacosDiscoveryProperties nacosDiscoveryProperties) {  
    Properties nacosProperties = nacosDiscoveryProperties.getNacosProperties();  
    return new NacosDiscoveryHealthIndicator(  
            **nacosServiceManager.getNamingService(nacosProperties));**  
}  

}

NacosServiceManager类下:
public NamingService getNamingService(Properties properties) {
if (Objects.isNull(this.namingService)) {
buildNamingService(properties);
}
return namingService;
}
private NamingService buildNamingService(Properties properties) {
if (Objects.isNull(namingService)) {
synchronized (NacosServiceManager.class) {
if (Objects.isNull(namingService)) {
namingService = createNewNamingService(properties);
}
}
}
return namingService;
}
private NamingService createNewNamingService(Properties properties) {
try {
return createNamingService(properties);
}

}
NacosFactory类下:
public static NamingService createNamingService(Properties properties) throws NacosException {
return NamingFactory.createNamingService(properties);
}
NamingFactory类下:
public static NamingService createNamingService(Properties properties) throws NacosException {
try {
Class driverImplClass = Class.forName("com.alibaba.nacos.client.naming.NacosNamingService");
// 通过反射机制创建NamingService的对象
Constructor constructor = driverImplClass.getConstructor(Properties.class);
NamingService vendorImpl = (NamingService) constructor.newInstance(properties);
return vendorImpl;
} catch (Throwable e) {
throw new NacosException(NacosException.CLIENT_INVALID_PARAM, e);
}
}
NacosNamingService类下:
public NacosNamingService(Properties properties) throws NacosException {
init(properties);
}
private void init(Properties properties) throws NacosException {
ValidatorUtils.checkInitParam(properties);
//初始化命名空间,注册中心地址,缓存,日志等
this.namespace = InitUtils.initNamespaceForNaming(properties);
InitUtils.initSerialization();
initServerAddr(properties);
InitUtils.initWebRootContext();
initCacheDir();
initLogName(properties);
//初始化事件分发器,服务代理,心跳机制等
this.eventDispatcher = new EventDispatcher();
this.serverProxy = new NamingProxy(this.namespace, this.endpoint, this.serverList, properties);
this.beatReactor = new BeatReactor(this.serverProxy, initClientBeatThreadCount(properties));
this.hostReactor = new HostReactor(this.eventDispatcher, this.serverProxy, beatReactor, this.cacheDir,
isLoadCacheAtStart(properties), initPollingThreadCount(properties));
}

6.服务端服务注册

流程图参考:https://www.processon.com/view/5e25b762e4b04579e409e81f

服务端简单总结:

1.当客户端发起服务注册请求时,将服务添加进本地缓存,并建立心跳检测

2.保存实例时通过实例名称的前缀来判断是临时实例还是永久实例。

3.如果是临时实例,通过阻塞队列将实例更新到内存注册表,然后又通过阻塞队列将实例信息异步批量同步到集群其他节点

4.如果是永久实例,判断当前节点是不是Leader,如果不是,则将服务注册请求转发到Leader节点,如果是Leader,将实例信息到异步更新内存注册表和同步写入文件,然后将实例信息同步到集群其他节点。

naming项目下的 InstanceController类

@Secured(parser = NamingResourceParser.class, action = ActionTypes.WRITE)
public String register(HttpServletRequest request) throws Exception {
final String namespaceId = WebUtils
.optional(request, CommonParams.NAMESPACE_ID, Constants.DEFAULT_NAMESPACE_ID);
final String serviceName = WebUtils.required(request, CommonParams.SERVICE_NAME);
NamingUtils.checkServiceNameFormat(serviceName);
final Instance instance = parseInstance(request);
serviceManager.registerInstance(namespaceId, serviceName, instance);
return "ok";
}

public void registerInstance(String namespaceId, String serviceName, Instance instance) throws NacosException {
// ①创建1个空服务
createEmptyService(namespaceId, serviceName, instance.isEphemeral());
// 从本地缓存serviceMap中获取服务对象
Service service = getService(namespaceId, serviceName);
if (service == null) {
throw new NacosException(NacosException.INVALID_PARAM,
"service not found, namespace: " + namespaceId + ", service: " + serviceName);
}
// ②服务注册
addInstance(namespaceId, serviceName, instance.isEphemeral(), instance);
}

①: createEmptyService()方法

// 如果服务不存在,创建一个服务
public void createServiceIfAbsent(String namespaceId, String serviceName, boolean local, Cluster cluster)
throws NacosException {
Service service = getService(namespaceId, serviceName);
if (service == null) {
// 如果服务不存在,创建一个空的服务
Loggers.SRV_LOG.info("creating empty service {}:{}", namespaceId, serviceName);
service = new Service();
service.setName(serviceName);
service.setNamespaceId(namespaceId);
service.setGroupName(NamingUtils.getGroupName(serviceName));
// now validate the service. if failed, exception will be thrown
service.setLastModifiedMillis(System.currentTimeMillis());
service.recalculateChecksum();
if (cluster != null) {
cluster.setService(service);
service.getClusterMap().put(cluster.getName(), cluster);
}
service.validate();
// 将创建的空的服务添加进本地缓存,并初始化
putServiceAndInit(service);
if (!local) {
addOrReplaceService(service);
}
}
}
private void putServiceAndInit(Service service) throws NacosException {
// 将服务加入serviceMap缓存
putService(service); // 建立心跳检测任务机制,默认五秒
service.init(); // 实现数据一致性的监听
consistencyService.listen(KeyBuilder.buildInstanceListKey(service.getNamespaceId(), service.getName(), true), service);
consistencyService.listen(KeyBuilder.buildInstanceListKey(service.getNamespaceId(), service.getName(), false), service);
Loggers.SRV_LOG.info("[NEW-SERVICE] {}", service.toJson());
}

public void putService(Service service) {
if (!serviceMap.containsKey(service.getNamespaceId())) {
synchronized (putServiceLock) {
if (!serviceMap.containsKey(service.getNamespaceId())) {
serviceMap.put(service.getNamespaceId(), new ConcurrentSkipListMap<>());
}
}
}
serviceMap.get(service.getNamespaceId()).put(service.getName(), service);
}
//建立心跳检测任务clientBeatCheckTask,默认五秒
public void init() {
HealthCheckReactor.scheduleCheck(clientBeatCheckTask); for (Map.Entry entry : clusterMap.entrySet()) {
entry.getValue().setService(this);
entry.getValue().init();
}
}

心跳定时任务

@Override
public void run() {
try {
…省略
List instances = service.allIPs(true);
// 设置实例的运行状况
for (Instance instance : instances) {
if (System.currentTimeMillis() - instance.getLastBeat() > instance.getInstanceHeartBeatTimeOut()) {
if (!instance.isMarked()) {
if (instance.isHealthy()) {
instance.setHealthy(false);
getPushService().serviceChanged(service);
ApplicationUtils.publishEvent(new InstanceHeartbeatTimeoutEvent(this, instance));
}
}
}
}
if (!getGlobalConfig().isExpireInstance()) {
return;
}
// 删除过期的实例
for (Instance instance : instances) {
if (instance.isMarked()) {
continue;
}
if (System.currentTimeMillis() - instance.getLastBeat() > instance.getIpDeleteTimeout()) {
// 删除实例
deleteIp(instance);
}
}
} catch (Exception e) {
Loggers.SRV_LOG.warn("Exception while processing client beat time out.", e);
}
}

addInstance()方法

public void addInstance(String namespaceId, String serviceName, boolean ephemeral, Instance… ips)
throws NacosException {
String key = KeyBuilder.buildInstanceListKey(namespaceId, serviceName, ephemeral);
Service service = getService(namespaceId, serviceName);
synchronized (service) {
// 比较并获取新的实例列表
List instanceList = addIpAddresses(service, ephemeral, ips);
Instances instances = new Instances();
instances.setInstanceList(instanceList);
// 保存服务实例
consistencyService.put(key, instances);
}
}

区分是临时实例还是永久实例

public void put(String key, Record value) throws NacosException {
mapConsistencyService(key).put(key, value);
}
//通过判断key的前缀字符串来判断
private ConsistencyService mapConsistencyService(String key) {
return KeyBuilder.matchEphemeralKey(key) ? ephemeralConsistencyService : persistentConsistencyService;
}

@PostConstruct
public void init() {
//提交通知任务
GlobalExecutor.submitDistroNotifyTask(notifier);
}

@Override
public void put(String key, Record value) throws NacosException {
//①注册实例
onPut(key, value);
//②同步实例到其他节点
distroProtocol.sync(new DistroKey(key, KeyBuilder.INSTANCE_LIST_KEY_PREFIX), DataOperation.CHANGE,
globalConfig.getTaskDispatchPeriod() / 2);
}

①注册实例更新到缓存

public void onPut(String key, Record value) {
if (KeyBuilder.matchEphemeralInstanceListKey(key)) {
Datum datum = new Datum<>();
datum.value = (Instances) value;
datum.key = key;
datum.timestamp.incrementAndGet();
// 保存命名数据
dataStore.put(key, datum);
}
if (!listeners.containsKey(key)) {
return;
}
//添加任务
notifier.addTask(key, DataOperation.CHANGE);
}

public void addTask(String datumKey, DataOperation action) {
if (services.containsKey(datumKey) && action == DataOperation.CHANGE) {
return;
}
if (action == DataOperation.CHANGE) {
services.put(datumKey, StringUtils.EMPTY);
}
tasks.offer(Pair.with(datumKey, action));
}
// task.run调用
private void handle(Pair pair) {
try {
…省略
for (RecordListener listener : listeners.get(datumKey)) {
count++;
try {
if (action == DataOperation.CHANGE) {
listener.onChange(datumKey, dataStore.get(datumKey).value);
continue;
}
if (action == DataOperation.DELETE) {
listener.onDelete(datumKey);
continue;
}

}

} …
}

@Override
public void onChange(String key, Instances value) throws Exception {

   //更新实例,运用CopyAndWrite
updateIPs(value.getInstanceList(), KeyBuilder.matchEphemeralInstanceListKey(key));
  //重新计算校验
recalculateChecksum();
}

②同步实例到所有远程服务器

异步批量同步

public void sync(DistroKey distroKey, DataOperation action, long delay) {
for (Member each : memberManager.allMembersWithoutSelf()) {
DistroKey distroKeyWithTarget = new DistroKey(distroKey.getResourceKey(), distroKey.getResourceType(),
each.getAddress());
DistroDelayTask distroDelayTask = new DistroDelayTask(distroKeyWithTarget, action, delay);
distroTaskEngineHolder.getDelayTaskExecuteEngine().addTask(distroKeyWithTarget, distroDelayTask);
if (Loggers.DISTRO.isDebugEnabled()) {
Loggers.DISTRO.debug("[DISTRO-SCHEDULE] {} to {}", distroKey, each.getAddress());
}
}
}

public void put(String key, Record value) throws NacosException {
checkIsStopWork();
try {
raftCore.signalPublish(key, value);
} catch (Exception e) {

}
}

public void signalPublish(String key, Record value) throws Exception {
if (stopWork) {
throw new IllegalStateException("old raft protocol already stop work");
}
//当前节点不是 Leader
if (!isLeader()) {
ObjectNode params = JacksonUtils.createEmptyJsonNode();
params.put("key", key);
params.replace("value", JacksonUtils.transferToJsonNode(value));
Map parameters = new HashMap<>(1);
parameters.put("key", key);
final RaftPeer leader = getLeader();
// 将注册请求转发到集群的 Leader节点
raftProxy.proxyPostLarge(leader.ip, API_PUB, params.toString(), parameters);
return;
}
//当前节点是 Leader
OPERATE_LOCK.lock();
try {
final long start = System.currentTimeMillis();
final Datum datum = new Datum();
datum.key = key;
datum.value = value;
if (getDatum(key) == null) {
datum.timestamp.set(1L);
} else {
datum.timestamp.set(getDatum(key).timestamp.incrementAndGet());
}
ObjectNode json = JacksonUtils.createEmptyJsonNode();
json.replace("datum", JacksonUtils.transferToJsonNode(datum));
json.replace("source", JacksonUtils.transferToJsonNode(peers.local()));
// 开始发布
onPublish(datum, peers.local());
final String content = json.toString();
// 利用 CountDownLatch 实现过半
final CountDownLatch latch = new CountDownLatch(peers.majorityCount());
for (final String server : peers.allServersIncludeMyself()) {
if (isLeader(server)) {
latch.countDown();
continue;
}
final String url = buildUrl(server, API_ON_PUB);
// 同步实例信息给集群节点
HttpClient.asyncHttpPostLarge(url, Arrays.asList("key", key), content, new Callback() {
@Override
public void onReceive(RestResult result) {
if (!result.ok()) {

return;
}
latch.countDown();
}
@Override
public void onError(Throwable throwable) {
Loggers.RAFT.error("[RAFT] failed to publish data to peer", throwable);
}
@Override
public void onCancel() {
}
});
}
if (!latch.await(UtilsAndCommons.RAFT_PUBLISH_TIMEOUT, TimeUnit.MILLISECONDS)) {

}

    long end = System.currentTimeMillis();  
    Loggers.RAFT.info("signalPublish cost {} ms, key: {}", (end - start), key);  
} finally {  
    OPERATE\_LOCK.unlock();  
}  

}

public void onPublish(Datum datum, RaftPeer source) throws Exception {
if (stopWork) {
throw new IllegalStateException("old raft protocol already stop work");
}
RaftPeer local = peers.local();
if (datum.value == null) {

}
if (!peers.isLeader(source.ip)) {

}
if (source.term.get() < local.term.get()) { … } local.resetLeaderDue(); // 持久化数据,写入文件 if (KeyBuilder.matchPersistentKey(datum.key)) { raftStore.write(datum); } datums.put(datum.key, datum); if (isLeader()) { local.term.addAndGet(PUBLISH_TERM_INCREASE_COUNT); } else { if (local.term.get() + PUBLISH_TERM_INCREASE_COUNT > source.term.get()) {
//set leader term:
getLeader().term.set(source.term.get());
local.term.set(getLeader().term.get());
} else {
local.term.addAndGet(PUBLISH_TERM_INCREASE_COUNT);
}
}
raftStore.updateTerm(local.term.get());
// 发布数据更新事件
NotifyCenter.publishEvent(ValueChangeEvent.builder().key(datum.key).action(DataOperation.CHANGE).build());
Loggers.RAFT.info("data added/updated, key={}, term={}", datum.key, local.term);
}

一致性监听这里采用了委派机制 ,同时执行了 Distro 和 Raft 两个实现类。由此可以说明Nacos同时使用 Raft 协议和 Distro 协议维护数据一致性的。

@Override
public void listen(String key, RecordListener listener) throws NacosException {
// this special key is listened by both:
if (KeyBuilder.SERVICE_META_KEY_PREFIX.equals(key)) {
persistentConsistencyService.listen(key, listener); ephemeralConsistencyService.listen(key, listener); return;
}
mapConsistencyService(key).listen(key, listener);
}

7.启动加载

@EnableDiscoveryClient
@Import(EnableDiscoveryClientImportSelector.class)
public @interface EnableDiscoveryClient {
//默认自动将本服务注册到服务注册中心
boolean autoRegister() default true;
}

@Order(Ordered.LOWEST_PRECEDENCE - 100)
public class EnableDiscoveryClientImportSelector
extends SpringFactoryImportSelector {
@Override
public String[] selectImports(AnnotationMetadata metadata) {
String[] imports = super.selectImports(metadata);
AnnotationAttributes attributes = AnnotationAttributes.fromMap(
metadata.getAnnotationAttributes(getAnnotationClass().getName(), true));
// 获取@EnableDiscoveryClient注解的属性autoRegister的值
boolean autoRegister = attributes.getBoolean("autoRegister");
if (autoRegister) {
// 加载AutoServiceRegistrationConfiguration配置类
List importsList = new ArrayList<>(Arrays.asList(imports));
importsList.add("org.springframework.cloud.client.serviceregistry.AutoServiceRegistrationConfiguration");
imports = importsList.toArray(new String[0]);
}
else {
// 设置spring.cloud.service-registry.auto-registration.enabled=false, 关闭服务自动注册功能
Environment env = getEnvironment();
if (ConfigurableEnvironment.class.isInstance(env)) {
ConfigurableEnvironment configEnv = (ConfigurableEnvironment) env;
LinkedHashMap map = new LinkedHashMap<>();
map.put("spring.cloud.service-registry.auto-registration.enabled", false);
MapPropertySource propertySource = new MapPropertySource(
"springCloudDiscoveryClient", map);
configEnv.getPropertySources().addLast(propertySource);
}
}
return imports;
}
…省略
}

@Configuration(proxyBeanMethods = false)
@EnableConfigurationProperties(AutoServiceRegistrationProperties.class)
@ConditionalOnProperty(value = "spring.cloud.service-registry.auto-registration.enabled", matchIfMissing = true) public class AutoServiceRegistrationConfiguration {
}

@ConfigurationProperties("spring.cloud.service-registry.auto-registration")
public class AutoServiceRegistrationProperties {
// 是否启用服务自动注册。默认值为true
private boolean enabled = true;
// 是否将管理注册为服务。默认值为true
private boolean registerManagement = true;
// 如果没有自动注册,启动是否失败。默认值为假
private boolean failFast = false;
…省略
}

综上,没有使用@EnableDiscoveryClient注解,服务也会被自动注册,默认自动注册为true。

如果不想自动注册服务,可以通过

@EnableDiscoveryClient(autoRegister = false)
或者
spring.cloud.service-registry.auto-registration.enabled=false