netty 的事件驱动
阅读原文 时间:2023年07月16日 阅读:1

netty 是事件驱动的,这里有两层含义:一是 netty 接收到 socket 数据后会产生事件,事件在 pipeline 上传播;二是事件由特定的线程池处理。

NioEventLoop 轮询网络事件

// io.netty.channel.nio.NioEventLoop#processSelectedKey
private void processSelectedKey(SelectionKey k, AbstractNioChannel ch) {
final AbstractNioChannel.NioUnsafe unsafe = ch.unsafe();
if (!k.isValid()) {
final EventLoop eventLoop;
try {
eventLoop = ch.eventLoop();
} catch (Throwable ignored) {
// If the channel implementation throws an exception because there is no event loop, we ignore this
// because we are only trying to determine if ch is registered to this event loop and thus has authority
// to close ch.
return;
}
// Only close ch if ch is still registered to this EventLoop. ch could have deregistered from the event loop
// and thus the SelectionKey could be cancelled as part of the deregistration process, but the channel is
// still healthy and should not be closed.
// See https://github.com/netty/netty/issues/5125
if (eventLoop == this) {
// close the channel if the key is not valid anymore
unsafe.close(unsafe.voidPromise());
}
return;
}

try {  
    int readyOps = k.readyOps();  
    // We first need to call finishConnect() before try to trigger a read(...) or write(...) as otherwise  
    // the NIO JDK channel implementation may throw a NotYetConnectedException.  
    if ((readyOps & SelectionKey.OP\_CONNECT) != 0) {  
        // remove OP\_CONNECT as otherwise Selector.select(..) will always return without blocking  
        // See https://github.com/netty/netty/issues/924  
        int ops = k.interestOps();  
        ops &= ~SelectionKey.OP\_CONNECT;  
        k.interestOps(ops);

        // 建立连接,深层会调用 fireChannelActive  
        unsafe.finishConnect();  
    }

    // Process OP\_WRITE first as we may be able to write some queued buffers and so free memory.  
    if ((readyOps & SelectionKey.OP\_WRITE) != 0) {  
        // Call forceFlush which will also take care of clear the OP\_WRITE once there is nothing left to write  
        ch.unsafe().forceFlush();  
    }

    // Also check for readOps of 0 to workaround possible JDK bug which may otherwise lead  
    // to a spin loop  
    if ((readyOps & (SelectionKey.OP\_READ | SelectionKey.OP\_ACCEPT)) != 0 || readyOps == 0) {  
        // 读数据,在流水线上传播读事件和连接关闭事件  
        unsafe.read();  
    }  
} catch (CancelledKeyException ignored) {  
    unsafe.close(unsafe.voidPromise());  
}  

}

// io.netty.channel.nio.AbstractNioByteChannel.NioByteUnsafe#read
/**
 * Reads bytes from the channel in a loop and pushes each filled buffer into the pipeline.
 *
 * <p>Every buffer that received data is passed to {@code pipeline.fireChannelRead}. When the loop
 * ends, {@code pipeline.fireChannelReadComplete} is fired, and if a negative read count (EOF)
 * was seen, {@code closeOnRead} is called. Any {@link Throwable} is routed to
 * {@code handleReadException}.
 */
public final void read() {
final ChannelConfig config = config();
// Bail out early when reading should not proceed, clearing the pending-read state first.
if (shouldBreakReadReady(config)) {
clearReadPending();
return;
}
final ChannelPipeline pipeline = pipeline();
final ByteBufAllocator allocator = config.getAllocator();
final RecvByteBufAllocator.Handle allocHandle = recvBufAllocHandle();
allocHandle.reset(config);

ByteBuf byteBuf = null;  
boolean close = false;  
try {  
    do {  
        byteBuf = allocHandle.allocate(allocator);  
        allocHandle.lastBytesRead(doReadBytes(byteBuf));  
        if (allocHandle.lastBytesRead() <= 0) {  
            // nothing was read. release the buffer.  
            byteBuf.release();  
            byteBuf = null;  
            close = allocHandle.lastBytesRead() < 0;  
            if (close) {  
                // There is nothing left to read as we received an EOF.  
                readPending = false;  
            }  
            break;  
        }

        allocHandle.incMessagesRead(1);  
        readPending = false;  
        // Fire ChannelRead  
        pipeline.fireChannelRead(byteBuf);  
        // Drop the local reference now that the buffer has been passed to the pipeline.  
        byteBuf = null;  
    } while (allocHandle.continueReading());

    allocHandle.readComplete();  
    // Fire ChannelReadComplete  
    pipeline.fireChannelReadComplete();

    if (close) {  
        // Fires ChannelInactive and ChannelUnregister  
        closeOnRead(pipeline);  
    }  
} catch (Throwable t) {  
    handleReadException(pipeline, byteBuf, t, close, allocHandle);  
} finally {  
    // Check if there is a readPending which was not processed yet.  
    // This could be for two reasons:  
    // * The user called Channel.read() or ChannelHandlerContext.read() in channelRead(...) method  
    // * The user called Channel.read() or ChannelHandlerContext.read() in channelReadComplete(...) method  
    //  
    // See https://github.com/netty/netty/issues/2254  
    if (!readPending && !config.isAutoRead()) {  
        removeReadOp();  
    }  
}  

}

HandlerContext 中有一个整数 executionMask,不同的 bit 位表示不同的事件,为 1 表示可以处理该事件。

// io.netty.channel.AbstractChannelHandlerContext
// Bit mask of events for this context: each bit stands for one event, and a set bit means the event is handled.
private final int executionMask;

/**
 * Bit flags identifying the individual handler events, one bit per event.
 *
 * <p>Bit 0 is the exception-caught event, bits 1 to 8 are the remaining inbound events and
 * bits 9 to 16 are the outbound operations.
 */
final class ChannelHandlerMask {
    // Using to mask which methods must be called for a ChannelHandler.
    static final int MASK_EXCEPTION_CAUGHT = 1;
    static final int MASK_CHANNEL_REGISTERED = 1 << 1;
    static final int MASK_CHANNEL_UNREGISTERED = 1 << 2;
    static final int MASK_CHANNEL_ACTIVE = 1 << 3;
    static final int MASK_CHANNEL_INACTIVE = 1 << 4;
    static final int MASK_CHANNEL_READ = 1 << 5;
    static final int MASK_CHANNEL_READ_COMPLETE = 1 << 6;
    static final int MASK_USER_EVENT_TRIGGERED = 1 << 7;
    static final int MASK_CHANNEL_WRITABILITY_CHANGED = 1 << 8;
    static final int MASK_BIND = 1 << 9;
    static final int MASK_CONNECT = 1 << 10;
    static final int MASK_DISCONNECT = 1 << 11;
    static final int MASK_CLOSE = 1 << 12;
    static final int MASK_DEREGISTER = 1 << 13;
    static final int MASK_READ = 1 << 14;
    static final int MASK_WRITE = 1 << 15;
    static final int MASK_FLUSH = 1 << 16;

    /** Union of every inbound event bit, including the exception-caught bit. */
    private static final int MASK_ALL_INBOUND = MASK_EXCEPTION_CAUGHT | MASK_CHANNEL_REGISTERED |
            MASK_CHANNEL_UNREGISTERED | MASK_CHANNEL_ACTIVE | MASK_CHANNEL_INACTIVE | MASK_CHANNEL_READ |
            MASK_CHANNEL_READ_COMPLETE | MASK_USER_EVENT_TRIGGERED | MASK_CHANNEL_WRITABILITY_CHANGED;

    /** Union of every outbound operation bit, including the exception-caught bit. */
    private static final int MASK_ALL_OUTBOUND = MASK_EXCEPTION_CAUGHT | MASK_BIND | MASK_CONNECT | MASK_DISCONNECT |
            MASK_CLOSE | MASK_DEREGISTER | MASK_READ | MASK_WRITE | MASK_FLUSH;
}

以 ChannelActive 为例,通过比较 bit 位上的值,判断该 HandlerContext 是否处理 ChannelActive 事件:

// io.netty.channel.AbstractChannelHandlerContext#fireChannelActive
public ChannelHandlerContext fireChannelActive() {
    // Look up the next context whose mask has the CHANNEL_ACTIVE bit set, then invoke it.
    final AbstractChannelHandlerContext target = findContextInbound(MASK_CHANNEL_ACTIVE);
    invokeChannelActive(target);
    return this;
}

// io.netty.channel.AbstractChannelHandlerContext#findContextInbound
private AbstractChannelHandlerContext findContextInbound(int mask) {
    // Start at the successor of this context and walk forward until a context
    // has at least one of the requested mask bits set.
    AbstractChannelHandlerContext candidate = this.next;
    while ((candidate.executionMask & mask) == 0) {
        candidate = candidate.next;
    }
    return candidate;
}

如何使用 UserEvent?

首先让自己的 handler 实现 userEventTriggered 方法

/**
 * Example inbound handler showing how to receive a user event fired on the pipeline.
 */
class MyInboundHandler extends SimpleChannelInboundHandler<Object> {

    /**
     * Prints the received event and then passes it on.
     *
     * @param ctx the context this handler is bound to
     * @param evt the user event object
     */
    @Override
    public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
        // Handle the event; here it is simply printed.
        System.out.println(evt);
        // Propagate evt onward from the current HandlerContext; without this call the event stops here.
        super.userEventTriggered(ctx, evt);
    }

    /**
     * Message handling is not part of this example, so incoming messages are ignored.
     *
     * @param ctx the context this handler is bound to
     * @param msg the received message
     */
    @Override
    protected void channelRead0(ChannelHandlerContext ctx, Object msg) throws Exception {
        // Intentionally empty. channelRead0 is abstract in SimpleChannelInboundHandler,
        // so there is no super implementation that could be called here.
    }
}

通过 pipeline 传播事件,从 HeadContext 向后传播事件

// Fire a user event on the channel's pipeline, using a String as the event object.
channel.pipeline().fireUserEventTriggered("i am an event");

read 事件,是从 HeadContext 开始向后传播

write 操作,是从 TailContext 开始向前传播

手机扫一扫

移动阅读更方便

阿里云服务器
腾讯云服务器
七牛云服务器

你可能感兴趣的文章