Netty介绍与认识
阅读原文时间:2023年07月09日阅读:1

Netty是由JBOSS提供的一个java开源框架。
Netty提供异步的、事件驱动的网络应用程序框架和工具,用以快速开发高性能、高可靠性的网络服务器和客户端程序。

  2.体系结构图

Netty是典型的Reactor模型结构,在实现上,Netty中的Boss类充当mainReactor,NioWorker类充当subReactor(默认NioWorker的个数是当前服务器的可用核数)。

在处理新来的请求时,NioWorker读完已收到的数据到ChannelBuffer中,之后触发ChannelPipeline中的ChannelHandler流。

Netty是事件驱动的,可以通过ChannelHandler链来控制执行流向。因为ChannelHandler链的执行过程是在subReactor中同步的,所以如果业务处理handler耗时长,将严重影响可支持的并发数。

发一下Netty的架构

Server1

public static void main(String[] args) throws Exception {
//启动server服务
new NettyServer().bind(8089);
}

NettyServer

public void bind(int port) throws Exception {

    EventLoopGroup bossGroup = new NioEventLoopGroup(); //bossGroup就是parentGroup,是负责处理TCP/IP连接的  
    EventLoopGroup workerGroup = new NioEventLoopGroup(); //workerGroup就是childGroup,是负责处理Channel(通道)的I/O事件  

    ServerBootstrap sb = new ServerBootstrap();  
    sb.group(bossGroup, workerGroup)  
            .channel(NioServerSocketChannel.class)  
            .option(ChannelOption.SO\_BACKLOG, 128) //初始化服务端可连接队列,指定了队列的大小128  
            .childOption(ChannelOption.SO\_KEEPALIVE, true) //保持长连接  
            .childHandler(new ChannelInitializer<SocketChannel>() {  // 绑定客户端连接时候触发操作  
                @Override  
                protected void initChannel(SocketChannel sh) throws Exception {  

// sh.pipeline().addLast(new StringEncoder(Charset.forName("gbk")));
// sh.pipeline().addLast(new StringDecoder(Charset.forName("gbk")));
sh.pipeline().addLast("decoder", new MyDecode());
sh.pipeline() .addLast(new ServerHandler()); //使用ServerHandler类来处理接收到的消息
}
});
//绑定监听端口,调用sync同步阻塞方法等待绑定操作完
ChannelFuture future = sb.bind(port).sync();

    if (future.isSuccess()) {  
        System.out.println("服务端启动成功");  
    } else {  
        System.out.println("服务端启动失败");  
        future.cause().printStackTrace();  
        bossGroup.shutdownGracefully(); //关闭线程组  
        workerGroup.shutdownGracefully();  
    }  

    //成功绑定到端口之后,给channel增加一个 管道关闭的监听器并同步阻塞,直到channel关闭,线程才会往下执行,结束进程。  
    future.channel().closeFuture().sync();  

}  

MyDecode 是我自己编写的一个解析16进制的类,编码器

public class MyDecode extends ByteToMessageDecoder {
@Override
protected void decode(ChannelHandlerContext channelHandlerContext, ByteBuf byteBuf, List list) throws Exception {
byte[] b = new byte[byteBuf.readableBytes()];
//复制内容到字节数组b
byteBuf.readBytes(b);
//字节数组转字符串
String str = new String(b);
list.add(bytesToHexString(b));
}

public String bytesToHexString(byte\[\] bArray) {  
    StringBuffer sb = new StringBuffer(bArray.length);  
    String sTemp;  
    for (int i = 0; i < bArray.length; i++) {  
        sTemp = Integer.toHexString(0xFF & bArray\[i\]);  
        if (sTemp.length() < 2)  
            sb.append(0);  
        sb.append(sTemp.toUpperCase());  
    }  
    return sb.toString();  
}  

public static String toHexString1(byte\[\] b) {  
    StringBuffer buffer = new StringBuffer();  
    for (int i = 0; i < b.length; ++i) {  
        buffer.append(toHexString1(b\[i\]));  
    }  
    return buffer.toString();  
}  

public static String toHexString1(byte b) {  
    String s = Integer.toHexString(b & 0xFF);  
    if (s.length() == 1) {  
        return "0" + s;  
    } else {  
        return s;  
    }  
}  

public static byte\[\] hexString2Bytes(String hex) {  
    if ((hex == null) || (hex.equals(""))){  
        return null;  
    }else if (hex.length()%2 != 0){  
        return  null;  
    }else{  
        hex = hex.toUpperCase();  
        int len = hex.length()/2;  
        byte\[\] b = new byte\[len\];  
        char\[\] hc = hex.toCharArray();  
        for (int i=0; i<len; i++){  
            int p=2\*i;  
            b\[i\] = (byte) (charToByte(hc\[p\]) << 4 | charToByte(hc\[p+1\]));  
        }  
        return b;  
    }  
}  

private static byte charToByte(char c) {  
    return (byte) "0123456789ABCDEF".indexOf(c);  
}  

public static void main(String\[\] args) throws Exception {  
    String fan="烦";  
    String str = URLEncoder.encode(fan, "utf-8").replaceAll("%", "");  
    System.err.println(str);  

}  

}

ServerHandler

@Override  
public void channelRead(ChannelHandlerContext ctx, Object msg) throws UnsupportedEncodingException {  
    try {  
       System.out.println(msg.toString);  
    } catch (Exception e) {  
        e.printStackTrace();  
        System.err.println(e.getMessage());  
    }  
}  

// @Override
// public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
// RpcRequest request = (RpcRequest) msg;
// System.out.println("接收到客户端信息:" + request.toString());
// //返回的数据结构
// RpcResponse response = new RpcResponse();
// response.setId(UUID.randomUUID().toString());
// response.setData("server响应结果");
// response.setStatus(1);
// ctx.writeAndFlush(response);
// }

//通知处理器最后的channelRead()是当前批处理中的最后一条消息时调用  
@Override  
public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {  
    ctx.flush();  
}  

//读操作时捕获到异常时调用  
@Override  
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {  
    ctx.close();  
}  

//客户端去和服务端连接成功时触发  
@Override  
public void channelActive(ChannelHandlerContext ctx) throws Exception {  

    try {  
        int strLen = 34;  
        StringBuffer sb = null;  
        while (classpath.length() < strLen) {  
            sb = new StringBuffer();  
            //sb.append("0").append(\_16);// 左补0  
            sb.append(classpath).append("0");//右补0  
            classpath = sb.toString();  
        }  
        System.err.println("sb=" + sb);  
        String str = "ADBA000160002546323160ACC70014CB142701" + classpath + "FE";  
        String byte16 = "ADBA000120000B020195FE";  
        ByteBuf bytebuf = Unpooled.buffer(16);  
        ByteBuf bytebuf1 = Unpooled.buffer(16);  
        byte\[\] bytes = MyDecode.hexString2Bytes(byte16);  
        byte\[\] bytes1 = MyDecode.hexString2Bytes(str);  
        bytebuf1.writeBytes(bytes1);  
        bytebuf.writeBytes(bytes);  
        ctx.writeAndFlush(bytebuf);  
        for (int i = 0; i < 2; i++) {  
            if (i == 1) {  
                ctx.writeAndFlush(bytebuf + "\\n");  
            } else {  
                //ctx.writeAndFlush(bytebuf1 + "\\n");  
            }  
        }  
        ctx.flush();  
    } catch (Exception e) {  
        e.printStackTrace();  
        System.out.println(e.getMessage());  
    }  
}

  代码就上了,这样16进制和读不到的问题就解决了。