netty服务端客户端启动流程分析
阅读原文时间:2023年07月09日阅读:4

服务端启动流程

我们回顾前面讲解的netty启动流程,服务端这边有两个EventLoopGroup,一个专门用来处理连接,一个用来处理后续的io事件

服务端启动还是跟nio一样,绑定端口进行监听,我们先来看绑定流程

// Bind the port and block synchronously until the bind has finished
ChannelFuture cf = serverBootstrap.bind(8080).sync();

// Creates and registers the channel, then binds it to localAddress: right away
// when registration has already finished, otherwise from a listener that runs
// once registration completes. Returns the future/promise tracking that work.
private ChannelFuture doBind(final SocketAddress localAddress) {
// Create, initialize and register a NioServerSocketChannel
final ChannelFuture regFuture = initAndRegister();
final Channel channel = regFuture.channel();
if (regFuture.cause() != null) {
return regFuture;
}

if (regFuture.isDone()) {
    // Registration already finished: bind the port and return the asynchronous bind result
    ChannelPromise promise = channel.newPromise();
    // Bind the port
    doBind0(regFuture, channel, localAddress, promise);
    return promise;
} else {
    // Registration still pending: return a promise that also carries the registration outcome
    final PendingRegistrationPromise promise = new PendingRegistrationPromise(channel);
    // Listen for the registration to finish; the bind is performed once it completes
    regFuture.addListener(new ChannelFutureListener() {
        @Override
        public void operationComplete(ChannelFuture future) throws Exception {
            Throwable cause = future.cause();
            if (cause != null) {
                // Registration failed: fail the promise so its listeners are notified
                promise.setFailure(cause);
            } else {
                // Record on the promise that registration succeeded
                promise.registered();
                // Bind the port
                doBind0(regFuture, channel, localAddress, promise);
            }
        }
    });
    return promise;
}

}

// Creates a channel through the channel factory, initializes it, and registers it
// with the configured event loop group. Any failure is reported through the
// returned future instead of being thrown.
final ChannelFuture initAndRegister() {
Channel channel = null;
try {
/**
* Instantiate the channel from the socket type configured earlier:
* serverBootstrap.channel(NioServerSocketChannel.class) yields a ReflectiveChannelFactory
**/
channel = channelFactory.newChannel(); // 1
// Initialize the channel
init(channel); // 2
} catch (Throwable t) {
if (channel != null) {
// Something failed: forcibly close the channel (and the NIO channel it wraps)
channel.unsafe().closeForcibly();
// Return a promise that is already failed
return new DefaultChannelPromise(channel, GlobalEventExecutor.INSTANCE).setFailure(t);
}
// No channel was created at all: fail a promise bound to a placeholder FailedChannel
return new DefaultChannelPromise(new FailedChannel(), GlobalEventExecutor.INSTANCE).setFailure(t);
}

// Register the channel with the boss EventLoopGroup; internally this registers it
// with the selector owned by the chosen EventLoop
ChannelFuture regFuture = config().group().register(channel); // 3
if (regFuture.cause() != null) {
    // Registration failed: close normally if the channel did get registered, otherwise force-close
    if (channel.isRegistered()) {
        channel.close();
    } else {
        channel.unsafe().closeForcibly();
    }
}
return regFuture;

}

// Schedules the actual bind on the channel's event loop. If registration did not
// succeed, the bind promise is failed with the registration's cause instead.
private static void doBind0(
final ChannelFuture regFuture, final Channel channel,
final SocketAddress localAddress, final ChannelPromise promise) {

// Submit the bind as an asynchronous task on the channel's event loop
channel.eventLoop().execute(new Runnable() {
    @Override
    public void run() {
        if (regFuture.isSuccess()) {

          // Perform the bind; CLOSE_ON_FAILURE is attached as a listener on the result
channel.bind(localAddress, promise).addListener(ChannelFutureListener.CLOSE_ON_FAILURE); // 4
} else {
promise.setFailure(regFuture.cause());
}
}
});
}

对上述流程进行一个简单的说明:先创建一个NioServerSocketChannel并进行初始化,然后将其注册到bossEventLoop的selector上(nio需要做的流程,netty都要做),最后进行端口绑定,并返回一个异步的绑定promise。

绑定流程有4个比较重要的操作 我们来一一详解

1、实例化NioServerSocketChannel:内部会创建一个ServerSocketChannel并持有它,同时创建unsafe和pipeline

/***********************_NioServerSocketChannel_**********************/
// Selector provider; it is the factory for selectors, ServerSocketChannels and SocketChannels
private static final SelectorProvider DEFAULT_SELECTOR_PROVIDER = SelectorProvider.provider();

// Default constructor: opens a new server socket via the default provider and delegates
public NioServerSocketChannel() {
this(newSocket(DEFAULT_SELECTOR_PROVIDER));
}

// Opens a JDK NIO ServerSocketChannel through the given provider,
// wrapping any IOException in a ChannelException.
private static ServerSocketChannel newSocket(SelectorProvider provider) {
try {
// Return a java.nio ServerSocketChannel
return provider.openServerSocketChannel();
} catch (IOException e) {
throw new ChannelException(
"Failed to open a server socket.", e);
}
}

// Wraps an existing ServerSocketChannel; the interest op handed to the parent is OP_ACCEPT.
public NioServerSocketChannel(ServerSocketChannel channel) {
// Call the parent constructor (no parent channel, read interest = OP_ACCEPT)
super(null, channel, SelectionKey.OP_ACCEPT);
// Configuration object for this NioServerSocketChannel
config = new NioServerSocketChannelConfig(this, javaChannel().socket());
}

/***********************_AbstractNioChannel_***********************/
// Stores the wrapped NIO channel and its read interest op, then switches the
// NIO channel to non-blocking mode. If that fails, the channel is closed
// (best effort) and a ChannelException is thrown.
protected AbstractNioChannel(Channel parent, SelectableChannel ch, int readInterestOp) {
super(parent);
this.ch = ch;
this.readInterestOp = readInterestOp;
try {
// Put the underlying channel (here the ServerSocketChannel) into non-blocking mode
ch.configureBlocking(false);
} catch (IOException e) {
try {
ch.close();
} catch (IOException e2) {
// Closing is best effort; only log the secondary failure
if (logger.isWarnEnabled()) {
logger.warn(
"Failed to close a partially initialized socket.", e2);
}
}

    throw new ChannelException("Failed to enter non-blocking mode.", e);
}

}

/***********************_AbstractChannel_*************************/
/**
 * Creates the channel: records the parent, assigns a new id, and creates the
 * unsafe and the pipeline that belong to this channel.
 *
 * @param parent the parent channel (the NioServerSocketChannel constructor passes null)
 */
protected AbstractChannel(Channel parent) {
    this.parent = parent;
    id = newId();
    // Create the unsafe; for NioServerSocketChannel the instance is a NioMessageUnsafe
    unsafe = newUnsafe();
    // Create the pipeline; its type is DefaultChannelPipeline
    pipeline = newChannelPipeline();
}

/***********************_DefaultChannelPipeline_*************************/
// Builds the pipeline for the given channel (which must not be null) and sets up
// its two fixed end points, head and tail.
protected DefaultChannelPipeline(Channel channel) {
this.channel = ObjectUtil.checkNotNull(channel, "channel");
succeededFuture = new SucceededChannelFuture(channel, null);
voidPromise = new VoidChannelPromise(channel, true);

tail = new TailContext(this);
head = new HeadContext(this);

// Link the two contexts to each other, so the pipeline starts out as head <-> tail
head.next = tail;
tail.prev = head;

}

2、初始化NioServerSocketChannel

/***********************_ServerBootstrap_*************************/
void init(Channel channel) throws Exception {
final Map, Object> options = options0();
// 设置我们之前设置的options serverBootstrap.option(ChannelOption option, T value)
synchronized (options) {
setChannelOptions(channel, options, logger);
}
// 设置channel自定义属性
final Map, Object> attrs = attrs0();
synchronized (attrs) {
for (Entry, Object> e: attrs.entrySet()) {
@SuppressWarnings("unchecked")
AttributeKey key = (AttributeKey) e.getKey();
channel.attr(key).set(e.getValue());
}
}

ChannelPipeline p = channel.pipeline();

final EventLoopGroup currentChildGroup = childGroup;  
final ChannelHandler currentChildHandler = childHandler;  
final Entry<ChannelOption<?>, Object>\[\] currentChildOptions;  
final Entry<AttributeKey<?>, Object>\[\] currentChildAttrs;  
synchronized (childOptions) {  
    currentChildOptions = childOptions.entrySet().toArray(newOptionArray(childOptions.size()));  
}  
synchronized (childAttrs) {  
    currentChildAttrs = childAttrs.entrySet().toArray(newAttrArray(childAttrs.size()));  
}

// 添加一个ChannelInitializer,用来注册后续的NioSocketChannel  
p.addLast(new ChannelInitializer<Channel>() {  
    @Override  
    public void initChannel(final Channel ch) throws Exception {  
        final ChannelPipeline pipeline = ch.pipeline();  
        ChannelHandler handler = config.handler();  
        if (handler != null) {  
            pipeline.addLast(handler);  
        }

        ch.eventLoop().execute(new Runnable() {  
            @Override  
            public void run() {  
                // 这个handler就是后续监听客户端连接事件后,会将创建的NioSocketChannel 在这里进行注册  
                pipeline.addLast(new ServerBootstrapAcceptor(  
                        ch, currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs));  
            }  
        });  
    }  
});  

}

private static class ServerBootstrapAcceptor extends ChannelInboundHandlerAdapter {
ServerBootstrapAcceptor(
final Channel channel, EventLoopGroup childGroup, ChannelHandler childHandler,
Entry, Object>[] childOptions, Entry, Object>[] childAttrs) {
this.childGroup = childGroup;
this.childHandler = childHandler;
this.childOptions = childOptions;
this.childAttrs = childAttrs;

    enableAutoReadTask = new Runnable() {  
        @Override  
        public void run() {  
            channel.config().setAutoRead(true);  
        }  
    };  
}

@Override  
@SuppressWarnings("unchecked")  
public void channelRead(ChannelHandlerContext ctx, Object msg) {  
    // NioServerScoketChannel在监听accpet后会创建一个NioSocketChannel 然后调用handler将该channel传递进来,handler那边只是简单的创建 并没有完成注册  
    final Channel child = (Channel) msg;  
    // 在serverBootstrap.childHandler(new ChannelInitializer<SocketChannel>()添加的handler 加入到NioSocketChannel  
    child.pipeline().addLast(childHandler);  
    // 设置NioSocketChannel的Options  
    setChannelOptions(child, childOptions, logger);  
    // 设置NioSocketChannel的attr  
    for (Entry<AttributeKey<?>, Object> e: childAttrs) {  
        child.attr((AttributeKey<Object>) e.getKey()).set(e.getValue());  
    }

    try {  
        // 将NioSocketChannel注册到workerEventLoopGroup上 然后添加一个监听器  
        childGroup.register(child).addListener(new ChannelFutureListener() {  
            @Override  
            public void operationComplete(ChannelFuture future) throws Exception {  
                if (!future.isSuccess()) {  
                    forceClose(child, future.cause());  
                }  
            }  
        });  
    } catch (Throwable t) {  
        forceClose(child, t);  
    }  
}

private static void forceClose(Channel child, Throwable t) {  
    child.unsafe().closeForcibly();  
    logger.warn("Failed to register an accepted channel: {}", child, t);  
}

@Override  
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {  
    final ChannelConfig config = ctx.channel().config();  
    if (config.isAutoRead()) {  
        // 将autoread设置为false,一秒后重新设置为true  
        config.setAutoRead(false);  
        ctx.channel().eventLoop().schedule(enableAutoReadTask, 1, TimeUnit.SECONDS);  
    }  
    ctx.fireExceptionCaught(cause);  
}  

}

上述代码的大致流程:将之前在ServerBootstrap上设置的属性在这里进行相应设置,然后给NioServerSocketChannel添加一个处理器(ServerBootstrapAcceptor),这个处理器用来在接收到accept请求时,将新创建的NioSocketChannel注册到对应的workerEventLoop中,后续的io事件就交给NioSocketChannel来完成

3、channel注册到EventLoop,其实就是将channel注册到EventLoop中的selector上

/************************MultithreadEventLoopGroup*********************/
// Delegates the registration to the event loop returned by next().
public ChannelFuture register(Channel channel) {
return next().register(channel);
}

/***********************MultithreadEventExecutorGroup**********************/
// Returns the executor selected by the chooser.
public EventExecutor next() {
// Pick one EventLoop
return chooser.next();
}

/***********************SingleThreadEventLoop**********************/
// Wraps the channel in a new DefaultChannelPromise tied to this event loop and registers it.
public ChannelFuture register(Channel channel) {
return register(new DefaultChannelPromise(channel, this));
}

// Registers the promise's channel with this event loop and returns the same promise.
public ChannelFuture register(final ChannelPromise promise) {
// Obtain the channel's Unsafe and let it perform the registration
promise.channel().unsafe().register(this, promise);
return promise;
}

/***********************AbstractUnsafe**********************/
// Assigns the event loop to the channel and runs register0 on it: directly when
// already on that event loop, otherwise as a submitted task.
public final void register(EventLoop eventLoop, final ChannelPromise promise) {
AbstractChannel.this.eventLoop = eventLoop;

if (eventLoop.inEventLoop()) {
    register0(promise);
} else {
    try {
        eventLoop.execute(new Runnable() {
            @Override
            public void run() {
                register0(promise);
            }
        });
    } catch (Throwable t) {
        // The task could not be submitted: force-close the channel and fail the promise
        closeForcibly();
        closeFuture.setClosed();
        safeSetFailure(promise, t);
    }
}

}

// Performs the actual registration and fires the follow-up pipeline events.
// Any failure force-closes the channel and fails the promise.
private void register0(ChannelPromise promise) {
try {
// Stop if the registration promise was cancelled or the channel is no longer open
if (!promise.setUncancellable() || !ensureOpen(promise)) {
return;
}
boolean firstRegistration = neverRegistered;
// Perform the NIO registration
doRegister();
neverRegistered = false;
registered = true;

    // Now that the channel is registered, run pending handlerAdded callbacks; this is what
    // invokes the channelInitializer.initChannel(channel) configured earlier
    pipeline.invokeHandlerAddedIfNeeded();
    // Mark the promise as succeeded
    safeSetSuccess(promise);
    // Propagate the channelRegistered event
    pipeline.fireChannelRegistered();
    // The channel is already active at the moment registration completes
    if (isActive()) {
        if (firstRegistration) {
            pipeline.fireChannelActive();
        } else if (config().isAutoRead()) {
            // Not the first registration: only start reading again when auto-read is on
            beginRead();
        }
    }
} catch (Throwable t) {
    closeForcibly();
    closeFuture.setClosed();
    safeSetFailure(promise, t);
}

}

/***********************AbstractNioUnsafe**********************/
// NOTE(review): in Netty 4.1 doRegister() appears to be declared on AbstractNioChannel
// rather than on its unsafe — confirm the banner above.
// Registers the NIO channel with the event loop's selector using interest ops 0 and
// this channel as the attachment. On a CancelledKeyException it calls selectNow()
// once and retries; a second such exception is rethrown.
protected void doRegister() throws Exception {
boolean selected = false;
for (;;) {
try {
// Register the channel with the selector owned by the EventLoop
selectionKey = javaChannel().register(eventLoop().unwrappedSelector(), 0, this);
return;
} catch (CancelledKeyException e) {
if (!selected) {
// First failure: run selectNow() and try the registration again
eventLoop().selectNow();
selected = true;
} else {
throw e;
}
}
}
}

4、绑定端口:通过pipeline.bind发起绑定。绑定端口是出站事件,由tail向前进行传递,直到执行到head的bind()方法,然后通过其中的unsafe调用NioServerSocketChannel的doBind()进行绑定

/************************tailHandler*********************/
// Looks up the next outbound context and invokes its bind: directly when already
// on that context's executor, otherwise as a task submitted to it.
// Returns the promise that was passed in.
public ChannelFuture bind(final SocketAddress localAddress, final ChannelPromise promise) {
final AbstractChannelHandlerContext next = findContextOutbound();
EventExecutor executor = next.executor();
if (executor.inEventLoop()) {
next.invokeBind(localAddress, promise);
} else {
safeExecute(executor, new Runnable() {
@Override
public void run() {
next.invokeBind(localAddress, promise);
}
}, promise, null);
}
return promise;
}

// When invokeHandler() is true, calls bind on this context's handler and reports
// any throwable through notifyOutboundHandlerException; otherwise passes the bind
// on via bind(localAddress, promise).
private void invokeBind(SocketAddress localAddress, ChannelPromise promise) {
if (invokeHandler()) {
try {
((ChannelOutboundHandler) handler()).bind(this, localAddress, promise);
} catch (Throwable t) {
notifyOutboundHandlerException(t, promise);
}
} else {
bind(localAddress, promise);
}
}

/************************headHandler*********************/
// Hands the bind request over to the unsafe.
public void bind(
ChannelHandlerContext ctx, SocketAddress localAddress, ChannelPromise promise)
throws Exception {
unsafe.bind(localAddress, promise);
}

/************************AbstractUnsafe*********************/
// Binds the channel through doBind(localAddress), schedules a channelActive event
// if the bind made the channel active, and completes the promise.
public final void bind(final SocketAddress localAddress, final ChannelPromise promise) {
assertEventLoop();

// Stop if the promise was cancelled or the channel is no longer open
if (!promise.setUncancellable() || !ensureOpen(promise)) {
    return;
}

// Remember the state before binding so an inactive -> active transition can be detected
boolean wasActive = isActive();
try {
    doBind(localAddress);
} catch (Throwable t) {
    // Bind failed: fail the promise
    safeSetFailure(promise, t);
    closeIfClosed();
    return;
}

if (!wasActive && isActive()) {
    // The bind activated the channel: fire channelActive later via invokeLater
    invokeLater(new Runnable() {
        @Override
        public void run() {
            pipeline.fireChannelActive();
        }
    });
}

safeSetSuccess(promise);

}

/************************NioServerSocketChannel*********************/
// Binds the underlying NIO channel to localAddress with the configured backlog:
// through the channel itself on Java 7 or later, otherwise through its socket.
protected void doBind(SocketAddress localAddress) throws Exception {
if (PlatformDependent.javaVersion() >= 7) {
javaChannel().bind(localAddress, config.getBacklog());
} else {
javaChannel().socket().bind(localAddress, config.getBacklog());
}
}

服务端的启动流程就讲得差不多了,对上述流程大致进行一个梳理:进行端口绑定时,首先创建一个NioServerSocketChannel用来处理accept,然后将其注册到BossEventLoop上;同时给NioServerSocketChannel添加一个处理器,将来发生accept时,由它将生成的NioSocketChannel注册到WorkerEventLoop上,后续的io事件就在该NioSocketChannel上完成

客户端启动流程

客户端这边启动流程和服务端大致类似,连接ip端口时,创建一个用来和服务端通信的NioSocketChannel(内部包裹着SocketChannel),然后注册到对应的EventLoop上的selector开启事件监听

// Connect to 127.0.0.1:8080 and wait for the connect operation to finish
ChannelFuture cf = bootstrap.connect("127.0.0.1", 8080).sync();
// Then wait on the channel's close future
cf.channel().closeFuture().sync();

/************************Bootstrap*********************/
// Creates and registers the channel, then resolves the remote address and connects:
// right away when registration has already finished, otherwise from a listener
// that runs once registration completes.
private ChannelFuture doResolveAndConnect(final SocketAddress remoteAddress, final SocketAddress localAddress) {
// Create a NioSocketChannel and register it; the flow is much like the one for
// NioServerSocketChannel. Returns the registration future.
final ChannelFuture regFuture = initAndRegister();
final Channel channel = regFuture.channel();

if (regFuture.isDone()) {
    if (!regFuture.isSuccess()) {
        return regFuture;
    }
    return doResolveAndConnect0(channel, remoteAddress, localAddress, channel.newPromise());
} else {
    // Registration still pending: add a listener that connects once registration has succeeded
    final PendingRegistrationPromise promise = new PendingRegistrationPromise(channel);
    regFuture.addListener(new ChannelFutureListener() {
        @Override
        public void operationComplete(ChannelFuture future) throws Exception {
            Throwable cause = future.cause();
            if (cause != null) {
                promise.setFailure(cause);
            } else {
                promise.registered();
                doResolveAndConnect0(channel, remoteAddress, localAddress, promise);
            }
        }
    });
    return promise;
}

}

/**
 * Resolves {@code remoteAddress} when the resolver supports it and it is not yet
 * resolved, then starts the connect. Resolution failures close the channel and
 * fail the promise.
 *
 * @param channel       the registered channel to connect
 * @param remoteAddress the address to connect to, possibly unresolved
 * @param localAddress  the local address to connect from, or null
 * @param promise       the promise completed by the connect attempt
 * @return the given promise
 */
private ChannelFuture doResolveAndConnect0(final Channel channel, SocketAddress remoteAddress,
                                           final SocketAddress localAddress, final ChannelPromise promise) {
    try {
        final EventLoop eventLoop = channel.eventLoop();
        final AddressResolver<SocketAddress> resolver = this.resolver.getResolver(eventLoop);

        // The resolver cannot handle this address, or the address is already resolved: connect directly
        if (!resolver.isSupported(remoteAddress) || resolver.isResolved(remoteAddress)) {
            doConnect(remoteAddress, localAddress, promise);
            return promise;
        }

        final Future<SocketAddress> resolveFuture = resolver.resolve(remoteAddress);

        if (resolveFuture.isDone()) {
            // Resolution already finished: connect, or fail right away
            final Throwable resolveFailureCause = resolveFuture.cause();

            if (resolveFailureCause != null) {
                // Failed to resolve immediately
                channel.close();
                promise.setFailure(resolveFailureCause);
            } else {
                // Succeeded to resolve immediately; cached? (or did a blocking lookup)
                doConnect(resolveFuture.getNow(), localAddress, promise);
            }
            return promise;
        }

        // Resolution still in progress: connect from a listener once the address is resolved
        resolveFuture.addListener(new FutureListener<SocketAddress>() {
            @Override
            public void operationComplete(Future<SocketAddress> future) throws Exception {
                if (future.cause() != null) {
                    channel.close();
                    promise.setFailure(future.cause());
                } else {
                    doConnect(future.getNow(), localAddress, promise);
                }
            }
        });
    } catch (Throwable cause) {
        promise.tryFailure(cause);
    }
    return promise;
}

private static void doConnect(
final SocketAddress remoteAddress, final SocketAddress localAddress, final ChannelPromise connectPromise) {

final Channel channel = connectPromise.channel();  
// 提交一个连接任务到EventLoop上  
channel.eventLoop().execute(new Runnable() {  
    @Override  
    public void run() {  
        if (localAddress == null) {  
            channel.connect(remoteAddress, connectPromise);  
        } else {  
            channel.connect(remoteAddress, localAddress, connectPromise);  
        }  
        // 添加一个关闭或失败的监听器,会将channel进行关闭  
        connectPromise.addListener(ChannelFutureListener.CLOSE\_ON\_FAILURE);  
    }  
});  

}

// Adds the configured handler to the channel's pipeline, then applies the
// configured options and attrs to the channel
void init(Channel channel) throws Exception {
ChannelPipeline p = channel.pipeline();
p.addLast(config.handler());

// Apply the configured channel options
final Map<ChannelOption<?>, Object> options = options0();
synchronized (options) {
    setChannelOptions(channel, options, logger);
}

// Apply the configured channel attributes
final Map<AttributeKey<?>, Object> attrs = attrs0();
synchronized (attrs) {
    for (Entry<AttributeKey<?>, Object> e: attrs.entrySet()) {
        channel.attr((AttributeKey<Object>) e.getKey()).set(e.getValue());
    }
}

}

至此我们服务端和客户端的启动流程都分析完了。我们前面说过EventLoop可以当做一个线程来执行任务,例如 channel.eventLoop().execute(new Runnable() {...});后续的io事件监听和自定义任务处理都在EventLoop内执行,我们下节就来剖析其内部实现