Netty高性能网络应用框架对标P7面试题分享v4.1.70.Final
阅读原文时间:2021年11月28日阅读:9

概述

**本人博客网站 **IT小神 www.itxiaoshen.com

Netty官网 https://netty.io/ 最新版本为4.1.70.Final

Netty是一个异步的、事件驱动网络应用框架,用于快速开发可靠、可维护的高性能协议服务器和客户端。简单的说Netty是一个基于NIO的客户、服务器端的编程框架,它可以大大简化如TCP和UDP套接字的网络编程.

Netty由JBOSS提供的一个Java开源框架,现为 Github上的独立项目Netty从许多协议如FTP、SMTP、HTTP和各种基于二进制和文本的协议的实现精心设计从而兼顾实现了易于开发、性能、稳定性和灵活性。分为核心层、传输服务、协议支持。

最下面一层是Netty最底层最核心的部分包括零拷贝、API库、可扩展的事件模型;上面右边橙色部分Protocol Support协议支持,包括Http协议、WebSocket、SSL(安全套接字协议)、谷歌Protobuf协议、zlib/gzip压缩与解压缩、Large File Transfer大文件传输等等;红色的部分Transport Services传输服务,包括Socket、Datagram、Http Tunnel、In-VM Pipe等等。足以看出Netty的功能、协议、传输方式都比较全,比较强大。

Netty 是目前Java技术栈中最流行的、首选的 NIO 框架,性能和稳定性都有保障,社区比较活跃,基于 Netty 进行二次定制服务开发成本小,提供了简单易用的API从网络处理代码中解耦业务逻辑,且已得到成百上千的商业及商用项目验证,许多框架和开源组件的底层 rpc 都是使用的Netty,如Dubbo、Elasticsearch 、RocketMQ以及大数据Hadoop、Spark等等。下面是Netty官方描述的特性:

  • 设计

    • 提供各种传输类型的统一API,使用阻塞和非阻塞套接字时候使用的是同一个 API,只是需要设置的参数不一样。
    • 基于灵活和可扩展的事件模型明确实现关注点的分离。
    • 高度可定制的线程模型-单线程,一个或多个线程池,如SEDA。
    • 真正的无连接数据报套接字支持(UDP,从3.1开始)。
  • 易用性

    • 完善的 Javadoc 文档和用户指南、示例代码。
    • 不需要额外的依赖,JDK 5 (Netty 3.x)或 JDK 6 (Netty 4.x)就足够了。
    • 注意:一些组件(如HTTP/2)可能有更多的要求。更多信息请参阅需求页面。
  • 性能

    • 更好的吞吐量,更低的延迟。
    • 更少的资源消耗。
    • 最小化不必要的内存拷贝。
  • 安全性

    • 完整的SSL/TLS和StartTLS支持
  • 社区

    • 发布的更早和更频繁。
    • 社区驱动,作者自2003年以来一直在编写类似的框架,关注反馈。

从官方上看4.x版本是当前官方推荐,4.x版本目前也一直在维护中,3.x版本是比较旧的版本,跟4.x版本相比变化比较大,特别是API。5.x是被舍弃的版本,官方不再支持! Netty 5.0以前是发布alpha版,之前也有一部分书籍是基于Netty5来写的,从作者在GitHub上的回复得出:使用ForkJoinPool增加了复杂性,并且没有显示出明显的性能优势。同时保持所有分支的同步是一项相当大的工作,在当前的master中没有任何东西可以证明一个新的版本是合理的。

NIO不是Java独有的概念,实质上是为IO多路复用技术;它是由操作系统提供的系统调用,早期操作系统调用select,poll,但是性能低下,后来渐渐演化成了Linux下的epoll和Mac里的kqueue,使用最为广泛的是epoll;而Netty就是基于Java NIO技术封装的一套框架。为什么要封装,因为原生的Java NIO使用起来没那么方便,而且还有臭名昭著的bug,Netty把它封装之后,提供了一个易于操作的使用模式和接口,用户使用起来也就便捷多了。

关于NIO我们简单说明一下:

  • 客户端监听(Listen)时,Accept是阻塞的,只有新连接来了,Accept才会返回,主线程才能继。
  • 读写socket时,Read是阻塞的,只有请求消息来了,Read才能返回,子线程才能继续处理。
  • 读写socket时,Write是阻塞的,只有客户端把消息收了,Write才能返回,子线程才能继续读取下一个请求。
  • 传统的BIO模式下,从头到尾的所有线程都是阻塞的,这些线程就干等着,占用系统的资源,什么事也不干。
  • 那么NIO首先是做到非阻塞,采用的是事件机制,通过线程Accept、读写操作,请求处理等;如果什么事都没得做也不会死循环,它会将线程休眠起来,直到下一个事件来了再继续干活。

Netty入门示例

Netty 4.1.70 源码官网下载地址 https://github.com/netty/netty/archive/refs/tags/netty-4.1.70.Final.tar.gz

Netty GitHub下载地址 https://github.com/netty/netty

下载Netty,在Netty源码中example提供不同协议的样本代码示例,官网有有样例wiki说明,非常方便使用,各位伙伴可以根据具体场景选择使用。

  • echo:非常基本的客户机和服务器。

  • discard:了解如何异步发送无限数据流,而不会淹没写缓冲区。

  • uptime:实现自动重连机制。

  • telnet:一个经典的基于线路的网络应用程序。

  • securechat:一种基于tls的聊天服务器,由Telnet示例衍生而来。

  • objectecho:交换可序列化的Java对象。

  • factorial:使用自定义二进制协议编写有状态客户机和服务器。

  • worldclock:快速协议原型与谷歌协议缓冲区集成。

  • http snoop:构建自己的非常轻量级的HTTP客户机和服务器。

  • file:文件服务器,异步大文件流在HTTP。

  • http websocketx:使用Web Sockets向HTTP添加双向全双工通信通道

  • proxy:编写一个高效的隧道代理服务器。

  • udt bytes:在类似tcp的字节流模式下使用[UDT]

  • udt message:在类似udp的消息传递模式下使用[UDT]

  • udt rendezvousBytes:对称点对点会合连接模式下的字节流

  • udt rendezvous:对称点对点交会连接模式下的消息流

    io.netty netty-all 4.1.70.Final

服务端示例代码EchoServer和EchoServerHandler

/*
 * Copyright 2012 The Netty Project
 *
 * The Netty Project licenses this file to you under the Apache License,
 * version 2.0 (the "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at:
 *
 *   https://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
 * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
 * License for the specific language governing permissions and limitations
 * under the License.
 */
package com.itxs.server;

import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.logging.LogLevel;
import io.netty.handler.logging.LoggingHandler;
import io.netty.handler.ssl.SslContext;
import io.netty.handler.ssl.SslContextBuilder;
import io.netty.handler.ssl.util.SelfSignedCertificate;

/**
 * Echoes back any received data from a client.
 */
public final class EchoServer {

    static final boolean SSL = System.getProperty("ssl") != null;
    static final int PORT = Integer.parseInt(System.getProperty("port", "8007"));

    public static void main(String[] args) throws Exception {
        // Configure SSL.
        final SslContext sslCtx;
        if (SSL) {
            SelfSignedCertificate ssc = new SelfSignedCertificate();
            sslCtx = SslContextBuilder.forServer(ssc.certificate(), ssc.privateKey()).build();
        } else {
            sslCtx = null;
        }

        // Configure the server.
        EventLoopGroup bossGroup = new NioEventLoopGroup(1);
        EventLoopGroup workerGroup = new NioEventLoopGroup();
        final EchoServerHandler serverHandler = new EchoServerHandler();
        try {
            ServerBootstrap b = new ServerBootstrap();
            b.group(bossGroup, workerGroup)
             .channel(NioServerSocketChannel.class)
             .option(ChannelOption.SO_BACKLOG, 100)
             .handler(new LoggingHandler(LogLevel.INFO))
             .childHandler(new ChannelInitializer<SocketChannel>() {
                 @Override
                 public void initChannel(SocketChannel ch) throws Exception {
                     ChannelPipeline p = ch.pipeline();
                     if (sslCtx != null) {
                         p.addLast(sslCtx.newHandler(ch.alloc()));
                     }
                     //p.addLast(new LoggingHandler(LogLevel.INFO));
                     p.addLast(serverHandler);
                 }
             });

            // Start the server.
            ChannelFuture f = b.bind(PORT).sync();

            // Wait until the server socket is closed.
            f.channel().closeFuture().sync();
        } finally {
            // Shut down all event loops to terminate all threads.
            bossGroup.shutdownGracefully();
            workerGroup.shutdownGracefully();
        }
    }
}


/*
 * Copyright 2012 The Netty Project
 *
 * The Netty Project licenses this file to you under the Apache License,
 * version 2.0 (the "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at:
 *
 *   https://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
 * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
 * License for the specific language governing permissions and limitations
 * under the License.
 */
package com.itxs.server;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandler.Sharable;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.util.CharsetUtil;

/**
 * Handler implementation for the echo server.
 */
@Sharable
public class EchoServerHandler extends ChannelInboundHandlerAdapter {

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) {
        //获取客户端发送过来的消息
        ByteBuf byteBuf = (ByteBuf) msg;
        System.out.println("收到客户端" + ctx.channel().remoteAddress() + "发送的消息:" + byteBuf.toString(CharsetUtil.UTF_8));
    }

    @Override
    public void channelReadComplete(ChannelHandlerContext ctx) {
        ctx.writeAndFlush(Unpooled.copiedBuffer("欢迎来到Java Netty开源世界,让我们一起学习吧!", CharsetUtil.UTF_8));
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
        // Close the connection when an exception is raised.
        cause.printStackTrace();
        ctx.close();
    }
}

客户端示例代码EchoServer和EchoServerHandler

/*
 * Copyright 2012 The Netty Project
 *
 * The Netty Project licenses this file to you under the Apache License,
 * version 2.0 (the "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at:
 *
 *   https://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
 * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
 * License for the specific language governing permissions and limitations
 * under the License.
 */
package com.itxs.client;

import io.netty.bootstrap.Bootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.ssl.SslContext;
import io.netty.handler.ssl.SslContextBuilder;
import io.netty.handler.ssl.util.InsecureTrustManagerFactory;

/**
 * Sends one message when a connection is open and echoes back any received
 * data to the server.  Simply put, the echo client initiates the ping-pong
 * traffic between the echo client and server by sending the first message to
 * the server.
 */
public final class EchoClient {

    static final boolean SSL = System.getProperty("ssl") != null;
    static final String HOST = System.getProperty("host", "127.0.0.1");
    static final int PORT = Integer.parseInt(System.getProperty("port", "8007"));

    public static void main(String[] args) throws Exception {
        // Configure SSL.git
        final SslContext sslCtx;
        if (SSL) {
            sslCtx = SslContextBuilder.forClient()
                .trustManager(InsecureTrustManagerFactory.INSTANCE).build();
        } else {
            sslCtx = null;
        }

        // Configure the client.
        EventLoopGroup group = new NioEventLoopGroup();
        try {
            Bootstrap b = new Bootstrap();
            b.group(group)
             .channel(NioSocketChannel.class)
             .option(ChannelOption.TCP_NODELAY, true)
             .handler(new ChannelInitializer<SocketChannel>() {
                 @Override
                 public void initChannel(SocketChannel ch) throws Exception {
                     ChannelPipeline p = ch.pipeline();
                     if (sslCtx != null) {
                         p.addLast(sslCtx.newHandler(ch.alloc(), HOST, PORT));
                     }
                     //p.addLast(new LoggingHandler(LogLevel.INFO));
                     p.addLast(new EchoClientHandler());
                 }
             });

            // Start the client.
            ChannelFuture f = b.connect(HOST, PORT).sync();

            // Wait until the connection is closed.
            f.channel().closeFuture().sync();
        } finally {
            // Shut down the event loop to terminate all threads.
            group.shutdownGracefully();
        }
    }
}


/*
 * Copyright 2012 The Netty Project
 *
 * The Netty Project licenses this file to you under the Apache License,
 * version 2.0 (the "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at:
 *
 *   https://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
 * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
 * License for the specific language governing permissions and limitations
 * under the License.
 */
package com.itxs.client;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.util.CharsetUtil;

/**
 * Handler implementation for the echo client.  It initiates the ping-pong
 * traffic between the echo client and server by sending the first message to
 * the server.
 */
public class EchoClientHandler extends ChannelInboundHandlerAdapter {

    @Override
    public void channelActive(ChannelHandlerContext ctx) {
        ctx.writeAndFlush(Unpooled.copiedBuffer("大神你好,我想学习提升!", CharsetUtil.UTF_8));
    }

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) {
        ByteBuf byteBuf = (ByteBuf) msg;
        System.out.println("收到服务端" + ctx.channel().remoteAddress() + "的消息:" + byteBuf.toString(CharsetUtil.UTF_8));
    }

    @Override
    public void channelReadComplete(ChannelHandlerContext ctx) {
       ctx.flush();
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
        // Close the connection when an exception is raised.
        cause.printStackTrace();
        ctx.close();
    }
}

启动服务端和客户端,输出如下

面试题

  • 传统NIO缺点:

    • NIO的类库和API繁杂,学习成本高,你需要熟练掌握Selector、ServerSocketChannel、SocketChannel、ByteBuffer等。
    • 需要熟悉Java多线程编程。这是因为NIO编程涉及到Reactor模式,你必须对多线程和网络编程非常熟悉才能写出高质量的NIO程序;还需要考虑考虑断连重连、半包读写、失败缓存等问题处理。
    • 臭名昭著的epoll bug。它会导致Selector空轮询,最终导致CPU 100%。JDK NIO的bug,直到JDK1.7版本依然没得到根本性的解决。
  • Netty优点:

    • 异步事件驱动框架,可快速开发高性能的服务端和客户端.
    • API使用简单,学习成本低。封装了JDK底层BIO和NIO模型,提供更加简单易用安全的 API.
    • 功能强大,内置了多种解码编码器,自带编解码器解决拆包粘包问题,无需用户困扰,支持多种协议,自带各种协议栈。
    • 性能高,对比其他主流的NIO框架,Netty的性能最,Reactor线程模型支持高并发海量连接.
    • 社区活跃,发现BUG会及时修复,迭代版本周期短,不断加入新的功能。
    • Dubbo、Elasticsearch都采用了Netty,质量得到验证。

Netty Channel和Jdk Nio包Channel关系简单一句话就是Netty包装Jdk Channel的对象,并设置为非阻塞模式

Netty 线程模型是典型的 Reactor 模型结构,其中常用的 Reactor 线程模型有三种,分别为:Reactor 单线程模型、Reactor 多线程模型和主从 Reactor 多线程模型。而在 Netty 的线程模型并非固定不变,通过在启动辅助类中创建不同的 EventLoopGroup 实例并通过适当的参数配置,就可以支持这三种 Reactor 线程模型。

  • Reactor 单线程模型:Reactor 单线程模型指的是所有的 IO 操作都在同一个 NIO 线程上面完成。作为 NIO 服务端接收客户端的 TCP 连接,作为 NIO 客户端向服务端发起 TCP 连接,读取通信对端的请求或向通信对端发送消息请求或者应答消息。由于 Reactor 模式使用的是异步非阻塞 IO,所有的 IO 操作都不会导致阻塞,理论上一个线程可以独立处理所有 IO 相关的操作。

//单线程模型简单代码示例
EventLoopGroup bossGroup = new NioEventLoopGroup(1);
ServerBootstrap b = new ServerBootstrap();
b.group(bossGroup)
 .channel(NioServerSocketChannel.class)
  • Reactor 多线程模型:对于一些小容量应用场景,可以使用单线程模型,但是对于高负载、大并发的应用却不合适,需要对该模型进行改进,演进为 Reactor 多线程模型。Rector 多线程模型与单线程模型最大的区别就是有一组 NIO 线程处理 IO 操作;有专门一个 NIO 线程 -Acceptor 线程用于监听服务端,接收客户端的 TCP 连接请求;而 1 个 NIO 线程可以同时处理N条链路,但是 1 个链路只对应 1 个 NIO 线程,防止发生并发操作问题。网络 IO 操作-读、写等由一个 NIO 线程池负责,线程池可以采用标准的 JDK 线程池实现,它包含一个任务队列和 N 个可用的线程,由这些 NIO 线程负责消息的读取、解码、编码和发送。

//多线程模型简单代码示例,bossGroup中只有一个线程,而workerGroup中的线程是CPU核心数乘以2,那么就对应Reactor的多线程模型。
EventLoopGroup bossGroup = new NioEventLoopGroup(1);
EventLoopGroup workerGroup = new NioEventLoopGroup();
ServerBootstrap b = new ServerBootstrap();
b.group(bossGroup, workerGroup)
 .channel(NioServerSocketChannel.class)
  • 主从 Reactor 多线程模型:在并发极高的情况单独一个 Acceptor 线程可能会存在性能不足问题,为了解决性能问题,产生主从 Reactor 多线程模型。

    • 主从 Reactor 线程模型的特点是:服务端用于接收客户端连接的不再是 1 个单独的 NIO 线程,而是一个独立的 NIO 线程池。
    • Acceptor 接收到客户端 TCP 连接请求处理完成后,将新创建的 SocketChannel 注册到 IO 线程池(sub reactor 线程池)的某个 IO 线程上,由它负责 SocketChannel 的读写和编解码工作。
    • Acceptor 线程池仅仅只用于客户端的登陆、握手和安全认证,一旦链路建立成功,就将链路注册到后端 subReactor 线程池的 IO 线程上,由 IO 线程负责后续的 IO 操作。

//主从Reactor多线程模型简单代码示例
EventLoopGroup bossGroup = new NioEventLoopGroup(4);
EventLoopGroup workerGroup = new NioEventLoopGroup();
ServerBootstrap b = new ServerBootstrap();
b.group(bossGroup, workerGroup)
 .channel(NioServerSocketChannel.class)
  • 当系统在运行过程中,如果频繁的进行线程上下文切换,会带来额外的性能损耗。多线程并发执行某个业务流程,业务开发者还需要时刻对线程安全保持警惕,哪些数据可能会被并发修改,怎么保护?这不仅降低了开发效率,也会带来额外的性能损耗。

  • 因此Netty采用了串行化设计理念,从消息的读取、编码以及后续 ChannelHandler 的执行,始终都由 IO 线程 EventLoop 负责,这就意外着整个流程不会进行线程上下文的切换,数据也不会面临被并发修改的风险。

  • EventLoopGroup 是一组 EventLoop 的抽象,一个 EventLoopGroup 当中会包含一个或多个 EventLoop,EventLoopGroup 提供 next 接口,可以从一组 EventLoop 里面按照一定规则获取其中一个 EventLoop 来处理任务。

  • EventLoopGroup 实际上就是个线程池,一个 EventLoopGroup 包含一个或者多个 EventLoop ;一个 EventLoop 在它的生命周期内只和一个 Thread 绑定;所有 EventLoop 处理的 I/O 事件都将在它专有的 Thread 上被处理;一个 Channel 在它的生命周期内只注册于一个 EventLoop;每一个 EventLoop 负责处理一个或多个 Channel;

  • 在 Netty 服务器端编程中我们需要 Boss EventLoopGroup 和 Worker EventLoopGroup 这两个 EventLoopGroup 来进行工作。

  • BossEventLoopGroup 通常是一个单线程的 EventLoop,EventLoop 维护着一个注册了 ServerSocketChannel 的 Selector 实例,EventLoop 的实现涵盖 IO 事件的分离和分发(Dispatcher),EventLoop 的实现充当 Reactor 模式中的分发(Dispatcher)的角色;所以通常可以将 BossEventLoopGroup 的线程数参数为 1。

  • BossEventLoop 只负责处理连接,故开销非常小,连接到来,马上按照策略将 SocketChannel 转发给 WorkerEventLoopGroup,WorkerEventLoopGroup 会由 next 选择其中一个 EventLoop 来将这 个SocketChannel 注册到其维护的 Selector 并对其后续的 IO 事件进行处理。

  • 粘包和半包,指的都不是一次是正常的 ByteBuf 缓存区接收。

    • 粘包,就是接收端读取的时候,多个发送过来的 ByteBuf “粘”在了一起。换句话说,接收端读取一次的 ByteBuf ,读到了多个发送端的 ByteBuf ,是为粘包。
    • 半包,就是接收端将一个发送端的ByteBuf “拆”开了,形成一个破碎的包,我们定义这种 ByteBuf 为半包。换句话说,接收端读取一次的 ByteBuf ,读到了发送端的一个 ByteBuf的一部分,是为半包。
    • 比如我们应用层面使用了Netty,而对于操作系统来说只认TCP协议,尽管我们的应用层是按照 ByteBuf 为 单位来发送数据,server按照Bytebuf读取,但是到了底层操作系统仍然是按照字节流发送数据,因此,数据到了服务端,也是按照字节流的方式读入,然后到了 Netty 应用层面,重新拼装成 ByteBuf,而这里的 ByteBuf 与客户端按顺序发送的 ByteBuf 可能是不对等的。因此,我们需要在客户端根据自定义协议来组装我们应用层的数据包,然后在服务端根据我们的应用层的协议来组装数据包,这个过程通常在服务端称为拆包,而在客户端称为粘包。
    • 拆包和粘包是相对的,一端粘了包,另外一端就需要将粘过的包拆开,发送端将三个数据包粘成两个 TCP 数据包发送到接收端,接收端就需要根据应用协议将两个数据包重新组装成三个数据包。
    • 在没有 Netty 的情况下,用户如果自己需要拆包,基本原理就是不断从 TCP 缓冲区中读取数据,每次读取完都需要判断是否是一个完整的数据包,如果当前读取的数据不足以拼接成一个完整的业务数据包,那就保留该数据,继续从 TCP 缓冲区中读取,直到得到一个完整的数据包。 如果当前读到的数据加上已经读取的数据足够拼接成一个数据包,那就将已经读取的数据拼接上本次读取的数据,构成一个完整的业务数据包传递到业务逻辑,多余的数据仍然保留,以便和下次读到的数据尝试拼接。
  • Netty中的拆包器

    • 固定长度的拆包器 FixedLengthFrameDecoder:每个应用层数据包的都拆分成都是固定长度的大小,比如 1024字节。这个显然不大适应在 Java 聊天程序 进行实际应用。
    • 行拆包器 LineBasedFrameDecoder:每个应用层数据包,都以换行符作为分隔符,进行分割拆分。这个显然不大适应在 Java 聊天程序 进行实际应用。
    • 分隔符拆包器 DelimiterBasedFrameDecoder:每个应用层数据包,都通过自定义的分隔符,进行分割拆分。这个版本,是LineBasedFrameDecoder 的通用版本,本质上是一样的。这个显然不大适应在 Java 聊天程序 进行实际应用。
    • 基于数据包长度的拆包器 LengthFieldBasedFrameDecoder:将应用层数据包的长度,作为接收端应用层数据包的拆分依据。按照应用层数据包的大小,拆包。这个拆包器,有一个要求,就是应用层协议中包含数据包的长度。
  • IO模型、协议、线程模型(事件驱动、异步非阻塞、NIO多路复用非阻塞)都与性能强相关;IO通信性能三原则:传输(AIO)、协议(Http)、线程(主从Reactor线程模型)。

  • 无锁串行化的设计理念:即消息的处理尽可能在同一个线程内完成,期间不进行线程切换,这样就避免了多线程竞争和同步锁。表面上看,串行化设计似乎CPU利用率不高,并发程度不够。但是,通过调整NIO线程池的线程参数,可以同时启动多个串行化的线程并行运行,这种局部无锁化的串行线程设计相比一个队列-多个工作线程模型性能更优。

  • 零拷贝:jdk bytebuffer:无法动态扩容,api使用复杂,读写切换时要手动调用flip和rewind等方法,capacity、readerindex、writerindex,支持顺序读写操作。

  • 内存池管理,PoolByteBuf 是Netty内存池管理,比普通的new ByteBuf性能提高了数十倍。

  • 高效并发编程:Volatile的大量、正确使用;CAS和原子类的广泛使用;线程安全容器的使用;通过读写锁提升并发性能。

  • 对高性能序列化框架的支持,如protobuf。

  • 灵活TCP参数调优。

  • 传统意义的拷贝

    • 是在发送数据的时候,传统的实现方式是:

      • File.read(bytes)
      • Socket.send(bytes)
    • 这种方式需要四次数据拷贝和四次上下文切换:

      • 数据从磁盘读取到内核的read buffer。
      • 数据从内核缓冲区拷贝到用户缓冲区。
      • 数据从用户缓冲区拷贝到内核的socket buffer。
      • 数据从内核的socket buffer拷贝到网卡接口(硬件)的缓冲区。
  • 零拷贝概念

    • 明显上面的第二步和第三步是没有必要的,通过java的FileChannel.transferTo方法,可以避免上面两次多余的拷贝(当然这需要底层操作系统支持),下面的两次操作都不需要CPU参与,所以就达到了零拷贝。

      • 调用transferTo,数据从文件由DMA引擎拷贝到内核read buffer。
      • 接着DMA从内核read buffer将数据拷贝到网卡接口buffer。
  • Netty中的零拷贝主要体现在三个方面:

    • bytebuffer:Netty发送和接收消息主要使用bytebuffer,bytebuffer使用堆外内存(DirectMemory)直接进行Socket读写。原因:如果使用传统的堆内存进行Socket读写,JVM会将堆内存buffer拷贝一份到直接内存中然后再写入socket,多了一次缓冲区的内存拷贝。DirectMemory中可以直接通过DMA发送到网卡接口。堆外内存也即是DirectMemory,直接内存申请较慢但访问较快,一般操作堆内内存-》直接内存-》系统调用-》硬盘/网卡,而非直接内存需要从堆内-》直接内存的二次拷贝。

    • Composite Buffers:传统的ByteBuffer,如果需要将两个ByteBuffer中的数据组合到一起,我们需要首先创建一个size=size1+size2大小的新的数组,然后将两个数组中的数据拷贝到新的数组中。但是使用Netty提供的组合ByteBuf,就可以避免这样的操作,因为CompositeByteBuf并没有真正将多个Buffer组合起来,而是保存了它们的引用,从而避免了数据的拷贝,实现了零拷贝。

    • 对于FileChannel.transferTo的使用:Netty中使用了FileChannel的transferTo方法,该方法依赖于操作系统实现零拷贝。

    • 不改变原来buf只是做逻辑拆分、合并、包装,减少大量内存复制,并由此提升性能,使用 Netty 提供的 CompositeByteBuf 类, 可以将多个ByteBuf 合并为一个逻辑上的 ByteBuf, 避免了各个 ByteBuf 之间的拷贝。ByteBuf 支持 slice 操作, 因此可以将 ByteBuf 分解为多个共享同一个存储区域的 ByteBuf, 避免了内存的拷贝。通过 FileRegion 包装的FileChannel.tranferTo 实现文件传输, 可以直接将文件缓冲区的数据发送到目标 Channel, 避免了传统通过循环 write 方式导致的内存拷贝问题。简单示例用法:

      package com.itxs.main;

      import io.netty.buffer.ByteBuf;
      import io.netty.buffer.CompositeByteBuf;
      import io.netty.buffer.Unpooled;

      public class AppMain {
      public static void main(String[] args) {
      byte[] arr = {1,2,3,4,5};
      //wrappedBuffer方法:将byte数组包装ByteBuf对象
      ByteBuf byteBuf = Unpooled.wrappedBuffer(arr);
      System.out.println(byteBuf.getByte(3));
      arr[3] = 6;
      System.out.println(byteBuf.getByte(3));

          ByteBuf byteBuf1 = Unpooled.wrappedBuffer("hello-netty".getBytes());
          //slice方法:将一个ByteBuf对象切分为多个ByteBuf对象
          ByteBuf sliceByteBuf = byteBuf1.slice(1, 2);
          sliceByteBuf.unwrap();
          System.out.println(sliceByteBuf.toString());
      ByteBuf buffer1 = Unpooled.buffer(3);
      buffer1.writeByte(1);
      ByteBuf buffer2 = Unpooled.buffer(3);
      buffer2.writeByte(4);
      CompositeByteBuf compositeByteBuf = Unpooled.compositeBuffer();
      //CompositeByteBuf:将多个ByteBuf合并为一个逻辑上ByteBuf,避免各个ByteBuf之前的拷贝
      CompositeByteBuf compositeByteBufNew = compositeByteBuf.addComponents(true, buffer1, buffer2);
      System.out.println(compositeByteBufNew);
      }

      }

  • 服务端流程

    • 1、创建ServerBootStrap实例。
    • 2、设置并绑定Reactor线程池:EventLoopGroup,EventLoop就是处理所有注册到本线程的Selector上面的Channel。
    • 3、设置并绑定服务端的channel。
    • 4、5、创建处理网络事件的ChannelPipeline和handler,网络时间以流的形式在其中流转,handler完成多数的功能定制:比如编解码 SSl安全认证.
    • 6、绑定并启动监听端口。
    • 7、当轮训到准备就绪的channel后,由Reactor线程:NioEventLoop执行pipline中的方法,最终调度并执行channelHandler客户端

  • 客户端流程

  • 一个网络请求步骤:
  1. 准备消息。
  2. 编码,比如有一个字符串的消息要发送出去,那么发送出去之前要把这个字符串消息转为字节(Byte),这是因为网络上传输的不能是原本的字符串;又比如要给发送出去的消息加一个消息标识,用来以后另一端收消息的程序可以用来解决粘包拆包问题。
  3. 将消息发送到网络通道,write方法。
  4. 网络传输。
  5. 程序另一端读取数据,read方法。
  6. 解码,和编码相对应,比如发送过来的消息,是字符串转为字节,那么解码要做的事就是把字节转为字符串;又或者数据还加了标识,就要根据这个标识去读取数据,解决粘包拆包问题。
  7. 处理业务逻辑,比如是做群聊场景,那么当程序收到消息之后,要将这条转发给对应群聊中的每一个人。
  8. 准备数据响应给消息发送者。
  • Channel

    • 一种连接到网络套接字或能进行读、写、连接和绑定等I/O操作的组件。
    • Channel 接口是 Netty 对网络操作抽象类,它除了包括基本的 I/O 操作,如 bind()、connect()、read()、write() 等。
    • 比较常用的Channel接口实现类是NioServerSocketChannel(服务端)和NioSocketChannel(客户端),这两个 Channel 可以和 BIO 编程模型中的ServerSocket以及Socket两个概念对应上。Netty 的 Channel 接口所提供的 API,大大地降低了直接使用 Socket 类的复杂性。
    • 此外OioSocketChannel为同步阻塞的客户端 TCP Socket 连接;OioServerSocketChannel为同步阻塞的服务器端 TCP Socket 连接。
  • EventLoop

    • EventLoop(事件循环)接口可以说是 Netty 中最核心的概念。EventLoop 定义了 Netty 的核心抽象,用于处理连接的生命周期中所发生的事件。
    • EventLoop 的主要作用实际就是负责监听网络事件并调用事件处理器进行相关 I/O 操作的处理。
  • Channel和EventLoop关系

    • Channel 为 Netty 网络操作(读写等操作)抽象类,EventLoop 负责处理注册到其上的Channel 处理 I/O 操作,两者配合参与 I/O 操作。
  • ChannelFuture

    • ChannelFuture提供操作完成时一种异步通知的方式。一般在Socket编程中,等待响应结果都是同步阻塞的,而Netty则不会造成阻塞,因为ChannelFuture是采取类似观察者模式的形式进行获取结果
    • Netty 是异步非阻塞的,所有的 I/O 操作都为异步的;不能立刻得到操作是否执行成功,但是可以通过 ChannelFuture 接口的 addListener() 方法注册一个 ChannelFutureListener,当操作执行成功或者失败时,监听就会自动触发返回结果。并且还可以通过ChannelFuture 的 channel() 方法获取关联的Channel。还可以通过 ChannelFuture 接口的 sync()方法让异步的操作变成同步的。
  • ChannelHandler和ChannelPipeline

    • 指定了序列化编解码器以及自定义的 ChannelHandler 处理消息。
    • ChannelHandler 是消息的具体处理器。他负责处理读写操作、客户端连接等事情。
    • ChannelPipeline 为 ChannelHandler 的链,提供了一个容器并定义了用于沿着链传播入站和出站事件流的 API 。当 Channel 被创建时,它会被自动地分配到它专属的 ChannelPipeline。
    • 可以在 ChannelPipeline 上通过 addLast() 方法添加一个或者多个ChannelHandler ,因为一个数据或者事件可能会被多个 Handler 处理。当一个 ChannelHandler 处理完之后就将数据交给下一个 ChannelHandler 。
    • Netty设计ChannelHandlerContext上下文对象,就可以拿到channel、pipeline等对象,就可以进行读写等操作。ChannelHandlerContext在pipeline中是一个链表的形式。
  • EventloopGroup与EventLoop是什么关系

    • EventLoopGroup 包含多个 EventLoop(每一个 EventLoop 通常内部包含一个线程), EventLoop 的主要作用实际就是负责监听网络事件并调用事件处理器进行相关 I/O 操作的处理 ;且 EventLoop 处理的 I/O 事件都将在它专有的 Thread 上被处理,即 Thread 和 EventLoop 属于 1 : 1 的关系,从而保证线程安全。

    • Boss EventloopGroup 用于接收连接,Worker EventloopGroup 用于具体的处理(消息的读写以及其他逻辑处理) ; 当客户端通过 connect 方法连接服务端时,BossGroup 处理客户端连接请求。当客户端处理完成后,会将这个连接提交给 WorkerGroup 来处理,然后 workerGroup 负责处理其 IO 相关操作。

  • **Bootstrap 和 ServerBootstrap **怎么理解?

    • Bootstrap 是客户端的启动引导类/辅助类。ServerBootstrap 客户端的启动引导类/辅助类。Bootstrap和ServerBootStrap是Netty提供的一个创建客户端和服务端启动器的工厂类,使用这个工厂类非常便利地创建启动类,根据上面的一些例子,其实也看得出来能大大地减少了开发的难度。都是继承于AbstractBootStrap抽象类,所以大致上的配置方法都相同。

    • Bootstrap 通常使用 connet() 方法连接到远程的主机和端口,作为一个 Netty TCP 协议通信中的客户端。另外,Bootstrap 也可以通过 bind() 方法绑定本地的一个端口,作为 UDP 协议通信中的一端;ServerBootstrap通常使用 bind() 方法绑定本地的端口上,然后等待客户端的连接。
    • Bootstrap 只需要配置一个线程组— EventLoopGroup ,而 ServerBootstrap需要配置两个线程组— EventLoopGroup ,一个用于接收连接,一个用于具体的处理。
  • Selector

    • Netty中的Selector也和NIO的Selector是一样的,就是用于监听事件,管理注册到Selector中的channel,实现多路复用器。

    Netty内部结构Selector位置如下图所示

  • 自定义序列化编解码器

    • 在 Java 中自带的有实现 Serializable 接口来实现序列化,但由于它性能、安全性等原因一般情况下是不会被使用到的。
    • 通常情况下,我们使用 Protostuff、Hessian2、json 序列方式比较多,另外还有一些序列化性能非常好的序列化方式也是很好的选择:
    • 专门针对 Java 语言的:Kryo,FST 等等跨语言的:Protostuff(基于 protobuf 发展而来),ProtoBuf,Thrift,Avro,MsgPack 等等

客户端启动类Bootstrap的connect过程有一些需要先配置比如核心的有netty的工作线程池EventLoopGroup和Netty Channel实现类、还有Channel内部Pipeline依赖处理器单元,配置完客户启动后先将Netty Channel的实现类进行反射创建出来一个Netty的Channel对象,初始化Channel内部Pipeline,将处理器单元也即是handler装载到内部的Channel管道里,后续IO操作可以用到,还需要一个线程池NioEventLoopGroup才能工作,将Channel注册EventLoop。使用Bootstrap创建启动器的步骤可分为以下几步:

NioEventLoop不单是一个线程,里面线程处理IO事件,当然也可以处理普通任务,因此有任务队列和NIO 核心类Selector,它调用操作系统函数完成一对多的监听,NioEventLoop类内部持有这个Selector对象来监听socket的,Channel注册到EventLoop其实底层就是将Netty Channel对象内部维护注册到的Jdk Nio Channel Selector对象。

  • 创建后就会去执行自身的run方法,这里面是个死循环。EventLoop中维护一个Selector实例

  • 需要计算出一个IO选择策略,比如是使用Selector.select(阻塞到有socket就绪) select now(非阻塞),选择哪种策略主要看NioEventLoop任务队列内是否有本地任务执行,如果有调阻塞就不太优雅,因为这样会延迟非IO任务的执行。

  • 接下来就要处理selectkeys集合他表示本次Selector刷选出来就绪状态的Channel,迭代key集合,从每个key拿到关联channel,再看是读就绪还是写就绪,比如读就绪,将socket缓冲区load到一个Bytebuf中,然后调用当前Channel的处理管道Pipeline传播读事件接口方法,也就是Pipeline的fireChannelRead方法,就这样从socket收到数据放到这个Channel的Pipeline中,在Pipeline中依次调用Inbound接口的Handler处理器进行读就绪事件处理,本次处理完IO事件后,NioEventLoop接下来会处理内部任务。

  • 管道意思,可以安插任意处理单元,依次传递,in事件处理、out事件处理,加密、转换为json字符串、编码器,先写channel,最后会刷到channel关联的socket写缓存区,提供一个可插拔灵活的处理框架。

  • netty高可扩展性也正是来源pipeline责任链的设计模式。协议解码器:将二进制数据转为java对象;协议编码器:将java对象转为二进制数据;业务逻辑处理程序-执行实际的业务逻辑。

  • 处理器Handler主要分为两种:ChannelInboundHandlerAdapter(入站处理器)、ChannelOutboundHandler(出站处理器),入站指的是数据从底层java NIO Channel到Netty的Channel。出站指的是通过Netty的Channel来操作底层的java NIO Channel。

  • 在channel中装配ChannelHandler流水线处理器,一个channel可能需要多个channelHandler处理器和顺序的。pipeline相当于处理器的容器。初始化channel时,把channelHandler按顺序装在pipeline中,就可以实现按序执行channelHandler了。在一个Channel中,只有一个ChannelPipeline。该pipeline在Channel被创建的时候创建。ChannelPipeline包含了一个ChannelHander形成的列表,且所有ChannelHandler都会注册到ChannelPipeline中。

  • ChannelInboundHandlerAdapter处理器常用的事件有

  • ChannelOutboundHandler处理器常用的事件有

每个处理单元的Handler对象都有一个int类型的mask字段,表示当前handler对上层Inbound或Outbound指定这个接口方法有没有复写,有为1无为0,主要可以避免一些空调用,可以找到后面第一个实现事件Handler,可以跳过没有实现响应事件的Handler。

主要是Promise接口,他是Future接口的一个增强,submit返回future句柄, get获取任务结果阻塞线程,原因内部没有线程资源,Netty Promise实现类ChannelPromise内部有一个Channel对象,注册到NioEventLoop,同事也就持有NioEventLoop,内部有线程资源,Channel Promise有线程资源了,所以Channel Promise可以注册一些Listener对象,等任务完成后可以处理后续事情。

  • NioEventLoopGroup 默认的构造函数实际会起的线程数为 CPU核心数*2

  • NioEventLoopGroup,NioEventLoop池和单个线程的关系,group线程池有next方法返回EventLoop线程对象,内部线程Thread是延迟创建的,虽然先创建出来,但是NioEventLoop内部线程对象并不会创建,等接到第一个任务才创建,NioEventLoop不像是单线程,一个线程的线程池(single exector方法),EventLoop内部有队列,可以做Selector工作也做普通任务,NioEventLoop内部队列也做了优化,原来采用LinkedBlockingQueue+Condition条件队列,Netty没有使用JDK JUC的Queue而是使用JcTools的Queue,单多生产者但多消费者场景,背后也应该是CAS实现。

  • 每个NioEventLoopGroup对象内部都会分配一组NioEventLoop,其大小是 nThreads, 这样就构成了一个线程池, 一个NIOEventLoop 和一个线程相对应,这和我们上面说的 EventloopGroup 和 EventLoop关系这部分内容相对应。

  • 在 TCP 保持长连接的过程中,可能会出现断网等网络异常出现,异常发生的时候, client 与 server 之间如果没有交互的话,它们是无法发现对方已经掉线的。为了解决这个问题, 我们就需要引入心跳机制。

  • 心跳机制的工作原理是: 在 client 与 server 之间在一定时间内没有数据交互时, 即处于 idle 状态时, 客户端或服务器就会发送一个特殊的数据包给对方, 当接收方收到这个数据报文后, 也立即发送一个特殊的数据报文, 回应发送方, 此即一个 PING-PONG 交互。所以, 当某一端收到心跳消息后, 就知道了对方仍然在线, 这就确保 TCP 连接的有效性.

  • TCP 实际上自带的就有长连接选项,本身是也有心跳包机制,也就是 TCP 的选项:SO_KEEPALIVE。但是,TCP 协议层面的长连接灵活性不够。所以,一般情况下我们都是在应用层协议上实现自定义心跳机制的,也就是在 Netty 层面通过编码实现。通过 Netty 实现心跳机制的话,核心类是 IdleStateHandler 。

如果Handler处理器有一些长时间的业务处理,可以交给taskQueue异步处理,代码使用示例如下:

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) {
        //获取客户端发送过来的消息
        ByteBuf byteBuf = (ByteBuf) msg;
        System.out.println("收到客户端" + ctx.channel().remoteAddress() + "发送的消息:" + byteBuf.toString(CharsetUtil.UTF_8));
        ctx.channel().eventLoop().execute(() -> {
            try {
                Thread.sleep(2000);
                System.out.println("耗时业务处理");
            } catch (Exception e) {
                e.printStackTrace();
            }
        });
    }

通过debug调试就可以看到taskQueue有一个刚刚添加进去的任务

延时任务队列和上面任务队列非常相似,只是多了一个可延迟一定时间再执行的设置,代码使用示例如下:

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) {
        //获取客户端发送过来的消息
        ByteBuf byteBuf = (ByteBuf) msg;
        System.out.println("收到客户端" + ctx.channel().remoteAddress() + "发送的消息:" + byteBuf.toString(CharsetUtil.UTF_8));

        ctx.channel().eventLoop().schedule(new Runnable() {
            @Override
            public void run() {
                try {
                    //长时间操作,不至于长时间的业务操作导致Handler阻塞
                    Thread.sleep(1000);
                    System.out.println("长时间的业务处理");
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        },10, TimeUnit.SECONDS);//10秒后执行
    }

同样在debug进行调试可以查看到有一个scheduleTaskQueue任务待执行中