Netty——NioEventLoop

本篇文章我们来分析NioEventLoop的功能。

NioEventLoop是一个单线程的线程池,由EventLoopGroup维护。以NioEventLoopGroup为例,在它的父类MultithreadEventExecutorGroup中持有一个NioEventLoop的数组,执行任务时可以选择一个NioEventLoop,在NioEventLoop的线程中来异步执行任务。

继承关系如下所示:

NioEventLoop

新建

MultithreadEventExecutorGroup

NioEventLoopGroup为例,在其父类MultithreadEventExecutorGroup的构造函数中调用newChild方法新建NioEventLoop

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
protected MultithreadEventExecutorGroup(int nThreads, Executor executor,
EventExecutorChooserFactory chooserFactory, Object... args) {
if (nThreads <= 0) {
throw new IllegalArgumentException(String.format("nThreads: %d (expected: > 0)", nThreads));
}

if (executor == null) {
executor = new ThreadPerTaskExecutor(newDefaultThreadFactory());
}

children = new EventExecutor[nThreads];

for (int i = 0; i < nThreads; i ++) {
boolean success = false;
try {
children[i] = newChild(executor, args);
success = true;
} catch (Exception e) {
// TODO: Think about if this is a good exception type
throw new IllegalStateException("failed to create a child event loop", e);
} finally {
if (!success) {
for (int j = 0; j < i; j ++) {
children[j].shutdownGracefully();
}

for (int j = 0; j < i; j ++) {
EventExecutor e = children[j];
try {
while (!e.isTerminated()) {
e.awaitTermination(Integer.MAX_VALUE, TimeUnit.SECONDS);
}
} catch (InterruptedException interrupted) {
// Let the caller handle the interruption.
Thread.currentThread().interrupt();
break;
}
}
}
}
}

chooser = chooserFactory.newChooser(children);

final FutureListener<Object> terminationListener = new FutureListener<Object>() {
@Override
public void operationComplete(Future<Object> future) throws Exception {
if (terminatedChildren.incrementAndGet() == children.length) {
terminationFuture.setSuccess(null);
}
}
};

for (EventExecutor e: children) {
e.terminationFuture().addListener(terminationListener);
}

Set<EventExecutor> childrenSet = new LinkedHashSet<EventExecutor>(children.length);
Collections.addAll(childrenSet, children);
readonlyChildren = Collections.unmodifiableSet(childrenSet);
}

NioEventLoopMultithreadEventExecutorGroup的构造函数中被创建,如果不指定NioEventLoop的数量,默认情况下会创建两倍的CPU核数的NioEventLoop。下面是MultithreadEventExecutorGroup构造函数的执行流程:

  1. 首先创建ThreadPerTaskExecutor,它是线程执行器,负责创建NioEventLoop对应的底层线程
  2. 通过for循环调用newChild方法创建NioEventLoop的对象数组
  3. 调用chooserFactory.newChooser创建线程选择器,线程选择器的作用是为每个新连接分配NioEventLoop线程

ThreadPerTaskExecutor

ThreadPerTaskExecutor执行的作用是每次执行任务的时候都会创建一个线程实体。

NioEventLoop线程命名规则为nioEventLoop-{poolId}-{xx}{poolId}表示线程池id,{xx}表示NioEventLoopGroup下的第几个NioEventLoop

newChild()

newChild()方法如下:

1
2
3
4
5
@Override
protected EventLoop newChild(Executor executor, Object... args) throws Exception {
return new NioEventLoop(this, executor, (SelectorProvider) args[0],
((SelectStrategyFactory) args[1]).newSelectStrategy(), (RejectedExecutionHandler) args[2]);
}

调用NioEventLoop的构造函数新建NioEventLoop

newChooser()

chooser的作用是为了给新连接绑定对应的NioEventLoop

判断线程池中线程的数量,如果是2的幂则创建PowerOfTwoEventExecutorChooser,否则创建GenericEventExecutorChooser

NioEventLoop

NioEventLoop的构造函数如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
NioEventLoop(NioEventLoopGroup parent, Executor executor, SelectorProvider selectorProvider,
SelectStrategy strategy, RejectedExecutionHandler rejectedExecutionHandler) {
super(parent, executor, false, DEFAULT_MAX_PENDING_TASKS, rejectedExecutionHandler);
if (selectorProvider == null) {
throw new NullPointerException("selectorProvider");
}
if (strategy == null) {
throw new NullPointerException("selectStrategy");
}
provider = selectorProvider;
final SelectorTuple selectorTuple = openSelector();
selector = selectorTuple.selector;
unwrappedSelector = selectorTuple.unwrappedSelector;
selectStrategy = strategy;
}
  • parent:前面我们新建的NioEventLoopGroup
  • executorMultithreadEventExecutorGroup构造函数中新建的ThreadPerTaskExecutor
  • selectorProviderNioEventLoopGroup构造函数中调用SelectorProvider.provider()返回的SelectorProvider
  • strategyNioEventLoopGroup构造函数中给定的DefaultSelectStrategyFactory
  • rejectedExecutionHandlerNioEventLoopGroup构造函数中给定的RejectedExecutionHandler。该接口有一个唯一的接口方法rejected,当尝试去添加一个任务到SingleThreadEventExecutor中,但是由于容量的限制添加失败了,那么此时该方法就会被调用。

final SelectorTuple selectorTuple = openSelector();开启Selector,构造SelectorTuple实例,SelectorTuple是一个封装了原始selector对象和封装后selector对象(即SelectedSelectionKeySetSelector对象)的类:

1
2
3
4
5
private static final class SelectorTuple {
final Selector unwrappedSelector;
final Selector selector;
...
}

这里,成员变量unwrappedSelector就是通过SelectorProvider.provider().openSelector()开启的Selector;而成员变量selector则是一个SelectedSelectionKeySetSelector对象。

SelectedSelectionKeySetSelector中持有unwrappedSelector,并作为unwrappedSelector的代理类,提供Selector所需要的方法,而Selector相关的操作底层实际都是由unwrappedSelector来完成的,只是在操作中增加了对selectionKeys进行相应的设置。

SelectedSelectionKeySetSelector中除了持有unwrappedSelector实例外还持有一个SelectedSelectionKeySet对象。该对象是Netty提供的一个代替SelectorselectedKeys对象。openSelector()方法中通过反射机制将程序构建的SelectedSelectionKeySet对象给设置到了Selector内部的selectedKeyspublicSelectedKeys属性。这使Selector中所有对selectedKeyspublicSelectedKeys的操作实际上就是对SelectedSelectionKeySet的操作。

SelectedSelectionKeySet类主要通过成员变量SelectionKey[]数组来维护被选择的SelectionKeys,并将扩容操作简单地简化为newCapacity为oldCapacity的2倍来实现。同时不再支持removecontainsiterator方法。并添加了reset方法来对SelectionKey[]数组进行重置。

SelectedSelectionKeySetSelector在每次select操作的时候,都会先将selectedKeys进行清除(reset)操作。

启动

NioEventLoop新建完成之后,其线程并没有开始执行,只有当有任务被添加到该线程池中,其中的线程才开始执行。

添加任务的操作在其父类SingleThreadEventExecutorexecute方法中完成:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
public void execute(Runnable task) {
if (task == null) {
throw new NullPointerException("task");
}

boolean inEventLoop = inEventLoop();
addTask(task);
if (!inEventLoop) {
startThread();
if (isShutdown() && removeTask(task)) {
reject();
}
}

if (!addTaskWakesUp && wakesUpForTask(task)) {
wakeup(inEventLoop);
}
}

步骤如下:

  1. 将任务task加入任务队列taskQueue
  2. 调用inEventLoop()方法判断当前线程是否是NioEventLoop的线程
  3. 如果inEventLoop()方法返回false,表示当前event loop并没有启动,此时调用startThread()方法创建线程。创建线程的动作由前面创建的ThreadPerTaskExecutor线程执行器调用其execute()方法完成。execute()方法会创建一个FastThreadLocalThread,然后调用start()方法进行启动,启动的时候会执行Runnable里面的run()方法。

    run()方法中主要有以下两步:

    1. 调用thread = Thread.currentThread()保存当前的线程,这个线程其实就是线程执行器创建的FastThreadLocalThread
    2. 调用NioEventLoop.run()方法,run()方法是驱动netty运转的核心方法。
  4. 满足条件时,触发wakeup()方法

执行

NioEventLoop启动之后,就会在其run()方法中循环执行,run()方法是NioEventLoop的核心。

run()方法有一个无限的for循环,循环里主要有三件事:

  1. 调用select()方法轮询注册到Selector上的io事件
  2. 调用processSelectedKeys()方法处理io事件
  3. 调用runAllTasks()方法异步处理外部线程添加到taskQueue中的任务

NioEventLoop.run

run()方法的大循环主要完成下面几件事:

  1. 根据当前NioEventLoop中是否有待完成的任务得出select策略,进行相应的select操作
  2. 处理select操作得到的已经准备好处理的IO事件,以及处理提交到当前EventLoop的任务(包括定时和周期任务)
  3. 如果NioEventLoop所在线程执行了关闭操作,则执行相关的关闭操作处理。

判断select策略

调用DefaultSelectStrategycalculateStrategy(IntSupplier selectSupplier, boolean hasTasks)方法判断select策略。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
// io.netty.channel.DefaultSelectStrategy#calculateStrategy
public int calculateStrategy(IntSupplier selectSupplier, boolean hasTasks) throws Exception {
return hasTasks ? selectSupplier.get() : SelectStrategy.SELECT;
}

// io.netty.channel.nio.NioEventLoop#selectNowSupplier
private final IntSupplier selectNowSupplier = new IntSupplier() {
@Override
public int get() throws Exception {
return selectNow();
}
};

// io.netty.channel.nio.NioEventLoop#selectNow
int selectNow() throws IOException {
try {
return selector.selectNow();
} finally {
// restore wakeup state if needed
if (wakenUp.get()) {
selector.wakeup();
}
}
}

// io.netty.channel.nio.SelectedSelectionKeySetSelector#selectNow
public int selectNow() throws IOException {
selectionKeys.reset();
return delegate.selectNow();
}

// io.netty.channel.nio.SelectedSelectionKeySetSelector#wakeup
public Selector wakeup() {
return delegate.wakeup();
}

selectSupplier是经过封装的selector(即SelectedSelectionKeySetSelector),hasTasks是调用hasTask()方法的返回值,hasTask()用于判断taskQueuetailTasks是否有任务。

calculateStrategy方法的选择策略是:

  • 如果当前的EventLoop中有待处理的任务,那么会调用selectSupplier.get()方法,最终会调用Selector.selectNow()方法,返回就绪通道的数量,并清空selectionKeys
  • 如果当前的EventLoop没有待处理的任务,那么返回SelectStrategy.SELECT-1)。

如果calculateStrategy方法返回值大于0,则说明有就绪的IO事件待处理,跳出switch代码块,进入流程2。否则如果返回的是SelectStrategy.SELECT,执行select(wakenUp.getAndSet(false)):以CAS的方式获得wakenUp当前的标识,并将wakenUp设置为false。将wakenUp作为参数传入select(boolean oldWakenUp)方法中。

select()

select方法除了检查就绪通道以外,还有一个很重要的事,就是解决epoll bug问题。epoll bug会导致Selector空轮询,IO线程CPU使用率100%,严重影响系统的安全性和可靠性。

Netty的解决策略是:

  1. 根据该BUG的特征,首先侦测该BUG是否发生
  2. 将问题Selector上注册的Channel转移到新建的Selector上
  3. 老的问题Selector关闭,使用新建的Selector替换

NioEventLoop.select

红色框中的代码是对epoll bug的解决。

time - TimeUnit.MILLISECONDS.toNanos(timeoutMillis) >= currentTimeNanosfalse,即time - TimeUnit.MILLISECONDS.toNanos(timeoutMillis) < currentTimeNanos的意思是int selectedKeys = selector.select(timeoutMillis)timeoutMillis时间到期前就返回了,并且selectedKeys == 0。这意味着selector进行了一次空轮询,即发生了epoll bugSelector不管有无感兴趣的时间发生,select()方法总是不阻塞就返回,这会导致CPU的利用率达到100%。

SELECTOR_AUTO_REBUILD_THRESHOLD默认为512,也就是当Selector连续执行了512空轮询后,Netty就会进行Selector的重建操作,即rebuildSelector()操作。

绿色框中的代码主要说明了,当有定时/周期性任务即将到达执行时间(<0.5ms),或者NioEventLoop的线程收到了新提交的任务等待被处理,或者有定时/周期性任务到达了可处理状态等待被处理,那么则退出select方法转而去执行任务。这也说明了Netty总是会尽最大努力去保证任务队列中的任务以及定时/周期性任务能得到及时的处理。

1
2
3
4
5
6
7
8
9
10
11
long currentTimeNanos = System.nanoTime();
long selectDeadLineNanos = currentTimeNanos + delayNanos(currentTimeNanos);

long timeoutMillis = (selectDeadLineNanos - currentTimeNanos + 500000L) / 1000000L;
if (timeoutMillis <= 0) {
if (selectCnt == 0) {
selector.selectNow();
selectCnt = 1;
}
break;
}

该段代码会计算scheduledTaskQueue中是否有即将要执行的任务,即在0.5ms内就可执行的scheduledTask,如果有则退出select方法转而去执行任务。

delayNanos方法会返回最近一个待执行的定时/周期性任务还差多少纳秒就可以执行的时间差(若scheduledTaskQueue为空,也就是没有任务的定时/周期性任务,则返回1秒)。因此selectDeadLineNanos就表示最近一个待执行的定时/周期性任务的可执行时间。

selectDeadLineNanos - currentTimeNanos就表示:最近一个待执行的定时/周期性任务还差多少纳秒就可以执行的时间差。如果(selectDeadLineNanos - currentTimeNanos + 0.5ms) / 1ms <= 0表示selectDeadLineNanos - currentTimeNanos < 0.5ms,即scheduledTaskQueue中在0.5ms内有可执行的任务,于是退出select方法。

1
2
3
4
5
if (hasTasks() && wakenUp.compareAndSet(false, true)) {
selector.selectNow();
selectCnt = 1;
break;
}

在了解上面代码的用意之前,我们先来看下任务提交时的一下细节:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
public void execute(Runnable task) {
if (task == null) {
throw new NullPointerException("task");
}

boolean inEventLoop = inEventLoop();
addTask(task);
if (!inEventLoop) {
startThread();
if (isShutdown() && removeTask(task)) {
reject();
}
}

if (!addTaskWakesUp && wakesUpForTask(task)) {
wakeup(inEventLoop);
}
}

当满足下面4个条件时,在有任务提交至EventLoop后会触发Selector的wakeup()方法:

  1. 成员变量addTaskWakesUpfalse。这里,在构造NioEventLoop对象时,通过构造方法传进的参数addTaskWakesUp正是false,它会赋值给变量addTaskWakesUp。因此该条件满足。
  2. 提交上来的任务不是一个NonWakeupRunnable任务
1
2
3
4
@Override
protected boolean wakesUpForTask(Runnable task) {
return !(task instanceof NonWakeupRunnable);
}
  1. 执行提交任务的线程不是EventLoop所在线程
  2. wakenUp成员变量当前的值为false
1
2
3
4
5
protected void wakeup(boolean inEventLoop) {
if (!inEventLoop && wakenUp.compareAndSet(false, true)) {
selector.wakeup();
}
}

只有同时满足上面4个条件的情况下,Selectorwakeup()方法才会得以调用。

现在,我们再来说明这段代码块的用意:

1
2
3
4
5
if (hasTasks() && wakenUp.compareAndSet(false, true)) {
selector.selectNow();
selectCnt = 1;
break;
}

如果一个任务在wakenUp值为true的情况下被提交上来,那么这个任务将没有机会去调用Selector.wakeup()。所以我们需要去再次检测任务中是否有待执行的任务,在执行Selector.select操作之前。如果我们不这么做,那么任务队列中的任务将等待直到Selector.select操作超时。如果ChannelPipeline中存在IdleStateHandler,那么IdleStateHandler处理器可能会被挂起直到空闲超时。

这段代码在每次要执行Selector.select(long timeout)之前我们会进行一个判断。如果hasTask()true,即发现当前有任务待处理时,wakenUp.compareAndSet(false, true)会返回true,因为在每次调用当前这个select方法时,都会将wakeUp标识设置为falsewakenUp.getAndSet(false)这句代码)。而此时wakenUp已经被置位为true了,在此之后有任务提交至EventLoop,那么是无法触发Selector.wakeup()的。所以如果当前有待处理的任务,就不会进行下面的Selector.select(long timeout)操作,而是退出select方法,继而去处理任务。

因为如果不这么做的话,如果当前NioEventLoop线程上已经有任务提交上来,这会使得这些任务可能会需要等待Selector.select(long timeout)操作超时后才能得以执行。再者,假设我们的ChannelPipeline中存在一个IdleStateHandler,那么就可能导致因为Selector.select(long timeout)操作的timeoutIdleStateHandler设置的idle timeout长,而导致IdleSateHandler不能对空闲超时做出及时的处理。

同时,我们注意,在执行break退出select方法前,会执行selector.selectNow(),该方法不会阻塞,它会立即返回,同时它会抵消Selector.wakeup()操作带来的影响。

所以,如果有非NioEventLoop线程提交一个任务上来,那么这个线程会执行selector.wakeup()方法,那么NioEventLoopif (hasTasks() && wakenUp.compareAndSet(false, true))的后半个条件会返回false,程序会执行到int selectedKeys = selector.select(timeoutMillis),但是此时select不会阻塞,而是直接返回,因为前面已经先执行了selector.wakeup()

因为提交任务的线程是非NioEventLoop线程,所以也可能是由NioEventLoop线程成功执行了if (hasTasks() && wakenUp.compareAndSet(false, true)),退出了select方法转而去执行任务队列中的任务。注意,这时提交任务的非NioEventLoop线程就不会执行selector.wakeup

1
2
3
4
5
6
7
if (selectedKeys != 0 || oldWakenUp || wakenUp.get() || hasTasks() || hasScheduledTasks()) {
// - Selected something,
// - waken up by user, or
// - the task queue has a pending task.
// - a scheduled task is ready for processing
break;
}

同时,除了在每次Selector.select(long timeout)操作前进行任务队列的检测外,在每次Selector.select(long timeout)操作后也会检测任务队列是否已经有提交上来的任务待处理,以及是否有定时或周期性任务准备好被执行。如果有,也不会继续epoll bug的检测,转而去执行待处理的任务。

如果检测到发生了epoll bug,调用rebuildSelector()进行Selector的重构操作。重构操作流程如下:

  1. 调用openSelector()先构造一个新的SelectorTupe
  2. 然后,遍历oldSelector中的所有SelectionKey,依次判断其有效性,如果有效则将其重新注册到新的Selector上,并将旧的SelectionKey执行cancel操作,进行相关的数据清理,以便最后oldSelector好进行关闭。
  3. 在将所有的SelectionKey数据移至新的Selector后,将newSelectorTupleselectorunwrappedSelector赋值给相应的成员属性。
  4. 最后,调用oldSelector.close()关闭旧的Selector以进行资源的释放。

处理IO事件

判断完select策略之后,接着处理select操作得到的已经准备好处理的IO事件,以及处理提交到当前EventLoop的任务(包括定时和周期任务)。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
cancelledKeys = 0;
needsToSelectAgain = false;
final int ioRatio = this.ioRatio;
if (ioRatio == 100) {
try {
processSelectedKeys();
} finally {
// Ensure we always run tasks.
runAllTasks();
}
} else {
final long ioStartTime = System.nanoTime();
try {
processSelectedKeys();
} finally {
// Ensure we always run tasks.
final long ioTime = System.nanoTime() - ioStartTime;
runAllTasks(ioTime * (100 - ioRatio) / ioRatio);
}
}
  1. 首先将成员变量cancelledKeysneedsToSelectAgain重置,即cancelledKeys置为0,needsToSelectAgain置为false
  2. 成员变量ioRatio的默认值为50ioRatio是在事件循环中用于处理IO操作时间的百分比。默认为50%。也就是说,在事件循环中默认情况下用于处理IO操作的时间和用于处理任务的时间百分比都是50%。
  3. 调用processSelectedKeys()处理Selector.select操作返回的待处理的IO事件
  4. 调用runAllTasks处理任务队列中的任务以及定时/周期性任务

processSelectedKeys()

processSelectedKeys()方法依次取出准备好被处理的SelectionKey,并对相应的待处理IO事件进行处理。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
private void processSelectedKeys() {
if (selectedKeys != null) {
processSelectedKeysOptimized();
} else {
processSelectedKeysPlain(selector.selectedKeys());
}
}

private void processSelectedKeysOptimized() {
for (int i = 0; i < selectedKeys.size; ++i) {
final SelectionKey k = selectedKeys.keys[i];
// null out entry in the array to allow to have it GC'ed once the Channel close
// See https://github.com/netty/netty/issues/2363
selectedKeys.keys[i] = null;

final Object a = k.attachment();

if (a instanceof AbstractNioChannel) {
processSelectedKey(k, (AbstractNioChannel) a);
} else {
@SuppressWarnings("unchecked")
NioTask<SelectableChannel> task = (NioTask<SelectableChannel>) a;
processSelectedKey(k, task);
}

if (needsToSelectAgain) {
// null out entries in the array to allow to have it GC'ed once the Channel close
// See https://github.com/netty/netty/issues/2363
selectedKeys.reset(i + 1);

selectAgain();
i = -1;
}
}
}

selectedKeys.keys[i] = null操作相当于我们在NIO编程中在处理已经触发的感兴趣的事件时,要将处理过的事件从selectedKeys集合中移除的步骤。

在将ServerSocketChannel注册到Selector的时候,是会将其对应的NioServerSocketChannel作为附加属性设置到SelectionKey中。所以这里从k.attachment()获取到的Object对象实际就是NioServerSocketChannel,而NioServerSocketChannel就是一个AbstractNioChannel的实现类。

NioEventLoop.processSelectedKey

首先检查当前的SelectionKey是否有效(仅当SelectionKeySelector上注销的时候,该SelectionKey会为无效状态),如果无效的话:

  1. 获取该SelectionKey所关联的Channel所注册的EventLoop,如果获取ChannelEventLoop失败,则忽略错误直接返回。因为我们只处理注册到EventLoop上的Channel且有权去关闭这个Channel
  2. 如果获取到的EventLoop不是当前的执行线程所绑定的EventLoop,或者获取到的EventLoop为null,则直接返回。因为我们只关注依然注册在当前执行线程所绑定的EventLoop上的ChannelChannel可能已经从当前的EventLoop上注销,并且它的SelectionKey可能已经被取消了,作为在注销处理流程的一部分。当然如果Channel仍然健康的被注册在当前的EventLoop上,则需要去关闭它
  3. 当能正确获取到EventLoop,且该EventLoop非空并为当前执行线程所绑定的EventLoop,则说明Channel依旧注册去当前的EventLoop上,那么执行关闭操作,来关闭相应的连接,释放相应的资源

如果SelectionKey是有效的,获取readyOps

SelectionKey.OP_CONNECT事件就绪时:

  1. SelectionKey.OP_CONNECT事件从SelectionKey所感兴趣的事件中移除,这样Selector就不会再去监听该连接的SelectionKey.OP_CONNECT事件了。而SelectionKey.OP_CONNECT连接事件是只需要处理一次的事件,一旦连接建立完成,就可以进行读、写操作了
  2. 调用unsafe.finishConnect()方法,该方法会调用SocketChannel.finishConnect()来标识连接的完成,如果我们不调用该方法,就去调用read/write方法,则会抛出NotYetConnectedException异常。在此之后,触发ChannelActive事件,该事件会在该ChannelChannelPipeline中传播。

runAllTasks()

runAllTasks()方法处理任务队列中的任务以及定时/周期性任务。

runAllTasks()方法写在finally块中,这是为了确保即便处理SelectedKeys出现了异常,也要确保任务中的队列总能得到执行的机会。

NioEventLoop.runAllTasks

步骤1

获取系统启动到当前的时间内已经过期的定时任务(即,延迟的时间已经满足或者定时执行任务的时间已经满足的任务)放入到taskQueue中。从taskQueue中获取任务,如果taskQueue已经没有任务了,则依次执行tailTasks队列里的所有任务。

fetchFromScheduledTaskQueue方法获取过期的定时任务放入到taskQueue中:

1
2
3
4
5
6
7
8
9
10
11
12
13
private boolean fetchFromScheduledTaskQueue() {
long nanoTime = AbstractScheduledEventExecutor.nanoTime();
Runnable scheduledTask = pollScheduledTask(nanoTime);
while (scheduledTask != null) {
if (!taskQueue.offer(scheduledTask)) {
// No space left in the task queue add it back to the scheduledTaskQueue so we pick it up again.
scheduledTaskQueue().add((ScheduledFutureTask<?>) scheduledTask);
return false;
}
scheduledTask = pollScheduledTask(nanoTime);
}
return true;
}
  1. 获取从系统启动到当前系统的时间间隔
  2. scheduledTaskQueue中获取在该时间间隔内已经过期的任务(即延迟周期或定时周期已经到时间的任务),将这些任务放入到taskQueue中
  3. 如果taskQueue满了,无法添加新的任务(taskQueue队列的容量限制最大为2048),则将其重新放回到scheduledTaskQueue

默认情况下,taskQueue是一个MpscUnboundedArrayQueue实例

pollScheduledTask方法根据给定的nanoTime返回已经准备好被执行的Runnable。必须使用AbstractScheduledEventExecutor.nanoTime()方法来检索正确的nanoTime

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
protected final Runnable pollScheduledTask(long nanoTime) {
assert inEventLoop();

Queue<ScheduledFutureTask<?>> scheduledTaskQueue = this.scheduledTaskQueue;
ScheduledFutureTask<?> scheduledTask = scheduledTaskQueue == null ? null : scheduledTaskQueue.peek();
if (scheduledTask == null) {
return null;
}

if (scheduledTask.deadlineNanos() <= nanoTime) {
scheduledTaskQueue.remove();
return scheduledTask;
}
return null;
}

scheduledTaskQueue是一个PriorityQueue实例,它根据任务的deadlineNanos属性的升序来维护一个任务队列,每次peek能返回最先该被执行的定时任务。deadlineNanos表示系统启动到该任务应该被执行的时间点的时间差。如果scheduledTask.deadlineNanos() <= nanoTime则说明该任务的执行时间已经到了,因此将其从scheduledTaskQueue移除,然后通过该方法返回后放入到taskQueue中等待被执行。

因此,可知每次执行taskQueue前,taskQueue中除了有用户自定义提交的任务,系统逻辑流程提交至该NioEventLoop的任务,还有用户自定义或者系统设置的已经达到运行时间点的定时/周期性任务会一并放入到taskQueue中,而taskQueue的初始化容量为1024,最大长度限制为2048,也就是一次事件循环最多只能处理2048个任务。

afterRunningAllTasks()方法会依次执行tailQueue中的任务,tailTasks中是用户自定义的一些列在本次事件循环遍历结束后会执行的任务,你可以通过类似以下的方式来添加tailTask

1
2
3
((NioEventLoop)ctx.channel().eventLoop()).executeAfterEventLoopIteration(() -> {
// add some task to execute after eventLoop iteration
});

步骤2

通过系统启动到当前的时间差+可用于执行任务的时间=系统启动到可用于执行任务时间的时间段(deadline)。从taskQueue中依次取出任务,如果tasknull则说明已经没有待执行的任务,那么退出for循环。否则,同步执行task,每执行64个任务后,就计算系统启动到当前的时间是否大于等于deadline,如果是则说明已经超过了分配给任务执行的时间,此时就不会继续执行taskQueue中的任务了。

1
2
3
4
5
6
7
protected static void safeExecute(Runnable task) {
try {
task.run();
} catch (Throwable t) {
logger.warn("A task raised an exception. Task: {}", task, t);
}
}

safeExecute()方法调用taskrun方法来同步执行任务:

1
2
3
4
5
6
if ((runTasks & 0x3F) == 0) {
lastExecutionTime = ScheduledFutureTask.nanoTime();
if (lastExecutionTime >= deadline) {
break;
}
}

63的16进制表示为0x3f(二进制表示为0011 1111),当已经执行的任务数量小于64时,其与0x3f的位与操作会大于0,当其等于64(64的16进制表示为0x40,二进制表示为0100 0000)时,runTasks & 0x3f的结果为0。所以是每执行64个任务后就进行一次时间的判断,以保证执行任务队列的任务不会严重的超出我们所设定的时间。

步骤3

依次执行tailTasks队列里的所有任务。赋值全部属性lastExecutionTime为最后一个任务执行完后的时间。

总结

  • NioEventLoop创建

    用户代码创建NioEventLoopGroup的时候NioEventLoop被创建,默认不传参数的时候会创建两倍的CPU核数的NioEventLoop。每个NioEventLoopGroup都会有一个chooser进行现场的分配,chooser也会根据NioEventLoop的个数做一定程度的优化。NioEventLoop的创建的时候会创建一个Selector和一个定时任务队列。在创建Selector的时候netty会通过反射的方式用数组实现来替换掉Selector里面的两个hashset数据结构。

  • NioEventLoop启动

    NioEventLoop在首次调用execute方法的时候启动线程,这个线程是一个FastThreadLocalThread。启动线程之后,netty会将启动完成的线程保存到成员变量thread中,这样就能在执行逻辑过程中判断当前线程是否在NioEventLoop中。

  • NioEventLoop执行逻辑

    NioEventLoop执行逻辑在run方法里。主要包括三个过程:

    1. 检测IO事件。
    2. 处理IO事件。
    3. 执行任务队列。

https://www.jianshu.com/p/3f6e997efd27