Netty——启动过程分析

本篇文章我们来分析Netty服务端与客户端的启动过程。

服务端启动过程

如果我们要使用Netty来开发一个服务,大体上我们的代码会如下所示:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
public class NettyServer {
public static void main(String[] args) throws InterruptedException {
final int port = 6789;
EventLoopGroup bossGroup = new NioEventLoopGroup(1);
EventLoopGroup workerGroup = new NioEventLoopGroup();
ServerBootstrap bootstrap = new ServerBootstrap();
try {
bootstrap
.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class)
.localAddress(port)
.childHandler(new NettyServerFilter());

ChannelFuture f = bootstrap.bind().sync();
f.channel().closeFuture().sync();
} finally {
bossGroup.shutdownGracefully();
bossGroup.shutdownGracefully();
}
}
}

主要分为以下3步:

  1. 初始化EventLoopGroup
  2. 配置ServerBootstrap
  3. 创建并绑定Channel

初始化EventLoopGroup

EventLoopGroup本质上来说可以看成是一个线程池。以NioEventLoopGroup为例,它的继承关系如下图所示:

NioEventLoopGroup

NioEventLoopGroup继承MultithreadEventLoopGroup,它在实例化过程中会调用MultithreadEventLoopGroup的构造函数:

1
2
3
4
5
6
protected MultithreadEventLoopGroup(int nThreads, Executor executor, Object... args) {
super(nThreads == 0 ? DEFAULT_EVENT_LOOP_THREADS : nThreads, executor, args);
}

DEFAULT_EVENT_LOOP_THREADS = Math.max(1, SystemPropertyUtil.getInt(
"io.netty.eventLoopThreads", NettyRuntime.availableProcessors() * 2));

如果我们没有指定线程数,默认维护逻辑处理器核数2倍的线程。

MultithreadEventExecutorGroup中维护了一个EventExecutor[] childrenEventExecutor是用于实际处理的线程,数组大小为前面指定的线程数。

EventExecutorNioEventLoopGroupnewChild方法建立,实际的类型为NioEventLoop

1
2
3
4
protected EventLoop newChild(Executor executor, Object... args) throws Exception {
return new NioEventLoop(this, executor, (SelectorProvider) args[0],
((SelectStrategyFactory) args[1]).newSelectStrategy(), (RejectedExecutionHandler) args[2]);
}

NioEventLoop是一个单线程的线程池,核心方法是run()方法,一旦线程启动,就会不间断地查询任务队列taskQueue,将taskQueue中的任务按顺序取出并执行。关于NioEventLoop的内容我们在后面详细分析。

MultithreadEventExecutorGroup中维护了一个EventExecutorChooserFactory.EventExecutorChooser chooser,它是一个EventExecutor的选择器,负责从children中选择一个EventExecutor。根据children数组大小的不同,从PowerOfTwoEventExecutorChooserGenericEventExecutorChooser选择不同的实例。在children中轮询选择EventExecutor

配置ServerBootstrap

ServerBootstrap用于引导创建服务端Channel

  1. 调用ServerBootstrap中的public ServerBootstrap group(EventLoopGroup parentGroup, EventLoopGroup childGroup)方法设置线程池。其中parentGroup是用于处理连接请求的group(即acceptor),childGroup是用于处理事件的group(即client)。

  2. 调用AbstractBootstrap中的public B channel(Class<? extends C> channelClass)方法设置channel的类型。

  3. 配置服务端监听的端口

  4. 调用ServerBootstrap中的public ServerBootstrap childHandler(ChannelHandler childHandler)方法配置消息处理器,由childHandler持有。

创建并绑定服务端Channel

调用AbstractBootstrappublic ChannelFuture bind()方法来创建并绑定服务端Channel,详细的操作在doBind()方法中,主要的步骤有两步:

  1. 调用initAndRegister初始化并注册服务端Channel
  2. 调用doBind0绑定服务端Channel

初始化并注册服务端Channel

初始化并注册服务端Channel的工作在AbstractBootstrap.initAndRegister方法中完成,它有以下几个步骤。

创建服务端Channel

首先调用channelFactorynewChannel()方法创建服务端Channel。前面我们设置了Channel的类型为NioServerSocketChannel,因此这里会根据Channel的类型通过反射的方式新建ChannelNioServerSocketChannel的构造过程如下:

  1. 调用newSocket()方法,通过JDK的SelectorProvider.openServerSocketChannel()方法来创建ServerSocketChannel
  2. 新建NioServerSocketChannelConfig,里面保存tcp参数等数据
  3. 调用父类AbstractNioChannel的构造方法。传入刚刚创建ServerSocketChannel,将这个ServerSocketChannel保存在变量ch中,之后可以通过javaChannel()方法来获取这个ServerSocketChannel。调用ServerSocketChannel.configureBlocking()方法将ServerSocketChannel设置成非阻塞模式。
  4. 调用父类AbstractChannel的构造方法,创建idunsafepipeline

初始化服务端Channel

接着调用ServerBootstrap.init方法初始化新建的Channel

  1. 设置channel的ChannelOptionsChannelAttrs
  2. 设置channel的ChildOptionsChildAttrs
  3. 调用Channel.pipeline()方法获取ChannelPipeline pipelinepipelineAbstractChannel中维护,类型为DefaultChannelPipelineChannelPipeline用于维护ChannelHandlerChannelHandler保存在DefaultChannelHandlerContext,以链表的形式保存在DefaultChannelPipeline
  4. 在pipeline中添加一个ChannelInitializerChannelInitializer在管道注册完成之后,往管道中添加一个ServerBootstrapAcceptor(继承ChannelInboundHandler),它持有对childGroupbossGroup)和childHandlerNettyServerFilter)的引用,这个处理器的功能就是为accept的新连接分配线程。

注册selector

最后调用config().group().register(channel)注册selector。获取group(这里的group是我们前面设置的bossGroup),调用MultithreadEventLoopGroup.register方法注册Channel

  1. 调用next()方法选择一个线程(EventLoop)。因为前面选择的是bossGroup,因此这里的EventLoop选择的是bossGroup中的EventLoop
  2. 然后调用SingleThreadEventLoop.register方法。

    register方法调用promise.channel().unsafe().register(this, promise)注册Channelunsafe()返回的是NioMessageUnsafe

  3. 最终调用的是AbstractUnsafe.register0方法。

    1. 调用前面新建的ServerSocketChanneldoRegister方法,将selector注册到jdk的channel上,将当前NioServerSocketChannel作为attachment。
    2. 调用pipeline.invokeHandlerAddedIfNeeded(),初始化Channel
    3. 调用pipeline.fireChannelRegistered(),传播注册成功事件

绑定服务端Channel

调用AbstractBootstrap.doBind0方法,在channel的线程池中绑定地址。

Channel的绑定调用AbstractChannel.bind来完成,其中的调用关系如下:

  1. AbstractChannel.bind
  2. DefaultChannelPipeline.bind
  3. AbstractChannelHandlerContext.bind
  4. AbstractChannelHandlerContext.invokeBind
  5. DefaultChannelPipeline.bind
  6. AbstractUnsafe.bind
  7. NioServerSocketChannel.doBind

    最终调用的是NioServerSocketChannel.doBind方法,其中调用底层的jdk的channel来绑定地址

  8. pipeline.fireChannelActive(),传播channelActive事件

    最后调用HeadContext.channelActive方法

    1. 调用ctx.fireChannelActive()传播active事件
    2. 这一步很重要,调用DefaultChannelPipeline.HeadContext.readIfIsAutoRead(),最后调用AbstractNioChannel.doBeginRead()方法在SelectionKey中注册accept事件

总结

  1. 首先调用newChannel()创建服务端的channel,这个过程实际上是调用JDK底层的API来创建JDK的channel,然后netty将其包装成自己的服务端channel,同时会创建一些基本的组件绑定在此channel上,比如pipeline
  2. 然后调用init()方法初始化服务端channel,这个过程最重要的是为服务端channel添加一个连接处理器。
  3. 随后调用register()方法注册Selector,这个过程中netty将JDK底层的channel注册到事件轮询器Selector中,并把netty的服务端channel作为一个attachment绑定到对应的JDK底层channel中。
  4. 最后调用doBind()方法,调用JDK底层的API实现对本地端口的监听。绑定成功之后,netty会重新向Selector注册一个accept事件,注册完成后netty就可以接收新的连接了。

客户端启动过程

下面来看看客户端的启动过程。代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
public class NettyClient {
public static String host = "127.0.0.1";
public static int port = 6789;

public static void main(String[] args) throws InterruptedException, ExecutionException {
EventLoopGroup group = new NioEventLoopGroup();
try {
Bootstrap bootstrap = new Bootstrap();
bootstrap
.group(group)
.channel(NioSocketChannel.class)
.remoteAddress(host, port)
.handler(new NettyClientFilter());

ChannelFuture channelFuture = bootstrap.connect().sync();
Channel ch = channelFuture.channel();
ch.writeAndFlush("Hello Netty\r\n");
ch.closeFuture().sync();
} finally {
group.shutdownGracefully();
}
}
}

客户端代码与服务端代码的差异主要在于两点:

  1. 使用Bootstrap而不是ServerBootstrap来配置参数,引导创建Channel
  2. 调用Bootstrapconnect方法建立连接

connect方法中首先新建并注册Channel,这个过程和服务端差不多,只是初始化Channel的时候在pipeline中添加的是自定义的handler,而服务端则是添加了一个ServerBootstrapAcceptor

然后调用Channelconnect方法,在ChannelPipelineconnect方法中调用AbstractNioUnsafe.connect方法,最终调用的是SocketChannel.connect方法建立连接。返回连接的结果,如果没有立即连上,在selectKey上设置OP_CONNECT事件。