Netty学习笔记(一) - 简介和组件设计
阅读原文时间:2023年07月13日阅读:1

在互联网发达的今天,网络已经深入到生活的方方面面,一个高效、性能可靠的网络通信已经成为一个重要的诉求,在Java方面需要寻求一种高性能网络编程的实践。

一、简介

当前JDK(本文使用的JDK 1.8)中已经有网络编程相关的API,使用过程中或多或少会存在以下几个问题:

  • 阻塞:早期JDK里的API是用阻塞式的实现方式,在读写数据调用时数据还没有准备好,或者目前不可写,操作就会被阻塞直到数据准备好或目标可写为止。虽然可以采用每一个连接创建一个线程进行处理,但是可能会造成大量线程得不到释放,消耗资源。从JDK 1.4开始提供非阻塞的实现。
  • 处理和调度IO烦琐:偏底层的API实现暴露了更多的与业务无关的操作细节,使得在高负载下实现一个可靠和高效的逻辑就变得复杂和烦琐。

Netty是一款异步的事件驱动的网络应用程序框架,支持快速地开发可维护的高性能的面向协议的服务器和客户端。它拥有简单而强大的设计模型,易于使用,拥有比Java API更高的性能等特点,它屏蔽了底层实现的细节,使开发人员更关注业务逻辑的实现。

二、组件和设计

  • Channel:屏蔽底层网络传输细节,提供简单易用的诸如bind、connect、read、write方法。
  • EventLoop:线程模型。处理连接生命周期过程中发生的事件,以及其他一些任务。
  • ChannelFuture:异步接口,用于注册Listener以便在某个操作完成时得到通知。
  • ChannelHandler:处理入站和出站数据的的一系列接口和抽象类,开发人员扩展这些类型来完成业务逻辑。
  • ChannelPipline:管理ChannelHandler的容器,将多个ChannelHandler以链式的方式管理,数据将在这个链上依次流动并被ChannelHandler逐个处理。
  • 引导(Bootstrap、ServerBootstrap):初始化客户端和服务端的入口类。

三、一个简单的Demo

创建一个maven工程,引入Netty。为了方便调试,Demo中引入了日志和junit5。



io.netty netty-all 4.1.50.Final

 <!-- https://mvnrepository.com/artifact/org.slf4j/slf4j-api -->  
 <dependency>  
     <groupId>org.slf4j</groupId>  
     <artifactId>slf4j-api</artifactId>  
     <version>1.7.30</version>  
 </dependency>

 <!-- https://mvnrepository.com/artifact/ch.qos.logback/logback-classic -->  
 <dependency>  
     <groupId>ch.qos.logback</groupId>  
     <artifactId>logback-classic</artifactId>  
     <version>1.2.3</version>  
 </dependency>

 <!-- https://mvnrepository.com/artifact/ch.qos.logback/logback-core -->  
 <dependency>  
     <groupId>ch.qos.logback</groupId>  
     <artifactId>logback-core</artifactId>  
     <version>1.2.3</version>  
 </dependency>

 <dependency>  
     <groupId>org.junit.jupiter</groupId>  
     <artifactId>junit-jupiter</artifactId>  
     <version>5.5.2</version>  
     <scope>test</scope>  
 </dependency>  

创建Client和Server

package com.niklai.demo;

import io.netty.bootstrap.Bootstrap;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.util.CharsetUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.net.InetSocketAddress;

public class Client {
private static final Logger logger = LoggerFactory.getLogger(Client.class.getSimpleName());

 public static void init() {  
     try {  
         Bootstrap bootstrap = new Bootstrap();              // 初始化客户端引导  
         NioEventLoopGroup group = new NioEventLoopGroup();  
         bootstrap.group(group)                              // 指定适用于NIO的EventLoop  
                 .channel(NioSocketChannel.class)            // 适用于NIO的Channel  
                 .remoteAddress(new InetSocketAddress("localhost", 9999))    // 指定要绑定的IP和端口  
                 .handler(new ChannelInitializer<SocketChannel>() {  
                     protected void initChannel(SocketChannel socketChannel) throws Exception {  
                         socketChannel.pipeline().addLast(new ClientHandler());      // 添加ChannelHandler到ChannelPipline  
                     }  
                 });  
         ChannelFuture future = bootstrap.connect().sync();      // 阻塞直到连接到远程节点  
         future.channel().closeFuture().sync();                  // 阻塞直到关闭Channel  
         group.shutdownGracefully().sync();                      // 释放资源  
     } catch (InterruptedException e) {  
         logger.error(e.getMessage(), e);  
     }  
 }

 static class ClientHandler extends ChannelInboundHandlerAdapter {  
     @Override  
     public void channelActive(ChannelHandlerContext ctx) throws Exception {  
         logger.info("channel active....");

         String msg = "Client message!";  
         logger.info("send message: {}....", msg);  
         ctx.writeAndFlush(Unpooled.copiedBuffer(msg, CharsetUtil.UTF\_8));  
     }

     @Override  
     public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {  
         ByteBuf buf = (ByteBuf) msg;  
         logger.info("read message: {}....", buf.toString(CharsetUtil.UTF\_8));  
     }  
 }  

}

package com.niklai.demo;

import io.netty.bootstrap.ServerBootstrap;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.util.CharsetUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.net.InetSocketAddress;

public class Server {
private static final Logger logger = LoggerFactory.getLogger(Server.class.getSimpleName());

 public static void init() {  
     try {  
         ServerBootstrap serverBootstrap = new ServerBootstrap();        // 初始化客户端引导  
         NioEventLoopGroup group = new NioEventLoopGroup();  
         serverBootstrap.group(group)                                    // 指定适用于NIO的EventLoop  
                 .channel(NioServerSocketChannel.class)                  // 适用于NIO的Channel  
                 .localAddress(new InetSocketAddress("localhost", 9999))     // 指定要绑定的IP和端口  
                 .childHandler(new ChannelInitializer<SocketChannel>() {  
                     protected void initChannel(SocketChannel socketChannel) throws Exception {  
                         socketChannel.pipeline().addLast(new ServerHandler());      // 添加ChannelHandler到ChannelPipline  
                     }  
                 });

         ChannelFuture future = serverBootstrap.bind().sync();           // 异步绑定阻塞直到完成  
         future.channel().closeFuture().sync();                          // 阻塞直到关闭Channel  
         group.shutdownGracefully().sync();                              // 释放资源  
     } catch (InterruptedException e) {  
         logger.error(e.getMessage(), e);  
     }  
 }

 static class ServerHandler extends ChannelInboundHandlerAdapter {  
     @Override  
     public void channelActive(ChannelHandlerContext ctx) throws Exception {  
         logger.info("channel active.....");  
     }

     @Override  
     public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {  
         ByteBuf buf = (ByteBuf) msg;  
         logger.info("read message: {}.....", buf.toString(CharsetUtil.UTF\_8));  
     }

     @Override  
     public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {  
         logger.info("read complete.....");  
         ctx.writeAndFlush(Unpooled.copiedBuffer("receive message!", CharsetUtil.UTF\_8))  
                 .addListener(ChannelFutureListener.CLOSE);  
     }  
 }  

}

日志配置文件



%date %level [%thread] %class#%method [%file:%line] %msg%n

 <root level="info">  
     <appender-ref ref="consoleOut" />  
 </root>  

logback.xml

单元测试代码

package com.niklai.demo;

import org.junit.jupiter.api.Test;

public class NettyTest {

 @Test  
 public void test1() throws InterruptedException {  
     new Thread(() -> {  
         // 服务端  
         Server.init();  
     }).start();  
     Thread.sleep(1000);

     // 客户端  
     Client.init();

     Thread.sleep(5000);  
 }  

}

运行结果如下图

从控制台日志中可以看到当Client连接到Server后, ServerHandler 和 ClientHandler 的 channerActive 方法都会被调用, ClientHandler 会调用 ctx.writeAndFlush() 方法给Server发送一条消息, ServerHandler 的 channelRead 方法被调用读取到消息,消息读取完毕后 channelReadComplete 方法被调用,发送应答消息给Client, ClientHandler 的 channelRead 方法被调用获取到应答消息。到此一个完整的发送--应答流程就结束了。

手机扫一扫

移动阅读更方便

阿里云服务器
腾讯云服务器
七牛云服务器