定时任务与HashedWheelTimer

Redisson分布式锁的实现一文中,我们说到Redisson会调用scheduleExpirationRenewal方法创建一个定时任务来刷新锁的过期时间,防止任务执行完毕前锁就过期释放了。在那篇文章中,我们没有详述这个定时任务的原理,本文中我们来探究一番定时任务的原理。

定时任务的本质

一个Timer本质上是这样的一个数据结构:deadline越近的任务拥有越高优先级,提供以下3中基本操作:

  1. schedule新增
  2. cancel删除
  3. expire执行到期的任务
  4. updateDeadline更行到期时间(可选)

expire通常有两种工作方式:

  1. 轮询

    每隔一个时间片就去查找哪些任务已经到期

  2. 睡眠/唤醒

    1. 不停地查找deadline最近的任务,如到期则执行;否则sleep直到其到期
    2. 在sleep期间,如果有任务被cancelschedule,则deadline最近的任务有可能改变,线程会被唤醒并重新进行1的逻辑

具体实现的数据结构可以有很多选择:(假设任务持有自己在总体任务集合中对应的节点,cancel时不需要查找的过程)

  1. 有序链表

    • schedule:O(n)
    • cancel:O(1) //双向链表的节点删除
    • expire:O(1) //不停地查看第一个就可以了
  2. 堆heap

    • schedule:O(log2N) //调整heap
    • cancel:O(log2N) //调整heap
    • expire:O(1)

HashedWheelTimer

Redisson使用的定时任务是Netty提供的HashedWheelTimer

HashedWheelTime

Hash Wheel Timer是一个环形结构,可以想象成时钟,分为很多格子,一个格子代表一段时间(越短Timer精度越高),并用一个List保存在该格子上到期的所有任务。同时一个指针随着时间流逝一格一格转动,并执行对应List中所有到期的任务。任务通过取模决定应该放入哪个格子。

以上图为例,假设一个格子是1秒,则整个wheel能表示的时间段为8s,假设当前指针指向2,此时需要调度一个3s后执行的任务,显然应该加入到(2+3=5)的方格中,指针再走3次就可以执行了;如果任务要在10s后执行,应该等指针走完一个round零2格再执行,因此应放入4,同时将round(1)保存到任务中。检查到期任务应当只执行round为0的,格子上其他任务的round应减1.

  • schedule:O(1)
  • cancel:O(1)
  • expire:最坏情况O(n),平均O(1) //显然格子越多每个格子对应的List就越短,越接近O(1);最坏情况下所有的任务都在一个格子中,O(n)

使用

HashedWheelTimer的使用如下所示:

1
2
3
4
5
6
7
8
@Test
public void test01() throws IOException {
HashedWheelTimer timer = new HashedWheelTimer(); //使用默认参数
logger.info("start");
timer.newTimeout(timeout -> logger.info("running"), 3, TimeUnit.SECONDS); //延时3秒后开始执行

System.in.read();
}

执行结果如下:

1
2
14:16:50.743 [main] INFO love.wangqi.timer.TimerTest - start
14:16:53.888 [pool-1-thread-1] INFO love.wangqi.timer.TimerTest - running

从日志的打印时间,我们看到程序运行的大概3秒钟之后,我们定义的TimerTask开始执行。

我们知道,HashedWheelTimer以一个固定的时间间隔前进一个格子,并且激活对应格子里面的任务,但是这里有个缺陷,就是任务是串行的,也就是所有的任务是依次执行,如果调度的任务耗时比较长的话,容易出现调度超时的情况,因此很可能产生任务堆集的情况出现。

以如下代码所示:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
@Test
public void test02() throws IOException {
HashedWheelTimer timer = new HashedWheelTimer();
logger.info("start");
timer.newTimeout(timeout -> {
logger.info("running");
Thread.sleep(2000);
logger.info("end");
}, 1, TimeUnit.SECONDS);

timer.newTimeout(timeout -> {
logger.info("running");
Thread.sleep(2000);
logger.info("end");
}, 1, TimeUnit.SECONDS);

System.in.read();
}

执行结果如下:

1
2
3
4
5
14:24:02.872 [main] INFO love.wangqi.timer.TimerTest - start
14:24:04.020 [pool-1-thread-1] INFO love.wangqi.timer.TimerTest - running
14:24:06.023 [pool-1-thread-1] INFO love.wangqi.timer.TimerTest - end
14:24:06.023 [pool-1-thread-1] INFO love.wangqi.timer.TimerTest - running
14:24:08.025 [pool-1-thread-1] INFO love.wangqi.timer.TimerTest - end

我们的本意其实是程序执行后延时1秒,然后两个任务同时开始执行。但实际的执行结果显示task2task1阻塞了,直到task1执行结束task2才开始执行。这就导致了串行化的任务调度延时,因此,应该避免耗时比较长的任务在同一个线程中执行。

源码解读

关键的成员变量

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
// 指针转动以及任务执行的线程
private final Worker worker = new Worker();
private final Thread workerThread;

public static final int WORKER_STATE_INIT = 0;
public static final int WORKER_STATE_STARTED = 1;
public static final int WORKER_STATE_SHUTDOWN = 2;
// 工作线程的状态
private volatile int workerState;

// 每个tick所需的时间
private final long tickDuration;

// 时间轮的结构
private final HashedWheelBucket[] wheel;

构造方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
public HashedWheelTimer(
ThreadFactory threadFactory, // 用于创建worker线程的线程工厂
long tickDuration, // tick的时长,也就是指针多久转一格
TimeUnit unit, // tickDuration的时间单位
int ticksPerWheel, // 一圈有几个格子
boolean leakDetection, // 是否开启内存泄露检测
long maxPendingTimeouts // 任务的超时等待时间
) {
// 一些参数校验
if (threadFactory == null) {
throw new NullPointerException("threadFactory");
}
if (unit == null) {
throw new NullPointerException("unit");
}
if (tickDuration <= 0) {
throw new IllegalArgumentException("tickDuration must be greater than 0: " + tickDuration);
}
if (ticksPerWheel <= 0) {
throw new IllegalArgumentException("ticksPerWheel must be greater than 0: " + ticksPerWheel);
}

// 创建时间轮基本的数据结构,一个数组。长度为不小于ticksPerWheel的最小2的n次方
wheel = createWheel(ticksPerWheel);
// 掩码,用于计算某个任务落在哪个格子上。
// 因为wheel.length为2的n次方,mask = 2^n - 1低位将全部是1,所以任务的deadline&mast == 任务的deadline&wheel.length
mask = wheel.length - 1;

// 转换成纳秒
this.tickDuration = unit.toNanos(tickDuration);

// 检验是否存在溢出。即指针转动的时间间隔不能太长而导致tickDuration*wheel.length>Long.MAX_VALUE
if (this.tickDuration >= Long.MAX_VALUE / wheel.length) {
throw new IllegalArgumentException(String.format(
"tickDuration: %d (expected: 0 < tickDuration in nanos < %d",
tickDuration, Long.MAX_VALUE / wheel.length));
}
// 创建worker线程
workerThread = threadFactory.newThread(worker);
// 这里默认是启动内存泄露检测:当HashedWheelTimer实例超过当前CPU可用核数*4的时候,将发出警告
leak = leakDetection || !workerThread.isDaemon() ? leakDetector.track(this) : null;
// 最大的任务数量。当HashedWheelTimer实例上的任务超出这个数量时会抛出错误
this.maxPendingTimeouts = maxPendingTimeouts;

// 如果HashedWheelTimer实例数超过了64,会报警。
// 因为时间轮是一个非常消耗资源的结构所以实例数目不能太高
if (INSTANCE_COUNTER.incrementAndGet() > INSTANCE_COUNT_LIMIT &&
WARNED_TOO_MANY_INSTANCES.compareAndSet(false, true)) {
reportTooManyInstances();
}
}

添加定时任务

在前面的实例中,我们看到使用newTimeout方法来添加定时任务。newTimeout方法的代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
@Override
public Timeout newTimeout(TimerTask task, long delay, TimeUnit unit) {
// 参数校验
if (task == null) {
throw new NullPointerException("task");
}
if (unit == null) {
throw new NullPointerException("unit");
}
// 增加任务数量
long pendingTimeoutsCount = pendingTimeouts.incrementAndGet();
// 如果设置了最大任务数,且任务数量超过最大任务数,抛出异常
if (maxPendingTimeouts > 0 && pendingTimeoutsCount > maxPendingTimeouts) {
pendingTimeouts.decrementAndGet();
throw new RejectedExecutionException("Number of pending timeouts ("
+ pendingTimeoutsCount + ") is greater than or equal to maximum allowed pending "
+ "timeouts (" + maxPendingTimeouts + ")");
}
// 如果没有启动时间轮,则启动
start();

// Add the timeout to the timeout queue which will be processed on the next tick.
// During processing all the queued HashedWheelTimeouts will be added to the correct HashedWheelBucket.
// 计算任务的deadline
long deadline = System.nanoTime() + unit.toNanos(delay) - startTime;

// Guard against overflow.
if (delay > 0 && deadline < 0) {
deadline = Long.MAX_VALUE;
}
// 这里定时任务不是直接加到对应的格子中,而是先加入到一个队列里,然后等到下一个tick的时候,会从队列里取出最多100000个任务加入到指定的格子中
HashedWheelTimeout timeout = new HashedWheelTimeout(this, task, deadline);
timeouts.add(timeout);
return timeout;
}

这里使用的Queue不是普通Java自带的Queue的实现,而是使用JCTool——一个高性能的并发Queue实现包。

启动时间轮

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
// 这个方法不需要用户显式地调用。因为在添加定时任务(newTimeout方法)的时候会自动调用此方法
// 如果没有定时任务,时间轮就不需要启动
public void start() {
// 判断当前时间轮的状态。
// 如果是初始化状态,则采用CAS的方法将状态修改为开始状态,然后启动时间轮线程。
switch (WORKER_STATE_UPDATER.get(this)) {
case WORKER_STATE_INIT:
if (WORKER_STATE_UPDATER.compareAndSet(this, WORKER_STATE_INIT, WORKER_STATE_STARTED)) {
workerThread.start();
}
break;
case WORKER_STATE_STARTED:
break;
case WORKER_STATE_SHUTDOWN:
throw new IllegalStateException("cannot be started once stopped");
default:
throw new Error("Invalid WorkerState");
}

// Wait until the startTime is initialized by the worker.
// 等待woker线程初始化时间轮的启动时间
while (startTime == 0) {
try {
startTimeInitialized.await();
} catch (InterruptedException ignore) {
// Ignore - it will be ready very soon.
}
}
}

AtomicIntegerFieldUpdater是JUC里面的类,原理是利用安全的反射进行原子操作,来获取实例的本身的属性。有比AtomicInteger更好的性能和更低的内存占用。

停止时间轮

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
@Override
public Set<Timeout> stop() {
// worker线程不能停止时间轮,也就是加入的定时任务不能调用这个方法。不然会有恶意的定时任务调用这个方法而造成大量定时任务失效
if (Thread.currentThread() == workerThread) {
throw new IllegalStateException(
HashedWheelTimer.class.getSimpleName() +
".stop() cannot be called from " +
TimerTask.class.getSimpleName());
}

// CAS的方式尝试将当前状态替换为WORKER_STATE_SHUTDOWN。
// 如果替换失败,则当前状态只能是WORKER_STATE_STARTED或者WORKER_STATE_SHUTDOWN。直接将当前状态设置为WORKER_STATE_SHUTDOWN
if (!WORKER_STATE_UPDATER.compareAndSet(this, WORKER_STATE_STARTED, WORKER_STATE_SHUTDOWN)) {
// workerState can be 0 or 2 at this moment - let it always be 2.
if (WORKER_STATE_UPDATER.getAndSet(this, WORKER_STATE_SHUTDOWN) != WORKER_STATE_SHUTDOWN) {
INSTANCE_COUNTER.decrementAndGet();
if (leak != null) {
boolean closed = leak.close(this);
assert closed;
}
}

return Collections.emptySet();
}

try {
// 中断worker线程,尝试把正在进行任务的线程中断掉。
// 如果某些任务正在执行则会抛出InterruptedException异常,并且任务会尝试立即中断
boolean interrupted = false;
while (workerThread.isAlive()) {
workerThread.interrupt();
try {
workerThread.join(100);
} catch (InterruptedException ignored) {
interrupted = true;
}
}
// 如果中断掉了所有工作的线程,那么当前时间轮调度的线程会在随后关闭
if (interrupted) {
Thread.currentThread().interrupt();
}
} finally {
INSTANCE_COUNTER.decrementAndGet();
if (leak != null) {
boolean closed = leak.close(this);
assert closed;
}
}
// 返回未处理的任务
return worker.unprocessedTimeouts();
}

HashedWheelTimeout

newTimeout方法中会将我们的TimerTask包装成一个HashedWheelTimeout对象,然后添加到Queue<HashedWheelTimeout> timeouts队列中。

HashedWheelTimeout是一个定时任务的内部包装类,双向链表结构。保存定时任务到期执行的任务、deadline、round等信息。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
private static final class HashedWheelTimeout implements Timeout {
// 定义定时任务的3个状态:初始化、取消、过期
private static final int ST_INIT = 0;
private static final int ST_CANCELLED = 1;
private static final int ST_EXPIRED = 2;
// CAS方式更新定时任务状态
private static final AtomicIntegerFieldUpdater<HashedWheelTimeout> STATE_UPDATER =
AtomicIntegerFieldUpdater.newUpdater(HashedWheelTimeout.class, "state");

// 时间轮引用
private final HashedWheelTimer timer;
// 到期需要执行的任务
private final TimerTask task;
private final long deadline;

@SuppressWarnings({"unused", "FieldMayBeFinal", "RedundantFieldInitialization" })
private volatile int state = ST_INIT;

// 离任务执行的轮数,当将任务加入到格子中时计算该值,每过一轮,该值减1
long remainingRounds;

// 双向链表结构,由于只有worker线程会访问,这里不需要synchronization/volatile
HashedWheelTimeout next;
HashedWheelTimeout prev;

// 定时任务所在的格子
HashedWheelBucket bucket;

HashedWheelTimeout(HashedWheelTimer timer, TimerTask task, long deadline) {
this.timer = timer;
this.task = task;
this.deadline = deadline;
}

@Override
public Timer timer() {
return timer;
}

@Override
public TimerTask task() {
return task;
}

@Override
public boolean cancel() {
// 这里只是修改状态为ST_CANCELLED,会在下次tick时从格子中移除
if (!compareAndSetState(ST_INIT, ST_CANCELLED)) {
return false;
}
// 加入到时间轮的待取消队列,并在每次tick的时候,从相应格子中移除
timer.cancelledTimeouts.add(this);
return true;
}

// 从格子中移除自身
void remove() {
HashedWheelBucket bucket = this.bucket;
if (bucket != null) {
bucket.remove(this);
} else {
timer.pendingTimeouts.decrementAndGet();
}
}

public boolean compareAndSetState(int expected, int state) {
return STATE_UPDATER.compareAndSet(this, expected, state);
}

public int state() {
return state;
}

@Override
public boolean isCancelled() {
return state() == ST_CANCELLED;
}

@Override
public boolean isExpired() {
return state() == ST_EXPIRED;
}

// 过期并执行任务
public void expire() {
if (!compareAndSetState(ST_INIT, ST_EXPIRED)) {
return;
}

try {
task.run(this);
} catch (Throwable t) {
if (logger.isWarnEnabled()) {
logger.warn("An exception was thrown by " + TimerTask.class.getSimpleName() + '.', t);
}
}
}
}

HashedWheelBucket

HashedWheelBucket用来存放HashedWheelTimeout,结构类似于LinkedList。提供了expireTimeouts方法来执行格子中的过期任务

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
private static final class HashedWheelBucket {
// 指向格子中任务的首尾
private HashedWheelTimeout head;
private HashedWheelTimeout tail;

// 添加到链表的末尾
public void addTimeout(HashedWheelTimeout timeout) {
assert timeout.bucket == null;
timeout.bucket = this;
if (head == null) {
head = tail = timeout;
} else {
tail.next = timeout;
timeout.prev = tail;
tail = timeout;
}
}

// 执行格子中的过期任务,tick到该格子的时候,worker线程会调用这个方法
// 根据deadline和remainingRounds判断任务是否过期
public void expireTimeouts(long deadline) {
HashedWheelTimeout timeout = head;

// 遍历格子中的所有定时任务
while (timeout != null) {
HashedWheelTimeout next = timeout.next;
if (timeout.remainingRounds <= 0) { // 定时任务到期
next = remove(timeout); // 从链表中移除到期任务
if (timeout.deadline <= deadline) {
timeout.expire(); // 执行到期的任务
} else {
// The timeout was placed into a wrong slot. This should never happen.
throw new IllegalStateException(String.format(
"timeout.deadline (%d) > deadline (%d)", timeout.deadline, deadline));
}
} else if (timeout.isCancelled()) { // 如果任务被取消,从链表中移除
next = remove(timeout);
} else {
timeout.remainingRounds --; // 没有到期,轮数-1
}
timeout = next;
}
}

// 从链表中移除到期任务
public HashedWheelTimeout remove(HashedWheelTimeout timeout) {
HashedWheelTimeout next = timeout.next;
// remove timeout that was either processed or cancelled by updating the linked-list
if (timeout.prev != null) {
timeout.prev.next = next;
}
if (timeout.next != null) {
timeout.next.prev = timeout.prev;
}

if (timeout == head) {
// if timeout is also the tail we need to adjust the entry too
if (timeout == tail) {
tail = null;
head = null;
} else {
head = next;
}
} else if (timeout == tail) {
// if the timeout is the tail modify the tail to be the prev node.
tail = timeout.prev;
}
// null out prev, next and bucket to allow for GC.
timeout.prev = null;
timeout.next = null;
timeout.bucket = null;
timeout.timer.pendingTimeouts.decrementAndGet();
return next;
}

// 清空这个格子,将没有过期或者是已经取消的任务保存在set中
public void clearTimeouts(Set<Timeout> set) {
for (;;) {
HashedWheelTimeout timeout = pollTimeout();
if (timeout == null) {
return;
}
if (timeout.isExpired() || timeout.isCancelled()) {
continue;
}
set.add(timeout);
}
}

// 从链表中取得最头上的任务
private HashedWheelTimeout pollTimeout() {
HashedWheelTimeout head = this.head;
if (head == null) {
return null;
}
HashedWheelTimeout next = head.next;
if (next == null) {
tail = this.head = null;
} else {
this.head = next;
next.prev = null;
}

// null out prev and next to allow for GC.
head.next = null;
head.prev = null;
head.bucket = null;
return head;
}
}

Worker

前面我们看到时间轮启动时启动Worker线程。Worker是时间轮的核心线程类。tick的转动,过期任务的处理都是在这个线程中处理的。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
private final class Worker implements Runnable {
private final Set<Timeout> unprocessedTimeouts = new HashSet<Timeout>();

private long tick;

@Override
public void run() {
// 初始化startTime,所有任务的deadline都是相对于这个时间点的
startTime = System.nanoTime();
// 由于System.nanoTime()可能返回0,甚至负数。0是一个标示符,用来判断startTime是否被初始化,所以当startTime=0的时候,重新赋值为1
if (startTime == 0) {
startTime = 1;
}

// 唤醒阻塞在start()的线程
startTimeInitialized.countDown();

// 只要时间轮的状态为WORKER_STATE_STARTED,就循环地转动tick,判断响应格子中的到期任务
do {
// waitForNextTick方法主要是计算下次tick的时间,然后sleep到下次tick
// 返回值是System.nanoTime() - startTime,也就是Timer启动后到这次tick所过去的时间
final long deadline = waitForNextTick();
if (deadline > 0) { // 溢出或者被中断的时候会返回负数,所以小于等于0不管
// 获取tick对应的格子索引
int idx = (int) (tick & mask);
// 移除被取消的任务
processCancelledTasks();
HashedWheelBucket bucket = wheel[idx];
// 从任务队列中取出任务加入到对应的格子中
transferTimeoutsToBuckets();
// 执行格子中的任务
bucket.expireTimeouts(deadline);
tick++;
}
} while (WORKER_STATE_UPDATER.get(HashedWheelTimer.this) == WORKER_STATE_STARTED);

// 时间轮停止了。
// 清除所有格子中的任务,并加入到未处理任务列表,以供stop()方法返回
for (HashedWheelBucket bucket: wheel) {
bucket.clearTimeouts(unprocessedTimeouts);
}
// 将还没有加入到格子中的待处理定时任务队列中的任务取出,如果是未取消任务,则加入到未处理任务列表,以供stop()方法返回
for (;;) {
HashedWheelTimeout timeout = timeouts.poll();
if (timeout == null) {
break;
}
if (!timeout.isCancelled()) {
unprocessedTimeouts.add(timeout);
}
}
// 处理取消的任务
processCancelledTasks();
}

// 将newTimeout()方法中加入到待处理定时任务队列中的任务加入到指定的格子中
private void transferTimeoutsToBuckets() {
// 每次tick只处理10w个任务,以免阻塞worker线程
for (int i = 0; i < 100000; i++) {
HashedWheelTimeout timeout = timeouts.poll();
// 如果没有任务了,直接跳出循环
if (timeout == null) {
// all processed
break;
}
// 还没有放入到格子中就取消了,直接略过
if (timeout.state() == HashedWheelTimeout.ST_CANCELLED) {
// Was cancelled in the meantime.
continue;
}

// 计算任务需要经过多少个tick
long calculated = timeout.deadline / tickDuration;
// 计算任务的轮数
timeout.remainingRounds = (calculated - tick) / wheel.length;

// 如果任务在timeouts队列中放久了,以至于已经过了执行时间,这个时候就使用当前tick,也就是放到当前bucket中。此方法调用完后就会被执行。
final long ticks = Math.max(calculated, tick); // Ensure we don't schedule for past.
int stopIndex = (int) (ticks & mask);

// 将任务加入到相应的格子中
HashedWheelBucket bucket = wheel[stopIndex];
bucket.addTimeout(timeout);
}
}

// 将取消的任务取出,并从格子中移除
private void processCancelledTasks() {
for (;;) {
HashedWheelTimeout timeout = cancelledTimeouts.poll();
if (timeout == null) {
// all processed
break;
}
try {
timeout.remove();
} catch (Throwable t) {
if (logger.isWarnEnabled()) {
logger.warn("An exception was thrown while process a cancellation task", t);
}
}
}
}

// sleep,直到下次tick到来,然后返回该次tick和启动时间之间的时长
private long waitForNextTick() {
// 下次tick的时间点,用于计算需要sleep的时间
long deadline = tickDuration * (tick + 1);

for (;;) {
// 计算需要sleep的时间
final long currentTime = System.nanoTime() - startTime;
long sleepTimeMs = (deadline - currentTime + 999999) / 1000000;

if (sleepTimeMs <= 0) {
if (currentTime == Long.MIN_VALUE) {
return -Long.MAX_VALUE;
} else {
return currentTime;
}
}

// 因为windows平台的定时调度最小单位为10ms,如果不是10ms的倍数,可能会引起sleep时间不准确
if (PlatformDependent.isWindows()) {
sleepTimeMs = sleepTimeMs / 10 * 10;
}

try {
Thread.sleep(sleepTimeMs);
} catch (InterruptedException ignored) {
// 调用HashedWheelTimer.stop()时优雅退出
if (WORKER_STATE_UPDATER.get(HashedWheelTimer.this) == WORKER_STATE_SHUTDOWN) {
return Long.MIN_VALUE;
}
}
}
}

public Set<Timeout> unprocessedTimeouts() {
return Collections.unmodifiableSet(unprocessedTimeouts);
}
}

总结

总体来说,HashedWheelTimer使用的是一个比较朴素的算法,要点有两个:

  1. 添加定时任务

    1. 如果worker线程没有执行则启动worker线程。
    2. 将定时任务task包装成HashedWheelTimeout,然后添加到Queue<HashedWheelTimeout> timeouts队列中
  2. worker线程的执行

    1. 调用waitForNextTick方法等待直到下一个tick
    2. 调用processCancelledTasks方法处理被取消的任务。从Queue<HashedWheelTimeout> cancelledTimeouts队列(调用cancel方法取消任务时会将任务添加到该队列中)中取出被取消的任务,然后将其从格子的任务列表中移除。
    3. 计算当前tick所在的格子(bucket)
    4. 调用transferTimeoutsToBuckets方法将timeouts队列中新建的任务转移到所在格子的链表中
    5. 调用HashedWheelBucket.expireTimeouts方法执行到期的任务

这里有几个值的注意的数据结构:

  1. 任务并不是直接放在格子中的,而是维护了一个双向链表,这种数据结构非常便于插入和移除。
  2. 新添加的任务并不直接放入格子,而是先放入一个队列中,这是为了避免多线程插入任务的冲突。在每个tick运行任务之前由worker线程自动对任务进行归集和分类,插入到对应的槽位里面。

http://novoland.github.io/并发/2014/07/26/定时器(Timer)的实现.html
https://zhuanlan.zhihu.com/p/32906730