Netty:Netty的介绍以及它的核心组件(二)—— ChannelFuture与回调
阅读原文时间:2023年07月11日阅读:5

Callback 回调

  一个 Callback(回调)就是一个方法,一个提供给另一个的方法的引用。

  这让另一个方法可以在适当的时候回过头来调用这个 callback 方法。Callback 在很多编程情形中被广泛使用,是用于通知相关方某个操作已经完成最常用的方法之一。Netty在处理事件时内部使用了 callback;当一个 callback被触发,事件可以被 ChannelHandler 的接口实现处理。

1 public class ConnectHandler extends ChannelInboundHandlerAdapter {
2 //当一个新的连接建立时,channelActive被调用
3 @Override
4 public void channelActive(ChannelHandlerContext ctx) throws Exception {
5 System.out.println("client:"+ ctx.channel().remoteAddress() + " connected.");
6 }
7 }

  这个 ConnectHandler 实例 (相当于被调用者)以参数的形式传入创建 Channel 连接的函数(调用者)中,之后这个函数创建新连接后,就会回来调用这个 ConnectHandler 的 channelActive 方法,这个过程就叫回调

Future 未来结果的占位符

  Netty 中通道上的每个 IO 操作都是无阻塞的。

  这意味着调用后立即返回所有操作。标准 Java 库中有一个 Future 接口,用来访问未来异步操作在某个时刻完成,并用其提供对结果的访问,但是对于 Netty 而言并不方便使用(操作繁琐),我们只能询问 Future。

  将 Future 视为保存结果的对象(它可能暂时不保存结果),但将来会保存(一旦 Callable 回调函数返回)。因此,Future 基本上是主线程可以跟踪进度以及其他线程的结果的一种方式。

  这就是 Netty 拥有自己的 ChannelFuture 接口的原因。我们可以将回调传递给 ChannelFuture,该回调将在操作完成时被调用。

  例如以下代码, ChannelFuture channelFuture = bootstrap.bind(getBindAddress()); 服务器引导类绑定端口号,返回一个 ChannelFuture。

1 private ChannelFuture doBind(final SocketAddress localAddress) {
2 final ChannelFuture regFuture = initAndRegister();
3 final Channel channel = regFuture.channel();
4 if (regFuture.cause() != null) {
5 return regFuture;
6 }
7
8 if (regFuture.isDone()) {
9 // At this point we know that the registration was complete and successful.
10 ChannelPromise promise = channel.newPromise();
11 doBind0(regFuture, channel, localAddress, promise);
12 return promise;
13 } else {
14 // Registration future is almost always fulfilled already, but just in case it's not.
15 final PendingRegistrationPromise promise = new PendingRegistrationPromise(channel);
16 regFuture.addListener(new ChannelFutureListener() {
17 @Override
18 public void operationComplete(ChannelFuture future) throws Exception {
19 Throwable cause = future.cause();
20 if (cause != null) {
21 // Registration on the EventLoop failed so fail the ChannelPromise directly to not cause an
22 // IllegalStateException once we try to access the EventLoop of the Channel.
23 promise.setFailure(cause);
24 } else {
25 // Registration was successful, so set the correct executor to use.
26 // See https://github.com/netty/netty/issues/2586
27 promise.registered();
28
29 doBind0(regFuture, channel, localAddress, promise);
30 }
31 }
32 });
33 return promise;
34 }
35 }

  在 initAndRegister() 方法初始化并注册 NioServerSocketChannel 这个操作是异步非阻塞,通过这个 Future 来访问异步操作是否完成,以防万一,没有完成会添加一个 Listener 监听结果,当异步操作完成时通过回调来执行相应操作,成功执行 doBind0(regFuture, channel, localAddress, promise); 来设置要使用的正确执行器。

参考:《Netty In Action》