本篇文章我们来分析NioEventLoop的功能。
NioEventLoop是一个单线程的线程池,由EventLoopGroup维护。以NioEventLoopGroup为例,在它的父类MultithreadEventExecutorGroup中持有一个NioEventLoop的数组,执行任务时可以选择一个NioEventLoop,在NioEventLoop的线程中来异步执行任务。
继承关系如下所示:

新建
MultithreadEventExecutorGroup
以NioEventLoopGroup为例,在其父类MultithreadEventExecutorGroup的构造函数中调用newChild方法新建NioEventLoop。
1 | protected MultithreadEventExecutorGroup(int nThreads, Executor executor, |
NioEventLoop在MultithreadEventExecutorGroup的构造函数中被创建,如果不指定NioEventLoop的数量,默认情况下会创建两倍的CPU核数的NioEventLoop。下面是MultithreadEventExecutorGroup构造函数的执行流程:
- 首先创建
ThreadPerTaskExecutor,它是线程执行器,负责创建NioEventLoop对应的底层线程 - 通过for循环调用
newChild方法创建NioEventLoop的对象数组 - 调用
chooserFactory.newChooser创建线程选择器,线程选择器的作用是为每个新连接分配NioEventLoop线程
ThreadPerTaskExecutor
ThreadPerTaskExecutor执行的作用是每次执行任务的时候都会创建一个线程实体。
NioEventLoop线程命名规则为nioEventLoop-{poolId}-{xx}。{poolId}表示线程池id,{xx}表示NioEventLoopGroup下的第几个NioEventLoop。
newChild()
newChild()方法如下:
1 |
|
调用NioEventLoop的构造函数新建NioEventLoop。
newChooser()
chooser的作用是为了给新连接绑定对应的NioEventLoop。
判断线程池中线程的数量,如果是2的幂则创建PowerOfTwoEventExecutorChooser,否则创建GenericEventExecutorChooser。
NioEventLoop
NioEventLoop的构造函数如下:
1 | NioEventLoop(NioEventLoopGroup parent, Executor executor, SelectorProvider selectorProvider, |
parent:前面我们新建的NioEventLoopGroupexecutor:MultithreadEventExecutorGroup构造函数中新建的ThreadPerTaskExecutorselectorProvider:NioEventLoopGroup构造函数中调用SelectorProvider.provider()返回的SelectorProviderstrategy:NioEventLoopGroup构造函数中给定的DefaultSelectStrategyFactoryrejectedExecutionHandler:NioEventLoopGroup构造函数中给定的RejectedExecutionHandler。该接口有一个唯一的接口方法rejected,当尝试去添加一个任务到SingleThreadEventExecutor中,但是由于容量的限制添加失败了,那么此时该方法就会被调用。
final SelectorTuple selectorTuple = openSelector();开启Selector,构造SelectorTuple实例,SelectorTuple是一个封装了原始selector对象和封装后selector对象(即SelectedSelectionKeySetSelector对象)的类:
1 | private static final class SelectorTuple { |
这里,成员变量unwrappedSelector就是通过SelectorProvider.provider().openSelector()开启的Selector;而成员变量selector则是一个SelectedSelectionKeySetSelector对象。
SelectedSelectionKeySetSelector中持有unwrappedSelector,并作为unwrappedSelector的代理类,提供Selector所需要的方法,而Selector相关的操作底层实际都是由unwrappedSelector来完成的,只是在操作中增加了对selectionKeys进行相应的设置。
SelectedSelectionKeySetSelector中除了持有unwrappedSelector实例外还持有一个SelectedSelectionKeySet对象。该对象是Netty提供的一个代替Selector的selectedKeys对象。openSelector()方法中通过反射机制将程序构建的SelectedSelectionKeySet对象给设置到了Selector内部的selectedKeys、publicSelectedKeys属性。这使Selector中所有对selectedKeys、publicSelectedKeys的操作实际上就是对SelectedSelectionKeySet的操作。
SelectedSelectionKeySet类主要通过成员变量SelectionKey[]数组来维护被选择的SelectionKeys,并将扩容操作简单地简化为newCapacity为oldCapacity的2倍来实现。同时不再支持remove、contains、iterator方法。并添加了reset方法来对SelectionKey[]数组进行重置。
SelectedSelectionKeySetSelector在每次select操作的时候,都会先将selectedKeys进行清除(reset)操作。
启动
当NioEventLoop新建完成之后,其线程并没有开始执行,只有当有任务被添加到该线程池中,其中的线程才开始执行。
添加任务的操作在其父类SingleThreadEventExecutor的execute方法中完成:
1 | public void execute(Runnable task) { |
步骤如下:
- 将任务
task加入任务队列taskQueue中 - 调用
inEventLoop()方法判断当前线程是否是NioEventLoop的线程 如果
inEventLoop()方法返回false,表示当前event loop并没有启动,此时调用startThread()方法创建线程。创建线程的动作由前面创建的ThreadPerTaskExecutor线程执行器调用其execute()方法完成。execute()方法会创建一个FastThreadLocalThread,然后调用start()方法进行启动,启动的时候会执行Runnable里面的run()方法。run()方法中主要有以下两步:- 调用
thread = Thread.currentThread()保存当前的线程,这个线程其实就是线程执行器创建的FastThreadLocalThread - 调用
NioEventLoop.run()方法,run()方法是驱动netty运转的核心方法。
- 调用
满足条件时,触发
wakeup()方法
执行
当NioEventLoop启动之后,就会在其run()方法中循环执行,run()方法是NioEventLoop的核心。
run()方法有一个无限的for循环,循环里主要有三件事:
- 调用
select()方法轮询注册到Selector上的io事件 - 调用
processSelectedKeys()方法处理io事件 - 调用
runAllTasks()方法异步处理外部线程添加到taskQueue中的任务

run()方法的大循环主要完成下面几件事:
- 根据当前
NioEventLoop中是否有待完成的任务得出select策略,进行相应的select操作 - 处理select操作得到的已经准备好处理的IO事件,以及处理提交到当前
EventLoop的任务(包括定时和周期任务) - 如果
NioEventLoop所在线程执行了关闭操作,则执行相关的关闭操作处理。
判断select策略
调用DefaultSelectStrategy的calculateStrategy(IntSupplier selectSupplier, boolean hasTasks)方法判断select策略。
1 | // io.netty.channel.DefaultSelectStrategy#calculateStrategy |
selectSupplier是经过封装的selector(即SelectedSelectionKeySetSelector),hasTasks是调用hasTask()方法的返回值,hasTask()用于判断taskQueue或tailTasks是否有任务。
calculateStrategy方法的选择策略是:
- 如果当前的
EventLoop中有待处理的任务,那么会调用selectSupplier.get()方法,最终会调用Selector.selectNow()方法,返回就绪通道的数量,并清空selectionKeys。 - 如果当前的
EventLoop没有待处理的任务,那么返回SelectStrategy.SELECT(-1)。
如果calculateStrategy方法返回值大于0,则说明有就绪的IO事件待处理,跳出switch代码块,进入流程2。否则如果返回的是SelectStrategy.SELECT,执行select(wakenUp.getAndSet(false)):以CAS的方式获得wakenUp当前的标识,并将wakenUp设置为false。将wakenUp作为参数传入select(boolean oldWakenUp)方法中。
select()
select方法除了检查就绪通道以外,还有一个很重要的事,就是解决epoll bug问题。epoll bug会导致Selector空轮询,IO线程CPU使用率100%,严重影响系统的安全性和可靠性。
Netty的解决策略是:
- 根据该BUG的特征,首先侦测该BUG是否发生
- 将问题Selector上注册的Channel转移到新建的Selector上
- 老的问题Selector关闭,使用新建的Selector替换

红色框中的代码是对epoll bug的解决。
time - TimeUnit.MILLISECONDS.toNanos(timeoutMillis) >= currentTimeNanos为false,即time - TimeUnit.MILLISECONDS.toNanos(timeoutMillis) < currentTimeNanos的意思是int selectedKeys = selector.select(timeoutMillis)在timeoutMillis时间到期前就返回了,并且selectedKeys == 0。这意味着selector进行了一次空轮询,即发生了epoll bug。Selector不管有无感兴趣的时间发生,select()方法总是不阻塞就返回,这会导致CPU的利用率达到100%。
SELECTOR_AUTO_REBUILD_THRESHOLD默认为512,也就是当Selector连续执行了512空轮询后,Netty就会进行Selector的重建操作,即rebuildSelector()操作。
绿色框中的代码主要说明了,当有定时/周期性任务即将到达执行时间(<0.5ms),或者NioEventLoop的线程收到了新提交的任务等待被处理,或者有定时/周期性任务到达了可处理状态等待被处理,那么则退出select方法转而去执行任务。这也说明了Netty总是会尽最大努力去保证任务队列中的任务以及定时/周期性任务能得到及时的处理。
1 | long currentTimeNanos = System.nanoTime(); |
该段代码会计算scheduledTaskQueue中是否有即将要执行的任务,即在0.5ms内就可执行的scheduledTask,如果有则退出select方法转而去执行任务。
delayNanos方法会返回最近一个待执行的定时/周期性任务还差多少纳秒就可以执行的时间差(若scheduledTaskQueue为空,也就是没有任务的定时/周期性任务,则返回1秒)。因此selectDeadLineNanos就表示最近一个待执行的定时/周期性任务的可执行时间。
selectDeadLineNanos - currentTimeNanos就表示:最近一个待执行的定时/周期性任务还差多少纳秒就可以执行的时间差。如果(selectDeadLineNanos - currentTimeNanos + 0.5ms) / 1ms <= 0表示selectDeadLineNanos - currentTimeNanos < 0.5ms,即scheduledTaskQueue中在0.5ms内有可执行的任务,于是退出select方法。
1 | if (hasTasks() && wakenUp.compareAndSet(false, true)) { |
在了解上面代码的用意之前,我们先来看下任务提交时的一下细节:
1 | public void execute(Runnable task) { |
当满足下面4个条件时,在有任务提交至EventLoop后会触发Selector的wakeup()方法:
- 成员变量
addTaskWakesUp为false。这里,在构造NioEventLoop对象时,通过构造方法传进的参数addTaskWakesUp正是false,它会赋值给变量addTaskWakesUp。因此该条件满足。 - 提交上来的任务不是一个
NonWakeupRunnable任务
1 |
|
- 执行提交任务的线程不是EventLoop所在线程
- 当
wakenUp成员变量当前的值为false
1 | protected void wakeup(boolean inEventLoop) { |
只有同时满足上面4个条件的情况下,Selector的wakeup()方法才会得以调用。
现在,我们再来说明这段代码块的用意:
1 | if (hasTasks() && wakenUp.compareAndSet(false, true)) { |
如果一个任务在wakenUp值为true的情况下被提交上来,那么这个任务将没有机会去调用Selector.wakeup()。所以我们需要去再次检测任务中是否有待执行的任务,在执行Selector.select操作之前。如果我们不这么做,那么任务队列中的任务将等待直到Selector.select操作超时。如果ChannelPipeline中存在IdleStateHandler,那么IdleStateHandler处理器可能会被挂起直到空闲超时。
这段代码在每次要执行Selector.select(long timeout)之前我们会进行一个判断。如果hasTask()为true,即发现当前有任务待处理时,wakenUp.compareAndSet(false, true)会返回true,因为在每次调用当前这个select方法时,都会将wakeUp标识设置为false(wakenUp.getAndSet(false)这句代码)。而此时wakenUp已经被置位为true了,在此之后有任务提交至EventLoop,那么是无法触发Selector.wakeup()的。所以如果当前有待处理的任务,就不会进行下面的Selector.select(long timeout)操作,而是退出select方法,继而去处理任务。
因为如果不这么做的话,如果当前NioEventLoop线程上已经有任务提交上来,这会使得这些任务可能会需要等待Selector.select(long timeout)操作超时后才能得以执行。再者,假设我们的ChannelPipeline中存在一个IdleStateHandler,那么就可能导致因为Selector.select(long timeout)操作的timeout比IdleStateHandler设置的idle timeout长,而导致IdleSateHandler不能对空闲超时做出及时的处理。
同时,我们注意,在执行break退出select方法前,会执行selector.selectNow(),该方法不会阻塞,它会立即返回,同时它会抵消Selector.wakeup()操作带来的影响。
所以,如果有非NioEventLoop线程提交一个任务上来,那么这个线程会执行selector.wakeup()方法,那么NioEventLoop在if (hasTasks() && wakenUp.compareAndSet(false, true))的后半个条件会返回false,程序会执行到int selectedKeys = selector.select(timeoutMillis),但是此时select不会阻塞,而是直接返回,因为前面已经先执行了selector.wakeup()。
因为提交任务的线程是非NioEventLoop线程,所以也可能是由NioEventLoop线程成功执行了if (hasTasks() && wakenUp.compareAndSet(false, true)),退出了select方法转而去执行任务队列中的任务。注意,这时提交任务的非NioEventLoop线程就不会执行selector.wakeup。
1 | if (selectedKeys != 0 || oldWakenUp || wakenUp.get() || hasTasks() || hasScheduledTasks()) { |
同时,除了在每次Selector.select(long timeout)操作前进行任务队列的检测外,在每次Selector.select(long timeout)操作后也会检测任务队列是否已经有提交上来的任务待处理,以及是否有定时或周期性任务准备好被执行。如果有,也不会继续epoll bug的检测,转而去执行待处理的任务。
如果检测到发生了epoll bug,调用rebuildSelector()进行Selector的重构操作。重构操作流程如下:
- 调用
openSelector()先构造一个新的SelectorTupe。 - 然后,遍历
oldSelector中的所有SelectionKey,依次判断其有效性,如果有效则将其重新注册到新的Selector上,并将旧的SelectionKey执行cancel操作,进行相关的数据清理,以便最后oldSelector好进行关闭。 - 在将所有的
SelectionKey数据移至新的Selector后,将newSelectorTuple的selector和unwrappedSelector赋值给相应的成员属性。 - 最后,调用
oldSelector.close()关闭旧的Selector以进行资源的释放。
处理IO事件
判断完select策略之后,接着处理select操作得到的已经准备好处理的IO事件,以及处理提交到当前EventLoop的任务(包括定时和周期任务)。
1 | cancelledKeys = 0; |
- 首先将成员变量
cancelledKeys和needsToSelectAgain重置,即cancelledKeys置为0,needsToSelectAgain置为false - 成员变量
ioRatio的默认值为50。ioRatio是在事件循环中用于处理IO操作时间的百分比。默认为50%。也就是说,在事件循环中默认情况下用于处理IO操作的时间和用于处理任务的时间百分比都是50%。 - 调用
processSelectedKeys()处理Selector.select操作返回的待处理的IO事件 - 调用
runAllTasks处理任务队列中的任务以及定时/周期性任务
processSelectedKeys()
processSelectedKeys()方法依次取出准备好被处理的SelectionKey,并对相应的待处理IO事件进行处理。
1 | private void processSelectedKeys() { |
selectedKeys.keys[i] = null操作相当于我们在NIO编程中在处理已经触发的感兴趣的事件时,要将处理过的事件从selectedKeys集合中移除的步骤。
在将ServerSocketChannel注册到Selector的时候,是会将其对应的NioServerSocketChannel作为附加属性设置到SelectionKey中。所以这里从k.attachment()获取到的Object对象实际就是NioServerSocketChannel,而NioServerSocketChannel就是一个AbstractNioChannel的实现类。

首先检查当前的SelectionKey是否有效(仅当SelectionKey从Selector上注销的时候,该SelectionKey会为无效状态),如果无效的话:
- 获取该
SelectionKey所关联的Channel所注册的EventLoop,如果获取Channel的EventLoop失败,则忽略错误直接返回。因为我们只处理注册到EventLoop上的Channel且有权去关闭这个Channel - 如果获取到的
EventLoop不是当前的执行线程所绑定的EventLoop,或者获取到的EventLoop为null,则直接返回。因为我们只关注依然注册在当前执行线程所绑定的EventLoop上的Channel。Channel可能已经从当前的EventLoop上注销,并且它的SelectionKey可能已经被取消了,作为在注销处理流程的一部分。当然如果Channel仍然健康的被注册在当前的EventLoop上,则需要去关闭它 - 当能正确获取到
EventLoop,且该EventLoop非空并为当前执行线程所绑定的EventLoop,则说明Channel依旧注册去当前的EventLoop上,那么执行关闭操作,来关闭相应的连接,释放相应的资源
如果SelectionKey是有效的,获取readyOps。
当SelectionKey.OP_CONNECT事件就绪时:
- 将
SelectionKey.OP_CONNECT事件从SelectionKey所感兴趣的事件中移除,这样Selector就不会再去监听该连接的SelectionKey.OP_CONNECT事件了。而SelectionKey.OP_CONNECT连接事件是只需要处理一次的事件,一旦连接建立完成,就可以进行读、写操作了 - 调用
unsafe.finishConnect()方法,该方法会调用SocketChannel.finishConnect()来标识连接的完成,如果我们不调用该方法,就去调用read/write方法,则会抛出NotYetConnectedException异常。在此之后,触发ChannelActive事件,该事件会在该Channel的ChannelPipeline中传播。
runAllTasks()
runAllTasks()方法处理任务队列中的任务以及定时/周期性任务。
将runAllTasks()方法写在finally块中,这是为了确保即便处理SelectedKeys出现了异常,也要确保任务中的队列总能得到执行的机会。

步骤1
获取系统启动到当前的时间内已经过期的定时任务(即,延迟的时间已经满足或者定时执行任务的时间已经满足的任务)放入到taskQueue中。从taskQueue中获取任务,如果taskQueue已经没有任务了,则依次执行tailTasks队列里的所有任务。
fetchFromScheduledTaskQueue方法获取过期的定时任务放入到taskQueue中:
1 | private boolean fetchFromScheduledTaskQueue() { |
- 获取从系统启动到当前系统的时间间隔
- 从
scheduledTaskQueue中获取在该时间间隔内已经过期的任务(即延迟周期或定时周期已经到时间的任务),将这些任务放入到taskQueue中 - 如果
taskQueue满了,无法添加新的任务(taskQueue队列的容量限制最大为2048),则将其重新放回到scheduledTaskQueue
默认情况下,taskQueue是一个MpscUnboundedArrayQueue实例
pollScheduledTask方法根据给定的nanoTime返回已经准备好被执行的Runnable。必须使用AbstractScheduledEventExecutor.nanoTime()方法来检索正确的nanoTime:
1 | protected final Runnable pollScheduledTask(long nanoTime) { |
scheduledTaskQueue是一个PriorityQueue实例,它根据任务的deadlineNanos属性的升序来维护一个任务队列,每次peek能返回最先该被执行的定时任务。deadlineNanos表示系统启动到该任务应该被执行的时间点的时间差。如果scheduledTask.deadlineNanos() <= nanoTime则说明该任务的执行时间已经到了,因此将其从scheduledTaskQueue移除,然后通过该方法返回后放入到taskQueue中等待被执行。
因此,可知每次执行taskQueue前,taskQueue中除了有用户自定义提交的任务,系统逻辑流程提交至该NioEventLoop的任务,还有用户自定义或者系统设置的已经达到运行时间点的定时/周期性任务会一并放入到taskQueue中,而taskQueue的初始化容量为1024,最大长度限制为2048,也就是一次事件循环最多只能处理2048个任务。
afterRunningAllTasks()方法会依次执行tailQueue中的任务,tailTasks中是用户自定义的一些列在本次事件循环遍历结束后会执行的任务,你可以通过类似以下的方式来添加tailTask:
1 | ((NioEventLoop)ctx.channel().eventLoop()).executeAfterEventLoopIteration(() -> { |
步骤2
通过系统启动到当前的时间差+可用于执行任务的时间=系统启动到可用于执行任务时间的时间段(deadline)。从taskQueue中依次取出任务,如果task为null则说明已经没有待执行的任务,那么退出for循环。否则,同步执行task,每执行64个任务后,就计算系统启动到当前的时间是否大于等于deadline,如果是则说明已经超过了分配给任务执行的时间,此时就不会继续执行taskQueue中的任务了。
1 | protected static void safeExecute(Runnable task) { |
safeExecute()方法调用task的run方法来同步执行任务:
1 | if ((runTasks & 0x3F) == 0) { |
63的16进制表示为0x3f(二进制表示为0011 1111),当已经执行的任务数量小于64时,其与0x3f的位与操作会大于0,当其等于64(64的16进制表示为0x40,二进制表示为0100 0000)时,runTasks & 0x3f的结果为0。所以是每执行64个任务后就进行一次时间的判断,以保证执行任务队列的任务不会严重的超出我们所设定的时间。
步骤3
依次执行tailTasks队列里的所有任务。赋值全部属性lastExecutionTime为最后一个任务执行完后的时间。
总结
NioEventLoop创建用户代码创建
NioEventLoopGroup的时候NioEventLoop被创建,默认不传参数的时候会创建两倍的CPU核数的NioEventLoop。每个NioEventLoopGroup都会有一个chooser进行现场的分配,chooser也会根据NioEventLoop的个数做一定程度的优化。NioEventLoop的创建的时候会创建一个Selector和一个定时任务队列。在创建Selector的时候netty会通过反射的方式用数组实现来替换掉Selector里面的两个hashset数据结构。NioEventLoop启动NioEventLoop在首次调用execute方法的时候启动线程,这个线程是一个FastThreadLocalThread。启动线程之后,netty会将启动完成的线程保存到成员变量thread中,这样就能在执行逻辑过程中判断当前线程是否在NioEventLoop中。NioEventLoop执行逻辑NioEventLoop执行逻辑在run方法里。主要包括三个过程:- 检测IO事件。
- 处理IO事件。
- 执行任务队列。