aio:声明一个byteBuffer,异步读,读完了之后回调,相比于Future.get(),可以减少阻塞、减少线程等待,充分利用有限的线程
nio:声明一个byteBuffer,自己同步读出来,再做业务
作为一个网关,一般可以代理某个后端的功能,暴露出更容易被访问或添加一些鉴权信息,让前台可以简单、安全的访问到后台。
当前案例想实现一个JAVA后端RPC接口调用的网关给其他语言使用,既然是跨语言,最常见的有:
当前将要实现的网关仅用于其他语言调用本机的某个端口来通信,而非对外提供服务,并且其他进程使用短连接调用,请求结束后,连接将立即关闭。
AIO(NIO 2.0)又称为异步非阻塞IO,相比同步IO,它向内核发出I/O命令后就去做其他事情了,而不需要等待,等到内核完成I/O操作,会回调注册的成功或失败回调事件,在Windows系统中,用Iocp实现了AIO;但是在linux系统中,AIO实际并非真正意义上的AIO,底层是由EPOLL实现的 ,因此它的性能并非一定比NIO高。
AIO的编程难度相比NIO低了不少,在实际项目中,仍然推荐使用Netty,由于他们在linux上底层都是基于epoll,从性能上来看,差异不大,并且netty在实际项目中编码方式更友好。
首先,定义一个接口
/**
* 服务接口
*/
public interface IServer {
/**
* 启动
* @throws Exception
*/
void startup() throws Exception;
}
AsyncServer继承它
public class AsyncServer implements IServer {
private final static Logger LOGGER = LoggerFactory.getLogger(AsyncServer.class);
@Value("${server.port}")
private int port = 9099;
@Value("${server.poolSize}")
private int poolSize;
/**
* 仅接受localhost请求
*/
private boolean onlyAcceptLocalhost = true;
private AsynchronousServerSocketChannel asynchronousServerSocktChannel;
public AsyncServer() {
}
@override
public void startup() throws Exception {
initServer(port, poolSize);
startServer();
}
public void initServer(int port, final int poolSize) throws IOException {
this.port = port;
this.poolSize = poolSize;
ThreadFactory threadFactory = new DefaultThreadFactory("GateWay-Server-IO");
//这个线程池是用来处理I/O事件,调度CompletionHandler的
AsynchronousChannelGroup group = AsynchronousChannelGroup.withFixedThreadPool(poolSize, threadFactory);
asynchronousServerSocktChannel = AsynchronousServerSocketChannel.open(group);
//因为直接杀进程,没有显式关闭套接字来释放端口,会等待一段时间后才可以重新use这个关口,解决办法就是用SO_REUSEADDR
// 端口重用,防止进程意外终止,未释放端口,重启时失败,只有最后一个套接字会正常接收数据
asynchronousServerSocktChannel.setOption(StandardSocketOptions.SO_REUSEADDR, true);
//本地环境,可以将socket 接受缓冲区recbuf调大,因为不存在网络问题
asynchronousServerSocktChannel.setOption(StandardSocketOptions.SO_RCVBUF, 100 * 1024);
asynchronousServerSocktChannel.bind(new InetSocketAddress(port));
LOGGER.info("Server start!port:{}", port);
}
/**
* 启用监听connection accepted
*/
public void startServer() {
asynchronousServerSocktChannel.accept(asynchronousServerSocktChannel, new ChannelAcceptedHandler(this.onlyAcceptLocalhost));
LOGGER.info("Server start accept connection!port:{}", port);
}
}
这里定义了一个ThreadFactory,主要是用来约定线程名称的,具体实现是io.netty.util.concurrent.DefaultThreadFactory
AsynchronousChannelGroup的目的是资源共享,用于一组异步channels来做他们的I/O完成后的操作,一个Group会有一个关联的ThreadPool,如果程序没有指定,则会使用系统默认的Pool,同时其生成出来的线程会是daemon线程
AsynchronousChannelGroup.withFixedThreadPool(poolSize, threadFactory)会生成一个ThreadPool,用来处理I/O事件完成后的回调任务,如从Channel缓冲区读取已经完成的I/O操作任务结果,解析为程序可以识别的信息;网上有很多人推荐处理业务时,另开线程池处理,防止业务操作将I/O操作的线程阻塞了。
我们的实现中并没有这么做,原因是网关将部署在一台机器性能并非多高的机器上,主要用于本机其他语言访问,流量不会太大,它不需要对I/O操作那么敏感,并且阻塞I/O事件一定意义上有限流的效果;当然单独开辟业务线程池,同时调低I/O事件线程池的大小也是值得推荐的。
AsynchronousServerSocketChannel.open(group)会打开了异步的server-socket channel,使用group处理面向流的客户的channel I/O完成操作
asynchronousServerSocktChannel.bind(new InetSocketAddress(port));监听指定的端口
asynchronousServerSocktChannel.accept(asynchronousServerSocktChannel, new ChannelAcceptedHandler(this.onlyAcceptLocalhost));
server开始服务始于accept方法被调用,此时服务开始监听connection accept事件
实现了CompletionHandler,用于连接被成功accept或accept失败后的操作
在下一节将对ChannelAcceptedHandler为首的多个CompletionHandler进行讲解,它们共同完成了一个协议较简单、含粘包处理的服务端
手机扫一扫
移动阅读更方便
你可能感兴趣的文章