Java网关服务-AIO(一)
阅读原文时间:2023年07月08日阅读:2

Java网关-AIO(一)

aio:声明一个byteBuffer,异步读,读完了之后回调,相比于Future.get(),可以减少阻塞、减少线程等待,充分利用有限的线程

nio:声明一个byteBuffer,自己同步读出来,再做业务

作为一个网关,一般可以代理某个后端的功能,暴露出更容易被访问或添加一些鉴权信息,让前台可以简单、安全的访问到后台。

当前案例想实现一个JAVA后端RPC接口调用的网关给其他语言使用,既然是跨语言,最常见的有:

  • HTTP协议接口,几乎所有语言都有对应的类库,实现HTTP访问
  • Socket协议接口,它是更为底层的方案,对比HTTP接口,它的适用范围更偏向高效代理,基于Socket自定义比HTTP协议更个性化、简单的协议,它的性能会明显高于HTTP调用,因为HTTP协议是一个较为臃肿的协议,性能一般。

当前将要实现的网关仅用于其他语言调用本机的某个端口来通信,而非对外提供服务,并且其他进程使用短连接调用,请求结束后,连接将立即关闭。

AIO(NIO 2.0)又称为异步非阻塞IO,相比同步IO,它向内核发出I/O命令后就去做其他事情了,而不需要等待,等到内核完成I/O操作,会回调注册的成功或失败回调事件,在Windows系统中,用Iocp实现了AIO;但是在linux系统中,AIO实际并非真正意义上的AIO,底层是由EPOLL实现的 ,因此它的性能并非一定比NIO高。

AIO的编程难度相比NIO低了不少,在实际项目中,仍然推荐使用Netty,由于他们在linux上底层都是基于epoll,从性能上来看,差异不大,并且netty在实际项目中编码方式更友好。

首先,定义一个接口

/**
 * 服务接口
 */
public interface IServer {

    /**
     * 启动
     * @throws Exception
     */
    void startup() throws Exception;

}

AsyncServer继承它

public class AsyncServer implements IServer {

    private final static Logger LOGGER = LoggerFactory.getLogger(AsyncServer.class);

    @Value("${server.port}")
    private int port = 9099;

    @Value("${server.poolSize}")
    private int poolSize;

    /**
     * 仅接受localhost请求
     */
    private boolean onlyAcceptLocalhost = true;

    private AsynchronousServerSocketChannel asynchronousServerSocktChannel;

    public AsyncServer() {
    }

    @override
    public void startup() throws Exception {
        initServer(port, poolSize);
        startServer();
    }

    public void initServer(int port, final int poolSize) throws IOException {
        this.port = port;
        this.poolSize = poolSize;

        ThreadFactory threadFactory = new DefaultThreadFactory("GateWay-Server-IO");
        //这个线程池是用来处理I/O事件,调度CompletionHandler的
        AsynchronousChannelGroup group = AsynchronousChannelGroup.withFixedThreadPool(poolSize, threadFactory);

        asynchronousServerSocktChannel = AsynchronousServerSocketChannel.open(group);
        //因为直接杀进程,没有显式关闭套接字来释放端口,会等待一段时间后才可以重新use这个关口,解决办法就是用SO_REUSEADDR
        // 端口重用,防止进程意外终止,未释放端口,重启时失败,只有最后一个套接字会正常接收数据
        asynchronousServerSocktChannel.setOption(StandardSocketOptions.SO_REUSEADDR, true);
        //本地环境,可以将socket 接受缓冲区recbuf调大,因为不存在网络问题
        asynchronousServerSocktChannel.setOption(StandardSocketOptions.SO_RCVBUF, 100 * 1024);
        asynchronousServerSocktChannel.bind(new InetSocketAddress(port));
        LOGGER.info("Server start!port:{}", port);
    }

    /**
     * 启用监听connection accepted
     */
    public void startServer() {
        asynchronousServerSocktChannel.accept(asynchronousServerSocktChannel, new ChannelAcceptedHandler(this.onlyAcceptLocalhost));
        LOGGER.info("Server start accept connection!port:{}", port);
    }

}

ThreadFactory

这里定义了一个ThreadFactory,主要是用来约定线程名称的,具体实现是io.netty.util.concurrent.DefaultThreadFactory

AsynchronousChannelGroup

AsynchronousChannelGroup的目的是资源共享,用于一组异步channels来做他们的I/O完成后的操作,一个Group会有一个关联的ThreadPool,如果程序没有指定,则会使用系统默认的Pool,同时其生成出来的线程会是daemon线程

AsynchronousChannelGroup.withFixedThreadPool(poolSize, threadFactory)会生成一个ThreadPool,用来处理I/O事件完成后的回调任务,如从Channel缓冲区读取已经完成的I/O操作任务结果,解析为程序可以识别的信息;网上有很多人推荐处理业务时,另开线程池处理,防止业务操作将I/O操作的线程阻塞了。

我们的实现中并没有这么做,原因是网关将部署在一台机器性能并非多高的机器上,主要用于本机其他语言访问,流量不会太大,它不需要对I/O操作那么敏感,并且阻塞I/O事件一定意义上有限流的效果;当然单独开辟业务线程池,同时调低I/O事件线程池的大小也是值得推荐的。

绑定

AsynchronousServerSocketChannel.open(group)会打开了异步的server-socket channel,使用group处理面向流的客户的channel I/O完成操作

asynchronousServerSocktChannel.bind(new InetSocketAddress(port));监听指定的端口

startServer

asynchronousServerSocktChannel.accept(asynchronousServerSocktChannel, new ChannelAcceptedHandler(this.onlyAcceptLocalhost));

server开始服务始于accept方法被调用,此时服务开始监听connection accept事件

ChannelAcceptedHandler

实现了CompletionHandler,用于连接被成功accept或accept失败后的操作

在下一节将对ChannelAcceptedHandler为首的多个CompletionHandler进行讲解,它们共同完成了一个协议较简单、含粘包处理的服务端

手机扫一扫

移动阅读更方便

阿里云服务器
腾讯云服务器
七牛云服务器