我是一段不羁的公告!
记得给艿艿这 3 个项目加油,添加一个 STAR 噢。
https://github.com/YunaiV/SpringBoot-Labs
https://github.com/YunaiV/onemall
https://github.com/YunaiV/ruoyi-vue-pro

精尽 Netty 源码解析 —— EventLoop(七)之 EventLoop 处理定时任务

1. 概述

本文接 《精尽 Netty 源码解析 —— EventLoop(六)之 EventLoop 处理普通任务》 ,分享【处理定时任务】的部分。

因为 AbstractScheduledEventExecutor 在 《精尽 Netty 源码解析 —— EventLoop(三)之 EventLoop 初始化》 并未分享,并且它是本文的处理定时任务的前置,所以本文先写这部分内容。

2. ScheduledFutureTask

io.netty.util.concurrent.ScheduledFutureTask ,实现 ScheduledFuture、PriorityQueueNode 接口,继承 PromiseTask 抽象类,Netty 定时任务。

老艿艿:也有文章喜欢把“定时任务”叫作“调度任务”,意思是相同的,本文统一使用“定时任务”。

2.1 静态属性

/**
* 任务序号生成器,通过 AtomicLong 实现递增发号
*/
private static final AtomicLong nextTaskId = new AtomicLong();
/**
* 定时任务时间起点
*/
private static final long START_TIME = System.nanoTime();
  • nextTaskId 静态属性,任务序号生成器,通过 AtomicLong 实现递增发号。
  • START_TIME 静态属性,定时任务时间起点。在 ScheduledFutureTask 中,定时任务的执行时间,都是基于 START_TIME相对时间。😈 至于为什么使用相对时间?笔者暂时没有搞清楚。

    • 笔者也搜索了下和 System.nanoTime() 相关的内容,唯一能看的是 《System.nanoTime() 的隐患》 ,但是应该不是这个原因。
    • 和我的大表弟普架交流了一波,他的理解是:

      因为是定时调度,我改了系统时间也没关系
      存的是距离下次调度还要多长时间
      不受系统时间影响
      最大的好处

      • 哎哟,牛逼如我大表弟啊!!!

2.2 nanoTime

#nanoTime() 静态方法,获得当前时间,这个是相对 START_TIME 来算的。代码如下:

static long nanoTime() {
return System.nanoTime() - START_TIME;
}
  • 这是个重要的方法,后续很多方法都会调用到它。

2.3 deadlineNanos

#deadlineNanos(long delay) 静态方法,获得任务执行时间,这个也是相对 START_TIME 来算的。代码如下:

/**
* @param delay 延迟时长,单位:纳秒
* @return 获得任务执行时间,也是相对 {@link #START_TIME} 来算的。
* 实际上,返回的结果,会用于 {@link #deadlineNanos} 字段
*/
static long deadlineNanos(long delay) {
long deadlineNanos = nanoTime() + delay;
// Guard against overflow 防御性编程
return deadlineNanos < 0 ? Long.MAX_VALUE : deadlineNanos;
}

2.4 构造方法

/**
* 任务编号
*/
private final long id = nextTaskId.getAndIncrement();
/**
* 任务执行时间,即到了该时间,该任务就会被执行
*/
private long deadlineNanos;
/**
* 任务执行周期
*
* =0 - 只执行一次
* >0 - 按照计划执行时间计算
* <0 - 按照实际执行时间计算
*
* 推荐阅读文章 https://blog.csdn.net/gtuu0123/article/details/6040159
*/
/* 0 - no repeat, >0 - repeat at fixed rate, <0 - repeat with fixed delay */
private final long periodNanos;
/**
* 队列编号
*/
private int queueIndex = INDEX_NOT_IN_QUEUE;

ScheduledFutureTask(
AbstractScheduledEventExecutor executor,
Runnable runnable, V result, long nanoTime) {
this(executor, toCallable(runnable, result), nanoTime);
}

ScheduledFutureTask(
AbstractScheduledEventExecutor executor,
Callable<V> callable, long nanoTime, long period) {
super(executor, callable);
if (period == 0) {
throw new IllegalArgumentException("period: 0 (expected: != 0)");
}
deadlineNanos = nanoTime;
periodNanos = period;
}

ScheduledFutureTask(
AbstractScheduledEventExecutor executor,
Callable<V> callable, long nanoTime) {
super(executor, callable);
deadlineNanos = nanoTime;
periodNanos = 0;
}
  • 每个字段比较简单,胖友看上面的注释。

2.5 delayNanos

#delayNanos(...) 方法,获得距离指定时间,还要多久可执行。代码如下:

/**
* @return 距离当前时间,还要多久可执行。若为负数,直接返回 0
*/
public long delayNanos() {
return Math.max(0, deadlineNanos() - nanoTime());
}

/**
* @param currentTimeNanos 指定时间
* @return 距离指定时间,还要多久可执行。若为负数,直接返回 0
*/
public long delayNanos(long currentTimeNanos) {
return Math.max(0, deadlineNanos() - (currentTimeNanos - START_TIME));
}

@Override
public long getDelay(TimeUnit unit) {
return unit.convert(delayNanos(), TimeUnit.NANOSECONDS);
}

2.6 run

#run() 方法,执行定时任务。代码如下:

 1: @Override
2: public void run() {
3: assert executor().inEventLoop();
4: try {
5: if (periodNanos == 0) {
6: // 设置任务不可取消
7: if (setUncancellableInternal()) {
8: // 执行任务
9: V result = task.call();
10: // 通知任务执行成功
11: setSuccessInternal(result);
12: }
13: } else {
14: // 判断任务并未取消
15: // check if is done as it may was cancelled
16: if (!isCancelled()) {
17: // 执行任务
18: task.call();
19: if (!executor().isShutdown()) {
20: // 计算下次执行时间
21: long p = periodNanos;
22: if (p > 0) {
23: deadlineNanos += p;
24: } else {
25: deadlineNanos = nanoTime() - p;
26: }
27: // 判断任务并未取消
28: if (!isCancelled()) {
29: // 重新添加到任务队列,等待下次定时执行
30: // scheduledTaskQueue can never be null as we lazy init it before submit the task!
31: Queue<ScheduledFutureTask<?>> scheduledTaskQueue =
32: ((AbstractScheduledEventExecutor) executor()).scheduledTaskQueue;
33: assert scheduledTaskQueue != null;
34: scheduledTaskQueue.add(this);
35: }
36: }
37: }
38: }
39: // 发生异常,通知任务执行失败
40: } catch (Throwable cause) {
41: setFailureInternal(cause);
42: }
43: }
  • 第 3 行:校验,必须在 EventLoop 的线程中。
  • 根据不同的任务执行周期 periodNanos ,在执行任务会略有不同。当然,大体是相同的。
  • 第 5 至 12 行:执行周期为“只执行一次”的定时任务。
    • 第 7 行:调用 PromiseTask#setUncancellableInternal() 方法,设置任务不可取消。具体的方法实现,我们在后续关于 Promise 的文章中分享。
    • 第 9 行:【重要】调用 Callable#call() 方法,执行任务。
    • 第 11 行:调用 PromiseTask#setSuccessInternal(V result) 方法,回调通知注册在定时任务上的监听器。为什么能这么做呢?因为 ScheduledFutureTask 继承了 PromiseTask 抽象类。
  • 第 13 至 38 行:执行周期为“固定周期”的定时任务。
    • 第 16 行:调用 DefaultPromise#isCancelled() 方法,判断任务是否已经取消。这一点,和【第 7 行】的代码,是不同的。具体的方法实现,我们在后续关于 Promise 的文章中分享。
    • 第 18 行:【重要】调用 Callable#call() 方法,执行任务。
    • 第 19 行:判断 EventExecutor 并未关闭。
    • 第 20 至 26 行:计算下次定时执行的时间。不同的执行 fixed 方式,计算方式不同。其中【第 25 行】的 - p 的代码,因为 p 是负数,所以通过负负得正来计算。另外,这块会修改定时任务的 deadlineNanos 属性,从而变成新的定时任务执行时间。
    • 第 28 行:和【第 16 行】的代码是一致的。
    • 第 29 至 34 行:重新添加到定时任务队列 scheduledTaskQueue 中,等待下次定时执行。
  • 第 39 至 42 行:发生异常,调用 PromiseTask#setFailureInternal(Throwable cause) 方法,回调通知注册在定时任务上的监听器。

2.7 cancel

有两个方法,可以取消定时任务。代码如下:

@Override
public boolean cancel(boolean mayInterruptIfRunning) {
boolean canceled = super.cancel(mayInterruptIfRunning);
// 取消成功,移除出定时任务队列
if (canceled) {
((AbstractScheduledEventExecutor) executor()).removeScheduled(this);
}
return canceled;
}

// 移除任务
boolean cancelWithoutRemove(boolean mayInterruptIfRunning) {
return super.cancel(mayInterruptIfRunning);
}
  • 差别在于,是否 调用 AbstractScheduledEventExecutor#removeScheduled(ScheduledFutureTask) 方法,从定时任务队列移除自己。

2.8 compareTo

#compareTo(Delayed o) 方法,用于队列( ScheduledFutureTask 使用 PriorityQueue 作为优先级队列 )排序。代码如下:

@Override
public int compareTo(Delayed o) {
if (this == o) {
return 0;
}

ScheduledFutureTask<?> that = (ScheduledFutureTask<?>) o;
long d = deadlineNanos() - that.deadlineNanos();
if (d < 0) {
return -1;
} else if (d > 0) {
return 1;
} else if (id < that.id) {
return -1;
} else if (id == that.id) {
throw new Error();
} else {
return 1;
}
}
  • 按照 deadlineNanosid 属性升序排序。

2.9 priorityQueueIndex

#priorityQueueIndex(...) 方法,获得或设置 queueIndex 属性。代码如下:

@Override
public int priorityQueueIndex(DefaultPriorityQueue<?> queue) { // 获得
return queueIndex;
}

@Override
public void priorityQueueIndex(DefaultPriorityQueue<?> queue, int i) { // 设置
queueIndex = i;
}
  • 因为 ScheduledFutureTask 实现 PriorityQueueNode 接口,所以需要实现这两个方法。

3. AbstractScheduledEventExecutor

io.netty.util.concurrent.AbstractScheduledEventExecutor ,继承 AbstractEventExecutor 抽象类,支持定时任务的 EventExecutor 的抽象类。

3.1 构造方法

/**
* 定时任务队列
*/
PriorityQueue<ScheduledFutureTask<?>> scheduledTaskQueue;

protected AbstractScheduledEventExecutor() {
}

protected AbstractScheduledEventExecutor(EventExecutorGroup parent) {
super(parent);
}
  • scheduledTaskQueue 属性,定时任务队列。

3.2 scheduledTaskQueue

#scheduledTaskQueue() 方法,获得定时任务队列。若未初始化,则进行创建。代码如下:

/**
* 定时任务排序器
*/
private static final Comparator<ScheduledFutureTask<?>> SCHEDULED_FUTURE_TASK_COMPARATOR =
new Comparator<ScheduledFutureTask<?>>() {
@Override
public int compare(ScheduledFutureTask<?> o1, ScheduledFutureTask<?> o2) {
return o1.compareTo(o2); //
}
};

PriorityQueue<ScheduledFutureTask<?>> scheduledTaskQueue() {
if (scheduledTaskQueue == null) {
scheduledTaskQueue = new DefaultPriorityQueue<ScheduledFutureTask<?>>(
SCHEDULED_FUTURE_TASK_COMPARATOR,
// Use same initial capacity as java.util.PriorityQueue
11);
}
return scheduledTaskQueue;
}
  • 创建的队列是 io.netty.util.internal.DefaultPriorityQueue 类型。具体的代码实现,本文先不解析。在这里,我们只要知道它是一个优先级队列,通过 SCHEDULED_FUTURE_TASK_COMPARATOR 来比较排序 ScheduledFutureTask 的任务优先级( 顺序 )。
  • SCHEDULED_FUTURE_TASK_COMPARATOR 的具体实现,是调用 「2.8 compareTo」 方法来实现,所以队列首个任务,就是第一个需要执行的定时任务。

3.3 nanoTime

#nanoTime() 静态方法,获得当前时间。代码如下:

protected static long nanoTime() {
return ScheduledFutureTask.nanoTime();
}

3.4 schedule

#schedule(final ScheduledFutureTask<V> task) 方法,提交定时任务。代码如下:

<V> ScheduledFuture<V> schedule(final ScheduledFutureTask<V> task) {
if (inEventLoop()) {
// 添加到定时任务队列
scheduledTaskQueue().add(task);
} else {
// 通过 EventLoop 的线程,添加到定时任务队列
execute(new Runnable() {
@Override
public void run() {
scheduledTaskQueue().add(task);
}
});
}
return task;
}
  • 必须在 EventLoop 的线程中,才能添加到定时任务到队列中。

在 ScheduledFutureTask 中,有四个方法,会调用 #schedule(final ScheduledFutureTask<V> task) 方法,分别创建 3 种不同类型的定时任务。代码如下:

@Override
public <V> ScheduledFuture<V> schedule(Callable<V> callable, long delay, TimeUnit unit) {
ObjectUtil.checkNotNull(callable, "callable");
ObjectUtil.checkNotNull(unit, "unit");
if (delay < 0) {
delay = 0;
}
// 无视,已经废弃
validateScheduled0(delay, unit);

return schedule(new ScheduledFutureTask<V>(
this, callable, ScheduledFutureTask.deadlineNanos(unit.toNanos(delay))));
}

@Override
public ScheduledFuture<?> scheduleAtFixedRate(Runnable command, long initialDelay, long period, TimeUnit unit) {
ObjectUtil.checkNotNull(command, "command");
ObjectUtil.checkNotNull(unit, "unit");
if (initialDelay < 0) {
throw new IllegalArgumentException(
String.format("initialDelay: %d (expected: >= 0)", initialDelay));
}
if (period <= 0) {
throw new IllegalArgumentException(
String.format("period: %d (expected: > 0)", period));
}
// 无视,已经废弃
validateScheduled0(initialDelay, unit);
validateScheduled0(period, unit);

return schedule(new ScheduledFutureTask<Void>(
this, Executors.<Void>callable(command, null), // Runnable => Callable
ScheduledFutureTask.deadlineNanos(unit.toNanos(initialDelay)), unit.toNanos(period)));
}

@Override
public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command, long initialDelay, long delay, TimeUnit unit) {
ObjectUtil.checkNotNull(command, "command");
ObjectUtil.checkNotNull(unit, "unit");
if (initialDelay < 0) {
throw new IllegalArgumentException(
String.format("initialDelay: %d (expected: >= 0)", initialDelay));
}
if (delay <= 0) {
throw new IllegalArgumentException(
String.format("delay: %d (expected: > 0)", delay));
}
// 无视,已经废弃
validateScheduled0(initialDelay, unit);
validateScheduled0(delay, unit);

return schedule(new ScheduledFutureTask<Void>(
this, Executors.<Void>callable(command, null), // Runnable => Callable
ScheduledFutureTask.deadlineNanos(unit.toNanos(initialDelay)), -unit.toNanos(delay)));
}
  • 每个方法,前面都是校验参数的代码,重点是在最后对 #schedule(final ScheduledFutureTask<V> task) 方法的调用。

3.5 removeScheduled

#removeScheduled(final ScheduledFutureTask<?> task) 方法,移除出定时任务队列。代码如下:

final void removeScheduled(final ScheduledFutureTask<?> task) {
if (inEventLoop()) {
// 移除出定时任务队列
scheduledTaskQueue().removeTyped(task);
} else {
// 通过 EventLoop 的线程,移除出定时任务队列
execute(new Runnable() {
@Override
public void run() {
removeScheduled(task);
}
});
}
}
  • 必须在 EventLoop 的线程中,才能移除出定时任务队列。

3.6 hasScheduledTasks

#hasScheduledTasks() 方法,判断是否有可执行的定时任务。代码如下:

/**
* Returns {@code true} if a scheduled task is ready for processing.
*/
protected final boolean hasScheduledTasks() {
Queue<ScheduledFutureTask<?>> scheduledTaskQueue = this.scheduledTaskQueue;
// 获得队列首个定时任务。不会从队列中,移除该任务
ScheduledFutureTask<?> scheduledTask = scheduledTaskQueue == null ? null : scheduledTaskQueue.peek();
// 判断该任务是否到达可执行的时间
return scheduledTask != null && scheduledTask.deadlineNanos() <= nanoTime();
}
  • 代码比较简单,胖友直接看方法注释。

3.7 peekScheduledTask

#peekScheduledTask() 方法,获得队列首个定时任务。不会从队列中,移除该任务。代码如下:

final ScheduledFutureTask<?> peekScheduledTask() {
Queue<ScheduledFutureTask<?>> scheduledTaskQueue = this.scheduledTaskQueue;
if (scheduledTaskQueue == null) {
return null;
}
return scheduledTaskQueue.peek();
}

3.8 nextScheduledTaskNano

#nextScheduledTaskNano() 方法,获得定时任务队列,距离当前时间,还要多久可执行。

  • 若队列为空,则返回 -1
  • 若队列非空,若为负数,直接返回 0 。实际等价,ScheduledFutureTask#delayNanos() 方法。

代码如下:

/**
* Return the nanoseconds when the next scheduled task is ready to be run or {@code -1} if no task is scheduled.
*/
protected final long nextScheduledTaskNano() {
Queue<ScheduledFutureTask<?>> scheduledTaskQueue = this.scheduledTaskQueue;
// 获得队列首个定时任务。不会从队列中,移除该任务
ScheduledFutureTask<?> scheduledTask = scheduledTaskQueue == null ? null : scheduledTaskQueue.peek();
if (scheduledTask == null) {
return -1;
}
// 距离当前时间,还要多久可执行。若为负数,直接返回 0 。实际等价,ScheduledFutureTask#delayNanos() 方法。
return Math.max(0, scheduledTask.deadlineNanos() - nanoTime());
}

3.9 pollScheduledTask

#pollScheduledTask(...) 方法,获得指定时间内,定时任务队列首个可执行的任务,并且从队列中移除。代码如下:

/**
* @see #pollScheduledTask(long)
*/
protected final Runnable pollScheduledTask() {
return pollScheduledTask(nanoTime()); // 当前时间
}

/**
* Return the {@link Runnable} which is ready to be executed with the given {@code nanoTime}.
* You should use {@link #nanoTime()} to retrieve the correct {@code nanoTime}.
*/
protected final Runnable pollScheduledTask(long nanoTime) {
assert inEventLoop();

Queue<ScheduledFutureTask<?>> scheduledTaskQueue = this.scheduledTaskQueue;
// 获得队列首个定时任务。不会从队列中,移除该任务
ScheduledFutureTask<?> scheduledTask = scheduledTaskQueue == null ? null : scheduledTaskQueue.peek();
// 直接返回,若获取不到
if (scheduledTask == null) {
return null;
}

// 在指定时间内,则返回该任务
if (scheduledTask.deadlineNanos() <= nanoTime) {
scheduledTaskQueue.remove(); // 移除任务
return scheduledTask;
}
return null;
}

3.10 cancelScheduledTasks

#cancelScheduledTasks() 方法,取消定时任务队列的所有任务。代码如下:

/**
* Cancel all scheduled tasks.
* <p>
* This method MUST be called only when {@link #inEventLoop()} is {@code true}.
*/
protected void cancelScheduledTasks() {
assert inEventLoop();

// 若队列为空,直接返回
PriorityQueue<ScheduledFutureTask<?>> scheduledTaskQueue = this.scheduledTaskQueue;
if (isNullOrEmpty(scheduledTaskQueue)) {
return;
}

// 循环,取消所有任务
final ScheduledFutureTask<?>[] scheduledTasks = scheduledTaskQueue.toArray(new ScheduledFutureTask<?>[0]);
for (ScheduledFutureTask<?> task : scheduledTasks) {
task.cancelWithoutRemove(false);
}

scheduledTaskQueue.clearIgnoringIndexes();
}

private static boolean isNullOrEmpty(Queue<ScheduledFutureTask<?>> queue) {
return queue == null || queue.isEmpty();
}
  • 代码比较简单,胖友自己看注释。

4. SingleThreadEventExecutor

《精尽 Netty 源码解析 —— EventLoop(六)之 EventLoop 处理普通任务》 中,有个 #fetchFromScheduledTaskQueue() 方法,将定时任务队列 scheduledTaskQueue 到达可执行的任务,添加到任务队列 taskQueue 中。代码如下:

private boolean fetchFromScheduledTaskQueue() {
// 获得当前时间
long nanoTime = AbstractScheduledEventExecutor.nanoTime();
// 获得指定时间内,定时任务队列**首个**可执行的任务,并且从队列中移除。
Runnable scheduledTask = pollScheduledTask(nanoTime);
// 不断从定时任务队列中,获得
while (scheduledTask != null) {
// 将定时任务添加到 taskQueue 中。若添加失败,则结束循环,返回 false ,表示未获取完所有课执行的定时任务
if (!taskQueue.offer(scheduledTask)) {
// 将定时任务添加回 scheduledTaskQueue 中
// No space left in the task queue add it back to the scheduledTaskQueue so we pick it up again.
scheduledTaskQueue().add((ScheduledFutureTask<?>) scheduledTask);
return false;
}
// 获得指定时间内,定时任务队列**首个**可执行的任务,并且从队列中移除。
scheduledTask = pollScheduledTask(nanoTime);
}
// 返回 true ,表示获取完所有可执行的定时任务
return true;
}
  • 代码比较简单,胖友看下笔者的详细代码注释。哈哈哈

666. 彩蛋

没有彩蛋,简单水文一篇。

总访客数 && 总访问量