前两节中,我们已经获取了body的总长度,剩下的就是读出body,处理请求
ChannelServerHandler即从channel中读取请求,也向channle输出结果,因此它实现了InboundHandler, OutboundHandler
/**
* 读取请求的内容,业务处理
*/
public class ChannelServerHandler implements CompletionHandler<Integer, ByteBuffer>, InboundHandler, OutboundHandler {
private final static Logger LOGGER = LoggerFactory.getLogger(ChannelServerHandler.class);
private AsynchronousSocketChannel channel;
public ChannelServerHandler(AsynchronousSocketChannel channel) {
this.channel = channel;
}
public void completed(Integer result, ByteBuffer attachment) {
//如果条件成立,说明客户端主动终止了TCP套接字,这时服务端终止就可以了
if (result == -1) {
System.out.println("remote is close");
closeChannel();
return;
}
Object resultData;
String req = (String) read(channel, attachment);
if (req == null) {
closeChannel();
return;
}
try {
LOGGER.info("socket:{}", channel.getRemoteAddress());
//同步处理请求
RequestHandler requestHandler = ApplicationUtils.getBean(RequestHandler.class);
resultData = requestHandler.execute(req);
} catch (Throwable t) {
resultData = Result.error("ERROR", Utils.error(t));
LOGGER.error("调用接口失败", t);
}
if (resultData == null) {
resultData = Result.failure("FAILURE", "调用失败,数据为空.");
}
try {
String resultContent = resultData instanceOf String ? (String) resultData : JSON.toJSONString(resultData);
byte[] bytes = resultContent.getBytes("UTF-8");
ByteBuffer writeBuffer = ByteBuffer.allocate(bytes.length);
writeBuffer.put(bytes);
writeBuffer.flip();
write(channel, writeBuffer);
} catch (Exception e) {
LOGGER.error("对象转JSON失败,对象:{}", resultData, e);
}
closeChannel();
}
@Override
public Object read(AsynchronousSocketChannel socketChannel, ByteBuffer in) {
in.flip();
byte[] body = new byte[in.remaining()];
in.get(body);
String req = null;
try {
req = new String(body, "UTF-8");
} catch (UnsupportedEncodingException e) {
e.printStackTrace();
}
return req;
}
@Override
public Object write(AsynchronousSocketChannel socketChannel, ByteBuffer out) {
//write,write操作结束后关闭通道
channel.write(out, out, new CompletionHandler<Integer, ByteBuffer>() {
@Override
public void completed(Integer result, ByteBuffer attachment) {
closeChannel();
}
@Override
public void failed(Throwable exc, ByteBuffer attachment) {
closeChannel();
}
});
return null;
}
public void failed(Throwable exc, ByteBuffer attachment) {
closeChannel();
}
private void closeChannel() {
try {
this.channel.close();
} catch (IOException e) {
e.printStackTrace();
}
}
}
in.flip();
byte[] body = new byte[in.remaining()];
in.get(body);
String req = null;
try {
req = new String(body, "UTF-8");
} catch (UnsupportedEncodingException e) {
e.printStackTrace();
}
return req;
buffer中含有的字节数
客户端、服务端由于跨语言和经验问题,没有使用复杂的跨语言序列化技术,双方约定使用UTF-8编码,通过将body转换为String,最终获得了客户端传递的字符串。
经过自定义的请求处理逻辑,同步处理,最终将响应编码后,发送给客户端,write操作结束后,关闭连接
使用AIO开发服务端时,主要涉及
长连接该如何处理
/**
* mvn -Dtest=com.jd.jshop.web.sdk.test.ClientTest#pingReqSocket test
*
* @throws IOException
*/
@Test
@PerfTest(invocations = 20000, threads = 50)
public void pingReqSocket() throws IOException {
byte[] content = "ping".getBytes("UTF-8");
String result = sendReq(content);
//断言 是否和预期一致
Assert.assertEquals("pong", result);
}
private String sendReq(byte[] content) throws IOException {
ByteBuffer writeBuffer = ByteBuffer.allocate(4 + content.length);
writeBuffer.putInt(content.length);
writeBuffer.put(content);
writeBuffer.flip();
Socket socket = new Socket();
socket.connect(new InetSocketAddress("127.0.0.1", 9801));
socket.getOutputStream().write(writeBuffer.array());
socket.getOutputStream().flush();
byte[] buf = new byte[1024];
int len = 0;
String result = null;
while ((len = socket.getInputStream().read(buf)) != -1) {
result = new String(buf, 0, len);
System.out.println(result);
}
return result;
}
测试的方法是,在服务器上建立socket连接,向server发送ping,server返回pong
测试服务器:centos, 2个物理核,4个逻辑核,内存16G
分析aio的实现:
在ping-pong测试中性能极高,优于并甩开netty
以下是使用Netty开发的server端的测试用例,可以和上面的图片对比一下
Measured invocations: 10,000
Thread Count: 10
Measured
(system) Required
Execution time: 1,646 ms
Throughput: 6,075 / s
Min. latency: 0 ms
Average latency: 1 ms
Median: 2 ms
90%: 2 ms
Max latency: 26 ms
============================
Started at: Oct 16, 2018 5:27:03 PM
Measured invocations: 20,000
Thread Count: 20
Measured
(system) Required
Execution time: 3,293 ms
Throughput: 6,073 / s
Min. latency: 0 ms
Average latency: 3 ms
Median: 3 ms
90%: 5 ms
Max latency: 54 ms
============================
Started at: Oct 16, 2018 5:28:24 PM
Measured invocations: 20,000
Thread Count: 10
Measured
(system) Required
Execution time: 3,051 ms
Throughput: 6,555 / s
Min. latency: 0 ms
Average latency: 1 ms
Median: 1 ms
90%: 2 ms
Max latency: 44 ms
============================
Started at: Oct 16, 2018 5:30:06 PM
Measured invocations: 20,000
Thread Count: 50
Measured
(system) Required
Execution time: 3,167 ms
Throughput: 6,315 / s
Min. latency: 0 ms
Average latency: 7 ms
Median: 7 ms
90%: 10 ms
Max latency: 64 ms
分析基于Netty的实现:
吞吐量:6000+/s
10个线程时:90%低于2ms,平均1ms
20个线程时:90%低于5ms,平均3ms
50个线程时:90%低于10ms,平均7ms
线程越多,性能越低
当前测试用例不太依赖内存
执行10000+次请求,建立10000+连接,要求服务器对单个进程fd限制打开,防止报too many open files导致测试用例执行失败
ulimit -n 20240
手机扫一扫
移动阅读更方便
你可能感兴趣的文章