java中的AIO
阅读原文时间:2023年07月10日阅读:1

AIO(异步非阻塞)AIO采用了Proactor模式,AIO与NIO的不同之处在于当AIO在进行读写操作时,不用先等通知,可直接调用相应的read/write方法,这两种方法均为异步的,对于读操作而言,当有流可读取时,操作系统会将可读的流传入read方法的缓冲区,并通知应用程序;对于写操作而言,当操作系统将write方法传递的流写入完毕时,操作系统主动通知应用程序,而NIO的通知是发生在动作之前的,是在可读、写的时候,Selector发现了这些事件后调用Handler处理.

在AIO socket编程中,服务端通道是AsynchronousServerSocketChannel,这个类提供了一个open()静态工厂,一个bind()方法用于绑定服务端IP地址(还有端口号),另外还提供了accept()用于接收用户连接请求。在客户端使用的通道是AsynchronousSocketChannel,这个通道处理提供open静态工厂方法外,还提供了read和write方法。

在AIO编程中,发出一个事件(accept read write等)之后要指定事件处理类(回调函数),AIO中的事件处理类是CompletionHandler,这个接口定义了如下两个方法,分别在异步操作成功和失败时被回调。

void completed(V result, A attachment);

void failed(Throwable exc, A attachment);

  1. import java.io.IOException;
  2. import java.net.InetSocketAddress;
  3. import java.nio.ByteBuffer;
  4. import java.nio.channels.AsynchronousServerSocketChannel;
  5. import java.nio.channels.AsynchronousSocketChannel;
  6. import java.nio.channels.CompletionHandler;
  7. import java.util.concurrent.ExecutionException;
  8. import java.util.concurrent.Future;
  9. import java.util.concurrent.TimeUnit;
  10. import java.util.concurrent.TimeoutException;
  11. public class AIOEchoServer {
  12. public final static int PORT = 8001;
  13. public final static String IP = "127.0.0.1";
  14. private AsynchronousServerSocketChannel server = null;
  15. public AIOEchoServer(){
  16. try {
  17. //同样是利用工厂方法产生一个通道,异步通道 AsynchronousServerSocketChannel
  18. server = AsynchronousServerSocketChannel.open().bind(new InetSocketAddress(IP,PORT));
  19. } catch (IOException e) {
  20. e.printStackTrace();
  21. }
  22. }
  23. //使用这个通道(server)来进行客户端的接收和处理
  24. public void start(){
  25. System.out.println("Server listen on "+PORT);
  26. //注册事件和事件完成后的处理器,这个CompletionHandler就是事件完成后的处理器
  27. server.accept(null,new CompletionHandler(){
  28. final ByteBuffer buffer = ByteBuffer.allocate(1024);
  29. @Override
  30. public void completed(AsynchronousSocketChannel result,Object attachment) {
  31. System.out.println(Thread.currentThread().getName());
  32. Future writeResult = null;
  33. try{
  34. buffer.clear();
  35. result.read(buffer).get(100,TimeUnit.SECONDS);
  36. System.out.println("In server: "+ new String(buffer.array()));
  37. //将数据写回客户端
  38. buffer.flip();
  39. writeResult = result.write(buffer);
  40. }catch(InterruptedException | ExecutionException | TimeoutException e){
  41. e.printStackTrace();
  42. }finally{
  43. server.accept(null,this);
  44. try {
  45. writeResult.get();
  46. result.close();
  47. } catch (InterruptedException | ExecutionException e) {
  48. e.printStackTrace();
  49. } catch (IOException e) {
  50. e.printStackTrace();
  51. }
  52. }
  53. }
  54. @Override
  55. public void failed(Throwable exc, Object attachment) {
  56. System.out.println("failed:"+exc);
  57. }
  58. });
  59. }
  60. public static void main(String[] args) {
  61. new AIOEchoServer().start();
  62. while(true){
  63. try {
  64. Thread.sleep(1000);
  65. } catch (InterruptedException e) {
  66. e.printStackTrace();
  67. }
  68. }
  69. }
  70. }

客户端: