我们回顾前面讲解的netty启动流程,服务端这边有两个EventLoopGroup,一个专门用来处理连接,一个用来处理后续的io事件
服务端启动还是跟nio一样,绑定端口进行监听,我们先来看绑定流程
// Bind to port 8080 and block the calling thread until the bind operation has finished.
ChannelFuture cf = serverBootstrap.bind(8080).sync();
/**
 * Registers a new channel and then binds it to {@code localAddress}.
 *
 * @param localAddress the address to bind to
 * @return the failed registration future if registration already failed; otherwise a promise
 *         that is handed to {@code doBind0} and tracks the bind
 */
private ChannelFuture doBind(final SocketAddress localAddress) {
    // Create, initialise and register the channel (a NioServerSocketChannel in this walkthrough).
    final ChannelFuture regFuture = initAndRegister();
    final Channel channel = regFuture.channel();
    if (regFuture.cause() != null) {
        return regFuture;
    }

    if (!regFuture.isDone()) {
        // Registration is still in flight: return a promise now and bind from a listener later.
        final PendingRegistrationPromise pendingPromise = new PendingRegistrationPromise(channel);
        regFuture.addListener(new ChannelFutureListener() {
            @Override
            public void operationComplete(ChannelFuture future) throws Exception {
                final Throwable failure = future.cause();
                if (failure == null) {
                    // Mark the pending promise as registered, then bind the port.
                    pendingPromise.registered();
                    doBind0(regFuture, channel, localAddress, pendingPromise);
                } else {
                    // Registration failed: fail the promise so its listeners are notified.
                    pendingPromise.setFailure(failure);
                }
            }
        });
        return pendingPromise;
    }

    // Registration is already done without a failure cause: bind right away.
    final ChannelPromise bindPromise = channel.newPromise();
    doBind0(regFuture, channel, localAddress, bindPromise);
    return bindPromise;
}
/**
 * Creates a channel through {@code channelFactory}, initialises it and registers it with the
 * group returned by {@code config().group()}.
 *
 * @return the registration future, or an already-failed promise when creation or
 *         initialisation threw
 */
final ChannelFuture initAndRegister() {
    Channel channel = null;
    try {
        // Instantiate the channel type configured on the bootstrap, e.g. through
        // serverBootstrap.channel(NioServerSocketChannel.class).
        channel = channelFactory.newChannel(); // 1
        // Initialise the freshly created channel.
        init(channel); // 2
    } catch (Throwable t) {
        if (channel == null) {
            // No channel was created, so the failed promise is attached to a FailedChannel.
            return new DefaultChannelPromise(new FailedChannel(), GlobalEventExecutor.INSTANCE).setFailure(t);
        }
        // A channel exists but could not be initialised: close it, then report the failure.
        channel.unsafe().closeForcibly();
        return new DefaultChannelPromise(channel, GlobalEventExecutor.INSTANCE).setFailure(t);
    }

    // Register the channel with the group; per the walkthrough this is the boss group, and the
    // channel ends up registered on the selector of one of its event loops.
    ChannelFuture regFuture = config().group().register(channel); // 3
    if (regFuture.cause() == null) {
        return regFuture;
    }

    // Registration failed: close the channel in the way that matches its registration state.
    if (channel.isRegistered()) {
        channel.close();
    } else {
        channel.unsafe().closeForcibly();
    }
    return regFuture;
}
/**
 * Submits the bind as a task to the channel's event loop.
 *
 * @param regFuture    the registration future; the bind only happens if it succeeded
 * @param channel      the channel to bind
 * @param localAddress the address to bind to
 * @param promise      completed by the bind, or failed with the registration cause
 */
private static void doBind0(
        final ChannelFuture regFuture, final Channel channel,
        final SocketAddress localAddress, final ChannelPromise promise) {
    // The bind runs asynchronously on the event loop, not on the calling thread.
    channel.eventLoop().execute(new Runnable() {
        @Override
        public void run() {
            if (!regFuture.isSuccess()) {
                promise.setFailure(regFuture.cause());
                return;
            }
            // The actual bind; CLOSE_ON_FAILURE is attached to the future it returns.
            final ChannelFuture bindFuture = channel.bind(localAddress, promise); // 4
            bindFuture.addListener(ChannelFutureListener.CLOSE_ON_FAILURE);
        }
    });
}
对上述流程做一个简单的说明:先创建一个NioServerSocketChannel,然后对其进行初始化,再将它注册到bossEventLoop的selector上(nio需要做的流程,netty同样都要做),最后进行端口绑定,并返回一个异步的绑定promise
绑定流程中有4个比较重要的操作(即上面代码中用 // 1 ~ // 4 标注的位置),我们来一一详解
/***********************_NioServerSocketChannel_**********************/
// Default selector provider: the JDK factory for selectors, ServerSocketChannels and SocketChannels.
private static final SelectorProvider DEFAULT_SELECTOR_PROVIDER = SelectorProvider.provider();
/**
 * Creates a server channel backed by a new JDK socket opened through the default provider.
 */
public NioServerSocketChannel() {
this(newSocket(DEFAULT_SELECTOR_PROVIDER));
}
/**
 * Opens a JDK NIO {@link ServerSocketChannel} through the given provider.
 *
 * @param provider the selector provider used to open the channel
 * @return the newly opened server socket channel
 * @throws ChannelException wrapping the {@link IOException} if the channel cannot be opened
 */
private static ServerSocketChannel newSocket(SelectorProvider provider) {
    final ServerSocketChannel jdkChannel;
    try {
        jdkChannel = provider.openServerSocketChannel();
    } catch (IOException cause) {
        throw new ChannelException("Failed to open a server socket.", cause);
    }
    return jdkChannel;
}
/**
 * Creates a server channel around an existing JDK {@code ServerSocketChannel}.
 */
public NioServerSocketChannel(ServerSocketChannel channel) {
// Parent constructor: no parent channel, and OP_ACCEPT as the interest op for this channel.
super(null, channel, SelectionKey.OP_ACCEPT);
// Configuration object for this channel, built around the underlying server socket.
config = new NioServerSocketChannelConfig(this, javaChannel().socket());
}
/***********************_AbstractNioChannel_***********************/
/**
 * Stores the JDK channel and its read interest op, then switches the channel to non-blocking mode.
 *
 * @param parent         the parent channel, passed to the super constructor
 * @param ch             the underlying JDK selectable channel
 * @param readInterestOp the interest op stored for this channel
 * @throws ChannelException if the channel cannot be switched to non-blocking mode
 */
protected AbstractNioChannel(Channel parent, SelectableChannel ch, int readInterestOp) {
    super(parent);
    this.ch = ch;
    this.readInterestOp = readInterestOp;
    try {
        // Put the JDK channel into non-blocking mode.
        ch.configureBlocking(false);
    } catch (IOException cause) {
        // Best-effort close of the half-initialised channel; a close failure is only logged
        // so that the original cause is the one reported to the caller.
        try {
            ch.close();
        } catch (IOException closeFailure) {
            if (logger.isWarnEnabled()) {
                logger.warn("Failed to close a partially initialized socket.", closeFailure);
            }
        }
        throw new ChannelException("Failed to enter non-blocking mode.", cause);
    }
}
/***********************_AbstractChannel*************************/
/**
 * Base constructor: stores the parent and creates this channel's id, unsafe and pipeline.
 */
protected AbstractChannel(Channel parent) {
this.parent = parent;
id = newId();
// Create the unsafe; per the walkthrough this is a NioMessageUnsafe for NioServerSocketChannel.
unsafe = newUnsafe();
// Create the pipeline; per the walkthrough its type is DefaultChannelPipeline.
pipeline = newChannelPipeline();
}
/***********************DefaultChannelPipeline_*************************/
/**
 * Creates a pipeline for {@code channel} that initially contains only the head and tail contexts.
 */
protected DefaultChannelPipeline(Channel channel) {
// checkNotNull guards against a null channel.
this.channel = ObjectUtil.checkNotNull(channel, "channel");
succeededFuture = new SucceededChannelFuture(channel, null);
voidPromise = new VoidChannelPromise(channel, true);
tail = new TailContext(this);
head = new HeadContext(this);
// Link the two contexts into a doubly linked list: head <-> tail.
head.next = tail;
tail.prev = head;
}
/***********************_ServerBootstrap_*************************/
void init(Channel channel) throws Exception {
final Map
// 设置我们之前设置的options serverBootstrap.option(ChannelOption
synchronized (options) {
setChannelOptions(channel, options, logger);
}
// 设置channel自定义属性
final Map
synchronized (attrs) {
for (Entry
@SuppressWarnings("unchecked")
AttributeKey
ChannelPipeline p = channel.pipeline();
final EventLoopGroup currentChildGroup = childGroup;
final ChannelHandler currentChildHandler = childHandler;
final Entry<ChannelOption<?>, Object>\[\] currentChildOptions;
final Entry<AttributeKey<?>, Object>\[\] currentChildAttrs;
synchronized (childOptions) {
currentChildOptions = childOptions.entrySet().toArray(newOptionArray(childOptions.size()));
}
synchronized (childAttrs) {
currentChildAttrs = childAttrs.entrySet().toArray(newAttrArray(childAttrs.size()));
}
// 添加一个ChannelInitializer,用来注册后续的NioSocketChannel
p.addLast(new ChannelInitializer<Channel>() {
@Override
public void initChannel(final Channel ch) throws Exception {
final ChannelPipeline pipeline = ch.pipeline();
ChannelHandler handler = config.handler();
if (handler != null) {
pipeline.addLast(handler);
}
ch.eventLoop().execute(new Runnable() {
@Override
public void run() {
// 这个handler就是后续监听客户端连接事件后,会将创建的NioSocketChannel 在这里进行注册
pipeline.addLast(new ServerBootstrapAcceptor(
ch, currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs));
}
});
}
});
}
private static class ServerBootstrapAcceptor extends ChannelInboundHandlerAdapter {
ServerBootstrapAcceptor(
final Channel channel, EventLoopGroup childGroup, ChannelHandler childHandler,
Entry
this.childGroup = childGroup;
this.childHandler = childHandler;
this.childOptions = childOptions;
this.childAttrs = childAttrs;
enableAutoReadTask = new Runnable() {
@Override
public void run() {
channel.config().setAutoRead(true);
}
};
}
@Override
@SuppressWarnings("unchecked")
public void channelRead(ChannelHandlerContext ctx, Object msg) {
// NioServerScoketChannel在监听accpet后会创建一个NioSocketChannel 然后调用handler将该channel传递进来,handler那边只是简单的创建 并没有完成注册
final Channel child = (Channel) msg;
// 在serverBootstrap.childHandler(new ChannelInitializer<SocketChannel>()添加的handler 加入到NioSocketChannel
child.pipeline().addLast(childHandler);
// 设置NioSocketChannel的Options
setChannelOptions(child, childOptions, logger);
// 设置NioSocketChannel的attr
for (Entry<AttributeKey<?>, Object> e: childAttrs) {
child.attr((AttributeKey<Object>) e.getKey()).set(e.getValue());
}
try {
// 将NioSocketChannel注册到workerEventLoopGroup上 然后添加一个监听器
childGroup.register(child).addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
if (!future.isSuccess()) {
forceClose(child, future.cause());
}
}
});
} catch (Throwable t) {
forceClose(child, t);
}
}
private static void forceClose(Channel child, Throwable t) {
child.unsafe().closeForcibly();
logger.warn("Failed to register an accepted channel: {}", child, t);
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
final ChannelConfig config = ctx.channel().config();
if (config.isAutoRead()) {
// 将autoread设置为false,一秒后重新设置为true
config.setAutoRead(false);
ctx.channel().eventLoop().schedule(enableAutoReadTask, 1, TimeUnit.SECONDS);
}
ctx.fireExceptionCaught(cause);
}
}
上述代码的大致流程:将之前在ServerBootstrap上设置的属性在这里进行相应的设置,然后给NioServerSocketChannel添加一个处理器(ServerBootstrapAcceptor);这个处理器在接收到accept事件时,会将新创建的NioSocketChannel注册到对应的workerEventLoop中,后续的io事件就交给该NioSocketChannel来完成
/************************MultithreadEventLoopGroup*********************/
/**
 * Registers {@code channel} with the event loop returned by {@code next()}.
 */
public ChannelFuture register(Channel channel) {
return next().register(channel);
}
/***********************MultithreadEventExecutorGroup**********************/
/**
 * Returns the executor picked by {@code chooser}.
 */
public EventExecutor next() {
// Let the chooser select one EventLoop from the group.
return chooser.next();
}
/***********************SingleThreadEventLoop**********************/
/**
 * Wraps {@code channel} in a new promise bound to this event loop and delegates to
 * {@code register(ChannelPromise)}.
 */
public ChannelFuture register(Channel channel) {
return register(new DefaultChannelPromise(channel, this));
}
/**
 * Registers the promise's channel with this event loop through the channel's unsafe.
 *
 * @return the same promise that was passed in
 */
public ChannelFuture register(final ChannelPromise promise) {
// Fetch the channel's Unsafe and let it perform the registration.
promise.channel().unsafe().register(this, promise);
return promise;
}
/***********************AbstractUnsafe**********************/
/**
 * Binds the channel to {@code eventLoop} and runs {@code register0} on that loop.
 *
 * @param eventLoop the event loop the channel is assigned to
 * @param promise   completed by {@code register0}, or failed if the task cannot be submitted
 */
public final void register(EventLoop eventLoop, final ChannelPromise promise) {
    AbstractChannel.this.eventLoop = eventLoop;

    if (!eventLoop.inEventLoop()) {
        // Called from another thread: hand the registration over to the event loop.
        try {
            eventLoop.execute(new Runnable() {
                @Override
                public void run() {
                    register0(promise);
                }
            });
        } catch (Throwable t) {
            // The task could not be submitted: close the channel and fail the promise.
            closeForcibly();
            closeFuture.setClosed();
            safeSetFailure(promise, t);
        }
        return;
    }

    // Already on the event loop thread: register directly.
    register0(promise);
}
/**
 * Performs the registration and fires the resulting pipeline events. Any Throwable closes the
 * channel and fails the promise.
 */
private void register0(ChannelPromise promise) {
try {
// Stop if the promise cannot be marked uncancellable or ensureOpen(promise) returns false.
if (!promise.setUncancellable() || !ensureOpen(promise)) {
return;
}
// Capture the first-registration flag before it is cleared below.
boolean firstRegistration = neverRegistered;
// NIO-level registration.
doRegister();
neverRegistered = false;
registered = true;
// Per the walkthrough, this is where the ChannelInitializer.initChannel(channel) added earlier gets invoked.
pipeline.invokeHandlerAddedIfNeeded();
// Mark the registration promise as succeeded.
safeSetSuccess(promise);
// Propagate the channelRegistered event through the pipeline.
pipeline.fireChannelRegistered();
// The channel is already active at the moment registration completes.
if (isActive()) {
if (firstRegistration) {
// First registration: propagate channelActive.
pipeline.fireChannelActive();
} else if (config().isAutoRead()) {
// Re-registration with auto-read enabled: start reading again.
beginRead();
}
}
} catch (Throwable t) {
// Registration failed: close the channel and fail the promise.
closeForcibly();
closeFuture.setClosed();
safeSetFailure(promise, t);
}
}
/***********************AbstractNioChannel**********************/
/**
 * Registers the JDK channel with the event loop's selector, using interest ops {@code 0} and
 * this channel as the attachment.
 *
 * @throws Exception a {@code CancelledKeyException} is rethrown if registration still fails
 *                   after one {@code selectNow()} retry
 */
protected void doRegister() throws Exception {
    boolean retriedAfterSelectNow = false;
    while (true) {
        try {
            // Register the channel on the EventLoop's selector.
            selectionKey = javaChannel().register(eventLoop().unwrappedSelector(), 0, this);
            return;
        } catch (CancelledKeyException e) {
            if (retriedAfterSelectNow) {
                // A selectNow() was already forced once and the key is still cancelled.
                throw e;
            }
            // Force one selectNow() and try the registration again.
            eventLoop().selectNow();
            retriedAfterSelectNow = true;
        }
    }
}
/************************tailHandler*********************/
/**
 * Passes the bind to the context returned by {@code findContextOutbound()}, running it on that
 * context's executor.
 *
 * @param localAddress the address to bind to
 * @param promise      the promise tracking the bind
 * @return {@code promise}
 */
public ChannelFuture bind(final SocketAddress localAddress, final ChannelPromise promise) {
    final AbstractChannelHandlerContext next = findContextOutbound();
    final EventExecutor nextExecutor = next.executor();

    if (!nextExecutor.inEventLoop()) {
        // Not on the target executor's thread: submit the bind as a task.
        safeExecute(nextExecutor, new Runnable() {
            @Override
            public void run() {
                next.invokeBind(localAddress, promise);
            }
        }, promise, null);
        return promise;
    }

    next.invokeBind(localAddress, promise);
    return promise;
}
/**
 * Calls this context's handler with the bind, or forwards the bind onwards when
 * {@code invokeHandler()} returns false.
 *
 * @param localAddress the address to bind to
 * @param promise      the promise tracking the bind
 */
private void invokeBind(SocketAddress localAddress, ChannelPromise promise) {
    if (!invokeHandler()) {
        // This handler is not to be invoked: pass the bind along.
        bind(localAddress, promise);
        return;
    }

    try {
        final ChannelOutboundHandler outboundHandler = (ChannelOutboundHandler) handler();
        outboundHandler.bind(this, localAddress, promise);
    } catch (Throwable t) {
        notifyOutboundHandlerException(t, promise);
    }
}
/************************headHandler*********************/
/**
 * Head handler's bind: hands the operation to the channel's unsafe.
 */
public void bind(
ChannelHandlerContext ctx, SocketAddress localAddress, ChannelPromise promise)
throws Exception {
unsafe.bind(localAddress, promise);
}
/************************AbstractUnsafe*********************/
/**
 * Binds the channel to {@code localAddress} and completes {@code promise} with the outcome.
 *
 * @param localAddress the address to bind to
 * @param promise      succeeded after the bind, or failed with whatever {@code doBind} threw
 */
public final void bind(final SocketAddress localAddress, final ChannelPromise promise) {
    assertEventLoop();

    if (!(promise.setUncancellable() && ensureOpen(promise))) {
        return;
    }

    final boolean activeBeforeBind = isActive();
    try {
        doBind(localAddress);
    } catch (Throwable t) {
        safeSetFailure(promise, t);
        closeIfClosed();
        return;
    }

    // Fire channelActive only if the bind is what made the channel active; the event is
    // deferred through invokeLater rather than fired inline.
    final boolean becameActive = !activeBeforeBind && isActive();
    if (becameActive) {
        invokeLater(new Runnable() {
            @Override
            public void run() {
                pipeline.fireChannelActive();
            }
        });
    }

    safeSetSuccess(promise);
}
/************************NioServerSocketChannel*********************/
/**
 * Binds the underlying JDK server socket channel to {@code localAddress} with the configured
 * backlog.
 *
 * @param localAddress the address to bind to
 * @throws Exception if the JDK bind fails
 */
protected void doBind(SocketAddress localAddress) throws Exception {
    if (PlatformDependent.javaVersion() < 7) {
        // Before Java 7: bind through the channel's socket.
        javaChannel().socket().bind(localAddress, config.getBacklog());
    } else {
        // Java 7 and later: bind on the channel itself.
        javaChannel().bind(localAddress, config.getBacklog());
    }
}
服务端的启动流程就讲得差不多了,对上述流程大致做一个梳理:进行端口绑定时,首先创建一个用来处理accept的NioServerSocketChannel,然后将其注册到BossEventLoop上;同时给NioServerSocketChannel添加一个处理器,将来发生accept时,由它把生成的NioSocketChannel注册到WorkerEventLoop上,后续的io事件就在该NioSocketChannel上完成
客户端这边启动流程和服务端大致类似,连接ip端口时,创建一个用来和服务端通信的NioSocketChannel(内部包裹着SocketChannel),然后注册到对应的EventLoop上的selector开启事件监听
// Connect to 127.0.0.1:8080 and block until the connect operation has finished.
ChannelFuture cf = bootstrap.connect("127.0.0.1", 8080).sync();
// Block until the channel's close future completes.
cf.channel().closeFuture().sync();
/************************Bootstrap*********************/
/**
 * Registers a new channel and then resolves the remote address and connects.
 *
 * @param remoteAddress the address to connect to
 * @param localAddress  the local address passed on to the connect step
 * @return the failed registration future, or a promise tracking the connect
 */
private ChannelFuture doResolveAndConnect(final SocketAddress remoteAddress, final SocketAddress localAddress) {
    // Create and register the channel (a NioSocketChannel in this walkthrough); the flow is
    // much the same as for the server channel and yields a registration future.
    final ChannelFuture regFuture = initAndRegister();
    final Channel channel = regFuture.channel();

    if (!regFuture.isDone()) {
        // Registration is still pending: connect from a listener once it has completed.
        final PendingRegistrationPromise pendingPromise = new PendingRegistrationPromise(channel);
        regFuture.addListener(new ChannelFutureListener() {
            @Override
            public void operationComplete(ChannelFuture future) throws Exception {
                final Throwable failure = future.cause();
                if (failure == null) {
                    pendingPromise.registered();
                    doResolveAndConnect0(channel, remoteAddress, localAddress, pendingPromise);
                } else {
                    pendingPromise.setFailure(failure);
                }
            }
        });
        return pendingPromise;
    }

    if (regFuture.isSuccess()) {
        return doResolveAndConnect0(channel, remoteAddress, localAddress, channel.newPromise());
    }
    return regFuture;
}
private ChannelFuture doResolveAndConnect0(final Channel channel, SocketAddress remoteAddress,
final SocketAddress localAddress, final ChannelPromise promise) {
try {
final EventLoop eventLoop = channel.eventLoop();
final AddressResolver
//解析器无法解析ip地址或者已经被解析
if (!resolver.isSupported(remoteAddress) || resolver.isResolved(remoteAddress)) {
doConnect(remoteAddress, localAddress, promise);
return promise;
}
final Future<SocketAddress> resolveFuture = resolver.resolve(remoteAddress);
if (resolveFuture.isDone()) {
// 解析完成 进行连接
final Throwable resolveFailureCause = resolveFuture.cause();
if (resolveFailureCause != null) {
// Failed to resolve immediately
channel.close();
promise.setFailure(resolveFailureCause);
} else {
// Succeeded to resolve immediately; cached? (or did a blocking lookup)
doConnect(resolveFuture.getNow(), localAddress, promise);
}
return promise;
}
// 添加一个监听器用来将来地址解析成功然后进行连接
resolveFuture.addListener(new FutureListener<SocketAddress>() {
@Override
public void operationComplete(Future<SocketAddress> future) throws Exception {
if (future.cause() != null) {
channel.close();
promise.setFailure(future.cause());
} else {
doConnect(future.getNow(), localAddress, promise);
}
}
});
} catch (Throwable cause) {
promise.tryFailure(cause);
}
return promise;
}
private static void doConnect(
final SocketAddress remoteAddress, final SocketAddress localAddress, final ChannelPromise connectPromise) {
final Channel channel = connectPromise.channel();
// 提交一个连接任务到EventLoop上
channel.eventLoop().execute(new Runnable() {
@Override
public void run() {
if (localAddress == null) {
channel.connect(remoteAddress, connectPromise);
} else {
channel.connect(remoteAddress, localAddress, connectPromise);
}
// 添加一个关闭或失败的监听器,会将channel进行关闭
connectPromise.addListener(ChannelFutureListener.CLOSE\_ON\_FAILURE);
}
});
}
/**
 * Initialises a client channel: adds the configured handler, then applies the options and
 * attributes set on the bootstrap.
 *
 * @param channel the newly created channel
 * @throws Exception if initialisation fails
 */
void init(Channel channel) throws Exception {
    channel.pipeline().addLast(config.handler());

    final Map<ChannelOption<?>, Object> options = options0();
    synchronized (options) {
        setChannelOptions(channel, options, logger);
    }

    final Map<AttributeKey<?>, Object> attrs = attrs0();
    synchronized (attrs) {
        for (Entry<AttributeKey<?>, Object> attrEntry : attrs.entrySet()) {
            final AttributeKey<Object> key = (AttributeKey<Object>) attrEntry.getKey();
            channel.attr(key).set(attrEntry.getValue());
        }
    }
}
至此我们服务端和客户端的启动流程都分析完了。我们前面说过EventLoop可以当做一个线程来执行任务,例如 channel.eventLoop().execute(new Runnable() {...});后续的io事件监听和自定义任务处理都在EventLoop内执行,我们下节就来剖析其内部实现
手机扫一扫
移动阅读更方便
你可能感兴趣的文章