Netty——新连接接入

在前面的文章中,我们分别分析了Netty的启动过程以及NioEventLoop的工作流程。本文我们来分析Netty怎样处理新连接的接入。

经过前文Netty——启动过程分析的描述,我们知道Netty在服务端channel的启动过程中将selector注册到jdk的channel上,并将NioServerSocketChannel作为attachment。服务端channel绑定过程中注册一个accept事件,注册完成后Netty就可以接收新的连接了。

OP_ACCEPT事件

当新连接接入时,select会接收到accept事件。NioEventLoop的运行过程中,判断是OP_ACCEPT事件,于是执行以下代码:

1
2
3
if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) {
unsafe.read();
}

服务端channel——NioServerSocketChannel持有的unsafeNioMessageUnsafe

NioMessageUnsafe.read()方法首先调用NioServerSocketChannel.doReadMessages方法:

NioServerSocketChannel.doReadMessages

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
protected int doReadMessages(List<Object> buf) throws Exception {
SocketChannel ch = SocketUtils.accept(javaChannel());

try {
if (ch != null) {
buf.add(new NioSocketChannel(this, ch));
return 1;
}
} catch (Throwable t) {
logger.warn("Failed to create a new channel from an accepted socket.", t);

try {
ch.close();
} catch (Throwable t2) {
logger.warn("Failed to close a socket.", t2);
}
}

return 0;
}

NioServerSocketChannel.doReadMessages方法主要执行了以下两个步骤:

  1. 调用accept方法从jdk的服务端channel——ServerSocketChannel中获取jdk的客户端channel——SocketChannel
  2. 使用jdk的客户端channel新建netty的客户端channel——NioSocketChannel

NioSocketChannel的新建完成了以下几件事:

  1. 创建idunsafepipeline,其中客户端channel持有的unsafe是NioSocketChannelUnsafe
  2. 保存jdk的客户端channel
  3. 保存感兴趣的事件——SelectionKey.OP_READ
  4. 设置阻塞模型为false,即非阻塞模式
  5. 新建客户端channel的配置——NioSocketChannelConfig。设置TcpNoDelaytrue

ServerBootstrapAcceptor.channelRead

回到NioMessageUnsafe.read()方法,生成客户端channel——NioSocketChannel之后,调用DefaultChannelPipeline.fireChannelRead方法传播读事件。

事件传播到ServerBootstrapAcceptor.channelRead方法。这是因为服务端Channel初始化过程中在channel的pipeline中添加了一个ServerBootstrapAcceptor

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
public void channelRead(ChannelHandlerContext ctx, Object msg) {
final Channel child = (Channel) msg;

child.pipeline().addLast(childHandler);

setChannelOptions(child, childOptions, logger);

for (Entry<AttributeKey<?>, Object> e: childAttrs) {
child.attr((AttributeKey<Object>) e.getKey()).set(e.getValue());
}

try {
childGroup.register(child).addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
if (!future.isSuccess()) {
forceClose(child, future.cause());
}
}
});
} catch (Throwable t) {
forceClose(child, t);
}
}

channelRead方法首先将childHandler中添加到pipeline中,并且设置childOptions

然后调用childGroup.register(child)注册客户端channel,childGroup是我们在程序启动时新建的工作线程组(NioEventLoopGroup)。这个方法比较关键,将客户端channel分配到工作线程中去执行。

1
2
3
public ChannelFuture register(Channel channel) {
return next().register(channel);
}

next()调用chooser在线程组中挑选一个线程来执行register操作。执行NioEventLoop的父类SingleThreadEventLoopregister方法:

1
2
3
4
5
6
7
8
9
public ChannelFuture register(Channel channel) {
return register(new DefaultChannelPromise(channel, this));
}

public ChannelFuture register(final ChannelPromise promise) {
ObjectUtil.checkNotNull(promise, "promise");
promise.channel().unsafe().register(this, promise);
return promise;
}

最后调用客户端channel持有的unsafe——NioSocketChannelUnsafe的register方法,在eventLoop线程中执行NioSocketChannelUnsafe.register0方法。

NioSocketChannelUnsafe.register0

  1. 调用NioSocketChannel父类AbstractNioChanneldoRegister方法。在jdk的channel上注册selector,将客户端channel——NioSocketChannel作为attachment

  2. 调用pipeline的fireChannelRegistered方法传播注册事件

  3. 调用pipeline的fireChannelActive方法传播active事件

    HeadContext.channelActive方法:

    1
    2
    3
    4
    5
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
    ctx.fireChannelActive();

    readIfIsAutoRead();
    }
    1. 继续传播active事件
    2. readIfIsAutoRead方法调用TailContext.read方法

      TailContext.read方法调用NioSocketChannelUnsafebeginRead方法,beginRead方法调用NioSocketChannel.doBeginRead()方法。NioSocketChannel.doBeginRead方法注册感兴趣的读事件。

OP_READ事件

NioEventLoop的运行过程中,判断是OP_READ事件,和OP_ACCEPT事件一样执行以下代码:

1
2
3
if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) {
unsafe.read();
}

注意,此时程序是在工作线程中执行的,当前的channel是客户端channel——NioSocketChannel,unsafe对应的是NioSocketChannelUnsafe

read方法执行步骤如下:

  1. 分配ByteBuf
  2. 调用doReadBytes方法从jdk的channel中读取数据保存在ByteBuf
  3. 调用pipeline的fireChannelRead方法传播read事件
  4. 调用pipeline的fireChannelReadComplete方法传播读完成事件

总结

新连接接入的过程充分体现了netty的线程模型:

  1. 在boss线程中由服务端channel——NioServerSocketChannel检测到accept事件
  2. 使用jdk的客户端channel新建netty的客户端channel——NioSocketChannel
  3. NioSocketChannel注册到工作线程中
  4. NioSocketChannel中注册读事件

结果上面的步骤,客户端channel——NioSocketChannel就可以响应读事件。