我是一段不羁的公告!
记得给艿艿这 3 个项目加油,添加一个 STAR 噢。
https://github.com/YunaiV/SpringBoot-Labs
https://github.com/YunaiV/onemall
https://github.com/YunaiV/ruoyi-vue-pro

精尽 Dubbo 源码分析 —— 服务暴露(二)之远程暴露(Dubbo)

本文基于 Dubbo 2.6.1 版本,望知悉。

1. 概述

《精尽 Dubbo 源码分析 —— 服务暴露(一)之本地暴露(Injvm)》 一文中,我们已经分享了本地暴露服务。在本文中,我们来分享远程暴露服务。在 Dubbo 中提供多种协议( Protocol ) 的实现,大体流程一致,本文以 Dubbo Protocol 为例子,这也是 Dubbo 的默认协议。

如果不熟悉该协议的同学,可以先看看 《Dubbo 使用指南 —— dubbo://》 ,简单了解即可。

特性

缺省协议,使用基于 mina 1.1.7 和 hessian 3.2.1 的 remoting 交互。

  • 连接个数:单连接
  • 连接方式:长连接
  • 传输协议:TCP
  • 传输方式:NIO 异步传输
  • 序列化:Hessian 二进制序列化
  • 适用范围:传入传出参数数据包较小(建议小于100K),消费者比提供者个数多,单一消费者无法压满提供者,尽量不要用 dubbo 协议传输大文件或超大字符串。
  • 适用场景:常规远程服务方法调用

相比本地暴露远程暴露会多做如下几件事情:

  • 启动通信服务器,绑定服务端口,提供远程调用。
  • 向注册中心注册服务提供者,提供服务消费者从注册中心发现服务。

2. 远程暴露

远程暴露服务的顺序图如下:

远程暴露顺序图

#doExportUrlsFor1Protocol(protocolConfig, registryURLs) 方法中,涉及远程暴露服务的代码如下:

 1: // 服务远程暴露
2: // export to remote if the config is not local (export to local only when config is local)
3: if (!Constants.SCOPE_LOCAL.toString().equalsIgnoreCase(scope)) {
4: if (logger.isInfoEnabled()) {
5: logger.info("Export dubbo service " + interfaceClass.getName() + " to url " + url);
6: }
7: if (registryURLs != null && !registryURLs.isEmpty()) {
8: for (URL registryURL : registryURLs) {
9: // "dynamic" :服务是否动态注册,如果设为false,注册后将显示后disable状态,需人工启用,并且服务提供者停止时,也不会自动取消册,需人工禁用。
10: url = url.addParameterIfAbsent("dynamic", registryURL.getParameter("dynamic"));
11: // 获得监控中心 URL
12: URL monitorUrl = loadMonitor(registryURL); // TODO 芋艿,监控
13: if (monitorUrl != null) {
14: url = url.addParameterAndEncoded(Constants.MONITOR_KEY, monitorUrl.toFullString());
15: }
16: if (logger.isInfoEnabled()) {
17: logger.info("Register dubbo service " + interfaceClass.getName() + " url " + url + " to registry " + registryURL);
18: }
19: // 使用 ProxyFactory 创建 Invoker 对象
20: Invoker<?> invoker = proxyFactory.getInvoker(ref, (Class) interfaceClass, registryURL.addParameterAndEncoded(Constants.EXPORT_KEY, url.toFullString()));
21:
22: // 创建 DelegateProviderMetaDataInvoker 对象
23: DelegateProviderMetaDataInvoker wrapperInvoker = new DelegateProviderMetaDataInvoker(invoker, this);
24:
25: // 使用 Protocol 暴露 Invoker 对象
26: Exporter<?> exporter = protocol.export(wrapperInvoker);
27: // 添加到 `exporters`
28: exporters.add(exporter);
29: }
30: } else { // 用于被服务消费者直连服务提供者,参见文档 http://dubbo.apache.org/zh-cn/docs/user/demos/explicit-target.html 。主要用于开发测试环境使用。
31: // 使用 ProxyFactory 创建 Invoker 对象
32: Invoker<?> invoker = proxyFactory.getInvoker(ref, (Class) interfaceClass, url);
33:
34: // 创建 DelegateProviderMetaDataInvoker 对象
35: DelegateProviderMetaDataInvoker wrapperInvoker = new DelegateProviderMetaDataInvoker(invoker, this);
36:
37: // 使用 Protocol 暴露 Invoker 对象
38: Exporter<?> exporter = protocol.export(wrapperInvoker);
39: // 添加到 `exporters`
40: exporters.add(exporter);
41: }
42: }
  • 第 30 至 41 行:大体和【第 7 至 29 行】逻辑相同。差别在于,当配置注册中心为 "N/A" 时,表示即使远程暴露服务,也不向注册中心注册。这种方式用于被服务消费者直连服务提供者,参见 《Dubbo 用户指南 —— 直连提供者》 文档。

    开发及测试环境下,经常需要绕过注册中心,只测试指定服务提供者,这时候可能需要点对点直连,点对点直联方式,将以服务接口为单位,忽略注册中心的提供者列表,A 接口配置点对点,不影响 B 接口从注册中心获取列表。

  • 第 8 行:循环祖册中心 URL 数组 registryURLs

  • 第 10 行:"dynamic" 配置项,服务是否动态注册。如果设为 false ,注册后将显示后 disable 状态,需人工启用,并且服务提供者停止时,也不会自动取消册,需人工禁用。
  • 第 12 行:调用 #loadMonitor(registryURL) 方法,获得监控中心 URL 。
  • 第 13 至 15 行:调用 URL#addParameterAndEncoded(key, value) 方法,将监控中心的 URL 作为 "monitor" 参数添加到服务提供者的 URL 中,并且需要编码。通过这样的方式,服务提供者的 URL 中,包含了监控中心的配置
  • 第 20 行:调用 URL#addParameterAndEncoded(key, value) 方法,将服务体用这的 URL 作为 "export" 参数添加到注册中心的 URL 中。通过这样的方式,注册中心的 URL 中,包含了服务提供者的配置
  • 第 20 行:调用 ProxyFactory#getInvoker(proxy, type, url) 方法,创建 Invoker 对象。该 Invoker 对象,执行 #invoke(invocation) 方法时,内部会调用 Service 对象( ref )对应的调用方法。
    • 🙂 详细的实现,后面单独写文章分享。
    • 😈 为什么传递的是注册中心的 URL 呢?下文会详细解析。
  • 第 23 行:创建 com.alibaba.dubbo.config.invoker.DelegateProviderMetaDataInvoker 对象。该对象在 Invoker 对象的基础上,增加了当前服务提供者 ServiceConfig 对象。
  • 第 26 行:调用 Protocol#export(invoker) 方法,暴露服务。

    • 此处 Dubbo SPI 自适应的特性的好处就出来了,可以自动根据 URL 参数,获得对应的拓展实现。例如,invoker 传入后,根据 invoker.url 自动获得对应 Protocol 拓展实现为 DubboProtocol 。
    • 实际上,Protocol 有两个 Wrapper 拓展实现类: ProtocolFilterWrapper、ProtocolListenerWrapper 。所以,#export(...) 方法的调用顺序是:

      • Protocol$Adaptive => ProtocolFilterWrapper => ProtocolListenerWrapper => RegistryProtocol
      • =>
      • Protocol$Adaptive => ProtocolFilterWrapper => ProtocolListenerWrapper => DubboProtocol
      • 也就是说,这一条大的调用链,包含两条小的调用链。原因是:
        • 首先,传入的是注册中心的 URL ,通过 Protocol$Adaptive 获取到的是 RegistryProtocol 对象。
        • 其次,RegistryProtocol 会在其 #export(...) 方法中,使用服务提供者的 URL ( 即注册中心的 URL 的 export 参数值),再次调用 Protocol$Adaptive 获取到的是 DubboProtocol 对象,进行服务暴露。
      • 为什么是这样的顺序?通过这样的顺序,可以实现类似 AOP 的效果,在本地服务器启动完成后,再向注册中心注册。伪代码如下:

        RegistryProtocol#export(...) {

        // 1. 启动本地服务器
        DubboProtocol#export(...);

        // 2. 向注册中心注册。
        }
        • 这也是为什么上文提到的 “为什么传递的是注册中心的 URL 呢?” 的原因。
    • 🙂 如果无法理解,在 「3. Protocol」 在解析代码,进一步理顺。
  • 第 28 行:添加到 exporters 集合中。

2.1 loadMonitor

友情提示,监控中心不是本文的重点,简单分享下该方法的逻辑。
可直接跳过。

#loadMonitor(registryURL) 方法,加载监控中心 com.alibaba.dubbo.common.URL 数组。代码如下:

 1: /**
2: * 加载监控中心 URL
3: *
4: * @param registryURL 注册中心 URL
5: * @return 监控中心 URL
6: */
7: protected URL loadMonitor(URL registryURL) {
8: // 从 属性配置 中加载配置到 MonitorConfig 对象。
9: if (monitor == null) {
10: String monitorAddress = ConfigUtils.getProperty("dubbo.monitor.address");
11: String monitorProtocol = ConfigUtils.getProperty("dubbo.monitor.protocol");
12: if ((monitorAddress == null || monitorAddress.length() == 0) && (monitorProtocol == null || monitorProtocol.length() == 0)) {
13: return null;
14: }
15:
16: monitor = new MonitorConfig();
17: if (monitorAddress != null && monitorAddress.length() > 0) {
18: monitor.setAddress(monitorAddress);
19: }
20: if (monitorProtocol != null && monitorProtocol.length() > 0) {
21: monitor.setProtocol(monitorProtocol);
22: }
23: }
24: appendProperties(monitor);
25: // 添加 `interface` `dubbo` `timestamp` `pid` 到 `map` 集合中
26: Map<String, String> map = new HashMap<String, String>();
27: map.put(Constants.INTERFACE_KEY, MonitorService.class.getName());
28: map.put("dubbo", Version.getVersion());
29: map.put(Constants.TIMESTAMP_KEY, String.valueOf(System.currentTimeMillis()));
30: if (ConfigUtils.getPid() > 0) {
31: map.put(Constants.PID_KEY, String.valueOf(ConfigUtils.getPid()));
32: }
33: // 将 MonitorConfig ,添加到 `map` 集合中。
34: appendParameters(map, monitor);
35: // 获得地址
36: String address = monitor.getAddress();
37: String sysaddress = System.getProperty("dubbo.monitor.address");
38: if (sysaddress != null && sysaddress.length() > 0) {
39: address = sysaddress;
40: }
41: // 直连监控中心服务器地址
42: if (ConfigUtils.isNotEmpty(address)) {
43: // 若不存在 `protocol` 参数,默认 "dubbo" 添加到 `map` 集合中。
44: if (!map.containsKey(Constants.PROTOCOL_KEY)) {
45: if (ExtensionLoader.getExtensionLoader(MonitorFactory.class).hasExtension("logstat")) {
46: map.put(Constants.PROTOCOL_KEY, "logstat");
47: } else {
48: map.put(Constants.PROTOCOL_KEY, "dubbo");
49: }
50: }
51: // 解析地址,创建 Dubbo URL 对象。
52: return UrlUtils.parseURL(address, map);
53: // 从注册中心发现监控中心地址
54: } else if (Constants.REGISTRY_PROTOCOL.equals(monitor.getProtocol()) && registryURL != null) {
55: return registryURL.setProtocol("dubbo").addParameter(Constants.PROTOCOL_KEY, "registry").addParameterAndEncoded(Constants.REFER_KEY, StringUtils.toQueryString(map));
56: }
57: return null;
58: }
  • 第 8 至 24 行:从属性配置中,加载配置到 MonitorConfig 对象。
  • 第 25 至 32 行:添加 interface dubbo timestamp pidmap 集合中。
  • 第 34 行:调用 #appendParameters(map, config) 方法,将 MonitorConfig ,添加到 map 集合中。
  • 第 35 至 40 行:获得监控中心的地址。
  • 第 42 行:当 address 非空时,直连监控中心服务器地址的情况
    • 第 43 至 50 行:若不存在 protocol 参数,缺省默认为 “dubbo” ,并添加到 map 集合中。
      • 第 44 至 55 行:可以忽略。因为,logstat 这个拓展实现已经不存在。
    • 第 52 行:调用 UrlUtils#parseURL(address, map) 方法,解析 address ,创建 Dubbo URL 对象。
      • 🙂 已经添加了代码注释,胖友点击链接查看。
  • 第 54 至 56 行:当 protocol = registry 时,并且注册中心 URL 非空时,从注册中心发现监控中心地址。以 registryURL 为基础,创建 URL :
    • protocol = dubbo
    • parameters.protocol = registry
    • parameters.refer = map
  • 第 57 行:无注册中心,返回空。
  • ps :后续会有文章,详细分享。

3. Protocol

本文涉及的 Protocol 类图如下:

Protocol 类图

3.1 ProtocolFilterWrapper

《精尽 Dubbo 源码分析 —— 服务暴露(一)之本地暴露(Injvm)》「 3.2 ProtocolFilterWrapper」 小节。

#export(invoker) 方法,代码如下:

1: public <T> Exporter<T> export(Invoker<T> invoker) throws RpcException {
2: // 注册中心
3: if (Constants.REGISTRY_PROTOCOL.equals(invoker.getUrl().getProtocol())) {
4: return protocol.export(invoker);
5: }
6: // 建立带有 Filter 过滤链的 Invoker ,再暴露服务。
7: return protocol.export(buildInvokerChain(invoker, Constants.SERVICE_FILTER_KEY, Constants.PROVIDER));
8: }
  • 第 2 至 5 行:当 invoker.url.protocl = registry注册中心的 URL ,无需创建 Filter 过滤链。
  • 第 7 行:调用 #buildInvokerChain(invoker, key, group) 方法,创建带有 Filter 过滤链的 Invoker 对象。
  • 第 7 行:调用 protocol#export(invoker) 方法,继续暴露服务。
  • 在 RegistryProtocol 中,会调用 DubboProtocol#export(...) 方法时,会走【第 7 行】的流程。

3.2 RegistryProtocol

com.alibaba.dubbo.registry.integration.RegistryProtocol ,实现 Protocol 接口,注册中心协议实现类。

3.2.1 属性

属性相关,代码如下:

友情提示,仅包含本文涉及的属性。

// ... 省略部分和本文无关的属性。

/**
* 单例。在 Dubbo SPI 中,被初始化,有且仅有一次。
*/
private static RegistryProtocol INSTANCE;

/**
* 绑定关系集合。
*
* key:服务 Dubbo URL
*/
// To solve the problem of RMI repeated exposure port conflicts, the services that have been exposed are no longer exposed.
// 用于解决rmi重复暴露端口冲突的问题,已经暴露过的服务不再重新暴露
// providerurl <--> exporter
private final Map<String, ExporterChangeableWrapper<?>> bounds = new ConcurrentHashMap<String, ExporterChangeableWrapper<?>>();
/**
* Protocol 自适应拓展实现类,通过 Dubbo SPI 自动注入。
*/
private Protocol protocol;
/**
* RegistryFactory 自适应拓展实现类,通过 Dubbo SPI 自动注入。
*/
private RegistryFactory registryFactory;

public RegistryProtocol() {
INSTANCE = this;
}

public static RegistryProtocol getRegistryProtocol() {
if (INSTANCE == null) {
ExtensionLoader.getExtensionLoader(Protocol.class).getExtension(Constants.REGISTRY_PROTOCOL); // load
}
return INSTANCE;
}
  • INSTANCE 静态属性,单例。通过 Dubbo SPI 加载创建,有且仅有一次。
    • #getRegistryProtocol() 静态方法,获得单例。
  • bounds 属性,绑定关系集合。其中,Key 为服务提供者 URL
  • protocol 属性,Protocol 自适应拓展实现类,通过 Dubbo SPI 自动注入。
  • registryFactory 属性,自适应拓展实现类,通过 Dubbo SPI 自动注入。
    • 用于创建注册中心 Registry 对象。

3.2.2 export

本文涉及的 #export(invoker) 方法,代码如下:

 1: public <T> Exporter<T> export(final Invoker<T> originInvoker) throws RpcException {
2: // 暴露服务
3: // export invoker
4: final ExporterChangeableWrapper<T> exporter = doLocalExport(originInvoker);
5:
6: // 获得注册中心 URL
7: URL registryUrl = getRegistryUrl(originInvoker);
8:
9: // 获得注册中心对象
10: // registry provider
11: final Registry registry = getRegistry(originInvoker);
12:
13: // 获得服务提供者 URL
14: final URL registedProviderUrl = getRegistedProviderUrl(originInvoker);
15:
16: //to judge to delay publish whether or not
17: boolean register = registedProviderUrl.getParameter("register", true);
18:
19: // 向本地注册表,注册服务提供者
20: ProviderConsumerRegTable.registerProvider(originInvoker, registryUrl, registedProviderUrl);
21:
22: // 向注册中心注册服务提供者(自己)
23: if (register) {
24: register(registryUrl, registedProviderUrl);
25: ProviderConsumerRegTable.getProviderWrapper(originInvoker).setReg(true); // 标记向本地注册表的注册服务提供者,已经注册
26: }
27:
28: // 使用 OverrideListener 对象,订阅配置规则
29: // Subscribe the override data
30: // FIXME When the provider subscribes, it will affect the scene : a certain JVM exposes the service and call the same service. Because the subscribed is cached key with the name of the service, it causes the subscription information to cover.
31: final URL overrideSubscribeUrl = getSubscribedOverrideUrl(registedProviderUrl);
32: final OverrideListener overrideSubscribeListener = new OverrideListener(overrideSubscribeUrl, originInvoker);
33: overrideListeners.put(overrideSubscribeUrl, overrideSubscribeListener);
34: registry.subscribe(overrideSubscribeUrl, overrideSubscribeListener);
35: //Ensure that a new exporter instance is returned every time export
36: return new DestroyableExporter<T>(exporter, originInvoker, overrideSubscribeUrl, registedProviderUrl);
37: }
  • 第 4 行:调用 #doLocalExport(invoker) 方法,暴露服务。
  • 第 7 行:调用 #getRegistryUrl(originInvoker) 方法,获得注册中心 URL 。代码如下:

    /**
    * 获得注册中心 URL
    *
    * @param originInvoker 原始 Invoker
    * @return URL
    */
    private URL getRegistryUrl(Invoker<?> originInvoker) {
    URL registryUrl = originInvoker.getUrl();
    if (Constants.REGISTRY_PROTOCOL.equals(registryUrl.getProtocol())) { // protocol
    String protocol = registryUrl.getParameter(Constants.REGISTRY_KEY, Constants.DEFAULT_DIRECTORY);
    registryUrl = registryUrl.setProtocol(protocol).removeParameter(Constants.REGISTRY_KEY);
    }
    return registryUrl;
    }
  • 第 11 行:获得注册中心对象。
  • 第 14 行:调用 #getRegistedProviderUrl(originInvoker) 方法,获得服务提供者 URL 。代码如下:

    private URL getRegistedProviderUrl(final Invoker<?> originInvoker) {
    // 从注册中心的 export 参数中,获得服务提供者的 URL
    URL providerUrl = getProviderUrl(originInvoker);
    //The address you see at the registry
    return providerUrl.removeParameters(getFilteredKeys(providerUrl)) // 移除 .hide 为前缀的参数
    .removeParameter(Constants.MONITOR_KEY) // monitor
    .removeParameter(Constants.BIND_IP_KEY) // bind.ip
    .removeParameter(Constants.BIND_PORT_KEY) // bind.port
    .removeParameter(QOS_ENABLE) // qos.enable
    .removeParameter(QOS_PORT) // qos.port
    .removeParameter(ACCEPT_FOREIGN_IP); // qos.accept.foreign.ip
    }

    private URL getProviderUrl(final Invoker<?> origininvoker) {
    String export = origininvoker.getUrl().getParameterAndDecoded(Constants.EXPORT_KEY); // export
    if (export == null || export.length() == 0) {
    throw new IllegalArgumentException("The registry export url is null! registry: " + origininvoker.getUrl());
    }
    return URL.valueOf(export);
    }

    private static String[] getFilteredKeys(URL url) {
    Map<String, String> params = url.getParameters();
    if (params != null && !params.isEmpty()) {
    List<String> filteredKeys = new ArrayList<String>();
    for (Map.Entry<String, String> entry : params.entrySet()) {
    if (entry != null && entry.getKey() != null && entry.getKey().startsWith(Constants.HIDE_KEY_PREFIX)) {
    filteredKeys.add(entry.getKey());
    }
    }
    return filteredKeys.toArray(new String[filteredKeys.size()]);
    } else {
    return new String[]{};
    }
    }
    • 重点,从注册中心的 URL 中获得 export 参数对应的值,即服务提供者的 URL 。
    • 移除多余的参数。因为,这些参数注册到注册中心没有实际的用途。
  • 第 17 行:配置项 register ,服务提供者是否注册到配置中心。
  • 第 20 行:调用 ProviderConsumerRegTable#registerProvider(invoker, registryUrl) 方法,向本地注册表,注册服务提供者。
  • 第 24 行:调用 #register(registryUrl, registedProviderUrl) 方法,向注册中心注册服务提供者(自己)。代码如下:

    public void register(URL registryUrl, URL registedProviderUrl) {
    Registry registry = registryFactory.getRegistry(registryUrl);
    registry.register(registedProviderUrl);
    }
  • 第 25 行:标记向本地注册表的注册服务提供者,已经注册。

  • 第 28 至 34 行:使用 OverrideListener 对象,订阅配置规则。详细解析,见 《精尽 Dubbo 源码解析 —— 集群容错(六)之 Configurator 实现》 中。

  • 第 36 行:创建 DestroyableExporter 对象。

3.2.3 doLocalExport

#doLocalExport() 方法,暴露服务。此处的 Local 指的是,本地启动服务,但是不包括向注册中心注册服务的意思。代码如下:

 1: /**
2: * 暴露服务。
3: *
4: * 此处的 Local 指的是,本地启动服务,但是不包括向注册中心注册服务的意思。
5: *
6: * @param originInvoker 原始 Invoker
7: * @param <T> 泛型
8: * @return Exporter 对象
9: */
10: @SuppressWarnings("unchecked")
11: private <T> ExporterChangeableWrapper<T> doLocalExport(final Invoker<T> originInvoker) {
12: // 获得在 `bounds` 中的缓存 Key
13: String key = getCacheKey(originInvoker);
14: // 从 `bounds` 获得,是不是已经暴露过服务
15: ExporterChangeableWrapper<T> exporter = (ExporterChangeableWrapper<T>) bounds.get(key);
16: if (exporter == null) {
17: synchronized (bounds) {
18: exporter = (ExporterChangeableWrapper<T>) bounds.get(key);
19: // 未暴露过,进行暴露服务
20: if (exporter == null) {
21: // 创建 Invoker Delegate 对象
22: final Invoker<?> invokerDelegete = new InvokerDelegete<T>(originInvoker, getProviderUrl(originInvoker));
23: // 暴露服务,创建 Exporter 对象
24: // 使用 创建的Exporter对象 + originInvoker ,创建 ExporterChangeableWrapper 对象
25: exporter = new ExporterChangeableWrapper<T>((Exporter<T>) protocol.export(invokerDelegete), originInvoker);
26: // 添加到 `bounds`
27: bounds.put(key, exporter);
28: }
29: }
30: }
31: return exporter;
32: }
  • 第 13 行:调用 #getCacheKey(originInvoker) 方法,获得在 bounds 中的缓存 Key 。代码如下:

    /**
    * Get the key cached in bounds by invoker
    *
    * 获 取invoker 在 bounds中 缓存的key
    *
    * @param originInvoker 原始 Invoker
    * @return url 字符串
    */
    private String getCacheKey(final Invoker<?> originInvoker) {
    URL providerUrl = getProviderUrl(originInvoker);
    return providerUrl.removeParameters("dynamic", "enabled").toFullString();
    }
  • 第 14 至 18 行:从 bounds 中,获得已经暴露过的 ExporterChangeableWrapper 对象。

  • 第 20 至 28 行:未暴露过,进行暴露服务。
    • 第 22 行:调用 #getProviderUrl(originInvoker) 方法,获得服务提供者的 URL 。
    • 第 22 行:创建 com.alibaba.dubbo.registry.integration.RegistryProtocol.InvokerDelegete 对象。
      • InvokerDelegete 实现 com.alibaba.dubbo.rpc.protocol.InvokerWrapper 类,主要增加了 #getInvoker() 方法,获得真实的,非 InvokerDelegete 的 Invoker 对象。因为,可能会存在 InvokerDelegete.invoker 也是 InvokerDelegete 类型的情况。
    • 第 25 行:调用 DubboProtocol#export(invoker) 方法,暴露服务,返回 Exporter 对象。
      • 「4.3 DubboProtocol」 中,详细分享。
      • 🙂 若此若是其他协议,若调用对应协议的 XXXProtocol#export(invoker) 方法。
    • 第 25 行:使用【创建的 Exporter 对象】+【originInvoker】,创建 ExporterChangeableWrapper 对象。这样,originInvoker 就和 Exporter 对象,形成了绑定的关系。
    • 第 27 行:添加到 bounds

3.3 DubboProtocol

com.alibaba.dubbo.rpc.protocol.dubbo.DubboProtocol ,实现 AbstractProtocol 抽象类,Dubbo 协议实现类。

3.3.1 属性

属性相关,代码如下:

友情提示,仅包含本文涉及的属性。

// ... 省略部分和本文无关的属性。

/**
* 通信服务器集合
*
* key: 服务器地址。格式为:host:port
*/
private final Map<String, ExchangeServer> serverMap = new ConcurrentHashMap<String, ExchangeServer>(); // <host:port,Exchanger>
  • serverMap 属性,通信服务器集合。其中,Key 为服务器地址,格式为 host:port

3.3.2 export

本文涉及的 #export(invoker) 方法,代码如下:

 1: public <T> Exporter<T> export(Invoker<T> invoker) throws RpcException {
2: URL url = invoker.getUrl();
3:
4: // 创建 DubboExporter 对象,并添加到 `exporterMap` 。
5: // export service.
6: String key = serviceKey(url);
7: DubboExporter<T> exporter = new DubboExporter<T>(invoker, key, exporterMap);
8: exporterMap.put(key, exporter);
9:
10: // TODO 【8033 参数回调】
11: //export an stub service for dispatching event
12: Boolean isStubSupportEvent = url.getParameter(Constants.STUB_EVENT_KEY, Constants.DEFAULT_STUB_EVENT);
13: Boolean isCallbackservice = url.getParameter(Constants.IS_CALLBACK_SERVICE, false);
14: if (isStubSupportEvent && !isCallbackservice) {
15: String stubServiceMethods = url.getParameter(Constants.STUB_EVENT_METHODS_KEY);
16: if (stubServiceMethods == null || stubServiceMethods.length() == 0) {
17: if (logger.isWarnEnabled()) {
18: logger.warn(new IllegalStateException("consumer [" + url.getParameter(Constants.INTERFACE_KEY) +
19: "], has set stubproxy support event ,but no stub methods founded."));
20: }
21: } else {
22: stubServiceMethodsMap.put(url.getServiceKey(), stubServiceMethods);
23: }
24: }
25:
26: // 启动服务器
27: openServer(url);
28:
29: // 初始化序列化优化器
30: optimizeSerialization(url);
31: return exporter;
32: }

3.3.3 openServer

友情提示:本小节的内容,胖友先看过 《精尽 Dubbo 源码分析 —— NIO 服务器》 所有的文章。

#openServer(url) 方法,启动通信服务器。代码如下:

/**
* 通信客户端集合
*
* key: 服务器地址。格式为:host:port
*/
private final Map<String, ReferenceCountExchangeClient> referenceClientMap = new ConcurrentHashMap<String, ReferenceCountExchangeClient>(); // <host:port,Exchanger>

1: /**
2: * 启动服务器
3: *
4: * @param url URL
5: */
6: private void openServer(URL url) {
7: // find server.
8: String key = url.getAddress();
9: //client can export a service which's only for server to invoke
10: boolean isServer = url.getParameter(Constants.IS_SERVER_KEY, true); // isserver
11: if (isServer) {
12: ExchangeServer server = serverMap.get(key);
13: if (server == null) {
14: serverMap.put(key, createServer(url));
15: } else {
16: // server supports reset, use together with override
17: server.reset(url);
18: }
19: }
20: }
  • 第 8 行:获得服务器地址。
  • 第 10 行:配置项 isserver ,可以暴露一个仅当前 JVM 可调用的服务。目前该配置项已经不存在。
  • 第 12 行:从 serverMap 获得对应服务器地址已存在的通信服务器。即,不重复创建
  • 第 13 至 14 行:通信服务器不存在,调用 #createServer(url) 方法,创建服务器。
  • 第 15 至 18 行:通信服务器已存在,调用 Server#reset(url) 方法,重置服务器的属性。
    • 为什么会存在呢?因为键是 host:port ,那么例如,多个 Service 共用同一个 Protocol ,服务器是同一个对象。

3.3.4 createServer

#createServer(url) 方法,创建并启动通信服务器。代码如下:

 1: private ExchangeServer createServer(URL url) {
2: // 默认开启 server 关闭时发送 READ_ONLY 事件
3: // send readonly event when server closes, it's enabled by default
4: url = url.addParameterIfAbsent(Constants.CHANNEL_READONLYEVENT_SENT_KEY, Boolean.TRUE.toString());
5: // 默认开启 heartbeat
6: // enable heartbeat by default
7: url = url.addParameterIfAbsent(Constants.HEARTBEAT_KEY, String.valueOf(Constants.DEFAULT_HEARTBEAT));
8:
9: // 校验 Server 的 Dubbo SPI 拓展是否存在
10: String str = url.getParameter(Constants.SERVER_KEY, Constants.DEFAULT_REMOTING_SERVER);
11: if (str != null && str.length() > 0 && !ExtensionLoader.getExtensionLoader(Transporter.class).hasExtension(str)) {
12: throw new RpcException("Unsupported server type: " + str + ", url: " + url);
13: }
14:
15: // 设置编解码器为 `"Dubbo"`
16: url = url.addParameter(Constants.CODEC_KEY, DubboCodec.NAME);
17:
18: // 启动服务器
19: ExchangeServer server;
20: try {
21: server = Exchangers.bind(url, requestHandler);
22: } catch (RemotingException e) {
23: throw new RpcException("Fail to start server(url: " + url + ") " + e.getMessage(), e);
24: }
25:
26: // 校验 Client 的 Dubbo SPI 拓展是否存在
27: str = url.getParameter(Constants.CLIENT_KEY);
28: if (str != null && str.length() > 0) {
29: Set<String> supportedTypes = ExtensionLoader.getExtensionLoader(Transporter.class).getSupportedExtensions();
30: if (!supportedTypes.contains(str)) {
31: throw new RpcException("Unsupported client type: " + str);
32: }
33: }
34: return server;
35: }
  • 第 4 行:默认开启 Server 关闭时,发送 READ_ONLY 事件。
  • 第 7 行:默认开启心跳功能。
  • 第 9 至 13 行:校验配置的 Server 的 Dubbo SPI 拓展是否存在。若不存在,抛出 RpcException 异常。
  • 第 16 行:设置编解码器为 "Dubbo" 协议,即 DubboCountCodec 。
  • 第 18 至 24 行:调用 Exchangers#bind(url, handler) 方法,启动服务器。具体 requestHanlder 属性值的实现,我们放在 Dubbo 协议下的 RPC 的文章里分享。requestHanlder 属性的简化代码如下:

    private ExchangeHandler requestHandler = new ExchangeHandlerAdapter() {

    public Object reply(ExchangeChannel channel, Object message) throws RemotingException {
    // .... 省略具体实现代码
    return invoker.invoke(inv);
    }
    }
  • 第 26 至 33 行:校验配置的 Client 的 Dubbo SPI 拓展是否存在。若不存在,抛出 RpcException 异常。默认情况下,未配置,所以不会校验。

  • 第 34 行:返回通信服务器。

4. Exporter

本文涉及的 Exporter 类图如下:

Exporter 类图

4.1 ExporterChangeableWrapper

com.alibaba.dubbo.registry.integration.RegistryProtocol.ExporterChangeableWrapper ,实现 Exporter 接口,Exporter 可变的包装器。

  • 建立【返回的 Exporter】与【Protocol export 出的 Exporter】的对应关系。
  • 在 override 时可以进行关系修改。

代码如下:

private class ExporterChangeableWrapper<T> implements Exporter<T> {

/**
* 原 Invoker 对象
*/
private final Invoker<T> originInvoker;
/**
* 暴露的 Exporter 对象
*/
private Exporter<T> exporter;

public ExporterChangeableWrapper(Exporter<T> exporter, Invoker<T> originInvoker) {
this.exporter = exporter;
this.originInvoker = originInvoker;
}

public Invoker<T> getOriginInvoker() {
return originInvoker;
}

public Invoker<T> getInvoker() {
return exporter.getInvoker();
}

public void setExporter(Exporter<T> exporter) {
this.exporter = exporter;
}

public void unexport() {
String key = getCacheKey(this.originInvoker);
// 移除出 `bounds`
bounds.remove(key);
// 取消暴露
exporter.unexport();
}
}

4.2 DestroyableExporter

com.alibaba.dubbo.registry.integration.RegistryProtocol.DestroyableExporter ,实现 Exporter 接口,可销毁的 Exporter 实现类。

private class ExporterChangeableWrapper<T> implements Exporter<T> {

/**
* 原 Invoker 对象
*/
private final Invoker<T> originInvoker;
/**
* 暴露的 Exporter 对象
*/
private Exporter<T> exporter;

public ExporterChangeableWrapper(Exporter<T> exporter, Invoker<T> originInvoker) {
this.exporter = exporter;
this.originInvoker = originInvoker;
}

public Invoker<T> getOriginInvoker() {
return originInvoker;
}

@Override
public Invoker<T> getInvoker() {
return exporter.getInvoker();
}

// 可以重新设置 Exporter 对象
public void setExporter(Exporter<T> exporter) {
this.exporter = exporter;
}

@Override
public void unexport() {
String key = getCacheKey(this.originInvoker);
// 移除出 `bounds`
bounds.remove(key);
// 取消暴露
exporter.unexport();
}

}

4.3 DubboExporter

com.alibaba.dubbo.rpc.protocol.dubbo.DubboExporter ,实现 AbstractExporter 抽象类,Dubbo Exporter 实现类。代码如下:

public class DubboExporter<T> extends AbstractExporter<T> {

/**
* 服务键
*/
private final String key;
/**
* Exporter 集合
*
* key: 服务键
*
* 该值实际就是 {@link com.alibaba.dubbo.rpc.protocol.AbstractProtocol#exporterMap}
*/
private final Map<String, Exporter<?>> exporterMap;

public DubboExporter(Invoker<T> invoker, String key, Map<String, Exporter<?>> exporterMap) {
super(invoker);
this.key = key;
this.exporterMap = exporterMap;
}

@Override
public void unexport() {
// 取消暴露
super.unexport();
// 移除
exporterMap.remove(key);
}

}
  • key 属性,服务键。
  • #exporterMap 属性,Exporter 集合。在上文 DubboProtocol#export(invoker) 方法中,我们可以看到,该属性就是 AbstractProtocol.exporterMap 属性。
    • 构造方法发起暴露,将自己添加到 exporterMap 中。
    • #unexport() 方法,取消暴露,将自己移除出 exporterMap 中。

666. 彩蛋

知识星球

写的有点神情恍惚。

一度以为自己在老家。

夜,02:02 ,睡觉….

总访客数 && 总访问量