rpc中的高并发
阅读原文时间:2023年07月10日阅读:1

手写一个高性能的rpc框架

模拟服务端,运行main函数,相当于启动服务器

public class ServerApplication {
public static void main(String[] args) throws Exception {

    //开启服务端,然后等待客户端发出请求,然后给客户端响应数据,但如果这些操作都写在,会使代码不好维护,因此,将其抽象化,抽象出  
    //一个工厂类,专门来做这样的操作。  
    //因此需要一个专门来处理提供者类的工厂类,此类需要提供一个初始化方法、一个开始/停止服务的方法、(如果要加注册中心的话还需要一个添加服务的方法)

}  

}

提供者工厂类

public class XxlRpcProviderFactory {
//初始化方法(相当于一个构造器)里面的参数,就是开启服务需要的参数

/\*\*  
 \*  
 \* @param netType      网络通信方式  
 \* @param serializer   序列化方式  
 \* @param ip            ip地址  
 \* @param port          端口号  
 \* @param accessToken  接收token(如果有)  
 \* @param serviceRegistryClass 注册中心类  (可先不要)  
 \* @param serviceRegistryParam 注册中心参数  (可先不要)  
 \*/  
public void init(){

}

public void start(){  
    //这里使用回调的方式来完成开始和停止的操作  
    //首先需要一个Server类来具体管理服务端的数据通信,而使用回调函数,可以设置一个类似事件一样的功能  
    //当Server开启时需要执行注册的操作,当关闭的时候需要移除注册的操作  
}

public void stop(){  
    //在这里将Server停掉  
}

}

Server抽象类,延迟实现类(功能扩展)

public abstract class Server {

//Server应该是一个抽象类,因为具体的通信可能不止一个。  
//该类定义了XxlRpcProviderFactory需要的两个方法,开始、停止,但没有具体实现,具体实现延时到实现类中,比如NettyServer类  
public abstract void start();

public abstract void st();  

}

回调功能的实现

回调是一种思想,首先提供一个回调的抽象类,提供一个执行的抽象方法,但不去实现。然后设置一个条件,当满足了这个条件之后就执行某项操作,这就是回调。

需求:当Server执行了start方法的时候(条件),就要去注册服务;当Server执行了stop方法的时候(条件),就移除注册服务。

实现思路:

从需求上看,当Server的实现类执行start方法后,需要执行一个方法,这个方法可以去注册服务,但具体的注册服务代码不能写在实现类里面,实现类中只提供一个判断的功能,如果需要去回调就执行回调,不需要回调就不去回调。但这个显然和实现类负责具体的通信代码功能不相符合,因此,判断条件的方法可以写在Server抽象类中,并继承给每一个实现类。

具体判断逻辑就是看回调类是否实现。回调类应该是一个接口,只提供一个运行的方法,当回调类被实现了,那么回调函数一定需要去执行。从这个逻辑上看,真正执行回调函数的类就是实现了回调接口的实现类

BaseCallback

public abstract class BaseCallback {
public abstract void run();
}

Server类中定义判断条件方法

public abstract class Server {

//Server应该是一个抽象类,因为具体的通信可能不止一个。  
//该类定义了XxlRpcProviderFactory需要的两个方法,开始、停止,但没有具体实现,具体实现延时到实现类中,比如NettyServer类  
public abstract void start();

public abstract void stop();

//需要一个传来的BaseCallback类,来判断该类是否被实例化  
private BaseCallback startedCallback;  
private BaseCallback stopedCallback;

//需要从外部将该参数传进来,而且根据逻辑。需要连个传值的方法。  
public void setStartedCallback(BaseCallback baseCallback){  
    this.startedCallback=baseCallback;  
}  
public void setStopdCallback(BaseCallback baseCallback){  
    this.stopedCallback=baseCallback;  
}

//以下方法就是条件判断的方法,判断条件就是baseCallback是否在外面被实例化了  
public void onStart(){  
    if(startedCallback!=null){  
        //说明start方法需要去注册服务了  
        startedCallback.run();  
    }  
}

public void onStop(){  
    if(stopedCallback!=null){  
        //说明start方法需要去注册服务了  
        stopedCallback.run();  
    }  
}  

}

Server

在工厂类中,去实现回调run方法

public class XxlRpcProviderFactory {
//初始化方法(相当于一个构造器)里面的参数,就是开启服务需要的参数

/\*\*  
 \*  
 \* @param netType      网络通信方式  
 \* @param serializer   序列化方式  
 \* @param ip            ip地址  
 \* @param port          端口号  
 \* @param accessToken  接收token(如果有)  
 \* @param serviceRegistryClass 注册中心类  (可先不要)  
 \* @param serviceRegistryParam 注册中心参数  (可先不要)  
 \*/  
public void init(){

}

//先提供一个Server类,然后使用该类的各种方法,但该类是一个抽象类,具体的实现类需要使用newstance创建  
private Server server;

public void start(){  
    //这里使用回调的方式来完成开始和停止的操作  
    //首先需要一个Server类来具体管理服务端的数据通信,而使用回调函数,可以设置一个类似事件一样的功能  
    //当Server开启时需要执行注册的操作,当关闭的时候需要移除注册的操作

    //先将回调函数实例化,当后面执行的时候就会按条件执行了  
    server.setStartedCallback(new BaseCallback() {  
        @Override  
        public void run() {  
            //执行注册的功能  
        }  
    });  
    server.setStopdCallback(new BaseCallback() {  
        @Override  
        public void run() {  
            //执行移除注册的功能  
        }  
    });  
    //开启服务端的通信  
    server.start();  
}

public void stop(){  
    //在这里将Server停掉  
    server.stop();  
}

}

XxlRpcProviderFactory


以上就是服务端的总体框架,接下来,只要提供了服务端的底层通信实现,服务端就搭建完成了。

下面开始实现Server类,使用netty通信模块,采用NIO模型,传输数据是异步的,当然,在服务端几乎不用考虑异步同步问题,当接收到请求之后,就返回所请求的对象。

为了更好的管理客户端传到服务器上的请求,将处理请求的操作启动不同的线程来管理,这需要一个线程池。

ThreadPoolExecutor类的获取(说明都在代码里)

public class ThreadPoolUtil {

//该类的作用就是返回一个线程池的实现类 ThreadPoolExecutor,包括规定池子的一些参数以及特性  
public static ThreadPoolExecutor makeServerThreadPool(final String serverType){

    //serverHandlerPool服务端处理线程池,将所有请求的处理线程都放到池子里面去  
    //一个线程池的实例化过程就是这样,规定了池子的大小,存活时间,工作队列,线程创建工厂给线程命名,池子满了之后的处理  
    ThreadPoolExecutor serverHandlerPool =new ThreadPoolExecutor(  
            60,  
            300,  
            60L,  
            TimeUnit.SECONDS,  
            new LinkedBlockingQueue<Runnable>(1000),  
            new ThreadFactory() {  
                @Override  
                public Thread newThread(Runnable r) {  
                    return new Thread(r, "xxl-rpc, " + serverType + "-serverHandlerPool-" + r.hashCode());  
                }  
            },  
            new RejectedExecutionHandler() {  
                @Override  
                public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {  
                    throw new RuntimeException("xxl-rpc "+serverType+" Thread pool is EXHAUSTED!");  
                }  
            }  
    );  
    return serverHandlerPool;

}  

}

ThreadPoolUtil

有了上面的工具类,就可以管理每次请求处理的线程了。

现在工厂类中写一个将request转换为response的方法

public class XxlRpcProviderFactory {

private NetEnum netType;  
private Serializer serializer;

private String ip;                    // for registry  
private int port;                    // default port  
private String accessToken;

private Class<? extends ServiceRegistry> serviceRegistryClass;  
private Map<String, String> serviceRegistryParam;

//初始化方法(相当于一个构造器)里面的参数,就是开启服务需要的参数

/\*\*  
 \*  
 \* @param netType      网络通信方式  
 \* @param serializer   序列化方式  
 \* @param ip            ip地址  
 \* @param port          端口号  
 \* @param accessToken  接收token(如果有)  
 \* @param serviceRegistryClass 注册中心类  (可先不要)  
 \* @param serviceRegistryParam 注册中心参数  (可先不要)  
 \*/  
public void init(NetEnum netType,  
                 Serializer serializer,  
                 String ip,  
                 int port,  
                 String accessToken,  
                 Class<? extends ServiceRegistry> serviceRegistryClass,  
                 Map<String, String> serviceRegistryParam){

}

//先提供一个Server类,然后使用该类的各种方法,但该类是一个抽象类,具体的实现类需要使用newstance创建  
private Server server;

public void start(){  
    //这里使用回调的方式来完成开始和停止的操作  
    //首先需要一个Server类来具体管理服务端的数据通信,而使用回调函数,可以设置一个类似事件一样的功能  
    //当Server开启时需要执行注册的操作,当关闭的时候需要移除注册的操作

    //先将回调函数实例化,当后面执行的时候就会按条件执行了  
    server.setStartedCallback(new BaseCallback() {  
        @Override  
        public void run() {  
            //执行注册的功能  
        }  
    });  
    server.setStopdCallback(new BaseCallback() {  
        @Override  
        public void run() {  
            //执行移除注册的功能  
        }  
    });  
    //开启服务端的通信  
    server.start();  
}

public void stop(){  
    //在这里将Server停掉  
    server.stop();  
}

//存放bean对象的map集合  
private Map<String,Object> serviceData=new HashMap<>();

public Map<String, Object> getServiceData() {  
    return serviceData;  
}  
//这里是将servicebean添加到map中的方法,在application中被调用,并传给一个实例化好的类  
public void addService(String iface, String version, Object serviceBean){  
    String serviceKey = makeServiceKey(iface, version);  
    serviceData.put(serviceKey, serviceBean);

}

//建立一个制作key值的方法,根据接口名和版本号就可以得到key值 形式为iface#version  
public static String makeServiceKey(String iface, String version){  
    String serviceKey = iface;  
    if (version!=null && version.trim().length()>0) {  
        serviceKey += "#".concat(version);  
    }  
    return serviceKey;  
}

//将handle类中的转换方法放在这里实现  
public XxlRpcResponse invokeService(XxlRpcRequest request){

    XxlRpcResponse response=new XxlRpcResponse();  
    //设置response对象  
    response.setRequestId(request.getRequestId());

    //这里将之前请求过的bean放在一个map集合中,使用key值来取,可以避免重复的进行反射的操作  
    //因此,在上面需要定义一个集合来存放得到的bean对象  
    //根据上面定义的方法和map集合,先从map中取,如果取不到,说明请求的接口有问题  
    String key = makeServiceKey(request.getClassName(), request.getVersion());  
    Object serviceBean = serviceData.get(key);

    //对得到的bean以及时间和token进行判断,来抛出响应的错误  
    //没有发现这个bean  
    if(serviceBean==null){  
        response.setErrorMsg("The serviceKey\["+ key +"\] not found.");  
        return response;  
    }  
    //判断是否超时  
    if (System.currentTimeMillis() - request.getCreateMillisTime() > 3\*60\*1000) {  
        response.setErrorMsg("The timestamp difference between admin and executor exceeds the limit.");  
        return response;  
    }  
    //判断请求的token和响应的token是否一致  
    if (accessToken!=null && accessToken.trim().length()>0 && !accessToken.trim().equals(request.getAccessToken())) {  
        response.setErrorMsg("The access token\[" + request.getAccessToken() + "\] is wrong.");  
        return response;  
    }  
    //接下来,就将请求的方法执行得到的结果返回,利用反射  
    Class<?> serviceBeanClass = serviceBean.getClass();  
    String methodName = request.getMethodName();  
    Class<?>\[\] parameterTypes = request.getParameterTypes();  
    Object\[\] parameters = request.getParameters();

    //反射得到类的方法  
    try {  
        Method method = serviceBeanClass.getMethod(methodName, parameterTypes);  
        method.setAccessible(true);  
        Object result = method.invoke(serviceBean, parameters);

        response.setResult(result);

    } catch (Throwable t) {  
        // catch error  
        response.setErrorMsg("");  
    }  
    return response;

}

}

XxlRpcProviderFactory

然后就可以在handle类中调用了

public class NettyServerHandler extends SimpleChannelInboundHandler{

//请求传来,首先传到这个类中,然后进行处理  
//XxlRpcRequest类是封装了请求的客户端请求的类,这里在NettyServer中添加了编解码就可以直接传输这个类了。

//向Handle类中传入线程池类,然后执行  
private ThreadPoolExecutor serverHandlerPool;  
//需要传入一个工厂类,来调用转换代码  
private XxlRpcProviderFactory providerFactory;

//提供构造方法传值  
public NettyServerHandler(final XxlRpcProviderFactory providerFactory,final ThreadPoolExecutor serverHandlerPool){  
    this.providerFactory=providerFactory;  
    this.serverHandlerPool=serverHandlerPool;  
}

//netty通信,从客户端传来的值会直接到这里,request  
@Override  
protected void channelRead0(final ChannelHandlerContext channelHandlerContext, final XxlRpcRequest request) throws Exception {

    serverHandlerPool.execute(new Runnable() {  
        @Override  
        public void run() {  
            //在线程池的某个线程中执行调用函数,将请求转换为响应。这样做是为了在逻辑上分开,netty的线程是处理netty的IO操作,  
            // 而这个线程就专门处理请求转换。  
            //这个调用代码直接在这里实现,使代码有点繁琐,将其放到工厂类中调用。  
            XxlRpcResponse response=providerFactory.invokeService(request);  
            channelHandlerContext.writeAndFlush(response);  
        }  
    });  
}

//异常处理,就是关掉上下文  
@Override  
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {  
    ctx.close();  
}  

}

NettyServerHandler

然后将handle类加到具体nettyserver类的pipeline里面,就可以传输了

public class NettyServer extends Server{

//Server的实现类,底层使用的是Netty通信

//开启通信的方法,这里专门开辟一个线程开实现开启功能  
private Thread thread;  
@Override  
public void start(final XxlRpcProviderFactory factory) {  
    thread=new Thread(new Runnable() {  
        @Override  
        public void run() {  
            //这里就是具体的netty通信的服务端代码  
            //首先创建一个线程池对象,要传递给处理类  
            final ThreadPoolExecutor pool = ThreadPoolUtil.makeServerThreadPool(NettyServer.class.getSimpleName());  
            //创建两个线程组  
            EventLoopGroup bossGroup = new NioEventLoopGroup();  
            EventLoopGroup workerGroup = new NioEventLoopGroup();

            // start server  
            ServerBootstrap bootstrap = new ServerBootstrap();  
            bootstrap.group(bossGroup, workerGroup)  
                    .channel(NioServerSocketChannel.class)  
                    .childHandler(new ChannelInitializer<SocketChannel>() {  
                        @Override  
                        public void initChannel(SocketChannel channel) throws Exception {  
                            channel.pipeline()  
                                    .addLast(new NettyDecoder(XxlRpcRequest.class, factory.getSerializer()))  
                                    .addLast(new NettyEncoder(XxlRpcResponse.class, factory.getSerializer()))  
                                    .addLast(new NettyServerHandler(factory, pool));  
                        }  
                    })  
                    .childOption(ChannelOption.TCP\_NODELAY, true)  
                    .childOption(ChannelOption.SO\_KEEPALIVE, true);

            // bind  
            try {  
                ChannelFuture future = bootstrap.bind(factory.getPort()).sync();

                //这里是回调函数的执行体  
                onStart();

                //等待停止  
                future.channel().closeFuture().sync();  
            } catch (InterruptedException e) {  
                e.printStackTrace();  
            }  
            finally {  
                //关闭池子  
                pool.shutdown();  
                workerGroup.shutdownGracefully();  
                bossGroup.shutdownGracefully();  
            }  
        }  
    });  
    //将该线程设置为守护线程  
    thread.setDaemon(true);  
    thread.start();

}

@Override  
public void stop() {  

// destroy server thread
//如果线程没有关掉,但需要关掉,则执行线程的中端
if (thread != null && thread.isAlive()) {
thread.interrupt();
} // on stop
onStop();
}
}

NettyServer

以上就实现了一个rpc的服务端。


下面来实现客户端,客户端相比服务端要复杂很多,服务端仅仅是开启一个线程,然后静静等待,当有请求来了,从线程池中取一个线程来执行请求逻辑,返回一个响应对象。

客户端比较复杂,其中发送请求和得到响应的过程就有四种调用方式,这四种调用方式也是最难理解的部分

首先,客户端需要一个模拟的客户端来完成服务的请求

ClientApplication类,模拟客户端

public class ClientApplication {

//1.测试同步发送请求
@Test
public void syncTest(){
//首先提供一个代理类,通过动态代理,得到我们所需的接口方法的返回值(从服务端的转换方法可以知道,得到了方法的名称和参数后,返回的是方法的返回值),
//动态代理的作用就是在客户端使用哪个方法时,动态的请求哪个方法的返回值。
//这个动态代理封装在一个geObject方法里,至于如何得到,有四种调用方式,请求的时候就传入。
//创建一个参考的bean类来实现这个方法。

}

//2.测试异步发送请求  
@Test  
public void futureTest() throws ExecutionException, InterruptedException {

}

//3.测试回调发送请求  
@Test  
public void callbackTest() throws InterruptedException {  
}

//4,测试单一长连接发送请求  
@Test  
public void onewayTest() throws InterruptedException {

}  

}

ClientApplication

因为有四种调用方式,我们在请求服务之初就将四种调用方式,通过枚举定义在一个类中

public enum CallType {

//在判断的时候,可以按照其名字,去选择不同的调用方式

SYNC,

FUTURE,

CALLBACK,

ONEWAY;  

}

定义参考Bean,创建getObject方法

步骤1:得到所需的必须要参数

//该类就封装了动态代理的getObject方法
//首先需要传入必要的参数

private NetEnum netType;  
private Serializer serializer;  
private CallType callType;  
private LoadBalance loadBalance;

private Class<?> iface;  
private String version;

private long timeout = 1000;

private String address;  
private String accessToken;

private XxlRpcInvokeCallback invokeCallback;

private XxlRpcInvokerFactory invokerFactory;

/\*\*  
 \*  
 \* @param netType       使用的底层通信模块  
 \* @param serializer    序列化方式  
 \* @param callType      调用方式,有四种  
 \* @param loadBalance   负载均衡的方式  
 \* @param iface          接口名  
 \* @param version        版本  
 \* @param timeout        超时时间  
 \* @param address        请求地址  
 \* @param accessToken    接入token  
 \* @param invokeCallback    回调的调用(可无)  
 \* @param invokerFactory    调用工厂(可无)  
 \*/  
public XxlRpcReferenceBean(NetEnum netType,  
                           Serializer serializer,  
                           CallType callType,  
                           LoadBalance loadBalance,  
                           Class<?> iface,  
                           String version,  
                           long timeout,  
                           String address,  
                           String accessToken,  
                           XxlRpcInvokeCallback invokeCallback,  
                           XxlRpcInvokerFactory invokerFactory  
){  
    this.netType = netType;  
    this.serializer = serializer;  
    this.callType = callType;  
    this.loadBalance = loadBalance;  
    this.iface = iface;  
    this.version = version;  
    this.timeout = timeout;  
    this.address = address;  
    this.accessToken = accessToken;  
    this.invokeCallback = invokeCallback;  
    this.invokerFactory = invokerFactory;

    // valid  
    if (this.netType==null) {  
        throw new XxlRpcException("xxl-rpc reference netType missing.");  
    }  
    if (this.serializer==null) {  
        throw new XxlRpcException("xxl-rpc reference serializer missing.");  
    }  
    if (this.callType==null) {  
        throw new XxlRpcException("xxl-rpc reference callType missing.");  
    }  
    if (this.loadBalance==null) {  
        throw new XxlRpcException("xxl-rpc reference loadBalance missing.");  
    }  
    if (this.iface==null) {  
        throw new XxlRpcException("xxl-rpc reference iface missing.");  
    }  
    if (this.timeout < 0) {  
        this.timeout = 0;  
    }  
    if (this.invokerFactory == null) {  
        this.invokerFactory = XxlRpcInvokerFactory.getInstance();  
    }

    // init Client  

// initClient();
}

步骤2:动态代理的准备函数

//利用动态代理,得到请求方法返回的参数
public Object getOnject(){

    //直接返回动态代理的结果  
    return Proxy.newProxyInstance(Thread.currentThread().getContextClassLoader(),    //这里使用的类加载器是当前线程的类加载器  
            new Class\[\]{iface},                  //指明需要代理的类的接口名称  
            new InvocationHandler() {             //建立一个内部类,相当于继承了InvocationHandler接口,这里写的需要代理的内容  
                @Override  
                public Object invoke(Object proxy, Method method, Object\[\] args) throws Throwable {  

}
}

步骤3:得到resquest所需要的参数

//先得到要请求的类和方法的全部信息,通过这些信息封装成request发送给服务端
String className = method.getDeclaringClass().getName();//类名,包括包名
String varsion_ = version;//版本信息
String methodName = method.getName();//方法名称
Class[] parameterTypes = method.getParameterTypes();//方法参数的类型
Object[] parameters = args;//方法的参数

                    //得到请求地址,  
                    //可能参数中已经传递了请求地址,那么请求就按传递的地址请求,也有可能没有传递请求地址,  
                    // 但注册中心可能已经为该请求的类准备了很多的请求地址,那么就需要用到负载均衡

                    String finalAddress=address;  
                    if(finalAddress==null ||finalAddress.trim().length()==0){  
                        //没有传递过来地址,则需要到注册中心去找  
                        //这里需要创建一个工厂类,提供一些需要的方法,包括生成key值,获取地址等函数  
                        if(invokeCallback!=null || invokerFactory.getServiceRegistry()!=null){  
                            //如果这个工厂类不为空,而且通过工厂类能得到注册中心,那么就可以取地址了  
                            //得到存储的key值  
                            String key = XxlRpcProviderFactory.makeServiceKey(className, varsion\_);  
                            //得到的地址是一个set集合  
                            TreeSet<String> addressSet = invokerFactory.getServiceRegistry().discovery(key);

                            if (addressSet==null || addressSet.size()==0) {   //没有得到请求地址  
                                // pass  
                            } else if (addressSet.size()==1) {  //得到一个请求地址,那么就是这一个  
                                finalAddress = addressSet.first();  
                            } else {   //得到很多请求地址,则需要负载均衡  负载均衡的代码之后再说  
                                finalAddress = loadBalance.xxlRpcInvokerRouter.route(key, addressSet);  
                            }  
                        }  
                    }  
                    //如果最终还是没有得到请求地址,则抛出异常  
                    if (finalAddress==null || finalAddress.trim().length()==0) {  
                        throw new XxlRpcException("xxl-rpc reference bean\["+ className +"\] address empty");  
                    }

                    //现在就可以封装request了  
                    XxlRpcRequest xxlRpcRequest = new XxlRpcRequest();  
                    xxlRpcRequest.setRequestId(UUID.randomUUID().toString());//随机的request的id,根据这个唯一的id可以在同步调用时候拿到正确的值  
                    xxlRpcRequest.setCreateMillisTime(System.currentTimeMillis());//创建时间,传到服务端与那里的时间相比,如果大于超时时间则抛出异常  
                    xxlRpcRequest.setAccessToken(accessToken);//token值如果有,会与服务端的对比  
                    xxlRpcRequest.setClassName(className);//请求类名  
                    xxlRpcRequest.setMethodName(methodName);//方法名  
                    xxlRpcRequest.setParameterTypes(parameterTypes);  
                    xxlRpcRequest.setParameters(parameters);

步骤3:同步调用的实现

//如果是使用同步调用,在netty底层的调用是异步的,异步的意思就是,如果发起多个请求,那么会无法判断返回的响应是否是请求所得
// 因此,需要在请求发送过去之后,阻塞发送请求,直到请求得到响应之后,才能继续发送请求。要实现这样的功能,要用到

//future的get方法。

netty底层的nio模型,客户端只管发,而服务端只管收,然后响应。但具体请求和响应之间不是同步的,因此会出现如法获取到真实的响应的情况。

比如:以下就是异步返回的值:

null
null
UserDTO{name='liu1', word='hihihi'}
UserDTO{name='liu3', word='hihihi'}
UserDTO{name='liu4', word='hihihi'}
UserDTO{name='liu4', word='hihihi'}
UserDTO{name='liu4', word='hihihi'}
UserDTO{name='liu5', word='hihihi'}
UserDTO{name='liu6', word='hihihi'}
UserDTO{name='liu7', word='hihihi'}
UserDTO{name='liu7', word='hihihi'}
UserDTO{name='liu9', word='hihihi'}
UserDTO{name='liu11', word='hihihi'}
UserDTO{name='liu12', word='hihihi'}
UserDTO{name='liu13', word='hihihi'}
UserDTO{name='liu15', word='hihihi'}
UserDTO{name='liu16', word='hihihi'}
UserDTO{name='liu16', word='hihihi'}
UserDTO{name='liu17', word='hihihi'}
UserDTO{name='liu18', word='hihihi'}
UserDTO{name='liu19', word='hihihi'}
UserDTO{name='liu19', word='hihihi'}
UserDTO{name='liu21', word='hihihi'}
UserDTO{name='liu22', word='hihihi'}
UserDTO{name='liu23', word='hihihi'}
UserDTO{name='liu24', word='hihihi'}

可以看出,刚开始甚至返回了null,而且每次返回的值都可能重复。因此实现同步很有必要

同步实现的逻辑为:需要在请求发送过去之后,阻塞发送请求,直到请求得到响应之后,才能继续发送请求

这里利用了Future实现类的get方法来对响应返回进行阻塞。

对future类的理解:

Future的核心思想是:一个方法f,计算过程可能非常耗时,等待f返回,显然不明智。可以在调用f的时候,立马返回一个Future,可以通过Future这个数据结构去控制方法f的计算过程。

这里的控制包括:

get方法:获取计算结果(如果还没计算完,也是必须等待的)

cancel方法:还没计算完,可以取消计算过程

isDone方法:判断是否计算完

isCancelled方法:判断计算是否被取消

因此,我们可以创建一个response的future实现类,在这个方法中利用get方法获取response,如果没有获取到,get方法会阻塞掉,线程在这里阻塞,因此不会继续发送请求了。当get方法完成了任务(即得到了resopnse),就可以把阻塞掉的线程唤醒,那么就可以继续发送请求了。

synchronized关键字的使用:可以让其修饰代码块、方法和静态方法,被修饰的语句,只能允许一个线程执行,在该线程被执行完之前,其他线程都无法执行该代码块。它是一个互斥锁。需要线程去申请synchronized括号里的对象锁,只有申请到了才能执行。

接下来我们就编写Future的实现类FutureResponse类,使得其在获取response的时候,先将获取response的线程阻塞,同时释放锁,当resopnse被设置了值之后,再去唤醒锁,让他去参与竞争,去拿到resopnse对象返回。

public class XxlRpcFutureResponse implements Future {

//这里实现了一个future类,并让其监听resopnse对象,如果response存在,才能返回response对象  
//可以看到,在继承了该接口后,需要实现这些方法。  
//分析了那些需要实现的方法,可以总结出实现的逻辑

//首先需要知道什么时候response对象已经存在了,需要定义一个设置response对象的方法,如果该方法执行了,代表该方法已经执行了  
//1.设置一个标志位,二设置一个锁,get线程只有拿到锁对象才能去返回锁,在response存在之前,get线程被阻塞。这就是get的逻辑  
// future lock  
private boolean done = false;  
private Object lock = new Object();

private XxlRpcResponse response;  
public void setResponse(XxlRpcResponse response){  
    this.response=response;  
    //方法执行到这里,说明response已经存在了,需要告诉get方法,  
    synchronized (lock){  
        done=true;  
        lock.notifyAll();  
    }  
}  
//还没计算完成,可以取消计算结果  
@Override  
public boolean cancel(boolean mayInterruptIfRunning) {  
    return false;  
}  
//判断是否计算被取消,若cancel返回了true就取消了  
@Override  
public boolean isCancelled() {  
    return false;  
}  
//判断是否计算完  
@Override  
public boolean isDone() {  
    return done;  
}  
//获取计算结果,如果没有计算完成,则在这里等到,并阻塞  
@Override  
public XxlRpcResponse get() throws InterruptedException, ExecutionException {  
    //具体get逻辑在下面的get方法中  
    try {  
        return get(-1,TimeUnit.MILLISECONDS);  
    } catch (TimeoutException e) {  
        e.printStackTrace();  
    }  
}  
//获取计算结果,如果没有计算完,去看是否超时,如果超时了就不等了  
@Override  
public XxlRpcResponse get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {  
    if(!done){  
        //get  
        //获取response对象的线程运行到这里,会去获取lock的锁对象,如果此时lock的锁对象正被set方法的线程拿着,那么get需要等待,当锁释放以后,get方法会拿到这个锁  
        //并且去根据超时数判断需要阻塞的时长,当get的线程阻塞了之后,会释放锁资源,让set方法拿到。如果set方法执行了,done会变true ,那么直接返回response即可  
        synchronized (lock){  
            if(timeout<0){//若超时数为-1,则一则阻塞下去,等待唤醒  
                lock.wait();  
            }else {  
                long timeoutMillis = (TimeUnit.MILLISECONDS==unit)?timeout:TimeUnit.MILLISECONDS.convert(timeout , unit);//计算需要阻塞的时间  
                lock.wait();  
            }  
        }  
    }  
    //如果超时时间已经过去,还没有获取到对象,那么就抛出异常  
    if(!done){  
        throw new XxlRpcException("xxl-rpc, request timeout at:"+ System.currentTimeMillis() );  
    }

    return response;  
}

XxlRpcFutureResponse

设置response的方法,需要在哪里执行呢,当handle类的read方法,拿到响应后,可以设置response,因此,设置response的线程是netty的io线程执行的。这里将set方法放到工厂里去执行。

package com.xxl.rpc.remoting.invoker;

import com.xxl.rpc.registry.ServiceRegistry;
import com.xxl.rpc.remoting.net.params.XxlRpcFutureResponse;
import com.xxl.rpc.remoting.net.params.XxlRpcResponse;

import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

public class XxlRpcInvokerFactory {

private static volatile XxlRpcInvokerFactory instance = new XxlRpcInvokerFactory();  
public static XxlRpcInvokerFactory getInstance() {  
    return instance;  
}

private ServiceRegistry serviceRegistry;  
public ServiceRegistry getServiceRegistry() {  
    return serviceRegistry;  
}

//要想设置response,需要给一个futureresponse的类,这里设置一个map集合,将futureresponse类存在这里,从这里取,防止每次请求都创建  
private ConcurrentMap<String, XxlRpcFutureResponse> futureResponsePool = new ConcurrentHashMap<String, XxlRpcFutureResponse>();

//设置一个初始化函数,在初始化的时候,将XxlRpcFutureResponse类放到池子里  
public void setInvokerFuture(String requestId, XxlRpcFutureResponse futureResponse){  
    futureResponsePool.put(requestId, futureResponse);  
}  
public void removeInvokerFuture(String requestId){  
    futureResponsePool.remove(requestId);  
}

public void notifyInvokerFuture(String requestId, final XxlRpcResponse xxlRpcResponse){  
    //先得到XxlRpcFutureResponse的类对象  
    final XxlRpcFutureResponse futureResponse = futureResponsePool.get(requestId);  
    //如果没有就不执行  
    if (futureResponse == null) {  
        return;  
    }  
    //给futureResponse设置response对象  
    futureResponse.setResponse(xxlRpcResponse);  
}  

}

notifyInvokerFuture

上面的方法被放在handle类的read方法里面,由nio线程调用。

这样,就完成了一个同步的方法。

package com.xxl.rpc.remoting.invoker.reference;

import com.xxl.rpc.remoting.invoker.XxlRpcInvokerFactory;
import com.xxl.rpc.remoting.invoker.call.CallType;
import com.xxl.rpc.remoting.invoker.route.LoadBalance;
import com.xxl.rpc.remoting.net.NetEnum;
import com.xxl.rpc.remoting.net.params.XxlRpcFutureResponse;
import com.xxl.rpc.remoting.net.params.XxlRpcRequest;
import com.xxl.rpc.remoting.net.params.XxlRpcResponse;
import com.xxl.rpc.remoting.provider.XxlRpcProviderFactory;
import com.xxl.rpc.serialize.Serializer;
import com.xxl.rpc.util.XxlRpcException;

import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Method;
import java.lang.reflect.Proxy;
import java.util.TreeSet;
import java.util.UUID;
import java.util.concurrent.TimeUnit;

public class XxlRpcReferenceBean {

//该类就封装了动态代理的getObject方法  
//首先需要传入必要的参数

private NetEnum netType;  
private Serializer serializer;  
private CallType callType;  
private LoadBalance loadBalance;

private Class<?> iface;  
private String version;

private long timeout = 1000;

private String address;  
private String accessToken;

private XxlRpcInvokerFactory invokerFactory;

/\*\*  
 \*  
 \* @param netType       使用的底层通信模块  
 \* @param serializer    序列化方式  
 \* @param callType      调用方式,有四种  
 \* @param loadBalance   负载均衡的方式  
 \* @param iface          接口名  
 \* @param version        版本  
 \* @param timeout        超时时间  
 \* @param address        请求地址  
 \* @param accessToken    接入token  

// * @param invokeCallback 回调的调用(可无)
* @param invokerFactory 调用工厂(可无)
*/
public XxlRpcReferenceBean(NetEnum netType,
Serializer serializer,
CallType callType,
LoadBalance loadBalance,
Class iface,
String version,
long timeout,
String address,
String accessToken,
// XxlRpcInvokeCallback invokeCallback,
XxlRpcInvokerFactory invokerFactory
){
this.netType = netType;
this.serializer = serializer;
this.callType = callType;
this.loadBalance = loadBalance;
this.iface = iface;
this.version = version;
this.timeout = timeout;
this.address = address;
this.accessToken = accessToken;
// this.invokeCallback = invokeCallback;
this.invokerFactory = invokerFactory;

    // valid  
    if (this.netType==null) {  
        throw new XxlRpcException("xxl-rpc reference netType missing.");  
    }  
    if (this.serializer==null) {  
        throw new XxlRpcException("xxl-rpc reference serializer missing.");  
    }  
    if (this.callType==null) {  
        throw new XxlRpcException("xxl-rpc reference callType missing.");  
    }  
    if (this.loadBalance==null) {  
        throw new XxlRpcException("xxl-rpc reference loadBalance missing.");  
    }  
    if (this.iface==null) {  
        throw new XxlRpcException("xxl-rpc reference iface missing.");  
    }  
    if (this.timeout < 0) {  
        this.timeout = 0;  
    }  
    if (this.invokerFactory == null) {  
        this.invokerFactory = XxlRpcInvokerFactory.getInstance();  
    }

    // init Client  

// initClient();
}

//利用动态代理,得到请求方法返回的参数  
public Object getOnject() {

    //直接返回动态代理的结果  
    return Proxy.newProxyInstance(Thread.currentThread().getContextClassLoader(),    //这里使用的类加载器是当前线程的类加载器  
            new Class\[\]{iface},                  //指明需要代理的类的接口名称  
            new InvocationHandler() {             //建立一个内部类,相当于继承了InvocationHandler接口,这里写的需要代理的内容  
                @Override  
                public Object invoke(Object proxy, Method method, Object\[\] args) throws Throwable {  
                    //下面是动态代理的调用的具体逻辑

                    //先得到要请求的类和方法的全部信息,通过这些信息封装成request发送给服务端  
                    String className = method.getDeclaringClass().getName();//类名,包括包名  
                    String varsion\_ = version;//版本信息  
                    String methodName = method.getName();//方法名称  
                    Class<?>\[\] parameterTypes = method.getParameterTypes();//方法参数的类型  
                    Object\[\] parameters = args;//方法的参数

                    //得到请求地址,  
                    //可能参数中已经传递了请求地址,那么请求就按传递的地址请求,也有可能没有传递请求地址,  
                    // 但注册中心可能已经为该请求的类准备了很多的请求地址,那么就需要用到负载均衡

                    String finalAddress = address;  
                    if (finalAddress == null || finalAddress.trim().length() == 0) {  
                        //没有传递过来地址,则需要到注册中心去找  
                        //这里需要创建一个工厂类,提供一些需要的方法,包括生成key值,获取地址等函数  
                        if (invokerFactory != null || invokerFactory.getServiceRegistry() != null) {  
                            //如果这个工厂类不为空,而且通过工厂类能得到注册中心,那么就可以取地址了  
                            //得到存储的key值  
                            String key = XxlRpcProviderFactory.makeServiceKey(className, varsion\_);  
                            //得到的地址是一个set集合  
                            TreeSet<String> addressSet = invokerFactory.getServiceRegistry().discovery(key);

                            if (addressSet == null || addressSet.size() == 0) {   //没有得到请求地址  
                                // pass  
                            } else if (addressSet.size() == 1) {  //得到一个请求地址,那么就是这一个  
                                finalAddress = addressSet.first();  
                            } else {   //得到很多请求地址,则需要负载均衡  负载均衡的代码之后再说  
                                finalAddress = loadBalance.xxlRpcInvokerRouter.route(key, addressSet);  
                            }  
                        }  
                    }  
                    //如果最终还是没有得到请求地址,则抛出异常  
                    if (finalAddress == null || finalAddress.trim().length() == 0) {  
                        throw new XxlRpcException("xxl-rpc reference bean\[" + className + "\] address empty");  
                    }

                    //现在就可以封装request了  
                    XxlRpcRequest xxlRpcRequest = new XxlRpcRequest();  
                    xxlRpcRequest.setRequestId(UUID.randomUUID().toString());//随机的request的id,根据这个唯一的id可以在同步调用时候拿到正确的值  
                    xxlRpcRequest.setCreateMillisTime(System.currentTimeMillis());//创建时间,传到服务端与那里的时间相比,如果大于超时时间则抛出异常  
                    xxlRpcRequest.setAccessToken(accessToken);//token值如果有,会与服务端的对比  
                    xxlRpcRequest.setClassName(className);//请求类名  
                    xxlRpcRequest.setMethodName(methodName);//方法名  
                    xxlRpcRequest.setParameterTypes(parameterTypes);  
                    xxlRpcRequest.setParameters(parameters);

                    //接下来就是调用的逻辑了,四种调用方式用if->else来选择  
                    if (CallType.SYNC == callType) {  
                        //如果是使用同步调用,在netty底层的调用是异步的,异步的意思就是,如果发起多个请求,那么会无法判断返回的响应是否是请求所得,  
                        // 因此,需要在请求发送过去之后,阻塞发送请求,直到请求得到响应之后,才能继续发送请求。要实现这样的功能,要用到  
                        //future的get方法。  
                        //这里先创建future对象,并初始化  
                        XxlRpcFutureResponse futureResponse = new XxlRpcFutureResponse(invokerFactory, xxlRpcRequest);

                        //然后发送请求  
                        //执行send方法,这里还没有实现  
                        //然后接收响应  
                        //通过get方法的阻塞来同步响应  
                        XxlRpcResponse response = futureResponse.get(timeout, TimeUnit.MILLISECONDS);  
                        if (response.getErrorMsg() != null) {  
                            throw new XxlRpcException(response.getErrorMsg());  
                        }  
                        return response.getResult();  
                        //以上就是同步调用的逻辑  
                    }  
                    return null;  
                }  
            });

}

}

getOnject

步骤4:Future异步调用的实现:、

在上面同步的代码中,使用get方法阻塞实现了线程的同步,那么如果用户想自己规定get resopnse的时机,而不是直接返回,就需要future模式,再创建一个调用的future,然后设置一个ThreadLocal,将值存起来,等需要的时候直接获取就可以了。

具体实现,是在上面同步的基础上,定义了一个调用Future,并通过get方法来得到futureresponse对象,这里存的是response,如何取到response的逻辑和上面一致。但不同的是,设置了一个ThreadLocal集合

ThreadLocal为每个使用该变量的线程提供独立的变量副本,所以每一个线程都可以独立地改变自己的副本,而不会影响其它线程所对应的副本。

在ThreadLocal类中有一个Map,用于存储每一个线程的变量副本,Map中元素的键为ThreadLocal对象,而值对应线程的变量副本。这个map是thread类的一个私有属性,所以可以通过线程获取该map的值。

XxlRpcInvokeFuture类

public class XxlRpcInvokeFuture implements Future {

//首先需要将futureResponse引进来,要获取response对象,需要从futureResponse中get。  
private XxlRpcFutureResponse futureResponse;

public XxlRpcInvokeFuture(XxlRpcFutureResponse futureResponse) {  
    this.futureResponse = futureResponse;  
}  
public void stop(){  
    // remove-InvokerFuture  
}  
//实现的方法都从futureResponse调用  
@Override  
public boolean cancel(boolean mayInterruptIfRunning) {  
    return futureResponse.cancel(mayInterruptIfRunning);  
}

@Override  
public boolean isCancelled() {  
    return futureResponse.isCancelled();  
}

@Override  
public boolean isDone() {  
    return futureResponse.isDone();  
}

@Override  
public Object get() throws InterruptedException, ExecutionException {  
    try {  
        return get(-1, TimeUnit.MILLISECONDS);  
    } catch (TimeoutException e) {  
        throw new XxlRpcException(e);  
    }  
}

@Override  
public Object get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {  
    try {  
        // future get  
        XxlRpcResponse xxlRpcResponse = futureResponse.get(timeout, unit);  
        if (xxlRpcResponse.getErrorMsg() != null) {  
            throw new XxlRpcException(xxlRpcResponse.getErrorMsg());  
        }  
        return xxlRpcResponse.getResult();  
    } finally {  
        stop();  
    }  
}

//创建一个ThreadLocal线程,来绑定invokerfuture的线程  
private static ThreadLocal<XxlRpcInvokeFuture> threadInvokerFuture = new ThreadLocal<XxlRpcInvokeFuture>();

//将future保存到线程中去,相当于一个线程副本  
public static void setFuture(XxlRpcInvokeFuture future) {  
    threadInvokerFuture.set(future);  
}

public static void removeFuture() {  
    threadInvokerFuture.remove();  
}

//获取Future对象,然后将线程移除掉,如果不移除,会导致内存泄露  
public static <T> Future<T> getFuture(Class<T> type) {  
    Future<T> future = (Future<T>) threadInvokerFuture.get();  
    threadInvokerFuture.remove();  
    return future;  
}

}

XxlRpcInvokeFuture

else if(CallType.FUTURE==callType){
//使用get方法阻塞实现了线程的同步,那么如果用户想自己规定get resopnse的时机,
// 而不是直接返回,就需要future模式,再创建一个调用的future,然后设置一个ThreadLocal,
// 将值存起来,等需要的时候直接获取就可以了。
//因此,我们需要创建一个调用future类,并将该类的线程交个threadlocal去执行

                        //先创建futureResponse对象  
                        XxlRpcFutureResponse futureResponse = new XxlRpcFutureResponse(invokerFactory, xxlRpcRequest);  
                        //然后将futureResponse对象加到调用future类中,保存到线程里面去  
                        XxlRpcInvokeFuture invokeFuture = new XxlRpcInvokeFuture(futureResponse);  
                        XxlRpcInvokeFuture.setFuture(invokeFuture);

                        //发送请求  
                        //发送完之后,不用返回,而是等什么时候用,什么时候调用getFuture得到调用future,然后再执行get方法,获取response对象  
                        return null;  
                    }

对于每一次请求都会set一个invokerFuture对象,并将其存入ThreadLocal中,它是一个map,因此,依照顺序存入有得到。依照队列的形式存取

@Test
public void futureTest() throws ExecutionException, InterruptedException {

    XxlRpcReferenceBean bean = new XxlRpcReferenceBean(NetEnum.NETTY, Serializer.SerializeEnum.HESSIAN.getSerializer(),  
            CallType.FUTURE, LoadBalance.ROUND, IDemo.class, null, 500, "127.0.0.1:7080",  
            null, null, null);  
    //得到返回对象  
    IDemo demo =(IDemo) bean.getObject();

    demo.sayHi("liu:");  
    Future<UserDTO> future = XxlRpcInvokeFuture.getFuture(UserDTO.class);  
    demo.sayHi("xing");  
    Future<UserDTO> future1 = XxlRpcInvokeFuture.getFuture(UserDTO.class);

    UserDTO s1 = future.get();  
    UserDTO s2 = future1.get();  
    System.out.println(s2);  
    System.out.println(s1);

}

测试结果如下:

UserDTO{name='xing', word='hihihi'}
UserDTO{name='liu:', word='hihihi'}

步骤:5:callback方式的调用

callback调用方式使用了回调的理念,即当事件发生时会触发调用函数,并执行。那么就需要一个回调的类,

与future的调用方法相似,创建一个回调的类,并在调用的时候,将该类加到ThreadLocal里面,然后,回调函数如果判断到回调类存在了(即运行了set方法),就执行回调的操作,因此回调函数的执行是在获取到response之后,即放在工厂类的notify里面。

public abstract class XxlRpcInvokeCallback {

//设置一个抽象的回调类,定义了回调函数执行的方法名,具体实现在满足条件的时候再调用  
//两个抽象方法,规定了在成功和失败时调用的方法  
public abstract void onSuccess(T result);

public abstract void onFailure(Throwable exception);

//与InvokerFuture类似,设置一个ThreadLocal,在调用的时候设置这个回调类,然后去取  
private static ThreadLocal<XxlRpcInvokeCallback> threadInvokerFuture = new ThreadLocal<XxlRpcInvokeCallback>();

public static XxlRpcInvokeCallback getCallback() {  
    XxlRpcInvokeCallback invokeCallback = threadInvokerFuture.get();  
    threadInvokerFuture.remove();  
    return invokeCallback;  
}  
public static void setCallback(XxlRpcInvokeCallback invokeCallback) {  
    threadInvokerFuture.set(invokeCallback);  
}  
public static void removeCallback() {  
    threadInvokerFuture.remove();  
}

}

XxlRpcInvokeCallback

在工厂类中执行回调函数,是执行的io的线程,因此,需要给他设置一个线程池,来防止将回调逻辑都放到io线程里面去。

这里继续利用线程池额执行类

public void notifyInvokerFuture(String requestId, final XxlRpcResponse xxlRpcResponse){
//先得到XxlRpcFutureResponse的类对象
final XxlRpcFutureResponse futureResponse = futureResponsePool.get(requestId);
//如果没有就不执行
if (futureResponse == null) {
return;
}
//如果回调类不为空,即得到了回调类,那么就开启线程池来执行操作
if(futureResponse.getInvokeCallback()!=null){

        //执行线程池的操作  
        executeResponseCallback(new Runnable() {  
            @Override  
            public void run() {  
                if (xxlRpcResponse.getErrorMsg() != null) {  
                    futureResponse.getInvokeCallback().onFailure(new XxlRpcException(xxlRpcResponse.getErrorMsg()));  
                } else {  
                    System.out.println(Thread.currentThread());  
                    futureResponse.getInvokeCallback().onSuccess(xxlRpcResponse.getResult());  
                }  
            }  
        });  
    }  
    //给futureResponse设置response对象  
    futureResponse.setResponse(xxlRpcResponse);  
}

//使用线程池来执行  
private ThreadPoolExecutor responseCallbackThreadPool = null;  //设置一个线程池  
//配置线程池  
public void executeResponseCallback(Runnable runnable){  
    if(responseCallbackThreadPool==null){  
        synchronized (this){  //这里加一个锁,执行这些操作的时候,不能操作其他的线程  
            if (responseCallbackThreadPool == null) {  
                responseCallbackThreadPool = new ThreadPoolExecutor(  
                        10,  
                        100,  
                        60L,  
                        TimeUnit.SECONDS,  
                        new LinkedBlockingQueue<Runnable>(1000),  
                        new ThreadFactory() {  
                            @Override  
                            public Thread newThread(Runnable r) {  
                                return new Thread(r, "xxl-rpc, XxlRpcInvokerFactory-responseCallbackThreadPool-" + r.hashCode());  
                            }  
                        },  
                        new RejectedExecutionHandler() {  
                            @Override  
                            public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {  
                                throw new XxlRpcException("xxl-rpc Invoke Callback Thread pool is EXHAUSTED!");  
                            }  
                        });        // default maxThreads 300, minThreads 60  
            }  
        }  
    }  
}

else if(CallType.CALLBACK==callType){
//有了上面的分析,回调的实现很简单了,将callback类设置到ThreadLocal里面去,然后在回调函数里面去取,取出来执行。
//不同点在于,执行的函数一般不能放在io的线程里面,因此,需要使用一个线程池来维护。
// get callback
XxlRpcInvokeCallback finalInvokeCallback=null;
XxlRpcInvokeCallback threadInvokeCallback = XxlRpcInvokeCallback.getCallback();
// System.out.println(threadInvokeCallback);
if (threadInvokeCallback != null) {
finalInvokeCallback = threadInvokeCallback;
}
if (finalInvokeCallback == null) {
throw new XxlRpcException("xxl-rpc XxlRpcInvokeCallback(CallType="+ CallType.CALLBACK.name() +") cannot be null.");
}

                        // future-response set  
                        XxlRpcFutureResponse futureResponse = new XxlRpcFutureResponse(invokerFactory, xxlRpcRequest, finalInvokeCallback);  
                        //执行发送代码  

// client.asyncSend(finalAddress, xxlRpcRequest);

                        return null;  
                    }  
                    return null;  
                }

CALLBACK

步骤6:单一长连接

因为NIO是非阻塞的,因此,每个客户端只需要连接一次就可以了,这就是单一长连接,它的实现是一个连接池。将一个客户端(adress不变)作为一个连接来维护,将该客户端存到一个ConcurrentHashMap来保存,在进行连接的时候,去看这个ConcurrentHashMap里有没有需要的连接,如果没有,就需要创建,这里的过程是同步的,创建连接需要加锁,

代码如下:

public abstract class ConnectClient {

//实现单一长连接的基础  
//提供的接口方法

//初始化方法  
public abstract void init(String address, final Serializer serializer, final XxlRpcInvokerFactory xxlRpcInvokerFactory) throws Exception;  
//关闭  
public abstract void close();  
//验证  
public abstract boolean isValidate();  
//发送  
public abstract void send(XxlRpcRequest xxlRpcRequest) throws Exception ;

//异步发送方法  
public static void asyncSend(XxlRpcRequest xxlRpcRequest, String address,  
                             Class<? extends ConnectClient> connectClientImpl,  
                             final XxlRpcReferenceBean xxlRpcReferenceBean) throws Exception {

    //这里提供一个从池子中取连接的方法  
    ConnectClient clientPool = ConnectClient.getPool(address, connectClientImpl, xxlRpcReferenceBean);

    try {  
        // do invoke  
        clientPool.send(xxlRpcRequest);  
    } catch (Exception e) {  
        throw e;  
    }  
}  
//使用ConcurrentHashMap来做一个池子,使用ConcurrentHashMap的原因是它实现了一个分段锁机制,保证不同线程在put的时候不会阻塞  
private static volatile ConcurrentHashMap<String, ConnectClient> connectClientMap;  
//每个地址都会有一个锁,来控制连接的创建,如果是相同的地址,要创建连接需要加锁,否则会出现同一地址创建多个连接的情况。  
private static volatile ConcurrentHashMap<String, Object> connectClientLockMap = new ConcurrentHashMap<>();

//获取连接的代码  
private static ConnectClient getPool(String address, Class<? extends ConnectClient> connectClientImpl,  
                                     final XxlRpcReferenceBean xxlRpcReferenceBean) throws Exception {

    //先判断这个池子是否为空,如果为空,则需要创建(这里为空的意思是null)  
    //创建map需要用到双检锁,保证创建的是一个单例  
    if (connectClientMap == null) {  
        synchronized (ConnectClient.class) {  
            if (connectClientMap == null) {  
                // init  
                connectClientMap = new ConcurrentHashMap<String, ConnectClient>();  
                // stop callback  
                //这里不管他  
            }  
        }  
    }

    //去验证是不是存在客户端,存在了就直接返回  
    ConnectClient connectClient = connectClientMap.get(address);  
    if (connectClient!=null && connectClient.isValidate()) {  
        return connectClient;  
    }

    //如果没有客户端连接,就需要去创建连接,但在此之前,需要构建一个锁  
    Object clientLock = connectClientLockMap.get(address);  
    if (clientLock == null) {  
        connectClientLockMap.putIfAbsent(address, new Object());//putIfAbsent保证了使用原来的锁  
        clientLock = connectClientLockMap.get(address);  
    }

    //加锁  
    synchronized (clientLock) {

        //需要判断是不是这个连接存活,存活就返回  
        connectClient = connectClientMap.get(address);  
        if (connectClient!=null && connectClient.isValidate()) {  
            return connectClient;  
        }

        // remove old  
        if (connectClient != null) {  
            connectClient.close();  
            connectClientMap.remove(address);  
        }

        // 创建连接,并加到池子里面,然后返回  
        ConnectClient connectClient\_new = connectClientImpl.newInstance();  
        connectClientMap.put(address, connectClient\_new);

        return connectClient\_new;

    }

}

ConnectClient

单一长连接是以上三种调用的基础,将它作为一种调用其实不能符合逻辑。


以上,我们的调用就完成了,客户端通信的连接代码和服务端类似,不需要重复,该项目的精华部分上面已经全部搞定。

下面来对注册中进行研究。