我是一段不羁的公告!
记得给艿艿这 3 个项目加油,添加一个 STAR 噢。
https://github.com/YunaiV/SpringBoot-Labs
https://github.com/YunaiV/onemall
https://github.com/YunaiV/ruoyi-vue-pro

精尽 Netty 源码解析 —— EventLoop(四)之 EventLoop 运行

1. 概述

本文我们分享 EventLoop 的运行相关代码的实现。

因为 EventLoop 的运行主要是通过 NioEventLoop 的 #run() 方法实现,考虑到内容相对的完整性,在 《精尽 Netty 源码解析 —— EventLoop(三)之 EventLoop 初始化》 一文中,我们并未分享 NioEventLoop 的初始化,所以本文也会分享这部分的内容。

OK ,还是老样子,自上而下的方式,一起来看看 NioEventLoop 的代码实现。

老艿艿,本文的重点在 「2.9 run」「2.12 select」 中。

2. NioEventLoop

io.netty.channel.nio.NioEventLoop ,继承 SingleThreadEventLoop 抽象类,NIO EventLoop 实现类,实现对注册到其中的 Channel 的就绪的 IO 事件,和对用户提交的任务进行处理。

2.1 static

static 代码块中,初始化了 NioEventLoop 的静态属性们。代码如下:

/**
* TODO 1007 NioEventLoop cancel
*/
private static final int CLEANUP_INTERVAL = 256; // XXX Hard-coded value, but won't need customization.

/**
* 是否禁用 SelectionKey 的优化,默认开启
*/
private static final boolean DISABLE_KEYSET_OPTIMIZATION = SystemPropertyUtil.getBoolean("io.netty.noKeySetOptimization", false);

/**
* 少于该 N 值,不开启空轮询重建新的 Selector 对象的功能
*/
private static final int MIN_PREMATURE_SELECTOR_RETURNS = 3;
/**
* NIO Selector 空轮询该 N 次后,重建新的 Selector 对象
*/
private static final int SELECTOR_AUTO_REBUILD_THRESHOLD;

static {
// 解决 Selector#open() 方法 // <1>
final String key = "sun.nio.ch.bugLevel";
final String buglevel = SystemPropertyUtil.get(key);
if (buglevel == null) {
try {
AccessController.doPrivileged(new PrivilegedAction<Void>() {
@Override
public Void run() {
System.setProperty(key, "");
return null;
}
});
} catch (final SecurityException e) {
logger.debug("Unable to get/set System Property: " + key, e);
}
}

// 初始化
int selectorAutoRebuildThreshold = SystemPropertyUtil.getInt("io.netty.selectorAutoRebuildThreshold", 512);
if (selectorAutoRebuildThreshold < MIN_PREMATURE_SELECTOR_RETURNS) {
selectorAutoRebuildThreshold = 0;
}
SELECTOR_AUTO_REBUILD_THRESHOLD = selectorAutoRebuildThreshold;

if (logger.isDebugEnabled()) {
logger.debug("-Dio.netty.noKeySetOptimization: {}", DISABLE_KEYSET_OPTIMIZATION);
logger.debug("-Dio.netty.selectorAutoRebuildThreshold: {}", SELECTOR_AUTO_REBUILD_THRESHOLD);
}
}

2.2 构造方法

/**
* The NIO {@link Selector}.
*
* 包装的 Selector 对象,经过优化
*
* {@link #openSelector()}
*/
private Selector selector;
/**
* 未包装的 Selector 对象
*/
private Selector unwrappedSelector;
/**
* 注册的 SelectionKey 集合。Netty 自己实现,经过优化。
*/
private SelectedSelectionKeySet selectedKeys;
/**
* SelectorProvider 对象,用于创建 Selector 对象
*/
private final SelectorProvider provider;

/**
* Boolean that controls determines if a blocked Selector.select should
* break out of its selection process. In our case we use a timeout for
* the select method and the select method will block for that time unless
* waken up.
*
* 唤醒标记。因为唤醒方法 {@link Selector#wakeup()} 开销比较大,通过该标识,减少调用。
*
* @see #wakeup(boolean)
* @see #run()
*/
private final AtomicBoolean wakenUp = new AtomicBoolean();
/**
* Select 策略
*
* @see #select(boolean)
*/
private final SelectStrategy selectStrategy;
/**
* 处理 Channel 的就绪的 IO 事件,占处理任务的总时间的比例
*/
private volatile int ioRatio = 50;
/**
* 取消 SelectionKey 的数量
*
* TODO 1007 NioEventLoop cancel
*/
private int cancelledKeys;
/**
* 是否需要再次 select Selector 对象
*
* TODO 1007 NioEventLoop cancel
*/
private boolean needsToSelectAgain;

NioEventLoop(NioEventLoopGroup parent, Executor executor, SelectorProvider selectorProvider,
SelectStrategy strategy, RejectedExecutionHandler rejectedExecutionHandler) {
super(parent, executor, false, DEFAULT_MAX_PENDING_TASKS, rejectedExecutionHandler);
if (selectorProvider == null) {
throw new NullPointerException("selectorProvider");
}
if (strategy == null) {
throw new NullPointerException("selectStrategy");
}
provider = selectorProvider;
// 创建 Selector 对象 <1>
final SelectorTuple selectorTuple = openSelector();
selector = selectorTuple.selector;
unwrappedSelector = selectorTuple.unwrappedSelector;
selectStrategy = strategy;
}
  • Selector 相关:
  • wakenUp 属性,唤醒标记。因为唤醒方法 Selector#wakeup() 开销比较大,通过该标识,减少调用。详细解析,见 「2.8 wakeup」
  • selectStrategy 属性,Select 策略。详细解析,见 「2.10 SelectStrategy」
  • ioRatio 属性,在 NioEventLoop 中,会三种类型的任务:1) Channel 的就绪的 IO 事件;2) 普通任务;3) 定时任务。而 ioRatio 属性,处理 Channel 的就绪的 IO 事件,占处理任务的总时间的比例。
  • 取消 SelectionKey 相关:
    • cancelledKeys 属性, 取消 SelectionKey 的数量。TODO 1007 NioEventLoop cancel
    • needsToSelectAgain 属性,是否需要再次 select Selector 对象。TODO 1007 NioEventLoop cancel

2.3 openSelector

#openSelector() 方法,创建 NIO Selector 对象。

考虑到让本文更专注在 EventLoop 的逻辑,并且不影响对本文的理解,所以暂时不讲解它的具体实现。详细解析,见 《精尽 Netty 源码解析 —— EventLoop(五)之 EventLoop 处理 IO 事件》

2.4 rebuildSelector

#rebuildSelector() 方法,重建 NIO Selector 对象。

考虑到让本文更专注在 EventLoop 的逻辑,并且不影响对本文的理解,所以暂时不讲解它的具体实现。详细解析,见 《精尽 Netty 源码解析 —— EventLoop(五)之 EventLoop 处理 IO 事件》

2.5 newTaskQueue

#newTaskQueue(int maxPendingTasks) 方法,创建任务队列。代码如下:

该方法覆写父类的该方法。

@Override
protected Queue<Runnable> newTaskQueue(int maxPendingTasks) {
// This event loop never calls takeTask()
return maxPendingTasks == Integer.MAX_VALUE ? PlatformDependent.<Runnable>newMpscQueue()
: PlatformDependent.<Runnable>newMpscQueue(maxPendingTasks);
}
  • 调用 PlatformDependent#newMpscQueue(...) 方法,创建 mpsc 队列。我们来看看代码注释对 mpsc 队列的描述:

    Create a new {@link Queue} which is safe to use for multiple producers (different threads) and a single consumer (one thread!).

2.6 pendingTasks

#pendingTasks() 方法,获得待执行的任务数量。代码如下:

该方法覆写父类的该方法。

@Override
public int pendingTasks() {
// As we use a MpscQueue we need to ensure pendingTasks() is only executed from within the EventLoop as
// otherwise we may see unexpected behavior (as size() is only allowed to be called by a single consumer).
// See https://github.com/netty/netty/issues/5297
if (inEventLoop()) {
return super.pendingTasks();
} else {
return submit(pendingTasksCallable).syncUninterruptibly().getNow();
}
}
  • 因为 MpscQueue 仅允许单消费,所以获得队列的大小,仅允许在 EventLoop 的线程中调用。

2.7 setIoRatio

#setIoRatio(int ioRatio) 方法,设置 ioRatio 属性。代码如下:

/**
* Sets the percentage of the desired amount of time spent for I/O in the event loop. The default value is
* {@code 50}, which means the event loop will try to spend the same amount of time for I/O as for non-I/O tasks.
*/
public void setIoRatio(int ioRatio) {
if (ioRatio <= 0 || ioRatio > 100) {
throw new IllegalArgumentException("ioRatio: " + ioRatio + " (expected: 0 < ioRatio <= 100)");
}
this.ioRatio = ioRatio;
}

2.8 wakeup

#wakeup(boolean inEventLoop) 方法,唤醒线程。代码如下:

@Override
protected void wakeup(boolean inEventLoop) {
if (!inEventLoop && wakenUp.compareAndSet(false, true)) { // <2>
selector.wakeup(); // <1>
}
}
  • <1> 处,因为 NioEventLoop 的线程阻塞,主要是调用 Selector#select(long timeout) 方法,阻塞等待有 Channel 感兴趣的 IO 事件,或者超时。所以需要调用 Selector#wakeup() 方法,进行唤醒 Selector 。
  • <2> 处,因为 Selector#wakeup() 方法的唤醒操作是开销比较大的操作,并且每次重复调用相当于重复唤醒。所以,通过 wakenUp 属性,通过 CAS 修改 false => true ,保证有且仅有进行一次唤醒。
  • 当然,详细的解析,可以结合 「2.9 run」 一起看,这样会更加清晰明了。

2.9 run

#run() 方法,NioEventLoop 运行,处理任务。这是本文最重要的方法。代码如下:

 1: @Override
2: protected void run() {
3: for (;;) {
4: try {
5: switch (selectStrategy.calculateStrategy(selectNowSupplier, hasTasks())) {
6: case SelectStrategy.CONTINUE: // 默认实现下,不存在这个情况。
7: continue;
8: case SelectStrategy.SELECT:
9: // 重置 wakenUp 标记为 false
10: // 选择( 查询 )任务
11: select(wakenUp.getAndSet(false));
12:
13: // 'wakenUp.compareAndSet(false, true)' is always evaluated
14: // before calling 'selector.wakeup()' to reduce the wake-up
15: // overhead. (Selector.wakeup() is an expensive operation.)
16: //
17: // However, there is a race condition in this approach.
18: // The race condition is triggered when 'wakenUp' is set to
19: // true too early.
20: //
21: // 'wakenUp' is set to true too early if:
22: // 1) Selector is waken up between 'wakenUp.set(false)' and
23: // 'selector.select(...)'. (BAD)
24: // 2) Selector is waken up between 'selector.select(...)' and
25: // 'if (wakenUp.get()) { ... }'. (OK)
26: //
27: // In the first case, 'wakenUp' is set to true and the
28: // following 'selector.select(...)' will wake up immediately.
29: // Until 'wakenUp' is set to false again in the next round,
30: // 'wakenUp.compareAndSet(false, true)' will fail, and therefore
31: // any attempt to wake up the Selector will fail, too, causing
32: // the following 'selector.select(...)' call to block
33: // unnecessarily.
34: //
35: // To fix this problem, we wake up the selector again if wakenUp
36: // is true immediately after selector.select(...).
37: // It is inefficient in that it wakes up the selector for both
38: // the first case (BAD - wake-up required) and the second case
39: // (OK - no wake-up required).
40:
41: // 唤醒。原因,见上面中文注释
42: if (wakenUp.get()) {
43: selector.wakeup();
44: }
45: // fall through
46: default:
47: }
48:
49: // TODO 1007 NioEventLoop cancel 方法
50: cancelledKeys = 0;
51: needsToSelectAgain = false;
52:
53: final int ioRatio = this.ioRatio;
54: if (ioRatio == 100) {
55: try {
56: // 处理 Channel 感兴趣的就绪 IO 事件
57: processSelectedKeys();
58: } finally {
59: // 运行所有普通任务和定时任务,不限制时间
60: // Ensure we always run tasks.
61: runAllTasks();
62: }
63: } else {
64: final long ioStartTime = System.nanoTime();
65: try {
66: // 处理 Channel 感兴趣的就绪 IO 事件
67: processSelectedKeys();
68: } finally {
69: // 运行所有普通任务和定时任务,限制时间
70: // Ensure we always run tasks.
71: final long ioTime = System.nanoTime() - ioStartTime;
72: runAllTasks(ioTime * (100 - ioRatio) / ioRatio);
73: }
74: }
75: } catch (Throwable t) {
76: handleLoopException(t);
77: }
78: // TODO 1006 EventLoop 优雅关闭
79: // Always handle shutdown even if the loop processing threw an exception.
80: try {
81: if (isShuttingDown()) {
82: closeAll();
83: if (confirmShutdown()) {
84: return;
85: }
86: }
87: } catch (Throwable t) {
88: handleLoopException(t);
89: }
90: }
91: }
  • 第 3 行:“死”循环,直到 NioEventLoop 关闭,即【第 78 至 89 行】的代码。
  • 第 5 行:调用 SelectStrategy#calculateStrategy(IntSupplier selectSupplier, boolean hasTasks) 方法,获得使用的 select 策略。详细解析,胖友先跳到 「2.10 SelectStrategy」 中研究。😈 看完回来。
    • 我们知道 SelectStrategy#calculateStrategy(...) 方法,有 3 种返回的情况。
    • 第 6 至 7 行:第一种,SelectStrategy.CONTINUE ,默认实现下,不存在这个情况。
    • 第 8 至 44 行:第二种,SelectStrategy.SELECT ,进行 Selector 阻塞 select 。
      • 第 11 行:重置 wakeUp 标识为 false ,并返回修改前的值。
      • 第 11 行:调用 #select(boolean oldWakeUp) 方法,选择( 查询 )任务。直接看这个方法不能完全表达出该方法的用途,所以详细解析,见 「2.12 select」
      • 第 41 至 44 行:若唤醒标识 wakeuptrue 时,调用 Selector#wakeup() 方法,唤醒 Selector 。可能看到此处,很多胖友会和我一样,一脸懵逼。实际上,耐下性子,答案在上面的英文注释中。笔者来简单解析下:
        • 1)在 wakenUp.getAndSet(false)#select(boolean oldWakeUp) 之间,在标识 wakeUp 设置为 false 时,在 #select(boolean oldWakeUp) 方法中,正在调用 Selector#select(...) 方法,处于阻塞中。
        • 2)此时,有另外的线程调用了 #wakeup() 方法,会将标记 wakeUp 设置为 true ,并唤醒 Selector#select(...) 方法的阻塞等待。
        • 3)标识 wakeUptrue ,所以再有另外的线程调用 #wakeup() 方法,都无法唤醒 Selector#select(...) 。为什么呢?因为 #wakeup() 的 CAS 修改 false => true失败,导致无法调用 Selector#wakeup() 方法。
        • 解决方式:所以在 #select(boolean oldWakeUp) 执行完后,增加了【第 41 至 44 行】来解决。
        • 😈😈😈 整体比较绕,胖友结合实现代码 + 英文注释,再好好理解下。
    • 第 46 行:第三种,>= 0 ,已经有可以处理的任务,直接向下。
  • 第 49 至 51 行:TODO 1007 NioEventLoop cancel 方法
  • 第 53 至 74 行:根据 ioRatio 的配置不同,分成略有差异的 2 种:
    • 第一种,ioRatio 为 100 ,则不考虑时间占比的分配。
    • 第二种,ioRatio< 100 ,则考虑时间占比的分配。
      • 第 64 行:记录当前时间。
      • 第 67 行:和【第 57 行】的代码一样
      • 第 71 至 72 行:🙂 比较巧妙的方式,是不是和胖友之前认为的不太一样。它是以 #processSelectedKeys() 方法的执行时间作为基准,计算 #runAllTasks(long timeoutNanos) 方法可执行的时间。
      • 第 72 行:调用 #runAllTasks(long timeoutNanos)` 方法,运行所有普通任务和定时任务,限制时间
  • 第 75 至 77 行:当发生异常时,调用 #handleLoopException(Throwable t) 方法,处理异常。代码如下:

    private static void handleLoopException(Throwable t) {
    logger.warn("Unexpected exception in the selector loop.", t);

    // Prevent possible consecutive immediate failures that lead to
    // excessive CPU consumption.
    try {
    Thread.sleep(1000);
    } catch (InterruptedException e) {
    // Ignore.
    }
    }
  • 第 78 至 89 行:TODO 1006 EventLoop 优雅关闭

  • 总的来说,#run() 的执行过程,就是如下一张图:run

2.10 SelectStrategy

io.netty.channel.SelectStrategy ,选择( select )策略接口。代码如下:

public interface SelectStrategy {

/**
* Indicates a blocking select should follow.
*
* 表示使用阻塞 select 的策略。
*/
int SELECT = -1;
/**
* Indicates the IO loop should be retried, no blocking select to follow directly.
*
* 表示需要进行重试的策略。
*/
int CONTINUE = -2;

/**
* The {@link SelectStrategy} can be used to steer the outcome of a potential select
* call.
*
* @param selectSupplier The supplier with the result of a select result.
* @param hasTasks true if tasks are waiting to be processed.
* @return {@link #SELECT} if the next step should be blocking select {@link #CONTINUE} if
* the next step should be to not select but rather jump back to the IO loop and try
* again. Any value >= 0 is treated as an indicator that work needs to be done.
*/
int calculateStrategy(IntSupplier selectSupplier, boolean hasTasks) throws Exception;

}
  • calculateStrategy(IntSupplier selectSupplier, boolean hasTasks) 接口方法有 3 种返回的情况:
    • SELECT-1 ,表示使用阻塞 select 的策略。
    • CONTINUE-2,表示需要进行重试的策略。实际上,默认情况下,不会返回 CONTINUE 的策略。
    • >= 0 ,表示不需要 select ,目前已经有可以执行的任务了。

2.10.1 DefaultSelectStrategy

io.netty.channel.DefaultSelectStrategy ,实现 SelectStrategy 接口,默认选择策略实现类。代码如下:

final class DefaultSelectStrategy implements SelectStrategy {

/**
* 单例
*/
static final SelectStrategy INSTANCE = new DefaultSelectStrategy();

private DefaultSelectStrategy() { }

@Override
public int calculateStrategy(IntSupplier selectSupplier, boolean hasTasks) throws Exception {
return hasTasks ? selectSupplier.get() : SelectStrategy.SELECT;
}

}
  • hasTaskstrue ,表示当前已经有任务,所以调用 IntSupplier#get() 方法,返回当前 Channel 新增的 IO 就绪事件的数量。代码如下:

    private final IntSupplier selectNowSupplier = new IntSupplier() {
    @Override
    public int get() throws Exception {
    return selectNow();
    }
    };
    • io.netty.util.IntSupplier ,代码如下:

      public interface IntSupplier {

      /**
      * Gets a result.
      *
      * @return a result
      */
      int get() throws Exception;

      }
      • 类似 Java 自带的 Callable<Int>
    • IntSupplier 在 NioEventLoop 中的实现为 selectNowSupplier 属性。在它的内部会调用 #selectNow() 方法。详细解析,见 「2.11 selectNow」
    • 实际上,这里不调用 IntSupplier#get() 方法,也是可以的。只不过考虑到,可以通过 #selectNow() 方法,无阻塞的 select Channel 是否有感兴趣的就绪事件。
  • hasTasksfalse 时,直接返回 SelectStrategy.SELECT ,进行阻塞 select Channel 感兴趣的就绪 IO 事件。

2.11 selectNow

#selectNow() 方法,代码如下:

int selectNow() throws IOException {
try {
return selector.selectNow(); // <1>
} finally {
// restore wakeup state if needed <2>
if (wakenUp.get()) {
selector.wakeup();
}
}
}
  • <1> 处,调用 Selector#selectorNow() 方法,立即( 无阻塞 )返回 Channel 新增的感兴趣的就绪 IO 事件数量。
  • <2> 处,若唤醒标识 wakeuptrue 时,调用 Selector#wakeup() 方法,唤醒 Selector 。因为 <1> 处的 Selector#selectorNow() 会使用我们对 Selector 的唤醒,所以需要进行复原。有一个冷知道,可能有胖友不知道:

    注意,如果有其它线程调用了 #wakeup() 方法,但当前没有线程阻塞在 #select() 方法上,下个调用 #select() 方法的线程会立即被唤醒。😈 有点神奇。

2.12 select

#select(boolean oldWakenUp) 方法,选择( 查询 )任务。这是本文最重要的方法。代码如下:

  1: private void select(boolean oldWakenUp) throws IOException {
2: // 记录下 Selector 对象
3: Selector selector = this.selector;
4: try {
5: // select 计数器
6: int selectCnt = 0; // cnt 为 count 的缩写
7: // 记录当前时间,单位:纳秒
8: long currentTimeNanos = System.nanoTime();
9: // 计算 select 截止时间,单位:纳秒。
10: long selectDeadLineNanos = currentTimeNanos + delayNanos(currentTimeNanos);
11:
12: for (;;) {
13: // 计算本次 select 的超时时长,单位:毫秒。
14: // + 500000L 是为了四舍五入
15: // / 1000000L 是为了纳秒转为毫秒
16: long timeoutMillis = (selectDeadLineNanos - currentTimeNanos + 500000L) / 1000000L;
17: // 如果超时时长,则结束 select
18: if (timeoutMillis <= 0) {
19: if (selectCnt == 0) { // 如果是首次 select ,selectNow 一次,非阻塞
20: selector.selectNow();
21: selectCnt = 1;
22: }
23: break;
24: }
25:
26: // If a task was submitted when wakenUp value was true, the task didn't get a chance to call
27: // Selector#wakeup. So we need to check task queue again before executing select operation.
28: // If we don't, the task might be pended until select operation was timed out.
29: // It might be pended until idle timeout if IdleStateHandler existed in pipeline.
30: // 若有新的任务加入
31: if (hasTasks() && wakenUp.compareAndSet(false, true)) {
32: // selectNow 一次,非阻塞
33: selector.selectNow();
34: // 重置 select 计数器
35: selectCnt = 1;
36: break;
37: }
38:
39: // 阻塞 select ,查询 Channel 是否有就绪的 IO 事件
40: int selectedKeys = selector.select(timeoutMillis);
41: // select 计数器 ++
42: selectCnt ++;
43:
44: // 结束 select ,如果满足下面任一一个条件
45: if (selectedKeys != 0 || oldWakenUp || wakenUp.get() || hasTasks() || hasScheduledTasks()) {
46: // - Selected something,
47: // - waken up by user, or
48: // - the task queue has a pending task.
49: // - a scheduled task is ready for processing
50: break;
51: }
52: // 线程被打断。一般情况下不会出现,出现基本是 bug ,或者错误使用。
53: if (Thread.interrupted()) {
54: // Thread was interrupted so reset selected keys and break so we not run into a busy loop.
55: // As this is most likely a bug in the handler of the user or it's client library we will
56: // also log it.
57: //
58: // See https://github.com/netty/netty/issues/2426
59: if (logger.isDebugEnabled()) {
60: logger.debug("Selector.select() returned prematurely because " +
61: "Thread.currentThread().interrupt() was called. Use " +
62: "NioEventLoop.shutdownGracefully() to shutdown the NioEventLoop.");
63: }
64: selectCnt = 1;
65: break;
66: }
67:
68: // 记录当前时间
69: long time = System.nanoTime();
70: // 符合 select 超时条件,重置 selectCnt 为 1
71: if (time - TimeUnit.MILLISECONDS.toNanos(timeoutMillis) >= currentTimeNanos) {
72: // timeoutMillis elapsed without anything selected.
73: selectCnt = 1;
74: // 不符合 select 超时的提交,若 select 次数到达重建 Selector 对象的上限,进行重建
75: } else if (SELECTOR_AUTO_REBUILD_THRESHOLD > 0 &&
76: selectCnt >= SELECTOR_AUTO_REBUILD_THRESHOLD) {
77: // The selector returned prematurely many times in a row.
78: // Rebuild the selector to work around the problem.
79: logger.warn("Selector.select() returned prematurely {} times in a row; rebuilding Selector {}.", selectCnt, selector);
80:
81: // 重建 Selector 对象
82: rebuildSelector();
83: // 修改下 Selector 对象
84: selector = this.selector;
85:
86: // Select again to populate selectedKeys.
87: // 立即 selectNow 一次,非阻塞
88: selector.selectNow();
89: // 重置 selectCnt 为 1
90: selectCnt = 1;
91: // 结束 select
92: break;
93: }
94:
95: currentTimeNanos = time;
96: }
97:
98: if (selectCnt > MIN_PREMATURE_SELECTOR_RETURNS) {
99: if (logger.isDebugEnabled()) {
100: logger.debug("Selector.select() returned prematurely {} times in a row for Selector {}.", selectCnt - 1, selector);
101: }
102: }
103: } catch (CancelledKeyException e) {
104: if (logger.isDebugEnabled()) {
105: logger.debug(CancelledKeyException.class.getSimpleName() + " raised by a Selector {} - JDK bug?", selector, e);
106: }
107: // Harmless exception - log anyway
108: }
109: }
  • 第 3 行:获得使用的 Selector 对象,不需要每次访问使用 volatile 修饰的 selector 属性。
  • 第 6 行:获得 select 操作的计数器。主要用于记录 Selector 空轮询次数,所以每次在正在轮询完成( 例如:轮询超时 ),则重置 selectCnt 为 1 。
  • 第 8 行:记录当前时间,单位:纳秒。
  • 第 10 行:计算 select 操作的截止时间,单位:纳秒。
    • #delayNanos(currentTimeNanos) 方法返回的为下一个定时任务距离现在的时间,如果不存在定时任务,则默认返回 1000 ms 。该方法的详细解析,见后续文章。
  • 第 12 行:“死”循环,直到符合如下任一一种情况后结束
    1. select 操作超时,对应【第 18 至 24 行】。
    2. 若有新的任务加入,对应【第 26 至 37 行】。
    3. 查询到任务或者唤醒,对应【第 45 至 51 行】。
    4. 线程被异常打断,对应【第 52 至 66 行】。
    5. 发生 NIO 空轮询的 Bug 后重建 Selector 对象后,对应【第 75 至 93 行】。
  • 第 16 行:计算本次 select 的超时时长,单位:毫秒。因为【第 40 行】的 Selector#select(timeoutMillis) 方法,可能因为各种情况结束,所以需要循环,并且每次重新计算超时时间。至于 + 500000L/ 1000000L 的用途,看下代码注释。
  • 第 17 至 24 行:如果超过 select 超时时长,则结束 select 。
    • 第 19 至 21 行:如果是首次 select ,则调用 Selector#selectNow() 方法,获得非阻塞的 Channel 感兴趣的就绪的 IO 事件,并重置 selectCnt 为 1 。
  • 第 26 至 37 行:若有新的任务加入。这里实际要分成两种情况:
    • 第一种,提交的任务的类型是 NonWakeupRunnable ,那么它并不会调用 #wakeup() 方法,原因胖友自己看 #execute(Runnable task) 思考下。Netty 在 #select() 方法的设计上,能尽快执行任务。此时如果标记 wakeupfalse ,说明符合这种情况,直接结束 select 。
    • 第二种,提交的任务的类型不是 NonWakeupRunnable ,那么在 #run() 方法的【第 8 至 11 行】的 wakenUp.getAndSet(false) 之前,发起了一次 #wakeup() 方法,那么因为 wakenUp.getAndSet(false) 会将标记 wakeUp 设置为 false ,所以就能满足 hasTasks() && wakenUp.compareAndSet(false, true) 的条件。
      • 这个解释,就和【第 27 至 28 行】的英文注释 So we need to check task queue again before executing select operation.If we don't, the task might be pended until select operation was timed out. 有出入了?这是为什么呢?因为 Selector 被提前 wakeup 了,所以下一次 Selector 的 select 是被直接唤醒结束的。
    • 第 33 行:虽然已经发现任务,但是还是调用 Selector#selectNow() 方法,非阻塞的获取一次 Channel 新增的就绪的 IO 事件。
    • 对应 Github 的代码提交为 https://github.com/lightningMan/netty/commit/f44f3e7926f1676315ae86d0f18bdd9b95681d9f
  • 第 40 行:调用 Selector#select(timeoutMillis) 方法,阻塞 select ,获得 Channel 新增的就绪的 IO 事件的数量。
  • 第 42 行:select 计数器加 1 。
  • 第 44 至 51 行:如果满足下面任一一个条件,结束 select :
    1. selectedKeys != 0 时,表示有 Channel 新增的就绪的 IO 事件,所以结束 select ,很好理解。
    2. oldWakenUp || wakenUp.get() 时,表示 Selector 被唤醒,所以结束 select 。
    3. hasTasks() || hasScheduledTasks() ,表示有普通任务或定时任务,所以结束 select 。
    4. 那么剩余的情况,主要是 select 超时或者发生空轮询,即【第 68 至 93 行】的代码。
  • 第 52 至 66 行:线程被打断。一般情况下不会出现,出现基本是 bug ,或者错误使用。感兴趣的胖友,可以看看 https://github.com/netty/netty/issues/2426
  • 第 69 行:记录当前时间。
    • 第 70 至 73 行:若满足 time - TimeUnit.MILLISECONDS.toNanos(timeoutMillis) >= currentTimeNanos ,说明到达此处时,Selector 是超时 select ,那么是正常的,所以重置 selectCnt 为 1 。
    • 第 74 至 93 行:不符合 select 超时的提交,若 select 次数到达重建 Selector 对象的上限,进行重建。这就是 Netty 判断发生 NIO Selector 空轮询的方式,N ( 默认 512 )次 select 并未阻塞超时这么长,那么就认为发生 NIO Selector 空轮询。过多的 NIO Selector 将会导致 CPU 100% 。
      • 第 82 行:调用 #rebuildSelector() 方法,重建 Selector 对象。
      • 第 84 行:重新获得使用的 Selector 对象。
      • 第 86 至 90 行:同【第 20 至 21 行】的代码。
      • 第 92 行:结束 select 。
  • 第 95 行:记录新的当前时间,用于【第 16 行】,重新计算本次 select 的超时时长。

666. 彩蛋

总的来说还是比较简单的,比较困难的,在于对标记 wakeup 的理解。真的是,细思极恐!!!感谢在理解过程中,闪电侠和大表弟普架的帮助。

推荐阅读文章:

老艿艿:全文的 NIO Selector 空轮询,指的是 epoll cpu 100% 的 bug 。

总访客数 && 总访问量