本篇文章我们来分析NioEventLoop
的功能。
NioEventLoop
是一个单线程的线程池,由EventLoopGroup
维护。以NioEventLoopGroup
为例,在它的父类MultithreadEventExecutorGroup
中持有一个NioEventLoop
的数组,执行任务时可以选择一个NioEventLoop
,在NioEventLoop
的线程中来异步执行任务。
继承关系如下所示:
新建
MultithreadEventExecutorGroup
以NioEventLoopGroup
为例,在其父类MultithreadEventExecutorGroup
的构造函数中调用newChild
方法新建NioEventLoop
。
1 | protected MultithreadEventExecutorGroup(int nThreads, Executor executor, |
NioEventLoop
在MultithreadEventExecutorGroup
的构造函数中被创建,如果不指定NioEventLoop
的数量,默认情况下会创建两倍的CPU核数的NioEventLoop
。下面是MultithreadEventExecutorGroup
构造函数的执行流程:
- 首先创建
ThreadPerTaskExecutor
,它是线程执行器,负责创建NioEventLoop
对应的底层线程 - 通过for循环调用
newChild
方法创建NioEventLoop
的对象数组 - 调用
chooserFactory.newChooser
创建线程选择器,线程选择器的作用是为每个新连接分配NioEventLoop
线程
ThreadPerTaskExecutor
ThreadPerTaskExecutor
执行的作用是每次执行任务的时候都会创建一个线程实体。
NioEventLoop
线程命名规则为nioEventLoop-{poolId}-{xx}
。{poolId}
表示线程池id,{xx}
表示NioEventLoopGroup
下的第几个NioEventLoop
。
newChild()
newChild()
方法如下:
1 |
|
调用NioEventLoop
的构造函数新建NioEventLoop
。
newChooser()
chooser
的作用是为了给新连接绑定对应的NioEventLoop
。
判断线程池中线程的数量,如果是2的幂则创建PowerOfTwoEventExecutorChooser
,否则创建GenericEventExecutorChooser
。
NioEventLoop
NioEventLoop
的构造函数如下:
1 | NioEventLoop(NioEventLoopGroup parent, Executor executor, SelectorProvider selectorProvider, |
parent
:前面我们新建的NioEventLoopGroup
executor
:MultithreadEventExecutorGroup
构造函数中新建的ThreadPerTaskExecutor
selectorProvider
:NioEventLoopGroup
构造函数中调用SelectorProvider.provider()
返回的SelectorProvider
strategy
:NioEventLoopGroup
构造函数中给定的DefaultSelectStrategyFactory
rejectedExecutionHandler
:NioEventLoopGroup
构造函数中给定的RejectedExecutionHandler
。该接口有一个唯一的接口方法rejected
,当尝试去添加一个任务到SingleThreadEventExecutor
中,但是由于容量的限制添加失败了,那么此时该方法就会被调用。
final SelectorTuple selectorTuple = openSelector();
开启Selector
,构造SelectorTuple
实例,SelectorTuple
是一个封装了原始selector
对象和封装后selector
对象(即SelectedSelectionKeySetSelector
对象)的类:
1 | private static final class SelectorTuple { |
这里,成员变量unwrappedSelector
就是通过SelectorProvider.provider().openSelector()
开启的Selector
;而成员变量selector
则是一个SelectedSelectionKeySetSelector
对象。
SelectedSelectionKeySetSelector
中持有unwrappedSelector
,并作为unwrappedSelector
的代理类,提供Selector
所需要的方法,而Selector
相关的操作底层实际都是由unwrappedSelector
来完成的,只是在操作中增加了对selectionKeys
进行相应的设置。
SelectedSelectionKeySetSelector
中除了持有unwrappedSelector
实例外还持有一个SelectedSelectionKeySet
对象。该对象是Netty提供的一个代替Selector
的selectedKeys
对象。openSelector()
方法中通过反射机制将程序构建的SelectedSelectionKeySet
对象给设置到了Selector
内部的selectedKeys
、publicSelectedKeys
属性。这使Selector
中所有对selectedKeys
、publicSelectedKeys
的操作实际上就是对SelectedSelectionKeySet
的操作。
SelectedSelectionKeySet
类主要通过成员变量SelectionKey[]
数组来维护被选择的SelectionKeys
,并将扩容操作简单地简化为newCapacity为oldCapacity的2倍来实现。同时不再支持remove
、contains
、iterator
方法。并添加了reset
方法来对SelectionKey[]
数组进行重置。
SelectedSelectionKeySetSelector
在每次select
操作的时候,都会先将selectedKeys
进行清除(reset
)操作。
启动
当NioEventLoop
新建完成之后,其线程并没有开始执行,只有当有任务被添加到该线程池中,其中的线程才开始执行。
添加任务的操作在其父类SingleThreadEventExecutor
的execute
方法中完成:
1 | public void execute(Runnable task) { |
步骤如下:
- 将任务
task
加入任务队列taskQueue
中 - 调用
inEventLoop()
方法判断当前线程是否是NioEventLoop
的线程 如果
inEventLoop()
方法返回false
,表示当前event loop并没有启动,此时调用startThread()
方法创建线程。创建线程的动作由前面创建的ThreadPerTaskExecutor
线程执行器调用其execute()
方法完成。execute()
方法会创建一个FastThreadLocalThread
,然后调用start()
方法进行启动,启动的时候会执行Runnable
里面的run()
方法。run()
方法中主要有以下两步:- 调用
thread = Thread.currentThread()
保存当前的线程,这个线程其实就是线程执行器创建的FastThreadLocalThread
- 调用
NioEventLoop.run()
方法,run()
方法是驱动netty
运转的核心方法。
- 调用
满足条件时,触发
wakeup()
方法
执行
当NioEventLoop
启动之后,就会在其run()
方法中循环执行,run()
方法是NioEventLoop
的核心。
run()
方法有一个无限的for
循环,循环里主要有三件事:
- 调用
select()
方法轮询注册到Selector
上的io事件 - 调用
processSelectedKeys()
方法处理io事件 - 调用
runAllTasks()
方法异步处理外部线程添加到taskQueue
中的任务
run()
方法的大循环主要完成下面几件事:
- 根据当前
NioEventLoop
中是否有待完成的任务得出select策略,进行相应的select操作 - 处理select操作得到的已经准备好处理的IO事件,以及处理提交到当前
EventLoop
的任务(包括定时和周期任务) - 如果
NioEventLoop
所在线程执行了关闭操作,则执行相关的关闭操作处理。
判断select策略
调用DefaultSelectStrategy
的calculateStrategy(IntSupplier selectSupplier, boolean hasTasks)
方法判断select策略。
1 | // io.netty.channel.DefaultSelectStrategy#calculateStrategy |
selectSupplier
是经过封装的selector(即SelectedSelectionKeySetSelector
),hasTasks
是调用hasTask()
方法的返回值,hasTask()
用于判断taskQueue
或tailTasks
是否有任务。
calculateStrategy
方法的选择策略是:
- 如果当前的
EventLoop
中有待处理的任务,那么会调用selectSupplier.get()
方法,最终会调用Selector.selectNow()
方法,返回就绪通道的数量,并清空selectionKeys
。 - 如果当前的
EventLoop
没有待处理的任务,那么返回SelectStrategy.SELECT
(-1
)。
如果calculateStrategy
方法返回值大于0,则说明有就绪的IO事件待处理,跳出switch代码块,进入流程2。否则如果返回的是SelectStrategy.SELECT
,执行select(wakenUp.getAndSet(false))
:以CAS的方式获得wakenUp
当前的标识,并将wakenUp
设置为false
。将wakenUp
作为参数传入select(boolean oldWakenUp)
方法中。
select()
select
方法除了检查就绪通道以外,还有一个很重要的事,就是解决epoll bug
问题。epoll bug
会导致Selector
空轮询,IO线程CPU使用率100%,严重影响系统的安全性和可靠性。
Netty的解决策略是:
- 根据该BUG的特征,首先侦测该BUG是否发生
- 将问题Selector上注册的Channel转移到新建的Selector上
- 老的问题Selector关闭,使用新建的Selector替换
红色框中的代码是对epoll bug
的解决。
time - TimeUnit.MILLISECONDS.toNanos(timeoutMillis) >= currentTimeNanos
为false
,即time - TimeUnit.MILLISECONDS.toNanos(timeoutMillis) < currentTimeNanos
的意思是int selectedKeys = selector.select(timeoutMillis)
在timeoutMillis
时间到期前就返回了,并且selectedKeys == 0
。这意味着selector进行了一次空轮询,即发生了epoll bug
。Selector
不管有无感兴趣的时间发生,select()
方法总是不阻塞就返回,这会导致CPU的利用率达到100%。
SELECTOR_AUTO_REBUILD_THRESHOLD
默认为512
,也就是当Selector
连续执行了512
空轮询后,Netty就会进行Selector
的重建操作,即rebuildSelector()
操作。
绿色框中的代码主要说明了,当有定时/周期性任务即将到达执行时间(<0.5ms),或者NioEventLoop
的线程收到了新提交的任务等待被处理,或者有定时/周期性任务到达了可处理状态等待被处理,那么则退出select
方法转而去执行任务。这也说明了Netty总是会尽最大努力去保证任务队列中的任务以及定时/周期性任务能得到及时的处理。
1 | long currentTimeNanos = System.nanoTime(); |
该段代码会计算scheduledTaskQueue
中是否有即将要执行的任务,即在0.5ms
内就可执行的scheduledTask
,如果有则退出select
方法转而去执行任务。
delayNanos
方法会返回最近一个待执行的定时/周期性任务还差多少纳秒就可以执行的时间差(若scheduledTaskQueue
为空,也就是没有任务的定时/周期性任务,则返回1秒)。因此selectDeadLineNanos
就表示最近一个待执行的定时/周期性任务的可执行时间。
selectDeadLineNanos - currentTimeNanos
就表示:最近一个待执行的定时/周期性任务还差多少纳秒就可以执行的时间差。如果(selectDeadLineNanos - currentTimeNanos + 0.5ms) / 1ms <= 0
表示selectDeadLineNanos - currentTimeNanos < 0.5ms
,即scheduledTaskQueue
中在0.5ms内有可执行的任务,于是退出select
方法。
1 | if (hasTasks() && wakenUp.compareAndSet(false, true)) { |
在了解上面代码的用意之前,我们先来看下任务提交时的一下细节:
1 | public void execute(Runnable task) { |
当满足下面4个条件时,在有任务提交至EventLoop
后会触发Selector的wakeup()
方法:
- 成员变量
addTaskWakesUp
为false
。这里,在构造NioEventLoop
对象时,通过构造方法传进的参数addTaskWakesUp
正是false
,它会赋值给变量addTaskWakesUp
。因此该条件满足。 - 提交上来的任务不是一个
NonWakeupRunnable
任务
1 |
|
- 执行提交任务的线程不是EventLoop所在线程
- 当
wakenUp
成员变量当前的值为false
1 | protected void wakeup(boolean inEventLoop) { |
只有同时满足上面4个条件的情况下,Selector
的wakeup()
方法才会得以调用。
现在,我们再来说明这段代码块的用意:
1 | if (hasTasks() && wakenUp.compareAndSet(false, true)) { |
如果一个任务在wakenUp
值为true
的情况下被提交上来,那么这个任务将没有机会去调用Selector.wakeup()
。所以我们需要去再次检测任务中是否有待执行的任务,在执行Selector.select
操作之前。如果我们不这么做,那么任务队列中的任务将等待直到Selector.select
操作超时。如果ChannelPipeline
中存在IdleStateHandler
,那么IdleStateHandler
处理器可能会被挂起直到空闲超时。
这段代码在每次要执行Selector.select(long timeout)
之前我们会进行一个判断。如果hasTask()
为true
,即发现当前有任务待处理时,wakenUp.compareAndSet(false, true)
会返回true
,因为在每次调用当前这个select
方法时,都会将wakeUp
标识设置为false
(wakenUp.getAndSet(false)
这句代码)。而此时wakenUp
已经被置位为true
了,在此之后有任务提交至EventLoop
,那么是无法触发Selector.wakeup()
的。所以如果当前有待处理的任务,就不会进行下面的Selector.select(long timeout)
操作,而是退出select
方法,继而去处理任务。
因为如果不这么做的话,如果当前NioEventLoop
线程上已经有任务提交上来,这会使得这些任务可能会需要等待Selector.select(long timeout)
操作超时后才能得以执行。再者,假设我们的ChannelPipeline
中存在一个IdleStateHandler
,那么就可能导致因为Selector.select(long timeout)
操作的timeout
比IdleStateHandler
设置的idle timeout
长,而导致IdleSateHandler
不能对空闲超时做出及时的处理。
同时,我们注意,在执行break
退出select
方法前,会执行selector.selectNow()
,该方法不会阻塞,它会立即返回,同时它会抵消Selector.wakeup()
操作带来的影响。
所以,如果有非NioEventLoop
线程提交一个任务上来,那么这个线程会执行selector.wakeup()
方法,那么NioEventLoop
在if (hasTasks() && wakenUp.compareAndSet(false, true))
的后半个条件会返回false
,程序会执行到int selectedKeys = selector.select(timeoutMillis)
,但是此时select
不会阻塞,而是直接返回,因为前面已经先执行了selector.wakeup()
。
因为提交任务的线程是非NioEventLoop
线程,所以也可能是由NioEventLoop
线程成功执行了if (hasTasks() && wakenUp.compareAndSet(false, true))
,退出了select
方法转而去执行任务队列中的任务。注意,这时提交任务的非NioEventLoop
线程就不会执行selector.wakeup
。
1 | if (selectedKeys != 0 || oldWakenUp || wakenUp.get() || hasTasks() || hasScheduledTasks()) { |
同时,除了在每次Selector.select(long timeout)
操作前进行任务队列的检测外,在每次Selector.select(long timeout)
操作后也会检测任务队列是否已经有提交上来的任务待处理,以及是否有定时或周期性任务准备好被执行。如果有,也不会继续epoll bug
的检测,转而去执行待处理的任务。
如果检测到发生了epoll bug
,调用rebuildSelector()
进行Selector
的重构操作。重构操作流程如下:
- 调用
openSelector()
先构造一个新的SelectorTupe
。 - 然后,遍历
oldSelector
中的所有SelectionKey
,依次判断其有效性,如果有效则将其重新注册到新的Selector
上,并将旧的SelectionKey
执行cancel
操作,进行相关的数据清理,以便最后oldSelector
好进行关闭。 - 在将所有的
SelectionKey
数据移至新的Selector
后,将newSelectorTuple
的selector
和unwrappedSelector
赋值给相应的成员属性。 - 最后,调用
oldSelector.close()
关闭旧的Selector
以进行资源的释放。
处理IO事件
判断完select
策略之后,接着处理select
操作得到的已经准备好处理的IO事件,以及处理提交到当前EventLoop
的任务(包括定时和周期任务)。
1 | cancelledKeys = 0; |
- 首先将成员变量
cancelledKeys
和needsToSelectAgain
重置,即cancelledKeys
置为0,needsToSelectAgain
置为false
- 成员变量
ioRatio
的默认值为50
。ioRatio
是在事件循环中用于处理IO操作时间的百分比。默认为50%。也就是说,在事件循环中默认情况下用于处理IO操作的时间和用于处理任务的时间百分比都是50%。 - 调用
processSelectedKeys()
处理Selector.select
操作返回的待处理的IO事件 - 调用
runAllTasks
处理任务队列中的任务以及定时/周期性任务
processSelectedKeys()
processSelectedKeys()
方法依次取出准备好被处理的SelectionKey
,并对相应的待处理IO事件进行处理。
1 | private void processSelectedKeys() { |
selectedKeys.keys[i] = null
操作相当于我们在NIO编程中在处理已经触发的感兴趣的事件时,要将处理过的事件从selectedKeys
集合中移除的步骤。
在将ServerSocketChannel
注册到Selector
的时候,是会将其对应的NioServerSocketChannel
作为附加属性设置到SelectionKey
中。所以这里从k.attachment()
获取到的Object
对象实际就是NioServerSocketChannel
,而NioServerSocketChannel
就是一个AbstractNioChannel
的实现类。
首先检查当前的SelectionKey
是否有效(仅当SelectionKey
从Selector
上注销的时候,该SelectionKey
会为无效状态),如果无效的话:
- 获取该
SelectionKey
所关联的Channel
所注册的EventLoop
,如果获取Channel
的EventLoop
失败,则忽略错误直接返回。因为我们只处理注册到EventLoop
上的Channel
且有权去关闭这个Channel
- 如果获取到的
EventLoop
不是当前的执行线程所绑定的EventLoop
,或者获取到的EventLoop
为null,则直接返回。因为我们只关注依然注册在当前执行线程所绑定的EventLoop
上的Channel
。Channel
可能已经从当前的EventLoop
上注销,并且它的SelectionKey
可能已经被取消了,作为在注销处理流程的一部分。当然如果Channel
仍然健康的被注册在当前的EventLoop
上,则需要去关闭它 - 当能正确获取到
EventLoop
,且该EventLoop
非空并为当前执行线程所绑定的EventLoop
,则说明Channel
依旧注册去当前的EventLoop
上,那么执行关闭操作,来关闭相应的连接,释放相应的资源
如果SelectionKey
是有效的,获取readyOps
。
当SelectionKey.OP_CONNECT
事件就绪时:
- 将
SelectionKey.OP_CONNECT
事件从SelectionKey
所感兴趣的事件中移除,这样Selector
就不会再去监听该连接的SelectionKey.OP_CONNECT
事件了。而SelectionKey.OP_CONNECT
连接事件是只需要处理一次的事件,一旦连接建立完成,就可以进行读、写操作了 - 调用
unsafe.finishConnect()
方法,该方法会调用SocketChannel.finishConnect()
来标识连接的完成,如果我们不调用该方法,就去调用read/write
方法,则会抛出NotYetConnectedException
异常。在此之后,触发ChannelActive
事件,该事件会在该Channel
的ChannelPipeline
中传播。
runAllTasks()
runAllTasks()
方法处理任务队列中的任务以及定时/周期性任务。
将runAllTasks()
方法写在finally
块中,这是为了确保即便处理SelectedKeys
出现了异常,也要确保任务中的队列总能得到执行的机会。
步骤1
获取系统启动到当前的时间内已经过期的定时任务(即,延迟的时间已经满足或者定时执行任务的时间已经满足的任务)放入到taskQueue
中。从taskQueue
中获取任务,如果taskQueue
已经没有任务了,则依次执行tailTasks
队列里的所有任务。
fetchFromScheduledTaskQueue
方法获取过期的定时任务放入到taskQueue
中:
1 | private boolean fetchFromScheduledTaskQueue() { |
- 获取从系统启动到当前系统的时间间隔
- 从
scheduledTaskQueue
中获取在该时间间隔内已经过期的任务(即延迟周期或定时周期已经到时间的任务),将这些任务放入到taskQueue中 - 如果
taskQueue
满了,无法添加新的任务(taskQueue
队列的容量限制最大为2048),则将其重新放回到scheduledTaskQueue
默认情况下,taskQueue
是一个MpscUnboundedArrayQueue
实例
pollScheduledTask
方法根据给定的nanoTime
返回已经准备好被执行的Runnable
。必须使用AbstractScheduledEventExecutor.nanoTime()
方法来检索正确的nanoTime
:
1 | protected final Runnable pollScheduledTask(long nanoTime) { |
scheduledTaskQueue
是一个PriorityQueue
实例,它根据任务的deadlineNanos
属性的升序来维护一个任务队列,每次peek
能返回最先该被执行的定时任务。deadlineNanos
表示系统启动到该任务应该被执行的时间点的时间差。如果scheduledTask.deadlineNanos() <= nanoTime
则说明该任务的执行时间已经到了,因此将其从scheduledTaskQueue
移除,然后通过该方法返回后放入到taskQueue中等待被执行。
因此,可知每次执行taskQueue
前,taskQueue
中除了有用户自定义提交的任务,系统逻辑流程提交至该NioEventLoop
的任务,还有用户自定义或者系统设置的已经达到运行时间点的定时/周期性任务会一并放入到taskQueue
中,而taskQueue
的初始化容量为1024,最大长度限制为2048,也就是一次事件循环最多只能处理2048个任务。
afterRunningAllTasks()
方法会依次执行tailQueue
中的任务,tailTasks
中是用户自定义的一些列在本次事件循环遍历结束后会执行的任务,你可以通过类似以下的方式来添加tailTask
:
1 | ((NioEventLoop)ctx.channel().eventLoop()).executeAfterEventLoopIteration(() -> { |
步骤2
通过系统启动到当前的时间差
+可用于执行任务的时间
=系统启动到可用于执行任务时间的时间段(deadline)
。从taskQueue
中依次取出任务,如果task
为null
则说明已经没有待执行的任务,那么退出for循环。否则,同步执行task
,每执行64个任务后,就计算系统启动到当前的时间
是否大于等于deadline
,如果是则说明已经超过了分配给任务执行的时间,此时就不会继续执行taskQueue
中的任务了。
1 | protected static void safeExecute(Runnable task) { |
safeExecute()
方法调用task
的run
方法来同步执行任务:
1 | if ((runTasks & 0x3F) == 0) { |
63的16进制表示为0x3f
(二进制表示为0011 1111
),当已经执行的任务数量小于64时,其与0x3f
的位与操作会大于0,当其等于64(64的16进制表示为0x40
,二进制表示为0100 0000
)时,runTasks & 0x3f
的结果为0。所以是每执行64个任务后就进行一次时间的判断,以保证执行任务队列的任务不会严重的超出我们所设定的时间。
步骤3
依次执行tailTasks
队列里的所有任务。赋值全部属性lastExecutionTime
为最后一个任务执行完后的时间。
总结
NioEventLoop
创建用户代码创建
NioEventLoopGroup
的时候NioEventLoop
被创建,默认不传参数的时候会创建两倍的CPU核数的NioEventLoop
。每个NioEventLoopGroup
都会有一个chooser
进行现场的分配,chooser
也会根据NioEventLoop
的个数做一定程度的优化。NioEventLoop
的创建的时候会创建一个Selector
和一个定时任务队列。在创建Selector
的时候netty会通过反射的方式用数组实现来替换掉Selector
里面的两个hashset
数据结构。NioEventLoop
启动NioEventLoop
在首次调用execute
方法的时候启动线程,这个线程是一个FastThreadLocalThread
。启动线程之后,netty会将启动完成的线程保存到成员变量thread
中,这样就能在执行逻辑过程中判断当前线程是否在NioEventLoop
中。NioEventLoop
执行逻辑NioEventLoop
执行逻辑在run
方法里。主要包括三个过程:- 检测IO事件。
- 处理IO事件。
- 执行任务队列。