Netty是一款用于创建高性能网络应用程序的高级框架。它的基于 Java NIO 的异步的和事件驱动的实现,保证了高负载下应用程序性能的最大化和可伸缩性。
其次,Netty 也包含了一组设计模式,将应用程序逻辑从网络层解耦,简化了开发过程,同时也最大限度地提高了可测试性、模块化以及代码的可重用性。
Netty主要包含以下几个组件:
个人理解:Channel类似对Socket的封装,代表一个网络连接(类似WebServer项目中的http_conn类),可以进行读写操作。
以往基本的 I/O 操作(bind()、connect()、read()和 write())依赖于底层网络传输所提供的原语。而Netty 的 Channel 接口所提供的 API,大大地降低了直接使用 Socket 类的复杂性。
不同协议、不同的阻塞类型的连接都有不同的 Channel 类型与之对应,常用的 Channel 类型:
NioSocketChannel,NIO的客户端 TCP Socket 连接。
NioServerSocketChannel,NIO的服务器端 TCP Socket 连接。
NioDatagramChannel, UDP 连接。
NioSctpChannel,客户端 Sctp 连接。
NioSctpServerChannel,Sctp 服务器端连接。
这些通道涵盖了 UDP 和 TCP 网络 IO 以及文件 IO。
有了 Channel 连接服务,连接之间可以消息流动。如果服务器发出的消息称作“出站”消息,服务器接受的消息称作“入站”消息。那么消息的“出站”/“入站”就会产生事件(Event)。
例如:连接已激活;数据读取;用户事件;异常事件;打开链接;关闭链接等等。
有了事件,就需要一个机制去监控和协调事件,这个机制(组件)就是EventLoop。
Netty 通过触发事件将 Selector 从应用程序中抽象出来,消除了所有本来将需要手动编写的派发代码。在内部,将会为每个 Channel 分配一个 EventLoop,用以处理所有事件,包括:
每个 Channel 都会被分配到一个 EventLoop。一个 EventLoop 可以服务于多个 Channel。
每个 EventLoop 会占用一个 Thread,同时这个 Thread 会处理 EventLoop 上面发生的所有 IO 操作和事件。
个人理解,可以把EventLoop看作一个线程,而EventLoopGroup就是一个线程池。
EventLoopGroup通过负载均衡算法选择某个EventLoop去绑定Channel。
EventLoop通过其Selector选择器(类似epoll)去监听其绑定的Channel上发生的事件,并根据事件类型的不同,交给相应的ChannelHandler处理。
在Netty的线程模型为基于事件驱动的 Reactor 模型,可以通过配置实现几种不同类型的Reactor模型,如 单线程模型、多线程模型和主从线程模型等。因此可能用到数量不同的
EventLoopGroup
线程池,如主从线程模型就会用到两个EventLoopGroup
,一个用于监听从客户端发来的连接,一个用于读写数据、处理业务等。
EventLoop 定义了 Netty 的核心抽象,用于处理连接的生命周期中所发生的事件。Channel、EventLoop、Thread 以及 EventLoopGroup 之间的关系如下图所示:
这些关系是:
EventLoopGroup
包含一个或者多个 EventLoop
;EventLoop
在它的生命周期内只和一个 Thread
绑定;EventLoop
处理的 I/O 事件都将在它专有的 Thread
上被处理;Channel
在它的生命周期内只注册于一个 EventLoop
;EventLoop
可能会被分配给一个或多个 Channel
。注意,在这种设计中,一个给定 Channel
的 I/O 操作都是由相同的 Thread
执行的,实际上消除了对于同步的需要。
EventLoopGroup类似于一个线程池,其中放着多个EventLoop,每个EventLoop都是一个单独的线程,并且每个EventLoop都有一个Selector
,用于监听Channel状态的变化(类似epoll)。
EventLoop
监听Channel
的实现原理如下图所示:
个人思考:为了便于理解,可以视为每个工作线程(EventLoop)都有属于自己的epoll(Selector)。
相比之下,之前的WebServer项目所有线程共用一个epoll:监听客户端连接的主线程,以及【解析请求、产生响应】的工作线程都共用一个epoll。
在web Server中:主线程调用
epoll_wait
等待客户端的连接,对于新到的连接调用accept
系统调用生成一个用于通信的socket文件描述符,并把这个文件描述符与一个类似Channel的http连接对象(自己封装的一个类)进行绑定,然后注册进epoll。当有网络中有数据到达这个用于通信的socket文件描述符的读缓冲区,或服务器已经正确生成response并注册了写就绪事件时,epoll会将这个文件描述符拿出,交给一个工作线程执行相应的操作。由于工作线程与主线程共用一个epoll,因此一个工作线程也只能处理一个连接。
在Netty中:如果使用主从线程模型,此时需要两个EventLoopGroup,第一组将只包含一个 ServerChannel,代表服务器自身的已绑定到某个本地端口的正在监听的套接字。而第二组将包含所有已创建的用来处理传入客户端连接(对于每个服务器已经接受的连接都有一个)的 Channel(类似用于通信的socket文件描述符),如下图所示。
与 ServerChannel 相关联的 EventLoopGroup 将分配一个负责为传入的连接创建Channel 的EventLoop。一旦连接被接受,第二个EventLoopGroup 就会给它的 Channel分配一个 EventLoop。
与WebServer项目不同的是,在该线程模型中,Netty中每个工作线程EventLoop都有自己的Selector(可理解为epoll),因此一个EventLoop可以处理多个Channel。
为方便理解,贴上书中的两段示例代码:
未使用 Netty 的异步网络编程
//总体流程与WebServer项目中的main.cpp一致
public class PlainNioServer
{
public void serve(int port) throws IOException
{
ServerSocketChannel serverChannel = ServerSocketChannel.open();
serverChannel.configureBlocking(false);
ServerSocket ssocket = serverChannel.socket();
InetSocketAddress address = new InetSocketAddress(port);
//将服务器绑定到选定的端口
ssocket.bind(address);
//打开Selector来处理Channel,只有一个Selector
Selector selector = Selector.open();
//将ServerSocket注册到Selector以接收连接
serverChannel.register(selector, SelectionKey.OP_ACCEPT);
final ByteBuffer msg = ByteBuffer.wrap("Hi!\r\n".getBytes());
for (;;)
{
try
{
//阻塞,等待需要处理的新事件
selector.select();
}
catch (IOException ex)
{
ex.printStackTrace();
// handle exception
break;
}
//获取所有接收事件的SelectionKey 实例
Set
Iterator
while (iterator.hasNext())
{
SelectionKey key = iterator.next();
iterator.remove();
try
{
//检查事件是否是一个新的已经就绪可以被接受的连接(相当于判断从epoll中取出的socketfd是否为当前服务端的fd)
if (key.isAcceptable())
{
ServerSocketChannel server =
(ServerSocketChannel)key.channel();
//accept客户端
SocketChannel client = server.accept();
client.configureBlocking(false);
//将客户端socket注册到Selector选择器
client.register(selector, SelectionKey.OP_WRITE |
SelectionKey.OP_READ, msg.duplicate());
System.out.println(
"Accepted connection from " + client);
}
//检查之前接收的socket是否已经准备好写数据
if (key.isWritable())
{
SocketChannel client =
(SocketChannel)key.channel();
ByteBuffer buffer =
(ByteBuffer)key.attachment();
while (buffer.hasRemaining())
{
//写数据到socket
if (client.write(buffer) == 0)
{
break;
}
}
//关闭连接
client.close();
}
}
catch (IOException ex)
{
key.cancel();
try
{
key.channel().close();
}
catch (IOException cex)
{
// ignore on close
}
}
}
}
}
}
使用 Netty 的异步网络处理
public class NettyNioServer
{
public void server(int port) throws Exception
{
final ByteBuf buf = Unpooled.copiedBuffer("Hi!\r\n",
Charset.forName("UTF-8"));
//为非阻塞模式使用NioEventLoopGroup
//Netty 内置了一些可开箱即用的传输,不仅有NIO,还有Netty自己实现的epoll等
EventLoopGroup group = new NioEventLoopGroup();
try {
//创建ServerBootstrap
ServerBootstrap b = new ServerBootstrap();
//此处没有使用Reactor主从线程模型,只用了一个EventLoopGroup线程池
b.group(group).channel(NioServerSocketChannel.class)
.localAddress(new InetSocketAddress(port))
//指定 ChannelInitializer,对于每个已接受的连接都调用它
.childHandler(new ChannelInitializer
{
@Override
public void initChannel(SocketChannel ch)
throws Exception
{
//添加 ChannelInboundHandlerAdapter 以接收和处理事件
ch.pipeline().addLast(
new ChannelInboundHandlerAdapter()
{
@Override
public void channelActive(
ChannelHandlerContext ctx) throws Exception
{
//将消息写到客户端,并添加ChannelFutureListener,以便消息一被写完就关闭连接
ctx.writeAndFlush(buf.duplicate())
.addListener(
ChannelFutureListener.CLOSE);
}
});
}
});
//绑定服务器以接受连接
ChannelFuture f = b.bind().sync();
f.channel().closeFuture().sync();
} finally {
//释放所有的资源
group.shutdownGracefully().sync();
}
}
}
Netty 中所有的 I/O 操作都是异步的。因为一个操作可能不会立即返回,所以我们需要一种用于在之后的某个时间点确定其结果的方法。为此,Netty 提供了ChannelFuture
接口,其 addListener()
方法注册了一个ChannelFutureListener
,以便在某个操作完成时(无论是否成功)得到通知。
说人话就是ChannelFuture用于在异步操作执行完成后,记录结果并发送通知。
从应用程序开发人员的角度来看,Netty 的主要组件是 ChannelHandler
,它充当了所有处理入站和出站数据的应用程序逻辑的容器。
ChannelHandler
的方法是由网络事件(其中术语“事件”的使用非常广泛)触发的。事实上,ChannelHandler
可专门用于几乎任何类型的动作,例如将数据从一种格式转换为另外一种格式,或者处理转换过程中所抛出的异常。
将多个ChannelHandler串联起来,形成一个处理链,用于处理Channel
中的事件。当 Channel
被创建时,它会被自动地分配到它专属的 ChannelPipeline
。
ChannelHandler安装到 ChannelPipeline
中的过程如下所示(流程与上面Netty 的异步网络处理代码一致):
ChannelInitializer
的实现被注册到了ServerBootstrap中 ;ChannelInitializer.initChannel()
方法被调用时,ChannelInitializer
将在 ChannelPipeline
中安装一组自定义的 ChannelHandler;ChannelInitializer
将它自己从 ChannelPipeline
中移除。 ChannelHandler 的触发顺序是根据 pipeline().addLast()
的注册顺序来确定的。在 Netty 中,ChannelPipeline 是一个事件处理的链表,每个 ChannelHandler 负责处理特定的事件。当一个事件被触发时,Netty 会按照 ChannelPipeline 中 ChannelHandler 的添加顺序依次调用它们的事件处理方法。因此,如果想要控制事件处理的顺序,可以通过添加 ChannelHandler 的顺序来实现。
当ChannelHandler被添加到ChannelPipeline时,它将会被分配一个
ChannelHandlerContext
。通过使用作为参数传递到每个方法的ChannelHandlerContext
,事件可以被传递给当前ChannelHandler链中的下一个 ChannelHandler。因为你有时会忽略那些不感兴趣的事件,所以 Netty提供了抽象基类ChannelInboundHandlerAdapter
和ChannelOutboundHandlerAdapters
。通过调用ChannelHandlerContext
上的对应方法,每个都提供了简单地将事件传递给下一个ChannelHandler的方法的实现。随后,你可以通过重写你所感兴趣的那些方法来扩展这些类。 个人理解:可以将ChannelHandlerContext类比为Linux中的socket文件描述符。在Netty框架中,ChannelHandlerContext是一个上下文对象,用于表示一个通道的上下文信息,包括该通道的事件处理器、通道的配置信息以及与该通道相关的一些状态信息。类似于Linux中的socket文件描述符,它可以用于发送和接收数据,以及进行其他与通道相关的操作。但是需要注意的是,ChannelHandlerContext并不是一个操作系统底层的文件描述符,而是Netty框架中的一个抽象概念。
具体实例会在后面给出。
Bootstrap和ServerBootstrap分别用于启动客户端和服务端。
在Netty中,有两种类型的引导:一种用于客户端(简单地称为 Bootstrap),而另一种(ServerBootstrap)用于服务器。它们都包含EventLoopGroup和一个ChannelPipeline,用于管理连接和处理事件。无论你的应用程序使用哪种协议或者处理哪种类型的数据,唯一决定它使用哪种引导类的是它是作为一个客户端还是作为一个服务器。
1. 创建一个EventLoopGroup:EventLoopGroup是Netty中的一个线程池,用于处理所有的I/O操作。
2.创建一个ServerBootstrap(客户端为Bootstrap):ServerBootstrap是Netty中用于创建服务器的类,它包含了一些配置项,如端口号、协议等。
3.配置ServerBootstrap:通过ServerBootstrap的一些方法,如group()、channel()、option()、handler()等,来配置ServerBootstrap。
4.绑定端口号:调用ServerBootstrap的bind()方法来绑定端口号,启动服务器。
5.创建一个ChannelPipeline:当一个客户端连接到服务器时,Netty会为该连接创建一个Channel对象,每个Channel对象都包含一个ChannelPipeline,ChannelPipeline是一个事件处理器链,它包含了一系列的ChannelHandler。
6.添加ChannelHandler:通过调用ChannelPipeline的addLast()方法,将自定义的ChannelHandler添加到ChannelPipeline中。
7.处理事件:当有事件发生时,Netty会将事件封装成一个对象,并将该对象传递给ChannelPipeline中的第一个ChannelHandler,然后由ChannelHandler处理事件,或将事件传递给下一个ChannelHandler。
8.发送响应:当ChannelHandler处理完事件后,可以通过ChannelHandlerContext的write()方法将响应发送给客户端。
9.关闭连接:当客户端关闭连接时,Netty会将该事件封装成一个对象,并将该对象传递给ChannelPipeline中的最后一个ChannelHandler,然后由ChannelHandler处理该事件,释放资源。
Netty使用了典型的三层网络架构,Reactor 通信调度层 -> 职责链 PipeLine -> 业务逻辑处理层
Reactor层主要监听网络的读写和连接操作,负责将网络层的数据 读取到内存缓冲区中,然后触发各种网络事件,例如连接创建、连接激活、读事 件、写事件等等,将这些事件触发到 PipeLine 中,由 PipeLine 充当的职责链来 进行后续的处理。涉及到的类包括:Reactor 线程 NioEventLoop 以及其父类、NioSocketChannel/NioServerSocketChannel 以及其父 类、ByteBuffer 以及由其衍生出来的各种 Buffer、Unsafe 以及其衍生出的各种内 部类等
Pipeline层负责事件在职责链中有序的传播,职责链可以选择监听和处理自己关心的事件,它可以拦截处理和向 后 / 向前传播事件,不同的应用的 Handler 节点的功能也不同,通常情况下,往往会开发编解码 Hanlder 用于消息的编解码,它可以将外部的协议消息转换成内部 的 POJO 对象,这样上层业务侧只需要关心处理业务逻辑即可,不需要感知底层 的协议差异和线程模型差异,实现了架构层面的分层隔离。
Service层有纯粹的业务逻辑 处理,例如订单处理;也有应用层协议管理,例如 HTTP 协议、FTP 协议等(在书中还未看到)。
Echo服务器与客户端的主要功能为:服务器接收到客户端发送的请求,将消息原封不动返回。如下图所示:
所有的 Netty 服务器都需要以下两部分。
在本小节的剩下部分,我们将描述 Echo 服务器的业务逻辑以及引导代码。
<groupId>org.example</groupId>
<artifactId>netty-demo-1</artifactId>
<packaging>pom</packaging>
<version>1.0-SNAPSHOT</version>
<modules>
<module>server</module>
<module>client</module>
</modules>
<!-- ...省略引入的依赖及插件等 -->
server模块包含两个.java文件,其中EchoServer为服务器的主流程,EchoServerHandler为业务逻辑。
具体作用如注释所示:
//EchoServerHandler
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.util.CharsetUtil;
/*
* @Sharable:标示一个ChannelHandler 可以被多个 Channel 安全地共享
* EchoServerHandler:处理业务逻辑
* */
@ChannelHandler.Sharable
public class EchoServerHandler extends ChannelInboundHandlerAdapter { //继承 ChannelInboundHandlerAdapter 类,以定义响应入站事件的方法
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg)
{
//客户端写来的消息会被放入msg中
ByteBuf in = (ByteBuf) msg;
System.out.println("Server received: " + in.toString(CharsetUtil.UTF_8));
//将收到的msg原封不动写回去
ctx.write(in);
}
@Override
public void channelReadComplete(ChannelHandlerContext ctx)
{
//ctx.writeAndFlush(Unpooled.EMPTY_BUFFER) 表示将一个空的缓冲区写入到当前的 ChannelHandlerContext 中,并将其发送给客户端。
//addListener(ChannelFutureListener.CLOSE) 表示在发送完成后添加一个监听器,当发送完成后会自动关闭当前的连接。
ctx.writeAndFlush(Unpooled.EMPTY_BUFFER).addListener(ChannelFutureListener.CLOSE);
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause)
{
cause.printStackTrace();
ctx.close();
}
}
//EchoServer
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.EventLoop;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import java.net.InetSocketAddress;
public class EchoServer {
private final int port;
public EchoServer(int port)
{
this.port = port;
}
public static void main(String[] args) throws Exception
{
if(args.length != 1)
{
//设置端口值(如果端口参数的格式不正确,则抛出一个NumberFormatException)
System.err.println("Usage: " + EchoServer.class.getSimpleName() + " <port>");
int port = Integer.parseInt(args[0]);
//调用服务器的start方法
new EchoServer(port).start();
}
}
public void start() throws Exception
{
final EchoServerHandler serverHandler = new EchoServerHandler();
//创建EventLoopGroup
EventLoopGroup group = new NioEventLoopGroup();
try {
//创建ServerBootstrap
ServerBootstrap b = new ServerBootstrap();
b.group(group)
.channel(NioServerSocketChannel.class) //适用于NIO传输的Channel类型
.localAddress(new InetSocketAddress(port)) //设置socket端口和地址,服务器绑定该地址监听新的请求
//此处当一个新的连接被接受时,一个新的子 Channel 将会被创建
//而 ChannelInitializer 将会把一个你的EchoServerHandler 的实例添加到该 Channel 的 ChannelPipeline 中。
//如EchoServerHandler类所示,这个 ChannelHandler 将会收到有关入站消息的通知
.childHandler(new ChannelInitializer<SocketChannel>(){ //此处SocketChannel是netty的不是nio的
//添加一个EchoServerHandler 到子Channel的 ChannelPipeline
@Override
public void initChannel(SocketChannel ch)
throws Exception {
ch.pipeline().addLast(serverHandler);
}
});
//异步地绑定服务器,调用sync()方法阻塞等待,直到绑定完成
ChannelFuture f = b.bind().sync();
//获取 Channel 的CloseFuture,并且阻塞当前线程直到它完成
f.channel().closeFuture().sync();
}
finally {
//关闭 EventLoopGroup,释放所有的资源
group.shutdownGracefully().sync();
}
}
}
client模块同样包含两个.java文件,其中EchoClient为客户端的主流程,EchoClientHandler为业务逻辑。
//EchoClientHandler
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.util.CharsetUtil;
@ChannelHandler.Sharable //标记该类的实例可以被多个 Channel 共享
public class EchoClientHandler extends SimpleChannelInboundHandler<ByteBuf>
{
/*
channelActive()——在到服务器的连接已经建立之后将被调用;
这确保了数据将会被尽可能快地写入服务器,其在这个场景下是一个编码了字符串"Netty rocks!"的字节缓冲区
channelRead0() —当从服务器接收到一条消息时被调用;
需要注意的是,由服务器发送的消息可能会被分块接收。
也就是说,如果服务器发送了 5 字节,那么不能保证这 5 字节会被一次性接收。
即使是对于这么少量的数据,channelRead0()方法也可能会被调用两次,第一次使用一个持有 3 字节的 ByteBuf(Netty 的字节容器),第二次使用一个持有 2 字节的 ByteBuf。
exceptionCaught()——在处理过程中引发异常时被调用。
*/
@Override
public void channelActive(ChannelHandlerContext ctx)
{
/*
Netty 中的 Unpooled 是一个工具类,用于创建不需要池化的 ByteBuf 实例。
ByteBuf 是 Netty 中的缓冲区类型,用于存储数据。
与池化的 ByteBuf 实例不同,不需要池化的 ByteBuf 实例是一次性的,使用完后会被释放。
因此,如果你需要创建一些临时的 ByteBuf 实例,可以使用 Unpooled 来创建。
例如,如果你需要将一个字符串转换为 ByteBuf,
可以使用 Unpooled.copiedBuffer(str, Charset.defaultCharset()) 方法来创建一个不需要池化的 ByteBuf 实例。
*/
ctx.writeAndFlush(Unpooled.copiedBuffer("Netty rocks!", CharsetUtil.UTF_8));
}
@Override
protected void channelRead0(ChannelHandlerContext channelHandlerContext, ByteBuf in) throws Exception {
System.out.println("Client received: " + in.toString(CharsetUtil.UTF_8));
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx,Throwable cause)
{
cause.printStackTrace();
ctx.close();
}
}
//EchoClient
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import java.net.InetSocketAddress;
public class EchoClient {
private final String host;
private final int port;
public EchoClient(String host,int port)
{
this.host = host;
this.port = port;
}
public void start() throws Exception
{
EventLoopGroup group = new NioEventLoopGroup();
try
{
Bootstrap b = new Bootstrap();
//指定 EventLoopGroup 以处理客户端事件;需要适用于 NIO 的实现
b.group(group)
//适用于NIO传输的Channel类型,注意不是NioServerSocketChannel
.channel(NioSocketChannel.class)
//设置服务器的InetSocketAddress
.remoteAddress(new InetSocketAddress(host,port))
//在创建Channel时,向 ChannelPipeline中添加一个 EchoClientHandler 实例
.handler(new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(SocketChannel ch) throws Exception
{
ch.pipeline().addLast(new EchoClientHandler());
}
});
ChannelFuture f = b.connect().sync();
f.channel().closeFuture().sync(); //阻塞,直到 Channel 关闭
}
finally {
//关闭线程池并且释放所有的资源
group.shutdownGracefully().sync();
}
}
public static void main(String[] args) throws Exception
{
if(args.length != 2)
{
System.err.println(
"Usage: " + EchoClient.class.getSimpleName() + "<host><port>"
);
return;
}
String host = args[0];
int port = Integer.parseInt(args[1]);
new EchoClient(host,port).start();
}
}
手机扫一扫
移动阅读更方便
你可能感兴趣的文章