我是一段不羁的公告!
记得给艿艿这 3 个项目加油,添加一个 STAR 噢。
https://github.com/YunaiV/SpringBoot-Labs
https://github.com/YunaiV/onemall
https://github.com/YunaiV/ruoyi-vue-pro

精尽 Dubbo 源码解析 —— 优雅停机

本文基于 Dubbo 2.6.1 版本,望知悉。

1. 概述

本文分享 Dubbo 的优雅停机 Graceful Shutdown ,对应 《Dubbo 用户指南 —— 优雅停机》

定义如下:

Dubbo 是通过 JDK 的 ShutdownHook 来完成优雅停机的,所以如果用户使用 kill -9 PID 等强制关闭指令,是不会执行优雅停机的,只有通过 kill PID 时,才会执行。

原理如下:

服务提供方

  • 停止时,先标记为不接收新请求,新请求过来时直接报错,让客户端重试其它机器。 //<1>
  • 然后,检测线程池中的线程是否正在运行,如果有,等待所有线程执行完成,除非超时,则强制关闭。 // <2>

服务消费方

  • 停止时,不再发起新的调用请求,所有新的调用在客户端即报错。 // <3>
  • 然后,检测有没有请求的响应还没有返回,等待响应返回,除非超时,则强制关闭。 // <4>

2. ShutdownHook

Dubbo 的优雅停机 ShutdownHook 在 AbstractConfig 的静态代码块初始化,代码如下:

static {
Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() {
public void run() {
if (logger.isInfoEnabled()) {
logger.info("Run shutdown hook now.");
}
ProtocolConfig.destroyAll();
}
}, "DubboShutdownHook"));
}
  • 从代码的位置上来说,这不是一个的位置。但是考虑到保证能够被初始化到 ShutdownHook ,这又是一个合适的位置。当然,从官方的 TODO 来说,未来可能会换地方。
  • ProtocolConfig#destroyAll() 方法,代码如下:

     1: public static void destroyAll() {
    2: // 忽略,若已经销毁
    3: if (!destroyed.compareAndSet(false, true)) {
    4: return;
    5: }
    6: // 销毁 Registry 相关
    7: AbstractRegistryFactory.destroyAll();
    8:
    9: // 等到服务消费,接收到注册中心通知到该服务提供者已经下线,加大了在不重试情况下优雅停机的成功率。
    10: // Wait for registry notification
    11: try {
    12: Thread.sleep(ConfigUtils.getServerShutdownTimeout());
    13: } catch (InterruptedException e) {
    14: logger.warn("Interrupted unexpectedly when waiting for registry notification during shutdown process!");
    15: }
    16:
    17: // 销毁 Protocol 相关
    18: ExtensionLoader<Protocol> loader = ExtensionLoader.getExtensionLoader(Protocol.class);
    19: for (String protocolName : loader.getLoadedExtensions()) {
    20: try {
    21: Protocol protocol = loader.getLoadedExtension(protocolName);
    22: if (protocol != null) {
    23: protocol.destroy();
    24: }
    25: } catch (Throwable t) {
    26: logger.warn(t.getMessage(), t);
    27: }
    28: }
    29: }
    • 第 2 至 5 行:忽略,若已经销毁。
    • 第 7 行:调用 AbstractRegistryFactory#destroyAll() 方法,销毁所有 Registry ,取消应用程序中的服务提供者和消费者的订阅注册。详细解析,见 「2.1 AbstractRegistryFactory」 中。
    • 第 9 至 15 行:sleep 等待一段时间,用于其他应用程序的服务消费者,接收到注册中心通知,该应用程序的服务提供者已经下线,加大了在不重试情况下优雅停机的成功率。

      • 😈 当然,这不是绝对能等待到,而是开发者自己配置 "dubbo.service.shutdown.wait" 参数,设置等待的时长(单位:毫秒)。ConfigUtils#getServerShutdownTimeout() 方法,代码如下:

        public static int getServerShutdownTimeout() {
        // 默认,10 * 1000 毫秒
        int timeout = Constants.DEFAULT_SERVER_SHUTDOWN_TIMEOUT;
        // 获得 "dubbo.service.shutdown.wait" 配置项,单位:毫秒
        String value = ConfigUtils.getProperty(Constants.SHUTDOWN_WAIT_KEY);
        if (value != null && value.length() > 0) {
        try {
        timeout = Integer.parseInt(value);
        } catch (Exception e) {
        }
        // 若为空,获得 "dubbo.service.shutdown.wait.seconds" 配置项,单位:秒。
        // ps:目前已经废弃该参数,推荐使用 "dubbo.service.shutdown.wait"
        } else {
        value = ConfigUtils.getProperty(Constants.SHUTDOWN_WAIT_SECONDS_KEY);
        if (value != null && value.length() > 0) {
        try {
        timeout = Integer.parseInt(value) * 1000;
        } catch (Exception e) {
        }
        }
        }
        // 返回
        return timeout;
        }
        • 默认 10 * 1000 毫秒。
      • ISSUE#1021:Enhancement for graceful shutdown ,是针对这块的讨论,非常有趣,胖友一定要看看。

        目前不管是用的最多的2.5.3版本还是最新的2.5.7版本,亲自测试在不设置重试机制下是无法做到优雅停机的,这次改动主要是修改一点点代码,加上可配置的等待时间,就能简单的做到“不开启重试也能优雅停机”。

        其主要实现机制就是在【provider断连注册中心之后,关闭应答之前】和【consumer移除掉invoker后,关闭client之前】这两个阶段加入可配置的等待时间,目前亲测可以做到不配置重试也能优雅停机。

        因为现在大多数用dubbo的公司,为了避免极端情况下的雪崩和流量风暴,大部分接口都会关闭重试机制,这样,对于当前dubbo优雅停机的设定,就无法做到优雅停机了,所以这里通过比较简单的方式,加大了在不重试情况下优雅停机的成功率。

  • 第 17 至 28 行:销毁所有 Protocol 。目前分层两类 Protocol :

    • 和 Registry 集成的 Protocol 实现类 RegistryProtocol ,关注服务的注册。具体的销毁逻辑,见 「2.3 RegistryProtocol」 中。
    • 具体协议对应的 Protocol 实现类,例如 dubbo:// 对应的 DubboProtocol 、hessian:// 对应的 HessianProtocol ,关注服务的暴露引用。因为 DubboProtocol 是最常用的,所以我们以它为例子,在 「2.2 DubboProtocol」 中分享。

2.1 AbstractRegistryFactory

#destroyAll() 方法,销毁所有 Registry 。代码如下:

private static final Map<String, Registry> REGISTRIES = new ConcurrentHashMap<String, Registry>();

public static void destroyAll() {
if (LOGGER.isInfoEnabled()) {
LOGGER.info("Close all registries " + getRegistries());
}
// 获得锁
LOCK.lock();
try {
// 销毁
for (Registry registry : getRegistries()) {
try {
registry.destroy();
} catch (Throwable e) {
LOGGER.error(e.getMessage(), e);
}
}
// 清空缓存
REGISTRIES.clear();
} finally {
// 释放锁
LOCK.unlock();
}
}
  • 调用 Registry#destroy() 方法,销毁每个 Registry 。
  • AbstractRegistry 实现了公用的销毁逻辑:取消注册订阅。代码如下:

    @Override
    public void destroy() {
    // 已销毁,跳过
    if (!destroyed.compareAndSet(false, true)) {
    return;
    }
    if (logger.isInfoEnabled()) {
    logger.info("Destroy registry:" + getUrl());
    }
    // 取消注册
    Set<URL> destroyRegistered = new HashSet<URL>(getRegistered());
    if (!destroyRegistered.isEmpty()) {
    for (URL url : new HashSet<URL>(getRegistered())) {
    if (url.getParameter(Constants.DYNAMIC_KEY, true)) {
    try {
    unregister(url); // 取消注册
    if (logger.isInfoEnabled()) {
    logger.info("Destroy unregister url " + url);
    }
    } catch (Throwable t) {
    logger.warn("Failed to unregister url " + url + " to registry " + getUrl() + " on destroy, cause: " + t.getMessage(), t);
    }
    }
    }
    }
    // 取消订阅
    Map<URL, Set<NotifyListener>> destroySubscribed = new HashMap<URL, Set<NotifyListener>>(getSubscribed());
    if (!destroySubscribed.isEmpty()) {
    for (Map.Entry<URL, Set<NotifyListener>> entry : destroySubscribed.entrySet()) {
    URL url = entry.getKey();
    for (NotifyListener listener : entry.getValue()) {
    try {
    unsubscribe(url, listener); // 取消订阅
    if (logger.isInfoEnabled()) {
    logger.info("Destroy unsubscribe url " + url);
    }
    } catch (Throwable t) {
    logger.warn("Failed to unsubscribe url " + url + " to registry " + getUrl() + " on destroy, cause: " + t.getMessage(), t);
    }
    }
    }
    }
    }
    • 无论是服务提供者还是消费者,都会向 Registry 发起注册订阅,所以需要进行取消。
  • AbstractRegistry 的子类 FailbackRegistry ,实现销毁公用重试任务。代码如下:

    @Override
    public void destroy() {
    // 忽略,若已经销毁
    if (!canDestroy()) {
    return;
    }
    // 调用父方法,取消注册和订阅
    super.destroy();
    // 销毁重试任务
    try {
    retryFuture.cancel(true);
    } catch (Throwable t) {
    logger.warn(t.getMessage(), t);
    }
    }

    protected boolean canDestroy(){
    return destroyed.compareAndSet(false, true);
    }
  • FailbackRegistry 有多种实现类,会有销毁其对应的客户端连接的逻辑。以 ZookeeperRegistry 举例子。代码如下:

    @Override
    public void destroy() {
    // 调用父方法,取消注册和订阅
    super.destroy();
    try {
    // 关闭 Zookeeper 客户端连接
    zkClient.close();
    } catch (Exception e) {
    logger.warn("Failed to close zookeeper client " + getUrl() + ", cause: " + e.getMessage(), e);
    }
    }

2.2 DubboProtocol

#destroy() 方法,销毁所有通信 ExchangeClient 和 ExchangeServer 。代码如下:

 1: @SuppressWarnings("Duplicates")
2: @Override
3: public void destroy() {
4: // 销毁所有 ExchangeServer
5: for (String key : new ArrayList<String>(serverMap.keySet())) {
6: ExchangeServer server = serverMap.remove(key);
7: if (server != null) {
8: try {
9: if (logger.isInfoEnabled()) {
10: logger.info("Close dubbo server: " + server.getLocalAddress());
11: }
12: server.close(ConfigUtils.getServerShutdownTimeout());
13: } catch (Throwable t) {
14: logger.warn(t.getMessage(), t);
15: }
16: }
17: }
18:
19: // 销毁所有 ExchangeClient
20: for (String key : new ArrayList<String>(referenceClientMap.keySet())) {
21: ExchangeClient client = referenceClientMap.remove(key);
22: if (client != null) {
23: try {
24: if (logger.isInfoEnabled()) {
25: logger.info("Close dubbo connect: " + client.getLocalAddress() + "-->" + client.getRemoteAddress());
26: }
27: client.close(ConfigUtils.getServerShutdownTimeout()); // 销毁
28: } catch (Throwable t) {
29: logger.warn(t.getMessage(), t);
30: }
31: }
32: }
33: // 销毁所有幽灵 ExchangeClient
34: for (String key : new ArrayList<String>(ghostClientMap.keySet())) {
35: ExchangeClient client = ghostClientMap.remove(key);
36: if (client != null) {
37: try {
38: if (logger.isInfoEnabled()) {
39: logger.info("Close dubbo connect: " + client.getLocalAddress() + "-->" + client.getRemoteAddress());
40: }
41: client.close(ConfigUtils.getServerShutdownTimeout()); // 销毁
42: } catch (Throwable t) {
43: logger.warn(t.getMessage(), t);
44: }
45: }
46: }
47: // 【TODO 8033】 参数回调
48: stubServiceMethodsMap.clear();
49: super.destroy();
50: }
  • 实际情况下,一个应用程序即可以是服务提供者,又是服务消费者。因此,需要关闭 ExchangeClient 和 ExchangeServer 。
  • 第 4 至 17 行:循环调用 HeaderExchangeServer#close(timeout) 方法,销毁所有 ExchangeServer 。详细解析,见 「2.2.1 HeaderExchangeServer」
  • 第 19 至 32 行:循环调用 ReferenceCountExchangeClient#close(timeout) 方法,销毁所有 ReferenceCountExchangeClient 。在该方法内部,会调用 HeaderExchangeClient#close(timeout) 方法,关闭 HeaderExchangeClient 对象。详细解析,见 「2.2.2 HeaderExchangeClient」
  • 第 33 至 46 行:循环调用 LazyConnectExchangeClient#close(timeout) 方法,进行关闭。关于 LazyConnectExchangeClient ,详细见 精尽 Dubbo 源码分析 —— 服务引用(二)之远程引用(Dubbo)「5.2 LazyConnectExchangeClient」
  • 第 48 行:【TODO 8033】 参数回调
  • 第 49 行:调用 AbstractExporter#unexport() 方法,取消服务的暴露( Exporter )。代码如下:

     1: @Override
    2: public void destroy() {
    3: // 销毁协议对应的服务消费者的所有 Invoker
    4: for (Invoker<?> invoker : invokers) {
    5: if (invoker != null) {
    6: invokers.remove(invoker);
    7: try {
    8: if (logger.isInfoEnabled()) {
    9: logger.info("Destroy reference: " + invoker.getUrl());
    10: }
    11: invoker.destroy();
    12: } catch (Throwable t) {
    13: logger.warn(t.getMessage(), t);
    14: }
    15: }
    16: }
    17: // 销毁协议对应的服务提供者的所有 Exporter
    18: for (String key : new ArrayList<String>(exporterMap.keySet())) {
    19: Exporter<?> exporter = exporterMap.remove(key);
    20: if (exporter != null) {
    21: try {
    22: if (logger.isInfoEnabled()) {
    23: logger.info("Unexport service: " + exporter.getInvoker().getUrl());
    24: }
    25: exporter.unexport();
    26: } catch (Throwable t) {
    27: logger.warn(t.getMessage(), t);
    28: }
    29: }
    30: }
    31: }
    • 第 3 至 16 行:循环,销毁协议( 此处为 DubboProtocol )对应的服务消费者的所有 Invoker( 此处为 DubboInvoker ) 。详细解析,见 「2.2.3 DubboInvoker」
    • 第 17 至 30 行:循环,销毁协议( 此处为 DubboProtocol )对应的服务提供者的所有 Exporter( 此处为 DubboExporter ) 。详细解析,见 「2.2.4 DubboExporter」

2.2.1 HeaderExchangeServer

#close(timeout) 方法,整体流程如下图:

close

  • 框部分:因为 ProtocolListenerWrapper 和 ProtocolFilterWrapper 和 Protocol 的 Dubbo SPI Wrapper 实现类,所以调用 DubboProtocol#destroy() 方法时,会先调用它们。目前仅仅是一层包装,没有逻辑 😈 ,代码如下:

     // ProtocolListenerWrapper.java
    @Override
    public void destroy() {
    protocol.destroy();
    }

    // ProtocolFilterWrapper.java
    @Override
    public void destroy() {
    protocol.destroy();
    }

2.2.2 HeaderExchangeClient

#close(timeout) 方法,整体流程如下图:

close

2.2.3 DubboInvoker

#destroy() 方法,销毁 ExchangeClient 。代码如下:

 1: @Override
2: public void destroy() {
3: // 忽略,若已经销毁
4: if (super.isDestroyed()) {
5: return;
6: } else {
7: // double check to avoid dup close
8: // 双重锁校验,避免已经关闭
9: destroyLock.lock();
10: try {
11: if (super.isDestroyed()) {
12: return;
13: }
14: // 标记关闭
15: super.destroy();
16: // 移除出 `invokers`
17: if (invokers != null) {
18: invokers.remove(this);
19: }
20: // 关闭 ExchangeClient 们
21: for (ExchangeClient client : clients) {
22: try {
23: client.close(ConfigUtils.getServerShutdownTimeout());
24: } catch (Throwable t) {
25: logger.warn(t.getMessage(), t);
26: }
27: }
28: } finally {
29: // 释放锁
30: destroyLock.unlock();
31: }
32: }
33: }
  • 代码比较易懂,胖友看代码注释。下面只挑选几个方法分享。
  • AbstractInvoker#isDestroyed() 方法,判断是否已经销毁。代码如下:

    /**
    * 是否销毁
    */
    private AtomicBoolean destroyed = new AtomicBoolean(false);

    public boolean isDestroyed() {
    return destroyed.get();
    }
  • AbstractInvoker#destroy() 方法,标记已经销毁。代码如下:

    /**
    * 是否可用
    */
    private volatile boolean available = true;

    @Override
    public void destroy() {
    if (!destroyed.compareAndSet(false, true)) {
    return;
    }
    setAvailable(false);
    }

    protected void setAvailable(boolean available) {
    this.available = available;
    }
    • 并且,会标记 DubboInvoker 已经不可用
    • 标记已经销毁后,再调用 #invoke(Invocation) 方法,会抛出 RpcException 异常。代码如下:

      @Override
      public Result invoke(Invocation inv) throws RpcException {
      if (destroyed.get()) {
      throw new RpcException("Rpc invoker for service " + this + " on consumer " + NetUtils.getLocalHost()
      + " use dubbo version " + Version.getVersion()
      + " is DESTROYED, can not be invoked any more!");
      }

      // ... 省略其他代码
      }
      • x
  • 第 20 至 27 行:循环,调用 ReferenceCountExchangeClient#close(timeout) 方法,关闭客户端。😈 实际上,在 DubboProtocol#destroy() 方法中,已经关闭客户端。虽然看起来重复,实际不然。因为远程服务提供者关闭时, DubboInvoker 需要进行销毁,此时必须关闭客户端的链接。所以,DubboInvoker 必须有这块逻辑。

2.2.4 DubboExporter

#unexport() 方法,取消暴露。代码如下:

/**
* 服务键
*/
private final String key;
/**
* Exporter 集合
*
* key: 服务键
*
* 该值实际就是 {@link com.alibaba.dubbo.rpc.protocol.AbstractProtocol#exporterMap}
*/
private final Map<String, Exporter<?>> exporterMap;

@Override
public void unexport() {
// 取消暴露
super.unexport();
// 移除自己
exporterMap.remove(key);
}
  • 调用 AbstractExporter#unexport() 方法,取消暴露。代码如下:

    /**
    * Invoker 对象
    */
    private final Invoker<T> invoker;
    /**
    * 是否取消暴露服务
    */
    private volatile boolean unexported = false;

    @Override
    public void unexport() {
    // 标记已经取消暴露
    if (unexported) {
    return;
    }
    unexported = true;
    // 销毁
    getInvoker().destroy();
    }
    • 其中,invoker 如下图所示:invoker

      • 这个 Invoker 通过 JavassistProxyFactory 创建,实际实现了 AbstractProxyInvoker 抽象类。所以 #destroy() 方法如下,代码如下:

         @Override
        public void destroy() {
        }
        • 😈 空的,嘿嘿嘿。

2.3 RegistryProtocol

#destroy() 方法,取消所有 Exporter 的暴露。代码如下:

/**
* 绑定关系集合。
*
* key:服务 Dubbo URL
*/
private final Map<String, ExporterChangeableWrapper<?>> bounds = new ConcurrentHashMap<String, ExporterChangeableWrapper<?>>();

@Override
public void destroy() {
// 获得 Exporter 数组
List<Exporter<?>> exporters = new ArrayList<Exporter<?>>(bounds.values());
// 取消所有 Exporter 的暴露
for (Exporter<?> exporter : exporters) {
exporter.unexport();
}
// 清空
bounds.clear();
}
  • 循环,调用 ExporterChangeableWrapper#unexport() 方法,取消服务暴露。代码如下:

    /**
    * 暴露的 Exporter 对象
    */
    private Exporter<T> exporter;

    @Override
    public void unexport() {
    String key = getCacheKey(this.originInvoker);
    // 移除出 `bounds`
    bounds.remove(key);
    // 取消暴露
    exporter.unexport();
    }
    • 因为服务提供者集成了配置规则 Configurator ,所以需要使用到 ExporterChangeableWrapper ,保存原有 Invoker 对象。
      • 也因此,上述所有取消暴露逻辑,是无法销毁 ExporterChangeableWrapper 在 bounds 的映射,需要通过 RegistryProtocol 的 #destroy() 方法实现。
      • 也因此,此处调用暴露的 Exporter 对象 exporter ,已经被 AbstractExporter#unexport() 方法,取消暴露。但是呢,这里又不能去掉这块逻辑,因为没准有地方,需要调用 ExporterChangeableWrapper#unexport() 方法呢。

3. ExecutorUtil

3.1 gracefulShutdown

#gracefulShutdown(executor, timeout) 方法,优雅关闭,禁止新的任务提交,将原有任务执行完。

public static void gracefulShutdown(Executor executor, int timeout) {
// 忽略,若不是 ExecutorService ,或者已经关闭
if (!(executor instanceof ExecutorService) || isShutdown(executor)) {
return;
}
// 关闭,禁止新的任务提交,将原有任务执行完
final ExecutorService es = (ExecutorService) executor;
try {
es.shutdown(); // Disable new tasks from being submitted <1>
} catch (SecurityException ex2) {
return;
} catch (NullPointerException ex2) {
return;
}
// 等待原有任务执行完。若等待超时,强制结束所有任务
try {
if (!es.awaitTermination(timeout, TimeUnit.MILLISECONDS)) {
es.shutdownNow();
}
} catch (InterruptedException ex) {
// 发生 InterruptedException 异常,也强制结束所有任务
es.shutdownNow();
Thread.currentThread().interrupt();
}
// 若未关闭成功,新开线程去关闭
if (!isShutdown(es)) {
newThreadToCloseExecutor(es);
}
}

3.2 shutdownNow

#shutdownNow(executor, timeout) 方法,强制关闭,包括打断原有执行中的任务。

public static void shutdownNow(Executor executor, final int timeout) {
// 忽略,若不是 ExecutorService ,或者已经关闭
if (!(executor instanceof ExecutorService) || isShutdown(executor)) {
return;
}
// 立即关闭,包括原有任务也打断
final ExecutorService es = (ExecutorService) executor;
try {
es.shutdownNow(); // <1>
} catch (SecurityException ex2) {
return;
} catch (NullPointerException ex2) {
return;
}
// 等待原有任务被打断完成
try {
es.awaitTermination(timeout, TimeUnit.MILLISECONDS);
} catch (InterruptedException ex) {
Thread.currentThread().interrupt();
}
// 若未关闭成功,新开线程去关闭
if (!isShutdown(es)) {
newThreadToCloseExecutor(es);
}
}
  • #gracefulShutdown(executor, timeout) 方法不同的是,<1> 处调用的是 #shutdownNow() 方法,而不是 #shutdown() 方法。

3.3 newThreadToCloseExecutor

#newThreadToCloseExecutor(ExecutorService) 方法,新开线程,不断强制关闭。

private static void newThreadToCloseExecutor(final ExecutorService es) {
if (!isShutdown(es)) {
shutdownExecutor.execute(new Runnable() {
public void run() {
try {
// 循环 1000 次,不断强制结束线程池
for (int i = 0; i < 1000; i++) {
// 立即关闭,包括原有任务也打断
es.shutdownNow();
// 等待原有任务被打断完成
if (es.awaitTermination(10, TimeUnit.MILLISECONDS)) {
break;
}
}
} catch (InterruptedException ex) {
Thread.currentThread().interrupt();
} catch (Throwable e) {
logger.warn(e.getMessage(), e);
}
}
});
}
}

666. 彩蛋

知识星球

理论来说,服务提供者如果要关闭,大体流程如下:

provider => registry :移除自己
provider => consumer :我准备关闭了,不要调用我
所有 consumer => provider :好的,我知道了
provider => consumer :处理完所有原有请求
provider 关闭

但是实际情况非常复杂,如果依赖 consumer 去应答和确认。所以 Dubbo 的选择是:

  • provider 从 registry ,移除自己。并且 sleep 等待一定时间(开发者可配),等待 consumer 获得到通知。当然,这个过程不是绝对能够成功的。例如,consumer 连接不上 registry ,但是连的上 provider 。
  • provider 通知 consumer ,自己准备关闭,不要请求自己。全部通知完成后,等处理完原有请求。完成后,关闭本地服务器及线程池。

当然,consumer 也有优雅关闭,等待所有发起的请求结束。相对简单的多。

推荐阅读文章:

总访客数 && 总访问量