Dubbo Service Reference and Cluster Fault Tolerance
Original article read on: 2024-08-30

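Service reference boils down to two things:

  • Convert the Spring schema tag information into a bean, then use that bean's configuration to connect to ZooKeeper, subscribe to the node information, and create an Invoker

  • Use the Invoker to create a dynamic proxy object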
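The second step can be sketched with a plain JDK dynamic proxy. This is only a minimal illustration under my own assumptions: SimpleInvoker, GreetingService and ForwardingHandler are hypothetical names invented for the example, not Dubbo classes, and the "remote call" is answered locally.

```java
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Method;
import java.lang.reflect.Proxy;

public class ProxySketch {
    /** Hypothetical stand-in for an Invoker: turns a method call into a result. */
    interface SimpleInvoker {
        Object invoke(String methodName, Object[] args);
    }

    /** Hypothetical service interface used only for this demo. */
    public interface GreetingService {
        String hello(String name);
    }

    /** Forwards every interface method call to the invoker. */
    static class ForwardingHandler implements InvocationHandler {
        private final SimpleInvoker invoker;

        ForwardingHandler(SimpleInvoker invoker) {
            this.invoker = invoker;
        }

        @Override
        public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
            // toString, hashCode and equals are answered locally instead of "remotely".
            if (method.getDeclaringClass() == Object.class) {
                return method.invoke(invoker, args);
            }
            return invoker.invoke(method.getName(), args);
        }
    }

    /** Creates a proxy that implements the given interface on top of the invoker. */
    @SuppressWarnings("unchecked")
    static <T> T createProxy(Class<T> type, SimpleInvoker invoker) {
        return (T) Proxy.newProxyInstance(
                type.getClassLoader(), new Class<?>[]{type}, new ForwardingHandler(invoker));
    }

    static String demo() {
        // A real invoker would perform a remote call; this one just echoes its input.
        SimpleInvoker invoker = (methodName, args) -> methodName + ":" + args[0];
        GreetingService service = createProxy(GreetingService.class, invoker);
        return service.hello("dubbo");
    }

    public static void main(String[] args) {
        System.out.println(demo()); // prints "hello:dubbo"
    }
}
```

The caller only ever sees GreetingService; everything behind the handler can change without the calling code noticing.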

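Sequence diagram:

What is finally returned is a dynamic proxy object for the interface being called.

When a method on the proxy object is called, execution enters the logic of the InvokerInvocationHandler class.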

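While tracing the source, I found that when the consumer calls invoke it passes through a whole chain of Invoker implementations, and for a long time I was puzzled about what all these Invokers are for.

The creation of the Invoker should be the entry point, which means starting from the ReferenceConfig class.

From there we arrive at the RegistryProtocol.doRefer method: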

private <T> Invoker<T> doRefer(Cluster cluster, Registry registry, Class<T> type, URL url) {
    RegistryDirectory<T> directory = new RegistryDirectory<T>(type, url);
    directory.setRegistry(registry);
    directory.setProtocol(protocol);
    URL subscribeUrl = new URL(Constants.CONSUMER_PROTOCOL, NetUtils.getLocalHost(), 0, type.getName(), directory.getUrl().getParameters());
    // Register the consumer itself with the registry (unless registration is disabled).
    if (!Constants.ANY_VALUE.equals(url.getServiceInterface())
            && url.getParameter(Constants.REGISTER_KEY, true)) {
        registry.register(subscribeUrl.addParameters(Constants.CATEGORY_KEY, Constants.CONSUMERS_CATEGORY,
                Constants.CHECK_KEY, String.valueOf(false)));
    }

    // Subscribe to the providers, configurators and routers categories.
    directory.subscribe(subscribeUrl.addParameter(Constants.CATEGORY_KEY,
            Constants.PROVIDERS_CATEGORY
                    + "," + Constants.CONFIGURATORS_CATEGORY
                    + "," + Constants.ROUTERS_CATEGORY));

    // Merge the invokers held by the directory into a single cluster invoker.
    return cluster.join(directory);
}

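The key line is this one:
cluster.join(directory);

When join is executed, the SPI mechanism looks up the Cluster extension instance, which defaults to FailoverCluster.
Debugging, however, shows that the first object instantiated is MockClusterWrapper, not FailoverCluster.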

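After consulting the article "dubbo中的mock机制" (the mock mechanism in Dubbo) and reading the source, my summary is as follows:

In Dubbo's configuration file classpath:/META-INF/dubbo/internal/com.alibaba.dubbo.rpc.cluster.Cluster, the key failover maps to the FailoverCluster class and the key mock maps to MockClusterWrapper.
ExtensionLoader, however, does something special when it instantiates an extension: once the instance has been created, it automatically wraps it in the Wrapper classes known to the current ExtensionLoader. MockClusterWrapper is exactly such a Wrapper, so the FailoverCluster instance ends up wrapped in a MockClusterWrapper. In short: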

Cluster$Adaptive -> locate the extension whose key is failover -> FailoverCluster -> wrapped in MockClusterWrapper
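The wrapping step can be sketched in a few lines. This is a simplified illustration, not the real ExtensionLoader: the Cluster interface here takes and returns plain strings, and load and isWrapper are hypothetical helpers. The one thing taken from Dubbo is the convention that a wrapper is a class with a constructor accepting the extension interface.

```java
import java.util.ArrayList;
import java.util.List;

public class WrapperSketch {
    /** Hypothetical, string-based stand-in for the Cluster extension point. */
    public interface Cluster {
        String join(String directory);
    }

    public static class FailoverCluster implements Cluster {
        public FailoverCluster() {
        }

        @Override
        public String join(String directory) {
            return "FailoverClusterInvoker(" + directory + ")";
        }
    }

    /** A wrapper is recognized by its constructor that takes the extension interface. */
    public static class MockClusterWrapper implements Cluster {
        private final Cluster cluster;

        public MockClusterWrapper(Cluster cluster) {
            this.cluster = cluster;
        }

        @Override
        public String join(String directory) {
            return "MockClusterInvoker(" + cluster.join(directory) + ")";
        }
    }

    static boolean isWrapper(Class<?> clazz) {
        try {
            clazz.getConstructor(Cluster.class);
            return true;
        } catch (NoSuchMethodException e) {
            return false;
        }
    }

    /** Instantiates the target extension, then wraps it in every wrapper class found. */
    static Cluster load(Class<? extends Cluster> target, List<Class<? extends Cluster>> candidates)
            throws ReflectiveOperationException {
        Cluster instance = target.getConstructor().newInstance();
        for (Class<? extends Cluster> candidate : candidates) {
            if (isWrapper(candidate)) {
                instance = candidate.getConstructor(Cluster.class).newInstance(instance);
            }
        }
        return instance;
    }

    static String demo() {
        List<Class<? extends Cluster>> candidates = new ArrayList<>();
        candidates.add(FailoverCluster.class);
        candidates.add(MockClusterWrapper.class);
        try {
            return load(FailoverCluster.class, candidates).join("directory");
        } catch (ReflectiveOperationException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(demo()); // prints "MockClusterInvoker(FailoverClusterInvoker(directory))"
    }
}
```

This explains what the debugger shows: the outermost object is the wrapper, and the failover implementation sits inside it.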

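So the sequence diagram looks like this:

Cluster fault-tolerance diagram from the official site:

Following the sequence diagram above, the source code reads as follows: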

MockClusterInvoker.java

public Result invoke(Invocation invocation) throws RpcException {
    Result result = null;

    String value = directory.getUrl().getMethodParameter(invocation.getMethodName(), Constants.MOCK_KEY, Boolean.FALSE.toString()).trim();
    if (value.length() == 0 || value.equalsIgnoreCase("false")) {
        //no mock
        // From this line on, the call enters the cluster: cluster -> AbstractClusterInvoker
        result = this.invoker.invoke(invocation);
    } else if (value.startsWith("force")) {  
        if (logger.isWarnEnabled()) {  
            logger.warn("force-mock: " + invocation.getMethodName() + " force-mock enabled , url : " + directory.getUrl());  
        }  
        //force:direct mock  
        result = doMockInvoke(invocation, null);  
    } else {  
        //fail-mock  
        try {  
            result = this.invoker.invoke(invocation);  
        } catch (RpcException e) {  
            if (e.isBiz()) {  
                throw e;  
            } else {  
                if (logger.isWarnEnabled()) {  
                    logger.warn("fail-mock: " + invocation.getMethodName() + " fail-mock enabled , url : " + directory.getUrl(), e);  
                }  
                result = doMockInvoke(invocation, e);  
            }  
        }  
    }  
    return result;  
}
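The three branches above (no mock, force mock, fail mock) can be condensed into a small runnable sketch. Everything here is an assumption made for illustration: RemoteCallException stands in for a non-business RpcException, the two Supplier arguments stand in for the real invoker and the mock invoker, and the business-exception rethrow of the original is left out.

```java
import java.util.function.Supplier;

public class MockModeSketch {
    /** Hypothetical stand-in for a non-business RpcException (timeout, network error, ...). */
    static class RemoteCallException extends RuntimeException {
        RemoteCallException(String message) {
            super(message);
        }
    }

    /** mockValue plays the role of the "mock" URL parameter. */
    static String invoke(String mockValue, Supplier<String> remoteCall, Supplier<String> mockCall) {
        String value = mockValue == null ? "false" : mockValue.trim();
        if (value.isEmpty() || value.equalsIgnoreCase("false")) {
            return remoteCall.get();       // no mock: always go to the cluster
        } else if (value.startsWith("force")) {
            return mockCall.get();         // force mock: never touch the remote side
        } else {
            try {
                return remoteCall.get();   // fail mock: try the remote call first
            } catch (RemoteCallException e) {
                return mockCall.get();     // fall back to the mock when the call fails
            }
        }
    }

    static String demo() {
        Supplier<String> healthy = () -> "remote";
        Supplier<String> broken = () -> {
            throw new RemoteCallException("timeout");
        };
        Supplier<String> mock = () -> "mock";
        return invoke("false", healthy, mock) + ","
                + invoke("force:return null", healthy, mock) + ","
                + invoke("fail:return null", broken, mock) + ","
                + invoke("fail:return null", healthy, mock);
    }

    public static void main(String[] args) {
        System.out.println(demo()); // prints "remote,mock,mock,remote"
    }
}
```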

AbstractClusterInvoker.java

public Result invoke(final Invocation invocation) throws RpcException {
    checkWhetherDestroyed();

    // binding attachments into invocation.
    Map<String, String> contextAttachments = RpcContext.getContext().getAttachments();
    if (contextAttachments != null && contextAttachments.size() != 0) {
        ((RpcInvocation) invocation).addAttachments(contextAttachments);
    }

    // Select the list of available invokers
    List<Invoker<T>> invokers = list(invocation);
    // Initialize the load-balancing strategy
    LoadBalance loadbalance = initLoadBalance(invokers, invocation);
    RpcUtils.attachInvocationIdIfAsync(getUrl(), invocation);
    return doInvoke(invocation, invokers, loadbalance);
}

protected List<Invoker<T>> list(Invocation invocation) throws RpcException {
    // -> AbstractDirectory.java
    return directory.list(invocation);
}

AbstractDirectory.java

public List<Invoker<T>> list(Invocation invocation) throws RpcException {
    if (destroyed) {
        throw new RpcException("Directory already destroyed .url: " + getUrl());
    }

    // Template method, implemented by subclasses
    // -> RegistryDirectory.java or StaticDirectory.java
    List<Invoker<T>> invokers = doList(invocation);
    List<Router> localRouters = this.routers; // local reference
    if (localRouters != null && !localRouters.isEmpty()) {
        for (Router router : localRouters) {
            try {
                if (router.getUrl() == null || router.getUrl().getParameter(Constants.RUNTIME_KEY, false)) {
                    // Once the invokers are returned we reach the Router and routing begins. This is step 6
                    // in the sequence diagram, and the class involved is MockInvokersSelector, an implementation
                    // of the Router interface. The official diagram also shows that Router comes in two kinds,
                    // Script and Condition, that is, script routing and condition routing.
                    invokers = router.route(invokers, getConsumerUrl(), invocation);
                }
            } catch (Throwable t) {
                logger.error("Failed to execute router: " + getUrl() + ", cause: " + t.getMessage(), t);
            }
        }
    }
    return invokers;
}

RegistryDirectory.java

public List<Invoker<T>> doList(Invocation invocation) {
    if (forbidden) {
        // 1. No service provider 2. Service providers are disabled
        throw new RpcException(RpcException.FORBIDDEN_EXCEPTION,
                "No provider available from registry " + getUrl().getAddress() + " for service " + getConsumerUrl().getServiceKey() + " on consumer " + NetUtils.getLocalHost()
                        + " use dubbo version " + Version.getVersion() + ", please check status of providers(disabled, not registered or in blacklist).");
    }
    List<Invoker<T>> invokers = null;
    Map<String, List<Invoker<T>>> localMethodInvokerMap = this.methodInvokerMap; // local reference
    if (localMethodInvokerMap != null && localMethodInvokerMap.size() > 0) {
        String methodName = RpcUtils.getMethodName(invocation);
        Object[] args = RpcUtils.getArguments(invocation);
        if (args != null && args.length > 0 && args[0] != null
                && (args[0] instanceof String || args[0].getClass().isEnum())) {
            invokers = localMethodInvokerMap.get(methodName + "." + args[0]); // The routing can be enumerated according to the first parameter
        }
        if (invokers == null) {
            invokers = localMethodInvokerMap.get(methodName);
        }
        if (invokers == null) {
            invokers = localMethodInvokerMap.get(Constants.ANY_VALUE);
        }
        if (invokers == null) {
            Iterator<List<Invoker<T>>> iterator = localMethodInvokerMap.values().iterator();
            if (iterator.hasNext()) {
                invokers = iterator.next();
            }
        }
    }
    return invokers == null ? new ArrayList<Invoker<T>>(0) : invokers;
}

MockInvokersSelector.java

public <T> List<Invoker<T>> route(final List<Invoker<T>> invokers,
                                  URL url, final Invocation invocation) throws RpcException {
    if (invocation.getAttachments() == null) {
        return getNormalInvokers(invokers);
    } else {
        String value = invocation.getAttachments().get(Constants.INVOCATION_NEED_MOCK);
        if (value == null) {
            // Pick the invokers that can execute normally and return them
            return getNormalInvokers(invokers);
        } else if (Boolean.TRUE.toString().equalsIgnoreCase(value)) {
            return getMockedInvokers(invokers);
        }
    }
    return invokers;
}

private <T> List<Invoker<T>> getNormalInvokers(final List<Invoker<T>> invokers) {
    if (!hasMockProviders(invokers)) {
        return invokers;
    } else {
        List<Invoker<T>> sInvokers = new ArrayList<Invoker<T>>(invokers.size());
        for (Invoker<T> invoker : invokers) {
            if (!invoker.getUrl().getProtocol().equals(Constants.MOCK_PROTOCOL)) {
                sInvokers.add(invoker);
            }
        }
        return sInvokers;
    }
}

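The two keywords that appeared above, Directory and Router, really just do two things:

Directory finds all the invokers in the current cluster.
Router picks, from all the invokers of the previous step, the ones that can execute normally.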
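These two steps can be sketched as a tiny pipeline. It is only an illustration under my own assumptions: SimpleInvoker, listAll and routeNormal are hypothetical names, the addresses are made up, and the only rule borrowed from the source above is that invokers whose protocol is mock are filtered out for a normal call.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class DirectoryRouterSketch {
    /** Hypothetical, minimal invoker: just a protocol and an address. */
    static class SimpleInvoker {
        final String protocol;
        final String address;

        SimpleInvoker(String protocol, String address) {
            this.protocol = protocol;
            this.address = address;
        }
    }

    /** Directory role: return every invoker currently known for the service. */
    static List<SimpleInvoker> listAll() {
        return Arrays.asList(
                new SimpleInvoker("dubbo", "10.0.0.1:20880"),
                new SimpleInvoker("mock", "10.0.0.2:20880"),
                new SimpleInvoker("dubbo", "10.0.0.3:20880"));
    }

    /** Router role: keep only the invokers whose protocol is not "mock". */
    static List<SimpleInvoker> routeNormal(List<SimpleInvoker> invokers) {
        List<SimpleInvoker> result = new ArrayList<>(invokers.size());
        for (SimpleInvoker invoker : invokers) {
            if (!"mock".equals(invoker.protocol)) {
                result.add(invoker);
            }
        }
        return result;
    }

    static List<String> demo() {
        List<String> addresses = new ArrayList<>();
        for (SimpleInvoker invoker : routeNormal(listAll())) {
            addresses.add(invoker.address);
        }
        return addresses;
    }

    public static void main(String[] args) {
        System.out.println(demo()); // prints "[10.0.0.1:20880, 10.0.0.3:20880]"
    }
}
```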

Back to AbstractClusterInvoker.java

    ......
    // Select the list of available invokers
    List<Invoker<T>> invokers = list(invocation);
    // Initialize the load-balancing strategy
    LoadBalance loadbalance = initLoadBalance(invokers, invocation);
    RpcUtils.attachInvocationIdIfAsync(getUrl(), invocation);
    return doInvoke(invocation, invokers, loadbalance);

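The steps above have already picked the invokers that can execute normally. But suppose the cluster has two providers and both are healthy: which one should actually be called?

According to the official documentation:

When a cluster call fails, Dubbo offers several fault-tolerance strategies; the default is failover retry.

So at this point execution reaches the FailoverClusterInvoker class. If you configure Failfast Cluster (fail fast), Failsafe Cluster (fail safe), Failback Cluster (automatic recovery after failure), Forking Cluster (call several servers in parallel and return as soon as one succeeds) or Broadcast Cluster (call every provider one by one and report an error if any of them fails), execution goes to the corresponding class instead.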
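To make the difference between two of these strategies concrete, here is a minimal sketch of fail fast versus fail safe. It is not Dubbo code: failfast and failsafe are hypothetical helper methods, and the remote call is modelled as a plain Supplier.

```java
import java.util.function.Supplier;

public class FaultToleranceSketch {
    /** Fail fast: a single attempt; any failure is reported to the caller immediately. */
    static String failfast(Supplier<String> call) {
        return call.get(); // exceptions propagate unchanged
    }

    /** Fail safe: a single attempt; failures are swallowed and an empty result is returned. */
    static String failsafe(Supplier<String> call) {
        try {
            return call.get();
        } catch (RuntimeException e) {
            return null; // real code would log the error here
        }
    }

    static String demo() {
        Supplier<String> broken = () -> {
            throw new IllegalStateException("provider down");
        };
        String failsafeResult = String.valueOf(failsafe(broken));
        String failfastResult;
        try {
            failfastResult = failfast(broken);
        } catch (IllegalStateException e) {
            failfastResult = "error:" + e.getMessage();
        }
        return failsafeResult + "," + failfastResult;
    }

    public static void main(String[] args) {
        System.out.println(demo()); // prints "null,error:provider down"
    }
}
```

Fail fast suits non-idempotent writes where a retry could duplicate data, while fail safe suits calls such as audit logging whose failure should not break the main flow.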

FailoverClusterInvoker.java

public Result doInvoke(Invocation invocation, final List<Invoker<T>> invokers, LoadBalance loadbalance) throws RpcException {
    List<Invoker<T>> copyinvokers = invokers;
    checkInvokers(copyinvokers, invocation);
    String methodName = RpcUtils.getMethodName(invocation);
    int len = getUrl().getMethodParameter(methodName, Constants.RETRIES_KEY, Constants.DEFAULT_RETRIES) + 1;
    if (len <= 0) {
        len = 1;
    }
    // retry loop.
    RpcException le = null; // last exception.
    List<Invoker<T>> invoked = new ArrayList<Invoker<T>>(copyinvokers.size()); // invoked invokers.
    Set<String> providers = new HashSet<String>(len);
    for (int i = 0; i < len; i++) {
        //Reselect before retry to avoid a change of candidate `invokers`.
        //NOTE: if `invokers` changed, then `invoked` also lose accuracy.
        if (i > 0) {
            checkWhetherDestroyed();
            copyinvokers = list(invocation);
            // check again
            checkInvokers(copyinvokers, invocation);
        }
        // Pick one Invoker through the load-balancing algorithm, then call it
        Invoker<T> invoker = select(loadbalance, invocation, copyinvokers, invoked);
        invoked.add(invoker);
        RpcContext.getContext().setInvokers((List) invoked);
        try {
            Result result = invoker.invoke(invocation);
            if (le != null && logger.isWarnEnabled()) {
                logger.warn("Although retry the method " + methodName
                        + " in the service " + getInterface().getName()
                        + " was successful by the provider " + invoker.getUrl().getAddress()
                        + ", but there have been failed providers " + providers
                        + " (" + providers.size() + "/" + copyinvokers.size()
                        + ") from the registry " + directory.getUrl().getAddress()
                        + " on the consumer " + NetUtils.getLocalHost()
                        + " using the dubbo version " + Version.getVersion() + ". Last error is: "
                        + le.getMessage(), le);
            }
            return result;
        } catch (RpcException e) {
            if (e.isBiz()) { // biz exception.
                throw e;
            }
            le = e;
        } catch (Throwable e) {
            le = new RpcException(e.getMessage(), e);
        } finally {
            providers.add(invoker.getUrl().getAddress());
        }
    }
    throw new RpcException(le.getCode(), "Failed to invoke the method "
            + methodName + " in the service " + getInterface().getName()
            + ". Tried " + len + " times of the providers " + providers
            + " (" + providers.size() + "/" + copyinvokers.size()
            + ") from the registry " + directory.getUrl().getAddress()
            + " on the consumer " + NetUtils.getLocalHost() + " using the dubbo version "
            + Version.getVersion() + ". Last error is: "
            + le.getMessage(), le.getCause() != null ? le.getCause() : le);
}

protected Invoker<T> select(LoadBalance loadbalance, Invocation invocation, List<Invoker<T>> invokers, List<Invoker<T>> selected) throws RpcException {  
    if (invokers == null || invokers.isEmpty()) {  
        return null;  
    }  
    String methodName = invocation == null ? "" : invocation.getMethodName();

    boolean sticky = invokers.get(0).getUrl().getMethodParameter(methodName, Constants.CLUSTER_STICKY_KEY, Constants.DEFAULT_CLUSTER_STICKY);
    {  
        //ignore overloaded method  
        if (stickyInvoker != null && !invokers.contains(stickyInvoker)) {  
            stickyInvoker = null;  
        }  
        //ignore concurrency problem  
        if (sticky && stickyInvoker != null && (selected == null || !selected.contains(stickyInvoker))) {  
            if (availablecheck && stickyInvoker.isAvailable()) {  
                return stickyInvoker;  
            }  
        }  
    }  
    Invoker<T> invoker = doSelect(loadbalance, invocation, invokers, selected);

    if (sticky) {  
        stickyInvoker = invoker;  
    }  
    return invoker;  
}

private Invoker<T> doSelect(LoadBalance loadbalance, Invocation invocation, List<Invoker<T>> invokers, List<Invoker<T>> selected) throws RpcException {  
    if (invokers == null || invokers.isEmpty())  
        return null;  
    if (invokers.size() == 1)  
        return invokers.get(0);  
    Invoker<T> invoker = loadbalance.select(invokers, getUrl(), invocation);

    //If the `invoker` is in the `selected` or invoker is unavailable && availablecheck is true, reselect.
    if ((selected != null && selected.contains(invoker))  
            || (!invoker.isAvailable() && getUrl() != null && availablecheck)) {  
        try {  
            Invoker<T> rinvoker = reselect(loadbalance, invocation, invokers, selected, availablecheck);  
            if (rinvoker != null) {  
                invoker = rinvoker;  
            } else {  
                //Check the index of current selected invoker, if it's not the last one, choose the one at index+1.  
                int index = invokers.indexOf(invoker);  
                try {  
                    //Avoid collision  
                    invoker = index < invokers.size() - 1 ? invokers.get(index + 1) : invokers.get(0);  
                } catch (Exception e) {  
                    logger.warn(e.getMessage() + " may because invokers list dynamic change, ignore.", e);  
                }  
            }  
        } catch (Throwable t) {  
            logger.error("cluster reselect fail reason is :" + t.getMessage() + " if can not solve, you can set cluster.availablecheck=false in url", t);  
        }  
    }  
    return invoker;  
}
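Stripped of logging and Dubbo types, the failover idea is: try up to retries + 1 times, and on each retry prefer a provider that has not been tried yet. The sketch below illustrates that idea only. Providers are plain strings, select simply takes the first untried provider rather than consulting a load balancer, and invokeWithFailover is a hypothetical name.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Function;

public class FailoverSketch {
    /** Pick the first provider that has not been tried yet; otherwise fall back to the first one. */
    static String select(List<String> providers, List<String> invoked) {
        for (String provider : providers) {
            if (!invoked.contains(provider)) {
                return provider;
            }
        }
        return providers.get(0);
    }

    /** Try up to retries + 1 times, moving on to another provider after each failure. */
    static String invokeWithFailover(List<String> providers, int retries, Function<String, String> call) {
        if (providers.isEmpty()) {
            throw new IllegalStateException("No provider available");
        }
        int attempts = Math.max(retries + 1, 1);
        List<String> invoked = new ArrayList<>();
        RuntimeException last = null;
        for (int i = 0; i < attempts; i++) {
            String provider = select(providers, invoked);
            invoked.add(provider);
            try {
                return call.apply(provider);
            } catch (RuntimeException e) {
                last = e; // remember the last failure and retry
            }
        }
        throw new IllegalStateException("Tried " + attempts + " times, providers " + invoked, last);
    }

    static String demo() {
        List<String> providers = Arrays.asList("a", "b", "c");
        // Providers "a" and "b" are down in this demo; only "c" answers.
        return invokeWithFailover(providers, 2, provider -> {
            if (!provider.equals("c")) {
                throw new IllegalStateException(provider + " is down");
            }
            return "ok from " + provider;
        });
    }

    public static void main(String[] args) {
        System.out.println(demo()); // prints "ok from c"
    }
}
```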

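Across the whole cluster fault-tolerance architecture, Dubbo really does just three things:

1. Directory finds all the invokers in the current cluster.

2. Router picks, from all the invokers of the previous step, the ones that can execute normally.

3. LoadBalance picks, from the healthy invokers of the previous step, the one invoker to call, according to the configured load-balancing strategy.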
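For step 3, here is a minimal sketch of one possible strategy, weighted random selection. The weights, the fixed seed and the select helper are assumptions made for the demo; this is not Dubbo's own LoadBalance implementation.

```java
import java.util.Random;

public class RandomLoadBalanceSketch {
    /** Returns the index of the chosen invoker; higher weights are chosen more often. */
    static int select(int[] weights, Random random) {
        int total = 0;
        for (int weight : weights) {
            total += weight;
        }
        if (total <= 0) {
            return random.nextInt(weights.length); // no usable weights: plain random
        }
        int offset = random.nextInt(total);
        for (int i = 0; i < weights.length; i++) {
            offset -= weights[i];
            if (offset < 0) {
                return i;
            }
        }
        return weights.length - 1; // not reached when total > 0
    }

    /** Runs 10000 selections and counts how often each index is chosen. */
    static int[] demo() {
        int[] weights = {100, 0, 300};
        int[] hits = new int[weights.length];
        Random random = new Random(42); // fixed seed so the run is repeatable
        for (int i = 0; i < 10000; i++) {
            hits[select(weights, random)]++;
        }
        return hits;
    }

    public static void main(String[] args) {
        int[] hits = demo();
        System.out.println(hits[0] + " " + hits[1] + " " + hits[2]);
    }
}
```

An invoker with weight 0 is never chosen, and the one with weight 300 receives roughly three times the traffic of the one with weight 100.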

Reference: 肥朝, "dubbo源码解析-集群容错架构设计" (Dubbo source code analysis: cluster fault-tolerance architecture design)