**本人博客网站 **IT小神 www.itxiaoshen.com
Netty官网 https://netty.io/ 最新版本为4.1.70.Final
Netty是一个异步的、事件驱动网络应用框架,用于快速开发可靠、可维护的高性能协议服务器和客户端。简单的说Netty是一个基于NIO的客户、服务器端的编程框架,它可以大大简化如TCP和UDP套接字的网络编程.
Netty由JBOSS提供的一个Java开源框架,现为 Github上的独立项目Netty从许多协议如FTP、SMTP、HTTP和各种基于二进制和文本的协议的实现精心设计从而兼顾实现了易于开发、性能、稳定性和灵活性。分为核心层、传输服务、协议支持。
最下面一层是Netty最底层最核心的部分包括零拷贝、API库、可扩展的事件模型;上面右边橙色部分Protocol Support协议支持,包括Http协议、WebSocket、SSL(安全套接字协议)、谷歌Protobuf协议、zlib/gzip压缩与解压缩、Large File Transfer大文件传输等等;红色的部分Transport Services传输服务,包括Socket、Datagram、Http Tunnel、In-VM Pipe等等。足以看出Netty的功能、协议、传输方式都比较全,比较强大。
Netty 是目前Java技术栈中最流行的、首选的 NIO 框架,性能和稳定性都有保障,社区比较活跃,基于 Netty 进行二次定制服务开发成本小,提供了简单易用的API从网络处理代码中解耦业务逻辑,且已得到成百上千的商业及商用项目验证,许多框架和开源组件的底层 rpc 都是使用的Netty,如Dubbo、Elasticsearch 、RocketMQ以及大数据Hadoop、Spark等等。下面是Netty官方描述的特性:
设计
易用性
性能
安全性
社区
从官方上看4.x版本是当前官方推荐,4.x版本目前也一直在维护中,3.x版本是比较旧的版本,跟4.x版本相比变化比较大,特别是API。5.x是被舍弃的版本,官方不再支持! Netty 5.0以前是发布alpha版,之前也有一部分书籍是基于Netty5来写的,从作者在GitHub上的回复得出:使用ForkJoinPool增加了复杂性,并且没有显示出明显的性能优势。同时保持所有分支的同步是一项相当大的工作,在当前的master中没有任何东西可以证明一个新的版本是合理的。
NIO不是Java独有的概念,实质上是为IO多路复用技术;它是由操作系统提供的系统调用,早期操作系统调用select,poll,但是性能低下,后来渐渐演化成了Linux下的epoll和Mac里的kqueue,使用最为广泛的是epoll;而Netty就是基于Java NIO技术封装的一套框架。为什么要封装,因为原生的Java NIO使用起来没那么方便,而且还有臭名昭著的bug,Netty把它封装之后,提供了一个易于操作的使用模式和接口,用户使用起来也就便捷多了。
关于NIO我们简单说明一下:
Netty 4.1.70 源码官网下载地址 https://github.com/netty/netty/archive/refs/tags/netty-4.1.70.Final.tar.gz
下载Netty,在Netty源码中example提供不同协议的样本代码示例,官网有有样例wiki说明,非常方便使用,各位伙伴可以根据具体场景选择使用。
echo:非常基本的客户机和服务器。
discard:了解如何异步发送无限数据流,而不会淹没写缓冲区。
uptime:实现自动重连机制。
telnet:一个经典的基于线路的网络应用程序。
securechat:一种基于tls的聊天服务器,由Telnet示例衍生而来。
objectecho:交换可序列化的Java对象。
factorial:使用自定义二进制协议编写有状态客户机和服务器。
worldclock:快速协议原型与谷歌协议缓冲区集成。
http snoop:构建自己的非常轻量级的HTTP客户机和服务器。
file:文件服务器,异步大文件流在HTTP。
http websocketx:使用Web Sockets向HTTP添加双向全双工通信通道
proxy:编写一个高效的隧道代理服务器。
udt bytes:在类似tcp的字节流模式下使用[UDT]
udt message:在类似udp的消息传递模式下使用[UDT]
udt rendezvousBytes:对称点对点会合连接模式下的字节流
udt rendezvous:对称点对点交会连接模式下的消息流
服务端示例代码EchoServer和EchoServerHandler
/*
* Copyright 2012 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package com.itxs.server;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.logging.LogLevel;
import io.netty.handler.logging.LoggingHandler;
import io.netty.handler.ssl.SslContext;
import io.netty.handler.ssl.SslContextBuilder;
import io.netty.handler.ssl.util.SelfSignedCertificate;
/**
* Echoes back any received data from a client.
*/
public final class EchoServer {
static final boolean SSL = System.getProperty("ssl") != null;
static final int PORT = Integer.parseInt(System.getProperty("port", "8007"));
public static void main(String[] args) throws Exception {
// Configure SSL.
final SslContext sslCtx;
if (SSL) {
SelfSignedCertificate ssc = new SelfSignedCertificate();
sslCtx = SslContextBuilder.forServer(ssc.certificate(), ssc.privateKey()).build();
} else {
sslCtx = null;
}
// Configure the server.
EventLoopGroup bossGroup = new NioEventLoopGroup(1);
EventLoopGroup workerGroup = new NioEventLoopGroup();
final EchoServerHandler serverHandler = new EchoServerHandler();
try {
ServerBootstrap b = new ServerBootstrap();
b.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class)
.option(ChannelOption.SO_BACKLOG, 100)
.handler(new LoggingHandler(LogLevel.INFO))
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline p = ch.pipeline();
if (sslCtx != null) {
p.addLast(sslCtx.newHandler(ch.alloc()));
}
//p.addLast(new LoggingHandler(LogLevel.INFO));
p.addLast(serverHandler);
}
});
// Start the server.
ChannelFuture f = b.bind(PORT).sync();
// Wait until the server socket is closed.
f.channel().closeFuture().sync();
} finally {
// Shut down all event loops to terminate all threads.
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
}
}
}
/*
* Copyright 2012 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package com.itxs.server;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandler.Sharable;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.util.CharsetUtil;
/**
* Handler implementation for the echo server.
*/
@Sharable
public class EchoServerHandler extends ChannelInboundHandlerAdapter {
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) {
//获取客户端发送过来的消息
ByteBuf byteBuf = (ByteBuf) msg;
System.out.println("收到客户端" + ctx.channel().remoteAddress() + "发送的消息:" + byteBuf.toString(CharsetUtil.UTF_8));
}
@Override
public void channelReadComplete(ChannelHandlerContext ctx) {
ctx.writeAndFlush(Unpooled.copiedBuffer("欢迎来到Java Netty开源世界,让我们一起学习吧!", CharsetUtil.UTF_8));
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
// Close the connection when an exception is raised.
cause.printStackTrace();
ctx.close();
}
}
客户端示例代码EchoServer和EchoServerHandler
/*
* Copyright 2012 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package com.itxs.client;
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.ssl.SslContext;
import io.netty.handler.ssl.SslContextBuilder;
import io.netty.handler.ssl.util.InsecureTrustManagerFactory;
/**
* Sends one message when a connection is open and echoes back any received
* data to the server. Simply put, the echo client initiates the ping-pong
* traffic between the echo client and server by sending the first message to
* the server.
*/
public final class EchoClient {
static final boolean SSL = System.getProperty("ssl") != null;
static final String HOST = System.getProperty("host", "127.0.0.1");
static final int PORT = Integer.parseInt(System.getProperty("port", "8007"));
public static void main(String[] args) throws Exception {
// Configure SSL.git
final SslContext sslCtx;
if (SSL) {
sslCtx = SslContextBuilder.forClient()
.trustManager(InsecureTrustManagerFactory.INSTANCE).build();
} else {
sslCtx = null;
}
// Configure the client.
EventLoopGroup group = new NioEventLoopGroup();
try {
Bootstrap b = new Bootstrap();
b.group(group)
.channel(NioSocketChannel.class)
.option(ChannelOption.TCP_NODELAY, true)
.handler(new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline p = ch.pipeline();
if (sslCtx != null) {
p.addLast(sslCtx.newHandler(ch.alloc(), HOST, PORT));
}
//p.addLast(new LoggingHandler(LogLevel.INFO));
p.addLast(new EchoClientHandler());
}
});
// Start the client.
ChannelFuture f = b.connect(HOST, PORT).sync();
// Wait until the connection is closed.
f.channel().closeFuture().sync();
} finally {
// Shut down the event loop to terminate all threads.
group.shutdownGracefully();
}
}
}
/*
* Copyright 2012 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package com.itxs.client;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.util.CharsetUtil;
/**
* Handler implementation for the echo client. It initiates the ping-pong
* traffic between the echo client and server by sending the first message to
* the server.
*/
public class EchoClientHandler extends ChannelInboundHandlerAdapter {
@Override
public void channelActive(ChannelHandlerContext ctx) {
ctx.writeAndFlush(Unpooled.copiedBuffer("大神你好,我想学习提升!", CharsetUtil.UTF_8));
}
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) {
ByteBuf byteBuf = (ByteBuf) msg;
System.out.println("收到服务端" + ctx.channel().remoteAddress() + "的消息:" + byteBuf.toString(CharsetUtil.UTF_8));
}
@Override
public void channelReadComplete(ChannelHandlerContext ctx) {
ctx.flush();
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
// Close the connection when an exception is raised.
cause.printStackTrace();
ctx.close();
}
}
启动服务端和客户端,输出如下
传统NIO缺点:
Netty优点:
Netty Channel和Jdk Nio包Channel关系简单一句话就是Netty包装Jdk Channel的对象,并设置为非阻塞模式
Netty 线程模型是典型的 Reactor 模型结构,其中常用的 Reactor 线程模型有三种,分别为:Reactor 单线程模型、Reactor 多线程模型和主从 Reactor 多线程模型。而在 Netty 的线程模型并非固定不变,通过在启动辅助类中创建不同的 EventLoopGroup 实例并通过适当的参数配置,就可以支持这三种 Reactor 线程模型。
//单线程模型简单代码示例
EventLoopGroup bossGroup = new NioEventLoopGroup(1);
ServerBootstrap b = new ServerBootstrap();
b.group(bossGroup)
.channel(NioServerSocketChannel.class)
//多线程模型简单代码示例,bossGroup中只有一个线程,而workerGroup中的线程是CPU核心数乘以2,那么就对应Reactor的多线程模型。
EventLoopGroup bossGroup = new NioEventLoopGroup(1);
EventLoopGroup workerGroup = new NioEventLoopGroup();
ServerBootstrap b = new ServerBootstrap();
b.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class)
主从 Reactor 多线程模型:在并发极高的情况单独一个 Acceptor 线程可能会存在性能不足问题,为了解决性能问题,产生主从 Reactor 多线程模型。
//主从Reactor多线程模型简单代码示例
EventLoopGroup bossGroup = new NioEventLoopGroup(4);
EventLoopGroup workerGroup = new NioEventLoopGroup();
ServerBootstrap b = new ServerBootstrap();
b.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class)
当系统在运行过程中,如果频繁的进行线程上下文切换,会带来额外的性能损耗。多线程并发执行某个业务流程,业务开发者还需要时刻对线程安全保持警惕,哪些数据可能会被并发修改,怎么保护?这不仅降低了开发效率,也会带来额外的性能损耗。
因此Netty采用了串行化设计理念,从消息的读取、编码以及后续 ChannelHandler 的执行,始终都由 IO 线程 EventLoop 负责,这就意外着整个流程不会进行线程上下文的切换,数据也不会面临被并发修改的风险。
EventLoopGroup 是一组 EventLoop 的抽象,一个 EventLoopGroup 当中会包含一个或多个 EventLoop,EventLoopGroup 提供 next 接口,可以从一组 EventLoop 里面按照一定规则获取其中一个 EventLoop 来处理任务。
EventLoopGroup 实际上就是个线程池,一个 EventLoopGroup 包含一个或者多个 EventLoop ;一个 EventLoop 在它的生命周期内只和一个 Thread 绑定;所有 EventLoop 处理的 I/O 事件都将在它专有的 Thread 上被处理;一个 Channel 在它的生命周期内只注册于一个 EventLoop;每一个 EventLoop 负责处理一个或多个 Channel;
在 Netty 服务器端编程中我们需要 Boss EventLoopGroup 和 Worker EventLoopGroup 这两个 EventLoopGroup 来进行工作。
BossEventLoopGroup 通常是一个单线程的 EventLoop,EventLoop 维护着一个注册了 ServerSocketChannel 的 Selector 实例,EventLoop 的实现涵盖 IO 事件的分离和分发(Dispatcher),EventLoop 的实现充当 Reactor 模式中的分发(Dispatcher)的角色;所以通常可以将 BossEventLoopGroup 的线程数参数为 1。
BossEventLoop 只负责处理连接,故开销非常小,连接到来,马上按照策略将 SocketChannel 转发给 WorkerEventLoopGroup,WorkerEventLoopGroup 会由 next 选择其中一个 EventLoop 来将这 个SocketChannel 注册到其维护的 Selector 并对其后续的 IO 事件进行处理。
粘包和半包,指的都不是一次是正常的 ByteBuf 缓存区接收。
Netty中的拆包器
IO模型、协议、线程模型(事件驱动、异步非阻塞、NIO多路复用非阻塞)都与性能强相关;IO通信性能三原则:传输(AIO)、协议(Http)、线程(主从Reactor线程模型)。
无锁串行化的设计理念:即消息的处理尽可能在同一个线程内完成,期间不进行线程切换,这样就避免了多线程竞争和同步锁。表面上看,串行化设计似乎CPU利用率不高,并发程度不够。但是,通过调整NIO线程池的线程参数,可以同时启动多个串行化的线程并行运行,这种局部无锁化的串行线程设计相比一个队列-多个工作线程模型性能更优。
零拷贝:jdk bytebuffer:无法动态扩容,api使用复杂,读写切换时要手动调用flip和rewind等方法,capacity、readerindex、writerindex,支持顺序读写操作。
内存池管理,PoolByteBuf 是Netty内存池管理,比普通的new ByteBuf性能提高了数十倍。
高效并发编程:Volatile的大量、正确使用;CAS和原子类的广泛使用;线程安全容器的使用;通过读写锁提升并发性能。
对高性能序列化框架的支持,如protobuf。
灵活TCP参数调优。
传统意义的拷贝
是在发送数据的时候,传统的实现方式是:
File.read(bytes)
Socket.send(bytes)
这种方式需要四次数据拷贝和四次上下文切换:
零拷贝概念
明显上面的第二步和第三步是没有必要的,通过java的FileChannel.transferTo方法,可以避免上面两次多余的拷贝(当然这需要底层操作系统支持),下面的两次操作都不需要CPU参与,所以就达到了零拷贝。
Netty中的零拷贝主要体现在三个方面:
Composite Buffers:传统的ByteBuffer,如果需要将两个ByteBuffer中的数据组合到一起,我们需要首先创建一个size=size1+size2大小的新的数组,然后将两个数组中的数据拷贝到新的数组中。但是使用Netty提供的组合ByteBuf,就可以避免这样的操作,因为CompositeByteBuf并没有真正将多个Buffer组合起来,而是保存了它们的引用,从而避免了数据的拷贝,实现了零拷贝。
对于FileChannel.transferTo的使用:Netty中使用了FileChannel的transferTo方法,该方法依赖于操作系统实现零拷贝。
不改变原来buf只是做逻辑拆分、合并、包装,减少大量内存复制,并由此提升性能,使用 Netty 提供的 CompositeByteBuf 类, 可以将多个ByteBuf 合并为一个逻辑上的 ByteBuf, 避免了各个 ByteBuf 之间的拷贝。ByteBuf 支持 slice 操作, 因此可以将 ByteBuf 分解为多个共享同一个存储区域的 ByteBuf, 避免了内存的拷贝。通过 FileRegion 包装的FileChannel.tranferTo 实现文件传输, 可以直接将文件缓冲区的数据发送到目标 Channel, 避免了传统通过循环 write 方式导致的内存拷贝问题。简单示例用法:
package com.itxs.main;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.CompositeByteBuf;
import io.netty.buffer.Unpooled;
public class AppMain {
public static void main(String[] args) {
byte[] arr = {1,2,3,4,5};
//wrappedBuffer方法:将byte数组包装ByteBuf对象
ByteBuf byteBuf = Unpooled.wrappedBuffer(arr);
System.out.println(byteBuf.getByte(3));
arr[3] = 6;
System.out.println(byteBuf.getByte(3));
ByteBuf byteBuf1 = Unpooled.wrappedBuffer("hello-netty".getBytes());
//slice方法:将一个ByteBuf对象切分为多个ByteBuf对象
ByteBuf sliceByteBuf = byteBuf1.slice(1, 2);
sliceByteBuf.unwrap();
System.out.println(sliceByteBuf.toString());ByteBuf buffer1 = Unpooled.buffer(3);
buffer1.writeByte(1);
ByteBuf buffer2 = Unpooled.buffer(3);
buffer2.writeByte(4);
CompositeByteBuf compositeByteBuf = Unpooled.compositeBuffer();
//CompositeByteBuf:将多个ByteBuf合并为一个逻辑上ByteBuf,避免各个ByteBuf之前的拷贝
CompositeByteBuf compositeByteBufNew = compositeByteBuf.addComponents(true, buffer1, buffer2);
System.out.println(compositeByteBufNew);
}
}
服务端流程
Channel
EventLoop
Channel和EventLoop关系
ChannelFuture
ChannelHandler和ChannelPipeline
EventloopGroup与EventLoop是什么关系?
EventLoopGroup 包含多个 EventLoop(每一个 EventLoop 通常内部包含一个线程), EventLoop 的主要作用实际就是负责监听网络事件并调用事件处理器进行相关 I/O 操作的处理 ;且 EventLoop 处理的 I/O 事件都将在它专有的 Thread 上被处理,即 Thread 和 EventLoop 属于 1 : 1 的关系,从而保证线程安全。
Boss EventloopGroup 用于接收连接,Worker EventloopGroup 用于具体的处理(消息的读写以及其他逻辑处理) ; 当客户端通过 connect 方法连接服务端时,BossGroup 处理客户端连接请求。当客户端处理完成后,会将这个连接提交给 WorkerGroup 来处理,然后 workerGroup 负责处理其 IO 相关操作。
**Bootstrap 和 ServerBootstrap **怎么理解?
Selector
Netty内部结构Selector位置如下图所示
自定义序列化编解码器
客户端启动类Bootstrap的connect过程有一些需要先配置比如核心的有netty的工作线程池EventLoopGroup和Netty Channel实现类、还有Channel内部Pipeline依赖处理器单元,配置完客户启动后先将Netty Channel的实现类进行反射创建出来一个Netty的Channel对象,初始化Channel内部Pipeline,将处理器单元也即是handler装载到内部的Channel管道里,后续IO操作可以用到,还需要一个线程池NioEventLoopGroup才能工作,将Channel注册EventLoop。使用Bootstrap创建启动器的步骤可分为以下几步:
NioEventLoop不单是一个线程,里面线程处理IO事件,当然也可以处理普通任务,因此有任务队列和NIO 核心类Selector,它调用操作系统函数完成一对多的监听,NioEventLoop类内部持有这个Selector对象来监听socket的,Channel注册到EventLoop其实底层就是将Netty Channel对象内部维护注册到的Jdk Nio Channel Selector对象。
创建后就会去执行自身的run方法,这里面是个死循环。EventLoop中维护一个Selector实例。
需要计算出一个IO选择策略,比如是使用Selector.select(阻塞到有socket就绪) select now(非阻塞),选择哪种策略主要看NioEventLoop任务队列内是否有本地任务执行,如果有调阻塞就不太优雅,因为这样会延迟非IO任务的执行。
接下来就要处理selectkeys集合他表示本次Selector刷选出来就绪状态的Channel,迭代key集合,从每个key拿到关联channel,再看是读就绪还是写就绪,比如读就绪,将socket缓冲区load到一个Bytebuf中,然后调用当前Channel的处理管道Pipeline传播读事件接口方法,也就是Pipeline的fireChannelRead方法,就这样从socket收到数据放到这个Channel的Pipeline中,在Pipeline中依次调用Inbound接口的Handler处理器进行读就绪事件处理,本次处理完IO事件后,NioEventLoop接下来会处理内部任务。
管道意思,可以安插任意处理单元,依次传递,in事件处理、out事件处理,加密、转换为json字符串、编码器,先写channel,最后会刷到channel关联的socket写缓存区,提供一个可插拔灵活的处理框架。
netty高可扩展性也正是来源pipeline责任链的设计模式。协议解码器:将二进制数据转为java对象;协议编码器:将java对象转为二进制数据;业务逻辑处理程序-执行实际的业务逻辑。
处理器Handler主要分为两种:ChannelInboundHandlerAdapter(入站处理器)、ChannelOutboundHandler(出站处理器),入站指的是数据从底层java NIO Channel到Netty的Channel。出站指的是通过Netty的Channel来操作底层的java NIO Channel。
在channel中装配ChannelHandler流水线处理器,一个channel可能需要多个channelHandler处理器和顺序的。pipeline相当于处理器的容器。初始化channel时,把channelHandler按顺序装在pipeline中,就可以实现按序执行channelHandler了。在一个Channel中,只有一个ChannelPipeline。该pipeline在Channel被创建的时候创建。ChannelPipeline包含了一个ChannelHander形成的列表,且所有ChannelHandler都会注册到ChannelPipeline中。
ChannelInboundHandlerAdapter处理器常用的事件有:
ChannelOutboundHandler处理器常用的事件有:
每个处理单元的Handler对象都有一个int类型的mask字段,表示当前handler对上层Inbound或Outbound指定这个接口方法有没有复写,有为1无为0,主要可以避免一些空调用,可以找到后面第一个实现事件Handler,可以跳过没有实现响应事件的Handler。
主要是Promise接口,他是Future接口的一个增强,submit返回future句柄, get获取任务结果阻塞线程,原因内部没有线程资源,Netty Promise实现类ChannelPromise内部有一个Channel对象,注册到NioEventLoop,同事也就持有NioEventLoop,内部有线程资源,Channel Promise有线程资源了,所以Channel Promise可以注册一些Listener对象,等任务完成后可以处理后续事情。
NioEventLoopGroup 默认的构造函数实际会起的线程数为 CPU核心数*2。
NioEventLoopGroup,NioEventLoop池和单个线程的关系,group线程池有next方法返回EventLoop线程对象,内部线程Thread是延迟创建的,虽然先创建出来,但是NioEventLoop内部线程对象并不会创建,等接到第一个任务才创建,NioEventLoop不像是单线程,一个线程的线程池(single exector方法),EventLoop内部有队列,可以做Selector工作也做普通任务,NioEventLoop内部队列也做了优化,原来采用LinkedBlockingQueue+Condition条件队列,Netty没有使用JDK JUC的Queue而是使用JcTools的Queue,单多生产者但多消费者场景,背后也应该是CAS实现。
每个NioEventLoopGroup对象内部都会分配一组NioEventLoop,其大小是 nThreads, 这样就构成了一个线程池, 一个NIOEventLoop 和一个线程相对应,这和我们上面说的 EventloopGroup 和 EventLoop关系这部分内容相对应。
在 TCP 保持长连接的过程中,可能会出现断网等网络异常出现,异常发生的时候, client 与 server 之间如果没有交互的话,它们是无法发现对方已经掉线的。为了解决这个问题, 我们就需要引入心跳机制。
心跳机制的工作原理是: 在 client 与 server 之间在一定时间内没有数据交互时, 即处于 idle 状态时, 客户端或服务器就会发送一个特殊的数据包给对方, 当接收方收到这个数据报文后, 也立即发送一个特殊的数据报文, 回应发送方, 此即一个 PING-PONG 交互。所以, 当某一端收到心跳消息后, 就知道了对方仍然在线, 这就确保 TCP 连接的有效性.
TCP 实际上自带的就有长连接选项,本身是也有心跳包机制,也就是 TCP 的选项:SO_KEEPALIVE。但是,TCP 协议层面的长连接灵活性不够。所以,一般情况下我们都是在应用层协议上实现自定义心跳机制的,也就是在 Netty 层面通过编码实现。通过 Netty 实现心跳机制的话,核心类是 IdleStateHandler 。
如果Handler处理器有一些长时间的业务处理,可以交给taskQueue异步处理,代码使用示例如下:
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) {
//获取客户端发送过来的消息
ByteBuf byteBuf = (ByteBuf) msg;
System.out.println("收到客户端" + ctx.channel().remoteAddress() + "发送的消息:" + byteBuf.toString(CharsetUtil.UTF_8));
ctx.channel().eventLoop().execute(() -> {
try {
Thread.sleep(2000);
System.out.println("耗时业务处理");
} catch (Exception e) {
e.printStackTrace();
}
});
}
通过debug调试就可以看到taskQueue有一个刚刚添加进去的任务
延时任务队列和上面任务队列非常相似,只是多了一个可延迟一定时间再执行的设置,代码使用示例如下:
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) {
//获取客户端发送过来的消息
ByteBuf byteBuf = (ByteBuf) msg;
System.out.println("收到客户端" + ctx.channel().remoteAddress() + "发送的消息:" + byteBuf.toString(CharsetUtil.UTF_8));
ctx.channel().eventLoop().schedule(new Runnable() {
@Override
public void run() {
try {
//长时间操作,不至于长时间的业务操作导致Handler阻塞
Thread.sleep(1000);
System.out.println("长时间的业务处理");
} catch (Exception e) {
e.printStackTrace();
}
}
},10, TimeUnit.SECONDS);//10秒后执行
}
同样在debug进行调试可以查看到有一个scheduleTaskQueue任务待执行中
手机扫一扫
移动阅读更方便
你可能感兴趣的文章