Dubbo-聊聊注册中心的设计
阅读原文时间:2023年07月09日阅读:1

前言

Dubbo源码阅读分享系列文章,欢迎大家关注点赞

SPI实现部分

  1. Dubbo-SPI机制

  2. Dubbo-Adaptive实现原理

  3. Dubbo-Activate实现原理

  4. Dubbo SPI-Wrapper

注册中心作用


在整个Duubbo架构中,注册中心主要完成以下三件事情:

  1. Provider应用启动后的初始化阶段会向注册中心完成注册操作;

  2. Consumer应用启动初始化阶段会完成对所需 Provider的进行订阅操作;

  3. 在Provider发生变更时通知监听的Consumer;

Registry在整个架构中主要是对Consumer 和 Provider 所对应的业务进行解耦,从而提升系统的稳定性。关于为什么需要注册中心,大家可以参考一下微服务下的注册中心如何选择,在这篇文章中我花了一小节来介绍这个问题。

源码分析

开始之前,首先来看下关于注册中心源码的项目结构,整个项目由两块组成,一个就是核心Api,另外一个就是具体一些中间件的实现,看到这个我们可能会有一些想法,这个定然会有一些模板类以及一些工厂设计,能想到这里,说明你已经具有很好的抽象思维,废话不多说开始源码。

核心Api

dubbo-registry-api是注册中心核心对象的抽象以及实现部分,我们首先来看下几个核心对象设计

Node

Node位于Dubbo的dubbo-common项目下面,在Dubbo中Node这个接口用来抽象节点的概念,Node不仅可以表示Provider和Consumer节点,还可以表示注册中心节点。Node节点内部定义三个方法:

  1. getUrl方法返回当前节点的URL;

  2. isAvailable方法方法返回对象是否可用;

  3. destroy方法负责销毁对象;

RegistryService

RegistryService此接口定义注册中心的功能,定义五个方法:

  1. register方法向注册中心注册一个URL;

  2. unregister方法取消一个URL的注册;

  3. subscribe方法订阅一个URL,订阅成功之后,当订阅的数据发生变化时,注册中心会主动通知第二个参数指定的 NotifyListener对象,NotifyListener接口中定义的 notify() 方法就是用来接收该通知的;

  4. unsubscribe方法取消一个URL定义,同时当数据发生变化时候也会主动发起通知;

  5. lookup方法查询符合条件的注册的数据,subscribe采用Push方式,而lookup采用的是Pull模式;

Registry


Registry接口继承了Node和RegistryService这两个接口,实现该接口类就是注册中心接口的节点,该方法内部也提供两个默认方法reExportRegister方法和reExportUnregister方法,这两个方法实际调用还是RegistryService中的方法。

public interface Registry extends Node, RegistryService {    //调用RegistryService的register    default void reExportRegister(URL url) {        register(url);    }    //调用RegistryService的unregister    default void reExportUnregister(URL url) {        unregister(url);    }}
RegistryFactory

RegistryFactory是 Registry 的工厂类,负责创建 Registry 对象,通过@SPI 注解指定了默认的扩展名为 dubbo,@Adaptive注解表示会生成适配器类并根据 URL 参数中的 protocol 参数值选择相应的实现。

@SPI("dubbo")public interface RegistryFactory {    @Adaptive({"protocol"})    Registry getRegistry(URL url);}

下图是RegistryFactory多种不同的实现,每个 Registry 实现类都有对应的 RegistryFactory 工厂实现,每个 RegistryFactory 工厂实现只负责创建对应的 Registry 对象。

AbstractRegistryFactory

AbstractRegistryFactory 是一个实现了 RegistryFactory 接口的抽象类,内部维护一个Registry的Map集合以及提供销毁和创建注册中心方法,针对不同的注册中心可以有不同的实现。

&nbsp;&nbsp;&nbsp;&nbsp;//锁&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;protected&nbsp;static&nbsp;final&nbsp;ReentrantLock&nbsp;LOCK&nbsp;=&nbsp;new&nbsp;ReentrantLock();&nbsp;&nbsp;&nbsp;&nbsp;//Map&nbsp;&nbsp;&nbsp;&nbsp;protected&nbsp;static&nbsp;final&nbsp;Map<String,&nbsp;Registry>&nbsp;REGISTRIES&nbsp;=&nbsp;new&nbsp;HashMap<>();
销毁

销毁方法分为两个,一个全量,一个是单个,单个销毁在AbstractRegistry中调用,参数是注册实例对象。

&nbsp;&nbsp;&nbsp;//全量销毁&nbsp;&nbsp;&nbsp;public&nbsp;static&nbsp;void&nbsp;destroyAll()&nbsp;{&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;if&nbsp;(!destroyed.compareAndSet(false,&nbsp;true))&nbsp;{&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;return;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;}&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;if&nbsp;(LOGGER.isInfoEnabled())&nbsp;{&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;LOGGER.info("Close&nbsp;all&nbsp;registries&nbsp;"&nbsp;+&nbsp;getRegistries());&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;}&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;//&nbsp;Lock&nbsp;up&nbsp;the&nbsp;registry&nbsp;shutdown&nbsp;process&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;LOCK.lock();&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;try&nbsp;{&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;for&nbsp;(Registry&nbsp;registry&nbsp;:&nbsp;getRegistries())&nbsp;{&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;try&nbsp;{&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;//一个一个销毁&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;registry.destroy();&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;}&nbsp;catch&nbsp;(Throwable&nbsp;e)&nbsp;{&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;LOGGER.error(e.getMessage(),&nbsp;e);&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;}&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;}&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;//清空map缓存&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;REGISTRIES.clear();&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;}&nbsp;finally&nbsp;{&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;//&nbsp;Release&nbsp;the&nbsp;lock&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;LOCK.unlock();&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;}&nbsp;&nbsp;&nbsp;&nbsp;}&nbsp;&nbsp;&nbsp;//单个销毁&nbsp;&nbsp;&nbsp;public&nbsp;static&nbsp;void&nbsp;removeDestroyedRegistry(Registry&nbsp;toRm)&nbsp;{&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;LOCK.lock();&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;try&nbsp;{&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;REGISTRIES.entrySet().removeIf(entry&nbsp;->&nbsp;entry.getValue().equals(toRm));&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;}&nbsp;finally&nbsp;{&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;LOCK.unlock();&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;}&nbsp;&nbsp;&nbsp;&nbsp;}&nbsp;&nbsp;&nbsp;
创建/获取

getRegistry是对RegistryFactory实现,如果没有在缓存中,则进行创建实例对象createRegistry,createRegistry是抽象方法,为了让子类重写该方法,比如说redis实现的注册中心和zookeeper实现的注册中心创建方式肯定不同,而他们相同的一些操作都已经在AbstractRegistryFactory中实现,所以只要关注且实现该抽象方法即可。

&nbsp;&nbsp;&nbsp;&nbsp;//抽象的createRegistry方法&nbsp;&nbsp;&nbsp;&nbsp;protected&nbsp;abstract&nbsp;Registry&nbsp;createRegistry(URL&nbsp;url);&nbsp;&nbsp;&nbsp;&nbsp;//获取实例&nbsp;&nbsp;&nbsp;&nbsp;public&nbsp;Registry&nbsp;getRegistry(URL&nbsp;url)&nbsp;{&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;Registry&nbsp;defaultNopRegistry&nbsp;=&nbsp;getDefaultNopRegistryIfDestroyed();&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;if&nbsp;(null&nbsp;!=&nbsp;defaultNopRegistry)&nbsp;{&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;return&nbsp;defaultNopRegistry;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;}&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;//构建key&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;url&nbsp;=&nbsp;URLBuilder.from(url)&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;.setPath(RegistryService.class.getName())&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;.addParameter(INTERFACE_KEY,&nbsp;RegistryService.class.getName())&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;.removeParameters(EXPORT_KEY,&nbsp;REFER_KEY)&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;.build();&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;String&nbsp;key&nbsp;=&nbsp;createRegistryCacheKey(url);&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;//&nbsp;Lock&nbsp;the&nbsp;registry&nbsp;access&nbsp;process&nbsp;to&nbsp;ensure&nbsp;a&nbsp;single&nbsp;instance&nbsp;of&nbsp;the&nbsp;registry&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;LOCK.lock();&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;try&nbsp;{&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;//&nbsp;double&nbsp;check&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;//&nbsp;fix&nbsp;https://github.com/apache/dubbo/issues/7265.&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;defaultNopRegistry&nbsp;=&nbsp;getDefaultNopRegistryIfDestroyed();&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;if&nbsp;(null&nbsp;!=&nbsp;defaultNopRegistry)&nbsp;{&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;return&nbsp;defaultNopRegistry;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;}&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;//获取实例对象&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;Registry&nbsp;registry&nbsp;=&nbsp;REGISTRIES.get(key);&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;if&nbsp;(registry&nbsp;!=&nbsp;null)&nbsp;{&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;return&nbsp;registry;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;}&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;//没有获取到就创建&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;registry&nbsp;=&nbsp;createRegistry(url);&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;if&nbsp;(registry&nbsp;==&nbsp;null)&nbsp;{&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;throw&nbsp;new&nbsp;IllegalStateException("Can&nbsp;not&nbsp;create&nbsp;registry&nbsp;"&nbsp;+&nbsp;url);&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;}&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;//放入Map集合中&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;REGISTRIES.put(key,&nbsp;registry);&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;return&nbsp;registry;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;}&nbsp;finally&nbsp;{&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;//&nbsp;Release&nbsp;the&nbsp;lock&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;LOCK.unlock();&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;}&nbsp;&nbsp;&nbsp;&nbsp;}
ListenerRegistryWrapper


ListenerRegistryWrapper是对RegistryFactory的扩展,创建Registry时候会包装一个ListenerRegistryWrapper对象,内部维护一个监听器RegistryServiceListener,当注册、取消注册、订阅以及取消订阅的时候,会发送通知。

AbstractRegistry

AbstractRegistry该抽象类是对Registry接口的实现,实现了Registry接口中的注册、订阅、查询、通知等方法,但是注册、订阅、查询、通知等方法只是简单地把URL加入对应的集合,没有具体的注册或订阅逻辑。此外该类还实现了缓存机制,只不过,它的缓存有两份,一份在内存,一份在磁盘。

&nbsp;&nbsp;&nbsp;&nbsp;//本地的Properties文件缓存,在内存中&nbsp;与file是同步的&nbsp;&nbsp;&nbsp;&nbsp;private&nbsp;final&nbsp;Properties&nbsp;properties&nbsp;=&nbsp;new&nbsp;Properties();&nbsp;&nbsp;&nbsp;&nbsp;//该单线程池负责讲Provider的全量数据同步到properties字段和缓存文件中,&nbsp;&nbsp;&nbsp;&nbsp;//如果syncSaveFile配置为false,就由该线程池异步完成文件写入&nbsp;&nbsp;&nbsp;&nbsp;private&nbsp;final&nbsp;ExecutorService&nbsp;registryCacheExecutor&nbsp;=&nbsp;Executors.newFixedThreadPool(1,&nbsp;new&nbsp;NamedThreadFactory("DubboSaveRegistryCache",&nbsp;true));&nbsp;&nbsp;&nbsp;&nbsp;//是否异步写入&nbsp;&nbsp;&nbsp;&nbsp;private&nbsp;boolean&nbsp;syncSaveFile;&nbsp;&nbsp;&nbsp;&nbsp;//注册数据的版本号&nbsp;防止旧数据覆盖新数据&nbsp;&nbsp;&nbsp;&nbsp;private&nbsp;final&nbsp;AtomicLong&nbsp;lastCacheChanged&nbsp;=&nbsp;new&nbsp;AtomicLong();&nbsp;&nbsp;&nbsp;&nbsp;//保存Properties失败以后异常重试次数&nbsp;&nbsp;&nbsp;&nbsp;private&nbsp;final&nbsp;AtomicInteger&nbsp;savePropertiesRetryTimes&nbsp;=&nbsp;new&nbsp;AtomicInteger();&nbsp;&nbsp;&nbsp;&nbsp;//已经注册服务的URL集合,注册的URL不仅仅可以是服务提供者的,也可以是服务消费者的&nbsp;&nbsp;&nbsp;&nbsp;private&nbsp;final&nbsp;Set<URL>&nbsp;registered&nbsp;=&nbsp;new&nbsp;ConcurrentHashSet<>();&nbsp;&nbsp;&nbsp;&nbsp;//消费者Url订阅的监听器集合&nbsp;&nbsp;&nbsp;&nbsp;private&nbsp;final&nbsp;ConcurrentMap<URL,&nbsp;Set<NotifyListener>>&nbsp;subscribed&nbsp;=&nbsp;new&nbsp;ConcurrentHashMap<>();&nbsp;&nbsp;&nbsp;&nbsp;//费者被通知的服务URL集合,最外部URL的key是消费者的URL,value是一个map集合,里面的map中的key为分类名,value是该类下的服务url集合&nbsp;&nbsp;&nbsp;&nbsp;private&nbsp;final&nbsp;ConcurrentMap<URL,&nbsp;Map<String,&nbsp;List<URL>>>&nbsp;notified&nbsp;=&nbsp;new&nbsp;ConcurrentHashMap<>();&nbsp;&nbsp;&nbsp;&nbsp;//注册中心URL&nbsp;&nbsp;&nbsp;&nbsp;private&nbsp;URL&nbsp;registryUrl;&nbsp;&nbsp;&nbsp;&nbsp;//本地磁盘Properties文件&nbsp;&nbsp;&nbsp;&nbsp;private&nbsp;File&nbsp;file;
变更通知

当 Provider 端暴露的 URL 发生变化时,ZooKeeper 等服务发现组件会通知 Consumer 端的 Registry 组件,Registry 组件会调用 notify() 方法,被通知的 Consumer 能匹配到所有 Provider 的 URL 列表并写入 properties 集合以及本地文件中。

&nbsp;&nbsp;&nbsp;&nbsp;protected&nbsp;void&nbsp;notify(List<URL>&nbsp;urls)&nbsp;{&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;if&nbsp;(CollectionUtils.isEmpty(urls))&nbsp;{&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;return;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;}&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;//遍历订阅消费者URL的监听器集合,通知他们&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;for&nbsp;(Map.Entry<URL,&nbsp;Set<NotifyListener>>&nbsp;entry&nbsp;:&nbsp;getSubscribed().entrySet())&nbsp;{&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;URL&nbsp;url&nbsp;=&nbsp;entry.getKey();&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;//匹配&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;if&nbsp;(!UrlUtils.isMatch(url,&nbsp;urls.get(0)))&nbsp;{&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;continue;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;}&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;//遍历所有监听器&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;Set<NotifyListener>&nbsp;listeners&nbsp;=&nbsp;entry.getValue();&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;if&nbsp;(listeners&nbsp;!=&nbsp;null)&nbsp;{&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;for&nbsp;(NotifyListener&nbsp;listener&nbsp;:&nbsp;listeners)&nbsp;{&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;try&nbsp;{&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;//通知监听器,URL变化结果&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;notify(url,&nbsp;listener,&nbsp;filterEmpty(url,&nbsp;urls));&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;}&nbsp;catch&nbsp;(Throwable&nbsp;t)&nbsp;{&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;logger.error("Failed&nbsp;to&nbsp;notify&nbsp;registry&nbsp;event,&nbsp;urls:&nbsp;"&nbsp;+&nbsp;urls&nbsp;+&nbsp;",&nbsp;cause:&nbsp;"&nbsp;+&nbsp;t.getMessage(),&nbsp;t);&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;}&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;}&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;}&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;}&nbsp;&nbsp;&nbsp;&nbsp;}&nbsp;&nbsp;&nbsp;&nbsp;/**&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;*&nbsp;Notify&nbsp;changes&nbsp;from&nbsp;the&nbsp;Provider&nbsp;side.&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;*&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;*&nbsp;@param&nbsp;url&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;consumer&nbsp;side&nbsp;url&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;*&nbsp;@param&nbsp;listener&nbsp;listener&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;*&nbsp;@param&nbsp;urls&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;provider&nbsp;latest&nbsp;urls&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;*/&nbsp;&nbsp;&nbsp;&nbsp;protected&nbsp;void&nbsp;notify(URL&nbsp;url,&nbsp;NotifyListener&nbsp;listener,&nbsp;List<URL>&nbsp;urls)&nbsp;{&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;//参数校验&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;if&nbsp;(url&nbsp;==&nbsp;null)&nbsp;{&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;throw&nbsp;new&nbsp;IllegalArgumentException("notify&nbsp;url&nbsp;==&nbsp;null");&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;}&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;if&nbsp;(listener&nbsp;==&nbsp;null)&nbsp;{&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;throw&nbsp;new&nbsp;IllegalArgumentException("notify&nbsp;listener&nbsp;==&nbsp;null");&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;}&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;if&nbsp;((CollectionUtils.isEmpty(urls))&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&&&nbsp;!ANY_VALUE.equals(url.getServiceInterface()))&nbsp;{&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;logger.warn("Ignore&nbsp;empty&nbsp;notify&nbsp;urls&nbsp;for&nbsp;subscribe&nbsp;url&nbsp;"&nbsp;+&nbsp;url);&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;return;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;}&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;if&nbsp;(logger.isInfoEnabled())&nbsp;{&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;logger.info("Notify&nbsp;urls&nbsp;for&nbsp;subscribe&nbsp;url&nbsp;"&nbsp;+&nbsp;url&nbsp;+&nbsp;",&nbsp;urls:&nbsp;"&nbsp;+&nbsp;urls);&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;}&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;Map<String,&nbsp;List<URL>>&nbsp;result&nbsp;=&nbsp;new&nbsp;HashMap<>();&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;for&nbsp;(URL&nbsp;u&nbsp;:&nbsp;urls)&nbsp;{&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;//按照url中key为category对应的值进行分类,如果没有该值,就找key为providers的值进行分类&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;if&nbsp;(UrlUtils.isMatch(url,&nbsp;u))&nbsp;{&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;String&nbsp;category&nbsp;=&nbsp;u.getParameter(CATEGORY_KEY,&nbsp;DEFAULT_CATEGORY);&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;List<URL>&nbsp;categoryList&nbsp;=&nbsp;result.computeIfAbsent(category,&nbsp;k&nbsp;->&nbsp;new&nbsp;ArrayList<>());&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;//分类结果放入result&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;categoryList.add(u);&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;}&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;}&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;if&nbsp;(result.size()&nbsp;==&nbsp;0)&nbsp;{&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;return;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;}&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;//处理通知监听器URL变化结果&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;Map<String,&nbsp;List<URL>>&nbsp;categoryNotified&nbsp;=&nbsp;notified.computeIfAbsent(url,&nbsp;u&nbsp;->&nbsp;new&nbsp;ConcurrentHashMap<>());&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;for&nbsp;(Map.Entry<String,&nbsp;List<URL>>&nbsp;entry&nbsp;:&nbsp;result.entrySet())&nbsp;{&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;String&nbsp;category&nbsp;=&nbsp;entry.getKey();&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;List<URL>&nbsp;categoryList&nbsp;=&nbsp;entry.getValue();&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;//把分类标实和分类后的列表放入notified的value中&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;categoryNotified.put(category,&nbsp;categoryList);&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;//调用NotifyListener监听器&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;listener.notify(categoryList);&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;//单个Url变更,并将更改信息同步至内存缓存和磁盘缓存中&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;saveProperties(url);&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;}&nbsp;&nbsp;&nbsp;&nbsp;}&nbsp;&nbsp;&nbsp;&nbsp;private&nbsp;void&nbsp;saveProperties(URL&nbsp;url)&nbsp;{&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;if&nbsp;(file&nbsp;==&nbsp;null)&nbsp;{&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;return;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;}&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;try&nbsp;{&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;StringBuilder&nbsp;buf&nbsp;=&nbsp;new&nbsp;StringBuilder();&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;//从通知列表中取出信息&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;Map<String,&nbsp;List<URL>>&nbsp;categoryNotified&nbsp;=&nbsp;notified.get(url);&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;//以空格为间隔拼接&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;if&nbsp;(categoryNotified&nbsp;!=&nbsp;null)&nbsp;{&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;for&nbsp;(List<URL>&nbsp;us&nbsp;:&nbsp;categoryNotified.values())&nbsp;{&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;for&nbsp;(URL&nbsp;u&nbsp;:&nbsp;us)&nbsp;{&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;if&nbsp;(buf.length()&nbsp;>&nbsp;0)&nbsp;{&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;buf.append(URL_SEPARATOR);&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;}&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;buf.append(u.toFullString());&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;}&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;}&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;}&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;//推送url至内存缓存&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;properties.setProperty(url.getServiceKey(),&nbsp;buf.toString());&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;//增加版本号&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;long&nbsp;version&nbsp;=&nbsp;lastCacheChanged.incrementAndGet();&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;if&nbsp;(syncSaveFile)&nbsp;{&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;//如果磁盘文件未被加锁,将内存缓存同步至磁盘缓存&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;doSaveProperties(version);&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;}&nbsp;else&nbsp;{&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;//如果被加锁了,使用新的线程去执行,当前线程返回&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;registryCacheExecutor.execute(new&nbsp;SaveProperties(version));&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;}&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;}&nbsp;catch&nbsp;(Throwable&nbsp;t)&nbsp;{&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;logger.warn(t.getMessage(),&nbsp;t);&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;}&nbsp;&nbsp;&nbsp;&nbsp;}
缓存设计

注册中心有两份缓存,一份在内存,一份在磁盘。该方法的作用是将内存中的缓存数据保存在磁盘文件中,该方法有错误重试,最大重试次数是3,重试采用另一个线程去执行重试,不是当前线程。本地缓存设计相当于是一种容错机制,当网络抖动等原因而导致订阅失败时,Consumer端的Registry就可以通过getCacheUrls()方法获取本地缓存,从而得到最近注册的服务提供者。

&nbsp;&nbsp;&nbsp;&nbsp;//将内存中的文件写到磁盘上&nbsp;&nbsp;&nbsp;&nbsp;public&nbsp;void&nbsp;doSaveProperties(long&nbsp;version)&nbsp;{&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;//版本号判断&nbsp;防止重复写&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;if&nbsp;(version&nbsp;<&nbsp;lastCacheChanged.get())&nbsp;{&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;return;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;}&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;//判断磁盘文件是否为空&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;if&nbsp;(file&nbsp;==&nbsp;null)&nbsp;{&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;return;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;}&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;//&nbsp;Save&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;try&nbsp;{&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;//lock文件,用于加锁操作&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;File&nbsp;lockfile&nbsp;=&nbsp;new&nbsp;File(file.getAbsolutePath()&nbsp;+&nbsp;".lock");&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;if&nbsp;(!lockfile.exists())&nbsp;{&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;lockfile.createNewFile();&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;}&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;//RandomAccessFile提供对文件的读写操作&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;try&nbsp;(RandomAccessFile&nbsp;raf&nbsp;=&nbsp;new&nbsp;RandomAccessFile(lockfile,&nbsp;"rw");&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;FileChannel&nbsp;channel&nbsp;=&nbsp;raf.getChannel())&nbsp;{&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;//获取锁&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;FileLock&nbsp;lock&nbsp;=&nbsp;channel.tryLock();&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;if&nbsp;(lock&nbsp;==&nbsp;null)&nbsp;{&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;throw&nbsp;new&nbsp;IOException("Can&nbsp;not&nbsp;lock&nbsp;the&nbsp;registry&nbsp;cache&nbsp;file&nbsp;"&nbsp;+&nbsp;file.getAbsolutePath()&nbsp;+&nbsp;",&nbsp;ignore&nbsp;and&nbsp;retry&nbsp;later,&nbsp;maybe&nbsp;multi&nbsp;java&nbsp;process&nbsp;use&nbsp;the&nbsp;file,&nbsp;please&nbsp;config:&nbsp;dubbo.registry.file=xxx.properties");&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;}&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;//&nbsp;Save&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;try&nbsp;{&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;if&nbsp;(!file.exists())&nbsp;{&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;file.createNewFile();&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;}&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;try&nbsp;(FileOutputStream&nbsp;outputFile&nbsp;=&nbsp;new&nbsp;FileOutputStream(file))&nbsp;{&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;//从内存缓存中获取数据&nbsp;写入文件&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;properties.store(outputFile,&nbsp;"Dubbo&nbsp;Registry&nbsp;Cache");&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;}&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;}&nbsp;finally&nbsp;{&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;lock.release();&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;}&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;}&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;}&nbsp;catch&nbsp;(Throwable&nbsp;e)&nbsp;{&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;//发生异常时,重试次数+1&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;savePropertiesRetryTimes.incrementAndGet();&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;//重试次数大于抛出异常&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;if&nbsp;(savePropertiesRetryTimes.get()&nbsp;>=&nbsp;MAX_RETRY_TIMES_SAVE_PROPERTIES)&nbsp;{&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;logger.warn("Failed&nbsp;to&nbsp;save&nbsp;registry&nbsp;cache&nbsp;file&nbsp;after&nbsp;retrying&nbsp;"&nbsp;+&nbsp;MAX_RETRY_TIMES_SAVE_PROPERTIES&nbsp;+&nbsp;"&nbsp;times,&nbsp;cause:&nbsp;"&nbsp;+&nbsp;e.getMessage(),&nbsp;e);&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;savePropertiesRetryTimes.set(0);&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;return;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;}&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;//再次对比版本信息,如果版本已过期,返回不再处理&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;if&nbsp;(version&nbsp;<&nbsp;lastCacheChanged.get())&nbsp;{&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;savePropertiesRetryTimes.set(0);&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;return;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;}&nbsp;else&nbsp;{&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;//重试线程&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;registryCacheExecutor.execute(new&nbsp;SaveProperties(lastCacheChanged.incrementAndGet()));&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;}&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;logger.warn("Failed&nbsp;to&nbsp;save&nbsp;registry&nbsp;cache&nbsp;file,&nbsp;will&nbsp;retry,&nbsp;cause:&nbsp;"&nbsp;+&nbsp;e.getMessage(),&nbsp;e);&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;}&nbsp;&nbsp;&nbsp;&nbsp;}&nbsp;&nbsp;&nbsp;&nbsp;//磁盘中文件加载到内存中&nbsp;&nbsp;&nbsp;&nbsp;private&nbsp;void&nbsp;loadProperties()&nbsp;{&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;if&nbsp;(file&nbsp;!=&nbsp;null&nbsp;&&&nbsp;file.exists())&nbsp;{&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;InputStream&nbsp;in&nbsp;=&nbsp;null;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;try&nbsp;{&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;in&nbsp;=&nbsp;new&nbsp;FileInputStream(file);&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;properties.load(in);&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;if&nbsp;(logger.isInfoEnabled())&nbsp;{&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;logger.info("Load&nbsp;registry&nbsp;cache&nbsp;file&nbsp;"&nbsp;+&nbsp;file&nbsp;+&nbsp;",&nbsp;data:&nbsp;"&nbsp;+&nbsp;properties);&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;}&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;}&nbsp;catch&nbsp;(Throwable&nbsp;e)&nbsp;{&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;logger.warn("Failed&nbsp;to&nbsp;load&nbsp;registry&nbsp;cache&nbsp;file&nbsp;"&nbsp;+&nbsp;file,&nbsp;e);&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;}&nbsp;finally&nbsp;{&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;if&nbsp;(in&nbsp;!=&nbsp;null)&nbsp;{&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;try&nbsp;{&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;in.close();&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;}&nbsp;catch&nbsp;(IOException&nbsp;e)&nbsp;{&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;logger.warn(e.getMessage(),&nbsp;e);&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;}&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;}&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;}&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;}&nbsp;&nbsp;&nbsp;&nbsp;}
注册/订阅

AbstractRegistry 实现了 Registry 接口,当新节点注册进来时候registry() 方法,会将当前节点要注册的 URL缓存到 registered集合,当节点下线时候, unregistry() 方法会从 registered 集合删除指定的 URL。当消费者新增加一个订阅的时候,subscribe() 方法会将当前节点作为 Consumer 的 URL 以及相关的 NotifyListener 记录到 subscribed 集合,当消费者取消一个订阅的时候,unsubscribe() 方法会将当前节点的 URL 以及关联的 NotifyListener 从 subscribed 集合删除。这四个方法相对比较简单,这里不做展示,此处设计为抽象类,当子类重写的时候可以对其进行增强。

恢复/销毁

当因为网络问题与注册中心断开连接之后,会进行重连,重新连接成功之后,会调用 recover() 方法将 registered 集合中的全部 URL 重新执行register() 方法,恢复注册数据。同样,recover() 方法也会将 subscribed 集合中的 URL 重新执行subscribe() 方法,恢复订阅监听器。
当前节点下线的时候,destroy() 方法会调用 unregister() 方法和 unsubscribe() 方法将当前节点注册的 URL 以及订阅的监听全部清理掉,此外还会销毁本实例。

&nbsp;&nbsp;&nbsp;&nbsp;public&nbsp;void&nbsp;destroy()&nbsp;{&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;if&nbsp;(logger.isInfoEnabled())&nbsp;{&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;logger.info("Destroy&nbsp;registry:"&nbsp;+&nbsp;getUrl());&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;}&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;Set<URL>&nbsp;destroyRegistered&nbsp;=&nbsp;new&nbsp;HashSet<>(getRegistered());&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;if&nbsp;(!destroyRegistered.isEmpty())&nbsp;{&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;for&nbsp;(URL&nbsp;url&nbsp;:&nbsp;new&nbsp;HashSet<>(destroyRegistered))&nbsp;{&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;if&nbsp;(url.getParameter(DYNAMIC_KEY,&nbsp;true))&nbsp;{&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;try&nbsp;{&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;//取消注册&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;unregister(url);&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;if&nbsp;(logger.isInfoEnabled())&nbsp;{&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;logger.info("Destroy&nbsp;unregister&nbsp;url&nbsp;"&nbsp;+&nbsp;url);&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;}&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;}&nbsp;catch&nbsp;(Throwable&nbsp;t)&nbsp;{&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;logger.warn("Failed&nbsp;to&nbsp;unregister&nbsp;url&nbsp;"&nbsp;+&nbsp;url&nbsp;+&nbsp;"&nbsp;to&nbsp;registry&nbsp;"&nbsp;+&nbsp;getUrl()&nbsp;+&nbsp;"&nbsp;on&nbsp;destroy,&nbsp;cause:&nbsp;"&nbsp;+&nbsp;t.getMessage(),&nbsp;t);&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;}&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;}&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;}&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;}&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;Map<URL,&nbsp;Set<NotifyListener>>&nbsp;destroySubscribed&nbsp;=&nbsp;new&nbsp;HashMap<>(getSubscribed());&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;if&nbsp;(!destroySubscribed.isEmpty())&nbsp;{&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;for&nbsp;(Map.Entry<URL,&nbsp;Set<NotifyListener>>&nbsp;entry&nbsp;:&nbsp;destroySubscribed.entrySet())&nbsp;{&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;URL&nbsp;url&nbsp;=&nbsp;entry.getKey();&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;for&nbsp;(NotifyListener&nbsp;listener&nbsp;:&nbsp;entry.getValue())&nbsp;{&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;try&nbsp;{&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;//取消订阅&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;unsubscribe(url,&nbsp;listener);&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;if&nbsp;(logger.isInfoEnabled())&nbsp;{&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;logger.info("Destroy&nbsp;unsubscribe&nbsp;url&nbsp;"&nbsp;+&nbsp;url);&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;}&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;}&nbsp;catch&nbsp;(Throwable&nbsp;t)&nbsp;{&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;logger.warn("Failed&nbsp;to&nbsp;unsubscribe&nbsp;url&nbsp;"&nbsp;+&nbsp;url&nbsp;+&nbsp;"&nbsp;to&nbsp;registry&nbsp;"&nbsp;+&nbsp;getUrl()&nbsp;+&nbsp;"&nbsp;on&nbsp;destroy,&nbsp;cause:&nbsp;"&nbsp;+&nbsp;t.getMessage(),&nbsp;t);&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;}&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;}&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;}&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;}&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;//移除注册中心&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;AbstractRegistryFactory.removeDestroyedRegistry(this);&nbsp;&nbsp;&nbsp;&nbsp;}

重试机制


关于重试机制,Dubbo将重试机制放在了FailbackRegistry类中,FailbackRegistry 设计思想,重写了 AbstractRegistry 中 register()/unregister()、subscribe()/unsubscribe() 以及 notify() 这五个核心方法,结合时间轮,实现失败重试机制。此外,还添加了四个未实现的抽象模板方法,由其继承者去实现,这里也就是典型的模板类的设计。

核心字段介绍
&nbsp;&nbsp;&nbsp;&nbsp;//注册失败的URL&nbsp;Key是注册失败的&nbsp;URL,Value&nbsp;是对应的重试任务&nbsp;&nbsp;&nbsp;&nbsp;private&nbsp;final&nbsp;ConcurrentMap<URL,&nbsp;FailedRegisteredTask>&nbsp;failedRegistered&nbsp;=&nbsp;new&nbsp;ConcurrentHashMap<URL,&nbsp;FailedRegisteredTask>();&nbsp;&nbsp;&nbsp;&nbsp;//取消注册失败URL&nbsp;&nbsp;&nbsp;&nbsp;private&nbsp;final&nbsp;ConcurrentMap<URL,&nbsp;FailedUnregisteredTask>&nbsp;failedUnregistered&nbsp;=&nbsp;new&nbsp;ConcurrentHashMap<URL,&nbsp;FailedUnregisteredTask>();&nbsp;&nbsp;&nbsp;&nbsp;//订阅失败的URL&nbsp;&nbsp;&nbsp;&nbsp;private&nbsp;final&nbsp;ConcurrentMap<Holder,&nbsp;FailedSubscribedTask>&nbsp;failedSubscribed&nbsp;=&nbsp;new&nbsp;ConcurrentHashMap<Holder,&nbsp;FailedSubscribedTask>();&nbsp;&nbsp;&nbsp;&nbsp;//取消订阅失败的URL&nbsp;&nbsp;&nbsp;&nbsp;private&nbsp;final&nbsp;ConcurrentMap<Holder,&nbsp;FailedUnsubscribedTask>&nbsp;failedUnsubscribed&nbsp;=&nbsp;new&nbsp;ConcurrentHashMap<Holder,&nbsp;FailedUnsubscribedTask>();&nbsp;&nbsp;&nbsp;&nbsp;//重试的时间间隔&nbsp;&nbsp;&nbsp;&nbsp;private&nbsp;final&nbsp;int&nbsp;retryPeriod;&nbsp;&nbsp;&nbsp;&nbsp;//用于定时执行失败的时间轮&nbsp;&nbsp;&nbsp;&nbsp;private&nbsp;final&nbsp;HashedWheelTimer&nbsp;retryTimer;

构造方法首先会调用父类的构造方法完成本地缓存相关的初始化操作,然后根据传入URL参数中获取重试操作的时间间隔来初始化 retryPeriod 字段,最后初始化 HashedWheelTimer时间轮。

&nbsp;&nbsp;&nbsp;&nbsp;public&nbsp;FailbackRegistry(URL&nbsp;url)&nbsp;{&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;//调用&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;super(url);&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;this.retryPeriod&nbsp;=&nbsp;url.getParameter(REGISTRY_RETRY_PERIOD_KEY,&nbsp;DEFAULT_REGISTRY_RETRY_PERIOD);&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;retryTimer&nbsp;=&nbsp;new&nbsp;HashedWheelTimer(new&nbsp;NamedThreadFactory("DubboRegistryRetryTimer",&nbsp;true),&nbsp;retryPeriod,&nbsp;TimeUnit.MILLISECONDS,&nbsp;128);&nbsp;&nbsp;&nbsp;&nbsp;}
核心方法
register

register方法重写了父类的注册方法,首先调用父类的register将url加入对应的容器,然后从failedRegistered 和failedUnregistered 两个容器中移除失败URL,然后执行doRegister方法,doRegister是抽象方法,具体的实现交给其继承者,如果注册失败抛出异常,会将URL加入failedRegistered 容器中。

&nbsp;&nbsp;&nbsp;&nbsp;@Override&nbsp;&nbsp;&nbsp;&nbsp;public&nbsp;void&nbsp;register(URL&nbsp;url)&nbsp;{&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;if&nbsp;(!acceptable(url))&nbsp;{&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;logger.info("URL&nbsp;"&nbsp;+&nbsp;url&nbsp;+&nbsp;"&nbsp;will&nbsp;not&nbsp;be&nbsp;registered&nbsp;to&nbsp;Registry.&nbsp;Registry&nbsp;"&nbsp;+&nbsp;url&nbsp;+&nbsp;"&nbsp;does&nbsp;not&nbsp;accept&nbsp;service&nbsp;of&nbsp;this&nbsp;protocol&nbsp;type.");&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;return;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;}&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;//执行父类的方法加入到容器中&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;super.register(url);&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;//移除注册失败&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;removeFailedRegistered(url);&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;//移除取消注册失败&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;removeFailedUnregistered(url);&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;try&nbsp;{&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;//抽象方法交给具体子类去实现&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;doRegister(url);&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;}&nbsp;catch&nbsp;(Exception&nbsp;e)&nbsp;{&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;Throwable&nbsp;t&nbsp;=&nbsp;e;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;//&nbsp;If&nbsp;the&nbsp;startup&nbsp;detection&nbsp;is&nbsp;opened,&nbsp;the&nbsp;Exception&nbsp;is&nbsp;thrown&nbsp;directly.&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;boolean&nbsp;check&nbsp;=&nbsp;getUrl().getParameter(Constants.CHECK_KEY,&nbsp;true)&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&&&nbsp;url.getParameter(Constants.CHECK_KEY,&nbsp;true)&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&&&nbsp;!CONSUMER_PROTOCOL.equals(url.getProtocol());&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;boolean&nbsp;skipFailback&nbsp;=&nbsp;t&nbsp;instanceof&nbsp;SkipFailbackWrapperException;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;if&nbsp;(check&nbsp;||&nbsp;skipFailback)&nbsp;{&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;if&nbsp;(skipFailback)&nbsp;{&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;t&nbsp;=&nbsp;t.getCause();&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;}&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;throw&nbsp;new&nbsp;IllegalStateException("Failed&nbsp;to&nbsp;register&nbsp;"&nbsp;+&nbsp;url&nbsp;+&nbsp;"&nbsp;to&nbsp;registry&nbsp;"&nbsp;+&nbsp;getUrl().getAddress()&nbsp;+&nbsp;",&nbsp;cause:&nbsp;"&nbsp;+&nbsp;t.getMessage(),&nbsp;t);&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;}&nbsp;else&nbsp;{&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;logger.error("Failed&nbsp;to&nbsp;register&nbsp;"&nbsp;+&nbsp;url&nbsp;+&nbsp;",&nbsp;waiting&nbsp;for&nbsp;retry,&nbsp;cause:&nbsp;"&nbsp;+&nbsp;t.getMessage(),&nbsp;t);&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;}&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;//注册发生异常将注册失败放入到注册失败的容器中&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;addFailedRegistered(url);&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;}&nbsp;&nbsp;&nbsp;&nbsp;}

接下来我们看下添加重试任务的方法addFailedRegistered,该方法相对比较简单,核心就是将失败的任务放到容器中,然后将失败的任务加入时间轮等待执行。

&nbsp;&nbsp;&nbsp;&nbsp;private&nbsp;void&nbsp;addFailedRegistered(URL&nbsp;url)&nbsp;{&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;//判断容器中是是否存在任务&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;FailedRegisteredTask&nbsp;oldOne&nbsp;=&nbsp;failedRegistered.get(url);&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;if&nbsp;(oldOne&nbsp;!=&nbsp;null)&nbsp;{&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;return;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;}&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;//将任务添加容器中&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;FailedRegisteredTask&nbsp;newTask&nbsp;=&nbsp;new&nbsp;FailedRegisteredTask(url,&nbsp;this);&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;oldOne&nbsp;=&nbsp;failedRegistered.putIfAbsent(url,&nbsp;newTask);&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;if&nbsp;(oldOne&nbsp;==&nbsp;null)&nbsp;{&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;//&nbsp;never&nbsp;has&nbsp;a&nbsp;retry&nbsp;task.&nbsp;then&nbsp;start&nbsp;a&nbsp;new&nbsp;task&nbsp;for&nbsp;retry.&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;//将任务提交到时间轮中&nbsp;等待retryPeriod秒后执行&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;retryTimer.newTimeout(newTask,&nbsp;retryPeriod,&nbsp;TimeUnit.MILLISECONDS);&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;}&nbsp;&nbsp;&nbsp;&nbsp;}

对于其他unregister()、subscribe()、unsubscribe() 都与register()类似这里就不做过多介绍,简单看下提供几个抽象的方法。

&nbsp;&nbsp;&nbsp;&nbsp;public&nbsp;abstract&nbsp;void&nbsp;doRegister(URL&nbsp;url);&nbsp;&nbsp;&nbsp;&nbsp;public&nbsp;abstract&nbsp;void&nbsp;doUnregister(URL&nbsp;url);&nbsp;&nbsp;&nbsp;&nbsp;public&nbsp;abstract&nbsp;void&nbsp;doSubscribe(URL&nbsp;url,&nbsp;NotifyListener&nbsp;listener);&nbsp;&nbsp;&nbsp;&nbsp;public&nbsp;abstract&nbsp;void&nbsp;doUnsubscribe(URL&nbsp;url,&nbsp;NotifyListener&nbsp;listener);
重试任务

addFailedRegistered方法中创建FailedRegisteredTask以及其他重试任务,都是继承AbstractRetryTask,接下来我们要来关于AbstractRetryTask的设计和实现。

在AbstractRetryTask抽象类中,有一个核心run方法实现已经一个抽象方法,该抽象方法也是模板类作用。

&nbsp;@Override&nbsp;&nbsp;&nbsp;&nbsp;public&nbsp;void&nbsp;run(Timeout&nbsp;timeout)&nbsp;throws&nbsp;Exception&nbsp;{&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;//检查定义任务状态以及时间轮状态&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;if&nbsp;(timeout.isCancelled()&nbsp;||&nbsp;timeout.timer().isStop()&nbsp;||&nbsp;isCancel())&nbsp;{&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;//&nbsp;other&nbsp;thread&nbsp;cancel&nbsp;this&nbsp;timeout&nbsp;or&nbsp;stop&nbsp;the&nbsp;timer.&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;return;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;}&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;//检查重试次数&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;if&nbsp;(times&nbsp;>&nbsp;retryTimes)&nbsp;{&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;//&nbsp;reach&nbsp;the&nbsp;most&nbsp;times&nbsp;of&nbsp;retry.&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;logger.warn("Final&nbsp;failed&nbsp;to&nbsp;execute&nbsp;task&nbsp;"&nbsp;+&nbsp;taskName&nbsp;+&nbsp;",&nbsp;url:&nbsp;"&nbsp;+&nbsp;url&nbsp;+&nbsp;",&nbsp;retry&nbsp;"&nbsp;+&nbsp;retryTimes&nbsp;+&nbsp;"&nbsp;times.");&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;return;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;}&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;if&nbsp;(logger.isInfoEnabled())&nbsp;{&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;logger.info(taskName&nbsp;+&nbsp;"&nbsp;:&nbsp;"&nbsp;+&nbsp;url);&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;}&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;try&nbsp;{&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;//执行重试&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;doRetry(url,&nbsp;registry,&nbsp;timeout);&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;}&nbsp;catch&nbsp;(Throwable&nbsp;t)&nbsp;{&nbsp;//&nbsp;Ignore&nbsp;all&nbsp;the&nbsp;exceptions&nbsp;and&nbsp;wait&nbsp;for&nbsp;the&nbsp;next&nbsp;retry&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;logger.warn("Failed&nbsp;to&nbsp;execute&nbsp;task&nbsp;"&nbsp;+&nbsp;taskName&nbsp;+&nbsp;",&nbsp;url:&nbsp;"&nbsp;+&nbsp;url&nbsp;+&nbsp;",&nbsp;waiting&nbsp;for&nbsp;again,&nbsp;cause:"&nbsp;+&nbsp;t.getMessage(),&nbsp;t);&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;//&nbsp;reput&nbsp;this&nbsp;task&nbsp;when&nbsp;catch&nbsp;exception.&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;//执行异常&nbsp;则重新等待&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;reput(timeout,&nbsp;retryPeriod);&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;}&nbsp;&nbsp;&nbsp;&nbsp;}&nbsp;&nbsp;&nbsp;&nbsp;protected&nbsp;void&nbsp;reput(Timeout&nbsp;timeout,&nbsp;long&nbsp;tick)&nbsp;{&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;//边界值检查&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;if&nbsp;(timeout&nbsp;==&nbsp;null)&nbsp;{&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;throw&nbsp;new&nbsp;IllegalArgumentException();&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;}&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;//检查定时任务&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;Timer&nbsp;timer&nbsp;=&nbsp;timeout.timer();&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;if&nbsp;(timer.isStop()&nbsp;||&nbsp;timeout.isCancelled()&nbsp;||&nbsp;isCancel())&nbsp;{&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;return;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;}&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;//递增times&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;times++;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;//添加下次定时任务&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;timer.newTimeout(timeout.task(),&nbsp;tick,&nbsp;TimeUnit.MILLISECONDS);&nbsp;&nbsp;&nbsp;&nbsp;}&nbsp;&nbsp;&nbsp;&nbsp;protected&nbsp;abstract&nbsp;void&nbsp;doRetry(URL&nbsp;url,&nbsp;FailbackRegistry&nbsp;registry,&nbsp;Timeout&nbsp;timeout);

接下来我们看下FailedRegisteredTask对AbstractRetryTask�的实现,子类doRetry方法会执行关联Registry的doRegister() 方法,完成与服务发现组件交互。如果注册成功,则会调用removeFailedRegisteredTask()方法将当前关联的 URL 以及当前重试任务从 failedRegistered集合中删除。如果注册失败,则会抛出异常,执行AbstractRetryTask的reput()方法重试。

&nbsp;&nbsp;&nbsp;&nbsp;@Override&nbsp;&nbsp;&nbsp;&nbsp;protected&nbsp;void&nbsp;doRetry(URL&nbsp;url,&nbsp;FailbackRegistry&nbsp;registry,&nbsp;Timeout&nbsp;timeout)&nbsp;{&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;//&nbsp;重新注册&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;registry.doRegister(url);&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;//&nbsp;删除注册任务&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;registry.removeFailedRegisteredTask(url);&nbsp;&nbsp;&nbsp;&nbsp;}

未完待续

下一篇会简单介绍一下时间轮、ZK对注册中心的实现以及在,欢迎大家点点关注,点点赞!