java网络编程基础——TCP网络编程三
阅读原文时间:2023年07月08日阅读:1

AIO实现非阻塞通信

java7 NIO2 提供了异步Channel支持,这种异步Channel可以提供更高效的IO,这种基于异步Channel的IO被称为异步IO(Asynchronous IO)

IO操作分为两步:1、程序发出IO请求  2、完成实际的IO操作

阻塞和非阻塞IO是根据第一步划分的:

发出IO请求如果阻塞线程则是阻塞IO,如果不阻塞线程,则是非阻塞IO。

同步IO和异步IO是根据第二步划分:

如果实际的IO操作是由操作系统完成,再将结果返回给应用程序,这就是异步IO。

如果实际的IO需要应用程序本身去执行,会阻塞线程,那就是同步IO。

(java传统的IO操作和基于Channel的非阻塞IO都是同步IO)

NIO2提供了一系列以Asynchronous开头的Channel接口和类。

其中AsynchronousSocketChannel、AsynchronousServerSocketChannel是支持TCP通信的异步Channel。

AsynchronousServerSocketChannel:负责监听的Channel,与ServerSocketChannel相似。

AsynchronousServerSocketChannel使用需要三步:

1)调用open()静态方法创建AsynchronousServerSocketChannel实例

2)调用AsynchronousServerSocketChannel的bind()方法让他在指定IP,端口监听。

3)调用AsynchronousServerSocketChannel的accept()方法接收连接请求。

AsynchronousSocketChannel:与SocketChannel类似,执行具体的IO操作

AsynchronousSocketChannel的用法也可以分为三步:

1)调用Open()静态方法创建AsynchronousSocketChannel实例

2)调用AsynchronousSocketChannel的connect()方法让他在指定IP,端口服务器。

3)调用AsynchronousSocketChannel的read()、write()方法进行读写。

AsynchronousServerSocketChannel、AsynchronousSocketChannel都允许使用线程池管理,open()方法创建对应实例时都可以传入AsynchronousChannelGroup。AsynchronousChannelGroup创建需要传入一个线程池ExecutorService。

AsynchronousServerSocketChannel的accept()方法、AsynchronousSocketChannel的read()、write()方法都有两个版本

1)返回Future对象版本:必须等到Future的get方法返回时IO操作才完成,get方法会阻塞线程的。

2)需要传入CompletionHandler版本:通过ComplctionHandler完成相关操作。

CompletionHandler是一个接口,该接口中定义了两个方法:

completed(V result,A attachment):当IO操作完成时触发该方法,第一参数表示IO操作返回的参数;第二个参数表示发起IO操作时传入的附加参数。

failed(Trowable exc,A attachment):当IO操作失败事触发该方法,第一参数表示异常信息,,第二个参数表示发起IO操作时传入的附加参数。

下面使用Future对象版本实现简单的AIO服务端、客户端通信:

package net;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.AsynchronousServerSocketChannel;
import java.nio.channels.AsynchronousSocketChannel;
import java.util.concurrent.Future;

public class SimpleAIOServer {

static final int PORT = 30000;

public static void main(String\[\] args) throws Exception {

    try (//创建AsynchronousServerSocketChannel实例  
            AsynchronousServerSocketChannel serverSocketChannel =  
                AsynchronousServerSocketChannel.open();)  
    {  
        serverSocketChannel.bind(new InetSocketAddress(PORT));  
        while(true) {  
            //采用循环接收客户端的连接  
            Future<AsynchronousSocketChannel> future = serverSocketChannel.accept();  
            //获取连接后返回AsynchronousSocketChannel  
            AsynchronousSocketChannel socketChannel = future.get();

            //向客户端输出数据  
            Future<Integer> future1 =socketChannel.write(ByteBuffer.wrap("AIO HELLO".  
            getBytes("UTF-8")));  
            future1.get();  
        }  
    } catch (IOException e) {  
        e.printStackTrace();  
    }  
}  

}
package net;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.Buffer;
import java.nio.ByteBuffer;
import java.nio.channels.AsynchronousSocketChannel;
import java.nio.charset.Charset;
import java.util.concurrent.Future;

public class SimpleAIOClient {

static final int PORT = 30000;

public static void main(String\[\] args) throws Exception {

    //用户读取数据的Buffer  
    ByteBuffer buff = ByteBuffer.allocate(1024);  
    Charset charset = Charset.forName("UTF-8");  
    try(//创建AsynchronousSocketChannel实例  
        AsynchronousSocketChannel socketChannel = AsynchronousSocketChannel.open();)  
    {  
        //连接到远程服务器  
        Future<Void> future = socketChannel.connect(new InetSocketAddress("127.0.0.1", PORT));  
        future.get();

        buff.clear();  
        //socketChannel中读取数据  
        Future<Integer> future1 = socketChannel.read(buff);  
        future1.get();

        buff.flip();  
        String content = charset.decode(buff).toString();  
        System.out.println("服务器:" + content);

    } catch (IOException e) {  
        // TODO Auto-generated catch block  
        e.printStackTrace();  
    }  
}  

}

结果:

服务器:AIO HELLO

AIO实现多人聊天

package net;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.channels.AsynchronousChannelGroup;
import java.nio.channels.AsynchronousServerSocketChannel;
import java.nio.channels.AsynchronousSocketChannel;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class AIOServer {

static final int PORT = 30000;  
static List<AsynchronousSocketChannel> channelList = new ArrayList<>();

public void init() throws IOException {  
    //创建一个线程池  
    ExecutorService executor = Executors.newFixedThreadPool(20);  
    //以指定线程池创建分组管理器  
    AsynchronousChannelGroup channelGroup = AsynchronousChannelGroup.withThreadPool(executor);  
    //以线程池创建AsynchronousServerSocketChannel  
    AsynchronousServerSocketChannel serverSocketChannel =  
                            AsynchronousServerSocketChannel.open(channelGroup);  
    //绑定端口  
    serverSocketChannel.bind(new InetSocketAddress(PORT));  
    //使用CompletionHandler处理客户端连接请求,此处的Handler主要处理客户端连接请求  
    serverSocketChannel.accept(null, new AcceptHandler(serverSocketChannel));

}

public static void main(String\[\] args) throws Exception {  
    AIOServer aioServer = new AIOServer();  
    aioServer.init();  
    Thread.sleep(Integer.MAX\_VALUE);  
     //不让服务器停止  
    while(true) {}  
}

}

package net;

import java.nio.ByteBuffer;
import java.nio.channels.AsynchronousServerSocketChannel;
import java.nio.channels.AsynchronousSocketChannel;
import java.nio.channels.CompletionHandler;
import java.nio.charset.Charset;
import java.util.concurrent.ExecutionException;

public class AcceptHandler implements CompletionHandler{

private AsynchronousServerSocketChannel serverSocketChannel = null;  
public AcceptHandler(AsynchronousServerSocketChannel serverSocketChannel) {  
    this.serverSocketChannel = serverSocketChannel ;  
}

//定义一个Buffer准备读取数据  
ByteBuffer buff = ByteBuffer.allocate(1024);  
Charset charset = Charset.forName("UTF-8");

//当IO操作完成时触发该方法  
@Override  
public void completed(final AsynchronousSocketChannel socketChannel, Object attachment) {  
    //记录新进来的Channel  
    AIOServer.channelList.add(socketChannel);

    //准备接收客户端的下一次连接  
    serverSocketChannel.accept(null, this);

    //读取客户端数据,此处的Handler主要处理读取客户数据  
    socketChannel.read(buff, null, new CompletionHandler<Integer, Object>() {

        @Override  
        public void completed(Integer result, Object attachment) {  
            buff.flip();  
            //将Buffer中的数据转换成字符串  
            String content = charset.decode(buff).toString();  
            //将客户端发来的数据 发送到么每个客户端  
            for(AsynchronousSocketChannel asc : AIOServer.channelList) {  
                try {  
                    asc.write(ByteBuffer.wrap(content.getBytes(charset))).get();  
                } catch (InterruptedException | ExecutionException e) {  
                    e.printStackTrace();  
                }  
            }  
            //清空buff容器,用户读取下一次数据  
            buff.clear();  
        }

        //当IO操作失败事触发该方法  
        @Override  
        public void failed(Throwable exc, Object attachment) {  
            System.out.println("读取数据失败:"+ exc);  
            //读取数据失败,客户端出问题,移除对应的channel  
            AIOServer.channelList.remove(socketChannel);  
        }  
    });

}

@Override  
public void failed(Throwable exc, Object attachment) {  
    System.out.println("连接失败:"+exc);  
}

}

package net;
import java.awt.BorderLayout;
import java.awt.event.ActionEvent;
import java.awt.event.InputEvent;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.AsynchronousChannelGroup;
import java.nio.channels.AsynchronousSocketChannel;
import java.nio.channels.CompletionHandler;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import javax.swing.AbstractAction;
import javax.swing.Action;
import javax.swing.JButton;
import javax.swing.JFrame;
import javax.swing.JPanel;
import javax.swing.JScrollPane;
import javax.swing.JTextArea;
import javax.swing.JTextField;
import javax.swing.KeyStroke;

public class AIOClient {
static final int PORT = 30000;
//与服务器通信的异步Channel
AsynchronousSocketChannel socketChannel = null;

JFrame mainWin = new JFrame("多人聊天");  
JTextArea jta = new JTextArea(16,48);  
JTextField jtf = new JTextField(40);  
JButton sendBtn = new JButton("发送");

public void init() {  
    mainWin.setLayout(new BorderLayout());  
    jta.setEditable(false);  
    mainWin.add(new JScrollPane(jta),BorderLayout.CENTER);  
    JPanel jp = new JPanel();  
    jp.add(jtf);  
    jp.add(sendBtn);

    @SuppressWarnings("serial")  
    Action sendAction  = new AbstractAction() {  
        @Override  
        public void actionPerformed(ActionEvent e) {  
            String content = jtf.getText();  
            if(content.trim().length() > 0) {  
                //将输入内容写到channel中  
                try {  
                    socketChannel.write(ByteBuffer.  
                    wrap(content.getBytes(StandardCharsets.UTF\_8))).get();  
                } catch (InterruptedException | ExecutionException e1) {  
                    e1.printStackTrace();  
                }  
            }  
            jtf.setText("");  
        }  
    };

    sendBtn.addActionListener(sendAction);  
    jtf.getInputMap().put(KeyStroke.getKeyStroke('\\n', InputEvent.CTRL\_MASK), "send");  
    jtf.getActionMap().put("send", sendAction);  
    mainWin.setDefaultCloseOperation(JFrame.EXIT\_ON\_CLOSE);  
    mainWin.add(jp, BorderLayout.SOUTH);  
    mainWin.pack();  
    mainWin.setVisible(true);  
}

public void connect() throws Exception {  
    //用于读取数据的buffer  
    ByteBuffer buff = ByteBuffer.allocate(1024);  
    //创建一个线程池  
    ExecutorService executor = Executors.newFixedThreadPool(80);  
    //以指定线程池创建分组管理器  
    AsynchronousChannelGroup channelGroup = AsynchronousChannelGroup.withThreadPool(executor);  
    //以分组管理器创建AsynchronousSocketChannel  
    socketChannel =  AsynchronousSocketChannel.open(channelGroup);  
    //连接服务器  
    socketChannel.connect(new InetSocketAddress("127.0.0.1", PORT)).get();  
    jta.append("\*\*\*与服务器连接成功\*\*\*\\n");  
    socketChannel.read(buff, null, new CompletionHandler<Integer, Object>() {

        @Override  
        public void completed(Integer result, Object attachment) {  
            buff.flip();  
            //将Buffer转换成字符串  
            String content = StandardCharsets.UTF\_8.decode(buff).toString();  
            //显示从服务器读取的数据  
            jta.append("某人说:"+content+"\\n");  
            buff.clear();  
            //为下一次读取数据做准备  
            socketChannel.read(buff, null, this);  
        }

        @Override  
        public void failed(Throwable exc, Object attachment) {  
            System.out.println("读取数据失败"+exc);  
        }  
    });  
}

public static void main(String\[\] args) throws Exception {  
    AIOClient aioClient = new AIOClient();  
    aioClient.init();  
    aioClient.connect();  
}  

}

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

30

31

32

33

34

35

36

37

38

39

40

41

package net;

import java.io.IOException;

import java.net.InetSocketAddress;

import java.nio.Buffer;

import java.nio.ByteBuffer;

import java.nio.channels.AsynchronousSocketChannel;

import java.nio.charset.Charset;

import java.util.concurrent.Future;

public class SimpleAIOClient&nbsp;{

static final int PORT&nbsp;= 30000``;

public static void main(String[]&nbsp;args) throws Exception&nbsp;{

//用户读取数据的Buffer

ByteBuffer&nbsp;buff&nbsp;=&nbsp;ByteBuffer.allocate(``1024``);

Charset&nbsp;charset&nbsp;=&nbsp;Charset.forName(``"UTF-8"``);

try``(``//创建AsynchronousSocketChannel实例

AsynchronousSocketChannel&nbsp;socketChannel&nbsp;=&nbsp;AsynchronousSocketChannel.open();)

{

//连接到远程服务器

Future<Void>&nbsp;future&nbsp;=&nbsp;socketChannel.connect(``new InetSocketAddress(``"127.0.0.1"``,&nbsp;PORT));

future.get();

buff.clear();

//socketChannel中读取数据

Future<Integer>&nbsp;future1&nbsp;=&nbsp;socketChannel.read(buff);

future1.get();

buff.flip();

String&nbsp;content&nbsp;=&nbsp;charset.decode(buff).toString();

System.out.println(``"服务器:" +&nbsp;content);

} catch (IOException&nbsp;e)&nbsp;{

//&nbsp;TODO&nbsp;Auto-generated&nbsp;catch&nbsp;block

e.printStackTrace();

}

}

}

结果:

手机扫一扫

移动阅读更方便

阿里云服务器
腾讯云服务器
七牛云服务器

你可能感兴趣的文章